more concurrency during search requests

git-svn-id: https://svn.berlios.de/svnroot/repos/yacy/trunk@6801 6c8d7289-2bf4-0310-a012-ef5d649a1542
pull/1/head
orbiter 15 years ago
parent 4917f96729
commit 70e6222978

@ -41,11 +41,6 @@ import net.yacy.kelondro.rwi.Reference;
*/
public final class WordReferenceRow extends AbstractReference implements WordReference, Cloneable {
/**
* object for termination of concurrent blocking queue processing
*/
public static final WordReferenceRow poison = new WordReferenceRow((Row.Entry) null);
public static final Row urlEntryRow = new Row(new Column[]{
new Column("h", Column.celltype_string, Column.encoder_bytes, Word.commonHashLength, "urlhash"),
@ -73,6 +68,13 @@ public final class WordReferenceRow extends AbstractReference implements WordRef
);
// available chars: b,e,j,q
/**
* object for termination of concurrent blocking queue processing
*/
public static final Row.Entry poisonRowEntry = urlEntryRow.newEntry();
public static final WordReferenceRow poison = new WordReferenceRow(poisonRowEntry);
// static properties
private static final int col_urlhash = 0; // h 12 the url hash b64-encoded
private static final int col_lastModified = 1; // a 2 last-modified time of the document where word appears

@ -29,6 +29,7 @@ package net.yacy.kelondro.data.word;
import java.util.ArrayList;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import net.yacy.kelondro.index.Row.Entry;
import net.yacy.kelondro.order.Base64Order;
@ -37,6 +38,7 @@ import net.yacy.kelondro.order.MicroDate;
import net.yacy.kelondro.rwi.AbstractReference;
import net.yacy.kelondro.rwi.Reference;
import net.yacy.kelondro.rwi.ReferenceContainer;
import net.yacy.kelondro.index.Row;
public class WordReferenceVars extends AbstractReference implements WordReference, Reference, Cloneable, Comparable<WordReferenceVars> {
@ -45,7 +47,7 @@ public class WordReferenceVars extends AbstractReference implements WordReferenc
* object for termination of concurrent blocking queue processing
*/
public static final WordReferenceVars poison = new WordReferenceVars();
public static int cores = Runtime.getRuntime().availableProcessors();
public Bitfield flags;
public long lastModified;
@ -386,6 +388,7 @@ public class WordReferenceVars extends AbstractReference implements WordReferenc
* @param container
* @return a blocking queue filled with WordReferenceVars that is still filled when the object is returned
*/
/*
public static BlockingQueue<WordReferenceVars> transform(ReferenceContainer<WordReference> container) {
LinkedBlockingQueue<WordReferenceRow> in = new LinkedBlockingQueue<WordReferenceRow>();
LinkedBlockingQueue<WordReferenceVars> out = new LinkedBlockingQueue<WordReferenceVars>();
@ -434,5 +437,82 @@ public class WordReferenceVars extends AbstractReference implements WordReferenc
} catch (InterruptedException e) {}
}
}
*/
public static BlockingQueue<WordReferenceVars> transform(ReferenceContainer<WordReference> container) {
LinkedBlockingQueue<WordReferenceVars> out = new LinkedBlockingQueue<WordReferenceVars>();
Thread distributor = new TransformDistributor(container, out);
distributor.start();
// return the resulting queue while the processing queues are still working
return out;
}
public static class TransformDistributor extends Thread {
ReferenceContainer<WordReference> container;
LinkedBlockingQueue<WordReferenceVars> out;
public TransformDistributor(ReferenceContainer<WordReference> container, LinkedBlockingQueue<WordReferenceVars> out) {
this.container = container;
this.out = out;
}
@Override
public void run() {
// start the transformation threads
int cores0 = Math.min(cores, container.size() / 100) + 1;
Semaphore termination = new Semaphore(cores0);
TransformWorker[] worker = new TransformWorker[cores0];
for (int i = 0; i < cores0; i++) {
worker[i] = new TransformWorker(out, termination);
worker[i].start();
}
// fill the queue
int p = container.size();
while (p > 0) {
p--;
worker[p % cores0].add(container.get(p, false));
}
// insert poison to stop the queues
for (int i = 0; i < cores0; i++) worker[i].add(WordReferenceRow.poisonRowEntry);
}
}
public static class TransformWorker extends Thread {
BlockingQueue<Row.Entry> in;
BlockingQueue<WordReferenceVars> out;
Semaphore termination;
public TransformWorker(final BlockingQueue<WordReferenceVars> out, Semaphore termination) {
this.in = new LinkedBlockingQueue<Row.Entry>();
this.out = out;
this.termination = termination;
}
public void add(Row.Entry entry) {
try {
in.put(entry);
} catch (InterruptedException e) {
}
}
@Override
public void run() {
Row.Entry entry;
try {
while ((entry = in.take()) != WordReferenceRow.poisonRowEntry) out.put(new WordReferenceVars(new WordReferenceRow(entry)));
} catch (InterruptedException e) {}
// insert poison to signal the termination to next queue
try {
this.termination.acquire();
if (this.termination.availablePermits() == 0) this.out.put(WordReferenceVars.poison);
} catch (InterruptedException e) {}
}
}
}

Loading…
Cancel
Save