enhanced search process

pull/1/head
Michael Peter Christen 13 years ago
parent a71f751cc8
commit c602eaaf46

@ -65,11 +65,15 @@ public class IndexFederated_p {
try {
while ((s0 = r.readLine()) != null) {
s0 = s0.trim();
if (s0.length() > 0) s.append(s0).append(',');
if (s0.length() > 0) {
s.append(s0).append(',');
}
}
} catch (final IOException e1) {
}
if (s.length() > 0) s.setLength(s.length() - 1);
if (s.length() > 0) {
s.setLength(s.length() - 1);
}
solrurls = s.toString().trim();
env.setConfig("federated.service.solr.indexing.url", solrurls);
env.setConfig("federated.service.solr.indexing.sharding", post.get("solr.indexing.sharding", env.getConfig("federated.service.solr.indexing.sharding", "modulo-host-md5")));
@ -104,9 +108,13 @@ public class IndexFederated_p {
final boolean c = v != null && v.equals("checked");
try {
if (entry.enabled()) {
if (!c) scheme.disable(entry.key());
if (!c) {
scheme.disable(entry.key());
}
} else {
if (c) scheme.enable(entry.key());
if (c) {
scheme.enable(entry.key());
}
}
} catch (final IOException e) {}
}
@ -136,6 +144,7 @@ public class IndexFederated_p {
scheme = new SolrScheme(new File(env.getDataPath(), "DATA/SETTINGS/" + schemename));
}
final Iterator<ConfigurationSet.Entry> i = scheme.allIterator();
int c = 0;
boolean dark = false;
ConfigurationSet.Entry entry;

@ -399,7 +399,7 @@ public final class RWIProcess extends Thread
public boolean feedingIsFinished() {
return
this.feedersTerminated.intValue() > (this.remote ? 1 : 0) &&
this.feedersAlive.get() > 0 &&
this.feedersAlive.get() == 0 &&
(!this.remote || this.remote_indexCount > 0);
}

@ -285,7 +285,8 @@ public final class SearchEvent
this.peers,
this.workTables,
5000,
deleteIfSnippetFail);
deleteIfSnippetFail,
remote);
// clean up events
SearchEventCache.cleanupEvents(false);
@ -484,17 +485,6 @@ public final class SearchEvent
}
public ResultEntry oneResult(final int item, final long timeout) {
/*
if (this.query.domType == QueryParams.Searchdom.GLOBAL || this.query.domType == QueryParams.Searchdom.CLUSTER) {
// this is a search using remote search threads. Also the local
// search thread is started as background process
if (this.localSearchThread != null && this.localSearchThread.isAlive()) {
// in case that the local search takes longer than some other
// remote search requests, wait that the local process terminates first
try {this.localSearchThread.join(300);} catch (final InterruptedException e) {}
}
}
*/
return this.resultFetcher.oneResult(item, timeout);
}

@ -80,7 +80,7 @@ public class SnippetProcess {
long urlRetrievalAllTime;
long snippetComputationAllTime;
int taketimeout;
private final boolean deleteIfSnippetFail;
private final boolean deleteIfSnippetFail, remote;
private boolean cleanupState;
public SnippetProcess(
@ -90,7 +90,8 @@ public class SnippetProcess {
final SeedDB peers,
final WorkTables workTables,
final int taketimeout,
final boolean deleteIfSnippetFail) {
final boolean deleteIfSnippetFail,
final boolean remote) {
assert query != null;
this.loader = loader;
this.rankingProcess = rankedCache;
@ -99,6 +100,7 @@ public class SnippetProcess {
this.workTables = workTables;
this.taketimeout = taketimeout;
this.deleteIfSnippetFail = deleteIfSnippetFail;
this.remote = remote;
this.cleanupState = false;
this.urlRetrievalAllTime = 0;
@ -143,15 +145,24 @@ public class SnippetProcess {
// (happens if a search pages is accessed a second time)
final long finishTime = System.currentTimeMillis() + timeout;
EventTracker.update(EventTracker.EClass.SEARCH, new ProfilingGraph.EventSearch(this.query.id(true), SearchEvent.Type.ONERESULT, "started, item = " + item + ", available = " + this.result.sizeAvailable(), 0, 0), false);
//Log.logInfo("SnippetProcess", "*start method for item = " + item + "; anyWorkerAlive=" + anyWorkerAlive() + "; this.rankingProcess.isAlive() = " + this.rankingProcess.isAlive() + "; this.rankingProcess.feedingIsFinished() = " + this.rankingProcess.feedingIsFinished() + "; this.result.sizeAvailable() = " + this.result.sizeAvailable() + ", this.rankingProcess.sizeQueue() = " + this.rankingProcess.sizeQueue());
// we must wait some time until the first result page is full to get enough elements for ranking
final long waittimeout = System.currentTimeMillis() + 300;
if (item == 0) {
if (this.remote && item < 10 && !this.rankingProcess.feedingIsFinished()) {
// the first 10 results have a very special timing to get most of the remote results ordered
// before they are presented on the first lines .. yes sleeps seem to be bad. but how shall we predict how long other
// peers will take until they respond?
long sleep = item == 0 ? 600 : (10 - item) * 12; // the first result takes the longest time
//Log.logInfo("SnippetProcess", "SLEEP = " + sleep);
try { Thread.sleep(sleep); } catch (final InterruptedException e1) { Log.logException(e1); }
}
if (item < 5) {
while (
(!this.rankingProcess.feedingIsFinished() || this.rankingProcess.sizeQueue() > 0) &&
this.result.sizeAvailable() < 3 &&
System.currentTimeMillis() < waittimeout &&
anyWorkerAlive()
this.result.sizeAvailable() < item + 1 &&
System.currentTimeMillis() < waittimeout &&
anyWorkerAlive()
) {
// wait a little time to get first results in the search
try { Thread.sleep(10); } catch (final InterruptedException e1) {}
@ -169,10 +180,10 @@ public class SnippetProcess {
WeakPriorityBlockingQueue.Element<ResultEntry> entry = null;
while (System.currentTimeMillis() < finishTime) {
Log.logInfo("SnippetProcess", "item = " + item + "; anyWorkerAlive=" + anyWorkerAlive() + "; this.rankingProcess.isAlive() = " + this.rankingProcess.isAlive() + "; this.rankingProcess.feedingIsFinished() = " + this.rankingProcess.feedingIsFinished() + "; this.result.sizeAvailable() = " + this.result.sizeAvailable() + ", this.rankingProcess.sizeQueue() = " + this.rankingProcess.sizeQueue());
//Log.logInfo("SnippetProcess", "item = " + item + "; anyWorkerAlive=" + anyWorkerAlive() + "; this.rankingProcess.isAlive() = " + this.rankingProcess.isAlive() + "; this.rankingProcess.feedingIsFinished() = " + this.rankingProcess.feedingIsFinished() + "; this.result.sizeAvailable() = " + this.result.sizeAvailable() + ", this.rankingProcess.sizeQueue() = " + this.rankingProcess.sizeQueue());
if (!anyWorkerAlive() && !this.rankingProcess.isAlive() && this.result.sizeAvailable() + this.rankingProcess.sizeQueue() <= item && this.rankingProcess.feedingIsFinished()) {
Log.logInfo("SnippetProcess", "interrupted result fetching; item = " + item + "; this.result.sizeAvailable() = " + this.result.sizeAvailable() + ", this.rankingProcess.sizeQueue() = " + this.rankingProcess.sizeQueue() + "; this.rankingProcess.feedingIsFinished() = " + this.rankingProcess.feedingIsFinished());
//Log.logInfo("SnippetProcess", "interrupted result fetching; item = " + item + "; this.result.sizeAvailable() = " + this.result.sizeAvailable() + ", this.rankingProcess.sizeQueue() = " + this.rankingProcess.sizeQueue() + "; this.rankingProcess.feedingIsFinished() = " + this.rankingProcess.feedingIsFinished());
break; // the fail case
}
@ -191,7 +202,7 @@ public class SnippetProcess {
// finally, if there is something, return the result
if (entry == null) {
EventTracker.update(EventTracker.EClass.SEARCH, new ProfilingGraph.EventSearch(this.query.id(true), SearchEvent.Type.ONERESULT, "not found, item = " + item + ", available = " + this.result.sizeAvailable(), 0, 0), false);
Log.logInfo("SnippetProcess", "NO ENTRY computed; anyWorkerAlive=" + anyWorkerAlive() + "; this.rankingProcess.isAlive() = " + this.rankingProcess.isAlive() + "; this.rankingProcess.feedingIsFinished() = " + this.rankingProcess.feedingIsFinished() + "; this.result.sizeAvailable() = " + this.result.sizeAvailable() + ", this.rankingProcess.sizeQueue() = " + this.rankingProcess.sizeQueue());
//Log.logInfo("SnippetProcess", "NO ENTRY computed; anyWorkerAlive=" + anyWorkerAlive() + "; this.rankingProcess.isAlive() = " + this.rankingProcess.isAlive() + "; this.rankingProcess.feedingIsFinished() = " + this.rankingProcess.feedingIsFinished() + "; this.result.sizeAvailable() = " + this.result.sizeAvailable() + ", this.rankingProcess.sizeQueue() = " + this.rankingProcess.sizeQueue());
return null;
}
final ResultEntry re = entry.getElement();
@ -339,14 +350,9 @@ public class SnippetProcess {
public void deployWorker(int deployCount, final int neededResults) {
if (this.cleanupState)
{
return; // we do not start another worker if we are in cleanup state
}
if (this.rankingProcess.feedingIsFinished() && this.rankingProcess.sizeQueue() == 0) {
return;
}
if (this.result.sizeAvailable() >= neededResults) {
if (this.cleanupState ||
(this.rankingProcess.feedingIsFinished() && this.rankingProcess.sizeQueue() == 0) ||
this.result.sizeAvailable() >= neededResults) {
return;
}
Worker worker;
@ -354,15 +360,13 @@ public class SnippetProcess {
this.workerThreads = new Worker[deployCount];
synchronized(this.workerThreads) {
for (int i = 0; i < this.workerThreads.length; i++) {
if (this.result.sizeAvailable() >= neededResults ||
(this.rankingProcess.feedingIsFinished() && this.rankingProcess.sizeQueue() == 0)) {
break;
}
worker = new Worker(i, 10000, this.query.snippetCacheStrategy, this.query.snippetMatcher, neededResults);
worker.start();
this.workerThreads[i] = worker;
if (this.rankingProcess.feedingIsFinished() && this.rankingProcess.sizeQueue() == 0) {
break;
}
if (this.result.sizeAvailable() >= neededResults) {
break;
}
if (this.rankingProcess.expectMoreRemoteReferences()) {
long wait = this.rankingProcess.waitTimeRecommendation();
if (wait > 0) {
@ -376,27 +380,23 @@ public class SnippetProcess {
// if we find dead workers, reanimate them
synchronized(this.workerThreads) {
for (int i = 0; i < this.workerThreads.length; i++) {
if (deployCount <= 0) {
break;
}
if (this.workerThreads[i] == null || !this.workerThreads[i].isAlive()) {
worker = new Worker(i, 10000, this.query.snippetCacheStrategy, this.query.snippetMatcher, neededResults);
worker.start();
this.workerThreads[i] = worker;
deployCount--;
}
if (this.rankingProcess.feedingIsFinished() && this.rankingProcess.sizeQueue() == 0) {
break;
}
if (this.result.sizeAvailable() >= neededResults) {
break;
}
if (this.rankingProcess.expectMoreRemoteReferences()) {
long wait = this.rankingProcess.waitTimeRecommendation();
if (wait > 0) {
try {Thread.sleep(wait);} catch ( InterruptedException e ) {}
if (deployCount <= 0 ||
this.result.sizeAvailable() >= neededResults ||
(this.rankingProcess.feedingIsFinished() && this.rankingProcess.sizeQueue() == 0)) {
break;
}
if (this.workerThreads[i] == null || !this.workerThreads[i].isAlive()) {
worker = new Worker(i, 10000, this.query.snippetCacheStrategy, this.query.snippetMatcher, neededResults);
worker.start();
this.workerThreads[i] = worker;
deployCount--;
}
if (this.rankingProcess.expectMoreRemoteReferences()) {
long wait = this.rankingProcess.waitTimeRecommendation();
if (wait > 0) {
try {Thread.sleep(wait);} catch ( InterruptedException e ) {}
}
}
}
}
}
}
@ -488,6 +488,8 @@ public class SnippetProcess {
//Log.logWarning("ResultFetcher", "page == null");
break; // no more available
}
this.setName(page.url().toNormalform(true, false)); // to support debugging
if (SnippetProcess.this.query.filterfailurls && SnippetProcess.this.workTables.failURLsContains(page.hash())) {
continue;
}

Loading…
Cancel
Save