some concurrency for wikipedia dump reader

git-svn-id: https://svn.berlios.de/svnroot/repos/yacy/trunk@5855 6c8d7289-2bf4-0310-a012-ef5d649a1542
pull/1/head
orbiter 16 years ago
parent dec495ac78
commit 1b9e532c87

@ -292,6 +292,9 @@ public class mediawikiIndex {
this.end = end;
}
}
public wikiparserrecord newRecord() {
return new wikiparserrecord(null, null);
}
public wikiparserrecord newRecord(String title, StringBuilder sb) {
return new wikiparserrecord(title, sb);
}
@ -415,6 +418,205 @@ public class mediawikiIndex {
return null;
}
private static class convertConsumer implements Callable<Integer> {
private BlockingQueue<wikiparserrecord> in, out;
private wikiparserrecord poison;
public convertConsumer(BlockingQueue<wikiparserrecord> in, BlockingQueue<wikiparserrecord> out, wikiparserrecord poison) {
this.poison = poison;
this.in = in;
this.out = out;
}
public Integer call() {
wikiparserrecord record;
try {
while(true) {
record = in.take();
if (record == poison) {
System.out.println("convertConsumer / got poison");
break;
}
try {
record.genHTML();
record.genDocument();
} catch (RuntimeException e) {
e.printStackTrace();
} catch (MalformedURLException e) {
e.printStackTrace();
} catch (ParserException e) {
e.printStackTrace();
}
out.put(record);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
return Integer.valueOf(0);
}
}
private static class convertWriter implements Callable<Integer> {
private BlockingQueue<wikiparserrecord> in;
private wikiparserrecord poison;
private OutputStreamWriter osw;
private String targetstub;
private File targetdir;
private int fc, rc;
private String outputfilename;
public convertWriter(
BlockingQueue<wikiparserrecord> in,
wikiparserrecord poison,
File targetdir,
String targetstub) {
this.poison = poison;
this.in = in;
this.osw = null;
this.targetdir = targetdir;
this.targetstub = targetstub;
this.fc = 0;
this.rc = 0;
this.outputfilename = null;
}
public Integer call() {
wikiparserrecord record;
try {
while(true) {
record = in.take();
if (record == poison) {
System.out.println("convertConsumer / got poison");
break;
}
if (osw == null) {
// start writing a new file
this.outputfilename = targetstub + "." + fc + ".xml.tmp";
this.osw = new OutputStreamWriter(new BufferedOutputStream(new FileOutputStream(new File(targetdir, outputfilename))), "UTF-8");
osw.write("<?xml version=\"1.0\" encoding=\"utf-8\"?>\n<surrogates xmlns:dc=\"http://purl.org/dc/elements/1.1/\">\n");
}
System.out.println("Title: " + record.title);
record.document.writeXML(osw, new Date());
rc++;
if (rc >= 10000) {
osw.write("</surrogates>\n");
osw.close();
String finalfilename = targetstub + "." + fc + ".xml";
new File(targetdir, outputfilename).renameTo(new File(targetdir, finalfilename));
rc = 0;
fc++;
outputfilename = targetstub + "." + fc + ".xml.tmp";
osw = new OutputStreamWriter(new BufferedOutputStream(new FileOutputStream(new File(targetdir, outputfilename))), "UTF-8");
osw.write("<?xml version=\"1.0\" encoding=\"utf-8\"?>\n<surrogates xmlns:dc=\"http://purl.org/dc/elements/1.1/\">\n");
}
osw.write("</surrogates>\n");
osw.close();
String finalfilename = targetstub + "." + fc + ".xml";
new File(targetdir, outputfilename).renameTo(new File(targetdir, finalfilename));
}
} catch (InterruptedException e) {
e.printStackTrace();
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
} catch (FileNotFoundException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
return Integer.valueOf(0);
}
}
public static void convert(File sourcefile, File targetdir, String urlStub) throws IOException {
String targetstub = sourcefile.getName();
targetstub = targetstub.substring(0, targetstub.length() - 8);
InputStream is = new FileInputStream(sourcefile);
if (sourcefile.getName().endsWith(".bz2")) {
int b = is.read();
if (b != 'B') throw new IOException("Invalid bz2 content.");
b = is.read();
if (b != 'Z') throw new IOException("Invalid bz2 content.");
is = new CBZip2InputStream(is);
}
BufferedReader r = new BufferedReader(new java.io.InputStreamReader(is, "UTF-8"));
String t;
StringBuilder sb = new StringBuilder();
boolean page = false, text = false;
String title = null;
plasmaParser.initHTMLParsableMimeTypes("text/html");
plasmaParser.initParseableMimeTypes(plasmaParser.PARSER_MODE_CRAWLER, "text/html");
mediawikiIndex mi = new mediawikiIndex(urlStub);
BlockingQueue<wikiparserrecord> in = new ArrayBlockingQueue<wikiparserrecord>(10);
BlockingQueue<wikiparserrecord> out = new ArrayBlockingQueue<wikiparserrecord>(10);
wikiparserrecord poison = mi.newRecord();
ExecutorService service = Executors.newFixedThreadPool(2);
convertConsumer consumer = new convertConsumer(in, out, poison);
Future<Integer> consumerResult = service.submit(consumer);
convertWriter writer = new convertWriter(out, poison, targetdir, targetstub);
Future<Integer> writerResult = service.submit(writer);
wikiparserrecord record;
while ((t = r.readLine()) != null) {
if (t.indexOf(pagestart) >= 0) {
page = true;
continue;
}
if (t.indexOf(textstart) >= 0) {
text = page;
continue;
}
if (t.indexOf(textend) >= 0) {
text = false;
System.out.println("Title: " + title);
record = mi.newRecord(title, sb);
try {
in.put(record);
} catch (InterruptedException e1) {
e1.printStackTrace();
}
sb.setLength(0);
continue;
}
if (t.indexOf(pageend) >= 0) {
page = false;
continue;
}
if (t.indexOf("<title>") >= 0) {
title = t.substring(t.indexOf("<title>") + 7);
int p = title.indexOf("</title>");
if (p >= 0) title = title.substring(0, p);
continue;
}
if (text) {
sb.append(t);
sb.append('\n');
}
}
r.close();
try {
in.put(poison);
consumerResult.get(10000, TimeUnit.MILLISECONDS);
out.put(poison);
writerResult.get(10000, TimeUnit.MILLISECONDS);
} catch (InterruptedException e1) {
e1.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
public static void main(String[] s) {
if (s.length == 0) {
System.out.println("usage:");
@ -427,92 +629,14 @@ public class mediawikiIndex {
// example:
// java -Xmx2000m -cp classes:lib/bzip2.jar de.anomic.tools.mediawikiIndex -convert DATA/HTCACHE/dewiki-20090311-pages-articles.xml.bz2 DATA/SURROGATES/in/ http://de.wikipedia.org/wiki/
if (s[0].equals("-convert") && s.length > 2 && s[1].endsWith(".xml.bz2") && s[3].startsWith("http://")) {
File sourcefile = new File(s[1]);
File targetdir = new File(s[2]);
String targetstub = sourcefile.getName();
targetstub = targetstub.substring(0, targetstub.length() - 8);
String urlStub = s[3]; // i.e. http://de.wikipedia.org/wiki/
//String language = urlStub.substring(7,9);
try {
InputStream is = new FileInputStream(sourcefile);
if (s[1].endsWith(".bz2")) {
int b = is.read();
if (b != 'B') throw new IOException("Invalid bz2 content.");
b = is.read();
if (b != 'Z') throw new IOException("Invalid bz2 content.");
is = new CBZip2InputStream(is);
}
BufferedReader r = new BufferedReader(new java.io.InputStreamReader(is, "UTF-8"));
String t;
StringBuilder sb = new StringBuilder();
boolean page = false, text = false;
String title = null;
plasmaParser.initHTMLParsableMimeTypes("text/html");
plasmaParser.initParseableMimeTypes(plasmaParser.PARSER_MODE_CRAWLER, "text/html");
mediawikiIndex mi = new mediawikiIndex(urlStub);
wikiparserrecord record;
int fc = 0;
int rc = 0;
String outputfilename = targetstub + "." + fc + ".xml.tmp";
OutputStreamWriter osw = new OutputStreamWriter(new BufferedOutputStream(new FileOutputStream(new File(targetdir, outputfilename))), "UTF-8");
osw.write("<?xml version=\"1.0\" encoding=\"utf-8\"?>\n<surrogates xmlns:dc=\"http://purl.org/dc/elements/1.1/\">\n");
while ((t = r.readLine()) != null) {
if (t.indexOf(pagestart) >= 0) {
page = true;
continue;
}
if (t.indexOf(textstart) >= 0) {
text = page;
continue;
}
if (t.indexOf(textend) >= 0) {
text = false;
System.out.println("Title: " + title);
record = mi.newRecord(title, sb);
record.genHTML();
try {
record.genDocument();
record.document.writeXML(osw, new Date());
rc++;
if (rc >= 10000) {
osw.write("</surrogates>\n");
osw.close();
String finalfilename = targetstub + "." + fc + ".xml";
new File(targetdir, outputfilename).renameTo(new File(targetdir, finalfilename));
rc = 0;
fc++;
outputfilename = targetstub + "." + fc + ".xml.tmp";
osw = new OutputStreamWriter(new BufferedOutputStream(new FileOutputStream(new File(targetdir, outputfilename))), "UTF-8");
osw.write("<?xml version=\"1.0\" encoding=\"utf-8\"?>\n<surrogates xmlns:dc=\"http://purl.org/dc/elements/1.1/\">\n");
}
} catch (InterruptedException e) {
} catch (ParserException e) {
}
sb.setLength(0);
continue;
}
if (t.indexOf(pageend) >= 0) {
page = false;
continue;
}
if (t.indexOf("<title>") >= 0) {
title = t.substring(t.indexOf("<title>") + 7);
int p = title.indexOf("</title>");
if (p >= 0) title = title.substring(0, p);
continue;
}
if (text) {
sb.append(t);
sb.append('\n');
}
}
osw.write("</surrogates>\n");
osw.close();
String finalfilename = targetstub + "." + fc + ".xml";
new File(targetdir, outputfilename).renameTo(new File(targetdir, finalfilename));
r.close();
convert(sourcefile, targetdir, urlStub);
} catch (IOException e) {
e.printStackTrace();
}

Loading…
Cancel
Save