diff --git a/defaults/yacy.init b/defaults/yacy.init
index 0b928bb42..4996e8818 100644
--- a/defaults/yacy.init
+++ b/defaults/yacy.init
@@ -604,6 +604,9 @@ collection=user
70_surrogates_idlesleep=10000
70_surrogates_busysleep=0
70_surrogates_memprereq=12582912
+80_searchresult_idlesleep=10000
+80_searchresult_busysleep=500
+80_searchresult_memprereq=0
90_cleanup_idlesleep=300000
90_cleanup_busysleep=300000
90_cleanup_memprereq=0
diff --git a/source/net/yacy/kelondro/data/meta/URIMetadataNode.java b/source/net/yacy/kelondro/data/meta/URIMetadataNode.java
index c80d21039..c9f809225 100644
--- a/source/net/yacy/kelondro/data/meta/URIMetadataNode.java
+++ b/source/net/yacy/kelondro/data/meta/URIMetadataNode.java
@@ -79,10 +79,6 @@ public class URIMetadataNode {
private String snippet = null;
private WordReferenceVars word = null; // this is only used if the url is transported via remote search requests
- public URIMetadataNode(final SolrInputDocument doc) {
- this(ClientUtils.toSolrDocument(doc));
- }
-
public URIMetadataNode(final SolrDocument doc) {
this.doc = doc;
this.snippet = "";
@@ -98,12 +94,6 @@ public class URIMetadataNode {
}
}
- public URIMetadataNode(final SolrInputDocument doc, final WordReferenceVars searchedWord, final long ranking) {
- this(ClientUtils.toSolrDocument(doc));
- this.word = searchedWord;
- this.ranking = ranking;
- }
-
public URIMetadataNode(final SolrDocument doc, final WordReferenceVars searchedWord, final long ranking) {
this(doc);
this.word = searchedWord;
diff --git a/source/net/yacy/peers/Protocol.java b/source/net/yacy/peers/Protocol.java
index aa5002fc2..6dbc08d5e 100644
--- a/source/net/yacy/peers/Protocol.java
+++ b/source/net/yacy/peers/Protocol.java
@@ -836,10 +836,12 @@ public final class Protocol {
}
}
- try {
- event.query.getSegment().fulltext().putMetadata(storeDocs);
- } catch ( final IOException e ) {
- Network.log.logWarning("could not store search result", e);
+ for (URIMetadataRow entry: storeDocs) {
+ try {
+ event.query.getSegment().fulltext().putMetadataLater(entry);
+ } catch (IOException e) {
+ Log.logException(e);
+ }
}
// store remote result to local result container
@@ -1172,19 +1174,9 @@ public final class Protocol {
event.addExpectedRemoteReferences(-count);
Network.log.logInfo("local search (solr): localpeer sent " + container.get(0).size() + "/" + docList.size() + " references");
} else {
- // learn the documents
- if (docs.size() > 0) {
- // this can be done later, do that concurrently
- new Thread() {
- public void run() {
- try {Thread.sleep(5000 + 3 * (System.currentTimeMillis() % 1000));} catch (InterruptedException e) {}
- try {
- event.query.getSegment().fulltext().putDocuments(docs);
- } catch ( final IOException e ) {
- Network.log.logWarning("could not store search result", e);
- }
- }
- }.start();
+ // learn the documents, this can be done later
+ for (SolrInputDocument doc: docs) {
+ event.query.getSegment().fulltext().putDocumentLater(doc);
}
event.addNodes(container, facets, snippets, false, target.getName() + "/" + target.hash, (int) docList.getNumFound());
event.addFinalize();
diff --git a/source/net/yacy/search/Switchboard.java b/source/net/yacy/search/Switchboard.java
index 49b8635dd..0eff8cc9c 100644
--- a/source/net/yacy/search/Switchboard.java
+++ b/source/net/yacy/search/Switchboard.java
@@ -954,6 +954,21 @@ public final class Switchboard extends serverSwitch {
10000,
Long.MAX_VALUE),
60000); // all 5 Minutes, wait 1 minute until first run
+ deployThread(
+ SwitchboardConstants.SEARCHRESULT,
+ "Search Result Flush",
+ "A thread that stores search results from other peers into the own index.",
+ null,
+ new InstantBusyThread(
+ this,
+ SwitchboardConstants.SEARCHRESULT_METHOD_START,
+ SwitchboardConstants.SEARCHRESULT_METHOD_JOBCOUNT,
+ SwitchboardConstants.SEARCHRESULT_METHOD_FREEMEM,
+ 20000,
+ Long.MAX_VALUE,
+ 0,
+ Long.MAX_VALUE),
+ 30000);
deployThread(
SwitchboardConstants.SURROGATES,
"Surrogates",
@@ -1982,6 +1997,25 @@ public final class Switchboard extends serverSwitch {
return false;
}
+
+ public int searchresultQueueSize() {
+ return this.index.fulltext().pendingInputDocuments();
+ }
+
+ public void searchresultFreeMem() {
+ // do nothing
+ }
+
+ public boolean searchresultProcess() {
+ int count = Math.min(100, 1 + this.index.fulltext().pendingInputDocuments() / 100);
+ if (MemoryControl.shortStatus()) count = this.index.fulltext().pendingInputDocuments();
+ try {
+ return this.index.fulltext().processPendingInputDocuments(count) > 0;
+ } catch (IOException e) {
+ return false;
+ }
+ }
+
public int cleanupJobSize() {
int c = 1; // "es gibt immer was zu tun"
if ( (this.crawlQueues.delegatedURL.stackSize() > 1000) ) {
@@ -2772,7 +2806,8 @@ public final class Switchboard extends serverSwitch {
initiatorPeer.setAlternativeAddress(this.clusterhashes.get(queueEntry.initiator()));
}
// start a thread for receipt sending to avoid a blocking here
- new Thread(new receiptSending(initiatorPeer, new URIMetadataNode(newEntry)), "sending receipt to " + ASCII.String(queueEntry.initiator())).start();
+ SolrDocument sd = this.index.fulltext().getDefaultConfiguration().toSolrDocument(newEntry);
+ new Thread(new receiptSending(initiatorPeer, new URIMetadataNode(sd)), "sending receipt to " + ASCII.String(queueEntry.initiator())).start();
}
}
}
diff --git a/source/net/yacy/search/SwitchboardConstants.java b/source/net/yacy/search/SwitchboardConstants.java
index 8275b6047..042751a4d 100644
--- a/source/net/yacy/search/SwitchboardConstants.java
+++ b/source/net/yacy/search/SwitchboardConstants.java
@@ -119,10 +119,10 @@ public final class SwitchboardConstants {
public static final String CRAWLJOB_REMOTE_TRIGGERED_CRAWL_METHOD_FREEMEM = null;
public static final String CRAWLJOB_REMOTE_TRIGGERED_CRAWL_IDLESLEEP = "62_remotetriggeredcrawl_idlesleep";
public static final String CRAWLJOB_REMOTE_TRIGGERED_CRAWL_BUSYSLEEP = "62_remotetriggeredcrawl_busysleep";
- // 80_indexing
+ // 70_surrogates
/**
*
public static final String SURROGATES = "70_surrogates"
- * A thread that polls the SURROGATES path and puts all Documents in one surroagte file into the indexing queue.
+ * A thread that polls the SURROGATES path and puts all Documents in one surrogate file into the indexing queue.
*/
public static final String SURROGATES = "70_surrogates";
public static final String SURROGATES_MEMPREREQ = "70_surrogates_memprereq";
@@ -131,6 +131,18 @@ public final class SwitchboardConstants {
public static final String SURROGATES_METHOD_START = "surrogateProcess";
public static final String SURROGATES_METHOD_JOBCOUNT = "surrogateQueueSize";
public static final String SURROGATES_METHOD_FREEMEM = "surrogateFreeMem";
+ // 80_search_result_processing
+ /**
+ * public static final String SEARCHRESULT = "80_searchresult"
+ * A thread that stores search results from other peers into the own index.
+ */
+ public static final String SEARCHRESULT = "80_searchresult";
+ public static final String SEARCHRESULT_MEMPREREQ = "80_searchresult_memprereq";
+ public static final String SEARCHRESULT_IDLESLEEP = "80_searchresult_idlesleep";
+ public static final String SEARCHRESULT_BUSYSLEEP = "80_searchresult_busysleep";
+ public static final String SEARCHRESULT_METHOD_START = "searchresultProcess";
+ public static final String SEARCHRESULT_METHOD_JOBCOUNT = "searchresultQueueSize";
+ public static final String SEARCHRESULT_METHOD_FREEMEM = "searchresultFreeMem";
// 90_cleanup
/**
* public static final String CLEANUP = "90_cleanup"
diff --git a/source/net/yacy/search/index/Fulltext.java b/source/net/yacy/search/index/Fulltext.java
index d2ad1b776..71e4de4ef 100644
--- a/source/net/yacy/search/index/Fulltext.java
+++ b/source/net/yacy/search/index/Fulltext.java
@@ -35,6 +35,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Pattern;
@@ -95,6 +96,7 @@ public final class Fulltext {
private InstanceMirror solrInstances;
private final CollectionConfiguration collectionConfiguration;
private final WebgraphConfiguration webgraphConfiguration;
+ private final LinkedBlockingQueue pendingCollectionInputDocuments;
protected Fulltext(final File segmentPath, final CollectionConfiguration collectionConfiguration, final WebgraphConfiguration webgraphConfiguration) {
this.segmentPath = segmentPath;
@@ -105,6 +107,7 @@ public final class Fulltext {
this.solrInstances = new InstanceMirror();
this.collectionConfiguration = collectionConfiguration;
this.webgraphConfiguration = webgraphConfiguration;
+ this.pendingCollectionInputDocuments = new LinkedBlockingQueue();
}
/**
@@ -331,10 +334,21 @@ public final class Fulltext {
}
private URIMetadataNode getMetadata(final byte[] urlHash, WordReferenceVars wre, long weight) {
-
+ String u = ASCII.String(urlHash);
+
+ // try to get the data from the delayed cache; this happens if we retrieve this from a fresh search result
+ for (SolrInputDocument doc: this.pendingCollectionInputDocuments) {
+ String id = (String) doc.getFieldValue(CollectionSchema.id.getSolrFieldName());
+ if (id != null && id.equals(u)) {
+ if (this.urlIndexFile != null) try {this.urlIndexFile.remove(urlHash);} catch (IOException e) {}
+ SolrDocument sd = this.collectionConfiguration.toSolrDocument(doc);
+ return new URIMetadataNode(sd, wre, weight);
+ }
+ }
+
// get the metadata from Solr
try {
- SolrDocument doc = this.getDefaultConnector().getById(ASCII.String(urlHash));
+ SolrDocument doc = this.getDefaultConnector().getById(u);
if (doc != null) {
if (this.urlIndexFile != null) this.urlIndexFile.remove(urlHash);
return new URIMetadataNode(doc, wre, weight);
@@ -351,7 +365,8 @@ public final class Fulltext {
URIMetadataRow row = new URIMetadataRow(entry, wre);
SolrInputDocument solrInput = this.collectionConfiguration.metadata2solr(row);
this.putDocument(solrInput);
- return new URIMetadataNode(solrInput, wre, weight);
+ SolrDocument sd = this.collectionConfiguration.toSolrDocument(solrInput);
+ return new URIMetadataNode(sd, wre, weight);
} catch (final IOException e) {
Log.logException(e);
}
@@ -390,6 +405,39 @@ public final class Fulltext {
if (MemoryControl.shortStatus()) clearCache();
}
+ public void putDocumentLater(final SolrInputDocument doc) {
+ try {
+ this.pendingCollectionInputDocuments.put(doc);
+ } catch (InterruptedException e) {
+ try {
+ putDocument(doc);
+ } catch (IOException ee) {
+ Log.logException(ee);
+ }
+ }
+ }
+
+ public int pendingInputDocuments() {
+ return this.pendingCollectionInputDocuments.size();
+ }
+
+ public int processPendingInputDocuments(int count) throws IOException {
+ if (count == 0) return 0;
+ if (count == 1) {
+ SolrInputDocument doc = this.pendingCollectionInputDocuments.poll();
+ if (doc == null) return 0;
+ this.putDocument(doc);
+ return 1;
+ }
+ SolrInputDocument doc;
+ Collection docs = new ArrayList(count);
+ while (count-- > 0 && (doc = this.pendingCollectionInputDocuments.poll()) != null) {
+ docs.add(doc);
+ }
+ if (docs.size() > 0) this.putDocuments(docs);
+ return docs.size();
+ }
+
public void putEdges(final Collection edges) throws IOException {
try {
this.getWebgraphConnector().add(edges);
@@ -399,21 +447,18 @@ public final class Fulltext {
this.statsDump = null;
if (MemoryControl.shortStatus()) clearCache();
}
-
+
public void putMetadata(final URIMetadataRow entry) throws IOException {
byte[] idb = entry.hash();
- //String id = ASCII.String(idb);
+ String id = ASCII.String(idb);
try {
if (this.urlIndexFile != null) this.urlIndexFile.remove(idb);
- //SolrDocument sd = this.getDefaultConnector().getById(id);
- //if (sd == null || (new URIMetadataNode(sd)).isOlder(entry)) {
- if (this.collectionConfiguration.contains(CollectionSchema.ip_s)) {
- // ip_s needs a dns lookup which causes blockings during search here
- this.getDefaultConnector().add(getDefaultConfiguration().metadata2solr(entry));
- } else synchronized (this.solrInstances) {
- this.getDefaultConnector().add(getDefaultConfiguration().metadata2solr(entry));
- }
- //}
+ // because node entries are richer than metadata entries we must check if they exist to prevent that they are overwritten
+ SolrDocument sd = this.getDefaultConnector().getById(id);
+ if (sd == null || (new URIMetadataNode(sd)).isOlder(entry)) {
+ SolrInputDocument doc = getDefaultConfiguration().metadata2solr(entry);
+ putDocument(doc);
+ }
} catch (SolrException e) {
throw new IOException(e.getMessage(), e);
}
@@ -421,18 +466,30 @@ public final class Fulltext {
if (MemoryControl.shortStatus()) clearCache();
}
- public void putMetadata(final Collection entries) throws IOException {
- Collection docs = new ArrayList(entries.size());
- for (URIMetadataRow entry: entries) {
- byte[] idb = entry.hash();
- //String id = ASCII.String(idb);
+ public void putMetadataLater(final URIMetadataRow entry) throws IOException {
+ byte[] idb = entry.hash();
+ String id = ASCII.String(idb);
+ try {
if (this.urlIndexFile != null) this.urlIndexFile.remove(idb);
- //SolrDocument sd = this.getDefaultConnector().getById(id, CollectionSchema.last_modified.getSolrFieldName(), CollectionSchema.load_date_dt.getSolrFieldName());
- //if (sd == null || (new URIMetadataNode(sd)).isOlder(entry)) {
- docs.add(getDefaultConfiguration().metadata2solr(entry));
- //}
+ // because node entries are richer than metadata entries we must check if they exist to prevent that they are overwritten
+ SolrDocument sd = this.getDefaultConnector().getById(id);
+ if (sd == null || (new URIMetadataNode(sd)).isOlder(entry)) {
+ SolrInputDocument doc = getDefaultConfiguration().metadata2solr(entry);
+ try {
+ this.pendingCollectionInputDocuments.put(doc);
+ } catch (InterruptedException e) {
+ try {
+ putDocument(doc);
+ } catch (IOException ee) {
+ Log.logException(ee);
+ }
+ }
+ }
+ } catch (SolrException e) {
+ throw new IOException(e.getMessage(), e);
}
- putDocuments(docs);
+ this.statsDump = null;
+ if (MemoryControl.shortStatus()) clearCache();
}
/**
diff --git a/source/net/yacy/search/schema/CollectionConfiguration.java b/source/net/yacy/search/schema/CollectionConfiguration.java
index 67370d25b..bbcc582a7 100644
--- a/source/net/yacy/search/schema/CollectionConfiguration.java
+++ b/source/net/yacy/search/schema/CollectionConfiguration.java
@@ -70,6 +70,7 @@ import net.yacy.kelondro.util.ByteBuffer;
import org.apache.solr.common.SolrDocument;
import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.SolrInputField;
public class CollectionConfiguration extends SchemaConfiguration implements Serializable {
@@ -160,6 +161,20 @@ public class CollectionConfiguration extends SchemaConfiguration implements Seri
return sid;
}
+ public SolrDocument toSolrDocument(SolrInputDocument doc) {
+ SolrDocument sid = new SolrDocument();
+ Set omitFields = new HashSet(3);
+ omitFields.add(CollectionSchema.author_sxt.getSolrFieldName());
+ omitFields.add(CollectionSchema.coordinate_p_0_coordinate.getSolrFieldName());
+ omitFields.add(CollectionSchema.coordinate_p_1_coordinate.getSolrFieldName());
+ for (SolrInputField field: doc) {
+ if (this.contains(field.getName()) && !omitFields.contains(field.getName())) { // check each field if enabled in local Solr schema
+ sid.setField(field.getName(), field.getValue());
+ }
+ }
+ return sid;
+ }
+
public SolrInputDocument metadata2solr(final URIMetadataRow md) {
final SolrInputDocument doc = new SolrInputDocument();