fix for search process (it was aborted too early during remote search)

pull/1/head
Michael Christen 13 years ago
parent e6d51363ee
commit eff966f396

@ -126,7 +126,9 @@ public class IndexControlRWIs_p
final String keystring = post.get("keystring", "").trim();
byte[] keyhash = post.get("keyhash", "").trim().getBytes();
if (keyhash == null || keyhash.length == 0) keyhash = Word.word2hash(keystring);
if (keyhash == null || keyhash.length == 0) {
keyhash = Word.word2hash(keystring);
}
prop.putHTML("keystring", keystring);
prop.putHTML("keyhash", ASCII.String(keyhash));
@ -724,7 +726,7 @@ public class IndexControlRWIs_p
final QueryParams query =
new QueryParams(ASCII.String(keyhash), -1, filter, segment, sb.getRanking(), "IndexControlRWIs_p");
final ReferenceOrder order = new ReferenceOrder(query.ranking, UTF8.getBytes(query.targetlang));
final RWIProcess ranked = new RWIProcess(query, order, Integer.MAX_VALUE);
final RWIProcess ranked = new RWIProcess(query, order, Integer.MAX_VALUE, false);
ranked.run();
if ( ranked.filteredCount() == 0 ) {

@ -223,7 +223,7 @@ public class DocumentIndex extends Segment
final QueryParams query =
new QueryParams(querystring, count, null, this, textRankingDefault, "DocumentIndex");
final ReferenceOrder order = new ReferenceOrder(query.ranking, UTF8.getBytes(query.targetlang));
final RWIProcess rankedCache = new RWIProcess(query, order, SearchEvent.max_results_preparation);
final RWIProcess rankedCache = new RWIProcess(query, order, SearchEvent.max_results_preparation, false);
rankedCache.start();
// search is running; retrieve results

@ -84,7 +84,7 @@ public final class RWIProcess extends Thread
private final AtomicInteger maxExpectedRemoteReferences, expectedRemoteReferences,
receivedRemoteReferences;
private final WeakPriorityBlockingQueue<WordReferenceVars> stack;
private final AtomicInteger feeders;
private final AtomicInteger feedersAlive, feedersTerminated;
private final ConcurrentHashMap<String, WeakPriorityBlockingQueue<WordReferenceVars>> doubleDomCache; // key = domhash (6 bytes); value = like stack
//private final HandleSet handover; // key = urlhash; used for double-check of urls that had been handed over to search process
@ -93,7 +93,7 @@ public final class RWIProcess extends Thread
private final ReferenceOrder order;
private final long startTime;
private boolean addRunning;
private boolean fresh;
private final boolean remote;
// navigation scores
private final ScoreMap<String> hostNavigator; // a counter for the appearance of the host hash
@ -102,7 +102,7 @@ public final class RWIProcess extends Thread
private final ScoreMap<String> protocolNavigator; // a counter for protocol types
private final ScoreMap<String> filetypeNavigator; // a counter for file types
public RWIProcess(final QueryParams query, final ReferenceOrder order, final int maxentries) {
public RWIProcess(final QueryParams query, final ReferenceOrder order, final int maxentries, final boolean remote) {
// we collect the urlhashes and construct a list with urlEntry objects
// attention: if minEntries is too high, this method will not terminate within the maxTime
// sortorder: 0 = hash, 1 = url, 2 = ranking
@ -112,6 +112,7 @@ public final class RWIProcess extends Thread
this.doubleDomCache = new ConcurrentHashMap<String, WeakPriorityBlockingQueue<WordReferenceVars>>();
this.query = query;
this.order = order;
this.remote = remote;
this.remote_peerCount = 0;
this.remote_resourceSize = 0;
this.remote_indexCount = 0;
@ -132,12 +133,12 @@ public final class RWIProcess extends Thread
this.protocolNavigator = new ConcurrentScoreMap<String>();
this.filetypeNavigator = new ConcurrentScoreMap<String>();
this.ref = new ConcurrentScoreMap<String>();
this.feeders = new AtomicInteger(0);
this.feedersAlive = new AtomicInteger(0);
this.feedersTerminated = new AtomicInteger(0);
this.startTime = System.currentTimeMillis();
this.maxExpectedRemoteReferences = new AtomicInteger(0);
this.expectedRemoteReferences = new AtomicInteger(0);
this.receivedRemoteReferences = new AtomicInteger(0);
this.fresh = true;
}
public void addExpectedRemoteReferences(int x) {
@ -242,7 +243,9 @@ public final class RWIProcess extends Thread
resourceName,
is,
System.currentTimeMillis() - timer), false);
if (!local) this.receivedRemoteReferences.addAndGet(is);
if (!local) {
this.receivedRemoteReferences.addAndGet(is);
}
// iterate over normalized entries and select some that are better than currently stored
timer = System.currentTimeMillis();
@ -384,17 +387,20 @@ public final class RWIProcess extends Thread
* method to signal the incoming stack that one feeder has terminated
*/
public void oneFeederTerminated() {
final int c = this.feeders.decrementAndGet();
this.feedersTerminated.incrementAndGet();
final int c = this.feedersAlive.decrementAndGet();
assert c >= 0 : "feeders = " + c;
}
public void oneFeederStarted() {
this.feeders.addAndGet(1);
this.fresh = false;
this.feedersAlive.addAndGet(1);
}
public boolean feedingIsFinished() {
return !this.fresh && this.feeders.get() <= 0;
return
this.feedersTerminated.intValue() > (this.remote ? 1 : 0) &&
this.feedersAlive.get() > 0 &&
(!this.remote || this.remote_indexCount > 0);
}
private boolean testFlags(final WordReference ientry) {

@ -147,7 +147,7 @@ public final class SearchEvent
// initialize a ranking process that is the target for data
// that is generated concurrently from local and global search threads
this.rankingProcess = new RWIProcess(this.query, this.order, max_results_preparation);
this.rankingProcess = new RWIProcess(this.query, this.order, max_results_preparation, remote);
// start a local search concurrently
this.rankingProcess.start();

@ -146,14 +146,16 @@ public class SnippetProcess {
// we must wait some time until the first result page is full to get enough elements for ranking
final long waittimeout = System.currentTimeMillis() + 300;
if (item == 0) while (
(!this.rankingProcess.feedingIsFinished() || this.rankingProcess.sizeQueue() > 0) &&
this.result.sizeAvailable() < 3 &&
System.currentTimeMillis() < waittimeout &&
anyWorkerAlive()
) {
// wait a little time to get first results in the search
try { Thread.sleep(10); } catch (final InterruptedException e1) {}
if (item == 0) {
while (
(!this.rankingProcess.feedingIsFinished() || this.rankingProcess.sizeQueue() > 0) &&
this.result.sizeAvailable() < 3 &&
System.currentTimeMillis() < waittimeout &&
anyWorkerAlive()
) {
// wait a little time to get first results in the search
try { Thread.sleep(10); } catch (final InterruptedException e1) {}
}
}
if (this.result.sizeAvailable() > item) {
@ -167,10 +169,10 @@ public class SnippetProcess {
WeakPriorityBlockingQueue.Element<ResultEntry> entry = null;
while (System.currentTimeMillis() < finishTime) {
//Log.logInfo("SnippetProcess", "item = " + item + "; anyWorkerAlive=" + anyWorkerAlive() + "; this.rankingProcess.isAlive() = " + this.rankingProcess.isAlive() + "; this.rankingProcess.feedingIsFinished() = " + this.rankingProcess.feedingIsFinished() + "; this.result.sizeAvailable() = " + this.result.sizeAvailable() + ", this.rankingProcess.sizeQueue() = " + this.rankingProcess.sizeQueue());
Log.logInfo("SnippetProcess", "item = " + item + "; anyWorkerAlive=" + anyWorkerAlive() + "; this.rankingProcess.isAlive() = " + this.rankingProcess.isAlive() + "; this.rankingProcess.feedingIsFinished() = " + this.rankingProcess.feedingIsFinished() + "; this.result.sizeAvailable() = " + this.result.sizeAvailable() + ", this.rankingProcess.sizeQueue() = " + this.rankingProcess.sizeQueue());
if (!anyWorkerAlive() && !this.rankingProcess.isAlive() && this.result.sizeAvailable() + this.rankingProcess.sizeQueue() <= item && this.rankingProcess.feedingIsFinished()) {
//Log.logInfo("SnippetProcess", "interrupted result fetching; item = " + item + "; this.result.sizeAvailable() = " + this.result.sizeAvailable() + ", this.rankingProcess.sizeQueue() = " + this.rankingProcess.sizeQueue());
Log.logInfo("SnippetProcess", "interrupted result fetching; item = " + item + "; this.result.sizeAvailable() = " + this.result.sizeAvailable() + ", this.rankingProcess.sizeQueue() = " + this.rankingProcess.sizeQueue() + "; this.rankingProcess.feedingIsFinished() = " + this.rankingProcess.feedingIsFinished());
break; // the fail case
}
@ -181,17 +183,23 @@ public class SnippetProcess {
}
try {entry = this.result.element(item, 50);} catch (final InterruptedException e) {break;}
if (entry != null) break;
if (entry != null) {
break;
}
}
// finally, if there is something, return the result
if (entry == null) {
EventTracker.update(EventTracker.EClass.SEARCH, new ProfilingGraph.EventSearch(this.query.id(true), SearchEvent.Type.ONERESULT, "not found, item = " + item + ", available = " + this.result.sizeAvailable(), 0, 0), false);
Log.logInfo("SnippetProcess", "NO ENTRY computed; anyWorkerAlive=" + anyWorkerAlive() + "; this.rankingProcess.isAlive() = " + this.rankingProcess.isAlive() + "; this.rankingProcess.feedingIsFinished() = " + this.rankingProcess.feedingIsFinished() + "; this.result.sizeAvailable() = " + this.result.sizeAvailable() + ", this.rankingProcess.sizeQueue() = " + this.rankingProcess.sizeQueue());
return null;
}
final ResultEntry re = entry.getElement();
EventTracker.update(EventTracker.EClass.SEARCH, new ProfilingGraph.EventSearch(this.query.id(true), SearchEvent.Type.ONERESULT, "retrieved, item = " + item + ", available = " + this.result.sizeAvailable() + ": " + re.urlstring(), 0, 0), false);
if (item == this.query.offset + this.query.itemsPerPage - 1) stopAllWorker(); // we don't need more
if (item == this.query.offset + this.query.itemsPerPage - 1)
{
stopAllWorker(); // we don't need more
}
return re;
}
@ -204,16 +212,24 @@ public class SnippetProcess {
public MediaSnippet oneImage(final int item) {
// always look for a next object if there are way too less
if (this.images.sizeAvailable() <= item + 10) fillImagesCache();
if (this.images.sizeAvailable() <= item + 10) {
fillImagesCache();
}
// check if we already retrieved the item
if (this.images.sizeDrained() > item) return this.images.element(item).getElement();
if (this.images.sizeDrained() > item) {
return this.images.element(item).getElement();
}
// look again if there are not enough for presentation
while (this.images.sizeAvailable() <= item) {
if (fillImagesCache() == 0) break;
if (fillImagesCache() == 0) {
break;
}
}
if (this.images.sizeAvailable() <= item) {
return null;
}
if (this.images.sizeAvailable() <= item) return null;
// now take the specific item from the image stack
return this.images.element(item).getElement();
@ -222,7 +238,9 @@ public class SnippetProcess {
private int fillImagesCache() {
final ResultEntry result = nextResult();
int c = 0;
if (result == null) return c;
if (result == null) {
return c;
}
// iterate over all images in the result
final List<MediaSnippet> imagemedia = result.mediaSnippets();
if (imagemedia != null) {
@ -232,7 +250,9 @@ public class SnippetProcess {
header = Cache.getResponseHeader(ms.href.hash());
if (header != null) {
// this does not work for all urls since some of them may not be in the cache
if (header.mime().startsWith("text") || header.mime().startsWith("application")) continue feedloop;
if (header.mime().startsWith("text") || header.mime().startsWith("application")) {
continue feedloop;
}
}
this.images.put(new ReverseElement<MediaSnippet>(ms, ms.ranking)); // remove smallest in case of overflow
c++;
@ -260,14 +280,26 @@ public class SnippetProcess {
long r = 0;
// for media search: prefer pages with many links
if (this.query.contentdom == ContentDomain.IMAGE) r += rentry.limage() << this.query.ranking.coeff_cathasimage;
if (this.query.contentdom == ContentDomain.AUDIO) r += rentry.laudio() << this.query.ranking.coeff_cathasaudio;
if (this.query.contentdom == ContentDomain.VIDEO) r += rentry.lvideo() << this.query.ranking.coeff_cathasvideo;
if (this.query.contentdom == ContentDomain.APP ) r += rentry.lapp() << this.query.ranking.coeff_cathasapp;
if (this.query.contentdom == ContentDomain.IMAGE) {
r += rentry.limage() << this.query.ranking.coeff_cathasimage;
}
if (this.query.contentdom == ContentDomain.AUDIO) {
r += rentry.laudio() << this.query.ranking.coeff_cathasaudio;
}
if (this.query.contentdom == ContentDomain.VIDEO) {
r += rentry.lvideo() << this.query.ranking.coeff_cathasvideo;
}
if (this.query.contentdom == ContentDomain.APP ) {
r += rentry.lapp() << this.query.ranking.coeff_cathasapp;
}
// prefer hit with 'prefer' pattern
if (this.query.prefer.matcher(rentry.url().toNormalform(true, true)).matches()) r += 256 << this.query.ranking.coeff_prefer;
if (this.query.prefer.matcher(rentry.title()).matches()) r += 256 << this.query.ranking.coeff_prefer;
if (this.query.prefer.matcher(rentry.url().toNormalform(true, true)).matches()) {
r += 256 << this.query.ranking.coeff_prefer;
}
if (this.query.prefer.matcher(rentry.title()).matches()) {
r += 256 << this.query.ranking.coeff_prefer;
}
// apply 'common-sense' heuristic using references
final String urlstring = rentry.url().toNormalform(true, true);
@ -276,11 +308,15 @@ public class SnippetProcess {
int tc;
for (final String urlcomp : urlcomps) {
tc = topwords.get(urlcomp);
if (tc > 0) r += Math.max(1, tc) << this.query.ranking.coeff_urlcompintoplist;
if (tc > 0) {
r += Math.max(1, tc) << this.query.ranking.coeff_urlcompintoplist;
}
}
for (final String descrcomp : descrcomps) {
tc = topwords.get(descrcomp);
if (tc > 0) r += Math.max(1, tc) << this.query.ranking.coeff_descrcompintoplist;
if (tc > 0) {
r += Math.max(1, tc) << this.query.ranking.coeff_descrcompintoplist;
}
}
// apply query-in-result matching
@ -290,8 +326,12 @@ public class SnippetProcess {
byte[] queryhash;
while (shi.hasNext()) {
queryhash = shi.next();
if (urlcomph.has(queryhash)) r += 256 << this.query.ranking.coeff_appurl;
if (descrcomph.has(queryhash)) r += 256 << this.query.ranking.coeff_app_dc_title;
if (urlcomph.has(queryhash)) {
r += 256 << this.query.ranking.coeff_appurl;
}
if (descrcomph.has(queryhash)) {
r += 256 << this.query.ranking.coeff_app_dc_title;
}
}
return r;
@ -299,9 +339,16 @@ public class SnippetProcess {
public void deployWorker(int deployCount, final int neededResults) {
if (this.cleanupState) return; // we do not start another worker if we are in cleanup state
if (this.rankingProcess.feedingIsFinished() && this.rankingProcess.sizeQueue() == 0) return;
if (this.result.sizeAvailable() >= neededResults) return;
if (this.cleanupState)
{
return; // we do not start another worker if we are in cleanup state
}
if (this.rankingProcess.feedingIsFinished() && this.rankingProcess.sizeQueue() == 0) {
return;
}
if (this.result.sizeAvailable() >= neededResults) {
return;
}
Worker worker;
if (this.workerThreads == null) {
this.workerThreads = new Worker[deployCount];
@ -310,11 +357,17 @@ public class SnippetProcess {
worker = new Worker(i, 10000, this.query.snippetCacheStrategy, this.query.snippetMatcher, neededResults);
worker.start();
this.workerThreads[i] = worker;
if (this.rankingProcess.feedingIsFinished() && this.rankingProcess.sizeQueue() == 0) break;
if (this.result.sizeAvailable() >= neededResults) break;
if (this.rankingProcess.feedingIsFinished() && this.rankingProcess.sizeQueue() == 0) {
break;
}
if (this.result.sizeAvailable() >= neededResults) {
break;
}
if (this.rankingProcess.expectMoreRemoteReferences()) {
long wait = this.rankingProcess.waitTimeRecommendation();
if (wait > 0)try {Thread.sleep(wait);} catch ( InterruptedException e ) {}
if (wait > 0) {
try {Thread.sleep(wait);} catch ( InterruptedException e ) {}
}
}
}
}
@ -323,18 +376,26 @@ public class SnippetProcess {
// if we find dead workers, reanimate them
synchronized(this.workerThreads) {
for (int i = 0; i < this.workerThreads.length; i++) {
if (deployCount <= 0) break;
if (deployCount <= 0) {
break;
}
if (this.workerThreads[i] == null || !this.workerThreads[i].isAlive()) {
worker = new Worker(i, 10000, this.query.snippetCacheStrategy, this.query.snippetMatcher, neededResults);
worker.start();
this.workerThreads[i] = worker;
deployCount--;
}
if (this.rankingProcess.feedingIsFinished() && this.rankingProcess.sizeQueue() == 0) break;
if (this.result.sizeAvailable() >= neededResults) break;
if (this.rankingProcess.feedingIsFinished() && this.rankingProcess.sizeQueue() == 0) {
break;
}
if (this.result.sizeAvailable() >= neededResults) {
break;
}
if (this.rankingProcess.expectMoreRemoteReferences()) {
long wait = this.rankingProcess.waitTimeRecommendation();
if (wait > 0)try {Thread.sleep(wait);} catch ( InterruptedException e ) {}
if (wait > 0) {
try {Thread.sleep(wait);} catch ( InterruptedException e ) {}
}
}
}
}
@ -344,7 +405,9 @@ public class SnippetProcess {
public void stopAllWorker() {
synchronized(this.workerThreads) {
for (int i = 0; i < this.workerThreads.length; i++) {
if (this.workerThreads[i] == null || !this.workerThreads[i].isAlive()) continue;
if (this.workerThreads[i] == null || !this.workerThreads[i].isAlive()) {
continue;
}
this.workerThreads[i].pleaseStop();
this.workerThreads[i].interrupt();
}
@ -352,12 +415,16 @@ public class SnippetProcess {
}
private boolean anyWorkerAlive() {
if (this.workerThreads == null || this.workerThreads.length == 0) return false;
if (this.workerThreads == null || this.workerThreads.length == 0) {
return false;
}
synchronized(this.workerThreads) {
for (final Worker workerThread : this.workerThreads) {
if ((workerThread != null) &&
(workerThread.isAlive()) &&
(workerThread.busytime() < 10000)) return true;
(workerThread.busytime() < 10000)) {
return true;
}
}
}
return false;
@ -421,23 +488,32 @@ public class SnippetProcess {
//Log.logWarning("ResultFetcher", "page == null");
break; // no more available
}
if (SnippetProcess.this.query.filterfailurls && SnippetProcess.this.workTables.failURLsContains(page.hash())) continue;
if (SnippetProcess.this.query.filterfailurls && SnippetProcess.this.workTables.failURLsContains(page.hash())) {
continue;
}
// in case that we have an attached solr, we load also the solr document
String solrContent = null;
if (this.solr != null) {
SolrDocument sd = null;
final SolrDocumentList sdl = this.solr.get("id:" + ASCII.String(page.hash()), 0, 1);
if (sdl.size() > 0) sd = sdl.get(0);
if (sd != null) solrContent = this.solr.getScheme().solrGetText(sd);
if (sdl.size() > 0) {
sd = sdl.get(0);
}
if (sd != null) {
solrContent = this.solr.getScheme().solrGetText(sd);
}
}
loops++;
resultEntry = fetchSnippet(page, solrContent, this.cacheStrategy); // does not fetch snippets if snippetMode == 0
if (resultEntry == null) continue; // the entry had some problems, cannot be used
//final String rawLine = resultEntry.textSnippet() == null ? null : resultEntry.textSnippet().getLineRaw();
//System.out.println("***SNIPPET*** raw='" + rawLine + "', pattern='" + this.snippetPattern.toString() + "'");
//if (rawLine != null && !this.snippetPattern.matcher(rawLine).matches()) continue;
if (resultEntry == null)
{
continue; // the entry had some problems, cannot be used
//final String rawLine = resultEntry.textSnippet() == null ? null : resultEntry.textSnippet().getLineRaw();
//System.out.println("***SNIPPET*** raw='" + rawLine + "', pattern='" + this.snippetPattern.toString() + "'");
//if (rawLine != null && !this.snippetPattern.matcher(rawLine).matches()) continue;
}
//if (result.contains(resultEntry)) continue;
SnippetProcess.this.urlRetrievalAllTime += resultEntry.dbRetrievalTime;
@ -449,7 +525,9 @@ public class SnippetProcess {
ranking += postRanking(resultEntry, SnippetProcess.this.rankingProcess.getTopicNavigator(10));
resultEntry.ranking = ranking;
SnippetProcess.this.result.put(new ReverseElement<ResultEntry>(resultEntry, ranking)); // remove smallest in case of overflow
if (nav_topics) SnippetProcess.this.rankingProcess.addTopics(resultEntry);
if (nav_topics) {
SnippetProcess.this.rankingProcess.addTopics(resultEntry);
}
}
//System.out.println("FINISHED WORKER " + id + " FOR " + this.neededResults + " RESULTS, loops = " + loops);
} catch (final Exception e) {
@ -481,7 +559,9 @@ public class SnippetProcess {
// find the url entry
long startTime = System.currentTimeMillis();
if (page == null) return null;
if (page == null) {
return null;
}
final long dbRetrievalTime = System.currentTimeMillis() - startTime;
if (cacheStrategy == null) {
@ -525,7 +605,9 @@ public class SnippetProcess {
} else {
// problems with snippet fetch
final String reason = "no text snippet; errorCode = " + snippet.getErrorCode();
if (this.deleteIfSnippetFail) this.workTables.failURLsRegisterMissingWord(this.query.getSegment().termIndex(), page.url(), this.query.queryHashes, reason);
if (this.deleteIfSnippetFail) {
this.workTables.failURLsRegisterMissingWord(this.query.getSegment().termIndex(), page.url(), this.query.queryHashes, reason);
}
Log.logInfo("SEARCH", "sorted out url " + page.url().toNormalform(true, false) + " during search: " + reason);
return null;
}
@ -544,7 +626,9 @@ public class SnippetProcess {
} else {
// problems with snippet fetch
final String reason = "no media snippet";
if (this.deleteIfSnippetFail) this.workTables.failURLsRegisterMissingWord(this.query.getSegment().termIndex(), page.url(), this.query.queryHashes, reason);
if (this.deleteIfSnippetFail) {
this.workTables.failURLsRegisterMissingWord(this.query.getSegment().termIndex(), page.url(), this.query.queryHashes, reason);
}
Log.logInfo("SEARCH", "sorted out url " + page.url().toNormalform(true, false) + " during search: " + reason);
return null;
}

Loading…
Cancel
Save