redesign of the SortStack and SortStore classes:

created a WeakPriorityBlockingQueue as special implementation
of a PriorityBlockingQueue with a weak object binding.
- better abstraction of ordering technique
- fixed some bugs related to result numbering (distinguish different counters in Queue)
- fixed an ordering bug in post-ranking (ordering was decreased instead of increased)
- reversed ordering numbering using a reversed ordering. The higher the ranking number the better (now).

git-svn-id: https://svn.berlios.de/svnroot/repos/yacy/trunk@7128 6c8d7289-2bf4-0310-a012-ef5d649a1542
pull/1/head
orbiter 15 years ago
parent 03eb021568
commit 348dece62f

@ -38,6 +38,7 @@ import java.util.TreeSet;
import net.yacy.cora.document.RSSMessage;
import net.yacy.cora.protocol.HeaderFramework;
import net.yacy.cora.protocol.RequestHeader;
import net.yacy.cora.storage.WeakPriorityBlockingQueue.ReverseElement;
import net.yacy.kelondro.data.meta.DigestURI;
import net.yacy.kelondro.data.word.WordReference;
import net.yacy.kelondro.data.word.WordReferenceRow;
@ -45,7 +46,6 @@ import net.yacy.kelondro.index.HandleSet;
import net.yacy.kelondro.order.Bitfield;
import net.yacy.kelondro.rwi.ReferenceContainer;
import net.yacy.kelondro.util.EventTracker;
import net.yacy.kelondro.util.SortStack;
import net.yacy.kelondro.util.ISO639;
import de.anomic.crawler.CrawlProfile;
@ -192,7 +192,7 @@ public final class search {
int joincount = 0;
QueryParams theQuery = null;
SearchEvent theSearch = null;
ArrayList<SortStack<ResultEntry>.stackElement> accu = null;
ArrayList<ReverseElement<ResultEntry>> accu = null;
if ((query.length() == 0) && (abstractSet != null)) {
// this is _not_ a normal search, only a request for index abstracts
Segment indexSegment = sb.indexSegments.segment(Segments.Process.PUBLIC);
@ -362,10 +362,10 @@ public final class search {
final long timer = System.currentTimeMillis();
final StringBuilder links = new StringBuilder(6000);
String resource = null;
SortStack<ResultEntry>.stackElement entry;
ReverseElement<ResultEntry> entry;
for (int i = 0; i < accu.size(); i++) {
entry = accu.get(i);
resource = entry.element.resource();
resource = entry.getElement().resource();
if (resource != null) {
links.append("resource").append(i).append('=').append(resource).append(serverCore.CRLF_STRING);
}

@ -130,7 +130,7 @@ public class SitemapImporter extends Thread {
url,
null, // this.siteMapURL.toString(),
entry.url(),
new Date(),
entry.lastmod(new Date()),
this.crawlingProfile.handle(),
0,
0,

@ -49,11 +49,11 @@ public class MediaSnippet implements Comparable<MediaSnippet>, Comparator<MediaS
public ContentDomain type;
public DigestURI href, source;
public String name, attr, mime;
public int ranking;
public long ranking;
public int width, height;
public long fileSize;
public MediaSnippet(final ContentDomain type, final DigestURI href, final String mime, final String name, final long fileSize, final String attr, final int ranking, final DigestURI source) {
public MediaSnippet(final ContentDomain type, final DigestURI href, final String mime, final String name, final long fileSize, final String attr, final long ranking, final DigestURI source) {
this.type = type;
this.href = href;
this.mime = mime;
@ -73,7 +73,7 @@ public class MediaSnippet implements Comparable<MediaSnippet>, Comparator<MediaS
if ((this.attr == null) || (this.attr.length() == 0)) this.attr = "_";
}
public MediaSnippet(final ContentDomain type, final DigestURI href, final String mime, final String name, final long fileSize, final int width, final int height, final int ranking, final DigestURI source) {
public MediaSnippet(final ContentDomain type, final DigestURI href, final String mime, final String name, final long fileSize, final int width, final int height, final long ranking, final DigestURI source) {
this.type = type;
this.href = href;
this.mime = mime;
@ -188,7 +188,7 @@ public class MediaSnippet implements Comparable<MediaSnippet>, Comparator<MediaS
int appcount = queryhashes.size() * 2 -
TextSnippet.removeAppearanceHashes(url.toNormalform(false, false), queryhashes).size() -
TextSnippet.removeAppearanceHashes(desc, queryhashes).size();
final int ranking = Integer.MAX_VALUE - (ientry.height() + 1) * (ientry.width() + 1) * (appcount + 1);
final long ranking = Long.MAX_VALUE - (ientry.height() + 1) * (ientry.width() + 1) * (appcount + 1);
result.add(new MediaSnippet(ContentDomain.IMAGE, url, MimeTable.url2mime(url), desc, ientry.fileSize(), ientry.width(), ientry.height(), ranking, source));
}
return result;

@ -99,7 +99,7 @@ public final class MetadataRepository implements Iterable<byte[]> {
}
public int size() {
return urlIndexFile.size();
return urlIndexFile == null ? 0 : urlIndexFile.size();
}
public void close() {

@ -40,6 +40,8 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import net.yacy.cora.document.MultiProtocolURI;
import net.yacy.cora.storage.WeakPriorityBlockingQueue;
import net.yacy.cora.storage.WeakPriorityBlockingQueue.ReverseElement;
import net.yacy.document.Condenser;
import net.yacy.kelondro.data.meta.DigestURI;
import net.yacy.kelondro.data.meta.URIMetadataRow;
@ -56,7 +58,6 @@ import net.yacy.kelondro.rwi.ReferenceContainer;
import net.yacy.kelondro.rwi.TermSearch;
import net.yacy.kelondro.util.EventTracker;
import net.yacy.kelondro.util.FileUtils;
import net.yacy.kelondro.util.SortStack;
import de.anomic.yacy.graphics.ProfilingGraph;
@ -77,9 +78,9 @@ public final class RankingProcess extends Thread {
private int remote_resourceSize, remote_indexCount, remote_peerCount;
private int local_resourceSize, local_indexCount;
private final SortStack<WordReferenceVars> stack;
private final WeakPriorityBlockingQueue<ReverseElement<WordReferenceVars>> stack;
private int feeders;
private final ConcurrentHashMap<String, SortStack<WordReferenceVars>> doubleDomCache; // key = domhash (6 bytes); value = like stack
private final ConcurrentHashMap<String, WeakPriorityBlockingQueue<ReverseElement<WordReferenceVars>>> doubleDomCache; // key = domhash (6 bytes); value = like stack
//private final HandleSet handover; // key = urlhash; used for double-check of urls that had been handed over to search process
private final Navigator ref; // reference score computation for the commonSense heuristic
@ -93,8 +94,8 @@ public final class RankingProcess extends Thread {
// attention: if minEntries is too high, this method will not terminate within the maxTime
// sortorder: 0 = hash, 1 = url, 2 = ranking
this.localSearchInclusion = null;
this.stack = new SortStack<WordReferenceVars>(maxentries, true);
this.doubleDomCache = new ConcurrentHashMap<String, SortStack<WordReferenceVars>>();
this.stack = new WeakPriorityBlockingQueue<ReverseElement<WordReferenceVars>>(maxentries);
this.doubleDomCache = new ConcurrentHashMap<String, WeakPriorityBlockingQueue<ReverseElement<WordReferenceVars>>>();
//this.handover = new HandleSet(URIMetadataRow.rowdef.primaryKeyLength, URIMetadataRow.rowdef.getOrdering(), 0);
this.query = query;
this.order = order;
@ -245,17 +246,7 @@ public final class RankingProcess extends Thread {
if (urlhashes.has(fEntry.metadataHash())) continue;
// insert
if (maxentries < 0 || stack.size() < maxentries) {
// in case that we don't have enough yet, accept any new entry
stack.push(fEntry, r);
} else {
// if we already have enough entries, insert only such that are necessary to get a better result
if (stack.bottom(r.longValue())) continue;
// take the entry. the stack is automatically reduced
// to the maximum size by deletion of elements at the bottom
stack.push(fEntry, r);
}
stack.put(new ReverseElement<WordReferenceVars>(fEntry, r)); // inserts the element and removed the worst (which is smallest)
try {
urlhashes.put(fEntry.metadataHash());
} catch (RowSpaceExceededException e) {
@ -314,32 +305,30 @@ public final class RankingProcess extends Thread {
// - root-domain guessing to prefer the root domain over other urls if search word appears in domain name
private SortStack<WordReferenceVars>.stackElement takeRWI(final boolean skipDoubleDom) {
private ReverseElement<WordReferenceVars> takeRWI(final boolean skipDoubleDom) {
// returns from the current RWI list the best entry and removes this entry from the list
SortStack<WordReferenceVars> m;
SortStack<WordReferenceVars>.stackElement rwi;
while (!stack.isEmpty()) {
rwi = stack.pop();
if (rwi == null) continue; // in case that a synchronization problem occurred just go lazy over it
WeakPriorityBlockingQueue<ReverseElement<WordReferenceVars>> m;
ReverseElement<WordReferenceVars> rwi;
while ((rwi = stack.poll()) != null) {
if (!skipDoubleDom) return rwi;
// check doubledom
final String domhash = new String(rwi.element.metadataHash()).substring(6);
final String domhash = new String(rwi.getElement().metadataHash()).substring(6);
m = this.doubleDomCache.get(domhash);
if (m == null) {
// first appearance of dom
m = new SortStack<WordReferenceVars>((query.specialRights) ? maxDoubleDomSpecial : maxDoubleDomAll, true);
m = new WeakPriorityBlockingQueue<ReverseElement<WordReferenceVars>>((query.specialRights) ? maxDoubleDomSpecial : maxDoubleDomAll);
this.doubleDomCache.put(domhash, m);
return rwi;
}
// second appearances of dom
m.push(rwi.element, rwi.weight);
m.put(rwi);
}
// no more entries in sorted RWI entries. Now take Elements from the doubleDomCache
// find best entry from all caches
SortStack<WordReferenceVars>.stackElement bestEntry = null;
SortStack<WordReferenceVars>.stackElement o;
ReverseElement<WordReferenceVars> bestEntry = null;
ReverseElement<WordReferenceVars> o;
synchronized (this.doubleDomCache) {
final Iterator<SortStack<WordReferenceVars>> i = this.doubleDomCache.values().iterator();
final Iterator<WeakPriorityBlockingQueue<ReverseElement<WordReferenceVars>>> i = this.doubleDomCache.values().iterator();
while (i.hasNext()) {
try {
m = i.next();
@ -350,19 +339,19 @@ public final class RankingProcess extends Thread {
if (m == null) continue;
if (m.isEmpty()) continue;
if (bestEntry == null) {
bestEntry = m.top();
bestEntry = m.peek();
continue;
}
o = m.top();
if (o.weight.longValue() < bestEntry.weight.longValue()) {
o = m.peek();
if (o.getWeight() < bestEntry.getWeight()) {
bestEntry = o;
}
}
}
if (bestEntry == null) return null;
// finally remove the best entry from the doubledom cache
m = this.doubleDomCache.get(new String(bestEntry.element.metadataHash()).substring(6));
o = m.pop();
m = this.doubleDomCache.get(new String(bestEntry.getElement().metadataHash()).substring(6));
o = m.poll();
//assert o == null || o.element.metadataHash().equals(bestEntry.element.metadataHash()) : "bestEntry.element.metadataHash() = " + bestEntry.element.metadataHash() + ", o.element.metadataHash() = " + o.element.metadataHash();
return bestEntry;
}
@ -382,17 +371,17 @@ public final class RankingProcess extends Thread {
int p = -1;
byte[] urlhash;
while (System.currentTimeMillis() < timeLimit) {
final SortStack<WordReferenceVars>.stackElement obrwi = takeRWI(skipDoubleDom);
final ReverseElement<WordReferenceVars> obrwi = takeRWI(skipDoubleDom);
if (obrwi == null) {
if (this.feedingIsFinished()) return null;
try {Thread.sleep(50);} catch (final InterruptedException e1) {}
continue;
}
urlhash = obrwi.element.metadataHash();
final URIMetadataRow page = this.query.getSegment().urlMetadata().load(urlhash, obrwi.element, obrwi.weight.longValue());
urlhash = obrwi.getElement().metadataHash();
final URIMetadataRow page = this.query.getSegment().urlMetadata().load(urlhash, obrwi.getElement(), obrwi.getWeight());
if (page == null) {
try {
misses.put(obrwi.element.metadataHash());
misses.put(obrwi.getElement().metadataHash());
} catch (RowSpaceExceededException e) {
Log.logException(e);
}
@ -494,17 +483,16 @@ public final class RankingProcess extends Thread {
}
protected int size() {
//assert sortedRWIEntries.size() == urlhashes.size() : "sortedRWIEntries.size() = " + sortedRWIEntries.size() + ", urlhashes.size() = " + urlhashes.size();
int c = stack.size();
for (SortStack<WordReferenceVars> s: this.doubleDomCache.values()) {
c += s.size();
int c = stack.sizeAvailable();
for (WeakPriorityBlockingQueue<ReverseElement<WordReferenceVars>> s: this.doubleDomCache.values()) {
c += s.sizeAvailable();
}
return c;
}
public boolean isEmpty() {
if (!stack.isEmpty()) return false;
for (SortStack<WordReferenceVars> s: this.doubleDomCache.values()) {
for (WeakPriorityBlockingQueue<ReverseElement<WordReferenceVars>> s: this.doubleDomCache.values()) {
if (!s.isEmpty()) return false;
}
return true;
@ -518,7 +506,7 @@ public final class RankingProcess extends Thread {
public int filteredCount() {
// the number of index entries that are considered as result set
return this.stack.size();
return this.stack.sizeAvailable();
}
public int getLocalIndexCount() {
@ -546,11 +534,6 @@ public final class RankingProcess extends Thread {
return this.remote_peerCount;
}
protected void remove(final WordReferenceVars reference) {
stack.remove(reference);
urlhashes.remove(reference.urlHash);
}
public Iterator<byte[]> miss() {
return this.misses.iterator();
}

@ -201,6 +201,11 @@ public class ReferenceOrder {
return (doms.getScore(new String(urlHash, 6, 6)) << 8) / (1 + this.maxdomcount);
}
/**
* return the ranking of a given word entry
* @param t
* @return a ranking: the higher the number, the better is the ranking
*/
public long cardinal(final WordReferenceVars t) {
//return Long.MAX_VALUE - preRanking(ranking, iEntry, this.entryMin, this.entryMax, this.searchWords);
// the normalizedEntry must be a normalized indexEntry
@ -247,7 +252,7 @@ public class ReferenceOrder {
//if (searchWords != null) r += (yacyURL.probablyWordURL(t.urlHash(), searchWords) != null) ? 256 << ranking.coeff_appurl : 0;
return Long.MAX_VALUE - r; // returns a reversed number: the lower the number the better the ranking. This is used for simple sorting with a TreeMap
return r; // the higher the number the better the ranking.
}
private static final String patchUK(String l) {

@ -31,6 +31,8 @@ import java.util.Iterator;
import java.util.Map;
import net.yacy.cora.document.MultiProtocolURI;
import net.yacy.cora.storage.WeakPriorityBlockingQueue;
import net.yacy.cora.storage.WeakPriorityBlockingQueue.ReverseElement;
import net.yacy.document.Condenser;
import net.yacy.kelondro.data.meta.URIMetadataRow;
import net.yacy.kelondro.data.word.Word;
@ -38,8 +40,6 @@ import net.yacy.kelondro.index.HandleSet;
import net.yacy.kelondro.index.RowSpaceExceededException;
import net.yacy.kelondro.logging.Log;
import net.yacy.kelondro.util.EventTracker;
import net.yacy.kelondro.util.SortStack;
import net.yacy.kelondro.util.SortStore;
import net.yacy.repository.LoaderDispatcher;
import de.anomic.crawler.CrawlProfile;
@ -57,8 +57,8 @@ public class ResultFetcher {
// result values
protected final LoaderDispatcher loader;
protected Worker[] workerThreads;
protected final SortStore<ResultEntry> result;
protected final SortStore<MediaSnippet> images; // container to sort images by size
protected final WeakPriorityBlockingQueue<ReverseElement<ResultEntry>> result;
protected final WeakPriorityBlockingQueue<ReverseElement<MediaSnippet>> images; // container to sort images by size
protected final HandleSet failedURLs; // a set of urlhashes that could not been verified during search
protected final HandleSet snippetFetchWordHashes; // a set of word hashes that are used to match with the snippets
long urlRetrievalAllTime;
@ -80,8 +80,8 @@ public class ResultFetcher {
this.urlRetrievalAllTime = 0;
this.snippetComputationAllTime = 0;
this.result = new SortStore<ResultEntry>(-1, true); // this is the result, enriched with snippets, ranked and ordered by ranking
this.images = new SortStore<MediaSnippet>(-1, true);
this.result = new WeakPriorityBlockingQueue<ReverseElement<ResultEntry>>(-1); // this is the result, enriched with snippets, ranked and ordered by ranking
this.images = new WeakPriorityBlockingQueue<ReverseElement<MediaSnippet>>(-1);
this.failedURLs = new HandleSet(URIMetadataRow.rowdef.primaryKeyLength, URIMetadataRow.rowdef.objectOrder, 0); // a set of url hashes where a worker thread tried to work on, but failed.
// snippets do not need to match with the complete query hashes,
@ -155,12 +155,12 @@ public class ResultFetcher {
boolean nav_topics = query.navigators.equals("all") || query.navigators.indexOf("topics") >= 0;
try {
while (System.currentTimeMillis() < this.timeout) {
if (result.size() > neededResults) break;
if (result.sizeAvailable() > neededResults) break;
this.lastLifeSign = System.currentTimeMillis();
// check if we have enough
if ((query.contentdom == ContentDomain.IMAGE) && (images.size() >= query.neededResults() + 50)) break;
if ((query.contentdom != ContentDomain.IMAGE) && (result.size() >= query.neededResults() + 10)) break;
if ((query.contentdom == ContentDomain.IMAGE) && (images.sizeAvailable() >= query.neededResults() + 50)) break;
if ((query.contentdom != ContentDomain.IMAGE) && (result.sizeAvailable() >= query.neededResults() + 10)) break;
// get next entry
page = rankedCache.takeURL(true, taketimeout);
@ -171,7 +171,7 @@ public class ResultFetcher {
final ResultEntry resultEntry = fetchSnippet(page, cacheStrategy); // does not fetch snippets if snippetMode == 0
if (resultEntry == null) continue; // the entry had some problems, cannot be used
if (result.exists(resultEntry)) continue;
//if (result.contains(resultEntry)) continue;
urlRetrievalAllTime += resultEntry.dbRetrievalTime;
snippetComputationAllTime += resultEntry.snippetComputationTime;
@ -182,7 +182,7 @@ public class ResultFetcher {
long ranking = Long.valueOf(rankedCache.getOrder().cardinal(resultEntry.word()));
ranking += postRanking(resultEntry, rankedCache.getTopics());
//System.out.println("*** resultEntry.hash = " + resultEntry.hash());
result.push(resultEntry, ranking);
result.put(new ReverseElement<ResultEntry>(resultEntry, ranking)); // remove smallest in case of overflow
if (nav_topics) rankedCache.addTopics(resultEntry);
//System.out.println("DEBUG SNIPPET_LOADING: thread " + id + " got " + resultEntry.url());
}
@ -273,17 +273,13 @@ public class ResultFetcher {
Log.logInfo("SEARCH", "sorted out urlhash " + new String(urlhash) + " during search: " + reason);
}
public int resultCount() {
return this.result.size();
}
public ResultEntry oneResult(final int item) {
// check if we already retrieved this item
// (happens if a search pages is accessed a second time)
EventTracker.update("SEARCH", new ProfilingGraph.searchEvent(query.id(true), "obtain one result entry - start", 0, 0), false, 30000, ProfilingGraph.maxTime);
if (this.result.size() > item) {
if (this.result.sizeAvailable() > item) {
// we have the wanted result already in the result array .. return that
return this.result.element(item).element;
return this.result.element(item).getElement();
}
/*
System.out.println("rankedCache.size() = " + this.rankedCache.size());
@ -291,10 +287,10 @@ public class ResultFetcher {
System.out.println("query.neededResults() = " + query.neededResults());
*/
if ((!anyWorkerAlive()) &&
(((query.contentdom == ContentDomain.IMAGE) && (images.size() + 30 < query.neededResults())) ||
(this.result.size() < query.neededResults())) &&
(((query.contentdom == ContentDomain.IMAGE) && (images.sizeAvailable() + 30 < query.neededResults())) ||
(this.result.sizeAvailable() < query.neededResults())) &&
//(event.query.onlineSnippetFetch) &&
(this.rankedCache.size() > this.result.size())
(this.rankedCache.size() > this.result.sizeAvailable())
) {
// start worker threads to fetch urls and snippets
deployWorker(Math.min(10, query.itemsPerPage), query.neededResults());
@ -302,13 +298,13 @@ public class ResultFetcher {
// finally wait until enough results are there produced from the
// snippet fetch process
while ((anyWorkerAlive()) && (result.size() <= item)) {
while ((anyWorkerAlive()) && (result.sizeAvailable() <= item)) {
try {Thread.sleep((item % query.itemsPerPage) * 10L);} catch (final InterruptedException e) {}
}
// finally, if there is something, return the result
if (this.result.size() <= item) return null;
return this.result.element(item).element;
if (this.result.sizeAvailable() <= item) return null;
return this.result.element(item).getElement();
}
private int resultCounter = 0;
@ -320,19 +316,19 @@ public class ResultFetcher {
public MediaSnippet oneImage(final int item) {
// always look for a next object if there are way too less
if (this.images.size() <= item + 10) fillImagesCache();
if (this.images.sizeAvailable() <= item + 10) fillImagesCache();
// check if we already retrieved the item
if (this.images.size() > item) return this.images.element(item).element;
if (this.images.sizeDrained() > item) return this.images.element(item).getElement();
// look again if there are not enough for presentation
while (this.images.size() <= item) {
while (this.images.sizeAvailable() <= item) {
if (fillImagesCache() == 0) break;
}
if (this.images.size() <= item) return null;
if (this.images.sizeAvailable() <= item) return null;
// now take the specific item from the image stack
return this.images.element(item).element;
return this.images.element(item).getElement();
}
private int fillImagesCache() {
@ -343,7 +339,7 @@ public class ResultFetcher {
final ArrayList<MediaSnippet> imagemedia = result.mediaSnippets();
if (imagemedia != null) {
for (MediaSnippet ms: imagemedia) {
images.push(ms, Long.valueOf(ms.ranking));
images.put(new ReverseElement<MediaSnippet>(ms, ms.ranking)); // remove smallest in case of overflow
c++;
//System.out.println("*** image " + new String(ms.href.hash()) + " images.size = " + images.size() + "/" + images.size());
}
@ -351,13 +347,13 @@ public class ResultFetcher {
return c;
}
public ArrayList<SortStack<ResultEntry>.stackElement> completeResults(final long waitingtime) {
public ArrayList<ReverseElement<ResultEntry>> completeResults(final long waitingtime) {
final long timeout = System.currentTimeMillis() + waitingtime;
while ((result.size() < query.neededResults()) && (anyWorkerAlive()) && (System.currentTimeMillis() < timeout)) {
while ((result.sizeAvailable() < query.neededResults()) && (anyWorkerAlive()) && (System.currentTimeMillis() < timeout)) {
try {Thread.sleep(100);} catch (final InterruptedException e) {}
//System.out.println("+++DEBUG-completeResults+++ sleeping " + 200);
}
return this.result.list(this.result.size());
return this.result.list(this.result.sizeAvailable());
}
public long postRanking(

@ -37,7 +37,6 @@ import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import net.yacy.kelondro.data.word.WordReference;
import net.yacy.kelondro.data.word.WordReferenceVars;
import net.yacy.kelondro.index.HandleSet;
import net.yacy.kelondro.index.RowSpaceExceededException;
import net.yacy.kelondro.logging.Log;
@ -533,11 +532,6 @@ public final class SearchEvent {
}
public void remove(final WordReferenceVars reference) {
this.rankedCache.remove(reference);
}
public ResultFetcher result() {
return this.results;
}

@ -0,0 +1,352 @@
/**
* WeakPriorityBlockingQueue
* a priority blocking queue that drains elements if it gets too large
* Copyright 2010 by Michael Peter Christen, mc@yacy.net, Frankfurt a. M., Germany
* First released 09.09.2010 at http://yacy.net
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program in the file lgpl21.txt
* If not, see <http://www.gnu.org/licenses/>.
*/
package net.yacy.cora.storage;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.TreeSet;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
/**
* implements a stack where elements 'float' on-top of the stack according to a weight value.
* objects pushed on the stack must implement the hashCode() method to provide a handle
* for a double-check.
* If the queue gets larger than the given maxsize, then elements from the tail of the queue
* are drained (deleted).
*/
public class WeakPriorityBlockingQueue<E> {
// elements waiting to be taken; the TreeSet keeps them sorted by the ordering of E
private final TreeSet<E> queue; // object within the stack, ordered using a TreeSet
// permits counting the elements in the queue; used by the blocking take()/poll(timeout) calls
private final Semaphore enqueued; // semaphore for elements in the stack
// elements that were already taken from the queue, in the order they were taken; read by element()/list()/iterator()
private final ArrayList<E> drained; // objects that had been on the stack but had been removed
// maximum number of elements held in the queue; the constructor documents -1 as "unlimited"
protected int maxsize;
/**
 * Create a new, empty WeakPriorityBlockingQueue.
 * Elements are not kept in insertion order; the sorted set orders them by the
 * ordering of E, and the first element of that order is handed out first.
 * @param maxsize the maximum number of queued elements; when the queue would grow
 *        beyond this number, entries are removed. A value of -1 means unlimited.
 */
public WeakPriorityBlockingQueue(final int maxsize) {
    this.maxsize = maxsize; // -1: the size is unlimited
    this.enqueued = new Semaphore(0); // no element, no permit
    this.drained = new ArrayList<E>();
    this.queue = new TreeSet<E>();
}
/**
 * Remove everything: the queued elements, the list of already drained
 * elements and all permits of the semaphore.
 */
public synchronized void clear() {
    this.enqueued.drainPermits();
    this.queue.clear();
    this.drained.clear();
}
/**
 * Test if there is nothing to retrieve at all.
 * Note that this method does not take the object lock.
 * @return true if both the queue and the list of drained elements are empty, false otherwise
 */
public boolean isEmpty() {
    if (!this.queue.isEmpty()) return false;
    return this.drained.isEmpty();
}
/**
 * Get the number of elements in the queue that are waiting to be removed with take() or poll().
 * @return the number of queued (not yet drained) elements
 */
public synchronized int sizeQueue() {
    final int waiting = this.queue.size();
    return waiting;
}
/**
 * Get the number of elements that have been drained so far and are waiting
 * in a list to be enumerated with element().
 * @return the number of drained elements
 */
public synchronized int sizeDrained() {
    final int taken = this.drained.size();
    return taken;
}
/**
 * Get the number of elements that are available for retrieval.
 * This is the sum of the queued and the already drained elements.
 * @return the size of the queue plus the size of the drained list
 */
public synchronized int sizeAvailable() {
    final int waiting = this.queue.size();
    final int taken = this.drained.size();
    return waiting + taken;
}
/**
 * Put an element into the queue. The position of the element is determined by the
 * ordering of E (the queue is a sorted set).
 * Elements that have already been drained cannot be put in again; they are checked
 * against the drained list using equals(). An element that compares as equal to an
 * element which is already queued is ignored as well.
 * If the queue would grow beyond maxsize, the last element of the ordering is removed;
 * this may be the element that was just inserted.
 * @param element the element to insert (must have a meaningful equals() method)
 */
public synchronized void put(final E element) {
    // elements that had been handed out already must not come back
    if (this.drained.contains(element)) return;
    // the sorted set rejects elements that compare as equal to a queued one;
    // in that case nothing changed and no permit must be released
    if (!this.queue.add(element)) return;
    if (this.maxsize >= 0 && this.queue.size() > this.maxsize) {
        // overflow: insert first, then drop the tail, so that the worst element is
        // evicted (not a better one in favour of a worse newcomer).
        // The queue size is unchanged, therefore no permit is released.
        this.queue.remove(this.queue.last());
    } else {
        this.enqueued.release();
    }
    // a thread inside take()/poll(timeout) may hold a permit for an element it has
    // not removed yet, so the queue can be larger than the number of free permits
    assert this.queue.size() >= this.enqueued.availablePermits();
}
/**
 * Return the first element of the queue ordering and remove it from the queue.
 * This method never waits.
 * @return null if no element is on the queue or the head of the queue
 */
public synchronized E poll() {
    if (this.queue.isEmpty()) return null;
    // consume the permit that belongs to the element if it is still free; it may
    // already be held by a thread that is about to enter take()/poll(timeout)
    this.enqueued.tryAcquire();
    return takeUnsafe();
}
/**
 * Retrieves and removes the head of this queue, waiting if necessary
 * up to the specified wait time if no elements are present on this queue.
 * The wait happens without holding the object lock, so that put() can be
 * called by other threads in the meantime.
 * @param timeout milliseconds until timeout
 * @return the head element from the queue or null if the timeout elapsed
 * @throws InterruptedException
 */
public E poll(long timeout) throws InterruptedException {
    final long deadline = System.currentTimeMillis() + timeout;
    long remaining = timeout;
    do {
        if (!this.enqueued.tryAcquire(remaining, TimeUnit.MILLISECONDS)) return null;
        synchronized (this) {
            // the element belonging to our permit may have been taken by a
            // concurrent poll() or removed by clear(); then wait for the next one
            if (!this.queue.isEmpty()) return takeUnsafe();
        }
        remaining = deadline - System.currentTimeMillis();
    } while (remaining > 0);
    return null;
}
/**
 * Retrieves and removes the head of this queue, waiting if no elements are present on this queue.
 * The wait happens without holding the object lock, so that put() can be
 * called by other threads in the meantime.
 * @return the head element from the queue
 * @throws InterruptedException
 */
public E take() throws InterruptedException {
    while (true) {
        this.enqueued.acquire();
        synchronized (this) {
            // the element belonging to our permit may have been taken by a
            // concurrent poll() or removed by clear(); then wait for the next one
            if (!this.queue.isEmpty()) return takeUnsafe();
        }
    }
}
/**
 * Move the head of the queue to the drained list and return it.
 * The caller must hold the object lock and must have checked that the queue is not empty.
 * @return the element that was the head of the queue
 */
private E takeUnsafe() {
    final E element = this.queue.first();
    assert element != null;
    this.queue.remove(element);
    this.drained.add(element);
    assert this.queue.size() >= this.enqueued.availablePermits();
    return element;
}
/**
 * Return the head of the queue (the first element of the ordering) without removing it.
 * @return null if no element is on the queue or the head of the queue
 */
public synchronized E peek() {
    return this.queue.isEmpty() ? null : this.queue.first();
}
/**
 * All objects that have been returned by poll or take are stored in a back-up list
 * from which they can be retrieved afterwards, in the order in which they were taken.
 * This method returns the element at the given position of that list. If the list is
 * not long enough yet, elements are taken from the queue with poll() and appended to
 * the list until the position is filled. If the queue together with the list does
 * not hold enough elements, null is returned.
 * @param position inside the drained list
 * @return the element from the recorded position or null if that position is not available
 */
public synchronized E element(final int position) {
    if (position >= this.drained.size()) {
        // not recorded yet: give up if even the queue cannot fill the gap
        if (position >= this.queue.size() + this.drained.size()) return null;
        // drain the queue until the wanted position has been recorded
        do {
            this.poll();
        } while (position >= this.drained.size());
    }
    return this.drained.get(position);
}
/**
 * Retrieve an element from the drained list, taking elements from the queue with
 * poll(timeout) until the position is filled or the given time has elapsed.
 * If the queue together with the drained list does not hold enough elements at the
 * time of the call, null is returned immediately.
 * @param position inside the drained list
 * @param time the timeout in milliseconds
 * @return the element from the recorded position or null if that position is not available within the timeout
 * @throws InterruptedException
 */
public synchronized E element(final int position, long time) throws InterruptedException {
    final long deadline = System.currentTimeMillis() + time;
    if (position >= this.drained.size()) {
        if (position >= this.queue.size() + this.drained.size()) return null; // we don't have that element
        while (position >= this.drained.size()) {
            final long remaining = deadline - System.currentTimeMillis();
            if (remaining <= 0) break;
            this.poll(remaining);
        }
        if (position >= this.drained.size()) return null; // we still don't have that element
    }
    return this.drained.get(position);
}
/**
 * Return the list of drained elements after making sure that it holds at least
 * the requested number of entries, as they would be retrievable with element().
 * If count is < 0 then all queued elements are drained first.
 * The returned list is the internal list, not a copy, and shall not be modified in any way (read-only).
 * @param count the number of entries the list must hold at least, or a negative number for all
 * @return the list of drained elements
 * @throws RuntimeException if count is larger than the number of available elements
 */
public synchronized ArrayList<E> list(final int count) {
    if (count >= 0) {
        if (count > sizeAvailable()) throw new RuntimeException("list(" + count + ") exceeded avaiable number of elements (" + sizeAvailable() + ")");
        while (this.drained.size() < count) this.poll();
    } else {
        // shift all elements
        while (!this.queue.isEmpty()) this.poll();
    }
    return this.drained;
}
/**
 * Iterate over all elements available. All elements that are still in the queue
 * are first drained to recorded positions.
 * @return an iterator over all drained positions
 */
public synchronized Iterator<E> iterator() {
    // move everything that is still queued to the drained list
    while (this.queue.size() > 0) {
        this.poll();
    }
    return this.drained.iterator();
}
/**
 * Contract of a weighted container: an object of type E together with a weight.
 */
protected interface Element<E> {
    /** @return the weight attached to the element */
    long getWeight();
    /** @return the contained object */
    E getElement();
    /** @return true if this container is considered equal to the given one */
    boolean equals(Element<E> o);
    int hashCode();
    String toString();
}
/**
 * Base class of the weighted containers: holds an object and its weight.
 * Two containers are equal if the contained objects are equal; the weight is not
 * part of the equality. This is consistent with the compareTo() methods of the
 * subclasses, which return 0 for equal contained objects, and with hashCode().
 */
protected abstract static class AbstractElement<E> implements Element<E> {
    public long weight;
    public E element;
    public long getWeight() {
        return this.weight;
    }
    public E getElement() {
        return this.element;
    }
    public boolean equals(Element<E> o) {
        return equals((Object) o);
    }
    /**
     * Collections such as ArrayList.contains() call equals(Object), not the
     * equals(Element) overload, therefore the comparison of the contained
     * objects must be implemented here.
     */
    @Override
    public boolean equals(final Object o) {
        if (this == o) return true;
        if (!(o instanceof Element)) return false;
        final Object other = ((Element<?>) o).getElement();
        return this.element == null ? other == null : this.element.equals(other);
    }
    @Override
    public int hashCode() {
        return this.element.hashCode();
    }
    @Override
    public String toString() {
        return element.toString() + "/" + weight;
    }
}
/**
 * Container with natural (ascending) weight ordering for objects E in the priority queue:
 * a container with a smaller weight compares as smaller than one with a larger weight.
 * Containers holding the same or an equal payload always compare as equal, whatever their weights.
 */
public static class NaturalElement<E> extends AbstractElement<E> implements Comparable<NaturalElement<E>>, Comparator<NaturalElement<E>> {

    /**
     * @param element the payload object
     * @param weight the ordering weight of the payload
     */
    public NaturalElement(final E element, final long weight) {
        this.weight = weight;
        this.element = element;
    }

    /** Comparator view of the ordering; delegates to o1.compareTo(o2). */
    public int compare(NaturalElement<E> o1, NaturalElement<E> o2) {
        return o1.compareTo(o2);
    }

    /**
     * @param o the container to compare with
     * @return 0 for the same or an equal payload; otherwise 1 or -1 by ascending weight,
     *         with the hash code as tie-breaker (0 if the hash codes are equal as well)
     */
    public int compareTo(NaturalElement<E> o) {
        // same or equal payload: equal, independent of the weights
        final E other = o.getElement();
        if (this.element == other || this.element.equals(other)) return 0;

        // primary criterion: the weight, ascending
        final long otherWeight = o.getWeight();
        if (this.weight != otherWeight) return (this.weight > otherWeight) ? 1 : -1;

        // equal weights: fall back to the hash codes to separate different payloads
        final int ownHash = this.hashCode();
        final int otherHash = o.hashCode();
        if (ownHash == otherHash) return 0;
        return (ownHash > otherHash) ? 1 : -1;
    }
}
/**
 * Container with reversed (descending) weight ordering for objects E in the priority queue:
 * a container with a larger weight compares as smaller than one with a smaller weight.
 * Containers holding the same or an equal payload always compare as equal, whatever their weights.
 */
public static class ReverseElement<E> extends AbstractElement<E> implements Comparable<ReverseElement<E>>, Comparator<ReverseElement<E>> {

    /**
     * @param element the payload object
     * @param weight the ordering weight of the payload
     */
    public ReverseElement(final E element, final long weight) {
        this.weight = weight;
        this.element = element;
    }

    /** Comparator view of the ordering; delegates to o1.compareTo(o2). */
    public int compare(ReverseElement<E> o1, ReverseElement<E> o2) {
        return o1.compareTo(o2);
    }

    /**
     * @param o the container to compare with
     * @return 0 for the same or an equal payload; otherwise -1 or 1 by descending weight,
     *         with the hash code as tie-breaker (0 if the hash codes are equal as well)
     */
    public int compareTo(ReverseElement<E> o) {
        // same or equal payload: equal, independent of the weights
        final E other = o.getElement();
        if (this.element == other || this.element.equals(other)) return 0;

        // primary criterion: the weight, descending
        final long otherWeight = o.getWeight();
        if (this.weight != otherWeight) return (this.weight > otherWeight) ? -1 : 1;

        // equal weights: fall back to the hash codes, in reversed direction as well
        final int ownHash = this.hashCode();
        final int otherHash = o.hashCode();
        if (ownHash == otherHash) return 0;
        return (ownHash > otherHash) ? -1 : 1;
    }
}
public static void main(String[] args) {
    // small manual demo: put four weighted strings into a queue constructed with
    // argument 3 (presumably its maximum size - confirm against the constructor),
    // then print the available size and everything that can be polled
    final WeakPriorityBlockingQueue<ReverseElement<String>> queue = new WeakPriorityBlockingQueue<ReverseElement<String>>(3);
    final String[] payloads = {"abc", "abcx", "6s_7dfZk4xvc", "6s_7dfZk4xvcx"};
    for (int i = 0; i < payloads.length; i++) {
        // weights are 1, 2, 3, 4 in insertion order
        queue.put(new ReverseElement<String>(payloads[i], i + 1));
    }
    System.out.println("size = " + queue.sizeAvailable());
    while (queue.sizeQueue() > 0) {
        System.out.println("> " + queue.poll().toString());
    }
}
}

@ -27,6 +27,7 @@
package net.yacy.kelondro.data.word;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
@ -42,7 +43,7 @@ import net.yacy.kelondro.util.ByteArray;
import net.yacy.kelondro.index.Row;
public class WordReferenceVars extends AbstractReference implements WordReference, Reference, Cloneable, Comparable<WordReferenceVars> {
public class WordReferenceVars extends AbstractReference implements WordReference, Reference, Cloneable, Comparable<WordReferenceVars>, Comparator<WordReferenceVars> {
/**
* object for termination of concurrent blocking queue processing
@ -380,6 +381,10 @@ public class WordReferenceVars extends AbstractReference implements WordReferenc
/**
 * Compare this reference with another one by handing this.urlHash and the
 * metadataHash() of the other reference to Base64Order.enhancedCoder.
 * @param o the reference to compare with
 * @return the result of the Base64Order.enhancedCoder comparison
 */
public int compareTo(final WordReferenceVars o) {
return Base64Order.enhancedCoder.compare(this.urlHash, o.metadataHash());
}
/**
 * Comparator form of the ordering; delegates to o1.compareTo(o2).
 * @param o1 the first reference
 * @param o2 the second reference
 * @return the result of o1.compareTo(o2)
 */
public int compare(WordReferenceVars o1, WordReferenceVars o2) {
return o1.compareTo(o2);
}
public void addPosition(final int position) {
this.positions.add(position);

@ -1,205 +0,0 @@
// kelondroSortStack.java
// (C) 2008 by Michael Peter Christen; mc@yacy.net, Frankfurt a. M., Germany
// first published 20.02.2008 on http://yacy.net
//
// This is a part of YaCy, a peer-to-peer based web search engine
//
// $LastChangedDate: 2006-04-02 22:40:07 +0200 (So, 02 Apr 2006) $
// $LastChangedRevision: 1986 $
// $LastChangedBy: orbiter $
//
// LICENSE
//
// This program is free software; you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation; either version 2 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
package net.yacy.kelondro.util;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
/**
 * A stack where elements 'float' on top according to a weight value instead of
 * their insert order. Elements pushed on the stack must implement hashCode() and
 * equals(), because a hash map is used to reject elements that are already present.
 * @param <E> the type of the stacked elements
 */
public class SortStack<E> {

    private static final Object PRESENT = new Object(); // Dummy value to associate with an Object in the backing Map

    private final TreeMap<Long, List<E>> onstack; // elements on the stack, grouped by weight; also used as lock object
    private final ConcurrentHashMap<E, Object> instack; // the set of elements that are currently on the stack
    protected int maxsize;
    private final boolean upward;

    /**
     * create a new sort stack without a size limit
     * @param upward if true, the element with the smallest weight is returned first
     */
    public SortStack(boolean upward) {
        this(-1, upward);
    }

    /**
     * create a new sort stack
     * all elements in the stack are not ordered by their insert order but by a given element weight
     * weights that are preferred are returned first when a pop from the stack is made
     * the stack may be ordered upward (preferring small weights) or downward (preferring high weights)
     * @param maxsize the size limit of the stack; a value <= 0 means unlimited.
     *        The limit is compared against the number of distinct weights on the stack;
     *        when it is exceeded, all elements of the worst weight are removed
     * @param upward is the entry order and controls which elements are returned on pop. if true, then the smallest is returned first
     */
    public SortStack(final int maxsize, boolean upward) {
        this.onstack = new TreeMap<Long, List<E>>();
        this.instack = new ConcurrentHashMap<E, Object>();
        this.maxsize = maxsize;
        this.upward = upward;
    }

    /**
     * @return true if no element is on the stack
     */
    public boolean isEmpty() {
        return this.instack.isEmpty();
    }

    /**
     * @return the number of elements on the stack
     */
    public int size() {
        return this.instack.size();
    }

    /**
     * put an element on the stack using the order of the weight
     * an element that is already on the stack is ignored
     * @param element
     * @param weight
     */
    public void push(final E element, Long weight) {
        synchronized (this.onstack) {
            // reject elements that are already on the stack
            if (this.instack.put(element, PRESENT) != null) return;

            List<E> l = this.onstack.get(weight);
            if (l == null) {
                l = new LinkedList<E>();
                this.onstack.put(weight, l);
            }
            l.add(element);

            // check the maximum size of the stack and remove the worst weights if the stack got too large.
            // this is done while the lock is still held because the TreeMap is not thread-safe
            if (this.maxsize <= 0) return;
            while (this.onstack.size() > this.maxsize) {
                final List<E> worst = this.onstack.remove((this.upward) ? this.onstack.lastKey() : this.onstack.firstKey());
                for (final E e: worst) this.instack.remove(e);
            }
        }
    }

    /**
     * return the element that is currently on top of the stack without removing it.
     * this is an element with the smallest weight if the stack is ordered upward,
     * otherwise an element with the highest weight
     * @return the top element together with its weight, or null if the stack is empty
     */
    public stackElement top() {
        synchronized (this.onstack) {
            if (this.onstack.isEmpty()) return null;
            final Long w = (this.upward) ? this.onstack.firstKey() : this.onstack.lastKey();
            return new stackElement(this.onstack.get(w).get(0), w);
        }
    }

    /**
     * return the element that is currently on top of the stack and remove it from the stack.
     * this is an element with the smallest weight if the stack is ordered upward,
     * otherwise an element with the highest weight
     * @return the removed top element together with its weight, or null if the stack is empty
     */
    public stackElement pop() {
        synchronized (this.onstack) {
            if (this.onstack.isEmpty()) return null;
            final Long w = (this.upward) ? this.onstack.firstKey() : this.onstack.lastKey();
            final List<E> l = this.onstack.get(w);
            final E element = l.remove(0);
            this.instack.remove(element);
            if (l.isEmpty()) this.onstack.remove(w);
            return new stackElement(element, w);
        }
    }

    /**
     * check if an element is on the stack
     * @param element
     * @return true if the element is currently on the stack
     */
    public boolean exists(final E element) {
        // containsKey must be used here: ConcurrentHashMap.contains(Object) is the legacy
        // alias of containsValue and would compare the element with the PRESENT dummy
        return this.instack.containsKey(element);
    }

    /**
     * remove an element from the stack; nothing happens if the element is not on the stack
     * @param element
     */
    public void remove(final E element) {
        synchronized (this.onstack) {
            // un-register the element; this keeps size(), isEmpty() and exists() consistent
            if (this.instack.remove(element) == null) return;

            // find the weight list that holds the element
            final Iterator<Map.Entry<Long, List<E>>> i = this.onstack.entrySet().iterator();
            while (i.hasNext()) {
                final List<E> l = i.next().getValue();
                if (l.remove(element)) {
                    if (l.isEmpty()) i.remove(); // drop the weight when its list became empty
                    return;
                }
            }
        }
    }

    /**
     * check if an element with the given weight would be on the bottom of the stack after inserting
     * @param weight
     * @return true if the stack is empty or if the weight is worse than all weights on the stack
     */
    public boolean bottom(final long weight) {
        final Long l;
        synchronized (this.onstack) {
            // the emptiness check is done under the lock, otherwise firstKey()/lastKey()
            // may fail if another thread empties the stack in between
            if (this.onstack.isEmpty()) return true;
            l = (this.upward) ? this.onstack.lastKey() : this.onstack.firstKey();
        }
        return (this.upward) ? weight > l.longValue() : weight < l.longValue();
    }

    /**
     * an element of the stack together with its weight
     */
    public class stackElement {
        public Long weight;
        public E element;

        public stackElement(final E element, final Long weight) {
            this.element = element;
            this.weight = weight;
        }

        public String toString() {
            return element.toString() + "/" + weight;
        }
    }
}

@ -1,168 +0,0 @@
// kelondroSortStore.java
// (C) 2008 by Michael Peter Christen; mc@yacy.net, Frankfurt a. M., Germany
// first published 20.02.2008 on http://yacy.net
//
// This is a part of YaCy, a peer-to-peer based web search engine
//
// $LastChangedDate: 2006-04-02 22:40:07 +0200 (So, 02 Apr 2006) $
// $LastChangedRevision: 1986 $
// $LastChangedBy: orbiter $
//
// LICENSE
//
// This program is free software; you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation; either version 2 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
package net.yacy.kelondro.util;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;
/**
 * A SortStack that additionally records every element that was taken from the stack with pop()
 * in a list (the 'offstack'), in the order in which the elements were popped.
 * Access methods address the recorded elements by their position in that list.
 * @param <E> the type of the stored elements
 */
public class SortStore<E> extends SortStack<E> {

    private static final Object PRESENT = new Object(); // Dummy value to associate with an Object in the backing Map

    private final ArrayList<stackElement> offstack; // popped elements in pop order
    private final ConcurrentHashMap<E, Object> offset; // the elements that have been popped to the offstack
    private long largest; // the largest weight that was passed to super.push so far

    /**
     * create a new sort store without a size limit
     * @param upward if true, the element with the smallest weight is returned first
     */
    public SortStore(boolean upward) {
        this(-1, upward);
    }

    /**
     * create a new sort store
     * elements are not ordered by their insert order but by a given element weight;
     * the store may be ordered upward (preferring small weights) or downward (preferring high weights)
     * @param maxsize the size limit, handed to the SortStack and also checked in push(); a value <= 0 disables the check
     * @param upward is the entry order and controls which elements are returned on pop. if true, then the smallest is returned first
     */
    public SortStore(final int maxsize, boolean upward) {
        super(maxsize, upward);
        this.offstack = new ArrayList<stackElement>();
        this.offset = new ConcurrentHashMap<E, Object>();
        this.largest = Long.MIN_VALUE;
    }

    /**
     * @return true if neither the stack nor the offstack holds an element
     */
    public boolean isEmpty() {
        return super.isEmpty() && this.offstack.isEmpty();
    }

    /**
     * @return the number of elements on the stack plus the number of elements in the offstack
     */
    public int size() {
        return super.size() + this.offstack.size();
    }

    /**
     * push an element on the stack unless it is already known, either on the stack or in the offstack.
     * if a positive maxsize is set, elements are popped to the offstack as long as the stack is
     * not empty and size() exceeds maxsize
     * @param element
     * @param weight
     */
    public void push(final E element, final Long weight) {
        final boolean known = this.offset.containsKey(element) || super.exists(element);
        if (known) return;

        super.push(element, weight);

        // remember the largest weight seen so far
        final long w = weight.longValue();
        if (w > this.largest) this.largest = w;

        if (this.maxsize > 0) {
            while (!super.isEmpty() && this.size() > this.maxsize) this.pop();
        }
    }

    /**
     * return the element that is currently on top of the stack;
     * it is removed from the stack and appended to the offstack list
     * @return the popped element together with its weight, or null if the stack is empty
     */
    public stackElement pop() {
        final stackElement popped = super.pop();
        if (popped != null) {
            this.offset.put(popped.element, PRESENT);
            this.offstack.add(popped);
        }
        return popped;
    }

    /**
     * @return the element on top of the stack without removing it, as given by the SortStack
     */
    public stackElement top() {
        return super.top();
    }

    /**
     * @param element
     * @return true if the SortStack reports the element or if it has been popped to the offstack
     */
    public boolean exists(final E element) {
        if (super.exists(element)) return true;
        return this.offset.containsKey(element);
    }

    /**
     * return the element at a specific position of the offstack.
     * if the position is not in the offstack yet but enough elements are on the stack,
     * elements are popped to the offstack until the position is reached
     * @param position
     * @return the element at that position, or null if stack and offstack together do not hold that many elements
     */
    public stackElement element(final int position) {
        if (position >= this.offstack.size()) {
            // not recorded yet: check that the position can be reached at all
            if (position >= super.size() + this.offstack.size()) return null;
            while (position >= this.offstack.size()) this.pop();
        }
        return this.offstack.get(position);
    }

    /**
     * return the offstack after making sure that it holds at least the given number of elements;
     * missing elements are popped from the stack. if count is < 0 then all elements are popped.
     * the returned list is the internal list, not a copy, and shall not be modified in any way (read-only)
     * @param count the number of wanted elements, or a negative number for all elements
     * @return the internal offstack list
     * @throws RuntimeException if count exceeds the number of elements in stack and offstack
     */
    public ArrayList<stackElement> list(final int count) {
        if (count >= 0) {
            if (count > super.size() + this.offstack.size()) throw new RuntimeException("list(" + count + ") exceeded avaiable number of elements (" + size() + ")");
            while (this.offstack.size() < count) this.pop();
        } else {
            // negative count: shift all elements
            while (!super.isEmpty()) this.pop();
        }
        return this.offstack;
    }

    /**
     * remove an element: the removal is passed to the SortStack first, then the first
     * offstack entry whose element equals the given one is removed from the offstack
     * @param element
     */
    public void remove(final E element) {
        super.remove(element);
        synchronized (this.offstack) {
            for (int i = 0; i < this.offstack.size(); i++) {
                if (this.offstack.get(i).element.equals(element)) {
                    this.offstack.remove(i);
                    return;
                }
            }
        }
    }

    /**
     * @param weight
     * @return true if the SortStack reports the weight as bottom weight or if the weight
     *         is larger than every weight pushed so far
     */
    public synchronized boolean bottom(final long weight) {
        return super.bottom(weight) || weight > this.largest;
    }

    public static void main(String[] args) {
        // small manual demo: push and pop a few weighted strings, then print the size
        final SortStore<String> store = new SortStore<String>(true);
        store.push("abc", 1L);
        store.pop();
        store.push("abc", 2L);
        store.push("6s_7dfZk4xvc", 1L);
        store.push("6s_7dfZk4xvc", 1L);
        store.pop();
        System.out.println("size = " + store.size());
    }
}
Loading…
Cancel
Save