From 70e62229786c96625638ff77d4d32164a9038c9f Mon Sep 17 00:00:00 2001 From: orbiter Date: Tue, 13 Apr 2010 11:12:36 +0000 Subject: [PATCH] more concurrency during search requests git-svn-id: https://svn.berlios.de/svnroot/repos/yacy/trunk@6801 6c8d7289-2bf4-0310-a012-ef5d649a1542 --- .../kelondro/data/word/WordReferenceRow.java | 14 ++-- .../kelondro/data/word/WordReferenceVars.java | 82 ++++++++++++++++++- 2 files changed, 89 insertions(+), 7 deletions(-) diff --git a/source/net/yacy/kelondro/data/word/WordReferenceRow.java b/source/net/yacy/kelondro/data/word/WordReferenceRow.java index bbbe48ad3..13fc4362f 100644 --- a/source/net/yacy/kelondro/data/word/WordReferenceRow.java +++ b/source/net/yacy/kelondro/data/word/WordReferenceRow.java @@ -41,12 +41,7 @@ import net.yacy.kelondro.rwi.Reference; */ public final class WordReferenceRow extends AbstractReference implements WordReference, Cloneable { - /** - * object for termination of concurrent blocking queue processing - */ - public static final WordReferenceRow poison = new WordReferenceRow((Row.Entry) null); - - + public static final Row urlEntryRow = new Row(new Column[]{ new Column("h", Column.celltype_string, Column.encoder_bytes, Word.commonHashLength, "urlhash"), new Column("a", Column.celltype_cardinal, Column.encoder_b256, 2, "lastModified"), @@ -73,6 +68,13 @@ public final class WordReferenceRow extends AbstractReference implements WordRef ); // available chars: b,e,j,q + /** + * object for termination of concurrent blocking queue processing + */ + public static final Row.Entry poisonRowEntry = urlEntryRow.newEntry(); + public static final WordReferenceRow poison = new WordReferenceRow(poisonRowEntry); + + // static properties private static final int col_urlhash = 0; // h 12 the url hash b64-encoded private static final int col_lastModified = 1; // a 2 last-modified time of the document where word appears diff --git a/source/net/yacy/kelondro/data/word/WordReferenceVars.java b/source/net/yacy/kelondro/data/word/WordReferenceVars.java index 834c8859f..71b7caafb 100644 --- a/source/net/yacy/kelondro/data/word/WordReferenceVars.java +++ b/source/net/yacy/kelondro/data/word/WordReferenceVars.java @@ -29,6 +29,7 @@ package net.yacy.kelondro.data.word; import java.util.ArrayList; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.Semaphore; import net.yacy.kelondro.index.Row.Entry; import net.yacy.kelondro.order.Base64Order; @@ -37,6 +38,7 @@ import net.yacy.kelondro.order.MicroDate; import net.yacy.kelondro.rwi.AbstractReference; import net.yacy.kelondro.rwi.Reference; import net.yacy.kelondro.rwi.ReferenceContainer; +import net.yacy.kelondro.index.Row; public class WordReferenceVars extends AbstractReference implements WordReference, Reference, Cloneable, Comparable { @@ -45,7 +47,7 @@ public class WordReferenceVars extends AbstractReference implements WordReferenc * object for termination of concurrent blocking queue processing */ public static final WordReferenceVars poison = new WordReferenceVars(); - + public static int cores = Runtime.getRuntime().availableProcessors(); public Bitfield flags; public long lastModified; @@ -386,6 +388,7 @@ public class WordReferenceVars extends AbstractReference implements WordReferenc * @param container * @return a blocking queue filled with WordReferenceVars that is still filled when the object is returned */ + /* public static BlockingQueue transform(ReferenceContainer container) { LinkedBlockingQueue in = new LinkedBlockingQueue(); LinkedBlockingQueue out = new LinkedBlockingQueue(); @@ -434,5 +437,82 @@ public class WordReferenceVars extends AbstractReference implements WordReferenc } catch (InterruptedException e) {} } } + */ + + public static BlockingQueue transform(ReferenceContainer container) { + LinkedBlockingQueue out = new LinkedBlockingQueue(); + Thread distributor = new TransformDistributor(container, out); + distributor.start(); + + // return the resulting queue while the processing queues are still working + return out; + } + + public static class TransformDistributor extends Thread { + + ReferenceContainer container; + LinkedBlockingQueue out; + + public TransformDistributor(ReferenceContainer container, LinkedBlockingQueue out) { + this.container = container; + this.out = out; + } + + @Override + public void run() { + // start the transformation threads + int cores0 = Math.min(cores, container.size() / 100) + 1; + Semaphore termination = new Semaphore(cores0); + TransformWorker[] worker = new TransformWorker[cores0]; + for (int i = 0; i < cores0; i++) { + worker[i] = new TransformWorker(out, termination); + worker[i].start(); + } + + // fill the queue + int p = container.size(); + while (p > 0) { + p--; + worker[p % cores0].add(container.get(p, false)); + } + + // insert poison to stop the queues + for (int i = 0; i < cores0; i++) worker[i].add(WordReferenceRow.poisonRowEntry); + } + } + + public static class TransformWorker extends Thread { + + BlockingQueue in; + BlockingQueue out; + Semaphore termination; + + public TransformWorker(final BlockingQueue out, Semaphore termination) { + this.in = new LinkedBlockingQueue(); + this.out = out; + this.termination = termination; + } + + public void add(Row.Entry entry) { + try { + in.put(entry); + } catch (InterruptedException e) { + } + } + + @Override + public void run() { + Row.Entry entry; + try { + while ((entry = in.take()) != WordReferenceRow.poisonRowEntry) out.put(new WordReferenceVars(new WordReferenceRow(entry))); + } catch (InterruptedException e) {} + + // insert poison to signal the termination to next queue + try { + this.termination.acquire(); + if (this.termination.availablePermits() == 0) this.out.put(WordReferenceVars.poison); + } catch (InterruptedException e) {} + } + } }