From 1168d09de8d7fe63c6fea9cae3d9737614a15d0c Mon Sep 17 00:00:00 2001 From: Michael Peter Christen Date: Thu, 1 Nov 2012 17:40:06 +0100 Subject: [PATCH] more refactoring - integrated the code of SnippetProcess into SearchEvent --- htroot/gsa/searchresult.java | 4 +- htroot/solr/select.java | 4 +- htroot/yacy/search.java | 6 +- htroot/yacysearch.java | 9 +- source/net/yacy/search/query/SearchEvent.java | 336 ++++++++++++++-- .../yacy/search/query/SearchEventCache.java | 2 +- .../net/yacy/search/query/SnippetProcess.java | 376 ------------------ .../net/yacy/search/query/SnippetWorker.java | 22 +- 8 files changed, 327 insertions(+), 432 deletions(-) delete mode 100644 source/net/yacy/search/query/SnippetProcess.java diff --git a/htroot/gsa/searchresult.java b/htroot/gsa/searchresult.java index 25b359b5d..57e1be997 100644 --- a/htroot/gsa/searchresult.java +++ b/htroot/gsa/searchresult.java @@ -37,7 +37,7 @@ import net.yacy.kelondro.logging.Log; import net.yacy.search.Switchboard; import net.yacy.search.query.AccessTracker; import net.yacy.search.query.QueryParams; -import net.yacy.search.query.SnippetProcess; +import net.yacy.search.query.SearchEvent; import net.yacy.server.serverObjects; import net.yacy.server.serverSwitch; @@ -115,7 +115,7 @@ public class searchresult { post.put("hl.alternateField", YaCySchema.description.getSolrFieldName()); post.put("hl.simple.pre", ""); post.put("hl.simple.post", ""); - post.put("hl.fragsize", Integer.toString(SnippetProcess.SNIPPET_MAX_LENGTH)); + post.put("hl.fragsize", Integer.toString(SearchEvent.SNIPPET_MAX_LENGTH)); GSAResponseWriter.Sort sort = new GSAResponseWriter.Sort(post.get(CommonParams.SORT, "")); String sorts = sort.toSolr(); if (sorts == null) { diff --git a/htroot/solr/select.java b/htroot/solr/select.java index 42c732680..533ec4e46 100644 --- a/htroot/solr/select.java +++ b/htroot/solr/select.java @@ -40,7 +40,7 @@ import net.yacy.kelondro.logging.Log; import net.yacy.search.Switchboard; import net.yacy.search.SwitchboardConstants; import net.yacy.search.query.AccessTracker; -import net.yacy.search.query.SnippetProcess; +import net.yacy.search.query.SearchEvent; import net.yacy.server.serverObjects; import net.yacy.server.serverSwitch; @@ -166,7 +166,7 @@ public class select { post.put("hl.fl", "text_t,h1,h2"); post.put("hl.simple.pre", ""); post.put("hl.simple.post", ""); - post.put("hl.fragsize", Integer.toString(SnippetProcess.SNIPPET_MAX_LENGTH)); + post.put("hl.fragsize", Integer.toString(SearchEvent.SNIPPET_MAX_LENGTH)); } // get the embedded connector diff --git a/htroot/yacy/search.java b/htroot/yacy/search.java index d559e7d8b..4fff73976 100644 --- a/htroot/yacy/search.java +++ b/htroot/yacy/search.java @@ -327,7 +327,7 @@ public final class search { joincount = theSearch.rankingProcess.rwiAvailableCount() - theSearch.rankingProcess.getMissCount() - theSearch.getSortOutCount(); prop.put("joincount", Integer.toString(joincount)); if (joincount != 0) { - accu = theSearch.result().completeResults(maxtime); + accu = theSearch.completeResults(maxtime); } if (joincount <= 0 || abstracts.isEmpty()) { prop.put("indexcount", ""); @@ -421,8 +421,8 @@ public final class search { theQuery.remotepeer = client == null ? null : sb.peers.lookupByIP(Domains.dnsResolve(client), -1, true, false, false); theQuery.resultcount = (theSearch == null) ? 0 : joincount; theQuery.searchtime = System.currentTimeMillis() - timestamp; - theQuery.urlretrievaltime = (theSearch == null) ? 0 : theSearch.result().getURLRetrievalTime(); - theQuery.snippetcomputationtime = (theSearch == null) ? 0 : theSearch.result().getSnippetComputationTime(); + theQuery.urlretrievaltime = (theSearch == null) ? 0 : theSearch.getURLRetrievalTime(); + theQuery.snippetcomputationtime = (theSearch == null) ? 0 : theSearch.getSnippetComputationTime(); AccessTracker.add(AccessTracker.Location.remote, theQuery); // update the search tracker diff --git a/htroot/yacysearch.java b/htroot/yacysearch.java index e6aff9cb0..f574bd212 100644 --- a/htroot/yacysearch.java +++ b/htroot/yacysearch.java @@ -81,7 +81,6 @@ import net.yacy.search.query.QueryParams; import net.yacy.search.query.SearchEvent; import net.yacy.search.query.SearchEventCache; import net.yacy.search.query.SearchEventType; -import net.yacy.search.query.SnippetProcess; import net.yacy.search.ranking.RankingProfile; import net.yacy.search.snippet.TextSnippet; import net.yacy.server.serverCore; @@ -709,7 +708,7 @@ public class yacysearch { try { Pattern.compile(urlmask); } catch ( final PatternSyntaxException ex ) { - SnippetProcess.log.logWarning("Illegal URL mask, not a valid regex: " + urlmask); + SearchEvent.log.logWarning("Illegal URL mask, not a valid regex: " + urlmask); prop.put("urlmaskerror", 1); prop.putHTML("urlmaskerror_urlmask", urlmask); urlmask = ".*"; @@ -718,7 +717,7 @@ public class yacysearch { try { Pattern.compile(prefermask); } catch ( final PatternSyntaxException ex ) { - SnippetProcess.log.logWarning("Illegal prefer mask, not a valid regex: " + prefermask); + SearchEvent.log.logWarning("Illegal prefer mask, not a valid regex: " + prefermask); prop.put("prefermaskerror", 1); prop.putHTML("prefermaskerror_prefermask", prefermask); prefermask = ""; @@ -854,8 +853,8 @@ public class yacysearch { - theSearch.rankingProcess.getMissCount() + theSearch.rankingProcess.getRemoteIndexCount(); theQuery.searchtime = System.currentTimeMillis() - timestamp; - theQuery.urlretrievaltime = theSearch.result().getURLRetrievalTime(); - theQuery.snippetcomputationtime = theSearch.result().getSnippetComputationTime(); + theQuery.urlretrievaltime = theSearch.getURLRetrievalTime(); + theQuery.snippetcomputationtime = theSearch.getSnippetComputationTime(); AccessTracker.add(AccessTracker.Location.local, theQuery); // check suggestions diff --git a/source/net/yacy/search/query/SearchEvent.java b/source/net/yacy/search/query/SearchEvent.java index 9ab27efb9..0b25d4566 100644 --- a/source/net/yacy/search/query/SearchEvent.java +++ b/source/net/yacy/search/query/SearchEvent.java @@ -47,13 +47,17 @@ import net.yacy.cora.lod.JenaTripleStore; import net.yacy.cora.lod.vocabulary.Tagging; import net.yacy.cora.lod.vocabulary.YaCyMetadata; import net.yacy.cora.order.Base64Order; +import net.yacy.cora.protocol.ResponseHeader; import net.yacy.cora.protocol.Scanner; import net.yacy.cora.sorting.ClusteredScoreMap; import net.yacy.cora.sorting.ConcurrentScoreMap; import net.yacy.cora.sorting.ScoreMap; import net.yacy.cora.sorting.WeakPriorityBlockingQueue; +import net.yacy.cora.sorting.WeakPriorityBlockingQueue.Element; import net.yacy.cora.sorting.WeakPriorityBlockingQueue.ReverseElement; +import net.yacy.cora.storage.HandleSet; import net.yacy.cora.util.SpaceExceededException; +import net.yacy.crawler.data.Cache; import net.yacy.data.WorkTables; import net.yacy.document.Condenser; import net.yacy.document.LargeNumberCache; @@ -64,6 +68,7 @@ import net.yacy.kelondro.data.word.Word; import net.yacy.kelondro.data.word.WordReference; import net.yacy.kelondro.data.word.WordReferenceFactory; import net.yacy.kelondro.data.word.WordReferenceVars; +import net.yacy.kelondro.index.RowHandleSet; import net.yacy.kelondro.logging.Log; import net.yacy.kelondro.rwi.ReferenceContainer; import net.yacy.kelondro.util.Bitfield; @@ -76,18 +81,22 @@ import net.yacy.repository.LoaderDispatcher; import net.yacy.repository.Blacklist.BlacklistType; import net.yacy.search.EventTracker; import net.yacy.search.Switchboard; +import net.yacy.search.snippet.MediaSnippet; import net.yacy.search.snippet.ResultEntry; public final class SearchEvent { + + public static Log log = new Log("SEARCH"); private static final long maxWaitPerResult = 30; - + public static final int SNIPPET_MAX_LENGTH = 220; + private final static int SNIPPET_WORKER_THREADS = Math.max(4, Runtime.getRuntime().availableProcessors() * 2); + private long eventTime; protected QueryParams query; public final SeedDB peers; - private final WorkTables workTables; + final WorkTables workTables; public final RankingProcess rankingProcess; // ordered search results, grows dynamically as all the query threads enrich this container - private final SnippetProcess resultFetcher; public final SecondarySearchSuperviser secondarySearchSuperviser; public final List primarySearchThreadsL; protected Thread[] secondarySearchThreads; @@ -104,9 +113,19 @@ public final class SearchEvent { private final ScoreMap namespaceNavigator; // a counter for name spaces private final ScoreMap protocolNavigator; // a counter for protocol types private final ScoreMap filetypeNavigator; // a counter for file types - protected final WeakPriorityBlockingQueue nodeStack; - + private final WeakPriorityBlockingQueue images; // container to sort images by size + protected final WeakPriorityBlockingQueue result; + protected final LoaderDispatcher loader; + protected final HandleSet snippetFetchWordHashes; // a set of word hashes that are used to match with the snippets + protected final boolean deleteIfSnippetFail; + protected SnippetWorker[] workerThreads; + protected long urlRetrievalAllTime; + protected long snippetComputationAllTime; + private final boolean remote; + private boolean cleanupState; + private int resultCounter = 0; + protected SearchEvent( final QueryParams query, final SeedDB peers, @@ -126,6 +145,7 @@ public final class SearchEvent { this.peers = peers; this.workTables = workTables; this.query = query; + this.loader = loader; this.nodeStack = new WeakPriorityBlockingQueue(300, false); this.maxExpectedRemoteReferences = new AtomicInteger(0); @@ -277,16 +297,33 @@ public final class SearchEvent { } // start worker threads to fetch urls and snippets - this.resultFetcher = - new SnippetProcess( - this, - loader, - this.query, - this.peers, - this.workTables, - deleteIfSnippetFail, - remote); + this.deleteIfSnippetFail = deleteIfSnippetFail; + this.remote = remote; + this.cleanupState = false; + this.urlRetrievalAllTime = 0; + this.snippetComputationAllTime = 0; + this.result = new WeakPriorityBlockingQueue(Math.max(1000, 10 * query.itemsPerPage()), true); // this is the result, enriched with snippets, ranked and ordered by ranking + this.images = new WeakPriorityBlockingQueue(Math.max(1000, 10 * query.itemsPerPage()), true); + + // snippets do not need to match with the complete query hashes, + // only with the query minus the stopwords which had not been used for the search + HandleSet filtered; + try { + filtered = RowHandleSet.joinConstructive(query.query_include_hashes, Switchboard.stopwordHashes); + } catch (final SpaceExceededException e) { + Log.logException(e); + filtered = new RowHandleSet(query.query_include_hashes.keylen(), query.query_include_hashes.comparator(), 0); + } + this.snippetFetchWordHashes = query.query_include_hashes.clone(); + if (filtered != null && !filtered.isEmpty()) { + this.snippetFetchWordHashes.excludeDestructive(Switchboard.stopwordHashes); + } + // start worker threads to fetch urls and snippets + this.workerThreads = null; + deployWorker(Math.min(SNIPPET_WORKER_THREADS, query.itemsPerPage), query.neededResults()); + EventTracker.update(EventTracker.EClass.SEARCH, new ProfilingGraph.EventSearch(query.id(true), SearchEventType.SNIPPETFETCH_START, ((this.workerThreads == null) ? "no" : this.workerThreads.length) + " online snippet fetch threads started", 0, 0), false); + // clean up events SearchEventCache.cleanupEvents(false); EventTracker.update(EventTracker.EClass.SEARCH, new ProfilingGraph.EventSearch( @@ -317,11 +354,10 @@ public final class SearchEvent { public void setQuery(final QueryParams query) { this.query = query; - this.resultFetcher.query = query; } protected void cleanup() { - this.resultFetcher.setCleanupState(); + this.cleanupState = true; // stop all threads if ( this.primarySearchThreadsL != null ) { @@ -348,7 +384,7 @@ public final class SearchEvent { } // call the worker threads and ask them to stop - for ( final SnippetWorker w : this.resultFetcher.workerThreads ) { + for ( final SnippetWorker w : this.workerThreads ) { if ( w != null && w.isAlive() ) { w.pleaseStop(); w.interrupt(); @@ -472,20 +508,12 @@ public final class SearchEvent { } } - public ResultEntry oneResult(final int item, final long timeout) { - if (this.localsearch != null && this.localsearch.isAlive()) try {this.localsearch.join();} catch (InterruptedException e) {} - return this.resultFetcher.oneResult(item, timeout); - } - - public SnippetProcess result() { - return this.resultFetcher; - } protected boolean workerAlive() { - if ( this.resultFetcher == null || this.resultFetcher.workerThreads == null ) { + if ( this.workerThreads == null ) { return false; } - for ( final SnippetWorker w : this.resultFetcher.workerThreads ) { + for ( final SnippetWorker w : this.workerThreads ) { if ( w != null && w.isAlive() ) { return true; } @@ -640,7 +668,7 @@ public final class SearchEvent { System.currentTimeMillis() - timer), false); } - protected long waitTimeRecommendation() { + private long waitTimeRecommendation() { return this.maxExpectedRemoteReferences.get() == 0 ? 0 : Math.min(maxWaitPerResult, Math.min( @@ -655,10 +683,6 @@ public final class SearchEvent { this.expectedRemoteReferences.addAndGet(x); } - protected boolean expectMoreRemoteReferences() { - return this.expectedRemoteReferences.get() > 0; - } - public int getSortOutCount() { return this.sortout; } @@ -957,5 +981,253 @@ public final class SearchEvent { Log.logWarning("RWIProcess", "loop terminated"); return null; } + + public long getURLRetrievalTime() { + return this.urlRetrievalAllTime; + } + + public long getSnippetComputationTime() { + return this.snippetComputationAllTime; + } + + public ResultEntry oneResult(final int item, final long timeout) { + if (this.localsearch != null && this.localsearch.isAlive()) try {this.localsearch.join();} catch (InterruptedException e) {} + // check if we already retrieved this item + // (happens if a search pages is accessed a second time) + final long finishTime = System.currentTimeMillis() + timeout; + EventTracker.update(EventTracker.EClass.SEARCH, new ProfilingGraph.EventSearch(this.query.id(true), SearchEventType.ONERESULT, "started, item = " + item + ", available = " + this.result.sizeAvailable(), 0, 0), false); + //Log.logInfo("SnippetProcess", "*start method for item = " + item + "; anyWorkerAlive=" + anyWorkerAlive() + "; this.rankingProcess.isAlive() = " + this.rankingProcess.isAlive() + "; this.rankingProcess.feedingIsFinished() = " + this.rankingProcess.feedingIsFinished() + "; this.result.sizeAvailable() = " + this.result.sizeAvailable() + ", this.rankingProcess.sizeQueue() = " + this.rankingProcess.sizeQueue()); + + // we must wait some time until the first result page is full to get enough elements for ranking + final long waittimeout = System.currentTimeMillis() + 300; + if (this.remote && item < 10 && !this.rankingProcess.feedingIsFinished()) { + // the first 10 results have a very special timing to get most of the remote results ordered + // before they are presented on the first lines .. yes sleeps seem to be bad. but how shall we predict how long other + // peers will take until they respond? + long sleep = item == 0 ? 400 : (10 - item) * 9; // the first result takes the longest time + //Log.logInfo("SnippetProcess", "SLEEP = " + sleep); + try { Thread.sleep(sleep); } catch (final InterruptedException e1) { Log.logException(e1); } + } + int thisRankingQueueSize, lastRankingQueueSize = 0; + if (item < 10) { + while ( + ((thisRankingQueueSize = this.rankingProcess.rwiQueueSize()) > 0 || !this.rankingProcess.feedingIsFinished()) && + (thisRankingQueueSize > lastRankingQueueSize || this.result.sizeAvailable() < item + 1) && + System.currentTimeMillis() < waittimeout && + anyWorkerAlive() + ) { + // wait a little time to get first results in the search + lastRankingQueueSize = thisRankingQueueSize; + try { Thread.sleep(20); } catch (final InterruptedException e1) {} + } + } + + if (this.result.sizeAvailable() > item) { + // we have the wanted result already in the result array .. return that + final ResultEntry re = this.result.element(item).getElement(); + EventTracker.update(EventTracker.EClass.SEARCH, new ProfilingGraph.EventSearch(this.query.id(true), SearchEventType.ONERESULT, "prefetched, item = " + item + ", available = " + this.result.sizeAvailable() + ": " + re.urlstring(), 0, 0), false); + return re; + } + + // finally wait until enough results are there produced from the snippet fetch process + WeakPriorityBlockingQueue.Element entry = null; + while (System.currentTimeMillis() < finishTime) { + + Log.logInfo("SnippetProcess", "item = " + item + "; anyWorkerAlive=" + anyWorkerAlive() + "; this.rankingProcess.isAlive() = " + this.rankingProcess.isAlive() + "; this.rankingProcess.feedingIsFinished() = " + this.rankingProcess.feedingIsFinished() + "; this.result.sizeAvailable() = " + this.result.sizeAvailable() + ", this.rankingProcess.sizeQueue() = " + this.rankingProcess.rwiQueueSize() + ", this.rankingProcess.nodeStack.sizeAvailable() = " + this.nodeStack.sizeAvailable()); + + if (!anyWorkerAlive() && !this.rankingProcess.isAlive() && this.result.sizeAvailable() + this.rankingProcess.rwiQueueSize() + this.nodeStack.sizeAvailable() <= item && this.rankingProcess.feedingIsFinished()) { + //Log.logInfo("SnippetProcess", "interrupted result fetching; item = " + item + "; this.result.sizeAvailable() = " + this.result.sizeAvailable() + ", this.rankingProcess.sizeQueue() = " + this.rankingProcess.sizeQueue() + "; this.rankingProcess.feedingIsFinished() = " + this.rankingProcess.feedingIsFinished()); + break; // the fail case + } + + // deploy worker to get more results + if (!anyWorkerAlive()) { + final int neededInclPrefetch = this.query.neededResults() + ((MemoryControl.available() > 100 * 1024 * 1024 && SNIPPET_WORKER_THREADS >= 8) ? this.query.itemsPerPage : 0); + deployWorker(Math.min(SNIPPET_WORKER_THREADS, this.query.itemsPerPage), neededInclPrefetch); + } + + try {entry = this.result.element(item, 50);} catch (final InterruptedException e) {break;} + if (entry != null) { break; } + } + + // finally, if there is something, return the result + if (entry == null) { + EventTracker.update(EventTracker.EClass.SEARCH, new ProfilingGraph.EventSearch(this.query.id(true), SearchEventType.ONERESULT, "not found, item = " + item + ", available = " + this.result.sizeAvailable(), 0, 0), false); + //Log.logInfo("SnippetProcess", "NO ENTRY computed (possible timeout); anyWorkerAlive=" + anyWorkerAlive() + "; this.rankingProcess.isAlive() = " + this.rankingProcess.isAlive() + "; this.rankingProcess.feedingIsFinished() = " + this.rankingProcess.feedingIsFinished() + "; this.result.sizeAvailable() = " + this.result.sizeAvailable() + ", this.rankingProcess.sizeQueue() = " + this.rankingProcess.sizeQueue()); + return null; + } + final ResultEntry re = entry.getElement(); + EventTracker.update(EventTracker.EClass.SEARCH, new ProfilingGraph.EventSearch(this.query.id(true), SearchEventType.ONERESULT, "retrieved, item = " + item + ", available = " + this.result.sizeAvailable() + ": " + re.urlstring(), 0, 0), false); + if (item == this.query.offset + this.query.itemsPerPage - 1) { + stopAllWorker(); // we don't need more + } + return re; + } + + public MediaSnippet oneImage(final int item) { + // always look for a next object if there are way too less + if (this.images.sizeAvailable() <= item + 10) { + fillImagesCache(); + } + + // check if we already retrieved the item + if (this.images.sizeDrained() > item) { + return this.images.element(item).getElement(); + } + + // look again if there are not enough for presentation + while (this.images.sizeAvailable() <= item) { + if (fillImagesCache() == 0) { + break; + } + } + if (this.images.sizeAvailable() <= item) { + return null; + } + + // now take the specific item from the image stack + return this.images.element(item).getElement(); + } + + private int fillImagesCache() { + final ResultEntry re = oneResult(this.resultCounter, Math.max(3000, this.query.timeout - System.currentTimeMillis())); + this.resultCounter++; + final ResultEntry result = re; + int c = 0; + if (result == null) { + return c; + } + // iterate over all images in the result + final List imagemedia = result.mediaSnippets(); + if (imagemedia != null) { + ResponseHeader header; + feedloop: for (final MediaSnippet ms: imagemedia) { + // check cache to see if the mime type of the image url is correct + header = Cache.getResponseHeader(ms.href.hash()); + if (header != null) { + // this does not work for all urls since some of them may not be in the cache + if (header.mime().startsWith("text") || header.mime().startsWith("application")) { + continue feedloop; + } + } + this.images.put(new ReverseElement(ms, ms.ranking)); // remove smallest in case of overflow + c++; + //System.out.println("*** image " + UTF8.String(ms.href.hash()) + " images.size = " + images.size() + "/" + images.size()); + } + } + return c; + } + + public ArrayList> completeResults(final long waitingtime) { + final long timeout = System.currentTimeMillis() + waitingtime; + while ( this.result.sizeAvailable() < this.query.neededResults() && + anyWorkerAlive() && + System.currentTimeMillis() < timeout) { + try {Thread.sleep(10);} catch (final InterruptedException e) {} + //System.out.println("+++DEBUG-completeResults+++ sleeping " + 200); + } + return this.result.list(Math.min(this.query.neededResults(), this.result.sizeAvailable())); + } + + + private void deployWorker(int deployCount, final int neededResults) { + if (this.cleanupState || + (this.rankingProcess.feedingIsFinished() && this.rankingProcess.rwiQueueSize() == 0 && this.nodeStack.sizeAvailable() == 0) || + this.result.sizeAvailable() >= neededResults) { + return; + } + SnippetWorker worker; + if (this.workerThreads == null) { + this.workerThreads = new SnippetWorker[deployCount]; + synchronized(this.workerThreads) {try { + for (int i = 0; i < this.workerThreads.length; i++) { + if (this.result.sizeAvailable() >= neededResults || + (this.rankingProcess.feedingIsFinished() && this.rankingProcess.rwiQueueSize() == 0) && this.nodeStack.sizeAvailable() == 0) { + break; + } + worker = new SnippetWorker(this, this.query.maxtime, this.query.snippetCacheStrategy, neededResults); + worker.start(); + this.workerThreads[i] = worker; + if (this.expectedRemoteReferences.get() > 0) { + long wait = this.waitTimeRecommendation(); + if (wait > 0) { + try {Thread.sleep(wait);} catch ( InterruptedException e ) {} + } + } + } + } catch (OutOfMemoryError e) {}} + } else { + // there are still worker threads running, but some may be dead. + // if we find dead workers, reanimate them + synchronized(this.workerThreads) { + for (int i = 0; i < this.workerThreads.length; i++) { + if (deployCount <= 0 || + this.result.sizeAvailable() >= neededResults || + (this.rankingProcess.feedingIsFinished() && this.rankingProcess.rwiQueueSize() == 0) && this.nodeStack.sizeAvailable() == 0) { + break; + } + if (this.workerThreads[i] == null || !this.workerThreads[i].isAlive()) { + worker = new SnippetWorker(this, this.query.maxtime, this.query.snippetCacheStrategy, neededResults); + worker.start(); + this.workerThreads[i] = worker; + deployCount--; + } + if (this.expectedRemoteReferences.get() > 0) { + long wait = this.waitTimeRecommendation(); + if (wait > 0) { + try {Thread.sleep(wait);} catch ( InterruptedException e ) {} + } + } + } + } + } + } + + private void stopAllWorker() { + synchronized(this.workerThreads) { + for (int i = 0; i < this.workerThreads.length; i++) { + if (this.workerThreads[i] == null || !this.workerThreads[i].isAlive()) { + continue; + } + this.workerThreads[i].pleaseStop(); + this.workerThreads[i].interrupt(); + } + } + } + + private boolean anyWorkerAlive() { + if (this.workerThreads == null || this.workerThreads.length == 0) { + return false; + } + synchronized(this.workerThreads) { + for (final SnippetWorker workerThread : this.workerThreads) { + if ((workerThread != null) && + (workerThread.isAlive()) && + (workerThread.busytime() < 10000)) { + return true; + } + } + } + return false; + } + + /** + * delete a specific entry from the search results + * this is used if the user clicks on a '-' sign beside the search result + * @param urlhash + * @return true if an entry was deleted, false otherwise + */ + protected boolean delete(final String urlhash) { + final Iterator> i = this.result.iterator(); + Element entry; + while (i.hasNext()) { + entry = i.next(); + if (urlhash.equals(ASCII.String(entry.getElement().url().hash()))) { + i.remove(); + return true; + } + } + return false; + } } diff --git a/source/net/yacy/search/query/SearchEventCache.java b/source/net/yacy/search/query/SearchEventCache.java index 947e07b32..56805e4eb 100644 --- a/source/net/yacy/search/query/SearchEventCache.java +++ b/source/net/yacy/search/query/SearchEventCache.java @@ -63,7 +63,7 @@ public class SearchEventCache { public static boolean delete(final String urlhash) { for (final SearchEvent event: lastEvents.values()) { - if (event.result().delete(urlhash)) return true; + if (event.delete(urlhash)) return true; } return false; } diff --git a/source/net/yacy/search/query/SnippetProcess.java b/source/net/yacy/search/query/SnippetProcess.java deleted file mode 100644 index f33233077..000000000 --- a/source/net/yacy/search/query/SnippetProcess.java +++ /dev/null @@ -1,376 +0,0 @@ -// SearchEvent.java -// (C) 2005 by Michael Peter Christen; mc@yacy.net, Frankfurt a. M., Germany -// first published 10.10.2005 on http://yacy.net -// -// This is a part of YaCy, a peer-to-peer based web search engine -// -// $LastChangedDate$ -// $LastChangedRevision$ -// $LastChangedBy$ -// -// LICENSE -// -// This program is free software; you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation; either version 2 of the License, or -// (at your option) any later version. -// -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. -// -// You should have received a copy of the GNU General Public License -// along with this program; if not, write to the Free Software -// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA - -package net.yacy.search.query; - -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; - -import net.yacy.cora.document.ASCII; -import net.yacy.cora.protocol.ResponseHeader; -import net.yacy.cora.sorting.WeakPriorityBlockingQueue; -import net.yacy.cora.sorting.WeakPriorityBlockingQueue.Element; -import net.yacy.cora.sorting.WeakPriorityBlockingQueue.ReverseElement; -import net.yacy.cora.storage.HandleSet; -import net.yacy.cora.util.SpaceExceededException; -import net.yacy.crawler.data.Cache; -import net.yacy.data.WorkTables; -import net.yacy.kelondro.index.RowHandleSet; -import net.yacy.kelondro.logging.Log; -import net.yacy.kelondro.util.MemoryControl; -import net.yacy.peers.SeedDB; -import net.yacy.peers.graphics.ProfilingGraph; -import net.yacy.repository.LoaderDispatcher; -import net.yacy.search.EventTracker; -import net.yacy.search.Switchboard; -import net.yacy.search.snippet.MediaSnippet; -import net.yacy.search.snippet.ResultEntry; - -public class SnippetProcess { - - public static Log log = new Log("SEARCH"); - - public static final int SNIPPET_MAX_LENGTH = 220; - private final static int SNIPPET_WORKER_THREADS = Math.max(4, Runtime.getRuntime().availableProcessors() * 2); - - // input values - final SearchEvent searchEvent; - QueryParams query; - final SeedDB peers; - final WorkTables workTables; - - // result values - private final WeakPriorityBlockingQueue images; // container to sort images by size - protected final WeakPriorityBlockingQueue result; - protected final LoaderDispatcher loader; - protected final HandleSet snippetFetchWordHashes; // a set of word hashes that are used to match with the snippets - protected final boolean deleteIfSnippetFail; - protected SnippetWorker[] workerThreads; - protected long urlRetrievalAllTime; - protected long snippetComputationAllTime; - - private final boolean remote; - private boolean cleanupState; - - protected SnippetProcess( - final SearchEvent searchEvent, - final LoaderDispatcher loader, - final QueryParams query, - final SeedDB peers, - final WorkTables workTables, - final boolean deleteIfSnippetFail, - final boolean remote) { - assert query != null; - this.searchEvent = searchEvent; - this.loader = loader; - this.query = query; - this.peers = peers; - this.workTables = workTables; - this.deleteIfSnippetFail = deleteIfSnippetFail; - this.remote = remote; - this.cleanupState = false; - this.urlRetrievalAllTime = 0; - this.snippetComputationAllTime = 0; - this.result = new WeakPriorityBlockingQueue(Math.max(1000, 10 * query.itemsPerPage()), true); // this is the result, enriched with snippets, ranked and ordered by ranking - this.images = new WeakPriorityBlockingQueue(Math.max(1000, 10 * query.itemsPerPage()), true); - - // snippets do not need to match with the complete query hashes, - // only with the query minus the stopwords which had not been used for the search - HandleSet filtered; - try { - filtered = RowHandleSet.joinConstructive(query.query_include_hashes, Switchboard.stopwordHashes); - } catch (final SpaceExceededException e) { - Log.logException(e); - filtered = new RowHandleSet(query.query_include_hashes.keylen(), query.query_include_hashes.comparator(), 0); - } - this.snippetFetchWordHashes = query.query_include_hashes.clone(); - if (filtered != null && !filtered.isEmpty()) { - this.snippetFetchWordHashes.excludeDestructive(Switchboard.stopwordHashes); - } - - // start worker threads to fetch urls and snippets - this.workerThreads = null; - deployWorker(Math.min(SNIPPET_WORKER_THREADS, query.itemsPerPage), query.neededResults()); - EventTracker.update(EventTracker.EClass.SEARCH, new ProfilingGraph.EventSearch(query.id(true), SearchEventType.SNIPPETFETCH_START, ((this.workerThreads == null) ? "no" : this.workerThreads.length) + " online snippet fetch threads started", 0, 0), false); - } - - protected void setCleanupState() { - this.cleanupState = true; - } - - public long getURLRetrievalTime() { - return this.urlRetrievalAllTime; - } - - public long getSnippetComputationTime() { - return this.snippetComputationAllTime; - } - - protected ResultEntry oneResult(final int item, final long timeout) { - // check if we already retrieved this item - // (happens if a search pages is accessed a second time) - final long finishTime = System.currentTimeMillis() + timeout; - EventTracker.update(EventTracker.EClass.SEARCH, new ProfilingGraph.EventSearch(this.query.id(true), SearchEventType.ONERESULT, "started, item = " + item + ", available = " + this.result.sizeAvailable(), 0, 0), false); - //Log.logInfo("SnippetProcess", "*start method for item = " + item + "; anyWorkerAlive=" + anyWorkerAlive() + "; this.rankingProcess.isAlive() = " + this.rankingProcess.isAlive() + "; this.rankingProcess.feedingIsFinished() = " + this.rankingProcess.feedingIsFinished() + "; this.result.sizeAvailable() = " + this.result.sizeAvailable() + ", this.rankingProcess.sizeQueue() = " + this.rankingProcess.sizeQueue()); - - // we must wait some time until the first result page is full to get enough elements for ranking - final long waittimeout = System.currentTimeMillis() + 300; - if (this.remote && item < 10 && !this.searchEvent.rankingProcess.feedingIsFinished()) { - // the first 10 results have a very special timing to get most of the remote results ordered - // before they are presented on the first lines .. yes sleeps seem to be bad. but how shall we predict how long other - // peers will take until they respond? - long sleep = item == 0 ? 400 : (10 - item) * 9; // the first result takes the longest time - //Log.logInfo("SnippetProcess", "SLEEP = " + sleep); - try { Thread.sleep(sleep); } catch (final InterruptedException e1) { Log.logException(e1); } - } - int thisRankingQueueSize, lastRankingQueueSize = 0; - if (item < 10) { - while ( - ((thisRankingQueueSize = this.searchEvent.rankingProcess.rwiQueueSize()) > 0 || !this.searchEvent.rankingProcess.feedingIsFinished()) && - (thisRankingQueueSize > lastRankingQueueSize || this.result.sizeAvailable() < item + 1) && - System.currentTimeMillis() < waittimeout && - anyWorkerAlive() - ) { - // wait a little time to get first results in the search - lastRankingQueueSize = thisRankingQueueSize; - try { Thread.sleep(20); } catch (final InterruptedException e1) {} - } - } - - if (this.result.sizeAvailable() > item) { - // we have the wanted result already in the result array .. return that - final ResultEntry re = this.result.element(item).getElement(); - EventTracker.update(EventTracker.EClass.SEARCH, new ProfilingGraph.EventSearch(this.query.id(true), SearchEventType.ONERESULT, "prefetched, item = " + item + ", available = " + this.result.sizeAvailable() + ": " + re.urlstring(), 0, 0), false); - return re; - } - - // finally wait until enough results are there produced from the snippet fetch process - WeakPriorityBlockingQueue.Element entry = null; - while (System.currentTimeMillis() < finishTime) { - - Log.logInfo("SnippetProcess", "item = " + item + "; anyWorkerAlive=" + anyWorkerAlive() + "; this.rankingProcess.isAlive() = " + this.searchEvent.rankingProcess.isAlive() + "; this.rankingProcess.feedingIsFinished() = " + this.searchEvent.rankingProcess.feedingIsFinished() + "; this.result.sizeAvailable() = " + this.result.sizeAvailable() + ", this.rankingProcess.sizeQueue() = " + this.searchEvent.rankingProcess.rwiQueueSize() + ", this.rankingProcess.nodeStack.sizeAvailable() = " + this.searchEvent.nodeStack.sizeAvailable()); - - if (!anyWorkerAlive() && !this.searchEvent.rankingProcess.isAlive() && this.result.sizeAvailable() + this.searchEvent.rankingProcess.rwiQueueSize() + this.searchEvent.nodeStack.sizeAvailable() <= item && this.searchEvent.rankingProcess.feedingIsFinished()) { - //Log.logInfo("SnippetProcess", "interrupted result fetching; item = " + item + "; this.result.sizeAvailable() = " + this.result.sizeAvailable() + ", this.rankingProcess.sizeQueue() = " + this.rankingProcess.sizeQueue() + "; this.rankingProcess.feedingIsFinished() = " + this.rankingProcess.feedingIsFinished()); - break; // the fail case - } - - // deploy worker to get more results - if (!anyWorkerAlive()) { - final int neededInclPrefetch = this.query.neededResults() + ((MemoryControl.available() > 100 * 1024 * 1024 && SNIPPET_WORKER_THREADS >= 8) ? this.query.itemsPerPage : 0); - deployWorker(Math.min(SNIPPET_WORKER_THREADS, this.query.itemsPerPage), neededInclPrefetch); - } - - try {entry = this.result.element(item, 50);} catch (final InterruptedException e) {break;} - if (entry != null) { break; } - } - - // finally, if there is something, return the result - if (entry == null) { - EventTracker.update(EventTracker.EClass.SEARCH, new ProfilingGraph.EventSearch(this.query.id(true), SearchEventType.ONERESULT, "not found, item = " + item + ", available = " + this.result.sizeAvailable(), 0, 0), false); - //Log.logInfo("SnippetProcess", "NO ENTRY computed (possible timeout); anyWorkerAlive=" + anyWorkerAlive() + "; this.rankingProcess.isAlive() = " + this.rankingProcess.isAlive() + "; this.rankingProcess.feedingIsFinished() = " + this.rankingProcess.feedingIsFinished() + "; this.result.sizeAvailable() = " + this.result.sizeAvailable() + ", this.rankingProcess.sizeQueue() = " + this.rankingProcess.sizeQueue()); - return null; - } - final ResultEntry re = entry.getElement(); - EventTracker.update(EventTracker.EClass.SEARCH, new ProfilingGraph.EventSearch(this.query.id(true), SearchEventType.ONERESULT, "retrieved, item = " + item + ", available = " + this.result.sizeAvailable() + ": " + re.urlstring(), 0, 0), false); - if (item == this.query.offset + this.query.itemsPerPage - 1) { - stopAllWorker(); // we don't need more - } - return re; - } - - private int resultCounter = 0; - private ResultEntry nextResult() { - final ResultEntry re = oneResult(this.resultCounter, Math.max(3000, this.query.timeout - System.currentTimeMillis())); - this.resultCounter++; - return re; - } - - public MediaSnippet oneImage(final int item) { - // always look for a next object if there are way too less - if (this.images.sizeAvailable() <= item + 10) { - fillImagesCache(); - } - - // check if we already retrieved the item - if (this.images.sizeDrained() > item) { - return this.images.element(item).getElement(); - } - - // look again if there are not enough for presentation - while (this.images.sizeAvailable() <= item) { - if (fillImagesCache() == 0) { - break; - } - } - if (this.images.sizeAvailable() <= item) { - return null; - } - - // now take the specific item from the image stack - return this.images.element(item).getElement(); - } - - private int fillImagesCache() { - final ResultEntry result = nextResult(); - int c = 0; - if (result == null) { - return c; - } - // iterate over all images in the result - final List imagemedia = result.mediaSnippets(); - if (imagemedia != null) { - ResponseHeader header; - feedloop: for (final MediaSnippet ms: imagemedia) { - // check cache to see if the mime type of the image url is correct - header = Cache.getResponseHeader(ms.href.hash()); - if (header != null) { - // this does not work for all urls since some of them may not be in the cache - if (header.mime().startsWith("text") || header.mime().startsWith("application")) { - continue feedloop; - } - } - this.images.put(new ReverseElement(ms, ms.ranking)); // remove smallest in case of overflow - c++; - //System.out.println("*** image " + UTF8.String(ms.href.hash()) + " images.size = " + images.size() + "/" + images.size()); - } - } - return c; - } - - public ArrayList> completeResults(final long waitingtime) { - final long timeout = System.currentTimeMillis() + waitingtime; - while ( this.result.sizeAvailable() < this.query.neededResults() && - anyWorkerAlive() && - System.currentTimeMillis() < timeout) { - try {Thread.sleep(10);} catch (final InterruptedException e) {} - //System.out.println("+++DEBUG-completeResults+++ sleeping " + 200); - } - return this.result.list(Math.min(this.query.neededResults(), this.result.sizeAvailable())); - } - - - private void deployWorker(int deployCount, final int neededResults) { - if (this.cleanupState || - (this.searchEvent.rankingProcess.feedingIsFinished() && this.searchEvent.rankingProcess.rwiQueueSize() == 0 && this.searchEvent.nodeStack.sizeAvailable() == 0) || - this.result.sizeAvailable() >= neededResults) { - return; - } - SnippetWorker worker; - if (this.workerThreads == null) { - this.workerThreads = new SnippetWorker[deployCount]; - synchronized(this.workerThreads) {try { - for (int i = 0; i < this.workerThreads.length; i++) { - if (this.result.sizeAvailable() >= neededResults || - (this.searchEvent.rankingProcess.feedingIsFinished() && this.searchEvent.rankingProcess.rwiQueueSize() == 0) && this.searchEvent.nodeStack.sizeAvailable() == 0) { - break; - } - worker = new SnippetWorker(this, this.query.maxtime, this.query.snippetCacheStrategy, neededResults); - worker.start(); - this.workerThreads[i] = worker; - if (this.searchEvent.expectMoreRemoteReferences()) { - long wait = this.searchEvent.waitTimeRecommendation(); - if (wait > 0) { - try {Thread.sleep(wait);} catch ( InterruptedException e ) {} - } - } - } - } catch (OutOfMemoryError e) {}} - } else { - // there are still worker threads running, but some may be dead. - // if we find dead workers, reanimate them - synchronized(this.workerThreads) { - for (int i = 0; i < this.workerThreads.length; i++) { - if (deployCount <= 0 || - this.result.sizeAvailable() >= neededResults || - (this.searchEvent.rankingProcess.feedingIsFinished() && this.searchEvent.rankingProcess.rwiQueueSize() == 0) && this.searchEvent.nodeStack.sizeAvailable() == 0) { - break; - } - if (this.workerThreads[i] == null || !this.workerThreads[i].isAlive()) { - worker = new SnippetWorker(this, this.query.maxtime, this.query.snippetCacheStrategy, neededResults); - worker.start(); - this.workerThreads[i] = worker; - deployCount--; - } - if (this.searchEvent.expectMoreRemoteReferences()) { - long wait = this.searchEvent.waitTimeRecommendation(); - if (wait > 0) { - try {Thread.sleep(wait);} catch ( InterruptedException e ) {} - } - } - } - } - } - } - - private void stopAllWorker() { - synchronized(this.workerThreads) { - for (int i = 0; i < this.workerThreads.length; i++) { - if (this.workerThreads[i] == null || !this.workerThreads[i].isAlive()) { - continue; - } - this.workerThreads[i].pleaseStop(); - this.workerThreads[i].interrupt(); - } - } - } - - private boolean anyWorkerAlive() { - if (this.workerThreads == null || this.workerThreads.length == 0) { - return false; - } - synchronized(this.workerThreads) { - for (final SnippetWorker workerThread : this.workerThreads) { - if ((workerThread != null) && - (workerThread.isAlive()) && - (workerThread.busytime() < 10000)) { - return true; - } - } - } - return false; - } - - /** - * delete a specific entry from the search results - * this is used if the user clicks on a '-' sign beside the search result - * @param urlhash - * @return true if an entry was deleted, false otherwise - */ - protected boolean delete(final String urlhash) { - final Iterator> i = this.result.iterator(); - Element entry; - while (i.hasNext()) { - entry = i.next(); - if (urlhash.equals(ASCII.String(entry.getElement().url().hash()))) { - i.remove(); - return true; - } - } - return false; - } -} diff --git a/source/net/yacy/search/query/SnippetWorker.java b/source/net/yacy/search/query/SnippetWorker.java index abc9bec66..57d46a454 100644 --- a/source/net/yacy/search/query/SnippetWorker.java +++ b/source/net/yacy/search/query/SnippetWorker.java @@ -38,14 +38,14 @@ import net.yacy.search.snippet.ResultEntry; import net.yacy.search.snippet.TextSnippet; public class SnippetWorker extends Thread { - private final SnippetProcess snippetProcess; + private final SearchEvent snippetProcess; private final long timeout; // the date until this thread should try to work private long lastLifeSign; // when the last time the run()-loop was executed private final CacheStrategy cacheStrategy; private final int neededResults; private boolean shallrun; - public SnippetWorker(final SnippetProcess snippetProcess, final long maxlifetime, final CacheStrategy cacheStrategy, final int neededResults) { + public SnippetWorker(final SearchEvent snippetProcess, final long maxlifetime, final CacheStrategy cacheStrategy, final int neededResults) { this.snippetProcess = snippetProcess; this.cacheStrategy = cacheStrategy; this.lastLifeSign = System.currentTimeMillis(); @@ -78,13 +78,13 @@ public class SnippetWorker extends Thread { } // check if we can succeed if we try to take another url - if (this.snippetProcess.searchEvent.rankingProcess.feedingIsFinished() && this.snippetProcess.searchEvent.rankingProcess.rwiQueueSize() == 0 && this.snippetProcess.searchEvent.nodeStack.sizeAvailable() == 0) { + if (this.snippetProcess.rankingProcess.feedingIsFinished() && this.snippetProcess.rankingProcess.rwiQueueSize() == 0 && this.snippetProcess.nodeStack.sizeAvailable() == 0) { Log.logWarning("SnippetProcess", "rankingProcess.feedingIsFinished() && rankingProcess.sizeQueue() == 0"); break; } // get next entry - page = this.snippetProcess.searchEvent.takeURL(true, Math.min(500, Math.max(20, this.timeout - System.currentTimeMillis()))); + page = this.snippetProcess.takeURL(true, Math.min(500, Math.max(20, this.timeout - System.currentTimeMillis()))); //if (page != null) Log.logInfo("SnippetProcess", "got one page: " + page.metadata().url().toNormalform(true, false)); //if (page == null) page = rankedCache.takeURL(false, this.timeout - System.currentTimeMillis()); if (page == null) { @@ -111,12 +111,12 @@ public class SnippetWorker extends Thread { // place the result to the result vector // apply post-ranking - long ranking = resultEntry.word() == null ? 0 : Long.valueOf(this.snippetProcess.searchEvent.rankingProcess.order.cardinal(resultEntry.word())); - ranking += postRanking(resultEntry, this.snippetProcess.searchEvent.rankingProcess.getTopicNavigator(10)); + long ranking = resultEntry.word() == null ? 0 : Long.valueOf(this.snippetProcess.rankingProcess.order.cardinal(resultEntry.word())); + ranking += postRanking(resultEntry, this.snippetProcess.rankingProcess.getTopicNavigator(10)); resultEntry.ranking = ranking; this.snippetProcess.result.put(new ReverseElement(resultEntry, ranking)); // remove smallest in case of overflow if (nav_topics) { - this.snippetProcess.searchEvent.rankingProcess.addTopics(resultEntry); + this.snippetProcess.rankingProcess.addTopics(resultEntry); } } if (System.currentTimeMillis() >= this.timeout) { @@ -135,7 +135,7 @@ public class SnippetWorker extends Thread { * calculate the time since the worker has had the latest activity * @return time in milliseconds lasted since latest activity */ - public long busytime() { + protected long busytime() { return System.currentTimeMillis() - this.lastLifeSign; } @@ -223,7 +223,7 @@ public class SnippetWorker extends Thread { //this.query.queryString, null, ((this.snippetProcess.query.constraint != null) && (this.snippetProcess.query.constraint.get(Condenser.flag_cat_indexof))), - SnippetProcess.SNIPPET_MAX_LENGTH, + SearchEvent.SNIPPET_MAX_LENGTH, !this.snippetProcess.query.isLocal()); return new ResultEntry(page, this.snippetProcess.query.getSegment(), this.snippetProcess.peers, snippet, null, dbRetrievalTime, 0); // result without snippet } @@ -242,7 +242,7 @@ public class SnippetWorker extends Thread { 180, !this.snippetProcess.query.isLocal()); final long snippetComputationTime = System.currentTimeMillis() - startTime; - SnippetProcess.log.logInfo("text snippet load time for " + page.url() + ": " + snippetComputationTime + ", " + (!snippet.getErrorCode().fail() ? "snippet found" : ("no snippet found (" + snippet.getError() + ")"))); + SearchEvent.log.logInfo("text snippet load time for " + page.url() + ": " + snippetComputationTime + ", " + (!snippet.getErrorCode().fail() ? "snippet found" : ("no snippet found (" + snippet.getError() + ")"))); if (!snippet.getErrorCode().fail()) { // we loaded the file and found the snippet @@ -261,7 +261,7 @@ public class SnippetWorker extends Thread { if (this.snippetProcess.deleteIfSnippetFail) { this.snippetProcess.workTables.failURLsRegisterMissingWord(this.snippetProcess.query.getSegment().termIndex(), page.url(), this.snippetProcess.query.query_include_hashes, reason); } - SnippetProcess.log.logInfo("sorted out url " + page.url().toNormalform(true) + " during search: " + reason); + SearchEvent.log.logInfo("sorted out url " + page.url().toNormalform(true) + " during search: " + reason); return null; } }