added concurrency to file indexing class

git-svn-id: https://svn.berlios.de/svnroot/repos/yacy/trunk@6324 6c8d7289-2bf4-0310-a012-ef5d649a1542
pull/1/head
orbiter 16 years ago
parent 902d16cf6c
commit f99f86c5c5

@ -311,6 +311,7 @@ public class SplitTable implements ObjectIndex {
assert row.objectsize() <= this.rowdef.objectsize; assert row.objectsize() <= this.rowdef.objectsize;
ObjectIndex keeper = keeperOf(row.getColBytes(0)); ObjectIndex keeper = keeperOf(row.getColBytes(0));
if (keeper != null) {keeper.put(row); return;} if (keeper != null) {keeper.put(row); return;}
assert this.current == null || this.tables.get(this.current) != null : "this.current = " + this.current;
keeper = (this.current == null) ? newTable() : checkTable(this.tables.get(this.current)); keeper = (this.current == null) ? newTable() : checkTable(this.tables.get(this.current));
keeper.put(row); keeper.put(row);
} }

@ -30,6 +30,8 @@ package de.anomic.kelondro.text;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.util.Date; import java.util.Date;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import de.anomic.document.Condenser; import de.anomic.document.Condenser;
import de.anomic.document.Document; import de.anomic.document.Document;
@ -54,14 +56,38 @@ public class DocumentIndex extends Segment {
private RankingProfile textRankingDefault = new RankingProfile(QueryParams.CONTENTDOM_TEXT); private RankingProfile textRankingDefault = new RankingProfile(QueryParams.CONTENTDOM_TEXT);
//private Bitfield zeroConstraint = new Bitfield(4); //private Bitfield zeroConstraint = new Bitfield(4);
File poison = new File(".");
BlockingQueue<File> queue;
Worker[] worker;
public DocumentIndex(Log log, final File segmentPath) throws IOException { public DocumentIndex(Log log, final File segmentPath) throws IOException {
super(log, segmentPath, 100000, targetFileSize * 4 - 1, false, false); super(log, segmentPath, 100000, targetFileSize * 4 - 1, false, false);
int cores = Runtime.getRuntime().availableProcessors() + 1;
this.queue = new ArrayBlockingQueue<File>(cores * 2);
this.worker = new Worker[cores];
for (int i = 0; i < cores; i++) {
this.worker[i] = new Worker();
this.worker[i].start();
}
} }
public DocumentIndex(final File segmentPath) throws IOException { public DocumentIndex(final File segmentPath) throws IOException {
this(new Log("DocumentIndex"), segmentPath); this(new Log("DocumentIndex"), segmentPath);
} }
private class Worker extends Thread {
public void run() {
File f;
try {
while ((f = queue.take()) != poison) try {
add(f);
} catch (IOException e) {
if (e.getMessage().indexOf("cannot parse") < 0) e.printStackTrace();
}
} catch (InterruptedException e) {}
}
}
/** /**
* put a single file into the index * put a single file into the index
* @param file * @param file
@ -102,10 +128,8 @@ public class DocumentIndex extends Segment {
assert (start.canRead()); assert (start.canRead());
if (!start.isDirectory()) { if (!start.isDirectory()) {
try { try {
add(start); this.queue.put(start);
} catch (IOException e) { } catch (InterruptedException e) {}
e.printStackTrace();
}
return; return;
} }
String[] s = start.list(); String[] s = start.list();
@ -117,10 +141,8 @@ public class DocumentIndex extends Segment {
addAll(w); addAll(w);
} else { } else {
try { try {
add(w); this.queue.put(w);
} catch (IOException e) { } catch (InterruptedException e) {}
if (e.getMessage().indexOf("cannot parse") < 0) e.printStackTrace();
}
} }
} }
} }
@ -156,6 +178,11 @@ public class DocumentIndex extends Segment {
public void close() { public void close() {
super.close(); super.close();
for (int i = 0; i < this.worker.length; i++) {
try {
this.queue.put(poison);
} catch (InterruptedException e) {}
}
} }
public static void main(String[] args) { public static void main(String[] args) {

Loading…
Cancel
Save