fixed RWIProcess queue limits: now discovering hidden results for mass result retrieval
pull/1/head
Michael Peter Christen 13 years ago
parent 10c9c17d51
commit ab7107b34b

@ -731,7 +731,7 @@ public class IndexControlRWIs_p
final QueryParams query =
new QueryParams(ASCII.String(keyhash), -1, filter, segment, sb.getRanking(), "IndexControlRWIs_p");
final ReferenceOrder order = new ReferenceOrder(query.ranking, UTF8.getBytes(query.targetlang));
final RWIProcess ranked = new RWIProcess(query, order, Integer.MAX_VALUE, false);
final RWIProcess ranked = new RWIProcess(query, order, false);
ranked.run();
if ( ranked.filteredCount() == 0 ) {

@ -31,7 +31,7 @@ import net.yacy.cora.sorting.WeakPriorityBlockingQueue;
public class SearchResult extends WeakPriorityBlockingQueue<Object> {
public SearchResult(final int maxsize) {
super(maxsize);
super(maxsize, true);
}
private static final long serialVersionUID = -4865225874936938082L;
@ -64,6 +64,7 @@ public class SearchResult extends WeakPriorityBlockingQueue<Object> {
return this.maxScore;
}
@Override
public String toString() {
return "{count=" + this.numFound + ", offset=" + this.start + (this.maxScore != null ? ", maxScore=" + this.maxScore : "") + ", docs=" + super.toString() + "}";
}

@ -53,11 +53,11 @@ public class WeakPriorityBlockingQueue<E> {
* weights that are preferred are returned first when a pop from the stack is made
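* @param maxsize the maximum size of the stack. When the stack exceeds this number, then entries are removed; -1 means the size is unlimited
* @param drain if true, a list of drained (already taken) elements is maintained; if false, no drained list is kept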
*/
public WeakPriorityBlockingQueue(final int maxsize) {
public WeakPriorityBlockingQueue(final int maxsize, boolean drain) {
// the maxsize is the maximum number of entries in the stack
// if this is set to -1, the size is unlimited
this.queue = new TreeSet<Element<E>>();
this.drained = new ArrayList<Element<E>>();
this.drained = drain ? new ArrayList<Element<E>>() : null;
this.enqueued = new Semaphore(0);
this.maxsize = maxsize;
}
@ -66,7 +66,7 @@ public class WeakPriorityBlockingQueue<E> {
* clear the queue
*/
public synchronized void clear() {
this.drained.clear();
if (this.drained != null) this.drained.clear();
this.queue.clear();
this.enqueued.drainPermits();
}
@ -76,7 +76,7 @@ public class WeakPriorityBlockingQueue<E> {
* @return true if the queue is empty, false if not
*/
public boolean isEmpty() {
return this.queue.isEmpty() & this.drained.isEmpty();
return this.queue.isEmpty() & (this.drained == null || this.drained.isEmpty());
}
/**
@ -94,7 +94,7 @@ public class WeakPriorityBlockingQueue<E> {
* @return
*/
public synchronized int sizeDrained() {
return this.drained.size();
return this.drained == null ? 0 : this.drained.size();
}
/**
@ -103,7 +103,9 @@ public class WeakPriorityBlockingQueue<E> {
* @return
*/
public synchronized int sizeAvailable() {
return Math.min(this.maxsize, this.queue.size() + this.drained.size());
return this.maxsize < 0 ?
this.queue.size() + (this.drained == null ? 0 : this.drained.size()) :
Math.min(this.maxsize, this.queue.size() + (this.drained == null ? 0 : this.drained.size()));
}
/**
@ -116,7 +118,7 @@ public class WeakPriorityBlockingQueue<E> {
*/
public synchronized void put(final Element<E> element) {
// put the element on the stack
if (this.drained.contains(element)) return;
if (this.drained != null && this.drained.contains(element)) return;
if (this.queue.size() == this.maxsize) {
// remove last elements if stack is too large
if (this.queue.add(element)) this.queue.remove(this.queue.last());
@ -170,7 +172,7 @@ public class WeakPriorityBlockingQueue<E> {
final Element<E> element = this.queue.first();
assert element != null;
this.queue.remove(element);
if (this.drained.size() < this.maxsize) this.drained.add(element);
if (this.drained != null && (this.maxsize == -1 || this.drained.size() < this.maxsize)) this.drained.add(element);
assert this.queue.size() >= this.enqueued.availablePermits() : "(take) queue.size() = " + this.queue.size() + ", enqueued.availablePermits() = " + this.enqueued.availablePermits();
return element;
}
@ -197,6 +199,7 @@ public class WeakPriorityBlockingQueue<E> {
* @return the element from the recorded position or null if that position is not available
*/
public Element<E> element(final int position) {
if (this.drained == null) return null;
if (position < this.drained.size()) {
return this.drained.get(position);
}
@ -225,6 +228,7 @@ public class WeakPriorityBlockingQueue<E> {
* @throws InterruptedException
*/
public Element<E> element(final int position, long time) throws InterruptedException {
if (this.drained == null) return null;
long timeout = System.currentTimeMillis() + time;
if (position < this.drained.size()) {
return this.drained.get(position);
@ -246,6 +250,7 @@ public class WeakPriorityBlockingQueue<E> {
* @return a list of elements in the stack
*/
public synchronized ArrayList<Element<E>> list(final int count) {
if (this.drained == null) return null;
if (count < 0) {
return list();
}
@ -259,6 +264,7 @@ public class WeakPriorityBlockingQueue<E> {
* @return a list of all elements in the stack
*/
public synchronized ArrayList<Element<E>> list() {
if (this.drained == null) return null;
// shift all elements
while (!this.queue.isEmpty()) this.poll();
return this.drained;
@ -269,6 +275,7 @@ public class WeakPriorityBlockingQueue<E> {
* @return an iterator over all drained positions.
*/
public synchronized Iterator<Element<E>> iterator() {
if (this.drained == null) return null;
// shift all elements to the offstack
while (!this.queue.isEmpty()) this.poll();
return this.drained.iterator();
@ -377,7 +384,7 @@ public class WeakPriorityBlockingQueue<E> {
}
public static void main(String[] args) {
final WeakPriorityBlockingQueue<String> a = new WeakPriorityBlockingQueue<String>(3);
final WeakPriorityBlockingQueue<String> a = new WeakPriorityBlockingQueue<String>(3, true);
//final Element<String> REVERSE_POISON = new ReverseElement<String>("", Long.MIN_VALUE);
new Thread(){
@Override

@ -45,7 +45,6 @@ import net.yacy.kelondro.data.meta.URIMetadataRow;
import net.yacy.kelondro.logging.Log;
import net.yacy.search.query.QueryParams;
import net.yacy.search.query.RWIProcess;
import net.yacy.search.query.SearchEvent;
import net.yacy.search.ranking.RankingProfile;
import net.yacy.search.ranking.ReferenceOrder;
@ -223,7 +222,7 @@ public class DocumentIndex extends Segment
final QueryParams query =
new QueryParams(querystring, count, null, this, textRankingDefault, "DocumentIndex");
final ReferenceOrder order = new ReferenceOrder(query.ranking, UTF8.getBytes(query.targetlang));
final RWIProcess rankedCache = new RWIProcess(query, order, SearchEvent.max_results_preparation, false);
final RWIProcess rankedCache = new RWIProcess(query, order, false);
rankedCache.start();
// search is running; retrieve results

@ -44,6 +44,7 @@ import net.yacy.cora.document.Classification.ContentDomain;
import net.yacy.cora.document.MultiProtocolURI;
import net.yacy.cora.lod.SimpleVocabulary;
import net.yacy.cora.protocol.Scanner;
import net.yacy.cora.services.federated.yacy.CacheStrategy;
import net.yacy.cora.sorting.ClusteredScoreMap;
import net.yacy.cora.sorting.ConcurrentScoreMap;
import net.yacy.cora.sorting.ScoreMap;
@ -73,7 +74,7 @@ public final class RWIProcess extends Thread
{
private static final long maxWaitPerResult = 300;
private static final int maxDoubleDomAll = 1000, maxDoubleDomSpecial = 10000;
private static final int max_results_preparation = 3000, max_results_preparation_special = -1; // -1 means 'no limit'
private final QueryParams query;
private final HandleSet urlhashes; // map for double-check; String/Long relation, addresses ranking number (backreference for deletion)
@ -109,13 +110,13 @@ public final class RWIProcess extends Thread
private final ScoreMap<String> filetypeNavigator; // a counter for file types
private final Map<String, ScoreMap<String>> vocabularyNavigator; // counters for Vocabularies
public RWIProcess(final QueryParams query, final ReferenceOrder order, final int maxentries, final boolean remote) {
public RWIProcess(final QueryParams query, final ReferenceOrder order, final boolean remote) {
// we collect the urlhashes and construct a list with urlEntry objects
// attention: if minEntries is too high, this method will not terminate within the maxTime
// sortorder: 0 = hash, 1 = url, 2 = ranking
this.addRunning = true;
this.localSearchInclusion = null;
this.stack = new WeakPriorityBlockingQueue<WordReferenceVars>(maxentries);
this.stack = new WeakPriorityBlockingQueue<WordReferenceVars>(query.snippetCacheStrategy == null || query.snippetCacheStrategy == CacheStrategy.CACHEONLY ? max_results_preparation_special : max_results_preparation, false);
this.doubleDomCache = new ConcurrentHashMap<String, WeakPriorityBlockingQueue<WordReferenceVars>>();
this.query = query;
this.order = order;
@ -499,7 +500,7 @@ public final class RWIProcess extends Thread
m = this.doubleDomCache.get(hosthash);
if ( m == null ) {
// first appearance of dom. we create an entry to signal that one of that domain was already returned
m = new WeakPriorityBlockingQueue<WordReferenceVars>(this.query.specialRights ? maxDoubleDomSpecial : maxDoubleDomAll);
m = new WeakPriorityBlockingQueue<WordReferenceVars>(this.query.snippetCacheStrategy == null || this.query.snippetCacheStrategy == CacheStrategy.CACHEONLY ? max_results_preparation_special : max_results_preparation, false);
this.doubleDomCache.put(hosthash, m);
return rwi;
}
@ -552,15 +553,16 @@ public final class RWIProcess extends Thread
// finally remove the best entry from the doubledom cache
m = this.doubleDomCache.get(bestEntry.getElement().hosthash());
bestEntry = m.poll();
if (m.sizeAvailable() == 0) {
synchronized ( this.doubleDomCache ) {
if (m.sizeAvailable() == 0) {
this.doubleDomCache.remove(bestEntry.getElement().hosthash());
if (m != null) {
bestEntry = m.poll();
if (bestEntry != null && m.sizeAvailable() == 0) {
synchronized ( this.doubleDomCache ) {
if (m.sizeAvailable() == 0) {
this.doubleDomCache.remove(bestEntry.getElement().hosthash());
}
}
}
}
return bestEntry;
}

@ -82,8 +82,6 @@ public final class SearchEvent
RESULTLIST;
}
public static final int max_results_preparation = 420000;
// class variables that may be implemented with an abstract class
private long eventTime;
private QueryParams query;
@ -147,7 +145,7 @@ public final class SearchEvent
// initialize a ranking process that is the target for data
// that is generated concurrently from local and global search threads
this.rankingProcess = new RWIProcess(this.query, this.order, max_results_preparation, remote);
this.rankingProcess = new RWIProcess(this.query, this.order, remote);
// start a local search concurrently
this.rankingProcess.start();

@ -109,8 +109,8 @@ public class SnippetProcess {
this.urlRetrievalAllTime = 0;
this.snippetComputationAllTime = 0;
this.result = new WeakPriorityBlockingQueue<ResultEntry>(Math.max(1000, 10 * query.itemsPerPage())); // this is the result, enriched with snippets, ranked and ordered by ranking
this.images = new WeakPriorityBlockingQueue<MediaSnippet>(Math.max(1000, 10 * query.itemsPerPage()));
this.result = new WeakPriorityBlockingQueue<ResultEntry>(Math.max(1000, 10 * query.itemsPerPage()), true); // this is the result, enriched with snippets, ranked and ordered by ranking
this.images = new WeakPriorityBlockingQueue<MediaSnippet>(Math.max(1000, 10 * query.itemsPerPage()), true);
// snippets do not need to match with the complete query hashes,
// only with the query minus the stopwords which had not been used for the search

Loading…
Cancel
Save