diff --git a/source/net/yacy/document/content/SurrogateReader.java b/source/net/yacy/document/content/SurrogateReader.java index a24f77678..983fe8e72 100644 --- a/source/net/yacy/document/content/SurrogateReader.java +++ b/source/net/yacy/document/content/SurrogateReader.java @@ -82,6 +82,7 @@ public class SurrogateReader extends DefaultHandler implements Runnable { private final PushbackInputStream inputStream; private final CrawlStacker crawlStacker; private final CollectionConfiguration configuration; + private final int concurrency; private static final ThreadLocal tlSax = new ThreadLocal(); private static SAXParser getParser() throws SAXException { @@ -97,13 +98,14 @@ public class SurrogateReader extends DefaultHandler implements Runnable { return parser; } - public SurrogateReader(final InputStream stream, int queueSize, CrawlStacker crawlStacker, CollectionConfiguration configuration) throws IOException { - this(new PushbackInputStream(stream, 200), queueSize, crawlStacker, configuration); + public SurrogateReader(final InputStream stream, int queueSize, CrawlStacker crawlStacker, CollectionConfiguration configuration, int concurrency) throws IOException { + this(new PushbackInputStream(stream, 200), queueSize, crawlStacker, configuration, concurrency); } - public SurrogateReader(final PushbackInputStream stream, int queueSize, CrawlStacker crawlStacker, CollectionConfiguration configuration) throws IOException { + public SurrogateReader(final PushbackInputStream stream, int queueSize, CrawlStacker crawlStacker, CollectionConfiguration configuration, int concurrency) throws IOException { this.crawlStacker = crawlStacker; this.configuration = configuration; + this.concurrency = concurrency; this.buffer = new StringBuilder(300); this.parsingValue = false; this.dcEntry = null; @@ -163,11 +165,13 @@ public class SurrogateReader extends DefaultHandler implements Runnable { } catch (final IOException e) { ConcurrentLog.logException(e); } finally { - try { - this.surrogates.put(POISON_DOCUMENT); - } catch (final InterruptedException e1) { - ConcurrentLog.logException(e1); - } + for (int i = 0; i < this.concurrency; i++) { + try { + this.surrogates.put(POISON_DOCUMENT); + } catch (final InterruptedException e1) { + ConcurrentLog.logException(e1); + } + } try { this.inputStream.close(); } catch (final IOException e) { diff --git a/source/net/yacy/search/Switchboard.java b/source/net/yacy/search/Switchboard.java index c0bbed0cd..70a067a81 100644 --- a/source/net/yacy/search/Switchboard.java +++ b/source/net/yacy/search/Switchboard.java @@ -1981,16 +1981,37 @@ public final class Switchboard extends serverSwitch { } public void processSurrogate(final InputStream is, final String name) throws IOException { - final SurrogateReader reader = new SurrogateReader(is, 100, this.crawlStacker, this.index.fulltext().getDefaultConfiguration()); + final int concurrency = Runtime.getRuntime().availableProcessors(); + + // start reader thread + final SurrogateReader reader = new SurrogateReader(is, 100, this.crawlStacker, this.index.fulltext().getDefaultConfiguration(), concurrency); final Thread readerThread = new Thread(reader, name); + readerThread.setPriority(Thread.MAX_PRIORITY); // we must have maximum prio here because this thread feeds the other threads. It must always be ahead of them. readerThread.start(); - SolrInputDocument surrogate; - while ((surrogate = reader.take()) != SurrogateReader.POISON_DOCUMENT ) { - // check if url is in accepted domain - assert surrogate != null; - assert this.crawlStacker != null; - this.index.putDocument(surrogate); - if (shallTerminate()) break; + + // start indexer threads + assert this.crawlStacker != null; + Thread[] indexer = new Thread[concurrency]; + for (int t = 0; t < concurrency; t++) { + indexer[t] = new Thread() { + @Override + public void run() { + SolrInputDocument surrogate; + while ((surrogate = reader.take()) != SurrogateReader.POISON_DOCUMENT ) { + // check if url is in accepted domain + assert surrogate != null; + Switchboard.this.index.putDocument(surrogate); + if (shallTerminate()) break; + } + } + }; + indexer[t].setPriority(5); + indexer[t].start(); + } + + // wait for termination of indexer threads + for (int t = 0; t < concurrency; t++) { + try {indexer[t].join();} catch (InterruptedException e) {} } }