fix for mediawikiIndex surrogate producer + added concurrency

git-svn-id: https://svn.berlios.de/svnroot/repos/yacy/trunk@5880 6c8d7289-2bf4-0310-a012-ef5d649a1542
pull/1/head
orbiter 16 years ago
parent 6f5ea7b1a8
commit 2e3186189b

@ -544,7 +544,7 @@ public final class plasmaParser {
// testing if the resource is not empty
if (sourceArray == null || sourceArray.length == 0) {
final String errorMsg = "No resource content available (1) " + ((sourceArray == null) ? "source == null" : "source.length() == 0");
final String errorMsg = "No resource content available (1) " + (((sourceArray == null) ? "source == null" : "source.length() == 0") + ", url = " + location.toNormalform(true, false));
theLogger.logInfo("Unable to parse '" + location + "'. " + errorMsg);
throw new ParserException(errorMsg,location, errorMsg);
}

@ -441,6 +441,7 @@ public class mediawikiIndex {
try {
record.genHTML();
record.genDocument();
out.put(record);
} catch (RuntimeException e) {
e.printStackTrace();
} catch (MalformedURLException e) {
@ -448,11 +449,11 @@ public class mediawikiIndex {
} catch (ParserException e) {
e.printStackTrace();
}
out.put(record);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("*** convertConsumer has terminated");
return Integer.valueOf(0);
}
@ -500,7 +501,7 @@ public class mediawikiIndex {
osw.write("<?xml version=\"1.0\" encoding=\"utf-8\"?>\n<surrogates xmlns:dc=\"http://purl.org/dc/elements/1.1/\">\n");
}
System.out.println("Title: " + record.title);
System.out.println("[CONSUME] Title: " + record.title);
record.document.writeXML(osw, new Date());
rc++;
if (rc >= 10000) {
@ -515,10 +516,7 @@ public class mediawikiIndex {
osw.write("<?xml version=\"1.0\" encoding=\"utf-8\"?>\n<surrogates xmlns:dc=\"http://purl.org/dc/elements/1.1/\">\n");
}
osw.write("</surrogates>\n");
osw.close();
String finalfilename = targetstub + "." + fc + ".xml";
new File(targetdir, outputfilename).renameTo(new File(targetdir, finalfilename));
}
} catch (InterruptedException e) {
e.printStackTrace();
@ -528,8 +526,17 @@ public class mediawikiIndex {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
osw.write("</surrogates>\n");
osw.close();
String finalfilename = targetstub + "." + fc + ".xml";
new File(targetdir, outputfilename).renameTo(new File(targetdir, finalfilename));
} catch (IOException e) {
e.printStackTrace();
}
}
System.out.println("*** convertWriter has terminated");
return Integer.valueOf(0);
}
@ -546,7 +553,7 @@ public class mediawikiIndex {
if (b != 'Z') throw new IOException("Invalid bz2 content.");
is = new CBZip2InputStream(is);
}
BufferedReader r = new BufferedReader(new java.io.InputStreamReader(is, "UTF-8"));
BufferedReader r = new BufferedReader(new java.io.InputStreamReader(is, "UTF-8"), 10 * 1024 * 1024);
String t;
StringBuilder sb = new StringBuilder();
boolean page = false, text = false;
@ -554,12 +561,17 @@ public class mediawikiIndex {
plasmaParser.initHTMLParsableMimeTypes("text/html");
plasmaParser.initParseableMimeTypes(plasmaParser.PARSER_MODE_CRAWLER, "text/html");
mediawikiIndex mi = new mediawikiIndex(urlStub);
BlockingQueue<wikiparserrecord> in = new ArrayBlockingQueue<wikiparserrecord>(10);
BlockingQueue<wikiparserrecord> out = new ArrayBlockingQueue<wikiparserrecord>(10);
wikiparserrecord poison = mi.newRecord();
ExecutorService service = Executors.newFixedThreadPool(2);
convertConsumer consumer = new convertConsumer(in, out, poison);
Future<Integer> consumerResult = service.submit(consumer);
int threads = Math.max(1, Runtime.getRuntime().availableProcessors() - 1);
BlockingQueue<wikiparserrecord> in = new ArrayBlockingQueue<wikiparserrecord>(threads * 10);
BlockingQueue<wikiparserrecord> out = new ArrayBlockingQueue<wikiparserrecord>(threads * 10);
ExecutorService service = Executors.newFixedThreadPool(threads + 1);
convertConsumer[] consumers = new convertConsumer[threads];
Future<Integer>[] consumerResults = new Future[threads];
for (int i = 0; i < threads; i++) {
consumers[i] = new convertConsumer(in, out, poison);
consumerResults[i] = service.submit(consumers[i]);
}
convertWriter writer = new convertWriter(out, poison, targetdir, targetstub);
Future<Integer> writerResult = service.submit(writer);
@ -575,7 +587,7 @@ public class mediawikiIndex {
}
if (t.indexOf(textend) >= 0) {
text = false;
System.out.println("Title: " + title);
System.out.println("[INJECT] Title: " + title);
record = mi.newRecord(title, sb);
try {
in.put(record);
@ -603,8 +615,12 @@ public class mediawikiIndex {
r.close();
try {
in.put(poison);
consumerResult.get(10000, TimeUnit.MILLISECONDS);
for (int i = 0; i < threads; i++) {
in.put(poison);
}
for (int i = 0; i < threads; i++) {
consumerResults[i].get(10000, TimeUnit.MILLISECONDS);
}
out.put(poison);
writerResult.get(10000, TimeUnit.MILLISECONDS);
} catch (InterruptedException e1) {

Loading…
Cancel
Save