Added some pauses to the search process, which should produce

better-ranked search results. Without these pauses the result page will
only contain links from the peer that answers first, which is not a good
average picture of all the peers that provided results.
pull/1/head
Michael Christen 13 years ago
parent b67e89ca3f
commit 044f83feed

@ -62,7 +62,7 @@ public class serverSwitch
private final File configFile;
private final String configComment;
private final File dataPath;
protected final File appPath;
public final File appPath;
protected boolean firstInit;
protected Log log;
protected int serverJobs;

@ -62,6 +62,7 @@ public class HeapModifier extends HeapReader implements BLOB {
* clears the content of the database
* @throws IOException
*/
@Override
public synchronized void clear() throws IOException {
this.index.clear();
this.free.clear();
@ -69,7 +70,7 @@ public class HeapModifier extends HeapReader implements BLOB {
this.file = null;
FileUtils.deletedelete(this.heapFile);
super.deleteFingerprint();
this.file = new CachedFileWriter(heapFile);
this.file = new CachedFileWriter(this.heapFile);
}
/**
@ -96,16 +97,17 @@ public class HeapModifier extends HeapReader implements BLOB {
* @param key the primary key
* @throws IOException
*/
@Override
public void delete(byte[] key) throws IOException {
key = normalizeKey(key);
// pre-check before synchronization
long seek = index.get(key);
long seek = this.index.get(key);
if (seek < 0) return;
synchronized (this) {
// check again if the index contains the key
seek = index.get(key);
seek = this.index.get(key);
if (seek < 0) return;
// check consistency of the index
@ -113,12 +115,12 @@ public class HeapModifier extends HeapReader implements BLOB {
// access the file and read the container
this.file.seek(seek);
int size = file.readInt();
int size = this.file.readInt();
//assert seek + size + 4 <= this.file.length() : heapFile.getName() + ": too long size " + size + " in record at " + seek;
long filelength = this.file.length(); // put in separate variable for debugging
if (seek + size + 4 > filelength) {
Log.logSevere("BLOBHeap", heapFile.getName() + ": too long size " + size + " in record at " + seek);
throw new IOException(heapFile.getName() + ": too long size " + size + " in record at " + seek);
Log.logSevere("BLOBHeap", this.heapFile.getName() + ": too long size " + size + " in record at " + seek);
throw new IOException(this.heapFile.getName() + ": too long size " + size + " in record at " + seek);
}
super.deleteFingerprint();
@ -238,35 +240,38 @@ public class HeapModifier extends HeapReader implements BLOB {
}
}
public void insert(byte[] key, byte[] b) throws IOException {
/**
 * Not supported by this class: every call throws an
 * {@link UnsupportedOperationException}.
 *
 * @param key the primary key (unused)
 * @param b the content (unused)
 * @throws IOException declared by the signature; not thrown by this body
 * @throws UnsupportedOperationException always
 */
@Override
public void insert(byte[] key, byte[] b) throws IOException {
throw new UnsupportedOperationException("put is not supported in BLOBHeapModifier");
}
public int replace(byte[] key, final Rewriter rewriter) throws IOException {
/**
 * Not supported by this class: every call throws an
 * {@link UnsupportedOperationException}.
 *
 * @param key the primary key (unused)
 * @param rewriter the rewriter (unused)
 * @return never returns normally
 * @throws IOException declared by the signature; not thrown by this body
 * @throws UnsupportedOperationException always
 */
@Override
public int replace(byte[] key, final Rewriter rewriter) throws IOException {
throw new UnsupportedOperationException();
}
public int reduce(byte[] key, final Reducer reducer) throws IOException, RowSpaceExceededException {
@Override
public int reduce(byte[] key, final Reducer reducer) throws IOException, RowSpaceExceededException {
key = normalizeKey(key);
assert key.length == this.keylength;
// pre-check before synchronization
long pos = index.get(key);
long pos = this.index.get(key);
if (pos < 0) return 0;
synchronized (this) {
long m = this.mem();
// check again if the index contains the key
pos = index.get(key);
pos = this.index.get(key);
if (pos < 0) return 0;
// check consistency of the index
//assert checkKey(key, pos) : "key compare failed; key = " + UTF8.String(key) + ", seek = " + pos;
// access the file and read the container
file.seek(pos);
final int len = file.readInt() - this.keylength;
this.file.seek(pos);
final int len = this.file.readInt() - this.keylength;
if (MemoryControl.available() < len) {
if (!MemoryControl.request(len, true)) return 0; // not enough memory available for this blob
}
@ -274,12 +279,12 @@ public class HeapModifier extends HeapReader implements BLOB {
// read the key
final byte[] keyf = new byte[this.keylength];
file.readFully(keyf, 0, keyf.length);
assert this.ordering.equal(key, keyf);
this.file.readFully(keyf, 0, keyf.length);
assert this.ordering == null || this.ordering.equal(key, keyf);
// read the blob
byte[] blob = new byte[len];
file.readFully(blob, 0, blob.length);
this.file.readFully(blob, 0, blob.length);
// rewrite the entry
blob = reducer.rewrite(blob);
@ -287,7 +292,7 @@ public class HeapModifier extends HeapReader implements BLOB {
if (reduction == 0) {
// even if the reduction is zero then it is still be possible that the record has been changed
this.file.seek(pos + 4 + key.length);
file.write(blob);
this.file.write(blob);
return 0;
}
@ -297,14 +302,14 @@ public class HeapModifier extends HeapReader implements BLOB {
// replace old content
this.file.seek(pos);
file.writeInt(blob.length + key.length);
file.write(key);
file.write(blob);
this.file.writeInt(blob.length + key.length);
this.file.write(key);
this.file.write(blob);
// define the new empty entry
final int newfreereclen = reduction - 4;
assert newfreereclen >= 0;
file.writeInt(newfreereclen);
this.file.writeInt(newfreereclen);
// fill zeros to the content
int l = newfreereclen; byte[] fill = new byte[newfreereclen];

@ -617,6 +617,7 @@ public final class Protocol
// duetime : maximum time that a peer should spent to create a result
final long timestamp = System.currentTimeMillis();
containerCache.addExpectedRemoteReferences(count);
SearchResult result;
try {
result =
@ -783,7 +784,7 @@ public final class Protocol
// insert one container into the search result buffer
// one is enough, only the references are used, not the word
containerCache.add(container[0], false, target.getName() + "/" + target.hash, result.joincount, true);
containerCache.decExpectedRemoteReferences(count - container[0].size());
containerCache.addExpectedRemoteReferences(-count);
// insert the containers to the index
for ( final ReferenceContainer<WordReference> c : container ) {

@ -62,6 +62,7 @@ import net.yacy.kelondro.rwi.ReferenceContainerCache;
import net.yacy.kelondro.rwi.ReferenceFactory;
import net.yacy.kelondro.util.FileUtils;
import net.yacy.kelondro.util.LookAheadIterator;
import net.yacy.search.Switchboard;
public class WebStructureGraph
{
@ -194,6 +195,7 @@ public class WebStructureGraph
//final String refhashp = ASCII.String(lro.url.hash(), 6, 6); // ref hash part
String nexturlhash;
for ( final MultiProtocolURI u : lro.globalRefURLs ) {
if (Switchboard.getSwitchboard().shallTerminate()) break;
final byte[] nexturlhashb = new DigestURI(u).hash();
assert nexturlhashb != null;
if ( nexturlhashb != null ) {

@ -2598,7 +2598,7 @@ public final class Switchboard extends serverSwitch {
try {
links = Switchboard.this.loader.loadLinks(url, CacheStrategy.NOCACHE);
} catch (final IOException e) {
Log.logException(e);
//Log.logException(e);
return;
}
final Iterator<MultiProtocolURI> i = links.keySet().iterator();
@ -2858,6 +2858,10 @@ public final class Switchboard extends serverSwitch {
(new delayedShutdown(this, delay, reason)).start();
}
/**
 * Reports the current value of the {@code terminate} flag, which
 * {@link #terminate(String)} sets to {@code true}.
 *
 * @return the value of {@code this.terminate}
 */
public boolean shallTerminate() {
return this.terminate;
}
public void terminate(final String reason) {
this.terminate = true;
this.log.logInfo("caught terminate request: " + reason);

@ -68,7 +68,7 @@ import net.yacy.search.snippet.ResultEntry;
public final class RWIProcess extends Thread
{
private static final long maxWaitPerResult = 30;
private static final long maxWaitPerResult = 300;
private static final int maxDoubleDomAll = 1000, maxDoubleDomSpecial = 10000;
private final QueryParams query;
@ -81,8 +81,8 @@ public final class RWIProcess extends Thread
private int remote_resourceSize, remote_indexCount, remote_peerCount;
private int local_indexCount;
private int initialExpectedRemoteReferences;
private final AtomicInteger expectedRemoteReferences, receivedRemoteReferences;
private final AtomicInteger maxExpectedRemoteReferences, expectedRemoteReferences,
receivedRemoteReferences;
private final WeakPriorityBlockingQueue<WordReferenceVars> stack;
private final AtomicInteger feeders;
private final ConcurrentHashMap<String, WeakPriorityBlockingQueue<WordReferenceVars>> doubleDomCache; // key = domhash (6 bytes); value = like stack
@ -133,20 +133,31 @@ public final class RWIProcess extends Thread
this.ref = new ConcurrentScoreMap<String>();
this.feeders = new AtomicInteger(1);
this.startTime = System.currentTimeMillis();
this.initialExpectedRemoteReferences = 0;
this.maxExpectedRemoteReferences = new AtomicInteger(0);
this.expectedRemoteReferences = new AtomicInteger(0);
this.receivedRemoteReferences = new AtomicInteger(0);
}
public void setExpectedRemoteReferences(int expectedRemoteReferences) {
this.initialExpectedRemoteReferences = expectedRemoteReferences;
this.expectedRemoteReferences.set(expectedRemoteReferences);
/**
 * Adjusts the number of remote references that are still expected.
 * The given amount is always added to {@code expectedRemoteReferences};
 * only positive amounts are also added to {@code maxExpectedRemoteReferences},
 * so a negative argument lowers the outstanding count without lowering the maximum.
 *
 * @param x the amount to add; may be negative
 */
public void addExpectedRemoteReferences(int x) {
if ( x > 0 ) {
this.maxExpectedRemoteReferences.addAndGet(x);
}
this.expectedRemoteReferences.addAndGet(x);
}
public void decExpectedRemoteReferences(int x) {
this.expectedRemoteReferences.addAndGet(-x);
/**
 * Tells whether the {@code expectedRemoteReferences} counter is still positive.
 *
 * @return {@code true} if {@code expectedRemoteReferences} is greater than zero
 */
public boolean expectMoreRemoteReferences() {
return this.expectedRemoteReferences.get() > 0;
}
/**
 * Recommends how long a caller should pause to let more remote references arrive.
 * <p>
 * The result is 0 when {@code maxExpectedRemoteReferences} is zero. Otherwise it is
 * the smallest of:
 * <ul>
 * <li>{@code maxWaitPerResult};</li>
 * <li>{@code maxWaitPerResult} scaled by the ratio of
 *     {@code expectedRemoteReferences} to {@code maxExpectedRemoteReferences};</li>
 * <li>{@code maxWaitPerResult} scaled by {@code (100 - min(100, receivedRemoteReferences)) / 100},
 *     which reaches zero once 100 references have been received.</li>
 * </ul>
 * The result is clamped to be non-negative, because {@code expectedRemoteReferences}
 * is also adjusted with negative amounts and a negative duration is not a valid
 * argument for {@link Thread#sleep(long)}.
 *
 * @return the recommended wait time, between 0 and {@code maxWaitPerResult}
 */
public long waitTimeRecommendation() {
    // read the maximum once so the zero check and the division see the same value
    final int maxExpected = this.maxExpectedRemoteReferences.get();
    if ( maxExpected == 0 ) {
        return 0;
    }
    final long byOutstanding = maxWaitPerResult * this.expectedRemoteReferences.get() / maxExpected;
    final long byReceived = maxWaitPerResult * (100 - Math.min(100, this.receivedRemoteReferences.get())) / 100;
    final long wait = Math.min(maxWaitPerResult, Math.min(byOutstanding, byReceived));
    // never hand out a negative duration
    return Math.max(0L, wait);
}
/**
 * Returns the query parameters held by this process.
 *
 * @return the value of {@code this.query}
 */
public QueryParams getQuery() {
return this.query;
}
@ -221,13 +232,14 @@ public final class RWIProcess extends Thread
// normalize entries
final BlockingQueue<WordReferenceVars> decodedEntries = this.order.normalizeWith(index);
int is = index.size();
EventTracker.update(EventTracker.EClass.SEARCH, new ProfilingGraph.EventSearch(
this.query.id(true),
SearchEvent.Type.NORMALIZING,
resourceName,
index.size(),
is,
System.currentTimeMillis() - timer), false);
this.receivedRemoteReferences.addAndGet(index.size());
if (!local) this.receivedRemoteReferences.addAndGet(is);
// iterate over normalized entries and select some that are better than currently stored
timer = System.currentTimeMillis();
@ -419,19 +431,15 @@ public final class RWIProcess extends Thread
try {
//System.out.println("stack.poll: feeders = " + this.feeders + ", stack.sizeQueue = " + stack.sizeQueue());
int loops = 0; // a loop counter to terminate the reading if all the results are from the same domain
final long timeout = System.currentTimeMillis() + waitingtime;
// wait some time if we did not get so much remote results so far to get a better ranking over remote results
// we wait at most 30 milliseconds to get a maximum total waiting time of 300 milliseconds for 10 results
long wait =
this.receivedRemoteReferences.get() == 0 ? maxWaitPerResult : Math.min(
maxWaitPerResult,
maxWaitPerResult
* this.initialExpectedRemoteReferences
/ this.receivedRemoteReferences.get());
long wait = waitTimeRecommendation();
if ( wait > 0 ) {
System.out.println("*** RWIProcess extra wait: " + wait + "ms; expectedRemoteReferences = " + this.expectedRemoteReferences.get() + ", receivedRemoteReferences = " + this.receivedRemoteReferences.get() + ", initialExpectedRemoteReferences = " + this.maxExpectedRemoteReferences.get());
Thread.sleep(wait);
}
// loop as long as we can expect that we should get more results
final long timeout = System.currentTimeMillis() + waitingtime;
while ( ((!feedingIsFinished() && this.addRunning) || this.stack.sizeQueue() > 0)
&& (this.query.itemsPerPage < 1 || loops++ < this.query.itemsPerPage) ) {
if ( waitingtime <= 0 ) {
@ -526,7 +534,7 @@ public final class RWIProcess extends Thread
* applied ranking. If there are no more entries left or the timeout limit is reached then null is
* returned. The caller may distinguish the timeout case from the case where there will be no more also in
* the future by calling this.feedingIsFinished()
*
*
* @param skipDoubleDom should be true if it is wanted that double domain entries are skipped
* @param waitingtime the time this method may take for a result computation
* @return a metadata entry for a url

@ -183,8 +183,6 @@ public final class SearchEvent
+ remote_maxcount
+ " URLs");
this.rankingProcess.moreFeeders(this.primarySearchThreads.length);
this.rankingProcess.setExpectedRemoteReferences(this.primarySearchThreads.length
* remote_maxcount);
EventTracker.update(
EventTracker.EClass.SEARCH,
new ProfilingGraph.EventSearch(

@ -148,7 +148,7 @@ public class SnippetProcess {
final long waittimeout = System.currentTimeMillis() + 300;
if (item == 0) while (
(!this.rankingProcess.feedingIsFinished() || this.rankingProcess.sizeQueue() > 0) &&
this.result.sizeAvailable() < this.query.neededResults() &&
this.result.sizeAvailable() < 3 &&
System.currentTimeMillis() < waittimeout &&
anyWorkerAlive()
) {
@ -309,6 +309,10 @@ public class SnippetProcess {
this.workerThreads[i] = worker;
if (this.rankingProcess.feedingIsFinished() && this.rankingProcess.sizeQueue() == 0) break;
if (this.result.sizeAvailable() >= neededResults) break;
if (this.rankingProcess.expectMoreRemoteReferences()) {
long wait = this.rankingProcess.waitTimeRecommendation();
if (wait > 0)try {Thread.sleep(wait);} catch ( InterruptedException e ) {}
}
}
}
} else {
@ -325,6 +329,10 @@ public class SnippetProcess {
}
if (this.rankingProcess.feedingIsFinished() && this.rankingProcess.sizeQueue() == 0) break;
if (this.result.sizeAvailable() >= neededResults) break;
if (this.rankingProcess.expectMoreRemoteReferences()) {
long wait = this.rankingProcess.waitTimeRecommendation();
if (wait > 0)try {Thread.sleep(wait);} catch ( InterruptedException e ) {}
}
}
}
}

@ -44,7 +44,8 @@ public enum ContentDomain {
}
public static ContentDomain contentdomParser(final String dom) {
if ("text".equals(dom)) return TEXT;
if ("all".equals(dom)) return ALL;
else if ("text".equals(dom)) return TEXT;
else if ("image".equals(dom)) return IMAGE;
else if ("audio".equals(dom)) return AUDIO;
else if ("video".equals(dom)) return VIDEO;
@ -54,7 +55,8 @@ public enum ContentDomain {
@Override
public String toString() {
if (this == TEXT) return "text";
if (this == ALL) return "all";
else if (this == TEXT) return "text";
else if (this == IMAGE) return "image";
else if (this == AUDIO) return "audio";
else if (this == VIDEO) return "video";

Loading…
Cancel
Save