From 2e3186189b9a21207314838cc3741d9fa730689d Mon Sep 17 00:00:00 2001 From: orbiter Date: Sat, 25 Apr 2009 21:52:21 +0000 Subject: [PATCH] fix for mediawikiIndex surrogate producer + added concurrency git-svn-id: https://svn.berlios.de/svnroot/repos/yacy/trunk@5880 6c8d7289-2bf4-0310-a012-ef5d649a1542 --- source/de/anomic/plasma/plasmaParser.java | 2 +- source/de/anomic/tools/mediawikiIndex.java | 48 ++++++++++++++-------- 2 files changed, 33 insertions(+), 17 deletions(-) diff --git a/source/de/anomic/plasma/plasmaParser.java b/source/de/anomic/plasma/plasmaParser.java index 353b70a06..e76b1dced 100644 --- a/source/de/anomic/plasma/plasmaParser.java +++ b/source/de/anomic/plasma/plasmaParser.java @@ -544,7 +544,7 @@ public final class plasmaParser { // testing if the resource is not empty if (sourceArray == null || sourceArray.length == 0) { - final String errorMsg = "No resource content available (1) " + ((sourceArray == null) ? "source == null" : "source.length() == 0"); + final String errorMsg = "No resource content available (1) " + (((sourceArray == null) ? "source == null" : "source.length() == 0") + ", url = " + location.toNormalform(true, false)); theLogger.logInfo("Unable to parse '" + location + "'. " + errorMsg); throw new ParserException(errorMsg,location, errorMsg); } diff --git a/source/de/anomic/tools/mediawikiIndex.java b/source/de/anomic/tools/mediawikiIndex.java index 5e3d9f6bc..282760975 100644 --- a/source/de/anomic/tools/mediawikiIndex.java +++ b/source/de/anomic/tools/mediawikiIndex.java @@ -441,6 +441,7 @@ public class mediawikiIndex { try { record.genHTML(); record.genDocument(); + out.put(record); } catch (RuntimeException e) { e.printStackTrace(); } catch (MalformedURLException e) { @@ -448,11 +449,11 @@ public class mediawikiIndex { } catch (ParserException e) { e.printStackTrace(); } - out.put(record); } } catch (InterruptedException e) { e.printStackTrace(); } + System.out.println("*** convertConsumer has terminated"); return Integer.valueOf(0); } @@ -500,7 +501,7 @@ public class mediawikiIndex { osw.write("\n\n"); } - System.out.println("Title: " + record.title); + System.out.println("[CONSUME] Title: " + record.title); record.document.writeXML(osw, new Date()); rc++; if (rc >= 10000) { @@ -515,10 +516,7 @@ public class mediawikiIndex { osw.write("\n\n"); } - osw.write("\n"); - osw.close(); - String finalfilename = targetstub + "." + fc + ".xml"; - new File(targetdir, outputfilename).renameTo(new File(targetdir, finalfilename)); + } } catch (InterruptedException e) { e.printStackTrace(); @@ -528,8 +526,17 @@ public class mediawikiIndex { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); + } finally { + try { + osw.write("\n"); + osw.close(); + String finalfilename = targetstub + "." + fc + ".xml"; + new File(targetdir, outputfilename).renameTo(new File(targetdir, finalfilename)); + } catch (IOException e) { + e.printStackTrace(); + } } - + System.out.println("*** convertWriter has terminated"); return Integer.valueOf(0); } @@ -546,7 +553,7 @@ public class mediawikiIndex { if (b != 'Z') throw new IOException("Invalid bz2 content."); is = new CBZip2InputStream(is); } - BufferedReader r = new BufferedReader(new java.io.InputStreamReader(is, "UTF-8")); + BufferedReader r = new BufferedReader(new java.io.InputStreamReader(is, "UTF-8"), 10 * 1024 * 1024); String t; StringBuilder sb = new StringBuilder(); boolean page = false, text = false; @@ -554,12 +561,17 @@ public class mediawikiIndex { plasmaParser.initHTMLParsableMimeTypes("text/html"); plasmaParser.initParseableMimeTypes(plasmaParser.PARSER_MODE_CRAWLER, "text/html"); mediawikiIndex mi = new mediawikiIndex(urlStub); - BlockingQueue in = new ArrayBlockingQueue(10); - BlockingQueue out = new ArrayBlockingQueue(10); wikiparserrecord poison = mi.newRecord(); - ExecutorService service = Executors.newFixedThreadPool(2); - convertConsumer consumer = new convertConsumer(in, out, poison); - Future consumerResult = service.submit(consumer); + int threads = Math.max(1, Runtime.getRuntime().availableProcessors() - 1); + BlockingQueue in = new ArrayBlockingQueue(threads * 10); + BlockingQueue out = new ArrayBlockingQueue(threads * 10); + ExecutorService service = Executors.newFixedThreadPool(threads + 1); + convertConsumer[] consumers = new convertConsumer[threads]; + Future[] consumerResults = new Future[threads]; + for (int i = 0; i < threads; i++) { + consumers[i] = new convertConsumer(in, out, poison); + consumerResults[i] = service.submit(consumers[i]); + } convertWriter writer = new convertWriter(out, poison, targetdir, targetstub); Future writerResult = service.submit(writer); @@ -575,7 +587,7 @@ public class mediawikiIndex { } if (t.indexOf(textend) >= 0) { text = false; - System.out.println("Title: " + title); + System.out.println("[INJECT] Title: " + title); record = mi.newRecord(title, sb); try { in.put(record); @@ -603,8 +615,12 @@ public class mediawikiIndex { r.close(); try { - in.put(poison); - consumerResult.get(10000, TimeUnit.MILLISECONDS); + for (int i = 0; i < threads; i++) { + in.put(poison); + } + for (int i = 0; i < threads; i++) { + consumerResults[i].get(10000, TimeUnit.MILLISECONDS); + } out.put(poison); writerResult.get(10000, TimeUnit.MILLISECONDS); } catch (InterruptedException e1) {