From d80418f1b120dbc32444f67113884b4de44215b8 Mon Sep 17 00:00:00 2001 From: Michael Peter Christen Date: Tue, 14 Oct 2014 12:19:59 +0200 Subject: [PATCH] added partial updates to solr during postprocessing: during postprocessing the solr documents are now not completely retrieved. instead, only fiels, needed for the postprocessing are extracted. When Solr document are written, this is done using partial updates. This increases postprocessing speed by about 50% for embedded Solr configurations. For external Solr configurations the enhancement should be much higher because the postprocessing with remote Solr is very slow. When doing partial updates to a remote Solr, this method should perform much better than before, it is expected that this is even much higher than the increase with local Solr. --- .../solr/connector/AbstractSolrConnector.java | 5 ++- .../solr/connector/EmbeddedSolrConnector.java | 18 +++++++- .../solr/connector/SolrServerConnector.java | 8 ++-- .../schema/CollectionConfiguration.java | 41 +++++++++++++++---- 4 files changed, 57 insertions(+), 15 deletions(-) diff --git a/source/net/yacy/cora/federate/solr/connector/AbstractSolrConnector.java b/source/net/yacy/cora/federate/solr/connector/AbstractSolrConnector.java index a0daa9edf..0bded3cae 100644 --- a/source/net/yacy/cora/federate/solr/connector/AbstractSolrConnector.java +++ b/source/net/yacy/cora/federate/solr/connector/AbstractSolrConnector.java @@ -506,8 +506,11 @@ public abstract class AbstractSolrConnector implements SolrConnector { docOut.setField(CollectionSchema.id.name(), docIn.getFieldValue(CollectionSchema.id.name())); for (Entry entry: docIn.entrySet()) { if (entry.getKey().equals(CollectionSchema.id.name())) continue; + SolrInputField sif = entry.getValue(); Map partialUpdate = new HashMap<>(1); - partialUpdate.put("set", entry.getValue()); + Object value = sif.getValue(); + docOut.removeField(entry.getKey()); + partialUpdate.put("set", value); docOut.setField(entry.getKey(), partialUpdate); } return docOut; diff --git a/source/net/yacy/cora/federate/solr/connector/EmbeddedSolrConnector.java b/source/net/yacy/cora/federate/solr/connector/EmbeddedSolrConnector.java index 821a79b7d..40da08537 100644 --- a/source/net/yacy/cora/federate/solr/connector/EmbeddedSolrConnector.java +++ b/source/net/yacy/cora/federate/solr/connector/EmbeddedSolrConnector.java @@ -27,6 +27,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.LinkedBlockingQueue; import net.yacy.cora.federate.solr.instance.EmbeddedInstance; @@ -261,11 +262,26 @@ public class EmbeddedSolrConnector extends SolrServerConnector implements SolrCo return sdl; } + /** + * The following schemaFieldCache is a hack-patch of a Solr internal request which is really slow. + * The Solr-internal method is flexible because it may respond on a real-time schema change, but that + * effectively never happens. In our case the schema declaration against Solr never changes. + */ + private final Map schemaFieldCache = new ConcurrentHashMap<>(); + private final SchemaField getSchemaField(final String fieldName) { + SchemaField sf = schemaFieldCache.get(fieldName); + if (sf == null) { + sf = this.core.getLatestSchema().getFieldOrNull(fieldName); + schemaFieldCache.put(fieldName, sf); + } + return sf; + } + public SolrDocument doc2SolrDoc(Document doc) { SolrDocument solrDoc = new SolrDocument(); for (IndexableField field : doc) { String fieldName = field.name(); - SchemaField sf = this.core.getLatestSchema().getFieldOrNull(fieldName); + SchemaField sf = getSchemaField(fieldName); // hack-patch of this.core.getLatestSchema().getFieldOrNull(fieldName); makes it a lot faster!! Object val = null; try { FieldType ft = null; diff --git a/source/net/yacy/cora/federate/solr/connector/SolrServerConnector.java b/source/net/yacy/cora/federate/solr/connector/SolrServerConnector.java index ac8cd6d07..7cccfa9d1 100644 --- a/source/net/yacy/cora/federate/solr/connector/SolrServerConnector.java +++ b/source/net/yacy/cora/federate/solr/connector/SolrServerConnector.java @@ -208,9 +208,9 @@ public abstract class SolrServerConnector extends AbstractSolrConnector implemen @Override public void add(final SolrInputDocument solrdoc) throws IOException, SolrException { if (this.server == null) return; + if (solrdoc.containsKey("_version_")) solrdoc.setField("_version_",0L); // prevent Solr "version conflict" synchronized (this.server) { try { - if (solrdoc.containsKey("_version_")) solrdoc.setField("_version_",0L); // prevent Solr "version conflict" this.server.add(solrdoc, -1); } catch (final Throwable e) { clearCaches(); // prevent further OOM if this was caused by OOM @@ -245,11 +245,11 @@ public abstract class SolrServerConnector extends AbstractSolrConnector implemen @Override public void add(final Collection solrdocs) throws IOException, SolrException { if (this.server == null) return; + for (SolrInputDocument solrdoc : solrdocs) { + if (solrdoc.containsKey("_version_")) solrdoc.setField("_version_",0L); // prevent Solr "version conflict" + } synchronized (this.server) { try { - for (SolrInputDocument solrdoc : solrdocs) { - if (solrdoc.containsKey("_version_")) solrdoc.setField("_version_",0L); // prevent Solr "version conflict" - } this.server.add(solrdocs, -1); } catch (final Throwable e) { clearCaches(); // prevent further OOM if this was caused by OOM diff --git a/source/net/yacy/search/schema/CollectionConfiguration.java b/source/net/yacy/search/schema/CollectionConfiguration.java index 7205229f6..ce357f192 100644 --- a/source/net/yacy/search/schema/CollectionConfiguration.java +++ b/source/net/yacy/search/schema/CollectionConfiguration.java @@ -1148,7 +1148,12 @@ public class CollectionConfiguration extends SchemaConfiguration implements Seri final long count = segment.fulltext().getWebgraphConnector().getCountByQuery(patchquery); int concurrency = Math.min((int) count, Math.max(1, Runtime.getRuntime().availableProcessors() / 4)); ConcurrentLog.info("CollectionConfiguration", "collecting " + count + " documents from the webgraph, concurrency = " + concurrency); - final BlockingQueue docs = segment.fulltext().getWebgraphConnector().concurrentDocumentsByQuery(patchquery, WebgraphSchema.source_chars_i.getSolrFieldName() + " asc", 0, 100000000, Long.MAX_VALUE, concurrency + 1, concurrency, true); + final BlockingQueue docs = segment.fulltext().getWebgraphConnector().concurrentDocumentsByQuery( + patchquery, + WebgraphSchema.source_chars_i.getSolrFieldName() + " asc", + 0, 100000000, Long.MAX_VALUE, concurrency + 1, concurrency, true + // TODO: add field list and do partial updates + ); final AtomicInteger proccount = new AtomicInteger(0); Thread[] t = new Thread[concurrency]; for (final AtomicInteger i = new AtomicInteger(0); i.get() < t.length; i.incrementAndGet()) { @@ -1256,7 +1261,25 @@ public class CollectionConfiguration extends SchemaConfiguration implements Seri CollectionSchema.host_subdomain_s.getSolrFieldName() + " asc," + // sort on subdomain to get hosts without subdomain first; that gives an opportunity to set www_unique_b flag to false CollectionSchema.url_protocol_s.getSolrFieldName() + " asc" // sort on protocol to get http before https; that gives an opportunity to set http_unique_b flag to false : null, // null sort is faster! - 0, 100000000, Long.MAX_VALUE, concurrency + 1, concurrency, true); + 0, 100000000, Long.MAX_VALUE, concurrency + 1, concurrency, true, + CollectionSchema.id.getSolrFieldName(), + CollectionSchema.sku.getSolrFieldName(), + CollectionSchema.harvestkey_s.getSolrFieldName(), + CollectionSchema.process_sxt.getSolrFieldName(), + CollectionSchema.canonical_equal_sku_b.getSolrFieldName(), + CollectionSchema.canonical_s.getSolrFieldName(), + CollectionSchema.exact_signature_l.getSolrFieldName(), + CollectionSchema.fuzzy_signature_l.getSolrFieldName(), + CollectionSchema.title_exact_signature_l.getSolrFieldName(), + CollectionSchema.description_exact_signature_l.getSolrFieldName(), + CollectionSchema.host_id_s.getSolrFieldName(), + CollectionSchema.host_s.getSolrFieldName(), + CollectionSchema.host_subdomain_s.getSolrFieldName(), + CollectionSchema.url_chars_i.getSolrFieldName(), + CollectionSchema.url_protocol_s.getSolrFieldName(), + CollectionSchema.httpstatus_i.getSolrFieldName(), + CollectionSchema.inboundlinkscount_i.getSolrFieldName(), + CollectionSchema.robots_i.getSolrFieldName()); final AtomicInteger proccount = new AtomicInteger(); final AtomicInteger proccount_referencechange = new AtomicInteger(); final AtomicInteger proccount_citationchange = new AtomicInteger(); @@ -1282,8 +1305,8 @@ public class CollectionConfiguration extends SchemaConfiguration implements Seri try { DigestURL url = new DigestURL(u, ASCII.getBytes(i)); byte[] id = url.hash(); - SolrInputDocument sid = collection.toSolrInputDocument(doc, omitFields); - + SolrInputDocument sid = new SolrInputDocument(); //collection.toSolrInputDocument(doc, omitFields); + sid.setField(CollectionSchema.id.getSolrFieldName(), i); for (Object tag: proctags) try { // switch over tag types @@ -1323,17 +1346,17 @@ public class CollectionConfiguration extends SchemaConfiguration implements Seri } // all processing steps checked, remove the processing and harvesting key - sid.removeField(CollectionSchema.process_sxt.getSolrFieldName()); - sid.removeField(CollectionSchema.harvestkey_s.getSolrFieldName()); + sid.setField(CollectionSchema.process_sxt.getSolrFieldName(), null); // setting this to null will cause a removal when doing a partial update + sid.setField(CollectionSchema.harvestkey_s.getSolrFieldName(), null); // send back to index //collectionConnector.deleteById(i); - collectionConnector.add(sid); + collectionConnector.update(sid); - int thiscount = proccount.incrementAndGet(); allcount.incrementAndGet(); + long thiscount = proccount.incrementAndGet(); allcount.incrementAndGet(); if (thiscount % 100 == 0) { postprocessingActivity = "postprocessed " + thiscount + " from " + count + " collection documents; " + - (thiscount * 60000 / (System.currentTimeMillis() - start)) + " ppm; " + + (thiscount * 60000L / (System.currentTimeMillis() - start)) + " ppm; " + ((System.currentTimeMillis() - start) * (count - thiscount) / thiscount / 60000) + " minutes remaining"; ConcurrentLog.info("CollectionConfiguration", postprocessingActivity); }