From 7640834b3792bd0facffea740890ed49622cdfd2 Mon Sep 17 00:00:00 2001 From: Michael Peter Christen Date: Wed, 26 Feb 2014 22:21:00 +0100 Subject: [PATCH] removed double concurrency to put Solr documents into the index. The writings to the solr index are also buffered in ConcurrentUpdateSolrConnector --- .../federate/solr/SchemaConfiguration.java | 2 +- source/net/yacy/peers/Protocol.java | 2 +- .../search/index/ReindexSolrBusyThread.java | 2 +- source/net/yacy/search/index/Segment.java | 29 ++----------------- 4 files changed, 6 insertions(+), 29 deletions(-) diff --git a/source/net/yacy/cora/federate/solr/SchemaConfiguration.java b/source/net/yacy/cora/federate/solr/SchemaConfiguration.java index 1b2f832ab..da1a07438 100644 --- a/source/net/yacy/cora/federate/solr/SchemaConfiguration.java +++ b/source/net/yacy/cora/federate/solr/SchemaConfiguration.java @@ -166,7 +166,7 @@ public class SchemaConfiguration extends Configuration implements Serializable { // switch attribute in existing document SolrInputDocument sidContext = segment.fulltext().getDefaultConfiguration().toSolrInputDocument(doc); sidContext.setField(uniquefield.getSolrFieldName(), false); - segment.putDocumentInQueue(sidContext); + segment.putDocument(sidContext); changed = true; } else { sid.setField(uniquefield.getSolrFieldName(), true); diff --git a/source/net/yacy/peers/Protocol.java b/source/net/yacy/peers/Protocol.java index dcc997174..137083a0c 100644 --- a/source/net/yacy/peers/Protocol.java +++ b/source/net/yacy/peers/Protocol.java @@ -1209,7 +1209,7 @@ public final class Protocol { Network.log.info("local search (solr): localpeer sent " + container.size() + "/" + numFound + " references"); } else { for (SolrInputDocument doc: docs) { - event.query.getSegment().putDocumentInQueue(doc); + event.query.getSegment().putDocument(doc); } docs.clear(); docs = null; event.addNodes(container, facets, snippets, false, target.getName() + "/" + target.hash, numFound); diff --git a/source/net/yacy/search/index/ReindexSolrBusyThread.java b/source/net/yacy/search/index/ReindexSolrBusyThread.java index c176ceea0..bc1bd2f7b 100644 --- a/source/net/yacy/search/index/ReindexSolrBusyThread.java +++ b/source/net/yacy/search/index/ReindexSolrBusyThread.java @@ -133,7 +133,7 @@ import org.apache.solr.common.SolrInputDocument; for (SolrDocument doc : xdocs) { SolrInputDocument idoc = colcfg.toSolrInputDocument(doc); - Switchboard.getSwitchboard().index.putDocumentInQueue(idoc); + Switchboard.getSwitchboard().index.putDocument(idoc); processed++; } } diff --git a/source/net/yacy/search/index/Segment.java b/source/net/yacy/search/index/Segment.java index 2a282e219..e397a14f9 100644 --- a/source/net/yacy/search/index/Segment.java +++ b/source/net/yacy/search/index/Segment.java @@ -77,9 +77,7 @@ import net.yacy.kelondro.rwi.ReferenceFactory; import net.yacy.kelondro.util.Bitfield; import net.yacy.kelondro.util.ISO639; import net.yacy.kelondro.util.MemoryControl; -import net.yacy.kelondro.workflow.WorkflowProcessor; import net.yacy.repository.LoaderDispatcher; -import net.yacy.search.StorageQueueEntry; import net.yacy.search.query.SearchEvent; import net.yacy.search.schema.CollectionConfiguration; import net.yacy.search.schema.CollectionSchema; @@ -117,7 +115,6 @@ public class Segment { protected final Fulltext fulltext; protected IndexCell termIndex; protected IndexCell urlCitationIndex; - private WorkflowProcessor indexingPutDocumentProcessor; /** * create a new Segment @@ -136,16 +133,6 @@ public class Segment { this.fulltext = new Fulltext(segmentPath, archivePath, collectionConfiguration, webgraphConfiguration); this.termIndex = null; this.urlCitationIndex = null; - - this.indexingPutDocumentProcessor = new WorkflowProcessor( - "putDocument", - "solr document put queueing", - new String[] {}, - this, - "putDocument", - 30, - null, - 1); } public boolean connectedRWI() { @@ -543,7 +530,6 @@ public class Segment { } public synchronized void close() { - this.indexingPutDocumentProcessor.shutdown(); if (this.termIndex != null) this.termIndex.close(); if (this.fulltext != null) this.fulltext.close(); if (this.urlCitationIndex != null) this.urlCitationIndex.close(); @@ -607,22 +593,13 @@ public class Segment { * @param queueEntry * @throws IOException */ - public void putDocument(final StorageQueueEntry queueEntry) { + public void putDocument(final SolrInputDocument queueEntry) { try { - this.fulltext().putDocument(queueEntry.queueEntry); + this.fulltext().putDocument(queueEntry); } catch (final IOException e) { ConcurrentLog.logException(e); } } - - /** - * put a solr document into the index. This is the right point to call when enqueueing of solr document is wanted. - * This method exist to prevent that solr is filled concurrently with data which makes it fail or throw strange exceptions. - * @param queueEntry - */ - public void putDocumentInQueue(final SolrInputDocument queueEntry) { - this.indexingPutDocumentProcessor.enQueue(new StorageQueueEntry(queueEntry)); - } public SolrInputDocument storeDocument( final DigestURL url, @@ -659,7 +636,7 @@ public class Segment { // STORE TO SOLR String error = null; - this.putDocumentInQueue(vector); + this.putDocument(vector); List webgraph = vector.getWebgraphDocuments(); if (webgraph != null && webgraph.size() > 0) {