You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
222 lines
9.4 KiB
222 lines
9.4 KiB
package net.yacy.search.index;
|
|
/**
|
|
* ReindexSolrBusyThread
|
|
* Copyright 2013 by Michael Peter Christen
|
|
* First released 13.05.2013 at http://yacy.net
|
|
*
|
|
* This library is free software; you can redistribute it and/or
|
|
* modify it under the terms of the GNU Lesser General Public
|
|
* License as published by the Free Software Foundation; either
|
|
* version 2.1 of the License, or (at your option) any later version.
|
|
*
|
|
* This library is distributed in the hope that it will be useful,
|
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
|
* Lesser General Public License for more details.
|
|
*
|
|
* You should have received a copy of the GNU Lesser General Public License
|
|
* along with this program in the file lgpl21.txt
|
|
* If not, see <http://www.gnu.org/licenses/>.
|
|
*/
|
|
|
|
import java.io.IOException;
|
|
|
|
import java.util.concurrent.Semaphore;
|
|
|
|
import net.yacy.cora.federate.solr.connector.AbstractSolrConnector;
|
|
import net.yacy.cora.federate.solr.connector.SolrConnector;
|
|
import net.yacy.cora.sorting.OrderedScoreMap;
|
|
import net.yacy.cora.util.ConcurrentLog;
|
|
import net.yacy.kelondro.data.meta.URIMetadataNode;
|
|
import net.yacy.kelondro.workflow.AbstractBusyThread;
|
|
import net.yacy.search.Switchboard;
|
|
import net.yacy.search.schema.CollectionConfiguration;
|
|
|
|
import org.apache.solr.common.SolrDocument;
|
|
import org.apache.solr.common.SolrDocumentList;
|
|
import org.apache.solr.common.SolrInputDocument;
|
|
|
|
|
|
/**
|
|
* Reindex selected documents of embedded Solr index.
|
|
* As the <b>toSolrInputDocument</b> acts only on current schema fields
|
|
* this can be used to remove obsolete fields physically from index
|
|
*
|
|
* can be deployed as BusyThread which is periodically called by system allowing easy interruption
|
|
* after each reindex chunk of 100 documents.
|
|
* If queue is empty this removes itself from list of servers workerthreads list
|
|
* Process: - initialize with one or more select queries
|
|
* - deploy as BusyThread (or call job repeatedly until it returns false)
|
|
* - job reindexes on each call chunk of 100 documents
|
|
*
|
|
* The thread uses internally a score map for the reindex queries this promotes fields with a low
|
|
* number of documents to get reindexed first.
|
|
*/
|
|
public class ReindexSolrBusyThread extends AbstractBusyThread {
|
|
|
|
public final static String THREAD_NAME = "reindexSolr";
|
|
|
|
SolrConnector esc;
|
|
final CollectionConfiguration colcfg; // collection config
|
|
int processed = 0; // total number of reindexed documents
|
|
int docstoreindex = 0; // documents found to reindex for current query
|
|
Semaphore sem = new Semaphore(1);
|
|
OrderedScoreMap<String> querylist = new OrderedScoreMap<String>(null); // list of select statements to reindex with number of documents as score
|
|
String currentquery = null;
|
|
int start = 0; // startindex
|
|
int chunksize = 100; // number of documents to reindex per cycle
|
|
|
|
/**
|
|
* @param query = a solr query to select documents to reindex (like h5_txt:[* TO *])
|
|
*/
|
|
public ReindexSolrBusyThread(String query) {
|
|
super(100,0);
|
|
this.esc = Switchboard.getSwitchboard().index.fulltext().getDefaultConnector();
|
|
this.colcfg = Switchboard.getSwitchboard().index.fulltext().getDefaultConfiguration();
|
|
|
|
if (Switchboard.getSwitchboard().getThread(ReindexSolrBusyThread.THREAD_NAME) != null) {
|
|
this.interrupt(); // only one active reindex job should exist
|
|
} else {
|
|
if (query != null) {
|
|
this.querylist.set(query, 0);
|
|
}
|
|
}
|
|
setName(ReindexSolrBusyThread.THREAD_NAME);
|
|
this.setPriority(Thread.MIN_PRIORITY);
|
|
|
|
}
|
|
|
|
/**
|
|
* add a query selecting documents to reindex
|
|
*/
|
|
public void addSelectQuery(String query) {
|
|
if (query != null && !query.isEmpty()) {
|
|
querylist.set(query, 0);
|
|
}
|
|
}
|
|
|
|
/**
|
|
* add a fieldname to select documents to reindex all documents
|
|
* containing the given fieldname are reindexed
|
|
*
|
|
* @param field a solr fieldname
|
|
*/
|
|
public void addSelectFieldname(String field) {
|
|
if (field != null && !field.isEmpty()) {
|
|
querylist.set(field + AbstractSolrConnector.CATCHALL_DTERM, 0);
|
|
}
|
|
}
|
|
|
|
/**
|
|
* each call reindexes a chunk of 100 documents until all selected documents are reindexed
|
|
* @return false if no documents selected
|
|
*/
|
|
@Override
|
|
public boolean job() {
|
|
boolean ret = true;
|
|
if (esc != null && colcfg != null && !querylist.isEmpty()) {
|
|
|
|
if (sem.tryAcquire()) { // allow only one working cycle
|
|
try {
|
|
currentquery = querylist.keys(true).next(); // get next query with lowest number of documents found
|
|
SolrDocumentList xdocs = esc.getDocumentListByQuery(currentquery, null, start, chunksize);
|
|
|
|
if (xdocs.size() == 0) { // no documents returned = all of current query reindexed (or eventual start to large)
|
|
|
|
if (start > 0) { // if previous cycle reindexed, commit to prevent reindex of same documents
|
|
esc.commit(true);
|
|
start = 0;
|
|
} else { // if start == 0 and nothing found, query can be deleted for sure
|
|
querylist.delete(currentquery); // remove current query
|
|
}
|
|
|
|
if (chunksize < 100) { // try to increase chunksize (if reduced by freemem)
|
|
chunksize = chunksize + 10;
|
|
}
|
|
} else {
|
|
docstoreindex = (int) xdocs.getNumFound();
|
|
ConcurrentLog.info("MIGRATION-REINDEX", "reindex docs with query=" + currentquery + " found=" + docstoreindex + " start=" + start);
|
|
start = start + chunksize;
|
|
querylist.set(currentquery, docstoreindex);
|
|
for (SolrDocument doc : xdocs) {
|
|
URIMetadataNode pdoc = new URIMetadataNode(doc); // use Metadata as it verifies correct/current Doc.ID
|
|
SolrInputDocument idoc = colcfg.toSolrInputDocument(pdoc);
|
|
Switchboard.getSwitchboard().index.putDocument(idoc);
|
|
processed++;
|
|
}
|
|
if (xdocs.size() >= docstoreindex) { // number processed docs >= found docs -> end condition for this query as no more docs avail-
|
|
querylist.delete(currentquery); // 2017-02-27 added on occurence of 21 docs found 21 processed but somehow on next call 21 docs again found (some commit issue ??)
|
|
}
|
|
}
|
|
} catch (final IOException ex) {
|
|
ConcurrentLog.warn("MIGRATION-REINDEX", "remove following query from list due to error, q=" + currentquery);
|
|
querylist.delete(currentquery);
|
|
ConcurrentLog.logException(ex);
|
|
} finally {
|
|
sem.release();
|
|
}
|
|
}
|
|
} else {
|
|
ret = false;
|
|
}
|
|
|
|
if (querylist.isEmpty()) { // if all processed remove from scheduled list (and terminate thread)
|
|
Switchboard.getSwitchboard().terminateThread(ReindexSolrBusyThread.THREAD_NAME, false);
|
|
ret = false;
|
|
}
|
|
return ret;
|
|
}
|
|
|
|
|
|
@Override
|
|
public void terminate(final boolean waitFor) {
|
|
querylist.clear();
|
|
// if interrupted without finished commit to reflect latest changes
|
|
if (docstoreindex > 0 && processed > 0) {
|
|
esc.commit(true);
|
|
}
|
|
super.terminate(waitFor);
|
|
}
|
|
|
|
/**
|
|
* @return total number of processed documents
|
|
*/
|
|
public int getProcessed() {
|
|
return processed;
|
|
}
|
|
|
|
/**
|
|
* @return the currently processed Solr select query
|
|
*/
|
|
public String getCurrentQuery() {
|
|
return querylist.isEmpty() ? "" : currentquery;
|
|
}
|
|
|
|
/**
|
|
* @return copy of all Solr select queries in the queue or null if empty
|
|
*/
|
|
public OrderedScoreMap<String> getQueryList() {
|
|
return querylist;
|
|
}
|
|
|
|
/**
|
|
* @return number of currently selected (found) documents
|
|
*/
|
|
@Override
|
|
public int getJobCount() {
|
|
return docstoreindex;
|
|
}
|
|
|
|
@Override
|
|
public void freemem() {
|
|
// reduce number of docs processed in one job cycle
|
|
if (chunksize > 2) {
|
|
this.chunksize = this.chunksize / 2;
|
|
}
|
|
esc.commit(true);
|
|
start = 0;
|
|
}
|
|
|
|
}
|
|
|