diff --git a/source/de/anomic/search/ReferenceOrder.java b/source/de/anomic/search/ReferenceOrder.java index e9964b0bc..6f1c9cd21 100644 --- a/source/de/anomic/search/ReferenceOrder.java +++ b/source/de/anomic/search/ReferenceOrder.java @@ -31,6 +31,7 @@ import java.util.Iterator; import java.util.Map; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.Semaphore; import net.yacy.document.Condenser; import net.yacy.kelondro.data.meta.DigestURI; @@ -44,6 +45,8 @@ import net.yacy.kelondro.util.ScoreCluster; public class ReferenceOrder { + + private static int cores = Runtime.getRuntime().availableProcessors(); protected int maxdomcount; protected WordReferenceVars min, max; @@ -60,20 +63,74 @@ public class ReferenceOrder { this.language = language; } - public class Normalizer extends Thread { + public BlockingQueue normalizeWith(final ReferenceContainer container) { + BlockingQueue vars = WordReferenceVars.transform(container); + LinkedBlockingQueue out = new LinkedBlockingQueue(); + Thread distributor = new NormalizeDistributor(vars, out); + distributor.start(); - private final ReferenceContainer container; + // return the resulting queue while the processing queues are still working + return out; + } + + public class NormalizeDistributor extends Thread { + + BlockingQueue vars; + LinkedBlockingQueue out; + + public NormalizeDistributor(BlockingQueue vars, LinkedBlockingQueue out) { + this.vars = vars; + this.out = out; + } + + @Override + public void run() { + // start the transformation threads + int cores0 = cores + 1; + Semaphore termination = new Semaphore(cores0); + NormalizeWorker[] worker = new NormalizeWorker[cores0]; + for (int i = 0; i < cores0; i++) { + worker[i] = new NormalizeWorker(out, termination); + worker[i].start(); + } + + // fill the queue + WordReferenceVars iEntry; + int p = 0; + try { + while ((iEntry = vars.take()) != WordReferenceVars.poison) { + worker[p % cores0].add(iEntry); + p++; + } + } catch (InterruptedException e) { + } + + // insert poison to stop the queues + for (int i = 0; i < cores0; i++) worker[i].add(WordReferenceVars.poison); + } + } + + public class NormalizeWorker extends Thread { + + private final BlockingQueue out; + private final Semaphore termination; private final BlockingQueue decodedEntries; - public Normalizer(final ReferenceContainer container) { + public NormalizeWorker(final BlockingQueue out, Semaphore termination) { // normalize ranking: find minimum and maximum of separate ranking criteria - assert (container != null); - this.container = container; + this.out = out; + this.termination = termination; this.decodedEntries = new LinkedBlockingQueue(); } + public void add(WordReferenceVars entry) { + try { + decodedEntries.put(entry); + } catch (InterruptedException e) { + } + } + public void run() { - BlockingQueue vars = WordReferenceVars.transform(container); HashMap doms0 = new HashMap(); Integer int1 = 1; @@ -83,8 +140,8 @@ public class ReferenceOrder { Integer count; try { // calculate min and max for normalization - while ((iEntry = vars.take()) != WordReferenceVars.poison) { - decodedEntries.put(iEntry); + while ((iEntry = decodedEntries.take()) != WordReferenceVars.poison) { + out.put(iEntry); // find min/max if (min == null) min = iEntry.clone(); else min.min(iEntry); if (max == null) max = iEntry.clone(); else max.max(iEntry); @@ -111,21 +168,13 @@ public class ReferenceOrder { } catch (Exception e) { Log.logException(e); } finally { + // insert poison to signal the termination to next queue try { - decodedEntries.put(WordReferenceVars.poison); + this.termination.acquire(); + if (this.termination.availablePermits() == 0) this.out.put(WordReferenceVars.poison); } catch (InterruptedException e) {} } } - - public BlockingQueue decoded() { - return this.decodedEntries; - } - } - - public BlockingQueue normalizeWith(final ReferenceContainer container) { - Normalizer n = new Normalizer(container); - n.start(); - return n.decoded(); } public int authority(final String urlHash) { diff --git a/source/net/yacy/kelondro/data/meta/DigestURI.java b/source/net/yacy/kelondro/data/meta/DigestURI.java index 232b0c9a5..e3f80121d 100644 --- a/source/net/yacy/kelondro/data/meta/DigestURI.java +++ b/source/net/yacy/kelondro/data/meta/DigestURI.java @@ -938,6 +938,7 @@ public class DigestURI implements Serializable { private static String[] testTLDs = new String[] { "com", "net", "org", "uk", "fr", "de", "es", "it" }; public static final DigestURI probablyWordURL(final byte[] urlHash, final TreeSet words) { + assert urlHash != null; final Iterator wi = words.iterator(); String word; while (wi.hasNext()) { diff --git a/source/net/yacy/kelondro/data/word/WordReferenceVars.java b/source/net/yacy/kelondro/data/word/WordReferenceVars.java index 71b7caafb..42674f6f9 100644 --- a/source/net/yacy/kelondro/data/word/WordReferenceVars.java +++ b/source/net/yacy/kelondro/data/word/WordReferenceVars.java @@ -47,7 +47,7 @@ public class WordReferenceVars extends AbstractReference implements WordReferenc * object for termination of concurrent blocking queue processing */ public static final WordReferenceVars poison = new WordReferenceVars(); - public static int cores = Runtime.getRuntime().availableProcessors(); + private static int cores = Runtime.getRuntime().availableProcessors(); public Bitfield flags; public long lastModified; @@ -388,56 +388,6 @@ public class WordReferenceVars extends AbstractReference implements WordReferenc * @param container * @return a blocking queue filled with WordReferenceVars that is still filled when the object is returned */ - /* - public static BlockingQueue transform(ReferenceContainer container) { - LinkedBlockingQueue in = new LinkedBlockingQueue(); - LinkedBlockingQueue out = new LinkedBlockingQueue(); - - // start the transformation threads - int cores = Math.min(Runtime.getRuntime().availableProcessors(), container.size() / 200 + 1); - for (int i = 0; i < cores; i++) new Transformer(in, out).start(); - - // fill the queue - int p = 0; - try { - while (p < container.size()) { - in.put(new WordReferenceRow(container.get(p++, false))); - } - } catch (InterruptedException e) {} - - // insert poison to stop the queues - try { - for (int i = 0; i < cores; i++) in.put(WordReferenceRow.poison); - } catch (InterruptedException e) {} - - // return the resulting queue while the processing queues are still working - return out; - } - - public static class Transformer extends Thread { - - BlockingQueue in; - BlockingQueue out; - - public Transformer(final BlockingQueue in, final BlockingQueue out) { - this.in = in; - this.out = out; - } - - @Override - public void run() { - WordReferenceRow row; - try { - while ((row = in.take()) != WordReferenceRow.poison) out.put(new WordReferenceVars(row)); - } catch (InterruptedException e) {} - - // insert poison to signal the termination to next queue - try { - out.put(WordReferenceVars.poison); - } catch (InterruptedException e) {} - } - } - */ public static BlockingQueue transform(ReferenceContainer container) { LinkedBlockingQueue out = new LinkedBlockingQueue();