more refactoring - integrated the code of SnippetProcess into

SearchEvent
pull/1/head
Michael Peter Christen 12 years ago
parent 6629e37685
commit 1168d09de8

@ -37,7 +37,7 @@ import net.yacy.kelondro.logging.Log;
import net.yacy.search.Switchboard;
import net.yacy.search.query.AccessTracker;
import net.yacy.search.query.QueryParams;
import net.yacy.search.query.SnippetProcess;
import net.yacy.search.query.SearchEvent;
import net.yacy.server.serverObjects;
import net.yacy.server.serverSwitch;
@ -115,7 +115,7 @@ public class searchresult {
post.put("hl.alternateField", YaCySchema.description.getSolrFieldName());
post.put("hl.simple.pre", "<b>");
post.put("hl.simple.post", "</b>");
post.put("hl.fragsize", Integer.toString(SnippetProcess.SNIPPET_MAX_LENGTH));
post.put("hl.fragsize", Integer.toString(SearchEvent.SNIPPET_MAX_LENGTH));
GSAResponseWriter.Sort sort = new GSAResponseWriter.Sort(post.get(CommonParams.SORT, ""));
String sorts = sort.toSolr();
if (sorts == null) {

@ -40,7 +40,7 @@ import net.yacy.kelondro.logging.Log;
import net.yacy.search.Switchboard;
import net.yacy.search.SwitchboardConstants;
import net.yacy.search.query.AccessTracker;
import net.yacy.search.query.SnippetProcess;
import net.yacy.search.query.SearchEvent;
import net.yacy.server.serverObjects;
import net.yacy.server.serverSwitch;
@ -166,7 +166,7 @@ public class select {
post.put("hl.fl", "text_t,h1,h2");
post.put("hl.simple.pre", "");
post.put("hl.simple.post", "");
post.put("hl.fragsize", Integer.toString(SnippetProcess.SNIPPET_MAX_LENGTH));
post.put("hl.fragsize", Integer.toString(SearchEvent.SNIPPET_MAX_LENGTH));
}
// get the embedded connector

@ -327,7 +327,7 @@ public final class search {
joincount = theSearch.rankingProcess.rwiAvailableCount() - theSearch.rankingProcess.getMissCount() - theSearch.getSortOutCount();
prop.put("joincount", Integer.toString(joincount));
if (joincount != 0) {
accu = theSearch.result().completeResults(maxtime);
accu = theSearch.completeResults(maxtime);
}
if (joincount <= 0 || abstracts.isEmpty()) {
prop.put("indexcount", "");
@ -421,8 +421,8 @@ public final class search {
theQuery.remotepeer = client == null ? null : sb.peers.lookupByIP(Domains.dnsResolve(client), -1, true, false, false);
theQuery.resultcount = (theSearch == null) ? 0 : joincount;
theQuery.searchtime = System.currentTimeMillis() - timestamp;
theQuery.urlretrievaltime = (theSearch == null) ? 0 : theSearch.result().getURLRetrievalTime();
theQuery.snippetcomputationtime = (theSearch == null) ? 0 : theSearch.result().getSnippetComputationTime();
theQuery.urlretrievaltime = (theSearch == null) ? 0 : theSearch.getURLRetrievalTime();
theQuery.snippetcomputationtime = (theSearch == null) ? 0 : theSearch.getSnippetComputationTime();
AccessTracker.add(AccessTracker.Location.remote, theQuery);
// update the search tracker

@ -81,7 +81,6 @@ import net.yacy.search.query.QueryParams;
import net.yacy.search.query.SearchEvent;
import net.yacy.search.query.SearchEventCache;
import net.yacy.search.query.SearchEventType;
import net.yacy.search.query.SnippetProcess;
import net.yacy.search.ranking.RankingProfile;
import net.yacy.search.snippet.TextSnippet;
import net.yacy.server.serverCore;
@ -709,7 +708,7 @@ public class yacysearch {
try {
Pattern.compile(urlmask);
} catch ( final PatternSyntaxException ex ) {
SnippetProcess.log.logWarning("Illegal URL mask, not a valid regex: " + urlmask);
SearchEvent.log.logWarning("Illegal URL mask, not a valid regex: " + urlmask);
prop.put("urlmaskerror", 1);
prop.putHTML("urlmaskerror_urlmask", urlmask);
urlmask = ".*";
@ -718,7 +717,7 @@ public class yacysearch {
try {
Pattern.compile(prefermask);
} catch ( final PatternSyntaxException ex ) {
SnippetProcess.log.logWarning("Illegal prefer mask, not a valid regex: " + prefermask);
SearchEvent.log.logWarning("Illegal prefer mask, not a valid regex: " + prefermask);
prop.put("prefermaskerror", 1);
prop.putHTML("prefermaskerror_prefermask", prefermask);
prefermask = "";
@ -854,8 +853,8 @@ public class yacysearch {
- theSearch.rankingProcess.getMissCount()
+ theSearch.rankingProcess.getRemoteIndexCount();
theQuery.searchtime = System.currentTimeMillis() - timestamp;
theQuery.urlretrievaltime = theSearch.result().getURLRetrievalTime();
theQuery.snippetcomputationtime = theSearch.result().getSnippetComputationTime();
theQuery.urlretrievaltime = theSearch.getURLRetrievalTime();
theQuery.snippetcomputationtime = theSearch.getSnippetComputationTime();
AccessTracker.add(AccessTracker.Location.local, theQuery);
// check suggestions

@ -47,13 +47,17 @@ import net.yacy.cora.lod.JenaTripleStore;
import net.yacy.cora.lod.vocabulary.Tagging;
import net.yacy.cora.lod.vocabulary.YaCyMetadata;
import net.yacy.cora.order.Base64Order;
import net.yacy.cora.protocol.ResponseHeader;
import net.yacy.cora.protocol.Scanner;
import net.yacy.cora.sorting.ClusteredScoreMap;
import net.yacy.cora.sorting.ConcurrentScoreMap;
import net.yacy.cora.sorting.ScoreMap;
import net.yacy.cora.sorting.WeakPriorityBlockingQueue;
import net.yacy.cora.sorting.WeakPriorityBlockingQueue.Element;
import net.yacy.cora.sorting.WeakPriorityBlockingQueue.ReverseElement;
import net.yacy.cora.storage.HandleSet;
import net.yacy.cora.util.SpaceExceededException;
import net.yacy.crawler.data.Cache;
import net.yacy.data.WorkTables;
import net.yacy.document.Condenser;
import net.yacy.document.LargeNumberCache;
@ -64,6 +68,7 @@ import net.yacy.kelondro.data.word.Word;
import net.yacy.kelondro.data.word.WordReference;
import net.yacy.kelondro.data.word.WordReferenceFactory;
import net.yacy.kelondro.data.word.WordReferenceVars;
import net.yacy.kelondro.index.RowHandleSet;
import net.yacy.kelondro.logging.Log;
import net.yacy.kelondro.rwi.ReferenceContainer;
import net.yacy.kelondro.util.Bitfield;
@ -76,18 +81,22 @@ import net.yacy.repository.LoaderDispatcher;
import net.yacy.repository.Blacklist.BlacklistType;
import net.yacy.search.EventTracker;
import net.yacy.search.Switchboard;
import net.yacy.search.snippet.MediaSnippet;
import net.yacy.search.snippet.ResultEntry;
public final class SearchEvent {
public static Log log = new Log("SEARCH");
private static final long maxWaitPerResult = 30;
public static final int SNIPPET_MAX_LENGTH = 220;
private final static int SNIPPET_WORKER_THREADS = Math.max(4, Runtime.getRuntime().availableProcessors() * 2);
private long eventTime;
protected QueryParams query;
public final SeedDB peers;
private final WorkTables workTables;
final WorkTables workTables;
public final RankingProcess rankingProcess; // ordered search results, grows dynamically as all the query threads enrich this container
private final SnippetProcess resultFetcher;
public final SecondarySearchSuperviser secondarySearchSuperviser;
public final List<RemoteSearch> primarySearchThreadsL;
protected Thread[] secondarySearchThreads;
@ -104,9 +113,19 @@ public final class SearchEvent {
private final ScoreMap<String> namespaceNavigator; // a counter for name spaces
private final ScoreMap<String> protocolNavigator; // a counter for protocol types
private final ScoreMap<String> filetypeNavigator; // a counter for file types
protected final WeakPriorityBlockingQueue<URIMetadataNode> nodeStack;
private final WeakPriorityBlockingQueue<MediaSnippet> images; // container to sort images by size
protected final WeakPriorityBlockingQueue<ResultEntry> result;
protected final LoaderDispatcher loader;
protected final HandleSet snippetFetchWordHashes; // a set of word hashes that are used to match with the snippets
protected final boolean deleteIfSnippetFail;
protected SnippetWorker[] workerThreads;
protected long urlRetrievalAllTime;
protected long snippetComputationAllTime;
private final boolean remote;
private boolean cleanupState;
private int resultCounter = 0;
protected SearchEvent(
final QueryParams query,
final SeedDB peers,
@ -126,6 +145,7 @@ public final class SearchEvent {
this.peers = peers;
this.workTables = workTables;
this.query = query;
this.loader = loader;
this.nodeStack = new WeakPriorityBlockingQueue<URIMetadataNode>(300, false);
this.maxExpectedRemoteReferences = new AtomicInteger(0);
@ -277,16 +297,33 @@ public final class SearchEvent {
}
// start worker threads to fetch urls and snippets
this.resultFetcher =
new SnippetProcess(
this,
loader,
this.query,
this.peers,
this.workTables,
deleteIfSnippetFail,
remote);
this.deleteIfSnippetFail = deleteIfSnippetFail;
this.remote = remote;
this.cleanupState = false;
this.urlRetrievalAllTime = 0;
this.snippetComputationAllTime = 0;
this.result = new WeakPriorityBlockingQueue<ResultEntry>(Math.max(1000, 10 * query.itemsPerPage()), true); // this is the result, enriched with snippets, ranked and ordered by ranking
this.images = new WeakPriorityBlockingQueue<MediaSnippet>(Math.max(1000, 10 * query.itemsPerPage()), true);
// snippets do not need to match with the complete query hashes,
// only with the query minus the stopwords which had not been used for the search
HandleSet filtered;
try {
filtered = RowHandleSet.joinConstructive(query.query_include_hashes, Switchboard.stopwordHashes);
} catch (final SpaceExceededException e) {
Log.logException(e);
filtered = new RowHandleSet(query.query_include_hashes.keylen(), query.query_include_hashes.comparator(), 0);
}
this.snippetFetchWordHashes = query.query_include_hashes.clone();
if (filtered != null && !filtered.isEmpty()) {
this.snippetFetchWordHashes.excludeDestructive(Switchboard.stopwordHashes);
}
// start worker threads to fetch urls and snippets
this.workerThreads = null;
deployWorker(Math.min(SNIPPET_WORKER_THREADS, query.itemsPerPage), query.neededResults());
EventTracker.update(EventTracker.EClass.SEARCH, new ProfilingGraph.EventSearch(query.id(true), SearchEventType.SNIPPETFETCH_START, ((this.workerThreads == null) ? "no" : this.workerThreads.length) + " online snippet fetch threads started", 0, 0), false);
// clean up events
SearchEventCache.cleanupEvents(false);
EventTracker.update(EventTracker.EClass.SEARCH, new ProfilingGraph.EventSearch(
@ -317,11 +354,10 @@ public final class SearchEvent {
/**
 * Replaces the query object of this search event and hands the same object
 * to the snippet fetcher, so both continue to work on one shared query.
 *
 * @param query the new query parameters
 */
public void setQuery(final QueryParams query) {
this.query = query;
this.resultFetcher.query = query;
}
protected void cleanup() {
this.resultFetcher.setCleanupState();
this.cleanupState = true;
// stop all threads
if ( this.primarySearchThreadsL != null ) {
@ -348,7 +384,7 @@ public final class SearchEvent {
}
// call the worker threads and ask them to stop
for ( final SnippetWorker w : this.resultFetcher.workerThreads ) {
for ( final SnippetWorker w : this.workerThreads ) {
if ( w != null && w.isAlive() ) {
w.pleaseStop();
w.interrupt();
@ -472,20 +508,12 @@ public final class SearchEvent {
}
}
/**
 * Returns one result entry by delegating to the snippet fetcher.
 * If a local search thread is still alive, this call first joins it;
 * an interruption during that join is ignored.
 *
 * @param item index of the requested result
 * @param timeout passed through unchanged to the snippet fetcher
 * @return whatever the snippet fetcher returns for this item
 */
public ResultEntry oneResult(final int item, final long timeout) {
if (this.localsearch != null && this.localsearch.isAlive()) try {this.localsearch.join();} catch (InterruptedException e) {}
return this.resultFetcher.oneResult(item, timeout);
}
/**
 * Gives access to the snippet fetcher that belongs to this search event.
 *
 * @return the SnippetProcess instance held in resultFetcher
 */
public SnippetProcess result() {
return this.resultFetcher;
}
protected boolean workerAlive() {
if ( this.resultFetcher == null || this.resultFetcher.workerThreads == null ) {
if ( this.workerThreads == null ) {
return false;
}
for ( final SnippetWorker w : this.resultFetcher.workerThreads ) {
for ( final SnippetWorker w : this.workerThreads ) {
if ( w != null && w.isAlive() ) {
return true;
}
@ -640,7 +668,7 @@ public final class SearchEvent {
System.currentTimeMillis() - timer), false);
}
protected long waitTimeRecommendation() {
private long waitTimeRecommendation() {
return this.maxExpectedRemoteReferences.get() == 0 ? 0 :
Math.min(maxWaitPerResult,
Math.min(
@ -655,10 +683,6 @@ public final class SearchEvent {
this.expectedRemoteReferences.addAndGet(x);
}
/**
 * Tells whether the expectedRemoteReferences counter is still positive.
 *
 * @return true if the counter is greater than zero
 */
protected boolean expectMoreRemoteReferences() {
return this.expectedRemoteReferences.get() > 0;
}
/**
 * Returns the current value of the sortout counter.
 *
 * @return the value of the sortout field
 */
public int getSortOutCount() {
return this.sortout;
}
@ -957,5 +981,253 @@ public final class SearchEvent {
Log.logWarning("RWIProcess", "loop terminated");
return null;
}
/**
 * Returns the accumulated URL retrieval time counter.
 * NOTE(review): the unit is presumably milliseconds and the value is presumably
 * increased by the snippet workers; neither is visible here - confirm.
 *
 * @return the current value of urlRetrievalAllTime
 */
public long getURLRetrievalTime() {
return this.urlRetrievalAllTime;
}
/**
 * Returns the accumulated snippet computation time counter.
 * NOTE(review): the unit is presumably milliseconds and the value is presumably
 * increased by the snippet workers; neither is visible here - confirm.
 *
 * @return the current value of snippetComputationAllTime
 */
public long getSnippetComputationTime() {
return this.snippetComputationAllTime;
}
/**
 * Returns the result entry at position {@code item}, waiting for the snippet
 * workers to produce it if it is not available yet.
 * <p>
 * Steps: join a still-running local search thread, give remote results a short
 * head start for the first ten items, return the entry directly if it is
 * already in the result queue, otherwise poll the result queue until the
 * deadline and redeploy workers whenever none is alive.
 *
 * @param item index of the requested result
 * @param timeout time budget for the final wait loop; added to System.currentTimeMillis()
 * @return the entry at that position, or null if none became available
 */
public ResultEntry oneResult(final int item, final long timeout) {
// wait for the local search thread; an interruption of the join is ignored
if (this.localsearch != null && this.localsearch.isAlive()) try {this.localsearch.join();} catch (InterruptedException e) {}
// check if we already retrieved this item
// (happens if a search pages is accessed a second time)
final long finishTime = System.currentTimeMillis() + timeout;
EventTracker.update(EventTracker.EClass.SEARCH, new ProfilingGraph.EventSearch(this.query.id(true), SearchEventType.ONERESULT, "started, item = " + item + ", available = " + this.result.sizeAvailable(), 0, 0), false);
//Log.logInfo("SnippetProcess", "*start method for item = " + item + "; anyWorkerAlive=" + anyWorkerAlive() + "; this.rankingProcess.isAlive() = " + this.rankingProcess.isAlive() + "; this.rankingProcess.feedingIsFinished() = " + this.rankingProcess.feedingIsFinished() + "; this.result.sizeAvailable() = " + this.result.sizeAvailable() + ", this.rankingProcess.sizeQueue() = " + this.rankingProcess.sizeQueue());
// we must wait some time until the first result page is full to get enough elements for ranking
// NOTE(review): this deadline is taken before the sleep below; for item 0 the
// 400 ms sleep already exceeds the 300 ms window, so the polling loop further
// down cannot iterate in that case - confirm this is intended
final long waittimeout = System.currentTimeMillis() + 300;
if (this.remote && item < 10 && !this.rankingProcess.feedingIsFinished()) {
// the first 10 results have a very special timing to get most of the remote results ordered
// before they are presented on the first lines .. yes sleeps seem to be bad. but how shall we predict how long other
// peers will take until they respond?
long sleep = item == 0 ? 400 : (10 - item) * 9; // the first result takes the longest time
//Log.logInfo("SnippetProcess", "SLEEP = " + sleep);
try { Thread.sleep(sleep); } catch (final InterruptedException e1) { Log.logException(e1); }
}
int thisRankingQueueSize, lastRankingQueueSize = 0;
if (item < 10) {
// poll in 20 ms steps while ranking input is still pending, the queue is still
// growing or the item is not there yet, the 300 ms window is open and a worker is alive
while (
((thisRankingQueueSize = this.rankingProcess.rwiQueueSize()) > 0 || !this.rankingProcess.feedingIsFinished()) &&
(thisRankingQueueSize > lastRankingQueueSize || this.result.sizeAvailable() < item + 1) &&
System.currentTimeMillis() < waittimeout &&
anyWorkerAlive()
) {
// wait a little time to get first results in the search
lastRankingQueueSize = thisRankingQueueSize;
try { Thread.sleep(20); } catch (final InterruptedException e1) {}
}
}
if (this.result.sizeAvailable() > item) {
// we have the wanted result already in the result array .. return that
final ResultEntry re = this.result.element(item).getElement();
EventTracker.update(EventTracker.EClass.SEARCH, new ProfilingGraph.EventSearch(this.query.id(true), SearchEventType.ONERESULT, "prefetched, item = " + item + ", available = " + this.result.sizeAvailable() + ": " + re.urlstring(), 0, 0), false);
return re;
}
// finally wait until enough results are there produced from the snippet fetch process
WeakPriorityBlockingQueue.Element<ResultEntry> entry = null;
while (System.currentTimeMillis() < finishTime) {
// NOTE(review): this info-level log line is written on every pass of the wait loop
Log.logInfo("SnippetProcess", "item = " + item + "; anyWorkerAlive=" + anyWorkerAlive() + "; this.rankingProcess.isAlive() = " + this.rankingProcess.isAlive() + "; this.rankingProcess.feedingIsFinished() = " + this.rankingProcess.feedingIsFinished() + "; this.result.sizeAvailable() = " + this.result.sizeAvailable() + ", this.rankingProcess.sizeQueue() = " + this.rankingProcess.rwiQueueSize() + ", this.rankingProcess.nodeStack.sizeAvailable() = " + this.nodeStack.sizeAvailable());
// give up when nothing is running any more and all queues together cannot reach the item
if (!anyWorkerAlive() && !this.rankingProcess.isAlive() && this.result.sizeAvailable() + this.rankingProcess.rwiQueueSize() + this.nodeStack.sizeAvailable() <= item && this.rankingProcess.feedingIsFinished()) {
//Log.logInfo("SnippetProcess", "interrupted result fetching; item = " + item + "; this.result.sizeAvailable() = " + this.result.sizeAvailable() + ", this.rankingProcess.sizeQueue() = " + this.rankingProcess.sizeQueue() + "; this.rankingProcess.feedingIsFinished() = " + this.rankingProcess.feedingIsFinished());
break; // the fail case
}
// deploy worker to get more results
if (!anyWorkerAlive()) {
// one extra page is requested when more than 100 MB are available and at least 8 worker threads are configured
final int neededInclPrefetch = this.query.neededResults() + ((MemoryControl.available() > 100 * 1024 * 1024 && SNIPPET_WORKER_THREADS >= 8) ? this.query.itemsPerPage : 0);
deployWorker(Math.min(SNIPPET_WORKER_THREADS, this.query.itemsPerPage), neededInclPrefetch);
}
// presumably blocks up to 50 ms for the element - TODO confirm against WeakPriorityBlockingQueue
try {entry = this.result.element(item, 50);} catch (final InterruptedException e) {break;}
if (entry != null) { break; }
}
// finally, if there is something, return the result
if (entry == null) {
EventTracker.update(EventTracker.EClass.SEARCH, new ProfilingGraph.EventSearch(this.query.id(true), SearchEventType.ONERESULT, "not found, item = " + item + ", available = " + this.result.sizeAvailable(), 0, 0), false);
//Log.logInfo("SnippetProcess", "NO ENTRY computed (possible timeout); anyWorkerAlive=" + anyWorkerAlive() + "; this.rankingProcess.isAlive() = " + this.rankingProcess.isAlive() + "; this.rankingProcess.feedingIsFinished() = " + this.rankingProcess.feedingIsFinished() + "; this.result.sizeAvailable() = " + this.result.sizeAvailable() + ", this.rankingProcess.sizeQueue() = " + this.rankingProcess.sizeQueue());
return null;
}
final ResultEntry re = entry.getElement();
EventTracker.update(EventTracker.EClass.SEARCH, new ProfilingGraph.EventSearch(this.query.id(true), SearchEventType.ONERESULT, "retrieved, item = " + item + ", available = " + this.result.sizeAvailable() + ": " + re.urlstring(), 0, 0), false);
// the last item of the requested page was delivered
if (item == this.query.offset + this.query.itemsPerPage - 1) {
stopAllWorker(); // we don't need more
}
return re;
}
/**
 * Delivers the image snippet at the given position of the image queue.
 * The queue is topped up from further search results whenever fewer than
 * ten entries beyond the requested position are available.
 *
 * @param item index of the requested image
 * @return the image snippet at that index, or null if the queue cannot be
 *         filled far enough
 */
public MediaSnippet oneImage(final int item) {
    // keep a margin of ten images ahead of the requested position
    final int prefetchMargin = item + 10;
    if (prefetchMargin >= this.images.sizeAvailable()) {
        fillImagesCache();
    }

    // an item that was handed out before can be served directly
    if (item < this.images.sizeDrained()) {
        return this.images.element(item).getElement();
    }

    // pull more results until the item is covered or a fill adds nothing
    boolean gotMore = true;
    while (gotMore && this.images.sizeAvailable() <= item) {
        gotMore = fillImagesCache() != 0;
    }

    // take the specific item from the image stack if it exists by now
    return this.images.sizeAvailable() > item
            ? this.images.element(item).getElement()
            : null;
}
/**
 * Fetches the next search result and moves its image snippets into the
 * image queue. The internal result counter advances on every call, also
 * when no result could be fetched.
 *
 * @return the number of images added to the queue by this call
 */
private int fillImagesCache() {
    final long waitTime = Math.max(3000, this.query.timeout - System.currentTimeMillis());
    final ResultEntry entry = oneResult(this.resultCounter, waitTime);
    this.resultCounter++;
    if (entry == null) {
        return 0;
    }

    final List<MediaSnippet> snippets = entry.mediaSnippets();
    if (snippets == null) {
        return 0;
    }

    int added = 0;
    for (final MediaSnippet snippet : snippets) {
        // consult the cache for the mime type of the image url; not every url
        // is cached, so a missing header lets the snippet pass
        final ResponseHeader cached = Cache.getResponseHeader(snippet.href.hash());
        final boolean nonImageMime = cached != null
                && (cached.mime().startsWith("text") || cached.mime().startsWith("application"));
        if (nonImageMime) {
            continue;
        }
        // on overflow the queue drops its smallest element
        this.images.put(new ReverseElement<MediaSnippet>(snippet, snippet.ranking));
        added++;
    }
    return added;
}
/**
 * Waits until the result queue holds as many entries as the query needs,
 * no worker is alive any more, or the waiting time is used up, and then
 * returns the entries that are available.
 *
 * @param waitingtime maximum time to wait, added to System.currentTimeMillis()
 * @return at most query.neededResults() entries from the result queue
 */
public ArrayList<WeakPriorityBlockingQueue.Element<ResultEntry>> completeResults(final long waitingtime) {
    final long deadline = System.currentTimeMillis() + waitingtime;
    for (;;) {
        if (this.result.sizeAvailable() >= this.query.neededResults()) {
            break;
        }
        if (!anyWorkerAlive()) {
            break;
        }
        if (System.currentTimeMillis() >= deadline) {
            break;
        }
        // short nap between polls; an interruption only shortens the nap
        try {
            Thread.sleep(10);
        } catch (final InterruptedException e) {}
    }
    final int count = Math.min(this.query.neededResults(), this.result.sizeAvailable());
    return this.result.list(count);
}
/**
 * Starts snippet worker threads, or restarts dead ones, as long as more
 * results are needed.
 * <p>
 * Nothing is started when the event is in cleanup state, when there is no
 * input left (feeding finished, rwi queue empty and node stack empty), or when
 * the result queue already holds neededResults entries.
 *
 * @param deployCount size of the worker array on first use; afterwards the
 *        maximum number of dead slots that are restarted
 * @param neededResults number of results after which no more workers are started
 */
private void deployWorker(int deployCount, final int neededResults) {
if (this.cleanupState ||
(this.rankingProcess.feedingIsFinished() && this.rankingProcess.rwiQueueSize() == 0 && this.nodeStack.sizeAvailable() == 0) ||
this.result.sizeAvailable() >= neededResults) {
return;
}
SnippetWorker worker;
// NOTE(review): the null check and the array assignment happen outside the
// synchronized block - confirm that concurrent first calls cannot occur
if (this.workerThreads == null) {
this.workerThreads = new SnippetWorker[deployCount];
synchronized(this.workerThreads) {try {
for (int i = 0; i < this.workerThreads.length; i++) {
// stop filling slots as soon as enough results exist or the input ran dry
if (this.result.sizeAvailable() >= neededResults ||
(this.rankingProcess.feedingIsFinished() && this.rankingProcess.rwiQueueSize() == 0) && this.nodeStack.sizeAvailable() == 0) {
break;
}
worker = new SnippetWorker(this, this.query.maxtime, this.query.snippetCacheStrategy, neededResults);
worker.start();
this.workerThreads[i] = worker;
// while remote references are still expected, pause between worker starts
if (this.expectedRemoteReferences.get() > 0) {
long wait = this.waitTimeRecommendation();
if (wait > 0) {
try {Thread.sleep(wait);} catch ( InterruptedException e ) {}
}
}
}
// an OutOfMemoryError ends the deployment silently; remaining slots stay null
} catch (OutOfMemoryError e) {}}
} else {
// there are still worker threads running, but some may be dead.
// if we find dead workers, reanimate them
synchronized(this.workerThreads) {
for (int i = 0; i < this.workerThreads.length; i++) {
if (deployCount <= 0 ||
this.result.sizeAvailable() >= neededResults ||
(this.rankingProcess.feedingIsFinished() && this.rankingProcess.rwiQueueSize() == 0) && this.nodeStack.sizeAvailable() == 0) {
break;
}
if (this.workerThreads[i] == null || !this.workerThreads[i].isAlive()) {
worker = new SnippetWorker(this, this.query.maxtime, this.query.snippetCacheStrategy, neededResults);
worker.start();
this.workerThreads[i] = worker;
deployCount--;
}
// NOTE(review): this pause runs for every visited slot, also for slots
// whose worker was still alive and was not restarted
if (this.expectedRemoteReferences.get() > 0) {
long wait = this.waitTimeRecommendation();
if (wait > 0) {
try {Thread.sleep(wait);} catch ( InterruptedException e ) {}
}
}
}
}
}
}
/**
 * Asks every live snippet worker to stop and interrupts it.
 * <p>
 * The worker array may still be null: the constructor initialises it to null
 * and deployWorker returns without allocating it when no work is pending.
 * In that case there is nothing to stop and the method returns immediately
 * instead of failing on the synchronized statement.
 */
private void stopAllWorker() {
    final SnippetWorker[] workers = this.workerThreads;
    if (workers == null) {
        return; // no worker was ever deployed
    }
    synchronized (workers) {
        for (final SnippetWorker worker : workers) {
            // unused slots and finished threads need no stop signal
            if (worker == null || !worker.isAlive()) {
                continue;
            }
            worker.pleaseStop();
            worker.interrupt();
        }
    }
}
/**
 * Checks whether at least one snippet worker is alive and has a busy time
 * below 10000.
 *
 * @return true if such a worker exists; false otherwise, including when no
 *         worker array exists or it is empty
 */
private boolean anyWorkerAlive() {
    if (this.workerThreads == null || this.workerThreads.length == 0) {
        return false;
    }
    synchronized (this.workerThreads) {
        final SnippetWorker[] pool = this.workerThreads;
        for (int slot = 0; slot < pool.length; slot++) {
            final SnippetWorker candidate = pool[slot];
            if (candidate == null || !candidate.isAlive()) {
                continue;
            }
            // a worker that reports a busy time of 10000 or more does not count
            if (candidate.busytime() < 10000) {
                return true;
            }
        }
    }
    return false;
}
/**
 * Removes the first entry with the given url hash from the search results.
 * This is used if the user clicks on a '-' sign beside the search result.
 *
 * @param urlhash the url hash of the entry to remove
 * @return true if an entry was deleted, false otherwise
 */
protected boolean delete(final String urlhash) {
    for (final Iterator<Element<ResultEntry>> it = this.result.iterator(); it.hasNext();) {
        final String candidateHash = ASCII.String(it.next().getElement().url().hash());
        if (urlhash.equals(candidateHash)) {
            it.remove();
            return true;
        }
    }
    return false;
}
}

@ -63,7 +63,7 @@ public class SearchEventCache {
public static boolean delete(final String urlhash) {
for (final SearchEvent event: lastEvents.values()) {
if (event.result().delete(urlhash)) return true;
if (event.delete(urlhash)) return true;
}
return false;
}

@ -1,376 +0,0 @@
// SearchEvent.java
// (C) 2005 by Michael Peter Christen; mc@yacy.net, Frankfurt a. M., Germany
// first published 10.10.2005 on http://yacy.net
//
// This is a part of YaCy, a peer-to-peer based web search engine
//
// $LastChangedDate$
// $LastChangedRevision$
// $LastChangedBy$
//
// LICENSE
//
// This program is free software; you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation; either version 2 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
package net.yacy.search.query;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import net.yacy.cora.document.ASCII;
import net.yacy.cora.protocol.ResponseHeader;
import net.yacy.cora.sorting.WeakPriorityBlockingQueue;
import net.yacy.cora.sorting.WeakPriorityBlockingQueue.Element;
import net.yacy.cora.sorting.WeakPriorityBlockingQueue.ReverseElement;
import net.yacy.cora.storage.HandleSet;
import net.yacy.cora.util.SpaceExceededException;
import net.yacy.crawler.data.Cache;
import net.yacy.data.WorkTables;
import net.yacy.kelondro.index.RowHandleSet;
import net.yacy.kelondro.logging.Log;
import net.yacy.kelondro.util.MemoryControl;
import net.yacy.peers.SeedDB;
import net.yacy.peers.graphics.ProfilingGraph;
import net.yacy.repository.LoaderDispatcher;
import net.yacy.search.EventTracker;
import net.yacy.search.Switchboard;
import net.yacy.search.snippet.MediaSnippet;
import net.yacy.search.snippet.ResultEntry;
public class SnippetProcess {
public static Log log = new Log("SEARCH");
public static final int SNIPPET_MAX_LENGTH = 220;
private final static int SNIPPET_WORKER_THREADS = Math.max(4, Runtime.getRuntime().availableProcessors() * 2);
// input values
final SearchEvent searchEvent;
QueryParams query;
final SeedDB peers;
final WorkTables workTables;
// result values
private final WeakPriorityBlockingQueue<MediaSnippet> images; // container to sort images by size
protected final WeakPriorityBlockingQueue<ResultEntry> result;
protected final LoaderDispatcher loader;
protected final HandleSet snippetFetchWordHashes; // a set of word hashes that are used to match with the snippets
protected final boolean deleteIfSnippetFail;
protected SnippetWorker[] workerThreads;
protected long urlRetrievalAllTime;
protected long snippetComputationAllTime;
private final boolean remote;
private boolean cleanupState;
/**
 * Creates the snippet fetch process for one search event, prepares the result
 * and image queues, computes the word hashes used for snippet matching and
 * starts the first snippet workers.
 *
 * @param searchEvent the search event this process belongs to
 * @param loader stored in the loader field
 * @param query the query parameters; must not be null
 * @param peers stored in the peers field
 * @param workTables stored in the workTables field
 * @param deleteIfSnippetFail stored in the deleteIfSnippetFail field
 * @param remote stored in the remote field; oneResult adds an initial delay for the first ten items when it is true
 */
protected SnippetProcess(
final SearchEvent searchEvent,
final LoaderDispatcher loader,
final QueryParams query,
final SeedDB peers,
final WorkTables workTables,
final boolean deleteIfSnippetFail,
final boolean remote) {
assert query != null;
this.searchEvent = searchEvent;
this.loader = loader;
this.query = query;
this.peers = peers;
this.workTables = workTables;
this.deleteIfSnippetFail = deleteIfSnippetFail;
this.remote = remote;
this.cleanupState = false;
this.urlRetrievalAllTime = 0;
this.snippetComputationAllTime = 0;
// both queues are sized to at least 1000 entries or ten result pages
this.result = new WeakPriorityBlockingQueue<ResultEntry>(Math.max(1000, 10 * query.itemsPerPage()), true); // this is the result, enriched with snippets, ranked and ordered by ranking
this.images = new WeakPriorityBlockingQueue<MediaSnippet>(Math.max(1000, 10 * query.itemsPerPage()), true);
// snippets do not need to match with the complete query hashes,
// only with the query minus the stopwords which had not been used for the search
// presumably joinConstructive yields the hashes present in both sets - TODO confirm
HandleSet filtered;
try {
filtered = RowHandleSet.joinConstructive(query.query_include_hashes, Switchboard.stopwordHashes);
} catch (final SpaceExceededException e) {
Log.logException(e);
filtered = new RowHandleSet(query.query_include_hashes.keylen(), query.query_include_hashes.comparator(), 0);
}
// work on a clone so the hash set of the query itself stays untouched
this.snippetFetchWordHashes = query.query_include_hashes.clone();
if (filtered != null && !filtered.isEmpty()) {
this.snippetFetchWordHashes.excludeDestructive(Switchboard.stopwordHashes);
}
// start worker threads to fetch urls and snippets
// (all fields are assigned before this point)
this.workerThreads = null;
deployWorker(Math.min(SNIPPET_WORKER_THREADS, query.itemsPerPage), query.neededResults());
EventTracker.update(EventTracker.EClass.SEARCH, new ProfilingGraph.EventSearch(query.id(true), SearchEventType.SNIPPETFETCH_START, ((this.workerThreads == null) ? "no" : this.workerThreads.length) + " online snippet fetch threads started", 0, 0), false);
}
/**
 * Marks this process as being cleaned up by setting the cleanupState flag.
 */
protected void setCleanupState() {
this.cleanupState = true;
}
/**
 * Returns the accumulated URL retrieval time counter, which starts at 0.
 * NOTE(review): the unit is presumably milliseconds - confirm.
 *
 * @return the current value of urlRetrievalAllTime
 */
public long getURLRetrievalTime() {
return this.urlRetrievalAllTime;
}
/**
 * Returns the accumulated snippet computation time counter, which starts at 0.
 * NOTE(review): the unit is presumably milliseconds - confirm.
 *
 * @return the current value of snippetComputationAllTime
 */
public long getSnippetComputationTime() {
return this.snippetComputationAllTime;
}
/**
 * Returns the result entry at position {@code item}, waiting for the snippet
 * workers to produce it if it is not available yet.
 * <p>
 * Steps: give remote results a short head start for the first ten items,
 * return the entry directly if it is already in the result queue, otherwise
 * poll the result queue until the deadline and redeploy workers whenever none
 * is alive.
 *
 * @param item index of the requested result
 * @param timeout time budget for the final wait loop; added to System.currentTimeMillis()
 * @return the entry at that position, or null if none became available
 */
protected ResultEntry oneResult(final int item, final long timeout) {
// check if we already retrieved this item
// (happens if a search pages is accessed a second time)
final long finishTime = System.currentTimeMillis() + timeout;
EventTracker.update(EventTracker.EClass.SEARCH, new ProfilingGraph.EventSearch(this.query.id(true), SearchEventType.ONERESULT, "started, item = " + item + ", available = " + this.result.sizeAvailable(), 0, 0), false);
//Log.logInfo("SnippetProcess", "*start method for item = " + item + "; anyWorkerAlive=" + anyWorkerAlive() + "; this.rankingProcess.isAlive() = " + this.rankingProcess.isAlive() + "; this.rankingProcess.feedingIsFinished() = " + this.rankingProcess.feedingIsFinished() + "; this.result.sizeAvailable() = " + this.result.sizeAvailable() + ", this.rankingProcess.sizeQueue() = " + this.rankingProcess.sizeQueue());
// we must wait some time until the first result page is full to get enough elements for ranking
// NOTE(review): this deadline is taken before the sleep below; for item 0 the
// 400 ms sleep already exceeds the 300 ms window, so the polling loop further
// down cannot iterate in that case - confirm this is intended
final long waittimeout = System.currentTimeMillis() + 300;
if (this.remote && item < 10 && !this.searchEvent.rankingProcess.feedingIsFinished()) {
// the first 10 results have a very special timing to get most of the remote results ordered
// before they are presented on the first lines .. yes sleeps seem to be bad. but how shall we predict how long other
// peers will take until they respond?
long sleep = item == 0 ? 400 : (10 - item) * 9; // the first result takes the longest time
//Log.logInfo("SnippetProcess", "SLEEP = " + sleep);
try { Thread.sleep(sleep); } catch (final InterruptedException e1) { Log.logException(e1); }
}
int thisRankingQueueSize, lastRankingQueueSize = 0;
if (item < 10) {
// poll in 20 ms steps while ranking input is still pending, the queue is still
// growing or the item is not there yet, the 300 ms window is open and a worker is alive
while (
((thisRankingQueueSize = this.searchEvent.rankingProcess.rwiQueueSize()) > 0 || !this.searchEvent.rankingProcess.feedingIsFinished()) &&
(thisRankingQueueSize > lastRankingQueueSize || this.result.sizeAvailable() < item + 1) &&
System.currentTimeMillis() < waittimeout &&
anyWorkerAlive()
) {
// wait a little time to get first results in the search
lastRankingQueueSize = thisRankingQueueSize;
try { Thread.sleep(20); } catch (final InterruptedException e1) {}
}
}
if (this.result.sizeAvailable() > item) {
// we have the wanted result already in the result array .. return that
final ResultEntry re = this.result.element(item).getElement();
EventTracker.update(EventTracker.EClass.SEARCH, new ProfilingGraph.EventSearch(this.query.id(true), SearchEventType.ONERESULT, "prefetched, item = " + item + ", available = " + this.result.sizeAvailable() + ": " + re.urlstring(), 0, 0), false);
return re;
}
// finally wait until enough results are there produced from the snippet fetch process
WeakPriorityBlockingQueue.Element<ResultEntry> entry = null;
while (System.currentTimeMillis() < finishTime) {
// NOTE(review): this info-level log line is written on every pass of the wait loop
Log.logInfo("SnippetProcess", "item = " + item + "; anyWorkerAlive=" + anyWorkerAlive() + "; this.rankingProcess.isAlive() = " + this.searchEvent.rankingProcess.isAlive() + "; this.rankingProcess.feedingIsFinished() = " + this.searchEvent.rankingProcess.feedingIsFinished() + "; this.result.sizeAvailable() = " + this.result.sizeAvailable() + ", this.rankingProcess.sizeQueue() = " + this.searchEvent.rankingProcess.rwiQueueSize() + ", this.rankingProcess.nodeStack.sizeAvailable() = " + this.searchEvent.nodeStack.sizeAvailable());
// give up when nothing is running any more and all queues together cannot reach the item
if (!anyWorkerAlive() && !this.searchEvent.rankingProcess.isAlive() && this.result.sizeAvailable() + this.searchEvent.rankingProcess.rwiQueueSize() + this.searchEvent.nodeStack.sizeAvailable() <= item && this.searchEvent.rankingProcess.feedingIsFinished()) {
//Log.logInfo("SnippetProcess", "interrupted result fetching; item = " + item + "; this.result.sizeAvailable() = " + this.result.sizeAvailable() + ", this.rankingProcess.sizeQueue() = " + this.rankingProcess.sizeQueue() + "; this.rankingProcess.feedingIsFinished() = " + this.rankingProcess.feedingIsFinished());
break; // the fail case
}
// deploy worker to get more results
if (!anyWorkerAlive()) {
// one extra page is requested when more than 100 MB are available and at least 8 worker threads are configured
final int neededInclPrefetch = this.query.neededResults() + ((MemoryControl.available() > 100 * 1024 * 1024 && SNIPPET_WORKER_THREADS >= 8) ? this.query.itemsPerPage : 0);
deployWorker(Math.min(SNIPPET_WORKER_THREADS, this.query.itemsPerPage), neededInclPrefetch);
}
// presumably blocks up to 50 ms for the element - TODO confirm against WeakPriorityBlockingQueue
try {entry = this.result.element(item, 50);} catch (final InterruptedException e) {break;}
if (entry != null) { break; }
}
// finally, if there is something, return the result
if (entry == null) {
EventTracker.update(EventTracker.EClass.SEARCH, new ProfilingGraph.EventSearch(this.query.id(true), SearchEventType.ONERESULT, "not found, item = " + item + ", available = " + this.result.sizeAvailable(), 0, 0), false);
//Log.logInfo("SnippetProcess", "NO ENTRY computed (possible timeout); anyWorkerAlive=" + anyWorkerAlive() + "; this.rankingProcess.isAlive() = " + this.rankingProcess.isAlive() + "; this.rankingProcess.feedingIsFinished() = " + this.rankingProcess.feedingIsFinished() + "; this.result.sizeAvailable() = " + this.result.sizeAvailable() + ", this.rankingProcess.sizeQueue() = " + this.rankingProcess.sizeQueue());
return null;
}
final ResultEntry re = entry.getElement();
EventTracker.update(EventTracker.EClass.SEARCH, new ProfilingGraph.EventSearch(this.query.id(true), SearchEventType.ONERESULT, "retrieved, item = " + item + ", available = " + this.result.sizeAvailable() + ": " + re.urlstring(), 0, 0), false);
// the last item of the requested page was delivered
if (item == this.query.offset + this.query.itemsPerPage - 1) {
stopAllWorker(); // we don't need more
}
return re;
}
/** Position of the result entry that the next call of {@link #nextResult()} asks for. */
private int resultCounter = 0;

/**
 * Fetches the result entry at the current counter position and advances the counter.
 * The wait budget handed to {@code oneResult} is the time left until
 * {@code query.timeout}, but never less than 3000.
 * The counter is advanced even when no entry could be produced.
 *
 * @return the entry delivered by {@code oneResult}, possibly {@code null}
 */
private ResultEntry nextResult() {
    final int position = this.resultCounter;
    final ResultEntry fetched = oneResult(position, Math.max(3000, this.query.timeout - System.currentTimeMillis()));
    this.resultCounter = this.resultCounter + 1;
    return fetched;
}
/**
 * Delivers the image snippet at a given position of the image stack.
 * If the stack does not yet hold enough images, further search results are
 * pulled in through {@code fillImagesCache()} to extend it.
 *
 * @param item the position of the wanted image
 * @return the image snippet at that position, or {@code null} if the stack
 *         could not be filled up to that position
 */
public MediaSnippet oneImage(final int item) {
    // keep a look-ahead of ten images beyond the requested position
    final boolean lowOnImages = this.images.sizeAvailable() <= item + 10;
    if (lowOnImages) {
        fillImagesCache();
    }

    // the item was already retrieved earlier (according to the drained count)
    if (this.images.sizeDrained() > item) {
        return this.images.element(item).getElement();
    }

    // pull further results until the position is covered or a pull adds no image
    while (this.images.sizeAvailable() <= item && fillImagesCache() != 0) {
        // all work is done by the loop condition
    }

    // hand out the requested item if it is there now, otherwise give up
    return this.images.sizeAvailable() > item
            ? this.images.element(item).getElement()
            : null;
}
/**
 * Takes the next search result and feeds its media snippets into the image stack.
 * A snippet is skipped when the cache holds a response header for its url whose
 * mime type starts with "text" or "application"; urls without a cached header
 * are accepted unchecked.
 *
 * @return the number of snippets put on the image stack; 0 if there was no
 *         further result or the result carried no media snippets
 */
private int fillImagesCache() {
    final ResultEntry entry = nextResult();
    if (entry == null) {
        return 0;
    }

    final List<MediaSnippet> snippets = entry.mediaSnippets();
    if (snippets == null) {
        return 0;
    }

    int added = 0;
    for (final MediaSnippet snippet : snippets) {
        // the mime type check only works for urls that are present in the cache
        final ResponseHeader cached = Cache.getResponseHeader(snippet.href.hash());
        final boolean knownNonImage = cached != null
                && (cached.mime().startsWith("text") || cached.mime().startsWith("application"));
        if (knownNonImage) {
            continue;
        }
        this.images.put(new ReverseElement<MediaSnippet>(snippet, snippet.ranking)); // remove smallest in case of overflow
        added++;
    }
    return added;
}
/**
 * Waits until the result stack holds the number of results the query needs
 * and then returns the results collected so far.
 * The wait polls every 10 milliseconds and ends as soon as one of these holds:
 * enough results are available, no snippet worker is alive any more, the
 * waiting time has elapsed, or the calling thread is interrupted.
 * On interruption the interrupt status of the thread is restored so that the
 * caller can react to it; the results available at that moment are returned.
 *
 * @param waitingtime the maximum time to wait, in milliseconds
 * @return the list produced by the result stack for at most
 *         {@code query.neededResults()} entries
 */
public ArrayList<WeakPriorityBlockingQueue.Element<ResultEntry>> completeResults(final long waitingtime) {
    final long timeout = System.currentTimeMillis() + waitingtime;
    while (this.result.sizeAvailable() < this.query.neededResults() &&
           anyWorkerAlive() &&
           System.currentTimeMillis() < timeout) {
        try {
            Thread.sleep(10);
        } catch (final InterruptedException e) {
            // do not swallow the interrupt: keep the flag set and stop waiting
            Thread.currentThread().interrupt();
            break;
        }
    }
    return this.result.list(Math.min(this.query.neededResults(), this.result.sizeAvailable()));
}
/**
 * Starts snippet worker threads or replaces dead ones.
 * Nothing is started when the cleanup state is set, when the ranking process
 * has finished feeding and both the rwi queue and the node stack are empty,
 * or when the result stack already holds at least neededResults entries.
 * After a worker start (first deployment) or after each slot inspection
 * (re-deployment) the method may sleep for the time recommended by the
 * search event while more remote references are expected.
 *
 * NOTE(review): InterruptedException and OutOfMemoryError are caught and
 * ignored here, and the sleeps happen while the monitor of the worker array
 * is held - confirm that this is intended.
 *
 * @param deployCount on first deployment the size of the worker array that is
 *        created; afterwards the maximum number of dead slots to refill
 * @param neededResults the number of available results at which no further
 *        worker is started; also handed to each new worker
 */
private void deployWorker(int deployCount, final int neededResults) {
// bail out if there is nothing left to do or enough results exist already
if (this.cleanupState ||
(this.searchEvent.rankingProcess.feedingIsFinished() && this.searchEvent.rankingProcess.rwiQueueSize() == 0 && this.searchEvent.nodeStack.sizeAvailable() == 0) ||
this.result.sizeAvailable() >= neededResults) {
return;
}
SnippetWorker worker;
if (this.workerThreads == null) {
// first deployment: create the worker array and fill it slot by slot
// NOTE(review): the array is published before the lock on it is taken
this.workerThreads = new SnippetWorker[deployCount];
synchronized(this.workerThreads) {try {
for (int i = 0; i < this.workerThreads.length; i++) {
// stop deploying as soon as enough results exist or no input is left
if (this.result.sizeAvailable() >= neededResults ||
(this.searchEvent.rankingProcess.feedingIsFinished() && this.searchEvent.rankingProcess.rwiQueueSize() == 0) && this.searchEvent.nodeStack.sizeAvailable() == 0) {
break;
}
worker = new SnippetWorker(this, this.query.maxtime, this.query.snippetCacheStrategy, neededResults);
worker.start();
this.workerThreads[i] = worker;
// pause before starting the next worker while remote references are still expected
if (this.searchEvent.expectMoreRemoteReferences()) {
long wait = this.searchEvent.waitTimeRecommendation();
if (wait > 0) {
try {Thread.sleep(wait);} catch ( InterruptedException e ) {}
}
}
}
} catch (OutOfMemoryError e) {}}
} else {
// there are still worker threads running, but some may be dead.
// if we find dead workers, reanimate them
synchronized(this.workerThreads) {
for (int i = 0; i < this.workerThreads.length; i++) {
// stop when the deploy budget is used up, enough results exist or no input is left
if (deployCount <= 0 ||
this.result.sizeAvailable() >= neededResults ||
(this.searchEvent.rankingProcess.feedingIsFinished() && this.searchEvent.rankingProcess.rwiQueueSize() == 0) && this.searchEvent.nodeStack.sizeAvailable() == 0) {
break;
}
// an empty or dead slot gets a fresh worker, which uses up one unit of the budget
if (this.workerThreads[i] == null || !this.workerThreads[i].isAlive()) {
worker = new SnippetWorker(this, this.query.maxtime, this.query.snippetCacheStrategy, neededResults);
worker.start();
this.workerThreads[i] = worker;
deployCount--;
}
// NOTE(review): unlike the first deployment, this pause is taken for every slot,
// even if no worker was started for it - confirm that this is intended
if (this.searchEvent.expectMoreRemoteReferences()) {
long wait = this.searchEvent.waitTimeRecommendation();
if (wait > 0) {
try {Thread.sleep(wait);} catch ( InterruptedException e ) {}
}
}
}
}
}
}
/**
 * Asks every living snippet worker to stop: each one gets a
 * {@code pleaseStop()} call followed by an interrupt.
 * Empty slots and workers that are no longer alive are left alone.
 * If no worker array has been created yet, the method does nothing
 * (synchronizing on the missing array would otherwise throw a
 * NullPointerException).
 */
private void stopAllWorker() {
    // work on one snapshot of the field so that the null check, the lock and the loop see the same array
    final SnippetWorker[] workers = this.workerThreads;
    if (workers == null) {
        return; // no worker was ever deployed, nothing to stop
    }
    synchronized (workers) {
        for (final SnippetWorker worker : workers) {
            if (worker == null || !worker.isAlive()) {
                continue;
            }
            worker.pleaseStop();
            worker.interrupt();
        }
    }
}
/**
 * Tells whether at least one snippet worker is still doing useful work.
 * A worker counts only if its slot is filled, its thread is alive and its
 * {@code busytime()} is below 10000.
 *
 * @return {@code true} if such a worker exists; {@code false} otherwise, in
 *         particular when no worker array exists or the array is empty
 */
private boolean anyWorkerAlive() {
    if (this.workerThreads != null && this.workerThreads.length > 0) {
        synchronized (this.workerThreads) {
            for (final SnippetWorker candidate : this.workerThreads) {
                if (candidate == null) {
                    continue; // slot was never filled
                }
                if (!candidate.isAlive()) {
                    continue; // thread has terminated
                }
                if (candidate.busytime() < 10000) {
                    return true;
                }
            }
        }
    }
    return false;
}
/**
 * Removes the first entry with the given url hash from the search results.
 * This is used when the user clicks on the '-' sign beside a search result.
 *
 * @param urlhash the url hash of the entry to remove
 * @return true if an entry was removed, false if no entry matched
 */
protected boolean delete(final String urlhash) {
    for (final Iterator<Element<ResultEntry>> it = this.result.iterator(); it.hasNext();) {
        final Element<ResultEntry> candidate = it.next();
        final boolean matches = urlhash.equals(ASCII.String(candidate.getElement().url().hash()));
        if (!matches) {
            continue;
        }
        it.remove();
        return true;
    }
    return false;
}
}

@ -38,14 +38,14 @@ import net.yacy.search.snippet.ResultEntry;
import net.yacy.search.snippet.TextSnippet;
public class SnippetWorker extends Thread {
private final SnippetProcess snippetProcess;
private final SearchEvent snippetProcess;
private final long timeout; // the date until this thread should try to work
private long lastLifeSign; // when the last time the run()-loop was executed
private final CacheStrategy cacheStrategy;
private final int neededResults;
private boolean shallrun;
public SnippetWorker(final SnippetProcess snippetProcess, final long maxlifetime, final CacheStrategy cacheStrategy, final int neededResults) {
public SnippetWorker(final SearchEvent snippetProcess, final long maxlifetime, final CacheStrategy cacheStrategy, final int neededResults) {
this.snippetProcess = snippetProcess;
this.cacheStrategy = cacheStrategy;
this.lastLifeSign = System.currentTimeMillis();
@ -78,13 +78,13 @@ public class SnippetWorker extends Thread {
}
// check if we can succeed if we try to take another url
if (this.snippetProcess.searchEvent.rankingProcess.feedingIsFinished() && this.snippetProcess.searchEvent.rankingProcess.rwiQueueSize() == 0 && this.snippetProcess.searchEvent.nodeStack.sizeAvailable() == 0) {
if (this.snippetProcess.rankingProcess.feedingIsFinished() && this.snippetProcess.rankingProcess.rwiQueueSize() == 0 && this.snippetProcess.nodeStack.sizeAvailable() == 0) {
Log.logWarning("SnippetProcess", "rankingProcess.feedingIsFinished() && rankingProcess.sizeQueue() == 0");
break;
}
// get next entry
page = this.snippetProcess.searchEvent.takeURL(true, Math.min(500, Math.max(20, this.timeout - System.currentTimeMillis())));
page = this.snippetProcess.takeURL(true, Math.min(500, Math.max(20, this.timeout - System.currentTimeMillis())));
//if (page != null) Log.logInfo("SnippetProcess", "got one page: " + page.metadata().url().toNormalform(true, false));
//if (page == null) page = rankedCache.takeURL(false, this.timeout - System.currentTimeMillis());
if (page == null) {
@ -111,12 +111,12 @@ public class SnippetWorker extends Thread {
// place the result to the result vector
// apply post-ranking
long ranking = resultEntry.word() == null ? 0 : Long.valueOf(this.snippetProcess.searchEvent.rankingProcess.order.cardinal(resultEntry.word()));
ranking += postRanking(resultEntry, this.snippetProcess.searchEvent.rankingProcess.getTopicNavigator(10));
long ranking = resultEntry.word() == null ? 0 : Long.valueOf(this.snippetProcess.rankingProcess.order.cardinal(resultEntry.word()));
ranking += postRanking(resultEntry, this.snippetProcess.rankingProcess.getTopicNavigator(10));
resultEntry.ranking = ranking;
this.snippetProcess.result.put(new ReverseElement<ResultEntry>(resultEntry, ranking)); // remove smallest in case of overflow
if (nav_topics) {
this.snippetProcess.searchEvent.rankingProcess.addTopics(resultEntry);
this.snippetProcess.rankingProcess.addTopics(resultEntry);
}
}
if (System.currentTimeMillis() >= this.timeout) {
@ -135,7 +135,7 @@ public class SnippetWorker extends Thread {
* calculate the time since the worker has had the latest activity
* @return time in milliseconds lasted since latest activity
*/
public long busytime() {
protected long busytime() {
return System.currentTimeMillis() - this.lastLifeSign;
}
@ -223,7 +223,7 @@ public class SnippetWorker extends Thread {
//this.query.queryString,
null,
((this.snippetProcess.query.constraint != null) && (this.snippetProcess.query.constraint.get(Condenser.flag_cat_indexof))),
SnippetProcess.SNIPPET_MAX_LENGTH,
SearchEvent.SNIPPET_MAX_LENGTH,
!this.snippetProcess.query.isLocal());
return new ResultEntry(page, this.snippetProcess.query.getSegment(), this.snippetProcess.peers, snippet, null, dbRetrievalTime, 0); // result without snippet
}
@ -242,7 +242,7 @@ public class SnippetWorker extends Thread {
180,
!this.snippetProcess.query.isLocal());
final long snippetComputationTime = System.currentTimeMillis() - startTime;
SnippetProcess.log.logInfo("text snippet load time for " + page.url() + ": " + snippetComputationTime + ", " + (!snippet.getErrorCode().fail() ? "snippet found" : ("no snippet found (" + snippet.getError() + ")")));
SearchEvent.log.logInfo("text snippet load time for " + page.url() + ": " + snippetComputationTime + ", " + (!snippet.getErrorCode().fail() ? "snippet found" : ("no snippet found (" + snippet.getError() + ")")));
if (!snippet.getErrorCode().fail()) {
// we loaded the file and found the snippet
@ -261,7 +261,7 @@ public class SnippetWorker extends Thread {
if (this.snippetProcess.deleteIfSnippetFail) {
this.snippetProcess.workTables.failURLsRegisterMissingWord(this.snippetProcess.query.getSegment().termIndex(), page.url(), this.snippetProcess.query.query_include_hashes, reason);
}
SnippetProcess.log.logInfo("sorted out url " + page.url().toNormalform(true) + " during search: " + reason);
SearchEvent.log.logInfo("sorted out url " + page.url().toNormalform(true) + " during search: " + reason);
return null;
}
}

Loading…
Cancel
Save