From 0c1a018bbde9c9e67bc000b6a3fd8dbd1706a6f3 Mon Sep 17 00:00:00 2001 From: Michael Peter Christen Date: Wed, 29 May 2013 18:27:27 +0200 Subject: [PATCH] removed 'later' tactic because it used too much RAM, reduced number of soft commits, reduced caching size of search events, ensured that solr results are processed before connection is closed to keep that stuff not too long in RAM --- defaults/solr/solrconfig.xml | 4 +- htroot/yacy/crawlReceipt.java | 2 +- htroot/yacy/transferURL.java | 2 +- .../solr/connector/RemoteSolrConnector.java | 6 +- source/net/yacy/peers/Protocol.java | 33 +++-- source/net/yacy/search/Switchboard.java | 32 ----- source/net/yacy/search/index/Fulltext.java | 135 +----------------- source/net/yacy/search/index/Segment.java | 1 + source/net/yacy/search/query/SearchEvent.java | 2 +- 9 files changed, 38 insertions(+), 179 deletions(-) diff --git a/defaults/solr/solrconfig.xml b/defaults/solr/solrconfig.xml index 784ffb6e3..7fdaf2735 100755 --- a/defaults/solr/solrconfig.xml +++ b/defaults/solr/solrconfig.xml @@ -314,7 +314,7 @@ searcher to be opened to make those changes visible. --> - 15000 + 10000 false @@ -325,7 +325,7 @@ --> diff --git a/htroot/yacy/crawlReceipt.java b/htroot/yacy/crawlReceipt.java index d8980b323..fdf952781 100644 --- a/htroot/yacy/crawlReceipt.java +++ b/htroot/yacy/crawlReceipt.java @@ -146,7 +146,7 @@ public final class crawlReceipt { if ("fill".equals(result)) try { // put new entry into database - sb.index.fulltext().putMetadataLater(entry); + sb.index.fulltext().putMetadata(entry); ResultURLs.stack(ASCII.String(entry.url().hash()), entry.url().getHost(), youare.getBytes(), iam.getBytes(), EventOrigin.REMOTE_RECEIPTS); sb.crawlQueues.delegatedURL.remove(entry.hash()); // the delegated work has been done if (log.isInfo()) log.logInfo("crawlReceipt: RECEIVED RECEIPT from " + otherPeerName + " for URL " + ASCII.String(entry.hash()) + ":" + entry.url().toNormalform(false)); diff --git a/htroot/yacy/transferURL.java b/htroot/yacy/transferURL.java index 2647de94f..908fe66b3 100644 --- a/htroot/yacy/transferURL.java +++ b/htroot/yacy/transferURL.java @@ -154,7 +154,7 @@ public final class transferURL { // write entry to database if (Network.log.isFine()) Network.log.logFine("Accepting URL from peer " + otherPeerName + ": " + lEntry.url().toNormalform(true)); try { - sb.index.fulltext().putMetadataLater(lEntry); + sb.index.fulltext().putMetadata(lEntry); ResultURLs.stack(ASCII.String(lEntry.url().hash()), lEntry.url().getHost(), iam.getBytes(), iam.getBytes(), EventOrigin.DHT_TRANSFER); if (Network.log.isFine()) Network.log.logFine("transferURL: received URL '" + lEntry.url().toNormalform(false) + "' from peer " + otherPeerName); received++; diff --git a/source/net/yacy/cora/federate/solr/connector/RemoteSolrConnector.java b/source/net/yacy/cora/federate/solr/connector/RemoteSolrConnector.java index 65e677652..eccde1f60 100644 --- a/source/net/yacy/cora/federate/solr/connector/RemoteSolrConnector.java +++ b/source/net/yacy/cora/federate/solr/connector/RemoteSolrConnector.java @@ -82,10 +82,13 @@ public class RemoteSolrConnector extends SolrServerConnector implements SolrConn ResponseParser responseParser = new XMLResponseParser(); request.setResponseParser(responseParser); long t = System.currentTimeMillis(); - NamedList result; + NamedList result = null; try { result = server.request(request); } catch (Throwable e) { + throw new IOException(e.getMessage()); + /* + Log.logException(e); server = instance.getServer(this.corename); super.init(server); try { @@ -93,6 +96,7 @@ public class RemoteSolrConnector extends SolrServerConnector implements SolrConn } catch (Throwable e1) { throw new IOException(e1.getMessage()); } + */ } QueryResponse response = new QueryResponse(result, server); response.setElapsedTime(System.currentTimeMillis() - t); diff --git a/source/net/yacy/peers/Protocol.java b/source/net/yacy/peers/Protocol.java index 562dddfb2..094db68f6 100644 --- a/source/net/yacy/peers/Protocol.java +++ b/source/net/yacy/peers/Protocol.java @@ -860,7 +860,7 @@ public final class Protocol { for (URIMetadataRow entry: storeDocs) { try { - event.query.getSegment().fulltext().putMetadataLater(entry); + event.query.getSegment().fulltext().putMetadata(entry); } catch (IOException e) { Log.logException(e); } @@ -1079,6 +1079,8 @@ public final class Protocol { target = event.peers.mySeed(); localsearch = false; } + RemoteInstance instance = null; + SolrConnector solrConnector = null; SolrDocumentList docList = null; QueryResponse rsp = null; if (localsearch) { @@ -1093,8 +1095,8 @@ public final class Protocol { } else { try { String address = target == event.peers.mySeed() ? "localhost:" + target.getPort() : target.getPublicAddress(); - RemoteInstance instance = new RemoteInstance("http://" + address, null, "solr"); // this is a 'patch configuration' which considers 'solr' as default collection - SolrConnector solrConnector = new RemoteSolrConnector(instance, "solr"); + instance = new RemoteInstance("http://" + address, null, "solr"); // this is a 'patch configuration' which considers 'solr' as default collection + solrConnector = new RemoteSolrConnector(instance, "solr"); rsp = solrConnector.getResponseByParams(solrQuery); docList = rsp.getResults(); solrConnector.close(); @@ -1105,7 +1107,7 @@ public final class Protocol { return -1; } } - + // evaluate facets Map> facets = new HashMap>(event.query.facetfields.size()); for (String field: event.query.facetfields) { @@ -1149,7 +1151,7 @@ public final class Protocol { Network.log.logInfo("SEARCH (solr), returned " + docList.size() + " out of " + docList.getNumFound() + " documents from " + (target == null ? "shard" : ("peer " + target.hash + ":" + target.getName()))); int term = count; - final Collection docs = new ArrayList(docList.size()); + Collection docs = new ArrayList(docList.size()); for (final SolrDocument doc: docList) { if ( term-- <= 0 ) { break; // do not process more that requested (in case that evil peers fill us up with rubbish) @@ -1213,15 +1215,22 @@ public final class Protocol { Network.log.logInfo("local search (solr): localpeer sent " + container.get(0).size() + "/" + docList.size() + " references"); } else { // learn the documents, this can be done later - for (SolrInputDocument doc: docs) { - event.query.getSegment().fulltext().putDocumentLater(doc); + try { + event.query.getSegment().fulltext().putDocuments(docs); docs.clear(); docs = null; + event.addNodes(container, facets, snippets, false, target.getName() + "/" + target.hash, (int) docList.getNumFound()); + event.addFinalize(); + event.addExpectedRemoteReferences(-count); + Network.log.logInfo("remote search (solr): peer " + target.getName() + " sent " + (container.size() == 0 ? 0 : container.get(0).size()) + "/" + docList.size() + " references"); + } catch (IOException e) { + Log.logException(e); } - event.addNodes(container, facets, snippets, false, target.getName() + "/" + target.hash, (int) docList.getNumFound()); - event.addFinalize(); - event.addExpectedRemoteReferences(-count); - Network.log.logInfo("remote search (solr): peer " + target.getName() + " sent " + (container.size() == 0 ? 0 : container.get(0).size()) + "/" + docList.size() + " references"); } - return docList.size(); + final int dls = docList.size(); + docList.clear(); + docList = null; + if (solrConnector != null) solrConnector.close(); + if (instance != null) instance.close(); + return dls; } public static Map permissionMessage(final SeedDB seedDB, final String targetHash) { diff --git a/source/net/yacy/search/Switchboard.java b/source/net/yacy/search/Switchboard.java index 235a4100a..b823a751b 100644 --- a/source/net/yacy/search/Switchboard.java +++ b/source/net/yacy/search/Switchboard.java @@ -81,8 +81,6 @@ import java.util.zip.GZIPOutputStream; import java.util.zip.ZipEntry; import java.util.zip.ZipInputStream; -import org.apache.pdfbox.cos.COSName; -import org.apache.pdfbox.pdmodel.font.PDFont; import org.apache.solr.common.SolrDocument; import org.apache.solr.common.SolrInputDocument; @@ -950,21 +948,6 @@ public final class Switchboard extends serverSwitch { 10000, Long.MAX_VALUE), 60000); // all 5 Minutes, wait 1 minute until first run - deployThread( - SwitchboardConstants.SEARCHRESULT, - "Search Result Flush", - "A thread that stores search results from other peers into the own index.", - null, - new InstantBusyThread( - this, - SwitchboardConstants.SEARCHRESULT_METHOD_START, - SwitchboardConstants.SEARCHRESULT_METHOD_JOBCOUNT, - SwitchboardConstants.SEARCHRESULT_METHOD_FREEMEM, - 20000, - Long.MAX_VALUE, - 0, - Long.MAX_VALUE), - 30000); deployThread( SwitchboardConstants.SURROGATES, "Surrogates", @@ -1998,24 +1981,9 @@ public final class Switchboard extends serverSwitch { return false; } - - public int searchresultQueueSize() { - return this.index.fulltext().pendingInputDocuments(); - } - public void searchresultFreeMem() { // do nothing } - - public boolean searchresultProcess() { - int count = Math.min(100, 1 + this.index.fulltext().pendingInputDocuments() / 100); - if (MemoryControl.shortStatus()) count = this.index.fulltext().pendingInputDocuments(); - try { - return this.index.fulltext().processPendingInputDocuments(count) > 0; - } catch (IOException e) { - return false; - } - } public int cleanupJobSize() { int c = 1; // "es gibt immer was zu tun" diff --git a/source/net/yacy/search/index/Fulltext.java b/source/net/yacy/search/index/Fulltext.java index 3e61c6975..27fc03555 100644 --- a/source/net/yacy/search/index/Fulltext.java +++ b/source/net/yacy/search/index/Fulltext.java @@ -98,8 +98,6 @@ public final class Fulltext { private InstanceMirror solrInstances; private final CollectionConfiguration collectionConfiguration; private final WebgraphConfiguration webgraphConfiguration; - private final LinkedBlockingQueue pendingCollectionInputRows; - private final LinkedBlockingQueue pendingCollectionInputDocuments; protected Fulltext(final File segmentPath, final CollectionConfiguration collectionConfiguration, final WebgraphConfiguration webgraphConfiguration) { this.segmentPath = segmentPath; @@ -110,8 +108,6 @@ public final class Fulltext { this.solrInstances = new InstanceMirror(); this.collectionConfiguration = collectionConfiguration; this.webgraphConfiguration = webgraphConfiguration; - this.pendingCollectionInputRows = new LinkedBlockingQueue(); - this.pendingCollectionInputDocuments = new LinkedBlockingQueue(); } /** @@ -281,7 +277,7 @@ public final class Fulltext { long t = System.currentTimeMillis(); if (t - this.collectionSizeLastAccess < 1000) return this.collectionSizeLastValue; long size = this.urlIndexFile == null ? 0 : this.urlIndexFile.size(); - size += this.getDefaultConnector().getSize(); + size += this.solrInstances.getDefaultMirrorConnector().getSize(); this.collectionSizeLastAccess = t; this.collectionSizeLastValue = size; return size; @@ -304,7 +300,11 @@ public final class Fulltext { this.solrInstances.close(); } + private long lastCommit = 0; public void commit(boolean softCommit) { + long t = System.currentTimeMillis(); + if (lastCommit + 10000 > t) return; + lastCommit = t; getDefaultConnector().commit(softCommit); getWebgraphConnector().commit(softCommit); } @@ -321,24 +321,7 @@ public final class Fulltext { } public DigestURI getURL(final byte[] urlHash) { - if (urlHash == null) return null; - - // try to get the data from the delayed cache; this happens if we retrieve this from a fresh search result - String u = ASCII.String(urlHash); - for (URIMetadataRow entry: this.pendingCollectionInputRows) { - if (u.equals(ASCII.String(entry.hash()))) { - if (this.urlIndexFile != null) try {this.urlIndexFile.remove(urlHash);} catch (IOException e) {} // migration - return entry.url(); - } - } - - for (SolrInputDocument doc: this.pendingCollectionInputDocuments) { - if (u.equals(doc.getFieldValue(CollectionSchema.id.getSolrFieldName()))) { - if (this.urlIndexFile != null) try {this.urlIndexFile.remove(urlHash);} catch (IOException e) {} // migration - String url = (String) doc.getFieldValue(CollectionSchema.sku.getSolrFieldName()); - if (url != null) try {return new DigestURI(url);} catch (MalformedURLException e) {} - } - } + if (urlHash == null || this.getDefaultConnector() == null) return null; String x; try { @@ -372,23 +355,6 @@ public final class Fulltext { private URIMetadataNode getMetadata(final byte[] urlHash, WordReferenceVars wre, long weight) { String u = ASCII.String(urlHash); - // try to get the data from the delayed cache; this happens if we retrieve this from a fresh search result - for (URIMetadataRow entry: this.pendingCollectionInputRows) { - if (u.equals(ASCII.String(entry.hash()))) { - if (this.urlIndexFile != null) try {this.urlIndexFile.remove(urlHash);} catch (IOException e) {} // migration - SolrDocument sd = this.collectionConfiguration.toSolrDocument(getDefaultConfiguration().metadata2solr(entry)); - return new URIMetadataNode(sd, wre, weight); - } - } - - for (SolrInputDocument doc: this.pendingCollectionInputDocuments) { - if (u.equals(doc.getFieldValue(CollectionSchema.id.getSolrFieldName()))) { - if (this.urlIndexFile != null) try {this.urlIndexFile.remove(urlHash);} catch (IOException e) {} // migration - SolrDocument sd = this.collectionConfiguration.toSolrDocument(doc); - return new URIMetadataNode(sd, wre, weight); - } - } - // get the metadata from Solr try { SolrDocument doc = this.getDefaultConnector().getDocumentById(u); @@ -443,63 +409,6 @@ public final class Fulltext { if (MemoryControl.shortStatus()) clearCache(); } - public void putDocumentLater(final SolrInputDocument doc) { - if (MemoryControl.shortStatus()) { - try { - putDocument(doc); - return; - } catch (IOException ee) { - Log.logException(ee); - } - } - try { - this.pendingCollectionInputDocuments.put(doc); - } catch (InterruptedException e) { - try { - putDocument(doc); - } catch (IOException ee) { - Log.logException(ee); - } - } - } - - public int pendingInputDocuments() { - return this.pendingCollectionInputRows.size() + this.pendingCollectionInputDocuments.size(); - } - - public int processPendingInputDocuments(int count) throws IOException { - if (count == 0) return 0; - boolean shortMemStatus = MemoryControl.shortStatus(); - if (!shortMemStatus || this.pendingCollectionInputDocuments.size() < count) { - pendingRows2Docs(count); - } - SolrInputDocument doc; - Collection docs = new ArrayList(count); - while ((shortMemStatus || count-- > 0) && (doc = this.pendingCollectionInputDocuments.poll()) != null) { - docs.add(doc); - } - if (docs.size() > 0) this.putDocuments(docs); - return docs.size(); - } - - private void pendingRows2Docs(int count) throws IOException { - URIMetadataRow entry; - while (count-- > 0 && (entry = this.pendingCollectionInputRows.poll()) != null) { - byte[] idb = entry.hash(); - String id = ASCII.String(idb); - try { - if (this.urlIndexFile != null) this.urlIndexFile.remove(idb); - // because node entries are richer than metadata entries we must check if they exist to prevent that they are overwritten - SolrDocument sd = this.getDefaultConnector().getDocumentById(id); - if (sd == null || (new URIMetadataNode(sd)).isOlder(entry)) { - putDocumentLater(getDefaultConfiguration().metadata2solr(entry)); - } - } catch (SolrException e) { - throw new IOException(e.getMessage(), e); - } - } - } - public void putEdges(final Collection edges) throws IOException { if (edges == null || edges.size() == 0) return; try { @@ -528,22 +437,6 @@ public final class Fulltext { if (MemoryControl.shortStatus()) clearCache(); } - public void putMetadataLater(final URIMetadataRow entry) throws IOException { - if (MemoryControl.shortStatus()) { - putMetadata(entry); - return; - } - try { - this.pendingCollectionInputRows.put(entry); - } catch (InterruptedException e) { - try { - putMetadata(entry); - } catch (IOException ee) { - Log.logException(ee); - } - } - } - /** * using a fragment of the url hash (6 bytes: bytes 6 to 11) it is possible to address all urls from a specific domain * here such a fragment can be used to delete all these domains at once @@ -704,12 +597,6 @@ public final class Fulltext { @Deprecated public boolean exists(final String urlHash) { if (urlHash == null) return false; - for (URIMetadataRow entry: this.pendingCollectionInputRows) { - if (urlHash.equals(ASCII.String(entry.hash()))) return true; - } - for (SolrInputDocument doc: this.pendingCollectionInputDocuments) { - if (urlHash.equals(doc.getFieldValue(CollectionSchema.id.getSolrFieldName()))) return true; - } try { if (this.getDefaultConnector().existsById(urlHash)) return true; } catch (final Throwable e) { @@ -729,16 +616,6 @@ public final class Fulltext { HashSet e = new HashSet(); if (ids == null || ids.size() == 0) return e; Collection idsC = new HashSet(); - for (String id: ids) { - for (URIMetadataRow entry: this.pendingCollectionInputRows) { - if (id.equals(ASCII.String(entry.hash()))) {e.add(id); continue;} - } - for (SolrInputDocument doc: this.pendingCollectionInputDocuments) { - if (id.equals(doc.getFieldValue(CollectionSchema.id.getSolrFieldName()))) {e.add(id); continue;} - } - if (this.urlIndexFile != null && this.urlIndexFile.has(ASCII.getBytes(id))) {e.add(id); continue;} - idsC.add(id); - } try { Set e1 = this.getDefaultConnector().existsByIds(idsC); e.addAll(e1); diff --git a/source/net/yacy/search/index/Segment.java b/source/net/yacy/search/index/Segment.java index 8d534e1f5..60ad64567 100644 --- a/source/net/yacy/search/index/Segment.java +++ b/source/net/yacy/search/index/Segment.java @@ -284,6 +284,7 @@ public class Segment { * @return the number of references for this word. */ public int getWordCountGuess(String word) { + if (this.fulltext.getDefaultConnector() == null) return 0; if (word == null || word.indexOf(':') >= 0 || word.indexOf(' ') >= 0 || word.indexOf('/') >= 0) return 0; if (this.termIndex != null) { int count = this.termIndex.count(Word.word2hash(word)); diff --git a/source/net/yacy/search/query/SearchEvent.java b/source/net/yacy/search/query/SearchEvent.java index adc8017fc..3b101b093 100644 --- a/source/net/yacy/search/query/SearchEvent.java +++ b/source/net/yacy/search/query/SearchEvent.java @@ -194,7 +194,7 @@ public final class SearchEvent { this.workTables = workTables; this.query = query; this.loader = loader; - this.nodeStack = new WeakPriorityBlockingQueue(300, false); + this.nodeStack = new WeakPriorityBlockingQueue(100, false); this.maxExpectedRemoteReferences = new AtomicInteger(0); this.expectedRemoteReferences = new AtomicInteger(0); // prepare configured search navigation