From 8f876a8c7281ca8af0f128742d84c0467268f46c Mon Sep 17 00:00:00 2001
From: Michael Peter Christen
Date: Tue, 30 Mar 2021 12:07:36 +0200
Subject: [PATCH] added concurrency to enhance indexing speed during json
 surrogate import

---
 source/net/yacy/search/Switchboard.java | 312 ++++++++++++++----------
 1 file changed, 181 insertions(+), 131 deletions(-)

diff --git a/source/net/yacy/search/Switchboard.java b/source/net/yacy/search/Switchboard.java
index 0ce6a6cb9..85a78900f 100644
--- a/source/net/yacy/search/Switchboard.java
+++ b/source/net/yacy/search/Switchboard.java
@@ -78,6 +78,8 @@ import java.util.Set;
 import java.util.SortedSet;
 import java.util.TreeMap;
 import java.util.TreeSet;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.Semaphore;
@@ -97,7 +99,6 @@ import org.apache.solr.common.SolrException;
 import org.apache.solr.common.SolrInputDocument;
 import org.apache.solr.core.SolrCore;
 import org.apache.solr.search.SyntaxError;
-import org.eclipse.jetty.http.DateParser;
 import org.json.JSONArray;
 import org.json.JSONException;
 import org.json.JSONObject;
@@ -129,6 +130,7 @@ import net.yacy.cora.federate.solr.connector.ShardSelection;
 import net.yacy.cora.federate.solr.instance.EmbeddedInstance;
 import net.yacy.cora.federate.solr.instance.RemoteInstance;
 import net.yacy.cora.federate.yacy.CacheStrategy;
+import net.yacy.cora.language.synonyms.SynonymLibrary;
 import net.yacy.cora.lod.vocabulary.Tagging;
 import net.yacy.cora.order.Base64Order;
 import net.yacy.cora.order.Digest;
@@ -2168,7 +2170,7 @@ public final class Switchboard extends serverSwitch {
                     baos.write(buffer, 0, size);
                 }
                 baos.flush();
-                processSurrogate(new ByteArrayInputStream(baos.toByteArray()), entry.getName());
+                processSurrogateXML(new ByteArrayInputStream(baos.toByteArray()), entry.getName());
                 baos.close();
                 if (shallTerminate()) break;
             }
@@ -2196,138 +2198,13 @@ public final class Switchboard extends serverSwitch {
             }
             return moved;
         } else if (s.endsWith(".jsonlist") || s.endsWith(".flatjson")) {
-            // parse a file that can be generated with yacy_grid_parser
-            // see https://github.com/yacy/yacy_grid_parser/blob/master/README.md
-            FileInputStream fis = null;
-            BufferedReader br = null;
-            VocabularyScraper scraper = new VocabularyScraper();
-            try {
-                fis = new FileInputStream(infile);
-                InputStream is = new BufferedInputStream(fis);
-                br = new BufferedReader(new InputStreamReader(is, StandardCharsets.UTF_8));
-                String line;
-                while ((line = br.readLine()) != null) {
-                    JSONTokener jt = new JSONTokener(line);
-                    JSONObject json = new JSONObject(jt);
-                    if ((json.opt("index") != null && json.length() == 1) || json.length() == 0) continue;
-                    SolrInputDocument surrogate = new SolrInputDocument();
-                    for (String key: json.keySet()) {
-                        Object o = json.opt(key);
-                        if (o == null) continue;
-                        if (o instanceof JSONArray) {
-                            // transform this into a list
-                            JSONArray a = (JSONArray) o;
-                            // patch altered yacy grid schema (yacy grid does not split url lists into protocol and urlstub)
-                            if (key.equals("inboundlinks_sxt")) {
-                                // compute inboundlinks_urlstub_sxt and inboundlinks_protocol_sxt
-                                List<String> urlstub = new ArrayList<>();
-                                List<String> protocol = new ArrayList<>();
-                                for (int i = 0; i < a.length(); i++) {
-                                    AnchorURL b = new AnchorURL((String) a.get(i));
-                                    urlstub.add(b.urlstub(true, true));
-                                    protocol.add(b.getProtocol());
-                                }
-                                CollectionSchema.inboundlinks_urlstub_sxt.add(surrogate, urlstub);
-                                CollectionSchema.inboundlinks_protocol_sxt.add(surrogate, protocol);
-                            } else if (key.equals("outboundlinks_sxt")) {
-                                // compute outboundlinks_urlstub_sxt and outboundlinks_protocol_sxt
-                                List<String> urlstub = new ArrayList<>();
-                                List<String> protocol = new ArrayList<>();
-                                for (int i = 0; i < a.length(); i++) {
-                                    AnchorURL b = new AnchorURL((String) a.get(i));
-                                    urlstub.add(b.urlstub(true, true));
-                                    protocol.add(b.getProtocol());
-                                }
-                                CollectionSchema.outboundlinks_urlstub_sxt.add(surrogate, urlstub);
-                                CollectionSchema.outboundlinks_protocol_sxt.add(surrogate, protocol);
-                            } else if (key.equals("images_sxt")) {
-                                // compute images_urlstub_sxt and images_protocol_sxt
-                                List<String> urlstub = new ArrayList<>();
-                                List<String> protocol = new ArrayList<>();
-                                for (int i = 0; i < a.length(); i++) {
-                                    AnchorURL b = new AnchorURL((String) a.get(i));
-                                    urlstub.add(b.urlstub(true, true));
-                                    protocol.add(b.getProtocol());
-                                }
-                                CollectionSchema.images_urlstub_sxt.add(surrogate, urlstub);
-                                CollectionSchema.images_protocol_sxt.add(surrogate, protocol);
-                            } else {
-                                List<Object> list = new ArrayList<>();
-                                for (int i = 0; i < a.length(); i++) list.add(a.get(i));
-                                CollectionSchema schema = CollectionSchema.valueOf(key);
-                                schema.add(surrogate, list);
-                            }
-                        } else {
-                            CollectionSchema ctype = null;
-                            try {ctype = CollectionSchema.valueOf(key);} catch (IllegalArgumentException e) {}
-                            if (key.equals("url_s") || key.equals("sku")) {
-                                ctype = CollectionSchema.sku;
-                                // patch yacy grid altered schema (yacy grid does not have IDs any more, but they can be re-computed here)
-                                DigestURL durl = new DigestURL(o.toString());
-                                String id = ASCII.String(durl.hash());
-                                surrogate.setField(CollectionSchema.sku.getSolrFieldName(), durl.toNormalform(true));
-                                surrogate.setField(CollectionSchema.id.getSolrFieldName(), id);
-                                surrogate.setField(CollectionSchema.host_id_s.getSolrFieldName(), id.substring(6));
-                            } else if (key.equals("referrer_url_s")) {
-                                DigestURL durl = new DigestURL(o.toString());
-                                String id = ASCII.String(durl.hash());
-                                surrogate.setField(CollectionSchema.referrer_id_s.getSolrFieldName(), id);
-                            } else if (ctype != null && ctype.getType() == SolrType.date) {
-                                // patch date into something that Solr can understand
-                                String d = o.toString(); // i.e. Wed Apr 01 02:00:00 CEST 2020
-                                Date dd = d == null || d.length() == 0 ? null : AbstractFormatter.parseAny(d);
-                                if (dd != null) surrogate.setField(ctype.getSolrFieldName(), ISO8601Formatter.FORMATTER.format(dd)); // solr dateTime is ISO8601 format
-                            } else {
-                                surrogate.setField(key, o.toString());
-                            }
-                        }
-                    }
-
-                    // enrich the surrogate
-                    final String id = (String) surrogate.getFieldValue(CollectionSchema.id.getSolrFieldName());
-                    final String text = (String) surrogate.getFieldValue(CollectionSchema.text_t.getSolrFieldName());
-                    final DigestURL rootURL = new DigestURL((String) surrogate.getFieldValue(CollectionSchema.sku.getSolrFieldName()), ASCII.getBytes(id));
-                    if (text != null && text.length() > 0 && id != null ) {
-                        // run the tokenizer on the text to get vocabularies and synonyms
-                        final Tokenizer tokenizer = new Tokenizer(rootURL, text, LibraryProvider.dymLib, true, scraper);
-                        final Map<String, Set<Tagging.Metatag>> facets = Document.computeGenericFacets(tokenizer.tags());
-                        // overwrite the given vocabularies and synonyms with new computed ones
-                        Switchboard.this.index.fulltext().getDefaultConfiguration().enrich(surrogate, tokenizer.synonyms(), facets);
-                    }
-
-                    Switchboard.this.index.putDocument(surrogate);
-                }
-                br.close();
-                br = null;
-                fis = null;
-                moved = infile.renameTo(outfile);
-            } catch (IOException | JSONException ex) {
-                log.warn("IO Error processing flatjson file " + infile);
-            } finally {
-                /* Properly release file system resources even in failure cases */
-                if(br != null) {
-                    /* buffered reader was successfully created : close it and its underlying streams */
-                    try {
-                        br.close();
-                    } catch (IOException e) {
-                        log.warn("Could not close reader on file " + infile);
-                    }
-                } else if(fis != null) {
-                    /* no buffered reader : maybe a case of exhausted memory. Anyway file input stream has to be closed. */
-                    try {
-                        fis.close();
-                    } catch (IOException e) {
-                        log.warn("Could not close input stream on file " + infile);
-                    }
-                }
-            }
-            return moved;
+            return processSurrogateJson(infile, outfile);
         }
         InputStream is = null;
         try {
             is = new BufferedInputStream(new FileInputStream(infile));
             if (s.endsWith(".gz")) is = new GZIPInputStream(is, 65535);
-            processSurrogate(is, infile.getName());
+            processSurrogateXML(is, infile.getName());
         } catch (final IOException e ) {
             ConcurrentLog.logException(e);
         } finally {
@@ -2366,7 +2243,180 @@ public final class Switchboard extends serverSwitch {
         return moved;
     }
 
-    public void processSurrogate(final InputStream is, final String name) throws IOException {
+    private boolean processSurrogateJson(File infile, File outfile) {
+        // parse a file that can be generated with yacy_grid_parser
+        // see https://github.com/yacy/yacy_grid_parser/blob/master/README.md
+        log.info("processing json surrogate " + infile);
+        long starttime = System.currentTimeMillis();
+
+        boolean moved = false;
+        FileInputStream fis = null;
+        BufferedReader br = null;
+
+        // start indexer threads which mostly care about tokenization and facet + synonym enrichment
+        final int concurrency = Runtime.getRuntime().availableProcessors();
+        final BlockingQueue<SolrInputDocument> sidQueue = new ArrayBlockingQueue<>(concurrency * 2);
+        final Thread[] indexer = new Thread[concurrency];
+        for (int t = 0; t < indexer.length; t++) {
+            indexer[t] = new Thread("Switchboard.processSurrogateJson-" + t) {
+                @Override
+                public void run() {
+                    VocabularyScraper scraper = new VocabularyScraper();
+                    SolrInputDocument sid;
+                    try {
+                        while ((sid = sidQueue.take()) != SurrogateReader.POISON_DOCUMENT ) {
+                            // enrich the surrogate
+                            final String id = (String) sid.getFieldValue(CollectionSchema.id.getSolrFieldName());
+                            final String text = (String) sid.getFieldValue(CollectionSchema.text_t.getSolrFieldName());
+                            DigestURL rootURL;
+                            if (text != null && text.length() > 0 && id != null ) try {
+                                if (SynonymLibrary.size() > 0 || !LibraryProvider.autotagging.isEmpty()) {
+                                    rootURL = new DigestURL((String) sid.getFieldValue(CollectionSchema.sku.getSolrFieldName()), ASCII.getBytes(id));
+                                    // run the tokenizer on the text to get vocabularies and synonyms
+                                    final Tokenizer tokenizer = new Tokenizer(rootURL, text, LibraryProvider.dymLib, true, scraper);
+                                    final Map<String, Set<Tagging.Metatag>> facets = Document.computeGenericFacets(tokenizer.tags());
+                                    // overwrite the given vocabularies and synonyms with new computed ones
+                                    Switchboard.this.index.fulltext().getDefaultConfiguration().enrich(sid, tokenizer.synonyms(), facets);
+                                }
+                                Switchboard.this.index.putDocument(sid);
+                            } catch (MalformedURLException e) {}
+                        }
+                    } catch (InterruptedException e) {
+                    }
+                }
+            };
+            indexer[t].start();
+        }
+
+        try {
+            fis = new FileInputStream(infile);
+            InputStream is = new BufferedInputStream(fis);
+            br = new BufferedReader(new InputStreamReader(is, StandardCharsets.UTF_8));
+            String line;
+            while ((line = br.readLine()) != null) {
+                JSONTokener jt = new JSONTokener(line);
+                JSONObject json = new JSONObject(jt);
+                if ((json.opt("index") != null && json.length() == 1) || json.length() == 0) continue;
+                SolrInputDocument surrogate = new SolrInputDocument();
+                for (String key: json.keySet()) {
+                    Object o = json.opt(key);
+                    if (o == null) continue;
+                    if (o instanceof JSONArray) {
+                        // transform this into a list
+                        JSONArray a = (JSONArray) o;
+                        // patch altered yacy grid schema (yacy grid does not split url lists into protocol and urlstub)
+                        if (key.equals("inboundlinks_sxt")) {
+                            // compute inboundlinks_urlstub_sxt and inboundlinks_protocol_sxt
+                            List<String> urlstub = new ArrayList<>();
+                            List<String> protocol = new ArrayList<>();
+                            for (int i = 0; i < a.length(); i++) {
+                                AnchorURL b = new AnchorURL((String) a.get(i));
+                                urlstub.add(b.urlstub(true, true));
+                                protocol.add(b.getProtocol());
+                            }
+                            CollectionSchema.inboundlinks_urlstub_sxt.add(surrogate, urlstub);
+                            CollectionSchema.inboundlinks_protocol_sxt.add(surrogate, protocol);
+                        } else if (key.equals("outboundlinks_sxt")) {
+                            // compute outboundlinks_urlstub_sxt and outboundlinks_protocol_sxt
+                            List<String> urlstub = new ArrayList<>();
+                            List<String> protocol = new ArrayList<>();
+                            for (int i = 0; i < a.length(); i++) {
+                                AnchorURL b = new AnchorURL((String) a.get(i));
+                                urlstub.add(b.urlstub(true, true));
+                                protocol.add(b.getProtocol());
+                            }
+                            CollectionSchema.outboundlinks_urlstub_sxt.add(surrogate, urlstub);
+                            CollectionSchema.outboundlinks_protocol_sxt.add(surrogate, protocol);
+                        } else if (key.equals("images_sxt")) {
+                            // compute images_urlstub_sxt and images_protocol_sxt
+                            List<String> urlstub = new ArrayList<>();
+                            List<String> protocol = new ArrayList<>();
+                            for (int i = 0; i < a.length(); i++) {
+                                AnchorURL b = new AnchorURL((String) a.get(i));
+                                urlstub.add(b.urlstub(true, true));
+                                protocol.add(b.getProtocol());
+                            }
+                            CollectionSchema.images_urlstub_sxt.add(surrogate, urlstub);
+                            CollectionSchema.images_protocol_sxt.add(surrogate, protocol);
+                        } else {
+                            List<Object> list = new ArrayList<>();
+                            for (int i = 0; i < a.length(); i++) list.add(a.get(i));
+                            CollectionSchema schema = CollectionSchema.valueOf(key);
+                            schema.add(surrogate, list);
+                        }
+                    } else {
+                        CollectionSchema ctype = null;
+                        try {ctype = CollectionSchema.valueOf(key);} catch (IllegalArgumentException e) {}
+                        if (key.equals("url_s") || key.equals("sku")) {
+                            ctype = CollectionSchema.sku;
+                            // patch yacy grid altered schema (yacy grid does not have IDs any more, but they can be re-computed here)
+                            DigestURL durl = new DigestURL(o.toString());
+                            String id = ASCII.String(durl.hash());
+                            surrogate.setField(CollectionSchema.sku.getSolrFieldName(), durl.toNormalform(true));
+                            surrogate.setField(CollectionSchema.id.getSolrFieldName(), id);
+                            surrogate.setField(CollectionSchema.host_id_s.getSolrFieldName(), id.substring(6));
+                        } else if (key.equals("referrer_url_s")) {
+                            DigestURL durl = new DigestURL(o.toString());
+                            String id = ASCII.String(durl.hash());
+                            surrogate.setField(CollectionSchema.referrer_id_s.getSolrFieldName(), id);
+                        } else if (ctype != null && ctype.getType() == SolrType.date) {
+                            // patch date into something that Solr can understand
+                            String d = o.toString(); // i.e. Wed Apr 01 02:00:00 CEST 2020
+                            Date dd = d == null || d.length() == 0 ? null : AbstractFormatter.parseAny(d);
+                            if (dd != null) surrogate.setField(ctype.getSolrFieldName(), ISO8601Formatter.FORMATTER.format(dd)); // solr dateTime is ISO8601 format
+                        } else {
+                            surrogate.setField(key, o.toString());
+                        }
+                    }
+                }
+
+                try {
+                    sidQueue.put(surrogate);
+                } catch (InterruptedException e) {
+                    e.printStackTrace();
+                }
+            }
+            br.close();
+            br = null;
+            fis = null;
+
+            // finish indexing threads by giving them poison
+            for (int t = 0; t < indexer.length; t++) {
+                try {sidQueue.put(SurrogateReader.POISON_DOCUMENT);} catch (InterruptedException e) {}
+            }
+            // wait until indexer threads are finished
+            for (int t = 0; t < indexer.length; t++) {
+                try {indexer[t].join(10000);} catch (InterruptedException e) {}
+            }
+
+            moved = infile.renameTo(outfile);
+        } catch (IOException | JSONException ex) {
+            log.warn("IO Error processing flatjson file " + infile);
+        } finally {
+            /* Properly release file system resources even in failure cases */
+            if(br != null) {
+                /* buffered reader was successfully created : close it and its underlying streams */
+                try {
+                    br.close();
+                } catch (IOException e) {
+                    log.warn("Could not close reader on file " + infile);
+                }
+            } else if(fis != null) {
+                /* no buffered reader : maybe a case of exhausted memory. Anyway file input stream has to be closed. */
+                try {
+                    fis.close();
+                } catch (IOException e) {
+                    log.warn("Could not close input stream on file " + infile);
+                }
+            }
+        }
+
+        log.info("finished processing json surrogate: " + ((System.currentTimeMillis() - starttime) / 1000) + " seconds");
+
+        return moved;
+    }
+
+    private void processSurrogateXML(final InputStream is, final String name) throws IOException {
         final int concurrency = Runtime.getRuntime().availableProcessors();
 
         // start reader thread
@@ -2379,7 +2429,7 @@ public final class Switchboard extends serverSwitch {
         assert this.crawlStacker != null;
         Thread[] indexer = new Thread[concurrency];
         for (int t = 0; t < concurrency; t++) {
-            indexer[t] = new Thread("Switchboard.processSurrogate-" + t) {
+            indexer[t] = new Thread("Switchboard.processSurrogateXML-" + t) {
                 @Override
                 public void run() {
                     VocabularyScraper scraper = new VocabularyScraper();
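
For reference, the following is a minimal, self-contained sketch of the bounded-queue producer/consumer pattern with poison-pill shutdown that processSurrogateJson above applies. The names (PoisonQueueSketch, index(...)) are hypothetical, and plain strings stand in for SolrInputDocument and SurrogateReader.POISON_DOCUMENT; it only illustrates the concurrency structure, not the YaCy API.

    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;

    public class PoisonQueueSketch {

        // stands in for SurrogateReader.POISON_DOCUMENT; a distinct object compared by identity
        static final String POISON = new String("POISON");

        public static void main(String[] args) throws InterruptedException {
            final int concurrency = Runtime.getRuntime().availableProcessors();
            // bounded queue: the reader blocks instead of buffering the whole file in memory
            final BlockingQueue<String> queue = new ArrayBlockingQueue<>(concurrency * 2);

            final Thread[] indexer = new Thread[concurrency];
            for (int t = 0; t < indexer.length; t++) {
                indexer[t] = new Thread("indexer-" + t) {
                    @Override
                    public void run() {
                        try {
                            String doc;
                            // identity comparison against the shared poison object ends the loop
                            while ((doc = queue.take()) != POISON) {
                                index(doc); // the expensive per-document work runs concurrently here
                            }
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    }
                };
                indexer[t].start();
            }

            // producer: in the patch this is the flatjson line reader filling sidQueue
            for (int i = 0; i < 100; i++) queue.put("doc-" + i);

            // shutdown: one poison pill per worker, then wait for the workers to drain and exit
            for (int t = 0; t < indexer.length; t++) queue.put(POISON);
            for (Thread th : indexer) th.join(10000);
        }

        static void index(String doc) {
            // placeholder for Switchboard.this.index.putDocument(sid)
            System.out.println(Thread.currentThread().getName() + " indexed " + doc);
        }
    }

The key design point mirrored from the patch: the number of workers equals the number of available processors, the queue capacity is kept small so memory use stays bounded, and exactly one poison object is enqueued per worker so every thread terminates after the last real document has been taken.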