From f99f86c5c570e1fe905c70b99a5b519af3e3401a Mon Sep 17 00:00:00 2001 From: orbiter Date: Thu, 17 Sep 2009 22:37:21 +0000 Subject: [PATCH] added concurrency to file indexing class git-svn-id: https://svn.berlios.de/svnroot/repos/yacy/trunk@6324 6c8d7289-2bf4-0310-a012-ef5d649a1542 --- .../de/anomic/kelondro/table/SplitTable.java | 1 + .../anomic/kelondro/text/DocumentIndex.java | 43 +++++++++++++++---- 2 files changed, 36 insertions(+), 8 deletions(-) diff --git a/source/de/anomic/kelondro/table/SplitTable.java b/source/de/anomic/kelondro/table/SplitTable.java index c65384ff2..c2abc477c 100644 --- a/source/de/anomic/kelondro/table/SplitTable.java +++ b/source/de/anomic/kelondro/table/SplitTable.java @@ -311,6 +311,7 @@ public class SplitTable implements ObjectIndex { assert row.objectsize() <= this.rowdef.objectsize; ObjectIndex keeper = keeperOf(row.getColBytes(0)); if (keeper != null) {keeper.put(row); return;} + assert this.current == null || this.tables.get(this.current) != null : "this.current = " + this.current; keeper = (this.current == null) ? 
newTable() : checkTable(this.tables.get(this.current)); keeper.put(row); } diff --git a/source/de/anomic/kelondro/text/DocumentIndex.java b/source/de/anomic/kelondro/text/DocumentIndex.java index e745aba69..c74d3e436 100644 --- a/source/de/anomic/kelondro/text/DocumentIndex.java +++ b/source/de/anomic/kelondro/text/DocumentIndex.java @@ -30,6 +30,8 @@ package de.anomic.kelondro.text; import java.io.File; import java.io.IOException; import java.util.Date; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; import de.anomic.document.Condenser; import de.anomic.document.Document; @@ -54,14 +56,38 @@ public class DocumentIndex extends Segment { private RankingProfile textRankingDefault = new RankingProfile(QueryParams.CONTENTDOM_TEXT); //private Bitfield zeroConstraint = new Bitfield(4); + File poison = new File("."); + BlockingQueue<File> queue; + Worker[] worker; + public DocumentIndex(Log log, final File segmentPath) throws IOException { super(log, segmentPath, 100000, targetFileSize * 4 - 1, false, false); + int cores = Runtime.getRuntime().availableProcessors() + 1; + this.queue = new ArrayBlockingQueue<File>(cores * 2); + this.worker = new Worker[cores]; + for (int i = 0; i < cores; i++) { + this.worker[i] = new Worker(); + this.worker[i].start(); + } } public DocumentIndex(final File segmentPath) throws IOException { this(new Log("DocumentIndex"), segmentPath); } + private class Worker extends Thread { + public void run() { + File f; + try { + while ((f = queue.take()) != poison) try { + add(f); + } catch (IOException e) { + if (e.getMessage().indexOf("cannot parse") < 0) e.printStackTrace(); + } + } catch (InterruptedException e) {} + } + } + /** * put a single file into the index * @param file @@ -102,10 +128,8 @@ public class DocumentIndex extends Segment { assert (start.canRead()); if (!start.isDirectory()) { try { - add(start); - } catch (IOException e) { - e.printStackTrace(); - } + this.queue.put(start); + } catch 
(InterruptedException e) {} return; } String[] s = start.list(); @@ -117,10 +141,8 @@ public class DocumentIndex extends Segment { addAll(w); } else { try { - add(w); - } catch (IOException e) { - if (e.getMessage().indexOf("cannot parse") < 0) e.printStackTrace(); - } + this.queue.put(w); + } catch (InterruptedException e) {} } } } @@ -156,6 +178,11 @@ public class DocumentIndex extends Segment { public void close() { super.close(); + for (int i = 0; i < this.worker.length; i++) { + try { + this.queue.put(poison); + } catch (InterruptedException e) {} + } } public static void main(String[] args) {