removed double concurrency to put Solr documents into the index. The

writings to the solr index are also buffered in
ConcurrentUpdateSolrConnector
pull/1/head
Michael Peter Christen 11 years ago
parent 99635e15b4
commit 7640834b37

@ -166,7 +166,7 @@ public class SchemaConfiguration extends Configuration implements Serializable {
// switch attribute in existing document
SolrInputDocument sidContext = segment.fulltext().getDefaultConfiguration().toSolrInputDocument(doc);
sidContext.setField(uniquefield.getSolrFieldName(), false);
segment.putDocumentInQueue(sidContext);
segment.putDocument(sidContext);
changed = true;
} else {
sid.setField(uniquefield.getSolrFieldName(), true);

@ -1209,7 +1209,7 @@ public final class Protocol {
Network.log.info("local search (solr): localpeer sent " + container.size() + "/" + numFound + " references");
} else {
for (SolrInputDocument doc: docs) {
event.query.getSegment().putDocumentInQueue(doc);
event.query.getSegment().putDocument(doc);
}
docs.clear(); docs = null;
event.addNodes(container, facets, snippets, false, target.getName() + "/" + target.hash, numFound);

@ -133,7 +133,7 @@ import org.apache.solr.common.SolrInputDocument;
for (SolrDocument doc : xdocs) {
SolrInputDocument idoc = colcfg.toSolrInputDocument(doc);
Switchboard.getSwitchboard().index.putDocumentInQueue(idoc);
Switchboard.getSwitchboard().index.putDocument(idoc);
processed++;
}
}

@ -77,9 +77,7 @@ import net.yacy.kelondro.rwi.ReferenceFactory;
import net.yacy.kelondro.util.Bitfield;
import net.yacy.kelondro.util.ISO639;
import net.yacy.kelondro.util.MemoryControl;
import net.yacy.kelondro.workflow.WorkflowProcessor;
import net.yacy.repository.LoaderDispatcher;
import net.yacy.search.StorageQueueEntry;
import net.yacy.search.query.SearchEvent;
import net.yacy.search.schema.CollectionConfiguration;
import net.yacy.search.schema.CollectionSchema;
@ -117,7 +115,6 @@ public class Segment {
protected final Fulltext fulltext;
protected IndexCell<WordReference> termIndex;
protected IndexCell<CitationReference> urlCitationIndex;
private WorkflowProcessor<StorageQueueEntry> indexingPutDocumentProcessor;
/**
* create a new Segment
@ -136,16 +133,6 @@ public class Segment {
this.fulltext = new Fulltext(segmentPath, archivePath, collectionConfiguration, webgraphConfiguration);
this.termIndex = null;
this.urlCitationIndex = null;
this.indexingPutDocumentProcessor = new WorkflowProcessor<StorageQueueEntry>(
"putDocument",
"solr document put queueing",
new String[] {},
this,
"putDocument",
30,
null,
1);
}
public boolean connectedRWI() {
@ -543,7 +530,6 @@ public class Segment {
}
public synchronized void close() {
this.indexingPutDocumentProcessor.shutdown();
if (this.termIndex != null) this.termIndex.close();
if (this.fulltext != null) this.fulltext.close();
if (this.urlCitationIndex != null) this.urlCitationIndex.close();
@ -607,22 +593,13 @@ public class Segment {
* @param queueEntry
* @throws IOException
*/
public void putDocument(final StorageQueueEntry queueEntry) {
public void putDocument(final SolrInputDocument queueEntry) {
try {
this.fulltext().putDocument(queueEntry.queueEntry);
this.fulltext().putDocument(queueEntry);
} catch (final IOException e) {
ConcurrentLog.logException(e);
}
}
/**
* put a solr document into the index. This is the right point to call when enqueueing of solr document is wanted.
* This method exist to prevent that solr is filled concurrently with data which makes it fail or throw strange exceptions.
* @param queueEntry
*/
public void putDocumentInQueue(final SolrInputDocument queueEntry) {
this.indexingPutDocumentProcessor.enQueue(new StorageQueueEntry(queueEntry));
}
public SolrInputDocument storeDocument(
final DigestURL url,
@ -659,7 +636,7 @@ public class Segment {
// STORE TO SOLR
String error = null;
this.putDocumentInQueue(vector);
this.putDocument(vector);
List<SolrInputDocument> webgraph = vector.getWebgraphDocuments();
if (webgraph != null && webgraph.size() > 0) {

Loading…
Cancel
Save