From 3b1d9dc884f4a5c8ca47e2f1f7e16bbe77c98497 Mon Sep 17 00:00:00 2001 From: Michael Peter Christen Date: Sat, 2 Mar 2013 10:25:52 +0100 Subject: [PATCH] made index storage from DHT search result concurrently. This prevents blocking by high CPU usage during search. Also: removed query from Solr for DHT search results; results are taken from the pending queue. --- defaults/yacy.init | 3 + .../kelondro/data/meta/URIMetadataNode.java | 10 -- source/net/yacy/peers/Protocol.java | 26 ++--- source/net/yacy/search/Switchboard.java | 37 +++++- .../net/yacy/search/SwitchboardConstants.java | 16 ++- source/net/yacy/search/index/Fulltext.java | 105 ++++++++++++++---- .../schema/CollectionConfiguration.java | 15 +++ 7 files changed, 158 insertions(+), 54 deletions(-) diff --git a/defaults/yacy.init b/defaults/yacy.init index 0b928bb42..4996e8818 100644 --- a/defaults/yacy.init +++ b/defaults/yacy.init @@ -604,6 +604,9 @@ collection=user 70_surrogates_idlesleep=10000 70_surrogates_busysleep=0 70_surrogates_memprereq=12582912 +80_searchresult_idlesleep=10000 +80_searchresult_busysleep=500 +80_searchresult_memprereq=0 90_cleanup_idlesleep=300000 90_cleanup_busysleep=300000 90_cleanup_memprereq=0 diff --git a/source/net/yacy/kelondro/data/meta/URIMetadataNode.java b/source/net/yacy/kelondro/data/meta/URIMetadataNode.java index c80d21039..c9f809225 100644 --- a/source/net/yacy/kelondro/data/meta/URIMetadataNode.java +++ b/source/net/yacy/kelondro/data/meta/URIMetadataNode.java @@ -79,10 +79,6 @@ public class URIMetadataNode { private String snippet = null; private WordReferenceVars word = null; // this is only used if the url is transported via remote search requests - public URIMetadataNode(final SolrInputDocument doc) { - this(ClientUtils.toSolrDocument(doc)); - } - public URIMetadataNode(final SolrDocument doc) { this.doc = doc; this.snippet = ""; @@ -98,12 +94,6 @@ public class URIMetadataNode { } } - public URIMetadataNode(final SolrInputDocument doc, final WordReferenceVars 
searchedWord, final long ranking) { - this(ClientUtils.toSolrDocument(doc)); - this.word = searchedWord; - this.ranking = ranking; - } - public URIMetadataNode(final SolrDocument doc, final WordReferenceVars searchedWord, final long ranking) { this(doc); this.word = searchedWord; diff --git a/source/net/yacy/peers/Protocol.java b/source/net/yacy/peers/Protocol.java index aa5002fc2..6dbc08d5e 100644 --- a/source/net/yacy/peers/Protocol.java +++ b/source/net/yacy/peers/Protocol.java @@ -836,10 +836,12 @@ public final class Protocol { } } - try { - event.query.getSegment().fulltext().putMetadata(storeDocs); - } catch ( final IOException e ) { - Network.log.logWarning("could not store search result", e); + for (URIMetadataRow entry: storeDocs) { + try { + event.query.getSegment().fulltext().putMetadataLater(entry); + } catch (IOException e) { + Log.logException(e); + } } // store remote result to local result container @@ -1172,19 +1174,9 @@ public final class Protocol { event.addExpectedRemoteReferences(-count); Network.log.logInfo("local search (solr): localpeer sent " + container.get(0).size() + "/" + docList.size() + " references"); } else { - // learn the documents - if (docs.size() > 0) { - // this can be done later, do that concurrently - new Thread() { - public void run() { - try {Thread.sleep(5000 + 3 * (System.currentTimeMillis() % 1000));} catch (InterruptedException e) {} - try { - event.query.getSegment().fulltext().putDocuments(docs); - } catch ( final IOException e ) { - Network.log.logWarning("could not store search result", e); - } - } - }.start(); + // learn the documents, this can be done later + for (SolrInputDocument doc: docs) { + event.query.getSegment().fulltext().putDocumentLater(doc); } event.addNodes(container, facets, snippets, false, target.getName() + "/" + target.hash, (int) docList.getNumFound()); event.addFinalize(); diff --git a/source/net/yacy/search/Switchboard.java b/source/net/yacy/search/Switchboard.java index 49b8635dd..0eff8cc9c 
100644 --- a/source/net/yacy/search/Switchboard.java +++ b/source/net/yacy/search/Switchboard.java @@ -954,6 +954,21 @@ public final class Switchboard extends serverSwitch { 10000, Long.MAX_VALUE), 60000); // all 5 Minutes, wait 1 minute until first run + deployThread( + SwitchboardConstants.SEARCHRESULT, + "Search Result Flush", + "A thread that stores search results from other peers into the own index.", + null, + new InstantBusyThread( + this, + SwitchboardConstants.SEARCHRESULT_METHOD_START, + SwitchboardConstants.SEARCHRESULT_METHOD_JOBCOUNT, + SwitchboardConstants.SEARCHRESULT_METHOD_FREEMEM, + 20000, + Long.MAX_VALUE, + 0, + Long.MAX_VALUE), + 30000); deployThread( SwitchboardConstants.SURROGATES, "Surrogates", @@ -1982,6 +1997,25 @@ public final class Switchboard extends serverSwitch { return false; } + + public int searchresultQueueSize() { + return this.index.fulltext().pendingInputDocuments(); + } + + public void searchresultFreeMem() { + // do nothing + } + + public boolean searchresultProcess() { + int count = Math.min(100, 1 + this.index.fulltext().pendingInputDocuments() / 100); + if (MemoryControl.shortStatus()) count = this.index.fulltext().pendingInputDocuments(); + try { + return this.index.fulltext().processPendingInputDocuments(count) > 0; + } catch (IOException e) { + return false; + } + } + public int cleanupJobSize() { int c = 1; // "es gibt immer was zu tun" if ( (this.crawlQueues.delegatedURL.stackSize() > 1000) ) { @@ -2772,7 +2806,8 @@ public final class Switchboard extends serverSwitch { initiatorPeer.setAlternativeAddress(this.clusterhashes.get(queueEntry.initiator())); } // start a thread for receipt sending to avoid a blocking here - new Thread(new receiptSending(initiatorPeer, new URIMetadataNode(newEntry)), "sending receipt to " + ASCII.String(queueEntry.initiator())).start(); + SolrDocument sd = this.index.fulltext().getDefaultConfiguration().toSolrDocument(newEntry); + new Thread(new receiptSending(initiatorPeer, new 
URIMetadataNode(sd)), "sending receipt to " + ASCII.String(queueEntry.initiator())).start(); } } } diff --git a/source/net/yacy/search/SwitchboardConstants.java b/source/net/yacy/search/SwitchboardConstants.java index 8275b6047..042751a4d 100644 --- a/source/net/yacy/search/SwitchboardConstants.java +++ b/source/net/yacy/search/SwitchboardConstants.java @@ -119,10 +119,10 @@ public final class SwitchboardConstants { public static final String CRAWLJOB_REMOTE_TRIGGERED_CRAWL_METHOD_FREEMEM = null; public static final String CRAWLJOB_REMOTE_TRIGGERED_CRAWL_IDLESLEEP = "62_remotetriggeredcrawl_idlesleep"; public static final String CRAWLJOB_REMOTE_TRIGGERED_CRAWL_BUSYSLEEP = "62_remotetriggeredcrawl_busysleep"; - // 80_indexing + // 70_surrogates /** *

public static final String SURROGATES = "70_surrogates"

- *

A thread that polls the SURROGATES path and puts all Documents in one surroagte file into the indexing queue.

+ *

A thread that polls the SURROGATES path and puts all Documents in one surrogate file into the indexing queue.

*/ public static final String SURROGATES = "70_surrogates"; public static final String SURROGATES_MEMPREREQ = "70_surrogates_memprereq"; @@ -131,6 +131,18 @@ public final class SwitchboardConstants { public static final String SURROGATES_METHOD_START = "surrogateProcess"; public static final String SURROGATES_METHOD_JOBCOUNT = "surrogateQueueSize"; public static final String SURROGATES_METHOD_FREEMEM = "surrogateFreeMem"; + // 80_search_result_processing + /** + *

public static final String SEARCHRESULT = "80_searchresult"

+ *

A thread that stores search results from other peers into the local index.

+ */ + public static final String SEARCHRESULT = "80_searchresult"; + public static final String SEARCHRESULT_MEMPREREQ = "80_searchresult_memprereq"; + public static final String SEARCHRESULT_IDLESLEEP = "80_searchresult_idlesleep"; + public static final String SEARCHRESULT_BUSYSLEEP = "80_searchresult_busysleep"; + public static final String SEARCHRESULT_METHOD_START = "searchresultProcess"; + public static final String SEARCHRESULT_METHOD_JOBCOUNT = "searchresultQueueSize"; + public static final String SEARCHRESULT_METHOD_FREEMEM = "searchresultFreeMem"; // 90_cleanup /** *

public static final String CLEANUP = "90_cleanup"

diff --git a/source/net/yacy/search/index/Fulltext.java b/source/net/yacy/search/index/Fulltext.java index d2ad1b776..71e4de4ef 100644 --- a/source/net/yacy/search/index/Fulltext.java +++ b/source/net/yacy/search/index/Fulltext.java @@ -35,6 +35,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicInteger; import java.util.regex.Pattern; @@ -95,6 +96,7 @@ public final class Fulltext { private InstanceMirror solrInstances; private final CollectionConfiguration collectionConfiguration; private final WebgraphConfiguration webgraphConfiguration; + private final LinkedBlockingQueue pendingCollectionInputDocuments; protected Fulltext(final File segmentPath, final CollectionConfiguration collectionConfiguration, final WebgraphConfiguration webgraphConfiguration) { this.segmentPath = segmentPath; @@ -105,6 +107,7 @@ public final class Fulltext { this.solrInstances = new InstanceMirror(); this.collectionConfiguration = collectionConfiguration; this.webgraphConfiguration = webgraphConfiguration; + this.pendingCollectionInputDocuments = new LinkedBlockingQueue(); } /** @@ -331,10 +334,21 @@ public final class Fulltext { } private URIMetadataNode getMetadata(final byte[] urlHash, WordReferenceVars wre, long weight) { - + String u = ASCII.String(urlHash); + + // try to get the data from the delayed cache; this happens if we retrieve this from a fresh search result + for (SolrInputDocument doc: this.pendingCollectionInputDocuments) { + String id = (String) doc.getFieldValue(CollectionSchema.id.getSolrFieldName()); + if (id != null && id.equals(u)) { + if (this.urlIndexFile != null) try {this.urlIndexFile.remove(urlHash);} catch (IOException e) {} + SolrDocument sd = this.collectionConfiguration.toSolrDocument(doc); + return new URIMetadataNode(sd, wre, weight); + } + } + // get the metadata from Solr try { - SolrDocument 
doc = this.getDefaultConnector().getById(ASCII.String(urlHash)); + SolrDocument doc = this.getDefaultConnector().getById(u); if (doc != null) { if (this.urlIndexFile != null) this.urlIndexFile.remove(urlHash); return new URIMetadataNode(doc, wre, weight); @@ -351,7 +365,8 @@ public final class Fulltext { URIMetadataRow row = new URIMetadataRow(entry, wre); SolrInputDocument solrInput = this.collectionConfiguration.metadata2solr(row); this.putDocument(solrInput); - return new URIMetadataNode(solrInput, wre, weight); + SolrDocument sd = this.collectionConfiguration.toSolrDocument(solrInput); + return new URIMetadataNode(sd, wre, weight); } catch (final IOException e) { Log.logException(e); } @@ -390,6 +405,39 @@ public final class Fulltext { if (MemoryControl.shortStatus()) clearCache(); } + public void putDocumentLater(final SolrInputDocument doc) { + try { + this.pendingCollectionInputDocuments.put(doc); + } catch (InterruptedException e) { + try { + putDocument(doc); + } catch (IOException ee) { + Log.logException(ee); + } + } + } + + public int pendingInputDocuments() { + return this.pendingCollectionInputDocuments.size(); + } + + public int processPendingInputDocuments(int count) throws IOException { + if (count == 0) return 0; + if (count == 1) { + SolrInputDocument doc = this.pendingCollectionInputDocuments.poll(); + if (doc == null) return 0; + this.putDocument(doc); + return 1; + } + SolrInputDocument doc; + Collection docs = new ArrayList(count); + while (count-- > 0 && (doc = this.pendingCollectionInputDocuments.poll()) != null) { + docs.add(doc); + } + if (docs.size() > 0) this.putDocuments(docs); + return docs.size(); + } + public void putEdges(final Collection edges) throws IOException { try { this.getWebgraphConnector().add(edges); @@ -399,21 +447,18 @@ public final class Fulltext { this.statsDump = null; if (MemoryControl.shortStatus()) clearCache(); } - + public void putMetadata(final URIMetadataRow entry) throws IOException { byte[] idb = 
entry.hash(); - //String id = ASCII.String(idb); + String id = ASCII.String(idb); try { if (this.urlIndexFile != null) this.urlIndexFile.remove(idb); - //SolrDocument sd = this.getDefaultConnector().getById(id); - //if (sd == null || (new URIMetadataNode(sd)).isOlder(entry)) { - if (this.collectionConfiguration.contains(CollectionSchema.ip_s)) { - // ip_s needs a dns lookup which causes blockings during search here - this.getDefaultConnector().add(getDefaultConfiguration().metadata2solr(entry)); - } else synchronized (this.solrInstances) { - this.getDefaultConnector().add(getDefaultConfiguration().metadata2solr(entry)); - } - //} + // because node entries are richer than metadata entries we must check if they exist to prevent that they are overwritten + SolrDocument sd = this.getDefaultConnector().getById(id); + if (sd == null || (new URIMetadataNode(sd)).isOlder(entry)) { + SolrInputDocument doc = getDefaultConfiguration().metadata2solr(entry); + putDocument(doc); + } } catch (SolrException e) { throw new IOException(e.getMessage(), e); } @@ -421,18 +466,30 @@ public final class Fulltext { if (MemoryControl.shortStatus()) clearCache(); } - public void putMetadata(final Collection entries) throws IOException { - Collection docs = new ArrayList(entries.size()); - for (URIMetadataRow entry: entries) { - byte[] idb = entry.hash(); - //String id = ASCII.String(idb); + public void putMetadataLater(final URIMetadataRow entry) throws IOException { + byte[] idb = entry.hash(); + String id = ASCII.String(idb); + try { if (this.urlIndexFile != null) this.urlIndexFile.remove(idb); - //SolrDocument sd = this.getDefaultConnector().getById(id, CollectionSchema.last_modified.getSolrFieldName(), CollectionSchema.load_date_dt.getSolrFieldName()); - //if (sd == null || (new URIMetadataNode(sd)).isOlder(entry)) { - docs.add(getDefaultConfiguration().metadata2solr(entry)); - //} + // because node entries are richer than metadata entries we must check if they exist to prevent that they 
are overwritten + SolrDocument sd = this.getDefaultConnector().getById(id); + if (sd == null || (new URIMetadataNode(sd)).isOlder(entry)) { + SolrInputDocument doc = getDefaultConfiguration().metadata2solr(entry); + try { + this.pendingCollectionInputDocuments.put(doc); + } catch (InterruptedException e) { + try { + putDocument(doc); + } catch (IOException ee) { + Log.logException(ee); + } + } + } + } catch (SolrException e) { + throw new IOException(e.getMessage(), e); } - putDocuments(docs); + this.statsDump = null; + if (MemoryControl.shortStatus()) clearCache(); } /** diff --git a/source/net/yacy/search/schema/CollectionConfiguration.java b/source/net/yacy/search/schema/CollectionConfiguration.java index 67370d25b..bbcc582a7 100644 --- a/source/net/yacy/search/schema/CollectionConfiguration.java +++ b/source/net/yacy/search/schema/CollectionConfiguration.java @@ -70,6 +70,7 @@ import net.yacy.kelondro.util.ByteBuffer; import org.apache.solr.common.SolrDocument; import org.apache.solr.common.SolrInputDocument; +import org.apache.solr.common.SolrInputField; public class CollectionConfiguration extends SchemaConfiguration implements Serializable { @@ -160,6 +161,20 @@ public class CollectionConfiguration extends SchemaConfiguration implements Seri return sid; } + public SolrDocument toSolrDocument(SolrInputDocument doc) { + SolrDocument sid = new SolrDocument(); + Set omitFields = new HashSet(3); + omitFields.add(CollectionSchema.author_sxt.getSolrFieldName()); + omitFields.add(CollectionSchema.coordinate_p_0_coordinate.getSolrFieldName()); + omitFields.add(CollectionSchema.coordinate_p_1_coordinate.getSolrFieldName()); + for (SolrInputField field: doc) { + if (this.contains(field.getName()) && !omitFields.contains(field.getName())) { // check each field if enabled in local Solr schema + sid.setField(field.getName(), field.getValue()); + } + } + return sid; + } + public SolrInputDocument metadata2solr(final URIMetadataRow md) { final SolrInputDocument doc = new 
SolrInputDocument();