|
|
|
@ -21,6 +21,7 @@
|
|
|
|
|
package net.yacy.cora.federate.solr.connector;
|
|
|
|
|
|
|
|
|
|
import java.io.IOException;
|
|
|
|
|
import java.util.ArrayList;
|
|
|
|
|
import java.util.Collection;
|
|
|
|
|
import java.util.HashSet;
|
|
|
|
|
import java.util.Iterator;
|
|
|
|
@ -87,11 +88,33 @@ public class ConcurrentUpdateSolrConnector implements SolrConnector {
|
|
|
|
|
SolrInputDocument doc;
|
|
|
|
|
try {
|
|
|
|
|
while ((doc = ConcurrentUpdateSolrConnector.this.updateQueue.take()) != POISON_DOCUMENT) {
|
|
|
|
|
try {
|
|
|
|
|
int getmore = ConcurrentUpdateSolrConnector.this.updateQueue.size();
|
|
|
|
|
if (getmore > 0) {
|
|
|
|
|
// accumulate a collection of documents because that is better to send at once to a remote server
|
|
|
|
|
Collection<SolrInputDocument> docs = new ArrayList<SolrInputDocument>(getmore + 1);
|
|
|
|
|
docs.add(doc);
|
|
|
|
|
updateIdCache((String) doc.getFieldValue(CollectionSchema.id.getSolrFieldName()));
|
|
|
|
|
ConcurrentUpdateSolrConnector.this.connector.add(doc);
|
|
|
|
|
} catch (final IOException e) {
|
|
|
|
|
ConcurrentLog.logException(e);
|
|
|
|
|
for (int i = 0; i < getmore; i++) {
|
|
|
|
|
SolrInputDocument d = ConcurrentUpdateSolrConnector.this.updateQueue.take();
|
|
|
|
|
if (d == POISON_DOCUMENT) break;
|
|
|
|
|
docs.add(d);
|
|
|
|
|
updateIdCache((String) d.getFieldValue(CollectionSchema.id.getSolrFieldName()));
|
|
|
|
|
}
|
|
|
|
|
//ConcurrentLog.info("ConcurrentUpdateSolrConnector", "sending " + docs.size() + " documents to solr");
|
|
|
|
|
try {
|
|
|
|
|
ConcurrentUpdateSolrConnector.this.connector.add(docs);
|
|
|
|
|
} catch (final IOException e) {
|
|
|
|
|
ConcurrentLog.logException(e);
|
|
|
|
|
}
|
|
|
|
|
} else {
|
|
|
|
|
// if there is only a single document, send this directly to solr
|
|
|
|
|
//ConcurrentLog.info("ConcurrentUpdateSolrConnector", "sending one document to solr");
|
|
|
|
|
try {
|
|
|
|
|
updateIdCache((String) doc.getFieldValue(CollectionSchema.id.getSolrFieldName()));
|
|
|
|
|
ConcurrentUpdateSolrConnector.this.connector.add(doc);
|
|
|
|
|
} catch (final IOException e) {
|
|
|
|
|
ConcurrentLog.logException(e);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
} catch (final InterruptedException e) {
|
|
|
|
|