From ce7924d712d56456ccdd7b5b947c8d62dffbac22 Mon Sep 17 00:00:00 2001 From: orbiter Date: Thu, 27 Aug 2009 22:06:52 +0000 Subject: [PATCH] better concurrency for rwi entry parsing during search processing git-svn-id: https://svn.berlios.de/svnroot/repos/yacy/trunk@6273 6c8d7289-2bf4-0310-a012-ef5d649a1542 --- .../anomic/kelondro/blob/MapDataMining.java | 2 +- .../anomic/kelondro/text/ReferenceOrder.java | 143 +++++++----------- .../referencePrototype/WordReferenceRow.java | 12 +- .../referencePrototype/WordReferenceVars.java | 93 +++++++++++- test/de/anomic/document/ParserTest.java | 2 +- 5 files changed, 159 insertions(+), 93 deletions(-) diff --git a/source/de/anomic/kelondro/blob/MapDataMining.java b/source/de/anomic/kelondro/blob/MapDataMining.java index 71b683d70..0b4964ebd 100644 --- a/source/de/anomic/kelondro/blob/MapDataMining.java +++ b/source/de/anomic/kelondro/blob/MapDataMining.java @@ -328,7 +328,7 @@ public class MapDataMining extends MapView { return super.size(); } - public void close() { + public synchronized void close() { // close cluster if (sortClusterMap != null) { for (int i = 0; i < sortfields.length; i++) sortClusterMap.remove(sortfields[i]); diff --git a/source/de/anomic/kelondro/text/ReferenceOrder.java b/source/de/anomic/kelondro/text/ReferenceOrder.java index a814055ff..0c4fde03f 100644 --- a/source/de/anomic/kelondro/text/ReferenceOrder.java +++ b/source/de/anomic/kelondro/text/ReferenceOrder.java @@ -30,6 +30,7 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.Iterator; import java.util.Map; +import java.util.concurrent.BlockingQueue; import de.anomic.document.Condenser; import de.anomic.kelondro.order.Bitfield; @@ -39,7 +40,6 @@ import de.anomic.kelondro.text.referencePrototype.WordReferenceVars; import de.anomic.kelondro.util.ScoreCluster; import de.anomic.search.RankingProfile; import de.anomic.search.RankingProcess; -import de.anomic.server.serverProcessor; import de.anomic.yacy.yacyURL; public class ReferenceOrder { @@ -61,54 +61,45 @@ public class ReferenceOrder { public ArrayList normalizeWith(final ReferenceContainer container) { // normalize ranking: find minimum and maxiumum of separate ranking criteria assert (container != null); - ArrayList result = null; + BlockingQueue vars = WordReferenceVars.transform(container); - //long s0 = System.currentTimeMillis(); - if ((serverProcessor.useCPU > 1) && (container.size() > 600)) { - // run minmax with two threads - final int middle = container.size() / 2; - final minmaxfinder mmf0 = new minmaxfinder(container, 0, middle); - mmf0.start(); // fork here - final minmaxfinder mmf1 = new minmaxfinder(container, middle, container.size()); - mmf1.run(); // execute other fork in this thread - if (this.min == null) this.min = mmf1.entryMin.clone(); else this.min.min(mmf1.entryMin); - if (this.max == null) this.max = mmf1.entryMax.clone(); else this.max.max(mmf1.entryMax); - Map.Entry entry; - Iterator> di = mmf1.domcount().entrySet().iterator(); - while (di.hasNext()) { - entry = di.next(); - this.doms.addScore(entry.getKey(), (entry.getValue()).intValue()); - } - try {mmf0.join();} catch (final InterruptedException e) {} // wait for fork thread to finish - if (this.min == null) this.min = mmf0.entryMin.clone(); else this.min.min(mmf0.entryMin); - if (this.max == null) this.max = mmf0.entryMax.clone(); else this.max.max(mmf0.entryMax); - di = mmf0.domcount().entrySet().iterator(); - while (di.hasNext()) { - entry = di.next(); - this.doms.addScore(entry.getKey(), (entry.getValue()).intValue()); - } - result = mmf0.decodedContainer(); - result.addAll(mmf1.decodedContainer()); - //long s1= System.currentTimeMillis(), sc = Math.max(1, s1 - s0); - //System.out.println("***DEBUG*** indexRWIEntry.Order (2-THREADED): " + sc + " milliseconds for " + container.size() + " entries, " + (container.size() / sc) + " entries/millisecond"); - } else if (container.size() > 0) { - // run minmax in one thread - final minmaxfinder mmf = new minmaxfinder(container, 0, container.size()); - mmf.run(); // execute without multi-threading - if (this.min == null) this.min = mmf.entryMin.clone(); else this.min.min(mmf.entryMin); - if (this.max == null) this.max = mmf.entryMax.clone(); else this.max.max(mmf.entryMax); - Map.Entry entry; - final Iterator> di = mmf.domcount().entrySet().iterator(); - while (di.hasNext()) { - entry = di.next(); - this.doms.addScore(entry.getKey(), (entry.getValue()).intValue()); - } - result = mmf.decodedContainer(); - //long s1= System.currentTimeMillis(), sc = Math.max(1, s1 - s0); - //System.out.println("***DEBUG*** indexRWIEntry.Order (ONETHREAD): " + sc + " milliseconds for " + container.size() + " entries, " + (container.size() / sc) + " entries/millisecond"); + WordReferenceVars entryMin = null; + WordReferenceVars entryMax = null; + HashMap doms = new HashMap(); + Integer int1 = 1; + ArrayList decodedEntries = new ArrayList(); + + WordReferenceVars iEntry; + String dom; + Integer count; + try { + while ((iEntry = vars.take()) != WordReferenceVars.poison) { + decodedEntries.add(iEntry); + // find min/max + if (entryMin == null) entryMin = iEntry.clone(); else entryMin.min(iEntry); + if (entryMax == null) entryMax = iEntry.clone(); else entryMax.max(iEntry); + // update domcount + dom = iEntry.metadataHash().substring(6); + count = doms.get(dom); + if (count == null) { + doms.put(dom, int1); + } else { + doms.put(dom, Integer.valueOf(count.intValue() + 1)); + } + } + } catch (InterruptedException e) {} + + if (this.min == null) this.min = entryMin.clone(); else this.min.min(entryMin); + if (this.max == null) this.max = entryMax.clone(); else this.max.max(entryMax); + Map.Entry entry; + final Iterator> di = doms.entrySet().iterator(); + while (di.hasNext()) { + entry = di.next(); + this.doms.addScore(entry.getKey(), (entry.getValue()).intValue()); } + if (this.doms.size() > 0) this.maxdomcount = this.doms.getMaxScore(); - return result; + return decodedEntries; } public int authority(final String urlHash) { @@ -164,61 +155,41 @@ public class ReferenceOrder { if (l.equals("uk")) return "en"; else return l; } - public static class minmaxfinder extends Thread { + public static class minmaxfinder { - WordReferenceVars entryMin; - WordReferenceVars entryMax; - private final ReferenceContainer container; - private final int start, end; + private WordReferenceVars entryMin; + private WordReferenceVars entryMax; private final HashMap doms; private final Integer int1; - ArrayList decodedEntries; + private final ArrayList decodedEntries; - public minmaxfinder(final ReferenceContainer container, final int start /*including*/, final int end /*excluding*/) { - this.container = container; - this.start = start; - this.end = end; + public minmaxfinder(final BlockingQueue vars) { this.doms = new HashMap(); this.int1 = 1; this.decodedEntries = new ArrayList(); - } - - public void run() { - // find min/max to obtain limits for normalization this.entryMin = null; this.entryMax = null; WordReferenceVars iEntry; - int p = this.start; String dom; Integer count; try { - while (p < this.end) { - iEntry = new WordReferenceVars(new WordReferenceRow(container.get(p++, false))); - this.decodedEntries.add(iEntry); - // find min/max - if (this.entryMin == null) this.entryMin = iEntry.clone(); else this.entryMin.min(iEntry); - if (this.entryMax == null) this.entryMax = iEntry.clone(); else this.entryMax.max(iEntry); - // update domcount - dom = iEntry.metadataHash().substring(6); - count = doms.get(dom); - if (count == null) { - doms.put(dom, int1); - } else { - doms.put(dom, Integer.valueOf(count.intValue() + 1)); - } - } - } catch (final Exception e) { - e.printStackTrace(); - } + while ((iEntry = vars.take()) != WordReferenceVars.poison) { + this.decodedEntries.add(iEntry); + // find min/max + if (this.entryMin == null) this.entryMin = iEntry.clone(); else this.entryMin.min(iEntry); + if (this.entryMax == null) this.entryMax = iEntry.clone(); else this.entryMax.max(iEntry); + // update domcount + dom = iEntry.metadataHash().substring(6); + count = doms.get(dom); + if (count == null) { + doms.put(dom, int1); + } else { + doms.put(dom, Integer.valueOf(count.intValue() + 1)); + } + } + } catch (InterruptedException e) {} } - public ArrayList decodedContainer() { - return this.decodedEntries; - } - - public HashMap domcount() { - return this.doms; - } } } diff --git a/source/de/anomic/kelondro/text/referencePrototype/WordReferenceRow.java b/source/de/anomic/kelondro/text/referencePrototype/WordReferenceRow.java index ef5afedf2..fe3de17f9 100644 --- a/source/de/anomic/kelondro/text/referencePrototype/WordReferenceRow.java +++ b/source/de/anomic/kelondro/text/referencePrototype/WordReferenceRow.java @@ -37,10 +37,18 @@ import de.anomic.kelondro.text.AbstractReference; import de.anomic.kelondro.text.Reference; import de.anomic.yacy.yacySeedDB; +/** + * this object stores attributes to URL references inside RWI collections + * + */ public final class WordReferenceRow extends AbstractReference implements WordReference, Cloneable { - // this object stores attributes to URL references inside RWI collections - + /** + * object for termination of concurrent blocking queue processing + */ + public static final WordReferenceRow poison = new WordReferenceRow((Row.Entry) null); + + public static final Row urlEntryRow = new Row(new Column[]{ new Column("h", Column.celltype_string, Column.encoder_bytes, yacySeedDB.commonHashLength, "urlhash"), new Column("a", Column.celltype_cardinal, Column.encoder_b256, 2, "lastModified"), diff --git a/source/de/anomic/kelondro/text/referencePrototype/WordReferenceVars.java b/source/de/anomic/kelondro/text/referencePrototype/WordReferenceVars.java index 6db820821..de00027c5 100644 --- a/source/de/anomic/kelondro/text/referencePrototype/WordReferenceVars.java +++ b/source/de/anomic/kelondro/text/referencePrototype/WordReferenceVars.java @@ -27,15 +27,24 @@ package de.anomic.kelondro.text.referencePrototype; import java.util.ArrayList; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; import de.anomic.kelondro.index.Row.Entry; import de.anomic.kelondro.order.Bitfield; import de.anomic.kelondro.order.MicroDate; import de.anomic.kelondro.text.AbstractReference; import de.anomic.kelondro.text.Reference; +import de.anomic.kelondro.text.ReferenceContainer; -public class WordReferenceVars extends AbstractReference implements WordReference, Reference, Cloneable { +public class WordReferenceVars extends AbstractReference implements WordReference, Reference, Cloneable { + /** + * object for termination of concurrent blocking queue processing + */ + public static final WordReferenceVars poison = new WordReferenceVars(); + + public Bitfield flags; public long lastModified; public String language, urlHash; @@ -47,7 +56,8 @@ public class WordReferenceVars extends AbstractReference implements WordReferen ArrayList positions; public double termFrequency; - public WordReferenceVars(final String urlHash, + public WordReferenceVars( + final String urlHash, final int urlLength, // byte-length of complete URL final int urlComps, // number of path components final int titleLength, // length of description/length (longer are better?) @@ -114,6 +124,30 @@ public class WordReferenceVars extends AbstractReference implements WordReferen this.termFrequency = e.termFrequency(); } + /** + * initializer for special poison object + */ + public WordReferenceVars() { + this.flags = null; + this.lastModified = 0; + this.language = null; + this.urlHash = null; + this.type = ' '; + this.hitcount = 0; + this.llocal = 0; + this.lother = 0; + this.phrasesintext = 0; + this.positions = null; + this.posinphrase = 0; + this.posofphrase = 0; + this.urlcomps = 0; + this.urllength = 0; + this.virtualAge = 0; + this.wordsintext = 0; + this.wordsintitle = 0; + this.termFrequency = 0.0; + } + public WordReferenceVars clone() { final WordReferenceVars c = new WordReferenceVars( this.urlHash, @@ -265,6 +299,7 @@ public class WordReferenceVars extends AbstractReference implements WordReferen } public final void min(final WordReferenceVars other) { + if (other == null) return; int v; long w; double d; @@ -286,6 +321,7 @@ public class WordReferenceVars extends AbstractReference implements WordReferen } public final void max(final WordReferenceVars other) { + if (other == null) return; int v; long w; double d; @@ -327,6 +363,57 @@ public class WordReferenceVars extends AbstractReference implements WordReferen public void addPosition(int position) { this.positions.add(position); } - + /** + * transform a reference container into a stream of parsed entries + * @param container + * @return a blocking queue filled with WordReferenceVars that is still filled when the object is returned + */ + public static BlockingQueue transform(ReferenceContainer container) { + LinkedBlockingQueue in = new LinkedBlockingQueue(); + LinkedBlockingQueue out = new LinkedBlockingQueue(); + + // start the transformation threads + int cores = Math.min(Runtime.getRuntime().availableProcessors(), container.size() / 200 + 1); + for (int i = 0; i < cores; i++) new Transformer(in, out).start(); + + // fill the queue + int p = 0; + try { + while (p < container.size()) { + in.put(new WordReferenceRow(container.get(p++, false))); + } + } catch (InterruptedException e) {} + + // insert poison to stop the queues + try { + for (int i = 0; i < cores; i++) in.put(WordReferenceRow.poison); + } catch (InterruptedException e) {} + + // return the resulting queue while the processing queues are still working + return out; + } + + public static class Transformer extends Thread { + + BlockingQueue in; + BlockingQueue out; + + public Transformer(BlockingQueue in, BlockingQueue out) { + this.in = in; + this.out = out; + } + + public void run() { + WordReferenceRow row; + try { + while ((row = in.take()) != WordReferenceRow.poison) out.put(new WordReferenceVars(row)); + } catch (InterruptedException e) {} + + // insert poison to signal the termination to next queue + try { + out.put(WordReferenceVars.poison); + } catch (InterruptedException e) {} + } + } } diff --git a/test/de/anomic/document/ParserTest.java b/test/de/anomic/document/ParserTest.java index 541b424ca..a815ce5e8 100644 --- a/test/de/anomic/document/ParserTest.java +++ b/test/de/anomic/document/ParserTest.java @@ -1,6 +1,6 @@ package de.anomic.document; -import static org.junit.Assert.*; +//import static org.junit.Assert.*; import org.junit.Test; import java.io.File;