enhanced surrogate import process speed (dramatically!)

pull/8/head
Michael Peter Christen 10 years ago
parent 3c4c69adea
commit 593de05922

@ -82,6 +82,7 @@ public class SurrogateReader extends DefaultHandler implements Runnable {
private final PushbackInputStream inputStream; private final PushbackInputStream inputStream;
private final CrawlStacker crawlStacker; private final CrawlStacker crawlStacker;
private final CollectionConfiguration configuration; private final CollectionConfiguration configuration;
private final int concurrency;
private static final ThreadLocal<SAXParser> tlSax = new ThreadLocal<SAXParser>(); private static final ThreadLocal<SAXParser> tlSax = new ThreadLocal<SAXParser>();
private static SAXParser getParser() throws SAXException { private static SAXParser getParser() throws SAXException {
@ -97,13 +98,14 @@ public class SurrogateReader extends DefaultHandler implements Runnable {
return parser; return parser;
} }
public SurrogateReader(final InputStream stream, int queueSize, CrawlStacker crawlStacker, CollectionConfiguration configuration) throws IOException { public SurrogateReader(final InputStream stream, int queueSize, CrawlStacker crawlStacker, CollectionConfiguration configuration, int concurrency) throws IOException {
this(new PushbackInputStream(stream, 200), queueSize, crawlStacker, configuration); this(new PushbackInputStream(stream, 200), queueSize, crawlStacker, configuration, concurrency);
} }
public SurrogateReader(final PushbackInputStream stream, int queueSize, CrawlStacker crawlStacker, CollectionConfiguration configuration) throws IOException { public SurrogateReader(final PushbackInputStream stream, int queueSize, CrawlStacker crawlStacker, CollectionConfiguration configuration, int concurrency) throws IOException {
this.crawlStacker = crawlStacker; this.crawlStacker = crawlStacker;
this.configuration = configuration; this.configuration = configuration;
this.concurrency = concurrency;
this.buffer = new StringBuilder(300); this.buffer = new StringBuilder(300);
this.parsingValue = false; this.parsingValue = false;
this.dcEntry = null; this.dcEntry = null;
@ -163,11 +165,13 @@ public class SurrogateReader extends DefaultHandler implements Runnable {
} catch (final IOException e) { } catch (final IOException e) {
ConcurrentLog.logException(e); ConcurrentLog.logException(e);
} finally { } finally {
try { for (int i = 0; i < this.concurrency; i++) {
this.surrogates.put(POISON_DOCUMENT); try {
} catch (final InterruptedException e1) { this.surrogates.put(POISON_DOCUMENT);
ConcurrentLog.logException(e1); } catch (final InterruptedException e1) {
} ConcurrentLog.logException(e1);
}
}
try { try {
this.inputStream.close(); this.inputStream.close();
} catch (final IOException e) { } catch (final IOException e) {

@ -1981,16 +1981,37 @@ public final class Switchboard extends serverSwitch {
} }
public void processSurrogate(final InputStream is, final String name) throws IOException { public void processSurrogate(final InputStream is, final String name) throws IOException {
final SurrogateReader reader = new SurrogateReader(is, 100, this.crawlStacker, this.index.fulltext().getDefaultConfiguration()); final int concurrency = Runtime.getRuntime().availableProcessors();
// start reader thread
final SurrogateReader reader = new SurrogateReader(is, 100, this.crawlStacker, this.index.fulltext().getDefaultConfiguration(), concurrency);
final Thread readerThread = new Thread(reader, name); final Thread readerThread = new Thread(reader, name);
readerThread.setPriority(Thread.MAX_PRIORITY); // we must have maximum prio here because this thread feeds the other threads. It must always be ahead of them.
readerThread.start(); readerThread.start();
SolrInputDocument surrogate;
while ((surrogate = reader.take()) != SurrogateReader.POISON_DOCUMENT ) { // start indexer threads
// check if url is in accepted domain assert this.crawlStacker != null;
assert surrogate != null; Thread[] indexer = new Thread[concurrency];
assert this.crawlStacker != null; for (int t = 0; t < concurrency; t++) {
this.index.putDocument(surrogate); indexer[t] = new Thread() {
if (shallTerminate()) break; @Override
public void run() {
SolrInputDocument surrogate;
while ((surrogate = reader.take()) != SurrogateReader.POISON_DOCUMENT ) {
// check if url is in accepted domain
assert surrogate != null;
Switchboard.this.index.putDocument(surrogate);
if (shallTerminate()) break;
}
}
};
indexer[t].setPriority(5);
indexer[t].start();
}
// wait for termination of indexer threads
for (int t = 0; t < concurrency; t++) {
try {indexer[t].join();} catch (InterruptedException e) {}
} }
} }

Loading…
Cancel
Save