From c20c4b8a21364bf06d1f91c63650fcf3b434ba04 Mon Sep 17 00:00:00 2001 From: Michael Peter Christen Date: Sun, 12 Nov 2023 22:11:55 +0100 Subject: [PATCH] modified export: added maximum number of docs per chunk The export file can now be many files, called chunks. By default still only one chunk is exported. This function is required in case that the exported files shall be imported to an elasticsearch/opensearch index. The bulk import function of elasticsearch/opensearch is limited to 100MB. To make it possible to import YaCy files, those must be split into chunks. Right now we cannot estimate the chunk size as bytes, only as number of documents. The user must do experiments to find out the optimum chunk max size, like 50000 docs per chunk. Try this as first attempt. --- htroot/IndexExport_p.html | 11 +- source/net/yacy/htroot/IndexExport_p.java | 64 +++---- source/net/yacy/search/index/Fulltext.java | 197 ++++++++++++++------- 3 files changed, 171 insertions(+), 101 deletions(-) diff --git a/htroot/IndexExport_p.html b/htroot/IndexExport_p.html index eb3e1f188..87ee4b62d 100644 --- a/htroot/IndexExport_p.html +++ b/htroot/IndexExport_p.html @@ -21,13 +21,16 @@
URL Filter
-
+
 .*.* (default) is a catch-all; format: java regex
query
-
+
 *:* (default) is a catch-all; format: :
-
maximum age (seconds, -1 = unlimited)
-
+
maximum age (seconds)
+
 -1 = unlimited -> no document is too old +
+
maximum number of records per chunk
+
 if exceeded: several chunks are stored; -1 = unlimited (makes only one chunk)
Export Format
diff --git a/source/net/yacy/htroot/IndexExport_p.java b/source/net/yacy/htroot/IndexExport_p.java index 78cc94132..aa5fc6f09 100644 --- a/source/net/yacy/htroot/IndexExport_p.java +++ b/source/net/yacy/htroot/IndexExport_p.java @@ -64,8 +64,8 @@ public class IndexExport_p { prop.put("lurlexport", 0); prop.put("reload", 0); prop.put("dumprestore", 1); - prop.put("dumprestore_dumpRestoreEnabled", sb.getConfigBool(SwitchboardConstants.CORE_SERVICE_FULLTEXT, - SwitchboardConstants.CORE_SERVICE_FULLTEXT_DEFAULT)); + prop.put("dumprestore_dumpRestoreEnabled", sb.getConfigBool(SwitchboardConstants.CORE_SERVICE_FULLTEXT, + SwitchboardConstants.CORE_SERVICE_FULLTEXT_DEFAULT)); List dumpFiles = segment.fulltext().dumpFiles(); prop.put("dumprestore_dumpfile", dumpFiles.size() == 0 ? "" : dumpFiles.get(dumpFiles.size() - 1).getAbsolutePath()); prop.put("dumprestore_optimizemax", 10); @@ -80,7 +80,7 @@ public class IndexExport_p { prop.put("lurlexportfinished", 0); prop.put("lurlexporterror", 0); prop.put("lurlexport_exportfile", export.file().toString()); - prop.put("lurlexport_urlcount", export.count()); + prop.put("lurlexport_urlcount", export.docCount()); prop.put("reload", 1); } else { prop.put("lurlexport", 1); @@ -93,7 +93,7 @@ public class IndexExport_p { // an export was running but has finished prop.put("lurlexportfinished", 1); prop.put("lurlexportfinished_exportfile", export.file().toString()); - prop.put("lurlexportfinished_urlcount", export.count()); + prop.put("lurlexportfinished_urlcount", export.docCount()); if (export.failed() == null) { prop.put("lurlexporterror", 0); } else { @@ -123,6 +123,8 @@ public class IndexExport_p { final String filter = post.get("exportfilter", ".*"); final String query = post.get("exportquery", "*:*"); final int maxseconds = post.getInt("exportmaxseconds", -1); + long maxChunkSize = post.getLong("maxchunksize", Long.MAX_VALUE); + if (maxChunkSize <= 0) maxChunkSize = Long.MAX_VALUE; final String path = post.get("exportfilepath", 
""); // store this call as api call: we do this even if there is a chance that it fails because recurring calls may do not fail @@ -130,7 +132,7 @@ public class IndexExport_p { // start the export try { - export = sb.index.fulltext().export(format, filter, query, maxseconds, new File(path), dom, text); + export = sb.index.fulltext().export(format, filter, query, maxseconds, new File(path), dom, text, maxChunkSize); } catch (final IOException e) { prop.put("lurlexporterror", 1); prop.put("lurlexporterror_exportfile", "-no export-"); @@ -140,7 +142,7 @@ public class IndexExport_p { // show result prop.put("lurlexport_exportfile", export.file().toString()); - prop.put("lurlexport_urlcount", export.count()); + prop.put("lurlexport_urlcount", export.docCount()); if ((export != null) && (export.failed() == null)) { prop.put("lurlexport", 2); } @@ -148,34 +150,34 @@ public class IndexExport_p { } if (post.containsKey("indexdump")) { - try { - final File dump = segment.fulltext().dumpEmbeddedSolr(); - prop.put("indexdump", 1); - prop.put("indexdump_dumpfile", dump.getAbsolutePath()); - dumpFiles = segment.fulltext().dumpFiles(); - prop.put("dumprestore_dumpfile", dumpFiles.size() == 0 ? "" : dumpFiles.get(dumpFiles.size() - 1).getAbsolutePath()); - // sb.tables.recordAPICall(post, "IndexExport_p.html", WorkTables.TABLE_API_TYPE_STEERING, "solr dump generation"); - } catch(final SolrException e) { - if(ErrorCode.SERVICE_UNAVAILABLE.code == e.code()) { - prop.put("indexdump", 2); - } else { - prop.put("indexdump", 3); - } - } + try { + final File dump = segment.fulltext().dumpEmbeddedSolr(); + prop.put("indexdump", 1); + prop.put("indexdump_dumpfile", dump.getAbsolutePath()); + dumpFiles = segment.fulltext().dumpFiles(); + prop.put("dumprestore_dumpfile", dumpFiles.size() == 0 ? 
"" : dumpFiles.get(dumpFiles.size() - 1).getAbsolutePath()); + // sb.tables.recordAPICall(post, "IndexExport_p.html", WorkTables.TABLE_API_TYPE_STEERING, "solr dump generation"); + } catch(final SolrException e) { + if(ErrorCode.SERVICE_UNAVAILABLE.code == e.code()) { + prop.put("indexdump", 2); + } else { + prop.put("indexdump", 3); + } + } } if (post.containsKey("indexrestore")) { - try { - final File dump = new File(post.get("dumpfile", "")); - segment.fulltext().restoreEmbeddedSolr(dump); - prop.put("indexRestore", 1); - } catch(final SolrException e) { - if(ErrorCode.SERVICE_UNAVAILABLE.code == e.code()) { - prop.put("indexRestore", 2); - } else { - prop.put("indexRestore", 3); - } - } + try { + final File dump = new File(post.get("dumpfile", "")); + segment.fulltext().restoreEmbeddedSolr(dump); + prop.put("indexRestore", 1); + } catch(final SolrException e) { + if(ErrorCode.SERVICE_UNAVAILABLE.code == e.code()) { + prop.put("indexRestore", 2); + } else { + prop.put("indexRestore", 3); + } + } } // insert constants diff --git a/source/net/yacy/search/index/Fulltext.java b/source/net/yacy/search/index/Fulltext.java index 718be0099..d8a1754a7 100644 --- a/source/net/yacy/search/index/Fulltext.java +++ b/source/net/yacy/search/index/Fulltext.java @@ -695,7 +695,10 @@ public final class Fulltext { } public final static String yacy_dump_prefix = "yacy_dump_"; - public Export export(Fulltext.ExportFormat format, String filter, String query, final int maxseconds, File path, boolean dom, boolean text) throws IOException { + public Export export( + Fulltext.ExportFormat format, String filter, String query, + final int maxseconds, File path, boolean dom, boolean text, + long maxChunkSize) throws IOException { // modify query according to maxseconds final long now = System.currentTimeMillis(); @@ -760,27 +763,26 @@ public final class Fulltext { } } - String s = new File(path, yacy_dump_prefix + + String filename = yacy_dump_prefix + "f" + 
GenericFormatter.SHORT_MINUTE_FORMATTER.format(firstdate) + "_" + "l" + GenericFormatter.SHORT_MINUTE_FORMATTER.format(lastdate) + "_" + "n" + GenericFormatter.SHORT_MINUTE_FORMATTER.format(new Date(now)) + "_" + - "c" + String.format("%1$012d", doccount)).getAbsolutePath() + "_tc"; // the name ends with the transaction token ('c' = 'created') + "c" + String.format("%1$012d", doccount)+ "_tc"; // the name ends with the transaction token ('c' = 'created') - // create export file name - if (s.indexOf('.',0) < 0) s += "." + format.getExt(); - final File f = new File(s); - f.getParentFile().mkdirs(); - - return export(f, filter, query, format, dom, text); + return export(path, filename, format.getExt(), filter, query, format, dom, text, maxChunkSize); } // export methods - public Export export(final File f, final String filter, final String query, final ExportFormat format, final boolean dom, final boolean text) { + public Export export( + final File path, final String filename, + final String fileext, final String filter, final String query, + final ExportFormat format, final boolean dom, final boolean text, + long maxChunkSize) { if ((this.exportthread != null) && (this.exportthread.isAlive())) { ConcurrentLog.warn("LURL-EXPORT", "cannot start another export thread, already one running"); return this.exportthread; } - this.exportthread = new Export(f, filter, query, format, dom, text); + this.exportthread = new Export(path, filename, fileext, filter, query, format, dom, text, maxChunkSize); this.exportthread.start(); return this.exportthread; } @@ -795,69 +797,95 @@ public final class Fulltext { } public class Export extends Thread { - private final File f; + private final File path; + private final String filename, fileext; private final Pattern pattern; - private int count; private String failure; private final String query; private final ExportFormat format; private final boolean dom, text; - - private Export(final File f, final String filter, final String query, 
final ExportFormat format, final boolean dom, final boolean text) { + private int docCount, chunkSize, chunkCount; + private final long maxChunkSize; + + private Export( + final File path, final String filename, + final String fileext, final String filter, final String query, + final ExportFormat format, final boolean dom, final boolean text, + long maxChunkSize) { super("Fulltext.Export"); // format: 0=text, 1=html, 2=rss/xml - this.f = f; + this.path = path; + this.filename = filename; + this.fileext = fileext; this.pattern = filter == null ? null : Pattern.compile(filter); this.query = query == null? AbstractSolrConnector.CATCHALL_QUERY : query; - this.count = 0; this.failure = null; this.format = format; this.dom = dom; this.text = text; + this.docCount = 0; // number of all documents exported so far + this.chunkSize = 0; // number of documents in the current chunk + this.chunkCount = 0; // number of chunks opened so far + this.maxChunkSize = maxChunkSize; // number of maximum document count per chunk //if ((dom) && (format == 2)) dom = false; } + private void printHead(PrintWriter pw) { + if (this.format == ExportFormat.html) { + pw.println(""); + } + if (this.format == ExportFormat.rss) { + pw.println(""); + pw.println(""); + pw.println(""); + pw.println(""); + pw.println("YaCy Peer-to-Peer - Web-Search URL Export"); + pw.println(""); + pw.println("http://yacy.net"); + } + if (this.format == ExportFormat.solr) { + pw.println(""); + pw.println(""); + pw.println(""); + pw.println(" "); + pw.println(" "); + pw.println(" " + this.query + ""); + pw.println(" "); + pw.println(""); + pw.println(""); + } + } + + private void printTail(PrintWriter pw) { + if (this.format == ExportFormat.html) { + pw.println(""); + } + if (this.format == ExportFormat.rss) { + pw.println(""); + pw.println(""); + } + if (this.format == ExportFormat.solr) { + pw.println(""); + pw.println(""); + } + } + @Override public void run() { try { - final File parentf = this.f.getParentFile(); - if 
(parentf != null) { - parentf.mkdirs(); - } + if (this.path != null) this.path.mkdirs(); } catch(final Exception e) { ConcurrentLog.logException(e); this.failure = e.getMessage(); return; } - try (/* Resources automatically closed by this try-with-resources statement */ - final OutputStream os = new FileOutputStream(this.format == ExportFormat.solr ? new File(this.f.getAbsolutePath() + ".gz") : this.f); - final OutputStream wrappedStream = ((this.format == ExportFormat.solr)) ? new GZIPOutputStream(os, 65536){{this.def.setLevel(Deflater.BEST_COMPRESSION);}} : os; - final PrintWriter pw = new PrintWriter(new BufferedOutputStream(wrappedStream)); - ) { - if (this.format == ExportFormat.html) { - pw.println(""); - } - if (this.format == ExportFormat.rss) { - pw.println(""); - pw.println(""); - pw.println(""); - pw.println(""); - pw.println("YaCy Peer-to-Peer - Web-Search URL Export"); - pw.println(""); - pw.println("http://yacy.net"); - } - if (this.format == ExportFormat.solr) { - pw.println(""); - pw.println(""); - pw.println(""); - pw.println(" "); - pw.println(" "); - pw.println(" " + this.query + ""); - pw.println(" "); - pw.println(""); - pw.println(""); - } + try { + docCount = 0; + chunkSize = 0; + chunkCount = 0; + PrintWriter pw = getWriter(); + printHead(pw); if (this.dom) { final Map> scores = Fulltext.this.getDefaultConnector().getFacets(this.query + " AND " + CollectionSchema.httpstatus_i.getSolrFieldName() + ":200", 100000000, CollectionSchema.host_s.getSolrFieldName()); final ReversibleScoreMap stats = scores.get(CollectionSchema.host_s.getSolrFieldName()); @@ -865,7 +893,7 @@ public final class Fulltext { if (this.pattern != null && !this.pattern.matcher(host).matches()) continue; if (this.format == ExportFormat.text) pw.println(host); if (this.format == ExportFormat.html) pw.println("" + host + "
"); - this.count++; + this.docCount++; this.chunkSize++; } } else { if (this.format == ExportFormat.solr || this.format == ExportFormat.elasticsearch || (this.text && this.format == ExportFormat.text)) { @@ -882,7 +910,14 @@ public final class Fulltext { if (this.format == ExportFormat.elasticsearch) pw.println("{\"index\":{}}"); final String d = sw.toString(); pw.println(d); - this.count++; + this.docCount++; this.chunkSize++; + if (this.chunkSize >= this.maxChunkSize) { + printTail(pw); + pw.close(); + pw = getWriter(); // increases chunkCount as side-effect + printHead(pw); + this.chunkSize = 0; + } } } else { final BlockingQueue docs = Fulltext.this.getDefaultConnector().concurrentDocumentsByQuery(this.query + " AND " + CollectionSchema.httpstatus_i.getSolrFieldName() + ":200", null, 0, 100000000, Long.MAX_VALUE, 100, 1, true, @@ -918,21 +953,19 @@ public final class Fulltext { pw.println("" + hash + ""); pw.println(""); } - this.count++; + this.docCount++; this.chunkSize++; + if (this.chunkSize >= this.maxChunkSize) { + printTail(pw); + pw.close(); + pw = getWriter(); // increases chunkCount as side-effect + printHead(pw); + this.chunkSize = 0; + } } } } - if (this.format == ExportFormat.html) { - pw.println(""); - } - if (this.format == ExportFormat.rss) { - pw.println("
"); - pw.println("
"); - } - if (this.format == ExportFormat.solr) { - pw.println(""); - pw.println(""); - } + printTail(pw); + pw.close(); } catch (final Exception e) { /* Catch but log any IO exception that can occur on copy, automatic closing or streams creation */ ConcurrentLog.logException(e); @@ -942,15 +975,47 @@ public final class Fulltext { } public File file() { - return this.f; + final File f = new File(this.path, this.filename + "_" + chunkcount(this.chunkCount) + "." + this.fileext); + return f; + } + + private PrintWriter getWriter() throws IOException { + File f = file(); + final OutputStream os = new FileOutputStream(this.format == ExportFormat.solr ? new File(f.getAbsolutePath() + ".gz") : f); + final PrintWriter pw = new PrintWriter(new BufferedOutputStream(((this.format == ExportFormat.solr)) ? new GZIPOutputStream(os, 65536){{this.def.setLevel(Deflater.BEST_COMPRESSION);}} : os)); + this.chunkCount++; + return pw; + } + + private String chunkcount(int count) { + if (count < 10) return "000" + count; + if (count < 100) return "00" + count; + if (count < 1000) return "0" + count; + return "" + count; + } + + public File path() { + return this.path; + } + + public String filename() { + return this.filename; + } + + public String fileext() { + return this.fileext; } public String failed() { return this.failure; } - public int count() { - return this.count; + public int docCount() { + return this.docCount; + } + + public int chunkCount() { + return this.chunkCount; } @SuppressWarnings("unchecked")