From 7640834b3792bd0facffea740890ed49622cdfd2 Mon Sep 17 00:00:00 2001 From: Michael Peter Christen Date: Wed, 26 Feb 2014 22:21:00 +0100 Subject: [PATCH 1/3] removed double concurrency to put Solr documents into the index. The writings to the solr index are also buffered in ConcurrentUpdateSolrConnector --- .../federate/solr/SchemaConfiguration.java | 2 +- source/net/yacy/peers/Protocol.java | 2 +- .../search/index/ReindexSolrBusyThread.java | 2 +- source/net/yacy/search/index/Segment.java | 29 ++----------------- 4 files changed, 6 insertions(+), 29 deletions(-) diff --git a/source/net/yacy/cora/federate/solr/SchemaConfiguration.java b/source/net/yacy/cora/federate/solr/SchemaConfiguration.java index 1b2f832ab..da1a07438 100644 --- a/source/net/yacy/cora/federate/solr/SchemaConfiguration.java +++ b/source/net/yacy/cora/federate/solr/SchemaConfiguration.java @@ -166,7 +166,7 @@ public class SchemaConfiguration extends Configuration implements Serializable { // switch attribute in existing document SolrInputDocument sidContext = segment.fulltext().getDefaultConfiguration().toSolrInputDocument(doc); sidContext.setField(uniquefield.getSolrFieldName(), false); - segment.putDocumentInQueue(sidContext); + segment.putDocument(sidContext); changed = true; } else { sid.setField(uniquefield.getSolrFieldName(), true); diff --git a/source/net/yacy/peers/Protocol.java b/source/net/yacy/peers/Protocol.java index dcc997174..137083a0c 100644 --- a/source/net/yacy/peers/Protocol.java +++ b/source/net/yacy/peers/Protocol.java @@ -1209,7 +1209,7 @@ public final class Protocol { Network.log.info("local search (solr): localpeer sent " + container.size() + "/" + numFound + " references"); } else { for (SolrInputDocument doc: docs) { - event.query.getSegment().putDocumentInQueue(doc); + event.query.getSegment().putDocument(doc); } docs.clear(); docs = null; event.addNodes(container, facets, snippets, false, target.getName() + "/" + target.hash, numFound); diff --git a/source/net/yacy/search/index/ReindexSolrBusyThread.java b/source/net/yacy/search/index/ReindexSolrBusyThread.java index c176ceea0..bc1bd2f7b 100644 --- a/source/net/yacy/search/index/ReindexSolrBusyThread.java +++ b/source/net/yacy/search/index/ReindexSolrBusyThread.java @@ -133,7 +133,7 @@ import org.apache.solr.common.SolrInputDocument; for (SolrDocument doc : xdocs) { SolrInputDocument idoc = colcfg.toSolrInputDocument(doc); - Switchboard.getSwitchboard().index.putDocumentInQueue(idoc); + Switchboard.getSwitchboard().index.putDocument(idoc); processed++; } } diff --git a/source/net/yacy/search/index/Segment.java b/source/net/yacy/search/index/Segment.java index 2a282e219..e397a14f9 100644 --- a/source/net/yacy/search/index/Segment.java +++ b/source/net/yacy/search/index/Segment.java @@ -77,9 +77,7 @@ import net.yacy.kelondro.rwi.ReferenceFactory; import net.yacy.kelondro.util.Bitfield; import net.yacy.kelondro.util.ISO639; import net.yacy.kelondro.util.MemoryControl; -import net.yacy.kelondro.workflow.WorkflowProcessor; import net.yacy.repository.LoaderDispatcher; -import net.yacy.search.StorageQueueEntry; import net.yacy.search.query.SearchEvent; import net.yacy.search.schema.CollectionConfiguration; import net.yacy.search.schema.CollectionSchema; @@ -117,7 +115,6 @@ public class Segment { protected final Fulltext fulltext; protected IndexCell termIndex; protected IndexCell urlCitationIndex; - private WorkflowProcessor indexingPutDocumentProcessor; /** * create a new Segment @@ -136,16 +133,6 @@ public class Segment { this.fulltext = new Fulltext(segmentPath, archivePath, collectionConfiguration, webgraphConfiguration); this.termIndex = null; this.urlCitationIndex = null; - - this.indexingPutDocumentProcessor = new WorkflowProcessor( - "putDocument", - "solr document put queueing", - new String[] {}, - this, - "putDocument", - 30, - null, - 1); } public boolean connectedRWI() { @@ -543,7 +530,6 @@ public class Segment { } public synchronized void close() { - this.indexingPutDocumentProcessor.shutdown(); if (this.termIndex != null) this.termIndex.close(); if (this.fulltext != null) this.fulltext.close(); if (this.urlCitationIndex != null) this.urlCitationIndex.close(); @@ -607,22 +593,13 @@ public class Segment { * @param queueEntry * @throws IOException */ - public void putDocument(final StorageQueueEntry queueEntry) { + public void putDocument(final SolrInputDocument queueEntry) { try { - this.fulltext().putDocument(queueEntry.queueEntry); + this.fulltext().putDocument(queueEntry); } catch (final IOException e) { ConcurrentLog.logException(e); } } - - /** - * put a solr document into the index. This is the right point to call when enqueueing of solr document is wanted. - * This method exist to prevent that solr is filled concurrently with data which makes it fail or throw strange exceptions. - * @param queueEntry - */ - public void putDocumentInQueue(final SolrInputDocument queueEntry) { - this.indexingPutDocumentProcessor.enQueue(new StorageQueueEntry(queueEntry)); - } public SolrInputDocument storeDocument( final DigestURL url, @@ -659,7 +636,7 @@ public class Segment { // STORE TO SOLR String error = null; - this.putDocumentInQueue(vector); + this.putDocument(vector); List webgraph = vector.getWebgraphDocuments(); if (webgraph != null && webgraph.size() > 0) { From 90b47e83e65e9572c68ac2c44af488ccfbc5aef9 Mon Sep 17 00:00:00 2001 From: Michael Peter Christen Date: Wed, 26 Feb 2014 22:47:16 +0100 Subject: [PATCH 2/3] fixed shutdown error when closing solr connectors --- .../federate/solr/instance/InstanceMirror.java | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/source/net/yacy/cora/federate/solr/instance/InstanceMirror.java b/source/net/yacy/cora/federate/solr/instance/InstanceMirror.java index 3642fa145..0a5955c5d 100644 --- a/source/net/yacy/cora/federate/solr/instance/InstanceMirror.java +++ b/source/net/yacy/cora/federate/solr/instance/InstanceMirror.java @@ -62,10 +62,13 @@ public class InstanceMirror { public void disconnectEmbedded() { if (this.embeddedSolrInstance == null) return; + for (EmbeddedSolrConnector connector: this.embeddedConnectorCache.values()) { + this.mirrorConnectorCache.values().remove(connector); + connector.close(); + } + this.embeddedConnectorCache.clear(); for (SolrConnector connector: this.mirrorConnectorCache.values()) connector.close(); this.mirrorConnectorCache.clear(); - for (EmbeddedSolrConnector connector: this.embeddedConnectorCache.values()) connector.close(); - this.embeddedConnectorCache.clear(); this.embeddedSolrInstance.close(); this.embeddedSolrInstance = null; } @@ -85,10 +88,13 @@ public class InstanceMirror { public void disconnectRemote() { if (this.remoteSolrInstance == null) return; + for (RemoteSolrConnector connector: this.remoteConnectorCache.values()) { + this.mirrorConnectorCache.values().remove(connector); + connector.close(); + } + this.remoteConnectorCache.clear(); for (SolrConnector connector: this.mirrorConnectorCache.values()) connector.close(); this.mirrorConnectorCache.clear(); - for (RemoteSolrConnector connector: this.remoteConnectorCache.values()) connector.close(); - this.remoteConnectorCache.clear(); this.remoteSolrInstance.close(); this.remoteSolrInstance = null; } From e1bf65c892e7baf6a5d3ab4529c867057ce17a6a Mon Sep 17 00:00:00 2001 From: Michael Peter Christen Date: Wed, 26 Feb 2014 23:02:56 +0100 Subject: [PATCH 3/3] added short memory protection during postprocessing --- .../yacy/search/schema/CollectionConfiguration.java | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/source/net/yacy/search/schema/CollectionConfiguration.java b/source/net/yacy/search/schema/CollectionConfiguration.java index 85451fbe9..0a518bfee 100644 --- a/source/net/yacy/search/schema/CollectionConfiguration.java +++ b/source/net/yacy/search/schema/CollectionConfiguration.java @@ -79,6 +79,7 @@ import net.yacy.kelondro.data.meta.URIMetadataRow; import net.yacy.kelondro.index.RowHandleMap; import net.yacy.kelondro.rwi.ReferenceContainer; import net.yacy.kelondro.util.Bitfield; +import net.yacy.kelondro.util.MemoryControl; import net.yacy.search.index.Segment; import net.yacy.search.index.Segment.ClickdepthCache; import net.yacy.search.index.Segment.ReferenceReport; @@ -976,6 +977,10 @@ public class CollectionConfiguration extends SchemaConfiguration implements Seri } } patchquerycountcheck++; + if (MemoryControl.shortStatus()) { + ConcurrentLog.warn("CollectionConfiguration", "terminated canonical collection during postprocessing because of short memory"); + break; + } } } catch (InterruptedException e) { ConcurrentLog.logException(e); @@ -992,12 +997,20 @@ public class CollectionConfiguration extends SchemaConfiguration implements Seri while (convergence_attempts++ < 30) { ConcurrentLog.info("CollectionConfiguration", "convergence step " + convergence_attempts + " for host " + host + " ..."); if (crh.convergenceStep()) break; + if (MemoryControl.shortStatus()) { + ConcurrentLog.warn("CollectionConfiguration", "terminated convergenceStep during postprocessing because of short memory"); + break; + } } ConcurrentLog.info("CollectionConfiguration", "convergence for host " + host + " after " + convergence_attempts + " steps"); // we have now the cr for all documents of a specific host; we store them for later use Map crn = crh.normalize(); //crh.log(crn); ranking.putAll(crn); // accumulate this here for usage in document update later + if (MemoryControl.shortStatus()) { + ConcurrentLog.warn("CollectionConfiguration", "terminated crn akkumulation during postprocessing because of short memory"); + break; + } countcheck++; } if (hostscore.size() != countcheck) ConcurrentLog.warn("CollectionConfiguration", "ambiguous host count: expected=" + hostscore.size() + ", counted=" + countcheck);