new handling of remote search processes: looking for seeds will no longer block the whole search process. A deadlock with a DHT selection process may have been the cause of interface lock-ups in the past.

pull/1/head
Michael Christen 13 years ago
parent cc5c00cc0f
commit 0797b0de99
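
The gist of the change, shown as a minimal self-contained sketch rather than the actual YaCy code (Seed, PeerSelection and the simplified RemoteSearch below are placeholder types, and selectSearchTargets is an assumed stand-in for the peer selection): primaryRemoteSearches no longer returns an array of started threads; the caller hands in a list, runs the call in a helper thread, and is therefore not blocked while a concurrent DHT selection slows down peer lookup.

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Sketch of the non-blocking start-up pattern introduced by this commit.
// Seed, PeerSelection and RemoteSearch are simplified stand-ins, not the YaCy types.
final class NonBlockingRemoteSearchSketch {

    static final class Seed {
        final String hash;
        Seed(final String hash) { this.hash = hash; }
    }

    interface PeerSelection {
        // may block for a while when a DHT selection runs at the same time
        Seed[] selectSearchTargets();
    }

    static final class RemoteSearch extends Thread {
        private final Seed target;
        RemoteSearch(final Seed target) { this.target = target; }
        @Override public void run() { /* query the remote peer this.target here */ }
    }

    // New shape of primaryRemoteSearches: no return value; each started thread is
    // appended to the caller-supplied list as soon as it is running.
    static void primaryRemoteSearches(final List<RemoteSearch> searchThreads,
                                      final PeerSelection selection) {
        final Seed[] targets = selection.selectSearchTargets();
        if (targets == null) return;
        for (final Seed target : targets) {
            if (target == null || target.hash == null) continue;
            final RemoteSearch rs = new RemoteSearch(target);
            rs.start();
            searchThreads.add(rs);
        }
    }

    public static void main(final String[] args) {
        // a synchronized list is used in this sketch because the helper thread
        // fills it while other threads may already read it
        final List<RemoteSearch> primarySearchThreads =
            Collections.synchronizedList(new ArrayList<RemoteSearch>());
        final PeerSelection selection = new PeerSelection() {
            @Override public Seed[] selectSearchTargets() {
                return new Seed[] { new Seed("someSeedHash") };
            }
        };
        // the caller no longer waits for peer selection: it is done in a helper
        // thread that fills primarySearchThreads while the search goes on
        new Thread() {
            @Override public void run() {
                primaryRemoteSearches(primarySearchThreads, selection);
            }
        }.start();
        System.out.println("search continues immediately; remote threads appear in the list as they start");
    }
}

In the commit itself the list is a field of SearchEvent (primarySearchThreadsL), the helper thread is started inside the SearchEvent constructor, and the former .length checks on the array become .size() calls on the list, as the diff below shows.
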

@@ -25,6 +25,7 @@
package net.yacy.peers;
import java.util.Iterator;
import java.util.List;
import java.util.SortedMap;
import java.util.regex.Pattern;
@@ -113,6 +114,7 @@ public class RemoteSearch extends Thread {
@Override
public void run() {
this.containerCache.oneFeederStarted();
try {
this.urls = Protocol.search(
this.peers.mySeed(),
@@ -156,7 +158,8 @@
return this.targetPeer;
}
public static RemoteSearch[] primaryRemoteSearches(
public static void primaryRemoteSearches(
final List<RemoteSearch> searchThreads,
final String wordhashes, final String excludehashes,
final Pattern prefer, final Pattern filter, final Pattern snippet,
final QueryParams.Modifier modifier,
@@ -189,25 +192,24 @@
burstRobinsonPercent,
burstMultiwordPercent)
: PeerSelection.selectClusterPeers(peers, clusterselection);
if (targetPeers == null) return new RemoteSearch[0];
if (targetPeers == null) return;
final int targets = targetPeers.length;
if (targets == 0) return new RemoteSearch[0];
final RemoteSearch[] searchThreads = new RemoteSearch[targets];
if (targets == 0) return;
for (int i = 0; i < targets; i++) {
if (targetPeers[i] == null || targetPeers[i].hash == null) continue;
try {
searchThreads[i] = new RemoteSearch(
RemoteSearch rs = new RemoteSearch(
wordhashes, excludehashes, "", prefer, filter, snippet, modifier,
language, sitehash, authorhash,
count, time, maxDist, true, targets, targetPeers[i],
indexSegment, peers, containerCache, secondarySearchSuperviser, blacklist, rankingProfile, constraint);
searchThreads[i].start();
rs.start();
searchThreads.add(rs);
} catch (final OutOfMemoryError e) {
Log.logException(e);
break;
}
}
return searchThreads;
}
public static RemoteSearch secondaryRemoteSearch(

@@ -32,6 +32,7 @@ import java.awt.RenderingHints;
import java.awt.image.BufferedImage;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import net.yacy.cora.document.ASCII;
import net.yacy.cora.document.Hit;
@@ -119,7 +120,7 @@ public class NetworkGraph {
public static RasterPlotter getSearchEventPicture(final SeedDB seedDB, final String eventID, final int coronaangle, final int cyc) {
final SearchEvent event = SearchEventCache.getEvent(eventID);
if (event == null) return null;
final RemoteSearch[] primarySearches = event.getPrimarySearchThreads();
final List<RemoteSearch> primarySearches = event.getPrimarySearchThreads();
final RemoteSearch[] secondarySearches = event.getSecondarySearchThreads();
if (primarySearches == null) return null; // this was a local search and there are no threads

@@ -145,6 +145,7 @@ import net.yacy.repository.LoaderDispatcher;
import net.yacy.search.index.Segment;
import net.yacy.search.index.Segments;
import net.yacy.search.query.AccessTracker;
import net.yacy.search.query.QueryParams;
import net.yacy.search.query.SearchEvent;
import net.yacy.search.query.SearchEventCache;
import net.yacy.search.ranking.BlockRank;
@@ -2553,21 +2554,25 @@ public final class Switchboard extends serverSwitch {
}
final Map<MultiProtocolURI, String> links;
searchEvent.getRankingResult().oneFeederStarted();
try {
links = Switchboard.this.loader.loadLinks(url, CacheStrategy.NOCACHE);
} catch (final IOException e) {
Log.logException(e);
return;
}
final Iterator<MultiProtocolURI> i = links.keySet().iterator();
while (i.hasNext()) {
if (!i.next().getHost().endsWith(host)) {
i.remove();
if (links != null) {
final Iterator<MultiProtocolURI> i = links.keySet().iterator();
while (i.hasNext()) {
if (!i.next().getHost().endsWith(host)) {
i.remove();
}
}
// add all pages to the index
addAllToIndex(url, links, searchEvent, "site");
}
} catch (final Throwable e) {
Log.logException(e);
} finally {
searchEvent.getRankingResult().oneFeederTerminated();
}
// add all pages to the index
addAllToIndex(url, links, searchEvent, "site");
}
}.start();
}
@@ -2576,13 +2581,14 @@ public final class Switchboard extends serverSwitch {
new Thread() {
@Override
public void run() {
String query = searchEvent.getQuery().queryString(true);
final int meta = query.indexOf("heuristic:",0);
QueryParams query = searchEvent.getQuery();
String queryString = query.queryString(true);
final int meta = queryString.indexOf("heuristic:",0);
if (meta >= 0) {
final int q = query.indexOf(' ', meta);
query = (q >= 0) ? query.substring(0, meta) + query.substring(q + 1) : query.substring(0, meta);
final int q = queryString.indexOf(' ', meta);
queryString = (q >= 0) ? queryString.substring(0, meta) + queryString.substring(q + 1) : queryString.substring(0, meta);
}
final String urlString = "http://www.scroogle.org/cgi-bin/nbbw.cgi?Gw=" + query.trim().replaceAll(" ", "+") + "&n=2";
final String urlString = "http://www.scroogle.org/cgi-bin/nbbw.cgi?Gw=" + queryString.trim().replaceAll(" ", "+") + "&n=2";
final DigestURI url;
try {
url = new DigestURI(MultiProtocolURI.unescape(urlString));
@@ -2592,21 +2598,25 @@ public final class Switchboard extends serverSwitch {
}
Map<MultiProtocolURI, String> links = null;
searchEvent.getRankingResult().oneFeederStarted();
try {
links = Switchboard.this.loader.loadLinks(url, CacheStrategy.NOCACHE);
} catch (final IOException e) {
//Log.logException(e);
return;
}
final Iterator<MultiProtocolURI> i = links.keySet().iterator();
while (i.hasNext()) {
if (i.next().toNormalform(false, false).indexOf("scroogle",0) >= 0) {
i.remove();
if (links != null) {
final Iterator<MultiProtocolURI> i = links.keySet().iterator();
while (i.hasNext()) {
if (i.next().toNormalform(false, false).indexOf("scroogle",0) >= 0) {
i.remove();
}
}
Switchboard.this.log.logInfo("Heuristic: adding " + links.size() + " links from scroogle");
// add all pages to the index
addAllToIndex(null, links, searchEvent, "scroogle");
}
} catch (final Throwable e) {
//Log.logException(e);
} finally {
searchEvent.getRankingResult().oneFeederTerminated();
}
Switchboard.this.log.logInfo("Heuristic: adding " + links.size() + " links from scroogle");
// add all pages to the index
addAllToIndex(null, links, searchEvent, "scroogle");
}
}.start();
}
@@ -2618,14 +2628,15 @@ public final class Switchboard extends serverSwitch {
new Thread() {
@Override
public void run() {
String query = searchEvent.getQuery().queryString(true);
final int meta = query.indexOf("heuristic:",0);
QueryParams query = searchEvent.getQuery();
String queryString = query.queryString(true);
final int meta = queryString.indexOf("heuristic:",0);
if (meta >= 0) {
final int q = query.indexOf(' ', meta);
if (q >= 0) query = query.substring(0, meta) + query.substring(q + 1); else query = query.substring(0, meta);
final int q = queryString.indexOf(' ', meta);
if (q >= 0) queryString = queryString.substring(0, meta) + queryString.substring(q + 1); else queryString = queryString.substring(0, meta);
}
final String urlString = urlpattern.substring(0, p) + query.trim().replaceAll(" ", "+") + urlpattern.substring(p + 1);
final String urlString = urlpattern.substring(0, p) + queryString.trim().replaceAll(" ", "+") + urlpattern.substring(p + 1);
final DigestURI url;
try {
url = new DigestURI(MultiProtocolURI.unescape(urlString));
@@ -2636,30 +2647,30 @@ public final class Switchboard extends serverSwitch {
// if we have an url then try to load the rss
RSSReader rss = null;
searchEvent.getRankingResult().oneFeederStarted();
try {
final Response response = sb.loader.load(sb.loader.request(url, true, false), CacheStrategy.NOCACHE, true);
final byte[] resource = (response == null) ? null : response.getContent();
//System.out.println("BLEKKO: " + UTF8.String(resource));
rss = resource == null ? null : RSSReader.parse(RSSFeed.DEFAULT_MAXSIZE, resource);
} catch (final IOException e) {
Log.logException(e);
}
if (rss == null) {
Log.logInfo("heuristicRSS", "rss result not parsed from " + feedName);
return;
}
if (rss != null) {
final Map<MultiProtocolURI, String> links = new TreeMap<MultiProtocolURI, String>();
MultiProtocolURI uri;
for (final RSSMessage message: rss.getFeed()) try {
uri = new MultiProtocolURI(message.getLink());
links.put(uri, message.getTitle());
} catch (final MalformedURLException e) {
}
final Map<MultiProtocolURI, String> links = new TreeMap<MultiProtocolURI, String>();
MultiProtocolURI uri;
for (final RSSMessage message: rss.getFeed()) try {
uri = new MultiProtocolURI(message.getLink());
links.put(uri, message.getTitle());
} catch (final MalformedURLException e) {
Log.logInfo("heuristicRSS", "Heuristic: adding " + links.size() + " links from '" + feedName + "' rss feed");
// add all pages to the index
addAllToIndex(null, links, searchEvent, feedName);
}
} catch (final Throwable e) {
//Log.logException(e);
} finally {
searchEvent.getRankingResult().oneFeederTerminated();
}
Log.logInfo("heuristicRSS", "Heuristic: adding " + links.size() + " links from '" + feedName + "' rss feed");
// add all pages to the index
addAllToIndex(null, links, searchEvent, feedName);
}
}.start();
}

@@ -218,7 +218,6 @@ public class Segment {
final SearchEvent searchEvent,
final String sourceName) {
final RWIProcess rankingProcess = (searchEvent == null) ? null : searchEvent.getRankingResult();
if (rankingProcess != null) rankingProcess.moreFeeders(1);
int wordCount = 0;
final int urlLength = url.toNormalform(true, true).length();
final int urlComps = MultiProtocolURI.urlComps(url.toString()).length;
@@ -264,7 +263,6 @@ public class Segment {
}
}
}
if (rankingProcess != null) rankingProcess.oneFeederTerminated();
return wordCount;
}
@@ -373,12 +371,11 @@ public class Segment {
condenser, // document condenser
language, // document language
Response.docType(document.dc_format()), // document type
document.inboundLinkCount(), // inbound links
document.outboundLinkCount(), // outbound links
document.inboundLinkCount(), // inbound links
document.outboundLinkCount(), // outbound links
searchEvent, // a search event that can have results directly
sourceName // the name of the source where the index was created
);
final long indexingEndTime = System.currentTimeMillis();
if (this.log.isInfo()) {

@@ -169,7 +169,8 @@ public final class RWIProcess extends Thread
@Override
public void run() {
// do a search
oneFeederStarted();
// sort the local containers and truncate it to a limited count,
// so following sortings together with the global results will be fast
try {
@@ -385,8 +386,8 @@ public final class RWIProcess extends Thread
assert c >= 0 : "feeders = " + c;
}
public void moreFeeders(final int countMoreFeeders) {
this.feeders.addAndGet(countMoreFeeders);
public void oneFeederStarted() {
this.feeders.addAndGet(1);
}
public boolean feedingIsFinished() {
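
The diff also changes how feeders are counted: the moreFeeders(n) bookkeeping done by the coordinator is replaced by each feeder registering itself with oneFeederStarted() at the top of run() and always balancing it with oneFeederTerminated() in a finally block, so an exception or early return cannot leave the counter too high and stall feedingIsFinished(). A minimal sketch of that pattern follows; FeederCounter and FeederSketch are assumed names, not classes from the commit.

import java.util.concurrent.atomic.AtomicInteger;

// Sketch of the per-feeder accounting; FeederCounter stands in for the feeder
// counter kept by RWIProcess in the commit.
final class FeederCounter {
    private final AtomicInteger feeders = new AtomicInteger(0);

    public void oneFeederStarted() {
        this.feeders.addAndGet(1);
    }

    public void oneFeederTerminated() {
        final int c = this.feeders.decrementAndGet();
        assert c >= 0 : "feeders = " + c;
    }

    public boolean feedingIsFinished() {
        return this.feeders.get() == 0;
    }
}

final class FeederSketch extends Thread {
    private final FeederCounter counter;

    FeederSketch(final FeederCounter counter) { this.counter = counter; }

    @Override
    public void run() {
        this.counter.oneFeederStarted();        // the feeder registers itself
        try {
            // fetch remote results, load links, parse a feed, ...
        } catch (final Throwable e) {
            // log and give up; the finally block still balances the counter
        } finally {
            this.counter.oneFeederTerminated(); // runs on success, error and early return
        }
    }
}

Consumers keep polling feedingIsFinished() exactly as before; only who increments the counter, and when, changes.
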

@@ -26,8 +26,10 @@
package net.yacy.search.query;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.SortedSet;
@@ -93,7 +95,8 @@ public final class SearchEvent
private final SecondarySearchSuperviser secondarySearchSuperviser;
// class variables for remote searches
private RemoteSearch[] primarySearchThreads, secondarySearchThreads;
private final List<RemoteSearch> primarySearchThreadsL;
private RemoteSearch[] secondarySearchThreads;
private final SortedMap<byte[], String> preselectedPeerHashes;
private final Thread localSearchThread;
private final SortedMap<byte[], Integer> IACount;
@@ -126,7 +129,6 @@ public final class SearchEvent
if ( this.secondarySearchSuperviser != null ) {
this.secondarySearchSuperviser.start();
}
this.primarySearchThreads = null;
this.secondarySearchThreads = null;
this.preselectedPeerHashes = preselectedPeerHashes;
this.IAResults = new TreeMap<byte[], String>(Base64Order.enhancedCoder);
@@ -153,49 +155,61 @@ public final class SearchEvent
if ( remote ) {
// start global searches
final long timer = System.currentTimeMillis();
this.primarySearchThreads =
(this.query.queryHashes.isEmpty()) ? null : RemoteSearch.primaryRemoteSearches(
QueryParams.hashSet2hashString(this.query.queryHashes),
QueryParams.hashSet2hashString(this.query.excludeHashes),
this.query.prefer,
this.query.urlMask,
this.query.snippetMatcher,
this.query.modifier,
this.query.targetlang == null ? "" : this.query.targetlang,
this.query.sitehash == null ? "" : this.query.sitehash,
this.query.authorhash == null ? "" : this.query.authorhash,
remote_maxcount,
remote_maxtime,
this.query.maxDistance,
this.query.getSegment(),
peers,
this.rankingProcess,
this.secondarySearchSuperviser,
Switchboard.urlBlacklist,
this.query.ranking,
this.query.constraint,
(this.query.domType == QueryParams.Searchdom.GLOBAL) ? null : preselectedPeerHashes,
burstRobinsonPercent,
burstMultiwordPercent);
if ( this.primarySearchThreads != null ) {
if (this.query.queryHashes.isEmpty()) {
this.primarySearchThreadsL = null;
} else {
this.primarySearchThreadsL = new ArrayList<RemoteSearch>();
// start this concurrently because the remote search needs an enumeration
// of the remote peers which may block in some cases when i.e. DHT is active
// at the same time.
new Thread() {
@Override
public void run() {
RemoteSearch.primaryRemoteSearches(
SearchEvent.this.primarySearchThreadsL,
QueryParams.hashSet2hashString(SearchEvent.this.query.queryHashes),
QueryParams.hashSet2hashString(SearchEvent.this.query.excludeHashes),
SearchEvent.this.query.prefer,
SearchEvent.this.query.urlMask,
SearchEvent.this.query.snippetMatcher,
SearchEvent.this.query.modifier,
SearchEvent.this.query.targetlang == null ? "" : SearchEvent.this.query.targetlang,
SearchEvent.this.query.sitehash == null ? "" : SearchEvent.this.query.sitehash,
SearchEvent.this.query.authorhash == null ? "" : SearchEvent.this.query.authorhash,
remote_maxcount,
remote_maxtime,
SearchEvent.this.query.maxDistance,
SearchEvent.this.query.getSegment(),
peers,
SearchEvent.this.rankingProcess,
SearchEvent.this.secondarySearchSuperviser,
Switchboard.urlBlacklist,
SearchEvent.this.query.ranking,
SearchEvent.this.query.constraint,
(SearchEvent.this.query.domType == QueryParams.Searchdom.GLOBAL) ? null : preselectedPeerHashes,
burstRobinsonPercent,
burstMultiwordPercent);
}
}.start();
}
if ( this.primarySearchThreadsL != null ) {
Log.logFine("SEARCH_EVENT", "STARTING "
+ this.primarySearchThreads.length
+ this.primarySearchThreadsL.size()
+ " THREADS TO CATCH EACH "
+ remote_maxcount
+ " URLs");
this.rankingProcess.moreFeeders(this.primarySearchThreads.length);
EventTracker.update(
EventTracker.EClass.SEARCH,
new ProfilingGraph.EventSearch(
this.query.id(true),
Type.REMOTESEARCH_START,
"",
this.primarySearchThreads.length,
this.primarySearchThreadsL.size(),
System.currentTimeMillis() - timer),
false);
// finished searching
Log.logFine("SEARCH_EVENT", "SEARCH TIME AFTER GLOBAL-TRIGGER TO "
+ this.primarySearchThreads.length
+ this.primarySearchThreadsL.size()
+ " PEERS: "
+ ((System.currentTimeMillis() - start) / 1000)
+ " seconds");
@@ -204,6 +218,7 @@ public final class SearchEvent
Log.logFine("SEARCH_EVENT", "NO SEARCH STARTED DUE TO EMPTY SEARCH REQUEST.");
}
} else {
this.primarySearchThreadsL = null;
if ( generateAbstracts ) {
// we need the results now
try {
@@ -313,8 +328,8 @@ public final class SearchEvent
this.resultFetcher.setCleanupState();
// stop all threads
if ( this.primarySearchThreads != null ) {
for ( final RemoteSearch search : this.primarySearchThreads ) {
if ( this.primarySearchThreadsL != null ) {
for ( final RemoteSearch search : this.primarySearchThreadsL ) {
if ( search != null ) {
synchronized ( search ) {
if ( search.isAlive() ) {
@@ -400,8 +415,8 @@ public final class SearchEvent
boolean anyRemoteSearchAlive() {
// check primary search threads
if ( (this.primarySearchThreads != null) && (this.primarySearchThreads.length != 0) ) {
for ( final RemoteSearch primarySearchThread : this.primarySearchThreads ) {
if ( (this.primarySearchThreadsL != null) && (this.primarySearchThreadsL.size() != 0) ) {
for ( final RemoteSearch primarySearchThread : this.primarySearchThreadsL ) {
if ( (primarySearchThread != null) && (primarySearchThread.isAlive()) ) {
return true;
}
@@ -418,8 +433,8 @@ public final class SearchEvent
return false;
}
public RemoteSearch[] getPrimarySearchThreads() {
return this.primarySearchThreads;
public List<RemoteSearch> getPrimarySearchThreads() {
return this.primarySearchThreadsL;
}
public RemoteSearch[] getSecondarySearchThreads() {
@@ -688,7 +703,6 @@ public final class SearchEvent
}
assert words.length() >= 12 : "words = " + words;
//System.out.println("DEBUG-INDEXABSTRACT ***: peer " + peer + " has urls: " + urls + " from words: " + words);
SearchEvent.this.rankingProcess.moreFeeders(1);
this.checkedPeers.add(peer);
SearchEvent.this.secondarySearchThreads[c++] =
RemoteSearch.secondaryRemoteSearch(

@@ -172,9 +172,9 @@ public class SearchEventCache {
if (countAliveThreads() < allowedThreads) break throttling;
// finally we just wait some time until we get access
Log.logWarning("SearchEventCache", "throttling phase 3: " + SearchEventCache.lastEvents.size() + " in cache; " + countAliveThreads() + " alive; " + allowedThreads + " allowed");
try { Thread.sleep(100); } catch (final InterruptedException e) { }
try { Thread.sleep(200); } catch (final InterruptedException e) { }
waitcount++;
if (waitcount >= 10) return getDummyEvent(workTables, loader, query.getSegment());
if (waitcount >= 100) return getDummyEvent(workTables, loader, query.getSegment());
}
if (waitcount > 0) {
