diff --git a/source/net/yacy/cora/federate/solr/connector/AbstractSolrConnector.java b/source/net/yacy/cora/federate/solr/connector/AbstractSolrConnector.java index a0daa9edf..0bded3cae 100644 --- a/source/net/yacy/cora/federate/solr/connector/AbstractSolrConnector.java +++ b/source/net/yacy/cora/federate/solr/connector/AbstractSolrConnector.java @@ -506,8 +506,11 @@ public abstract class AbstractSolrConnector implements SolrConnector { docOut.setField(CollectionSchema.id.name(), docIn.getFieldValue(CollectionSchema.id.name())); for (Entry entry: docIn.entrySet()) { if (entry.getKey().equals(CollectionSchema.id.name())) continue; + SolrInputField sif = entry.getValue(); Map partialUpdate = new HashMap<>(1); - partialUpdate.put("set", entry.getValue()); + Object value = sif.getValue(); + docOut.removeField(entry.getKey()); + partialUpdate.put("set", value); docOut.setField(entry.getKey(), partialUpdate); } return docOut; diff --git a/source/net/yacy/cora/federate/solr/connector/EmbeddedSolrConnector.java b/source/net/yacy/cora/federate/solr/connector/EmbeddedSolrConnector.java index 821a79b7d..40da08537 100644 --- a/source/net/yacy/cora/federate/solr/connector/EmbeddedSolrConnector.java +++ b/source/net/yacy/cora/federate/solr/connector/EmbeddedSolrConnector.java @@ -27,6 +27,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.LinkedBlockingQueue; import net.yacy.cora.federate.solr.instance.EmbeddedInstance; @@ -261,11 +262,26 @@ public class EmbeddedSolrConnector extends SolrServerConnector implements SolrCo return sdl; } + /** + * The following schemaFieldCache is a hack-patch of a Solr internal request which is really slow. + * The Solr-internal method is flexible because it may respond on a real-time schema change, but that + * effectively never happens. In our case the schema declaration against Solr never changes. + */ + private final Map schemaFieldCache = new ConcurrentHashMap<>(); + private final SchemaField getSchemaField(final String fieldName) { + SchemaField sf = schemaFieldCache.get(fieldName); + if (sf == null) { + sf = this.core.getLatestSchema().getFieldOrNull(fieldName); + schemaFieldCache.put(fieldName, sf); + } + return sf; + } + public SolrDocument doc2SolrDoc(Document doc) { SolrDocument solrDoc = new SolrDocument(); for (IndexableField field : doc) { String fieldName = field.name(); - SchemaField sf = this.core.getLatestSchema().getFieldOrNull(fieldName); + SchemaField sf = getSchemaField(fieldName); // hack-patch of this.core.getLatestSchema().getFieldOrNull(fieldName); makes it a lot faster!! Object val = null; try { FieldType ft = null; diff --git a/source/net/yacy/cora/federate/solr/connector/SolrServerConnector.java b/source/net/yacy/cora/federate/solr/connector/SolrServerConnector.java index ac8cd6d07..7cccfa9d1 100644 --- a/source/net/yacy/cora/federate/solr/connector/SolrServerConnector.java +++ b/source/net/yacy/cora/federate/solr/connector/SolrServerConnector.java @@ -208,9 +208,9 @@ public abstract class SolrServerConnector extends AbstractSolrConnector implemen @Override public void add(final SolrInputDocument solrdoc) throws IOException, SolrException { if (this.server == null) return; + if (solrdoc.containsKey("_version_")) solrdoc.setField("_version_",0L); // prevent Solr "version conflict" synchronized (this.server) { try { - if (solrdoc.containsKey("_version_")) solrdoc.setField("_version_",0L); // prevent Solr "version conflict" this.server.add(solrdoc, -1); } catch (final Throwable e) { clearCaches(); // prevent further OOM if this was caused by OOM @@ -245,11 +245,11 @@ public abstract class SolrServerConnector extends AbstractSolrConnector implemen @Override public void add(final Collection solrdocs) throws IOException, SolrException { if (this.server == null) return; + for (SolrInputDocument solrdoc : solrdocs) { + if (solrdoc.containsKey("_version_")) solrdoc.setField("_version_",0L); // prevent Solr "version conflict" + } synchronized (this.server) { try { - for (SolrInputDocument solrdoc : solrdocs) { - if (solrdoc.containsKey("_version_")) solrdoc.setField("_version_",0L); // prevent Solr "version conflict" - } this.server.add(solrdocs, -1); } catch (final Throwable e) { clearCaches(); // prevent further OOM if this was caused by OOM diff --git a/source/net/yacy/search/schema/CollectionConfiguration.java b/source/net/yacy/search/schema/CollectionConfiguration.java index 7205229f6..ce357f192 100644 --- a/source/net/yacy/search/schema/CollectionConfiguration.java +++ b/source/net/yacy/search/schema/CollectionConfiguration.java @@ -1148,7 +1148,12 @@ public class CollectionConfiguration extends SchemaConfiguration implements Seri final long count = segment.fulltext().getWebgraphConnector().getCountByQuery(patchquery); int concurrency = Math.min((int) count, Math.max(1, Runtime.getRuntime().availableProcessors() / 4)); ConcurrentLog.info("CollectionConfiguration", "collecting " + count + " documents from the webgraph, concurrency = " + concurrency); - final BlockingQueue docs = segment.fulltext().getWebgraphConnector().concurrentDocumentsByQuery(patchquery, WebgraphSchema.source_chars_i.getSolrFieldName() + " asc", 0, 100000000, Long.MAX_VALUE, concurrency + 1, concurrency, true); + final BlockingQueue docs = segment.fulltext().getWebgraphConnector().concurrentDocumentsByQuery( + patchquery, + WebgraphSchema.source_chars_i.getSolrFieldName() + " asc", + 0, 100000000, Long.MAX_VALUE, concurrency + 1, concurrency, true + // TODO: add field list and do partial updates + ); final AtomicInteger proccount = new AtomicInteger(0); Thread[] t = new Thread[concurrency]; for (final AtomicInteger i = new AtomicInteger(0); i.get() < t.length; i.incrementAndGet()) { @@ -1256,7 +1261,25 @@ public class CollectionConfiguration extends SchemaConfiguration implements Seri CollectionSchema.host_subdomain_s.getSolrFieldName() + " asc," + // sort on subdomain to get hosts without subdomain first; that gives an opportunity to set www_unique_b flag to false CollectionSchema.url_protocol_s.getSolrFieldName() + " asc" // sort on protocol to get http before https; that gives an opportunity to set http_unique_b flag to false : null, // null sort is faster! - 0, 100000000, Long.MAX_VALUE, concurrency + 1, concurrency, true); + 0, 100000000, Long.MAX_VALUE, concurrency + 1, concurrency, true, + CollectionSchema.id.getSolrFieldName(), + CollectionSchema.sku.getSolrFieldName(), + CollectionSchema.harvestkey_s.getSolrFieldName(), + CollectionSchema.process_sxt.getSolrFieldName(), + CollectionSchema.canonical_equal_sku_b.getSolrFieldName(), + CollectionSchema.canonical_s.getSolrFieldName(), + CollectionSchema.exact_signature_l.getSolrFieldName(), + CollectionSchema.fuzzy_signature_l.getSolrFieldName(), + CollectionSchema.title_exact_signature_l.getSolrFieldName(), + CollectionSchema.description_exact_signature_l.getSolrFieldName(), + CollectionSchema.host_id_s.getSolrFieldName(), + CollectionSchema.host_s.getSolrFieldName(), + CollectionSchema.host_subdomain_s.getSolrFieldName(), + CollectionSchema.url_chars_i.getSolrFieldName(), + CollectionSchema.url_protocol_s.getSolrFieldName(), + CollectionSchema.httpstatus_i.getSolrFieldName(), + CollectionSchema.inboundlinkscount_i.getSolrFieldName(), + CollectionSchema.robots_i.getSolrFieldName()); final AtomicInteger proccount = new AtomicInteger(); final AtomicInteger proccount_referencechange = new AtomicInteger(); final AtomicInteger proccount_citationchange = new AtomicInteger(); @@ -1282,8 +1305,8 @@ public class CollectionConfiguration extends SchemaConfiguration implements Seri try { DigestURL url = new DigestURL(u, ASCII.getBytes(i)); byte[] id = url.hash(); - SolrInputDocument sid = collection.toSolrInputDocument(doc, omitFields); - + SolrInputDocument sid = new SolrInputDocument(); //collection.toSolrInputDocument(doc, omitFields); + sid.setField(CollectionSchema.id.getSolrFieldName(), i); for (Object tag: proctags) try { // switch over tag types @@ -1323,17 +1346,17 @@ public class CollectionConfiguration extends SchemaConfiguration implements Seri } // all processing steps checked, remove the processing and harvesting key - sid.removeField(CollectionSchema.process_sxt.getSolrFieldName()); - sid.removeField(CollectionSchema.harvestkey_s.getSolrFieldName()); + sid.setField(CollectionSchema.process_sxt.getSolrFieldName(), null); // setting this to null will cause a removal when doing a partial update + sid.setField(CollectionSchema.harvestkey_s.getSolrFieldName(), null); // send back to index //collectionConnector.deleteById(i); - collectionConnector.add(sid); + collectionConnector.update(sid); - int thiscount = proccount.incrementAndGet(); allcount.incrementAndGet(); + long thiscount = proccount.incrementAndGet(); allcount.incrementAndGet(); if (thiscount % 100 == 0) { postprocessingActivity = "postprocessed " + thiscount + " from " + count + " collection documents; " + - (thiscount * 60000 / (System.currentTimeMillis() - start)) + " ppm; " + + (thiscount * 60000L / (System.currentTimeMillis() - start)) + " ppm; " + ((System.currentTimeMillis() - start) * (count - thiscount) / thiscount / 60000) + " minutes remaining"; ConcurrentLog.info("CollectionConfiguration", postprocessingActivity); }