made index storage from DHT search results concurrent. This prevents
blocking caused by high CPU usage during search. Also removed the Solr
query for DHT search results; results are now taken from the pending queue.
Michael Peter Christen 12 years ago
parent f13c0b2abd
commit 3b1d9dc884
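The core change is a deferred-indexing pattern in Fulltext: remote search results are no longer written to Solr inside the search call but queued and flushed later by the new 80_searchresult busy thread. The standalone sketch below mirrors that pattern; the method names follow the diff, but the queue element type and the indexAdd() stub are simplified placeholders rather than the real SolrInputDocument/connector calls.

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.LinkedBlockingQueue;

// Minimal sketch of the commit's deferred-indexing pattern; "String" stands in
// for SolrInputDocument and indexAdd() for the Solr connector write.
public class PendingIndexQueueSketch {

    private final LinkedBlockingQueue<String> pending = new LinkedBlockingQueue<String>();

    // like Fulltext.putDocumentLater(): enqueue instead of writing immediately,
    // so the search thread is not blocked by index I/O or CPU load
    public void putDocumentLater(String doc) {
        try {
            this.pending.put(doc);
        } catch (InterruptedException e) {
            indexAdd(Collections.singletonList(doc)); // fall back to a direct write
        }
    }

    // like Fulltext.pendingInputDocuments(): queue length, used as the job count by the busy thread
    public int pendingInputDocuments() {
        return this.pending.size();
    }

    // like Fulltext.processPendingInputDocuments(count): drain up to count documents
    // from the queue and store them as one batch
    public int processPendingInputDocuments(int count) {
        Collection<String> batch = new ArrayList<String>(count);
        String doc;
        while (count-- > 0 && (doc = this.pending.poll()) != null) batch.add(doc);
        if (!batch.isEmpty()) indexAdd(batch);
        return batch.size();
    }

    // placeholder for getDefaultConnector().add(...)
    private void indexAdd(Collection<String> docs) {
        System.out.println("indexed batch of " + docs.size() + " documents");
    }

    public static void main(String[] args) {
        PendingIndexQueueSketch q = new PendingIndexQueueSketch();
        for (int i = 0; i < 250; i++) q.putDocumentLater("doc-" + i);
        // the 80_searchresult busy thread calls the flush method periodically (busysleep=500 ms)
        while (q.pendingInputDocuments() > 0) q.processPendingInputDocuments(100);
    }
}

In the actual diff, Switchboard.searchresultProcess() sizes each flush from the current queue length and flushes everything at once when memory is short.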

@@ -604,6 +604,9 @@ collection=user
70_surrogates_idlesleep=10000
70_surrogates_busysleep=0
70_surrogates_memprereq=12582912
80_searchresult_idlesleep=10000
80_searchresult_busysleep=500
80_searchresult_memprereq=0
90_cleanup_idlesleep=300000
90_cleanup_busysleep=300000
90_cleanup_memprereq=0

@@ -79,10 +79,6 @@ public class URIMetadataNode {
private String snippet = null;
private WordReferenceVars word = null; // this is only used if the url is transported via remote search requests
public URIMetadataNode(final SolrInputDocument doc) {
this(ClientUtils.toSolrDocument(doc));
}
public URIMetadataNode(final SolrDocument doc) {
this.doc = doc;
this.snippet = "";
@@ -98,12 +94,6 @@ }
}
}
public URIMetadataNode(final SolrInputDocument doc, final WordReferenceVars searchedWord, final long ranking) {
this(ClientUtils.toSolrDocument(doc));
this.word = searchedWord;
this.ranking = ranking;
}
public URIMetadataNode(final SolrDocument doc, final WordReferenceVars searchedWord, final long ranking) {
this(doc);
this.word = searchedWord;

@@ -836,10 +836,12 @@ public final class Protocol {
}
}
try {
event.query.getSegment().fulltext().putMetadata(storeDocs);
} catch ( final IOException e ) {
Network.log.logWarning("could not store search result", e);
for (URIMetadataRow entry: storeDocs) {
try {
event.query.getSegment().fulltext().putMetadataLater(entry);
} catch (IOException e) {
Log.logException(e);
}
}
// store remote result to local result container
@@ -1172,19 +1174,9 @@ public final class Protocol {
event.addExpectedRemoteReferences(-count);
Network.log.logInfo("local search (solr): localpeer sent " + container.get(0).size() + "/" + docList.size() + " references");
} else {
// learn the documents
if (docs.size() > 0) {
// this can be done later, do that concurrently
new Thread() {
public void run() {
try {Thread.sleep(5000 + 3 * (System.currentTimeMillis() % 1000));} catch (InterruptedException e) {}
try {
event.query.getSegment().fulltext().putDocuments(docs);
} catch ( final IOException e ) {
Network.log.logWarning("could not store search result", e);
}
}
}.start();
// learn the documents, this can be done later
for (SolrInputDocument doc: docs) {
event.query.getSegment().fulltext().putDocumentLater(doc);
}
event.addNodes(container, facets, snippets, false, target.getName() + "/" + target.hash, (int) docList.getNumFound());
event.addFinalize();

@@ -954,6 +954,21 @@ public final class Switchboard extends serverSwitch {
10000,
Long.MAX_VALUE),
60000); // every 5 minutes, wait 1 minute until first run
deployThread(
SwitchboardConstants.SEARCHRESULT,
"Search Result Flush",
"A thread that stores search results from other peers into the own index.",
null,
new InstantBusyThread(
this,
SwitchboardConstants.SEARCHRESULT_METHOD_START,
SwitchboardConstants.SEARCHRESULT_METHOD_JOBCOUNT,
SwitchboardConstants.SEARCHRESULT_METHOD_FREEMEM,
20000,
Long.MAX_VALUE,
0,
Long.MAX_VALUE),
30000);
deployThread(
SwitchboardConstants.SURROGATES,
"Surrogates",
@@ -1982,6 +1997,25 @@ public final class Switchboard extends serverSwitch {
return false;
}
public int searchresultQueueSize() {
return this.index.fulltext().pendingInputDocuments();
}
public void searchresultFreeMem() {
// do nothing
}
public boolean searchresultProcess() {
int count = Math.min(100, 1 + this.index.fulltext().pendingInputDocuments() / 100);
if (MemoryControl.shortStatus()) count = this.index.fulltext().pendingInputDocuments();
try {
return this.index.fulltext().processPendingInputDocuments(count) > 0;
} catch (IOException e) {
return false;
}
}
public int cleanupJobSize() {
int c = 1; // "es gibt immer was zu tun" (there is always something to do)
if ( (this.crawlQueues.delegatedURL.stackSize() > 1000) ) {
@@ -2772,7 +2806,8 @@ public final class Switchboard extends serverSwitch {
initiatorPeer.setAlternativeAddress(this.clusterhashes.get(queueEntry.initiator()));
}
// start a thread for receipt sending to avoid a blocking here
new Thread(new receiptSending(initiatorPeer, new URIMetadataNode(newEntry)), "sending receipt to " + ASCII.String(queueEntry.initiator())).start();
SolrDocument sd = this.index.fulltext().getDefaultConfiguration().toSolrDocument(newEntry);
new Thread(new receiptSending(initiatorPeer, new URIMetadataNode(sd)), "sending receipt to " + ASCII.String(queueEntry.initiator())).start();
}
}
}

@@ -119,10 +119,10 @@ public final class SwitchboardConstants {
public static final String CRAWLJOB_REMOTE_TRIGGERED_CRAWL_METHOD_FREEMEM = null;
public static final String CRAWLJOB_REMOTE_TRIGGERED_CRAWL_IDLESLEEP = "62_remotetriggeredcrawl_idlesleep";
public static final String CRAWLJOB_REMOTE_TRIGGERED_CRAWL_BUSYSLEEP = "62_remotetriggeredcrawl_busysleep";
// 80_indexing
// 70_surrogates
/**
* <p><code>public static final String <strong>SURROGATES</strong> = "70_surrogates"</code></p>
* <p>A thread that polls the SURROGATES path and puts all Documents in one surroagte file into the indexing queue.</p>
* <p>A thread that polls the SURROGATES path and puts all Documents in one surrogate file into the indexing queue.</p>
*/
public static final String SURROGATES = "70_surrogates";
public static final String SURROGATES_MEMPREREQ = "70_surrogates_memprereq";
@@ -131,6 +131,18 @@ public final class SwitchboardConstants {
public static final String SURROGATES_METHOD_START = "surrogateProcess";
public static final String SURROGATES_METHOD_JOBCOUNT = "surrogateQueueSize";
public static final String SURROGATES_METHOD_FREEMEM = "surrogateFreeMem";
// 80_search_result_processing
/**
* <p><code>public static final String <strong>SEARCHRESULT</strong> = "80_searchresult"</code></p>
* <p>A thread that stores search results from other peers into the local index.</p>
*/
public static final String SEARCHRESULT = "80_searchresult";
public static final String SEARCHRESULT_MEMPREREQ = "80_searchresult_memprereq";
public static final String SEARCHRESULT_IDLESLEEP = "80_searchresult_idlesleep";
public static final String SEARCHRESULT_BUSYSLEEP = "80_searchresult_busysleep";
public static final String SEARCHRESULT_METHOD_START = "searchresultProcess";
public static final String SEARCHRESULT_METHOD_JOBCOUNT = "searchresultQueueSize";
public static final String SEARCHRESULT_METHOD_FREEMEM = "searchresultFreeMem";
// 90_cleanup
/**
* <p><code>public static final String <strong>CLEANUP</strong> = "90_cleanup"</code></p>

@@ -35,6 +35,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Pattern;
@@ -95,6 +96,7 @@ public final class Fulltext {
private InstanceMirror solrInstances;
private final CollectionConfiguration collectionConfiguration;
private final WebgraphConfiguration webgraphConfiguration;
private final LinkedBlockingQueue<SolrInputDocument> pendingCollectionInputDocuments;
protected Fulltext(final File segmentPath, final CollectionConfiguration collectionConfiguration, final WebgraphConfiguration webgraphConfiguration) {
this.segmentPath = segmentPath;
@@ -105,6 +107,7 @@ public final class Fulltext {
this.solrInstances = new InstanceMirror();
this.collectionConfiguration = collectionConfiguration;
this.webgraphConfiguration = webgraphConfiguration;
this.pendingCollectionInputDocuments = new LinkedBlockingQueue<SolrInputDocument>();
}
/**
@@ -331,10 +334,21 @@ }
}
private URIMetadataNode getMetadata(final byte[] urlHash, WordReferenceVars wre, long weight) {
String u = ASCII.String(urlHash);
// try to get the data from the delayed cache; this happens if we retrieve this from a fresh search result
for (SolrInputDocument doc: this.pendingCollectionInputDocuments) {
String id = (String) doc.getFieldValue(CollectionSchema.id.getSolrFieldName());
if (id != null && id.equals(u)) {
if (this.urlIndexFile != null) try {this.urlIndexFile.remove(urlHash);} catch (IOException e) {}
SolrDocument sd = this.collectionConfiguration.toSolrDocument(doc);
return new URIMetadataNode(sd, wre, weight);
}
}
// get the metadata from Solr
try {
SolrDocument doc = this.getDefaultConnector().getById(ASCII.String(urlHash));
SolrDocument doc = this.getDefaultConnector().getById(u);
if (doc != null) {
if (this.urlIndexFile != null) this.urlIndexFile.remove(urlHash);
return new URIMetadataNode(doc, wre, weight);
@@ -351,7 +365,8 @@ URIMetadataRow row = new URIMetadataRow(entry, wre);
URIMetadataRow row = new URIMetadataRow(entry, wre);
SolrInputDocument solrInput = this.collectionConfiguration.metadata2solr(row);
this.putDocument(solrInput);
return new URIMetadataNode(solrInput, wre, weight);
SolrDocument sd = this.collectionConfiguration.toSolrDocument(solrInput);
return new URIMetadataNode(sd, wre, weight);
} catch (final IOException e) {
Log.logException(e);
}
@@ -390,6 +405,39 @@ if (MemoryControl.shortStatus()) clearCache();
if (MemoryControl.shortStatus()) clearCache();
}
public void putDocumentLater(final SolrInputDocument doc) {
try {
this.pendingCollectionInputDocuments.put(doc);
} catch (InterruptedException e) {
try {
putDocument(doc);
} catch (IOException ee) {
Log.logException(ee);
}
}
}
public int pendingInputDocuments() {
return this.pendingCollectionInputDocuments.size();
}
public int processPendingInputDocuments(int count) throws IOException {
if (count == 0) return 0;
if (count == 1) {
SolrInputDocument doc = this.pendingCollectionInputDocuments.poll();
if (doc == null) return 0;
this.putDocument(doc);
return 1;
}
SolrInputDocument doc;
Collection<SolrInputDocument> docs = new ArrayList<SolrInputDocument>(count);
while (count-- > 0 && (doc = this.pendingCollectionInputDocuments.poll()) != null) {
docs.add(doc);
}
if (docs.size() > 0) this.putDocuments(docs);
return docs.size();
}
public void putEdges(final Collection<SolrInputDocument> edges) throws IOException {
try {
this.getWebgraphConnector().add(edges);
@@ -399,21 +447,18 @@ this.statsDump = null;
this.statsDump = null;
if (MemoryControl.shortStatus()) clearCache();
}
public void putMetadata(final URIMetadataRow entry) throws IOException {
byte[] idb = entry.hash();
//String id = ASCII.String(idb);
String id = ASCII.String(idb);
try {
if (this.urlIndexFile != null) this.urlIndexFile.remove(idb);
//SolrDocument sd = this.getDefaultConnector().getById(id);
//if (sd == null || (new URIMetadataNode(sd)).isOlder(entry)) {
if (this.collectionConfiguration.contains(CollectionSchema.ip_s)) {
// ip_s needs a dns lookup which causes blockings during search here
this.getDefaultConnector().add(getDefaultConfiguration().metadata2solr(entry));
} else synchronized (this.solrInstances) {
this.getDefaultConnector().add(getDefaultConfiguration().metadata2solr(entry));
}
//}
// because node entries are richer than metadata entries we must check if they exist to prevent them from being overwritten
SolrDocument sd = this.getDefaultConnector().getById(id);
if (sd == null || (new URIMetadataNode(sd)).isOlder(entry)) {
SolrInputDocument doc = getDefaultConfiguration().metadata2solr(entry);
putDocument(doc);
}
} catch (SolrException e) {
throw new IOException(e.getMessage(), e);
}
@@ -421,18 +466,30 @@ if (MemoryControl.shortStatus()) clearCache();
if (MemoryControl.shortStatus()) clearCache();
}
public void putMetadata(final Collection<URIMetadataRow> entries) throws IOException {
Collection<SolrInputDocument> docs = new ArrayList<SolrInputDocument>(entries.size());
for (URIMetadataRow entry: entries) {
byte[] idb = entry.hash();
//String id = ASCII.String(idb);
public void putMetadataLater(final URIMetadataRow entry) throws IOException {
byte[] idb = entry.hash();
String id = ASCII.String(idb);
try {
if (this.urlIndexFile != null) this.urlIndexFile.remove(idb);
//SolrDocument sd = this.getDefaultConnector().getById(id, CollectionSchema.last_modified.getSolrFieldName(), CollectionSchema.load_date_dt.getSolrFieldName());
//if (sd == null || (new URIMetadataNode(sd)).isOlder(entry)) {
docs.add(getDefaultConfiguration().metadata2solr(entry));
//}
// because node entries are richer than metadata entries we must check if they exist to prevent them from being overwritten
SolrDocument sd = this.getDefaultConnector().getById(id);
if (sd == null || (new URIMetadataNode(sd)).isOlder(entry)) {
SolrInputDocument doc = getDefaultConfiguration().metadata2solr(entry);
try {
this.pendingCollectionInputDocuments.put(doc);
} catch (InterruptedException e) {
try {
putDocument(doc);
} catch (IOException ee) {
Log.logException(ee);
}
}
}
} catch (SolrException e) {
throw new IOException(e.getMessage(), e);
}
putDocuments(docs);
this.statsDump = null;
if (MemoryControl.shortStatus()) clearCache();
}
/**

@@ -70,6 +70,7 @@ import net.yacy.kelondro.util.ByteBuffer;
import org.apache.solr.common.SolrDocument;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.SolrInputField;
public class CollectionConfiguration extends SchemaConfiguration implements Serializable {
@@ -160,6 +161,20 @@ public class CollectionConfiguration extends SchemaConfiguration implements Serializable {
return sid;
}
public SolrDocument toSolrDocument(SolrInputDocument doc) {
SolrDocument sid = new SolrDocument();
Set<String> omitFields = new HashSet<String>(3);
omitFields.add(CollectionSchema.author_sxt.getSolrFieldName());
omitFields.add(CollectionSchema.coordinate_p_0_coordinate.getSolrFieldName());
omitFields.add(CollectionSchema.coordinate_p_1_coordinate.getSolrFieldName());
for (SolrInputField field: doc) {
if (this.contains(field.getName()) && !omitFields.contains(field.getName())) { // check each field if enabled in local Solr schema
sid.setField(field.getName(), field.getValue());
}
}
return sid;
}
public SolrInputDocument metadata2solr(final URIMetadataRow md) {
final SolrInputDocument doc = new SolrInputDocument();
