Extended Mediawiki dump import to remote URLs.

When using a public HTTP URL in /IndexImportMediawiki_p.html, the remote
file is now directly streamed and processed, allowing the import of
multi-gigabyte dumps even on a low-memory remote peer, without the need
to manually download the dump file first.
pull/122/head
luccioman 8 years ago
parent e5c3b16748
commit f66438442e

@ -1,3 +1,3 @@
#!/usr/bin/env sh
cd "`dirname $0`"
./apicall.sh /IndexImportMediawiki_p.html?file=$1 > /dev/null
./protectedPostApiCall.sh "IndexImportMediawiki_p.html" "file=$1"

@ -13,29 +13,33 @@
<h2>MediaWiki Dump Import</h2>
#(import)#
<p>#(prevStatus)#
::<div class="alert alert-danger" role="alert">Error on last import : #[message]#</div>
#(/prevStatus)#</p>
<p>#(status)#<div class="alert alert-info" role="alert">No import thread is running, you can start a new thread here</div>
::<div class="alert alert-danger" role="alert">Error : file argument must be a path to a document in the local file system</div>
::<div class="alert alert-danger" role="alert">Error : dump <abbr title="Uniform Resource Locator">URL</abbr> is malformed.</div>
::<div class="alert alert-danger" role="alert">Error : file not found "#[sourceFile]#"</div>
::<div class="alert alert-danger" role="alert">Error : can not read file "#[sourceFile]#"</div>
::<div class="alert alert-danger" role="alert">Error : you selected a directory ("#[sourceFile]#")</div>
#(/status)#</p>
<form action="IndexImportMediawiki_p.html" method="post" accept-charset="UTF-8" class="form-horizontal">
<input type="hidden" name="transactionToken" value="#[transactionToken]#"/>
<fieldset>
<legend>MediaWiki Dump File Selection: select an XML file (which may be bz2- or gz-encoded)</legend>
<legend>MediaWiki Dump File Selection</legend>
<p>
You can import MediaWiki dumps here. An example is the file
<a href="http://dumps.wikimedia.org/dewiki/latest/dewiki-latest-pages-articles.xml.bz2">
http://dumps.wikimedia.org/dewiki/latest/dewiki-latest-pages-articles.xml.bz2</a>.
<a href="https://dumps.wikimedia.org/dewiki/latest/dewiki-latest-pages-articles.xml.bz2">
https://dumps.wikimedia.org/dewiki/latest/dewiki-latest-pages-articles.xml.bz2</a>.
</p>
<p>
Dumps must be stored in the local file system in XML format and may be compressed in gz or bz2.
Dumps can be stored in the local file system or on a remote server in XML format and may be compressed in gz or bz2.
</p>
<div class="form-group">
<div class="col-sm-3 col-md-2 col-lg-2">
<label for="file" class="control-label" >Dump file path</label>
<label for="file" class="control-label" >Dump file path or <abbr title="Uniform Resource Locator">URL</abbr></label>
</div>
<div class="col-sm-9 col-md-8 col-lg-8">
<input id="file" class="form-control" name="file" type="text" title="Dump file path on this YaCy server file system" required="required"/>
<input id="file" class="form-control" name="file" type="text" title="Dump file path on this YaCy server file system, or any remote URL" required="required"/>
</div>
</div>
<input name="submit" class="btn btn-primary" type="submit" value="Import MediaWiki Dump" />
@ -69,6 +73,8 @@
</ul>
<br />
::
<p>#(status)#::<div class="alert alert-danger" role="alert">Error encountered : #[message]#</div>
#(/status)#</p>
<form><fieldset><legend>Import Process</legend>
<dl>
<dt>Thread:</dt><dd>#[thread]#</dd>

@ -23,8 +23,11 @@
// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
import java.io.File;
import java.net.MalformedURLException;
import net.yacy.cora.document.id.MultiProtocolURL;
import net.yacy.cora.protocol.RequestHeader;
import net.yacy.data.TransactionManager;
import net.yacy.document.importer.MediawikiImporter;
import net.yacy.search.Switchboard;
import net.yacy.server.serverObjects;
@ -54,6 +57,11 @@ public class IndexImportMediawiki_p {
if (MediawikiImporter.job != null && MediawikiImporter.job.isAlive()) {
// one import is running, no option to insert anything
prop.put("import", 1);
final String jobErrorMessage = MediawikiImporter.job.status();
if( jobErrorMessage != null && !jobErrorMessage.isEmpty()) {
prop.put("import_status", 1);
prop.put("import_status_message", jobErrorMessage);
}
prop.put("import_thread", "running");
prop.put("import_dump", MediawikiImporter.job.source());
prop.put("import_count", MediawikiImporter.job.count());
@ -64,33 +72,63 @@ public class IndexImportMediawiki_p {
prop.put("import_remainingMinutes", (MediawikiImporter.job.remainingTime() / 60) % 60);
} else {
prop.put("import", 0);
if(MediawikiImporter.job != null) {
/* Report any failure from the last terminated import (for example an HTTP 404 status)
 * that could otherwise be missed by the user because of a page refresh */
final String jobErrorMessage = MediawikiImporter.job.status();
if( jobErrorMessage != null && !jobErrorMessage.isEmpty()) {
prop.put("import_prevStatus", 1);
prop.put("import_prevStatus_message", jobErrorMessage);
}
}
if (post == null) {
prop.put("import_status", 0);
/* Acquire a transaction token for the next POST form submission */
final String token = TransactionManager.getTransactionToken(header);
prop.put(TransactionManager.TRANSACTION_TOKEN_PARAM, token);
prop.put("import_" + TransactionManager.TRANSACTION_TOKEN_PARAM, token);
} else {
if (post.containsKey("file")) {
/* Check the transaction is valid */
TransactionManager.checkPostTransaction(header, post);
String file = post.get("file");
if (file.startsWith("file://")) file = file.substring(7);
if (file.startsWith("http")) {
prop.put("import_status", 1);
} else {
final File sourcefile = new File(file);
if (!sourcefile.exists()) {
prop.put("import_status", 2);
prop.put("import_status_sourceFile", sourcefile.getAbsolutePath());
} else if(!sourcefile.canRead()) {
prop.put("import_status", 3);
prop.put("import_status_sourceFile", sourcefile.getAbsolutePath());
} else if(sourcefile.isDirectory()) {
prop.put("import_status", 4);
prop.put("import_status_sourceFile", sourcefile.getAbsolutePath());
} else {
MediawikiImporter.job = new MediawikiImporter(sourcefile, sb.surrogatesInPath);
MediawikiImporter.job.start();
prop.put("import_dump", MediawikiImporter.job.source());
prop.put("import_thread", "started");
prop.put("import", 1);
}
}
MultiProtocolURL sourceURL = null;
int status = 0;
String sourceFilePath = "";
try {
sourceURL = new MultiProtocolURL(file);
if(sourceURL.isFile()) {
final File sourcefile = sourceURL.getFSFile();
sourceFilePath = sourcefile.getAbsolutePath();
if (!sourcefile.exists()) {
status = 2;
} else if (!sourcefile.canRead()) {
status = 3;
} else if (sourcefile.isDirectory()) {
status = 4;
}
}
} catch (MalformedURLException e) {
status = 1;
}
if (status == 0) {
MediawikiImporter.job = new MediawikiImporter(sourceURL, sb.surrogatesInPath);
MediawikiImporter.job.start();
prop.put("import_dump", MediawikiImporter.job.source());
prop.put("import_thread", "started");
prop.put("import", 1);
} else {
prop.put("import_status", status);
prop.put("import_status_sourceFile", sourceFilePath);
/* Acquire a transaction token for the next POST form submission */
final String token = TransactionManager.getTransactionToken(header);
prop.put(TransactionManager.TRANSACTION_TOKEN_PARAM, token);
prop.put("import_" + TransactionManager.TRANSACTION_TOKEN_PARAM, token);
}
prop.put("import_count", 0);
prop.put("import_speed", 0);
prop.put("import_runningHours", 0);

@ -48,6 +48,8 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.http.HttpStatus;
import jcifs.smb.SmbException;
import jcifs.smb.SmbFile;
import jcifs.smb.SmbFileInputStream;
@ -62,6 +64,7 @@ import net.yacy.cora.protocol.ftp.FTPClient;
import net.yacy.cora.protocol.http.HTTPClient;
import net.yacy.cora.util.CommonPattern;
import net.yacy.cora.util.ConcurrentLog;
import net.yacy.cora.util.HTTPInputStream;
import net.yacy.crawler.retrieval.Response;
/**
@ -2290,7 +2293,14 @@ public class MultiProtocolURL implements Serializable, Comparable<MultiProtocolU
return null;
}
public InputStream getInputStream(final ClientIdentification.Agent agent, final String username, final String pass) throws IOException {
/**
* Open an input stream on the resource described by this URL.
* <strong>Please don't forget to release resources by closing the returned stream.</strong>
 * @param agent user agent identifier to use when the protocol is HTTP
* @return an open input stream
* @throws IOException when the stream can not be opened
*/
public InputStream getInputStream(final ClientIdentification.Agent agent) throws IOException {
if (isFile()) return new BufferedInputStream(new FileInputStream(getFSFile()));
if (isSMB()) return new BufferedInputStream(new SmbFileInputStream(getSmbFile()));
if (isFTP()) {
@ -2303,7 +2313,12 @@ public class MultiProtocolURL implements Serializable, Comparable<MultiProtocolU
if (isHTTP() || isHTTPS()) {
final HTTPClient client = new HTTPClient(agent);
client.setHost(getHost());
return new ByteArrayInputStream(client.GETbytes(this, username, pass, false));
client.GET(this, false);
if (client.getStatusCode() != HttpStatus.SC_OK) {
throw new IOException("Unable to open http stream on " + this.toString() +
"\nServer returned status: " + client.getHttpResponse().getStatusLine());
}
return new HTTPInputStream(client);
}
return null;

@ -138,7 +138,7 @@ public class FileLoader {
}
// load the resource
InputStream is = url.getInputStream(ClientIdentification.yacyInternetCrawlerAgent, null, null);
InputStream is = url.getInputStream(ClientIdentification.yacyInternetCrawlerAgent);
byte[] b = FileUtils.read(is);
// create response with loaded content

@ -156,7 +156,7 @@ public class SMBLoader {
}
// load the resource
InputStream is = url.getInputStream(ClientIdentification.yacyInternetCrawlerAgent, null, null);
InputStream is = url.getInputStream(ClientIdentification.yacyInternetCrawlerAgent);
byte[] b = FileUtils.read(is);
// create response with loaded content

@ -54,6 +54,9 @@ import java.util.zip.GZIPInputStream;
import net.yacy.cora.document.encoding.UTF8;
import net.yacy.cora.document.id.AnchorURL;
import net.yacy.cora.document.id.DigestURL;
import net.yacy.cora.document.id.MultiProtocolURL;
import net.yacy.cora.protocol.ClientIdentification;
import net.yacy.cora.protocol.http.HTTPClient;
import net.yacy.cora.util.ByteBuffer;
import net.yacy.cora.util.ConcurrentLog;
import net.yacy.cora.util.NumberTools;
@ -86,17 +89,18 @@ public class MediawikiImporter extends Thread implements Importer {
public static Importer job; // if started from a servlet, this object is used to store the thread
public File sourcefile;
public MultiProtocolURL sourcefile;
public File targetdir;
public int count;
private long start;
private final long docsize;
private final int approxdocs;
private String hostport, urlStub;
private String errorMessage;
public MediawikiImporter(final File sourcefile, final File targetdir) {
super("MediawikiImporter(" + sourcefile != null ? sourcefile.getAbsolutePath() : "null sourcefile" +")");
public MediawikiImporter(final MultiProtocolURL sourcefile, final File targetdir) {
super("MediawikiImporter(" + sourcefile != null ? sourcefile.toNormalform(true) : "null sourcefile" +")");
this.sourcefile = sourcefile;
this.docsize = sourcefile.length();
this.approxdocs = (int) (this.docsize * docspermbinxmlbz2 / 1024L / 1024L);
@ -105,6 +109,7 @@ public class MediawikiImporter extends Thread implements Importer {
this.start = 0;
this.hostport = null;
this.urlStub = null;
this.errorMessage = null;
}
@Override
@ -114,12 +119,15 @@ public class MediawikiImporter extends Thread implements Importer {
@Override
public String source() {
return this.sourcefile.getAbsolutePath();
return this.sourcefile.toNormalform(true);
}
/**
* @return an empty string or the error message when an exception occurred
*/
@Override
public String status() {
return "";
return this.errorMessage != null ? this.errorMessage : "";
}
/**
@ -152,17 +160,18 @@ public class MediawikiImporter extends Thread implements Importer {
// regardless of any exception (e.g. out of memory), an add(poison) is performed in the outermost finally block
final BlockingQueue<wikiparserrecord> out = new ArrayBlockingQueue<wikiparserrecord>(threads * 10);
final wikiparserrecord poison = newRecord();
BufferedReader reader = null;
try {
String targetstub = this.sourcefile.getName();
String targetstub = this.sourcefile.getFileName();
int p = targetstub.lastIndexOf("\\.");
if (p > 0) targetstub = targetstub.substring(0, p);
InputStream is = new BufferedInputStream(new FileInputStream(this.sourcefile), 1024 * 1024);
if (this.sourcefile.getName().endsWith(".bz2")) {
InputStream is = new BufferedInputStream(this.sourcefile.getInputStream(ClientIdentification.yacyInternetCrawlerAgent), 1024 * 1024);
if (this.sourcefile.getFileName().endsWith(".bz2")) {
is = new BZip2CompressorInputStream(is);
} else if (this.sourcefile.getName().endsWith(".gz")) {
} else if (this.sourcefile.getFileName().endsWith(".gz")) {
is = new GZIPInputStream(is);
}
final BufferedReader r = new BufferedReader(new java.io.InputStreamReader(is, StandardCharsets.UTF_8), 4 * 1024 * 1024);
reader = new BufferedReader(new java.io.InputStreamReader(is, StandardCharsets.UTF_8), 4 * 1024 * 1024);
String t;
StringBuilder sb = new StringBuilder();
boolean page = false, text = false;
@ -181,7 +190,7 @@ public class MediawikiImporter extends Thread implements Importer {
wikiparserrecord record;
int q;
while ((t = r.readLine()) != null) {
while ((t = reader.readLine()) != null) {
if ((p = t.indexOf("<base>",0)) >= 0 && (q = t.indexOf("</base>", p)) > 0) {
//urlStub = "http://" + lang + ".wikipedia.org/wiki/";
this.urlStub = t.substring(p + 6, q);
@ -256,7 +265,6 @@ public class MediawikiImporter extends Thread implements Importer {
sb.append('\n');
}
}
r.close();
try {
for (int i = 0; i < threads; i++) {
@ -265,23 +273,24 @@ public class MediawikiImporter extends Thread implements Importer {
for (int i = 0; i < threads; i++) {
consumerResults[i].get(10000, TimeUnit.MILLISECONDS);
}
} catch (final InterruptedException e) {
ConcurrentLog.logException(e);
} catch (final ExecutionException e) {
ConcurrentLog.logException(e);
} catch (final TimeoutException e) {
ConcurrentLog.logException(e);
} catch (final Exception e) {
this.errorMessage = e.getMessage();
ConcurrentLog.logException(e);
} finally {
out.put(poison); // output thread condition (for file.close)
writerResult.get(10000, TimeUnit.MILLISECONDS);
}
} catch (final IOException e) {
ConcurrentLog.logException(e);
} catch (final Exception e) {
this.errorMessage = e.getMessage();
ConcurrentLog.logException(e);
} finally {
if(reader != null) {
try {
reader.close();
} catch (IOException e) {
ConcurrentLog.warn("WIKITRANSLATION", "Could not close dump reader : " + e.getMessage());
}
}
try {
out.put(poison); // out keeps the output file open until poisoned, to close the file if an exception happened in this block
} catch (InterruptedException ex) { }
@ -767,7 +776,7 @@ public class MediawikiImporter extends Thread implements Importer {
System.out.println(" -index <wikipedia-dump>");
System.out.println(" -read <start> <len> <idx-file>");
System.out.println(" -find <title> <wikipedia-dump>");
System.out.println(" -convert <wikipedia-dump-xml.bz2> <convert-target-dir> <url-stub>");
System.out.println(" -convert <wikipedia-dump-xml.bz2> <convert-target-dir>");
ConcurrentLog.shutdown();
return;
}
@ -779,17 +788,22 @@ public class MediawikiImporter extends Thread implements Importer {
// DATA/HTCACHE/dewiki-20090311-pages-articles.xml.bz2
// DATA/SURROGATES/in/ http://de.wikipedia.org/wiki/
if (s[0].equals("-convert") && s.length > 2) {
final File sourcefile = new File(s[1]);
if (s[0].equals("-convert")) {
if(s.length < 3) {
System.out.println("usage:");
System.out.println(" -convert <wikipedia-dump-xml.bz2> <convert-target-dir>");
ConcurrentLog.shutdown();
return;
}
final File targetdir = new File(s[2]);
// String urlStub = s[3]; // i.e. http://de.wikipedia.org/wiki/
// String language = urlStub.substring(7,9);
try {
final MediawikiImporter mi = new MediawikiImporter(sourcefile, targetdir);
final MediawikiImporter mi = new MediawikiImporter(new MultiProtocolURL(s[1]), targetdir);
mi.start();
mi.join();
} catch (final InterruptedException e) {
ConcurrentLog.logException(e);
} catch (MalformedURLException e) {
ConcurrentLog.logException(e);
}
}
@ -821,6 +835,11 @@ public class MediawikiImporter extends Thread implements Importer {
}
} finally {
try {
HTTPClient.closeConnectionManager();
} catch (InterruptedException e) {
e.printStackTrace();
}
ConcurrentLog.shutdown();
}
}

@ -380,8 +380,20 @@ public class htmlParser extends AbstractParser implements Parser {
locationSnapshot = new DigestURL(location.toNormalform(true) + "?_escaped_fragment_=");
}
Charset[] detectedcharsetcontainer = new Charset[]{null};
ContentScraper scraperSnapshot = parseToScraper(location, documentCharset, vocscraper, detectedcharsetcontainer, timezoneOffset, locationSnapshot.getInputStream(ClientIdentification.yacyInternetCrawlerAgent, null, null), maxLinks);
documentSnapshot = transformScraper(location, mimeType, detectedcharsetcontainer[0].name(), scraperSnapshot);
InputStream snapshotStream = null;
try {
snapshotStream = locationSnapshot.getInputStream(ClientIdentification.yacyInternetCrawlerAgent);
ContentScraper scraperSnapshot = parseToScraper(location, documentCharset, vocscraper, detectedcharsetcontainer, timezoneOffset, snapshotStream, maxLinks);
documentSnapshot = transformScraper(location, mimeType, detectedcharsetcontainer[0].name(), scraperSnapshot);
} finally {
if(snapshotStream != null) {
try {
snapshotStream.close();
} catch(IOException e) {
AbstractParser.log.warn("Could not close snapshot stream : " + e.getMessage());
}
}
}
AbstractParser.log.info("parse snapshot "+locationSnapshot.toString() + " additional to " + location.toString());
} catch (IOException | Failure ex) { }
return documentSnapshot;

@ -394,7 +394,7 @@ public final class LoaderDispatcher {
inStream = this.httpLoader.openInputStream(request, crawlProfile, 1, maxFileSize, blacklistType, agent);
} else if (protocol.equals("ftp") || protocol.equals("smb") || protocol.equals("file")) {
// may also open directly stream with ftp loader
inStream = url.getInputStream(agent, null, null);
inStream = url.getInputStream(agent);
} else {
throw new IOException("Unsupported protocol '" + protocol + "' in url " + url);
}

@ -28,6 +28,7 @@ package net.yacy.search.index;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.net.MalformedURLException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
@ -158,10 +159,20 @@ public class DocumentIndex extends Segment {
} catch (final Exception e ) {
length = -1;
}
InputStream sourceStream = null;
try {
documents = TextParser.parseSource(url, null, null, new VocabularyScraper(), timezoneOffset, 0, length, url.getInputStream(ClientIdentification.yacyInternetCrawlerAgent, null, null));
sourceStream = url.getInputStream(ClientIdentification.yacyInternetCrawlerAgent);
documents = TextParser.parseSource(url, null, null, new VocabularyScraper(), timezoneOffset, 0, length, sourceStream);
} catch (final Exception e ) {
throw new IOException("cannot parse " + url.toNormalform(false) + ": " + e.getMessage());
} finally {
if(sourceStream != null) {
try {
sourceStream.close();
} catch(IOException e) {
ConcurrentLog.warn("DocumentIndex", "Could not close source stream : " + e.getMessage());
}
}
}
//Document document = Document.mergeDocuments(url, null, documents);
final SolrInputDocument[] rows = new SolrInputDocument[documents.length];

Loading…
Cancel
Save