more concurrency when normalizing RWI entries + cleanup

git-svn-id: https://svn.berlios.de/svnroot/repos/yacy/trunk@6805 6c8d7289-2bf4-0310-a012-ef5d649a1542
pull/1/head
orbiter 15 years ago
parent 555b333041
commit 2e26744f4e

@ -31,6 +31,7 @@ import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import net.yacy.document.Condenser;
import net.yacy.kelondro.data.meta.DigestURI;
@ -44,6 +45,8 @@ import net.yacy.kelondro.util.ScoreCluster;
public class ReferenceOrder {
private static int cores = Runtime.getRuntime().availableProcessors();
protected int maxdomcount;
protected WordReferenceVars min, max;
@ -60,20 +63,74 @@ public class ReferenceOrder {
this.language = language;
}
public class Normalizer extends Thread {
/**
 * Starts the normalization of a reference container in background threads
 * and returns the result queue immediately.
 *
 * @param container the references to normalize; handed to {@code WordReferenceVars.transform}
 * @return the queue that a started {@code NormalizeDistributor} and its workers write to;
 *         it is returned right after the distributor thread is started, so it may still be
 *         receiving entries when the caller gets it
 */
public BlockingQueue<WordReferenceVars> normalizeWith(final ReferenceContainer<WordReference> container) {
// obtain the queue of WordReferenceVars produced from the container
BlockingQueue<WordReferenceVars> vars = WordReferenceVars.transform(container);
LinkedBlockingQueue<WordReferenceVars> out = new LinkedBlockingQueue<WordReferenceVars>();
// the distributor consumes vars and writes to out
Thread distributor = new NormalizeDistributor(vars, out);
distributor.start();
// NOTE(review): the following field declaration looks like a leftover line of the removed
// Normalizer class in this diff rendering; it is not valid inside a method body - confirm
private final ReferenceContainer<WordReference> container;
// return the resulting queue while the processing queues are still working
return out;
}
/**
 * Thread that fans the entries of one input queue out to several
 * {@link NormalizeWorker} threads in round-robin order.
 *
 * <p>It starts {@code cores + 1} workers that all write to the same output queue,
 * forwards every entry taken from {@code vars} until {@code WordReferenceVars.poison}
 * is seen (or the thread is interrupted), and then passes the poison object to every
 * worker so that each of them stops. The shared {@link Semaphore} is handed to the
 * workers, which use it to decide which of them signals termination on {@code out}.
 */
public class NormalizeDistributor extends Thread {

    /** source queue; read until the poison object is taken */
    BlockingQueue<WordReferenceVars> vars;

    /** target queue that is passed to every worker */
    LinkedBlockingQueue<WordReferenceVars> out;

    /**
     * @param vars the queue to read entries from
     * @param out  the queue the workers write their results to
     */
    public NormalizeDistributor(BlockingQueue<WordReferenceVars> vars, LinkedBlockingQueue<WordReferenceVars> out) {
        this.vars = vars;
        this.out = out;
    }

    /**
     * Starts the workers, distributes all entries and finally stops the workers
     * by sending each of them the poison object.
     */
    @Override
    public void run() {
        // start the transformation threads
        int cores0 = cores + 1;
        Semaphore termination = new Semaphore(cores0);
        NormalizeWorker[] worker = new NormalizeWorker[cores0];
        for (int i = 0; i < cores0; i++) {
            worker[i] = new NormalizeWorker(out, termination);
            worker[i].start();
        }

        // fill the queue
        WordReferenceVars iEntry;
        // index of the next worker; kept inside [0, cores0) so that it can never
        // overflow into a negative array index on very long inputs
        int p = 0;
        boolean interrupted = false;
        try {
            while ((iEntry = vars.take()) != WordReferenceVars.poison) {
                worker[p].add(iEntry);
                p = (p + 1) % cores0;
            }
        } catch (InterruptedException e) {
            // stop distributing, but remember the interruption; the flag must NOT be
            // restored before the poison is delivered below, because a set interrupt
            // flag would make the blocking put() inside add() fail
            interrupted = true;
        }

        // insert poison to stop the queues
        for (int i = 0; i < cores0; i++) {
            worker[i].add(WordReferenceVars.poison);
        }

        // restore the interrupt status for code that runs after this method
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }
}
public class NormalizeWorker extends Thread {
private final BlockingQueue<WordReferenceVars> out;
private final Semaphore termination;
private final BlockingQueue<WordReferenceVars> decodedEntries;
public Normalizer(final ReferenceContainer<WordReference> container) {
public NormalizeWorker(final BlockingQueue<WordReferenceVars> out, Semaphore termination) {
// normalize ranking: find minimum and maximum of separate ranking criteria
assert (container != null);
this.container = container;
this.out = out;
this.termination = termination;
this.decodedEntries = new LinkedBlockingQueue<WordReferenceVars>();
}
/**
 * Appends an entry to this worker's input queue {@code decodedEntries}.
 * The distributor also uses this method to deliver {@code WordReferenceVars.poison}.
 *
 * @param entry the entry (or the poison object) to enqueue
 */
public void add(WordReferenceVars entry) {
try {
decodedEntries.put(entry);
} catch (InterruptedException e) {
// NOTE(review): the interruption is dropped silently here; in that case the entry
// is not enqueued and the caller is not informed
}
}
public void run() {
BlockingQueue<WordReferenceVars> vars = WordReferenceVars.transform(container);
HashMap<String, Integer> doms0 = new HashMap<String, Integer>();
Integer int1 = 1;
@ -83,8 +140,8 @@ public class ReferenceOrder {
Integer count;
try {
// calculate min and max for normalization
while ((iEntry = vars.take()) != WordReferenceVars.poison) {
decodedEntries.put(iEntry);
while ((iEntry = decodedEntries.take()) != WordReferenceVars.poison) {
out.put(iEntry);
// find min/max
if (min == null) min = iEntry.clone(); else min.min(iEntry);
if (max == null) max = iEntry.clone(); else max.max(iEntry);
@ -111,21 +168,13 @@ public class ReferenceOrder {
} catch (Exception e) {
Log.logException(e);
} finally {
// insert poison to signal the termination to next queue
try {
decodedEntries.put(WordReferenceVars.poison);
this.termination.acquire();
if (this.termination.availablePermits() == 0) this.out.put(WordReferenceVars.poison);
} catch (InterruptedException e) {}
}
}
/**
 * Gives access to the queue held in {@code decodedEntries}.
 *
 * @return the {@code decodedEntries} queue of this object (the same instance, not a copy)
 */
public BlockingQueue<WordReferenceVars> decoded() {
return this.decodedEntries;
}
}
/**
 * Starts a {@code Normalizer} thread for the given container and returns its
 * queue of decoded entries without waiting for the thread to finish.
 * NOTE(review): a second method with the same signature appears earlier in this
 * diff view; this one looks like the pre-change version - confirm which one is current.
 *
 * @param container the references passed to the {@code Normalizer} constructor
 * @return the queue obtained from {@code Normalizer.decoded()}
 */
public BlockingQueue<WordReferenceVars> normalizeWith(final ReferenceContainer<WordReference> container) {
Normalizer n = new Normalizer(container);
// the thread is started before the queue is handed out
n.start();
return n.decoded();
}
public int authority(final String urlHash) {

@ -938,6 +938,7 @@ public class DigestURI implements Serializable {
private static String[] testTLDs = new String[] { "com", "net", "org", "uk", "fr", "de", "es", "it" };
public static final DigestURI probablyWordURL(final byte[] urlHash, final TreeSet<String> words) {
assert urlHash != null;
final Iterator<String> wi = words.iterator();
String word;
while (wi.hasNext()) {

@ -47,7 +47,7 @@ public class WordReferenceVars extends AbstractReference implements WordReferenc
* object for termination of concurrent blocking queue processing
*/
public static final WordReferenceVars poison = new WordReferenceVars();
public static int cores = Runtime.getRuntime().availableProcessors();
private static int cores = Runtime.getRuntime().availableProcessors();
public Bitfield flags;
public long lastModified;
@ -388,56 +388,6 @@ public class WordReferenceVars extends AbstractReference implements WordReferenc
* @param container
* @return a blocking queue of WordReferenceVars that may still be receiving entries when it is returned
*/
/*
public static BlockingQueue<WordReferenceVars> transform(ReferenceContainer<WordReference> container) {
LinkedBlockingQueue<WordReferenceRow> in = new LinkedBlockingQueue<WordReferenceRow>();
LinkedBlockingQueue<WordReferenceVars> out = new LinkedBlockingQueue<WordReferenceVars>();
// start the transformation threads
int cores = Math.min(Runtime.getRuntime().availableProcessors(), container.size() / 200 + 1);
for (int i = 0; i < cores; i++) new Transformer(in, out).start();
// fill the queue
int p = 0;
try {
while (p < container.size()) {
in.put(new WordReferenceRow(container.get(p++, false)));
}
} catch (InterruptedException e) {}
// insert poison to stop the queues
try {
for (int i = 0; i < cores; i++) in.put(WordReferenceRow.poison);
} catch (InterruptedException e) {}
// return the resulting queue while the processing queues are still working
return out;
}
public static class Transformer extends Thread {
BlockingQueue<WordReferenceRow> in;
BlockingQueue<WordReferenceVars> out;
public Transformer(final BlockingQueue<WordReferenceRow> in, final BlockingQueue<WordReferenceVars> out) {
this.in = in;
this.out = out;
}
@Override
public void run() {
WordReferenceRow row;
try {
while ((row = in.take()) != WordReferenceRow.poison) out.put(new WordReferenceVars(row));
} catch (InterruptedException e) {}
// insert poison to signal the termination to next queue
try {
out.put(WordReferenceVars.poison);
} catch (InterruptedException e) {}
}
}
*/
public static BlockingQueue<WordReferenceVars> transform(ReferenceContainer<WordReference> container) {
LinkedBlockingQueue<WordReferenceVars> out = new LinkedBlockingQueue<WordReferenceVars>();

Loading…
Cancel
Save