Removed the 'later' indexing tactic because it used too much RAM, reduced the

number of soft commits, reduced the caching size of search events, and ensured
that Solr results are processed before the connection is closed so that result
data does not stay in RAM for too long.
pull/1/head
Michael Peter Christen 12 years ago
parent 5344a1c5f7
commit 0c1a018bbd

@ -314,7 +314,7 @@
searcher to be opened to make those changes visible.
-->
<autoCommit>
<maxTime>15000</maxTime>
<maxTime>10000</maxTime>
<openSearcher>false</openSearcher>
</autoCommit>
@ -325,7 +325,7 @@
-->
<!--
<autoSoftCommit>
<maxTime>1000</maxTime>
<maxTime>3000</maxTime>
</autoSoftCommit>
-->

@ -146,7 +146,7 @@ public final class crawlReceipt {
if ("fill".equals(result)) try {
// put new entry into database
sb.index.fulltext().putMetadataLater(entry);
sb.index.fulltext().putMetadata(entry);
ResultURLs.stack(ASCII.String(entry.url().hash()), entry.url().getHost(), youare.getBytes(), iam.getBytes(), EventOrigin.REMOTE_RECEIPTS);
sb.crawlQueues.delegatedURL.remove(entry.hash()); // the delegated work has been done
if (log.isInfo()) log.logInfo("crawlReceipt: RECEIVED RECEIPT from " + otherPeerName + " for URL " + ASCII.String(entry.hash()) + ":" + entry.url().toNormalform(false));

@ -154,7 +154,7 @@ public final class transferURL {
// write entry to database
if (Network.log.isFine()) Network.log.logFine("Accepting URL from peer " + otherPeerName + ": " + lEntry.url().toNormalform(true));
try {
sb.index.fulltext().putMetadataLater(lEntry);
sb.index.fulltext().putMetadata(lEntry);
ResultURLs.stack(ASCII.String(lEntry.url().hash()), lEntry.url().getHost(), iam.getBytes(), iam.getBytes(), EventOrigin.DHT_TRANSFER);
if (Network.log.isFine()) Network.log.logFine("transferURL: received URL '" + lEntry.url().toNormalform(false) + "' from peer " + otherPeerName);
received++;

@ -82,10 +82,13 @@ public class RemoteSolrConnector extends SolrServerConnector implements SolrConn
ResponseParser responseParser = new XMLResponseParser();
request.setResponseParser(responseParser);
long t = System.currentTimeMillis();
NamedList<Object> result;
NamedList<Object> result = null;
try {
result = server.request(request);
} catch (Throwable e) {
throw new IOException(e.getMessage());
/*
Log.logException(e);
server = instance.getServer(this.corename);
super.init(server);
try {
@ -93,6 +96,7 @@ public class RemoteSolrConnector extends SolrServerConnector implements SolrConn
} catch (Throwable e1) {
throw new IOException(e1.getMessage());
}
*/
}
QueryResponse response = new QueryResponse(result, server);
response.setElapsedTime(System.currentTimeMillis() - t);

@ -860,7 +860,7 @@ public final class Protocol {
for (URIMetadataRow entry: storeDocs) {
try {
event.query.getSegment().fulltext().putMetadataLater(entry);
event.query.getSegment().fulltext().putMetadata(entry);
} catch (IOException e) {
Log.logException(e);
}
@ -1079,6 +1079,8 @@ public final class Protocol {
target = event.peers.mySeed();
localsearch = false;
}
RemoteInstance instance = null;
SolrConnector solrConnector = null;
SolrDocumentList docList = null;
QueryResponse rsp = null;
if (localsearch) {
@ -1093,8 +1095,8 @@ public final class Protocol {
} else {
try {
String address = target == event.peers.mySeed() ? "localhost:" + target.getPort() : target.getPublicAddress();
RemoteInstance instance = new RemoteInstance("http://" + address, null, "solr"); // this is a 'patch configuration' which considers 'solr' as default collection
SolrConnector solrConnector = new RemoteSolrConnector(instance, "solr");
instance = new RemoteInstance("http://" + address, null, "solr"); // this is a 'patch configuration' which considers 'solr' as default collection
solrConnector = new RemoteSolrConnector(instance, "solr");
rsp = solrConnector.getResponseByParams(solrQuery);
docList = rsp.getResults();
solrConnector.close();
@ -1105,7 +1107,7 @@ public final class Protocol {
return -1;
}
}
// evaluate facets
Map<String, ReversibleScoreMap<String>> facets = new HashMap<String, ReversibleScoreMap<String>>(event.query.facetfields.size());
for (String field: event.query.facetfields) {
@ -1149,7 +1151,7 @@ public final class Protocol {
Network.log.logInfo("SEARCH (solr), returned " + docList.size() + " out of " + docList.getNumFound() + " documents from " + (target == null ? "shard" : ("peer " + target.hash + ":" + target.getName())));
int term = count;
final Collection<SolrInputDocument> docs = new ArrayList<SolrInputDocument>(docList.size());
Collection<SolrInputDocument> docs = new ArrayList<SolrInputDocument>(docList.size());
for (final SolrDocument doc: docList) {
if ( term-- <= 0 ) {
break; // do not process more that requested (in case that evil peers fill us up with rubbish)
@ -1213,15 +1215,22 @@ public final class Protocol {
Network.log.logInfo("local search (solr): localpeer sent " + container.get(0).size() + "/" + docList.size() + " references");
} else {
// learn the documents, this can be done later
for (SolrInputDocument doc: docs) {
event.query.getSegment().fulltext().putDocumentLater(doc);
try {
event.query.getSegment().fulltext().putDocuments(docs); docs.clear(); docs = null;
event.addNodes(container, facets, snippets, false, target.getName() + "/" + target.hash, (int) docList.getNumFound());
event.addFinalize();
event.addExpectedRemoteReferences(-count);
Network.log.logInfo("remote search (solr): peer " + target.getName() + " sent " + (container.size() == 0 ? 0 : container.get(0).size()) + "/" + docList.size() + " references");
} catch (IOException e) {
Log.logException(e);
}
event.addNodes(container, facets, snippets, false, target.getName() + "/" + target.hash, (int) docList.getNumFound());
event.addFinalize();
event.addExpectedRemoteReferences(-count);
Network.log.logInfo("remote search (solr): peer " + target.getName() + " sent " + (container.size() == 0 ? 0 : container.get(0).size()) + "/" + docList.size() + " references");
}
return docList.size();
final int dls = docList.size();
docList.clear();
docList = null;
if (solrConnector != null) solrConnector.close();
if (instance != null) instance.close();
return dls;
}
public static Map<String, String> permissionMessage(final SeedDB seedDB, final String targetHash) {

@ -81,8 +81,6 @@ import java.util.zip.GZIPOutputStream;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
import org.apache.pdfbox.cos.COSName;
import org.apache.pdfbox.pdmodel.font.PDFont;
import org.apache.solr.common.SolrDocument;
import org.apache.solr.common.SolrInputDocument;
@ -950,21 +948,6 @@ public final class Switchboard extends serverSwitch {
10000,
Long.MAX_VALUE),
60000); // all 5 Minutes, wait 1 minute until first run
deployThread(
SwitchboardConstants.SEARCHRESULT,
"Search Result Flush",
"A thread that stores search results from other peers into the own index.",
null,
new InstantBusyThread(
this,
SwitchboardConstants.SEARCHRESULT_METHOD_START,
SwitchboardConstants.SEARCHRESULT_METHOD_JOBCOUNT,
SwitchboardConstants.SEARCHRESULT_METHOD_FREEMEM,
20000,
Long.MAX_VALUE,
0,
Long.MAX_VALUE),
30000);
deployThread(
SwitchboardConstants.SURROGATES,
"Surrogates",
@ -1998,24 +1981,9 @@ public final class Switchboard extends serverSwitch {
return false;
}
/**
 * Reports the size of the queue of search-result documents that still wait
 * to be flushed into the local index.
 *
 * @return the number of pending input documents in the fulltext index queues
 */
public int searchresultQueueSize() {
return this.index.fulltext().pendingInputDocuments();
}
/**
 * Memory-relief hook for the search-result flush thread.
 * Intentionally a no-op: there is no cache owned by this job that could be
 * released here.
 */
public void searchresultFreeMem() {
// do nothing
}
/**
 * Processes one batch of pending search-result documents, flushing them from
 * the in-memory queues into the local index.
 *
 * @return true if at least one document was flushed, false if nothing was
 *         processed or the flush failed
 */
public boolean searchresultProcess() {
// normally flush about 1% of the backlog, capped at 100 documents per call
int count = Math.min(100, 1 + this.index.fulltext().pendingInputDocuments() / 100);
// under memory pressure flush the whole backlog at once to free RAM quickly
if (MemoryControl.shortStatus()) count = this.index.fulltext().pendingInputDocuments();
try {
return this.index.fulltext().processPendingInputDocuments(count) > 0;
} catch (IOException e) {
// do not swallow the failure silently; log it like the other handlers in this code base
Log.logException(e);
return false;
}
}
public int cleanupJobSize() {
int c = 1; // "es gibt immer was zu tun"

@ -98,8 +98,6 @@ public final class Fulltext {
private InstanceMirror solrInstances;
private final CollectionConfiguration collectionConfiguration;
private final WebgraphConfiguration webgraphConfiguration;
private final LinkedBlockingQueue<URIMetadataRow> pendingCollectionInputRows;
private final LinkedBlockingQueue<SolrInputDocument> pendingCollectionInputDocuments;
protected Fulltext(final File segmentPath, final CollectionConfiguration collectionConfiguration, final WebgraphConfiguration webgraphConfiguration) {
this.segmentPath = segmentPath;
@ -110,8 +108,6 @@ public final class Fulltext {
this.solrInstances = new InstanceMirror();
this.collectionConfiguration = collectionConfiguration;
this.webgraphConfiguration = webgraphConfiguration;
this.pendingCollectionInputRows = new LinkedBlockingQueue<URIMetadataRow>();
this.pendingCollectionInputDocuments = new LinkedBlockingQueue<SolrInputDocument>();
}
/**
@ -281,7 +277,7 @@ public final class Fulltext {
long t = System.currentTimeMillis();
if (t - this.collectionSizeLastAccess < 1000) return this.collectionSizeLastValue;
long size = this.urlIndexFile == null ? 0 : this.urlIndexFile.size();
size += this.getDefaultConnector().getSize();
size += this.solrInstances.getDefaultMirrorConnector().getSize();
this.collectionSizeLastAccess = t;
this.collectionSizeLastValue = size;
return size;
@ -304,7 +300,11 @@ public final class Fulltext {
this.solrInstances.close();
}
// timestamp (ms) of the last commit that was actually forwarded to Solr;
// used to debounce commit requests
private long lastCommit = 0;
/**
 * Commits pending changes on both the default collection connector and the
 * webgraph connector. Calls arriving less than 10 seconds after the previous
 * accepted commit are silently dropped to limit commit frequency.
 * NOTE(review): lastCommit is read and written without synchronization, so
 * concurrent callers may occasionally both pass the debounce check — presumably
 * acceptable here; confirm if strict rate limiting is required.
 *
 * @param softCommit if true, request a Solr soft commit instead of a hard commit
 */
public void commit(boolean softCommit) {
long t = System.currentTimeMillis();
if (lastCommit + 10000 > t) return;
lastCommit = t;
getDefaultConnector().commit(softCommit);
getWebgraphConnector().commit(softCommit);
}
@ -321,24 +321,7 @@ public final class Fulltext {
}
public DigestURI getURL(final byte[] urlHash) {
if (urlHash == null) return null;
// try to get the data from the delayed cache; this happens if we retrieve this from a fresh search result
String u = ASCII.String(urlHash);
for (URIMetadataRow entry: this.pendingCollectionInputRows) {
if (u.equals(ASCII.String(entry.hash()))) {
if (this.urlIndexFile != null) try {this.urlIndexFile.remove(urlHash);} catch (IOException e) {} // migration
return entry.url();
}
}
for (SolrInputDocument doc: this.pendingCollectionInputDocuments) {
if (u.equals(doc.getFieldValue(CollectionSchema.id.getSolrFieldName()))) {
if (this.urlIndexFile != null) try {this.urlIndexFile.remove(urlHash);} catch (IOException e) {} // migration
String url = (String) doc.getFieldValue(CollectionSchema.sku.getSolrFieldName());
if (url != null) try {return new DigestURI(url);} catch (MalformedURLException e) {}
}
}
if (urlHash == null || this.getDefaultConnector() == null) return null;
String x;
try {
@ -372,23 +355,6 @@ public final class Fulltext {
private URIMetadataNode getMetadata(final byte[] urlHash, WordReferenceVars wre, long weight) {
String u = ASCII.String(urlHash);
// try to get the data from the delayed cache; this happens if we retrieve this from a fresh search result
for (URIMetadataRow entry: this.pendingCollectionInputRows) {
if (u.equals(ASCII.String(entry.hash()))) {
if (this.urlIndexFile != null) try {this.urlIndexFile.remove(urlHash);} catch (IOException e) {} // migration
SolrDocument sd = this.collectionConfiguration.toSolrDocument(getDefaultConfiguration().metadata2solr(entry));
return new URIMetadataNode(sd, wre, weight);
}
}
for (SolrInputDocument doc: this.pendingCollectionInputDocuments) {
if (u.equals(doc.getFieldValue(CollectionSchema.id.getSolrFieldName()))) {
if (this.urlIndexFile != null) try {this.urlIndexFile.remove(urlHash);} catch (IOException e) {} // migration
SolrDocument sd = this.collectionConfiguration.toSolrDocument(doc);
return new URIMetadataNode(sd, wre, weight);
}
}
// get the metadata from Solr
try {
SolrDocument doc = this.getDefaultConnector().getDocumentById(u);
@ -443,63 +409,6 @@ public final class Fulltext {
if (MemoryControl.shortStatus()) clearCache();
}
/**
 * Enqueues a Solr input document for delayed indexing. Under memory pressure
 * the document is written immediately instead of being queued; if the queue
 * insert is interrupted, the document is also written immediately as a
 * fallback so it is not lost.
 *
 * @param doc the Solr document to store (eventually) in the collection index
 */
public void putDocumentLater(final SolrInputDocument doc) {
if (MemoryControl.shortStatus()) {
// short on memory: bypass the queue and index synchronously
try {
putDocument(doc);
return;
} catch (IOException ee) {
// synchronous write failed; fall through and try to queue the document instead
Log.logException(ee);
}
}
try {
this.pendingCollectionInputDocuments.put(doc);
} catch (InterruptedException e) {
// restore the interrupt flag so callers can observe the interruption
Thread.currentThread().interrupt();
// fallback: do not lose the document, write it synchronously
try {
putDocument(doc);
} catch (IOException ee) {
Log.logException(ee);
}
}
}
/**
 * Counts all entries that are still waiting in the delayed-indexing queues.
 *
 * @return the combined size of the pending metadata-row queue and the pending
 *         Solr-document queue
 */
public int pendingInputDocuments() {
return this.pendingCollectionInputRows.size() + this.pendingCollectionInputDocuments.size();
}
/**
 * Flushes up to {@code count} pending documents from the delayed-indexing
 * queues into the Solr index. Pending metadata rows are first converted into
 * Solr documents, then a batch is drained from the document queue and written
 * with a single putDocuments call.
 *
 * @param count maximum number of documents to process in this call
 * @return the number of documents actually written
 * @throws IOException if the row conversion or the batch write fails
 */
public int processPendingInputDocuments(int count) throws IOException {
if (count == 0) return 0;
boolean shortMemStatus = MemoryControl.shortStatus();
if (!shortMemStatus || this.pendingCollectionInputDocuments.size() < count) {
pendingRows2Docs(count);
}
SolrInputDocument doc;
Collection<SolrInputDocument> docs = new ArrayList<SolrInputDocument>(count);
// when memory is short, drain the whole queue (count is ignored by short-circuit);
// otherwise take at most count documents
while ((shortMemStatus || count-- > 0) && (doc = this.pendingCollectionInputDocuments.poll()) != null) {
docs.add(doc);
}
if (docs.size() > 0) this.putDocuments(docs);
return docs.size();
}
/**
 * Converts up to {@code count} queued metadata rows into Solr input documents
 * and re-queues them on the pending-document queue (via putDocumentLater).
 * A row is only converted if no newer node entry already exists in Solr,
 * because node entries are richer than metadata entries and must not be
 * overwritten by older data.
 *
 * @param count maximum number of rows to convert
 * @throws IOException if the Solr lookup fails
 */
private void pendingRows2Docs(int count) throws IOException {
URIMetadataRow entry;
while (count-- > 0 && (entry = this.pendingCollectionInputRows.poll()) != null) {
byte[] idb = entry.hash();
String id = ASCII.String(idb);
try {
// remove the legacy url-index entry for this hash (migration to Solr storage)
if (this.urlIndexFile != null) this.urlIndexFile.remove(idb);
// because node entries are richer than metadata entries we must check if they exist to prevent that they are overwritten
SolrDocument sd = this.getDefaultConnector().getDocumentById(id);
if (sd == null || (new URIMetadataNode(sd)).isOlder(entry)) {
putDocumentLater(getDefaultConfiguration().metadata2solr(entry));
}
} catch (SolrException e) {
throw new IOException(e.getMessage(), e);
}
}
}
public void putEdges(final Collection<SolrInputDocument> edges) throws IOException {
if (edges == null || edges.size() == 0) return;
try {
@ -528,22 +437,6 @@ public final class Fulltext {
if (MemoryControl.shortStatus()) clearCache();
}
/**
 * Enqueues a metadata row for delayed indexing. Under memory pressure the
 * entry is written synchronously instead of being queued; if the queue insert
 * is interrupted, the entry is also written synchronously so it is not lost.
 *
 * @param entry the URL metadata row to store (eventually) in the index
 * @throws IOException if the synchronous memory-pressure write fails
 */
public void putMetadataLater(final URIMetadataRow entry) throws IOException {
if (MemoryControl.shortStatus()) {
// short on memory: bypass the queue and index synchronously
putMetadata(entry);
return;
}
try {
this.pendingCollectionInputRows.put(entry);
} catch (InterruptedException e) {
// restore the interrupt flag so callers can observe the interruption
Thread.currentThread().interrupt();
// fallback: do not lose the entry, write it synchronously
try {
putMetadata(entry);
} catch (IOException ee) {
Log.logException(ee);
}
}
}
/**
* using a fragment of the url hash (6 bytes: bytes 6 to 11) it is possible to address all urls from a specific domain
* here such a fragment can be used to delete all these domains at once
@ -704,12 +597,6 @@ public final class Fulltext {
@Deprecated
public boolean exists(final String urlHash) {
if (urlHash == null) return false;
for (URIMetadataRow entry: this.pendingCollectionInputRows) {
if (urlHash.equals(ASCII.String(entry.hash()))) return true;
}
for (SolrInputDocument doc: this.pendingCollectionInputDocuments) {
if (urlHash.equals(doc.getFieldValue(CollectionSchema.id.getSolrFieldName()))) return true;
}
try {
if (this.getDefaultConnector().existsById(urlHash)) return true;
} catch (final Throwable e) {
@ -729,16 +616,6 @@ public final class Fulltext {
HashSet<String> e = new HashSet<String>();
if (ids == null || ids.size() == 0) return e;
Collection<String> idsC = new HashSet<String>();
for (String id: ids) {
for (URIMetadataRow entry: this.pendingCollectionInputRows) {
if (id.equals(ASCII.String(entry.hash()))) {e.add(id); continue;}
}
for (SolrInputDocument doc: this.pendingCollectionInputDocuments) {
if (id.equals(doc.getFieldValue(CollectionSchema.id.getSolrFieldName()))) {e.add(id); continue;}
}
if (this.urlIndexFile != null && this.urlIndexFile.has(ASCII.getBytes(id))) {e.add(id); continue;}
idsC.add(id);
}
try {
Set<String> e1 = this.getDefaultConnector().existsByIds(idsC);
e.addAll(e1);

@ -284,6 +284,7 @@ public class Segment {
* @return the number of references for this word.
*/
public int getWordCountGuess(String word) {
if (this.fulltext.getDefaultConnector() == null) return 0;
if (word == null || word.indexOf(':') >= 0 || word.indexOf(' ') >= 0 || word.indexOf('/') >= 0) return 0;
if (this.termIndex != null) {
int count = this.termIndex.count(Word.word2hash(word));

@ -194,7 +194,7 @@ public final class SearchEvent {
this.workTables = workTables;
this.query = query;
this.loader = loader;
this.nodeStack = new WeakPriorityBlockingQueue<URIMetadataNode>(300, false);
this.nodeStack = new WeakPriorityBlockingQueue<URIMetadataNode>(100, false);
this.maxExpectedRemoteReferences = new AtomicInteger(0);
this.expectedRemoteReferences = new AtomicInteger(0);
// prepare configured search navigation

Loading…
Cancel
Save