Merge branch 'master' of ssh://

Michael Peter Christen 12 years ago
commit f327ffedb4

@ -127,7 +127,8 @@ public class RemoteSearch extends Thread {
public static void primaryRemoteSearches(
final SearchEvent event,
final int count, final long time,
final int start, final int count,
final long time,
final Blacklist blacklist,
final SortedMap<byte[], String> clusterselection,
final int burstRobinsonPercent,
@ -167,7 +168,7 @@ public class RemoteSearch extends Thread {
// start solr searches
for (Seed s: nodePeers) {
solrRemoteSearch(event, count, s, blacklist);
solrRemoteSearch(event, start, count, s, blacklist);
// start search to YaCy DHT peers
@ -252,6 +253,7 @@ public class RemoteSearch extends Thread {
public static Thread solrRemoteSearch(
final SearchEvent event,
final int start,
final int count,
final Seed targetPeer,
final Blacklist blacklist) {
@ -263,17 +265,14 @@ public class RemoteSearch extends Thread {
Thread solr = new Thread() {
public void run() {
int tmpoffset = 0;
int tmpcount = 10;
while (tmpoffset + tmpcount <= count && tmpcount > 0) {
int urls = 0;
try {
urls = Protocol.solrQuery(
tmpoffset == 0,
start == 0,
if (urls >= 0) {
@ -290,10 +289,6 @@ public class RemoteSearch extends Thread {
} finally {
if (urls < tmpcount) break; // there won't be more
tmpoffset += tmpcount;
tmpcount = targetPeer == null ? 10 : count - tmpoffset;
/*if (targetPeer == null); else*/ solr.start();

@ -123,7 +123,9 @@ public final class SearchEvent {
private final SortedMap<byte[], String> IAResults;
private final SortedMap<byte[], HeuristicResult> heuristics;
private byte[] IAmaxcounthash, IAneardhthash;
public Thread rwiProcess, localsearch;
public Thread rwiProcess;
private Thread localsolrsearch;
private int localsolroffset;
private final AtomicInteger expectedRemoteReferences, maxExpectedRemoteReferences; // counter for referenced that had been sorted out for other reasons
public final ScoreMap<String> hostNavigator; // a counter for the appearance of host names
public final ScoreMap<String> authorNavigator; // a counter for the appearances of authors
@ -135,7 +137,7 @@ public final class SearchEvent {
private final LoaderDispatcher loader;
private final HandleSet snippetFetchWordHashes; // a set of word hashes that are used to match with the snippets
private final boolean deleteIfSnippetFail;
private long urlRetrievalAllTime;
private long urlRetrievalAllTime;
private long snippetComputationAllTime;
private ConcurrentHashMap<String, String> snippets;
private final boolean remote;
@ -159,15 +161,16 @@ public final class SearchEvent {
public final AtomicInteger local_rwi_stored; // the number of existing hits by the local search in rwi index
public final AtomicInteger remote_rwi_available; // the number of hits imported from remote peers (rwi/solr mixed)
public final AtomicInteger remote_rwi_stored; // the number of existing hits at remote site
public final AtomicInteger remote_rwi_peerCount; // the number of peers which contributed to the remote search result
public final AtomicInteger remote_rwi_peerCount; // the number of peers which contributed to the remote search result
public final AtomicInteger local_solr_available; // the number of hits generated/ranked by the local search in solr
public final AtomicInteger local_solr_stored; // the number of existing hits by the local search in solr
public final AtomicInteger remote_solr_available;// the number of hits imported from remote peers (rwi/solr mixed)
public final AtomicInteger remote_solr_stored; // the number of existing hits at remote site
public final AtomicInteger remote_solr_peerCount;// the number of peers which contributed to the remote search result
public final AtomicInteger remote_solr_peerCount;// the number of peers which contributed to the remote search result
public int getResultCount() {
return this.local_rwi_available.get() + local_solr_stored.get();
return this.local_rwi_available.get() + this.remote_rwi_available.get() +
this.remote_solr_available.get() + this.local_solr_stored.get();
protected SearchEvent(
@ -252,8 +255,9 @@ public final class SearchEvent {
// start a local solr search
this.localsearch = RemoteSearch.solrRemoteSearch(this, 100, null /*this peer*/, Switchboard.urlBlacklist);
this.localsolrsearch = RemoteSearch.solrRemoteSearch(this, 0, this.query.itemsPerPage, null /*this peer*/, Switchboard.urlBlacklist);
this.localsolroffset = this.query.itemsPerPage;
// start a local RWI search concurrently
this.rwiProcess = null;
if (query.getSegment().connectedRWI() && (!this.remote || this.peers.mySeed().getBirthdate() < noRobinsonLocalRWISearch)) {
@ -278,7 +282,7 @@ public final class SearchEvent {
0, remote_maxcount,
(SearchEvent.this.query.domType == QueryParams.Searchdom.GLOBAL) ? null : preselectedPeerHashes,
@ -486,8 +490,11 @@ public final class SearchEvent {
assert (iEntry.urlhash().length == index.row().primaryKeyLength);
// doublecheck for urls
if (this.urlhashes.has(iEntry.urlhash())) continue pollloop;
if (this.urlhashes.has(iEntry.urlhash())) {
if (log.isFine()) log.logFine("dropped RWI: doublecheck");
continue pollloop;
// increase flag counts
Bitfield flags = iEntry.flags();
for (int j = 0; j < 32; j++) {
@ -495,7 +502,10 @@ public final class SearchEvent {
// check constraints
if (!this.testFlags(flags)) continue pollloop;
if (!this.testFlags(flags)) {
if (log.isFine()) log.logFine("dropped RWI: flag test failed");
continue pollloop;
// check document domain
if (this.query.contentdom.getCode() > 0 &&
@ -503,6 +513,7 @@ public final class SearchEvent {
(this.query.contentdom == ContentDomain.VIDEO && !(flags.get(Condenser.flag_cat_hasvideo))) ||
(this.query.contentdom == ContentDomain.IMAGE && !(flags.get(Condenser.flag_cat_hasimage))) ||
(this.query.contentdom == ContentDomain.APP && !(flags.get(Condenser.flag_cat_hasapp))))) {
if (log.isFine()) log.logFine("dropped RWI: contentdom fail");
continue pollloop;
@ -512,10 +523,16 @@ public final class SearchEvent {
// check site constraints
final String hosthash = iEntry.hosthash();
if ( this.query.modifier.sitehash == null ) {
if (this.query.siteexcludes != null && this.query.siteexcludes.contains(hosthash)) continue pollloop;
if (this.query.siteexcludes != null && this.query.siteexcludes.contains(hosthash)) {
if (log.isFine()) log.logFine("dropped RWI: siteexcludes");
continue pollloop;
} else {
// filter out all domains that do not match with the site constraint
if (!hosthash.equals(this.query.modifier.sitehash)) continue pollloop;
if (!hosthash.equals(this.query.modifier.sitehash)) {
if (log.isFine()) log.logFine("dropped RWI: modifier.sitehash");
continue pollloop;
// finally extend the double-check and insert result to stack
@ -526,6 +543,7 @@ public final class SearchEvent {
break rankingtryloop;
} catch ( final ArithmeticException e ) {
// this may happen if the concurrent normalizer changes values during cardinal computation
if (log.isFine()) log.logFine("dropped RWI: arithmetic exception");
continue rankingtryloop;
@ -735,12 +753,14 @@ public final class SearchEvent {
if ( !this.query.urlMask_isCatchall ) {
// check url mask
if (!iEntry.matches(this.query.urlMask)) {
if (log.isFine()) log.logFine("dropped Node: url mask does not match");
continue pollloop;
// doublecheck for urls
if (this.urlhashes.has(iEntry.hash())) {
if (log.isFine()) log.logFine("dropped Node: double check");
continue pollloop;
@ -751,7 +771,10 @@ public final class SearchEvent {
// check constraints
Bitfield flags = iEntry.flags();
if (!this.testFlags(flags)) continue pollloop;
if (!this.testFlags(flags)) {
if (log.isFine()) log.logFine("dropped Node: flag test");
continue pollloop;
// check document domain
if (this.query.contentdom.getCode() > 0 &&
@ -759,6 +782,7 @@ public final class SearchEvent {
(this.query.contentdom == ContentDomain.VIDEO && !(flags.get(Condenser.flag_cat_hasvideo))) ||
(this.query.contentdom == ContentDomain.IMAGE && !(flags.get(Condenser.flag_cat_hasimage))) ||
(this.query.contentdom == ContentDomain.APP && !(flags.get(Condenser.flag_cat_hasapp))))) {
if (log.isFine()) log.logFine("dropped Node: content domain does not match");
continue pollloop;
@ -766,11 +790,15 @@ public final class SearchEvent {
final String hosthash = iEntry.hosthash();
if ( this.query.modifier.sitehash == null ) {
if (this.query.siteexcludes != null && this.query.siteexcludes.contains(hosthash)) {
if (log.isFine()) log.logFine("dropped Node: siteexclude");
continue pollloop;
} else {
// filter out all domains that do not match with the site constraint
if (iEntry.url().getHost().indexOf(this.query.modifier.sitehost) < 0) continue pollloop;
if (iEntry.url().getHost().indexOf(this.query.modifier.sitehost) < 0) {
if (log.isFine()) log.logFine("dropped Node: sitehost");
continue pollloop;
// finally extend the double-check and insert result to stack
@ -1047,14 +1075,21 @@ public final class SearchEvent {
return null;
public void drainStacksToResult() {
public boolean drainStacksToResult() {
// we take one entry from both stacks at the same time
boolean success = false;
Element<URIMetadataNode> localEntryElement = this.nodeStack.sizeQueue() > 0 ? this.nodeStack.poll() : null;
URIMetadataNode localEntry = localEntryElement == null ? null : localEntryElement.getElement();
if (localEntry != null) addResult(getSnippet(localEntry, null));
if (localEntry != null) {
addResult(getSnippet(localEntry, null));
success = true;
if (localEntry == null) {
URIMetadataNode p2pEntry = pullOneFilteredFromRWI(true);
if (p2pEntry != null) addResult(getSnippet(p2pEntry, null));
if (p2pEntry != null) {
addResult(getSnippet(p2pEntry, null));
success = true;
} else {
new Thread() {
public void run() {
@ -1063,6 +1098,7 @@ public final class SearchEvent {
return success;
@ -1188,33 +1224,23 @@ public final class SearchEvent {
final long finishTime = System.currentTimeMillis() + timeout;
EventTracker.update(EventTracker.EClass.SEARCH, new ProfilingGraph.EventSearch(, SearchEventType.ONERESULT, "started, item = " + item + ", available = " + this.getResultCount(), 0, 0), false);
// check if we have a success
if (this.resultList.sizeAvailable() > item) {
// we have the wanted result already in the result array .. return that
final ResultEntry re = this.resultList.element(item).getElement();
EventTracker.update(EventTracker.EClass.SEARCH, new ProfilingGraph.EventSearch(, SearchEventType.ONERESULT, "prefetched, item = " + item + ", available = " + this.getResultCount() + ": " + re.urlstring(), 0, 0), false);
return re;
// wait until a local solr is finished, we must do that to be able to check if we need more
if (this.localsolrsearch != null && this.localsolrsearch.isAlive()) {
try {this.localsolrsearch.join();} catch (InterruptedException e) {}
// we must wait some time until the first result page is full to get enough elements for ranking
if (this.remote && item < 10 && !this.feedingIsFinished()) {
// the first 10 results have a very special timing to get most of the remote results ordered
// before they are presented on the first lines .. yes sleeps seem to be bad. but how shall we predict how long other
// peers will take until they respond?
long stoptime = System.currentTimeMillis() + Math.min(timeout, item == 0 ? 100 : (10 - item) * 9); // the first result takes the longest time
while (System.currentTimeMillis() < stoptime) {
try { Thread.sleep(10); } catch (final InterruptedException e) { Log.logException(e); }
this.localsolrsearch = null;
if (item >= this.localsolroffset && this.local_solr_stored.get() >= item) {
// load remaining solr results now
int nextitems = item - this.localsolroffset + this.query.itemsPerPage; // example: suddenly switch to item 60, just 10 had been shown, 20 loaded.
this.localsolrsearch = RemoteSearch.solrRemoteSearch(this, this.localsolroffset, nextitems, null /*this peer*/, Switchboard.urlBlacklist);
this.localsolroffset += nextitems;
// now do this as long as needed
while ((!this.feedingIsFinished() || this.rwiQueueSize() > 0 || this.nodeStack.sizeQueue() > 0) &&
this.resultList.sizeAvailable() < item + 1 && System.currentTimeMillis() < finishTime) {
try { Thread.sleep(10); } catch (final InterruptedException e) { Log.logException(e); }
// now pull results as long as needed and as long as possible
while ( this.resultList.sizeAvailable() <= item &&
(this.rwiQueueSize() > 0 || this.nodeStack.sizeQueue() > 0 ||
(!this.feedingIsFinished() && System.currentTimeMillis() < finishTime))) {
if (!drainStacksToResult()) try {Thread.sleep(10);} catch (final InterruptedException e) {Log.logException(e);}
// check if we have a success
@ -1222,6 +1248,12 @@ public final class SearchEvent {
// we have the wanted result already in the result array .. return that
final ResultEntry re = this.resultList.element(item).getElement();
EventTracker.update(EventTracker.EClass.SEARCH, new ProfilingGraph.EventSearch(, SearchEventType.ONERESULT, "fetched, item = " + item + ", available = " + this.getResultCount() + ": " + re.urlstring(), 0, 0), false);
if (this.local_solr_stored.get() > this.localsolroffset && (item + 1) % this.query.itemsPerPage == 0) {
// at the end of a list, trigger a next solr search
this.localsolrsearch = RemoteSearch.solrRemoteSearch(this, this.localsolroffset, this.query.itemsPerPage, null /*this peer*/, Switchboard.urlBlacklist);
this.localsolroffset += this.query.itemsPerPage;
return re;
