modified export: added maximum number of docs per chunk

The export can now be written as multiple files, called chunks.
By default still only one chunk is exported.
This feature is required when the exported files are to be
imported into an elasticsearch/opensearch index. The bulk import function
of elasticsearch/opensearch is limited to 100MB. To make it possible to
import YaCy files, they must be split into chunks. Right now we
cannot estimate the chunk size in bytes, only as a number of documents.
The user must experiment to find the optimal maximum chunk size;
50000 docs per chunk is a reasonable first attempt.
pull/612/head
Michael Peter Christen 1 year ago
parent 655d8db802
commit c20c4b8a21

@ -21,13 +21,16 @@
<dd><input type="text" name="exportfilepath" value="#[exportfilepath]#" size="120" maxlength="250" />
</dd>
<dt class="TableCellDark">URL Filter</dt>
<dd><input type="text" name="exportfilter" value=".*.*" size="20" maxlength="250" />
<dd><input type="text" name="exportfilter" value=".*.*" size="20" maxlength="250" />&nbsp;.*.* (default) is a catch-all; format: java regex
</dd>
<dt class="TableCellDark">query</dt>
<dd><input type="text" name="exportquery" value="*:*" size="20" maxlength="250" />
<dd><input type="text" name="exportquery" value="*:*" size="20" maxlength="250" />&nbsp;*:* (default) is a catch-all; format: &lt;field-name&gt;:&lt;solr-pattern&gt;
</dd>
<dt class="TableCellDark">maximum age (seconds, -1 = unlimited)</dt>
<dd><input type="text" name="exportmaxseconds" value="-1" size="20" maxlength="250" />
<dt class="TableCellDark">maximum age (seconds)</dt>
<dd><input type="text" name="exportmaxseconds" value="-1" size="20" maxlength="250" />&nbsp;-1 = unlimited -> no document is too old
</dd>
<dt class="TableCellDark">maximum number of records per chunk</dt>
<dd><input type="text" name="maxchunksize" value="-1" size="20" maxlength="250" />&nbsp;if exceeded: several chunks are stored; -1 = unlimited (makes only one chunk)
</dd>
<dt class="TableCellDark">Export Format</dt>
<dd>

@ -64,8 +64,8 @@ public class IndexExport_p {
prop.put("lurlexport", 0);
prop.put("reload", 0);
prop.put("dumprestore", 1);
prop.put("dumprestore_dumpRestoreEnabled", sb.getConfigBool(SwitchboardConstants.CORE_SERVICE_FULLTEXT,
SwitchboardConstants.CORE_SERVICE_FULLTEXT_DEFAULT));
prop.put("dumprestore_dumpRestoreEnabled", sb.getConfigBool(SwitchboardConstants.CORE_SERVICE_FULLTEXT,
SwitchboardConstants.CORE_SERVICE_FULLTEXT_DEFAULT));
List<File> dumpFiles = segment.fulltext().dumpFiles();
prop.put("dumprestore_dumpfile", dumpFiles.size() == 0 ? "" : dumpFiles.get(dumpFiles.size() - 1).getAbsolutePath());
prop.put("dumprestore_optimizemax", 10);
@ -80,7 +80,7 @@ public class IndexExport_p {
prop.put("lurlexportfinished", 0);
prop.put("lurlexporterror", 0);
prop.put("lurlexport_exportfile", export.file().toString());
prop.put("lurlexport_urlcount", export.count());
prop.put("lurlexport_urlcount", export.docCount());
prop.put("reload", 1);
} else {
prop.put("lurlexport", 1);
@ -93,7 +93,7 @@ public class IndexExport_p {
// an export was running but has finished
prop.put("lurlexportfinished", 1);
prop.put("lurlexportfinished_exportfile", export.file().toString());
prop.put("lurlexportfinished_urlcount", export.count());
prop.put("lurlexportfinished_urlcount", export.docCount());
if (export.failed() == null) {
prop.put("lurlexporterror", 0);
} else {
@ -123,6 +123,8 @@ public class IndexExport_p {
final String filter = post.get("exportfilter", ".*");
final String query = post.get("exportquery", "*:*");
final int maxseconds = post.getInt("exportmaxseconds", -1);
long maxChunkSize = post.getLong("maxchunksize", Long.MAX_VALUE);
if (maxChunkSize <= 0) maxChunkSize = Long.MAX_VALUE;
final String path = post.get("exportfilepath", "");
// store this call as api call: we do this even if there is a chance that it fails because recurring calls may do not fail
@ -130,7 +132,7 @@ public class IndexExport_p {
// start the export
try {
export = sb.index.fulltext().export(format, filter, query, maxseconds, new File(path), dom, text);
export = sb.index.fulltext().export(format, filter, query, maxseconds, new File(path), dom, text, maxChunkSize);
} catch (final IOException e) {
prop.put("lurlexporterror", 1);
prop.put("lurlexporterror_exportfile", "-no export-");
@ -140,7 +142,7 @@ public class IndexExport_p {
// show result
prop.put("lurlexport_exportfile", export.file().toString());
prop.put("lurlexport_urlcount", export.count());
prop.put("lurlexport_urlcount", export.docCount());
if ((export != null) && (export.failed() == null)) {
prop.put("lurlexport", 2);
}
@ -148,34 +150,34 @@ public class IndexExport_p {
}
if (post.containsKey("indexdump")) {
try {
final File dump = segment.fulltext().dumpEmbeddedSolr();
prop.put("indexdump", 1);
prop.put("indexdump_dumpfile", dump.getAbsolutePath());
dumpFiles = segment.fulltext().dumpFiles();
prop.put("dumprestore_dumpfile", dumpFiles.size() == 0 ? "" : dumpFiles.get(dumpFiles.size() - 1).getAbsolutePath());
// sb.tables.recordAPICall(post, "IndexExport_p.html", WorkTables.TABLE_API_TYPE_STEERING, "solr dump generation");
} catch(final SolrException e) {
if(ErrorCode.SERVICE_UNAVAILABLE.code == e.code()) {
prop.put("indexdump", 2);
} else {
prop.put("indexdump", 3);
}
}
try {
final File dump = segment.fulltext().dumpEmbeddedSolr();
prop.put("indexdump", 1);
prop.put("indexdump_dumpfile", dump.getAbsolutePath());
dumpFiles = segment.fulltext().dumpFiles();
prop.put("dumprestore_dumpfile", dumpFiles.size() == 0 ? "" : dumpFiles.get(dumpFiles.size() - 1).getAbsolutePath());
// sb.tables.recordAPICall(post, "IndexExport_p.html", WorkTables.TABLE_API_TYPE_STEERING, "solr dump generation");
} catch(final SolrException e) {
if(ErrorCode.SERVICE_UNAVAILABLE.code == e.code()) {
prop.put("indexdump", 2);
} else {
prop.put("indexdump", 3);
}
}
}
if (post.containsKey("indexrestore")) {
try {
final File dump = new File(post.get("dumpfile", ""));
segment.fulltext().restoreEmbeddedSolr(dump);
prop.put("indexRestore", 1);
} catch(final SolrException e) {
if(ErrorCode.SERVICE_UNAVAILABLE.code == e.code()) {
prop.put("indexRestore", 2);
} else {
prop.put("indexRestore", 3);
}
}
try {
final File dump = new File(post.get("dumpfile", ""));
segment.fulltext().restoreEmbeddedSolr(dump);
prop.put("indexRestore", 1);
} catch(final SolrException e) {
if(ErrorCode.SERVICE_UNAVAILABLE.code == e.code()) {
prop.put("indexRestore", 2);
} else {
prop.put("indexRestore", 3);
}
}
}
// insert constants

@ -695,7 +695,10 @@ public final class Fulltext {
}
public final static String yacy_dump_prefix = "yacy_dump_";
public Export export(Fulltext.ExportFormat format, String filter, String query, final int maxseconds, File path, boolean dom, boolean text) throws IOException {
public Export export(
Fulltext.ExportFormat format, String filter, String query,
final int maxseconds, File path, boolean dom, boolean text,
long maxChunkSize) throws IOException {
// modify query according to maxseconds
final long now = System.currentTimeMillis();
@ -760,27 +763,26 @@ public final class Fulltext {
}
}
String s = new File(path, yacy_dump_prefix +
String filename = yacy_dump_prefix +
"f" + GenericFormatter.SHORT_MINUTE_FORMATTER.format(firstdate) + "_" +
"l" + GenericFormatter.SHORT_MINUTE_FORMATTER.format(lastdate) + "_" +
"n" + GenericFormatter.SHORT_MINUTE_FORMATTER.format(new Date(now)) + "_" +
"c" + String.format("%1$012d", doccount)).getAbsolutePath() + "_tc"; // the name ends with the transaction token ('c' = 'created')
"c" + String.format("%1$012d", doccount)+ "_tc"; // the name ends with the transaction token ('c' = 'created')
// create export file name
if (s.indexOf('.',0) < 0) s += "." + format.getExt();
final File f = new File(s);
f.getParentFile().mkdirs();
return export(f, filter, query, format, dom, text);
return export(path, filename, format.getExt(), filter, query, format, dom, text, maxChunkSize);
}
// export methods
public Export export(final File f, final String filter, final String query, final ExportFormat format, final boolean dom, final boolean text) {
public Export export(
final File path, final String filename,
final String fileext, final String filter, final String query,
final ExportFormat format, final boolean dom, final boolean text,
long maxChunkSize) {
if ((this.exportthread != null) && (this.exportthread.isAlive())) {
ConcurrentLog.warn("LURL-EXPORT", "cannot start another export thread, already one running");
return this.exportthread;
}
this.exportthread = new Export(f, filter, query, format, dom, text);
this.exportthread = new Export(path, filename, fileext, filter, query, format, dom, text, maxChunkSize);
this.exportthread.start();
return this.exportthread;
}
@ -795,69 +797,95 @@ public final class Fulltext {
}
public class Export extends Thread {
private final File f;
private final File path;
private final String filename, fileext;
private final Pattern pattern;
private int count;
private String failure;
private final String query;
private final ExportFormat format;
private final boolean dom, text;
private Export(final File f, final String filter, final String query, final ExportFormat format, final boolean dom, final boolean text) {
private int docCount, chunkSize, chunkCount;
private final long maxChunkSize;
private Export(
final File path, final String filename,
final String fileext, final String filter, final String query,
final ExportFormat format, final boolean dom, final boolean text,
long maxChunkSize) {
super("Fulltext.Export");
// format: 0=text, 1=html, 2=rss/xml
this.f = f;
this.path = path;
this.filename = filename;
this.fileext = fileext;
this.pattern = filter == null ? null : Pattern.compile(filter);
this.query = query == null? AbstractSolrConnector.CATCHALL_QUERY : query;
this.count = 0;
this.failure = null;
this.format = format;
this.dom = dom;
this.text = text;
this.docCount = 0; // number of all documents exported so far
this.chunkSize = 0; // number of documents in the current chunk
this.chunkCount = 0; // number of chunks opened so far
this.maxChunkSize = maxChunkSize; // number of maximum document count per chunk
//if ((dom) && (format == 2)) dom = false;
}
/**
 * Writes the format-specific preamble of an export chunk.
 * Only the html, rss and solr formats have a preamble; for every other
 * format nothing is written.
 *
 * @param pw the writer of the chunk that is being started
 */
private void printHead(PrintWriter pw) {
    final String[] header;
    if (this.format == ExportFormat.html) {
        header = new String[] {
            "<html><head></head><body>"
        };
    } else if (this.format == ExportFormat.rss) {
        header = new String[] {
            "<?xml version=\"1.0\" encoding=\"UTF-8\"?>",
            "<?xml-stylesheet type='text/xsl' href='/yacysearch.xsl' version='1.0'?>",
            "<rss version=\"2.0\" xmlns:yacy=\"http://www.yacy.net/\" xmlns:opensearch=\"http://a9.com/-/spec/opensearch/1.1/\" xmlns:atom=\"http://www.w3.org/2005/Atom\">",
            "<channel>",
            "<title>YaCy Peer-to-Peer - Web-Search URL Export</title>",
            "<description></description>",
            "<link>http://yacy.net</link>"
        };
    } else if (this.format == ExportFormat.solr) {
        header = new String[] {
            "<?xml version=\"1.0\" encoding=\"UTF-8\"?>",
            "<response>",
            "<lst name=\"responseHeader\">",
            " <str format=\"yacy.index.export.solr.xml\"/>",
            " <lst name=\"params\">",
            " <str name=\"q\">" + this.query + "</str>", // the query is written verbatim, without XML escaping
            " </lst>",
            "</lst>",
            "<result>"
        };
    } else {
        // remaining formats start directly with the first record
        header = new String[0];
    }
    for (final String line : header) {
        pw.println(line);
    }
}
/**
 * Writes the format-specific closing lines of an export chunk, i.e. the
 * counterpart of the preamble for the html, rss and solr formats.
 * For every other format nothing is written.
 *
 * @param pw the writer of the chunk that is being finished
 */
private void printTail(PrintWriter pw) {
    final String[] footer;
    if (this.format == ExportFormat.html) {
        footer = new String[] {"</body></html>"};
    } else if (this.format == ExportFormat.rss) {
        footer = new String[] {"</channel>", "</rss>"};
    } else if (this.format == ExportFormat.solr) {
        footer = new String[] {"</result>", "</response>"};
    } else {
        // remaining formats have no closing markup
        footer = new String[0];
    }
    for (final String line : footer) {
        pw.println(line);
    }
}
@Override
public void run() {
try {
final File parentf = this.f.getParentFile();
if (parentf != null) {
parentf.mkdirs();
}
if (this.path != null) this.path.mkdirs();
} catch(final Exception e) {
ConcurrentLog.logException(e);
this.failure = e.getMessage();
return;
}
try (/* Resources automatically closed by this try-with-resources statement */
final OutputStream os = new FileOutputStream(this.format == ExportFormat.solr ? new File(this.f.getAbsolutePath() + ".gz") : this.f);
final OutputStream wrappedStream = ((this.format == ExportFormat.solr)) ? new GZIPOutputStream(os, 65536){{this.def.setLevel(Deflater.BEST_COMPRESSION);}} : os;
final PrintWriter pw = new PrintWriter(new BufferedOutputStream(wrappedStream));
) {
if (this.format == ExportFormat.html) {
pw.println("<html><head></head><body>");
}
if (this.format == ExportFormat.rss) {
pw.println("<?xml version=\"1.0\" encoding=\"UTF-8\"?>");
pw.println("<?xml-stylesheet type='text/xsl' href='/yacysearch.xsl' version='1.0'?>");
pw.println("<rss version=\"2.0\" xmlns:yacy=\"http://www.yacy.net/\" xmlns:opensearch=\"http://a9.com/-/spec/opensearch/1.1/\" xmlns:atom=\"http://www.w3.org/2005/Atom\">");
pw.println("<channel>");
pw.println("<title>YaCy Peer-to-Peer - Web-Search URL Export</title>");
pw.println("<description></description>");
pw.println("<link>http://yacy.net</link>");
}
if (this.format == ExportFormat.solr) {
pw.println("<?xml version=\"1.0\" encoding=\"UTF-8\"?>");
pw.println("<response>");
pw.println("<lst name=\"responseHeader\">");
pw.println(" <str format=\"yacy.index.export.solr.xml\"/>");
pw.println(" <lst name=\"params\">");
pw.println(" <str name=\"q\">" + this.query + "</str>");
pw.println(" </lst>");
pw.println("</lst>");
pw.println("<result>");
}
try {
docCount = 0;
chunkSize = 0;
chunkCount = 0;
PrintWriter pw = getWriter();
printHead(pw);
if (this.dom) {
final Map<String, ReversibleScoreMap<String>> scores = Fulltext.this.getDefaultConnector().getFacets(this.query + " AND " + CollectionSchema.httpstatus_i.getSolrFieldName() + ":200", 100000000, CollectionSchema.host_s.getSolrFieldName());
final ReversibleScoreMap<String> stats = scores.get(CollectionSchema.host_s.getSolrFieldName());
@ -865,7 +893,7 @@ public final class Fulltext {
if (this.pattern != null && !this.pattern.matcher(host).matches()) continue;
if (this.format == ExportFormat.text) pw.println(host);
if (this.format == ExportFormat.html) pw.println("<a href=\"http://" + host + "\">" + host + "</a><br>");
this.count++;
this.docCount++; this.chunkSize++;
}
} else {
if (this.format == ExportFormat.solr || this.format == ExportFormat.elasticsearch || (this.text && this.format == ExportFormat.text)) {
@ -882,7 +910,14 @@ public final class Fulltext {
if (this.format == ExportFormat.elasticsearch) pw.println("{\"index\":{}}");
final String d = sw.toString();
pw.println(d);
this.count++;
this.docCount++; this.chunkSize++;
if (this.chunkSize >= this.maxChunkSize) {
printTail(pw);
pw.close();
pw = getWriter(); // increases chunkCount as side-effect
printHead(pw);
this.chunkSize = 0;
}
}
} else {
final BlockingQueue<SolrDocument> docs = Fulltext.this.getDefaultConnector().concurrentDocumentsByQuery(this.query + " AND " + CollectionSchema.httpstatus_i.getSolrFieldName() + ":200", null, 0, 100000000, Long.MAX_VALUE, 100, 1, true,
@ -918,21 +953,19 @@ public final class Fulltext {
pw.println("<guid isPermaLink=\"false\">" + hash + "</guid>");
pw.println("</item>");
}
this.count++;
this.docCount++; this.chunkSize++;
if (this.chunkSize >= this.maxChunkSize) {
printTail(pw);
pw.close();
pw = getWriter(); // increases chunkCount as side-effect
printHead(pw);
this.chunkSize = 0;
}
}
}
}
if (this.format == ExportFormat.html) {
pw.println("</body></html>");
}
if (this.format == ExportFormat.rss) {
pw.println("</channel>");
pw.println("</rss>");
}
if (this.format == ExportFormat.solr) {
pw.println("</result>");
pw.println("</response>");
}
printTail(pw);
pw.close();
} catch (final Exception e) {
/* Catch but log any IO exception that can occur on copy, automatic closing or streams creation */
ConcurrentLog.logException(e);
@ -942,15 +975,47 @@ public final class Fulltext {
}
public File file() {
return this.f;
final File f = new File(this.path, this.filename + "_" + chunkcount(this.chunkCount) + "." + this.fileext);
return f;
}
/**
 * Opens the file of the next chunk and returns a writer for it.
 * The target is the file returned by {@link #file()} for the current chunk
 * counter; for the solr format the suffix ".gz" is appended and the content
 * is gzip-compressed with the best compression level.
 * As a side effect the chunk counter is increased, but only after the
 * writer was created successfully.
 *
 * Text is always encoded as UTF-8, which is the encoding announced in the
 * rss and solr XML headers, instead of the platform default charset.
 *
 * @return a writer for the newly opened chunk; the caller must close it
 * @throws IOException if the chunk file cannot be opened or the compression
 *         stream cannot be initialized
 */
private PrintWriter getWriter() throws IOException {
    final File chunkFile = file();
    final boolean gzip = this.format == ExportFormat.solr;
    final OutputStream os = new FileOutputStream(gzip ? new File(chunkFile.getAbsolutePath() + ".gz") : chunkFile);
    final PrintWriter pw;
    try {
        final OutputStream target = gzip
                ? new GZIPOutputStream(os, 65536){{this.def.setLevel(Deflater.BEST_COMPRESSION);}}
                : os;
        // fully qualified names are used so that no additional file-level imports are required
        pw = new PrintWriter(new java.io.OutputStreamWriter(
                new BufferedOutputStream(target), java.nio.charset.StandardCharsets.UTF_8));
    } catch (final IOException | RuntimeException e) {
        // do not leak the file handle if wrapping the raw stream fails
        try {
            os.close();
        } catch (final IOException ignored) {
            // the original failure is the one worth reporting
        }
        throw e;
    }
    this.chunkCount++;
    return pw;
}
/**
 * Formats a chunk number for use in a chunk file name by left-padding it
 * with zeros to four characters. Numbers of 1000 and above are returned
 * without padding.
 *
 * @param count the chunk number
 * @return the padded decimal representation of {@code count}
 */
private String chunkcount(int count) {
    final String padding;
    if (count >= 1000) {
        padding = "";
    } else if (count >= 100) {
        padding = "0";
    } else if (count >= 10) {
        padding = "00";
    } else {
        padding = "000";
    }
    return padding + count;
}
/**
 * @return the directory in which the chunk files of this export are created
 */
public File path() {
    return path;
}
/**
 * @return the common file name stem of all chunk files, without chunk number and extension
 */
public String filename() {
    return filename;
}
/**
 * @return the file name extension (without the leading dot) that is appended to each chunk file name
 */
public String fileext() {
    return fileext;
}
/**
 * @return the failure message recorded by this export, or {@code null} if no failure was recorded
 */
public String failed() {
    return failure;
}
public int count() {
return this.count;
public int docCount() {
return this.docCount;
}
/**
 * @return the number of chunks that have been opened so far
 */
public int chunkCount() {
    return chunkCount;
}
@SuppressWarnings("unchecked")

Loading…
Cancel
Save