From efa0425f00f3b0cfbacd3dc49f2a51040d57407e Mon Sep 17 00:00:00 2001 From: Michael Peter Christen Date: Sun, 23 Oct 2022 11:35:32 +0200 Subject: [PATCH] refactoring: moved jsonlist importer to importer class --- .../document/importer/JsonListImporter.java | 300 ++++++++++++++++++ source/net/yacy/search/Switchboard.java | 199 +----------- 2 files changed, 306 insertions(+), 193 deletions(-) create mode 100644 source/net/yacy/document/importer/JsonListImporter.java diff --git a/source/net/yacy/document/importer/JsonListImporter.java b/source/net/yacy/document/importer/JsonListImporter.java new file mode 100644 index 000000000..3fc81457e --- /dev/null +++ b/source/net/yacy/document/importer/JsonListImporter.java @@ -0,0 +1,300 @@ +/** + * JsonListImporter + * Copyright 23.10.2022 by Michael Peter Christen, @orbiterlab + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this program in the file lgpl21.txt + * If not, see . 
/**
 * JsonListImporter
 * Copyright 23.10.2022 by Michael Peter Christen, @orbiterlab
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License as published by the Free Software Foundation; either
 * version 2.1 of the License, or (at your option) any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public License
 * along with this program in the file lgpl21.txt
 * If not, see <https://www.gnu.org/licenses/>.
 */

package net.yacy.document.importer;

import java.io.BufferedInputStream;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.MalformedURLException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.zip.GZIPInputStream;

import org.apache.solr.common.SolrInputDocument;
import org.json.JSONArray;
import org.json.JSONException;
import org.json.JSONObject;
import org.json.JSONTokener;

import net.yacy.cora.date.AbstractFormatter;
import net.yacy.cora.date.ISO8601Formatter;
import net.yacy.cora.document.encoding.ASCII;
import net.yacy.cora.document.id.AnchorURL;
import net.yacy.cora.document.id.DigestURL;
import net.yacy.cora.federate.solr.SolrType;
import net.yacy.cora.language.synonyms.SynonymLibrary;
import net.yacy.cora.util.ConcurrentLog;
import net.yacy.document.Document;
import net.yacy.document.LibraryProvider;
import net.yacy.document.Tokenizer;
import net.yacy.document.VocabularyScraper;
import net.yacy.document.content.SurrogateReader;
import net.yacy.search.Switchboard;
import net.yacy.search.schema.CollectionSchema;

/**
 * Importer for "flatjson"/"jsonlist" surrogate files as produced by yacy_grid_parser
 * (see https://github.com/yacy/yacy_grid_parser): one JSON document per line, each
 * line describing one Solr document. Lines are parsed, patched from the altered
 * yacy-grid schema to the local {@link CollectionSchema}, enriched concurrently
 * with vocabularies/synonyms and then pushed into the local index.
 * Thread-safety: one importer instance per file; internal worker threads consume
 * from a bounded queue.
 */
public class JsonListImporter extends Thread implements Importer {

    private static final ConcurrentLog log = new ConcurrentLog("JsonListImporter");

    /** Singleton handle on the currently running import job, if any. */
    public static JsonListImporter job = null;

    private InputStream source;
    private final String name;

    private final long sourceSize;
    private long lineCount, startTime, consumed;
    private boolean abort = false;

    /**
     * Prepare an importer reading from the given file. Files ending with ".gz"
     * are transparently decompressed.
     * @param f the jsonlist/flatjson (optionally gzipped) source file
     * @throws IOException if the file cannot be opened
     */
    public JsonListImporter(final File f) throws IOException {
        super("JsonListImporter - from file " + f.getName());
        this.lineCount = 0;
        this.consumed = 0;
        this.name = f.getName();
        this.sourceSize = f.length();
        this.source = new FileInputStream(f);
        if (this.name.endsWith(".gz")) this.source = new GZIPInputStream(this.source);
    }

    @Override
    public void run() {
        try {
            processSurrogateJson();
        } catch (final IOException e) {
            log.warn(e);
        }
    }

    /**
     * Read the source line by line, convert each JSON line into a
     * {@link SolrInputDocument} and hand it to concurrent indexer threads.
     * Blocks until all lines are consumed and the indexer threads terminated.
     * @throws IOException on read errors or when a line is not valid JSON
     */
    public void processSurrogateJson() throws IOException {
        this.startTime = System.currentTimeMillis();

        // start indexer threads which mostly care about tokenization and facet + synonym enrichment
        final int concurrency = Runtime.getRuntime().availableProcessors();
        final BlockingQueue<SolrInputDocument> sidQueue = new ArrayBlockingQueue<>(concurrency * 2);
        final Thread[] indexer = new Thread[concurrency];
        for (int t = 0; t < indexer.length; t++) {
            indexer[t] = new Thread("Switchboard.processSurrogateJson-" + t) {
                @Override
                public void run() {
                    final VocabularyScraper scraper = new VocabularyScraper();
                    SolrInputDocument sid;
                    try {
                        while ((sid = sidQueue.take()) != SurrogateReader.POISON_DOCUMENT) {
                            // enrich the surrogate
                            final String id = (String) sid.getFieldValue(CollectionSchema.id.getSolrFieldName());
                            final String text = (String) sid.getFieldValue(CollectionSchema.text_t.getSolrFieldName());
                            DigestURL rootURL;
                            if (text != null && text.length() > 0 && id != null) try {
                                if (SynonymLibrary.size() > 0 || !LibraryProvider.autotagging.isEmpty()) {
                                    rootURL = new DigestURL((String) sid.getFieldValue(CollectionSchema.sku.getSolrFieldName()), ASCII.getBytes(id));
                                    // run the tokenizer on the text to get vocabularies and synonyms
                                    final Tokenizer tokenizer = new Tokenizer(rootURL, text, LibraryProvider.dymLib, true, scraper);
                                    final Map<String, Set<String>> facets = Document.computeGenericFacets(tokenizer.tags());
                                    // overwrite the given vocabularies and synonyms with new computed ones
                                    Switchboard.getSwitchboard().index.fulltext().getDefaultConfiguration().enrich(sid, tokenizer.synonyms(), facets);
                                }
                                Switchboard.getSwitchboard().index.putDocument(sid);
                            } catch (final MalformedURLException e) {
                                // malformed sku: skip this document, keep consuming the queue
                            }
                        }
                    } catch (final InterruptedException e) {
                        Thread.currentThread().interrupt(); // preserve interrupt status for callers
                    }
                }
            };
            indexer[t].start();
        }

        final BufferedReader br = new BufferedReader(
                new InputStreamReader(new BufferedInputStream(this.source), StandardCharsets.UTF_8));
        try {
            String line;
            while ((line = br.readLine()) != null) {
                if (this.abort) break;
                final JSONObject json;
                try {
                    json = new JSONObject(new JSONTokener(line));
                } catch (final JSONException e1) {
                    throw new IOException(e1.getMessage());
                }
                // skip elasticsearch-style bulk action lines and empty objects
                if ((json.opt("index") != null && json.length() == 1) || json.length() == 0) continue;
                final SolrInputDocument surrogate = new SolrInputDocument();
                jsonreader: for (final String key: json.keySet()) {
                    final Object o = json.opt(key);
                    if (o == null) continue;
                    if (o instanceof JSONArray) {
                        // transform this into a list
                        final JSONArray a = (JSONArray) o;
                        // patch altered yacy grid schema (yacy grid does not split url lists into protocol and urlstub)
                        if (key.equals("inboundlinks_sxt")) {
                            // compute inboundlinks_urlstub_sxt and inboundlinks_protocol_sxt
                            final List<String> urlstub = new ArrayList<>();
                            final List<String> protocol = new ArrayList<>();
                            for (int i = 0; i < a.length(); i++) {
                                final AnchorURL b = new AnchorURL((String) a.opt(i));
                                urlstub.add(b.urlstub(true, true));
                                protocol.add(b.getProtocol());
                            }
                            CollectionSchema.inboundlinks_urlstub_sxt.add(surrogate, urlstub);
                            CollectionSchema.inboundlinks_protocol_sxt.add(surrogate, protocol);
                            continue jsonreader;
                        }
                        if (key.equals("outboundlinks_sxt")) {
                            // compute outboundlinks_urlstub_sxt and outboundlinks_protocol_sxt
                            final List<String> urlstub = new ArrayList<>();
                            final List<String> protocol = new ArrayList<>();
                            for (int i = 0; i < a.length(); i++) {
                                final AnchorURL b = new AnchorURL((String) a.opt(i));
                                urlstub.add(b.urlstub(true, true));
                                protocol.add(b.getProtocol());
                            }
                            CollectionSchema.outboundlinks_urlstub_sxt.add(surrogate, urlstub);
                            CollectionSchema.outboundlinks_protocol_sxt.add(surrogate, protocol);
                            continue jsonreader;
                        }
                        if (key.equals("images_sxt")) {
                            // compute images_urlstub_sxt and images_protocol_sxt
                            final List<String> urlstub = new ArrayList<>();
                            final List<String> protocol = new ArrayList<>();
                            for (int i = 0; i < a.length(); i++) {
                                final AnchorURL b = new AnchorURL((String) a.opt(i));
                                urlstub.add(b.urlstub(true, true));
                                protocol.add(b.getProtocol());
                            }
                            CollectionSchema.images_urlstub_sxt.add(surrogate, urlstub);
                            CollectionSchema.images_protocol_sxt.add(surrogate, protocol);
                            continue jsonreader;
                        }

                        // prepare to read key type
                        CollectionSchema ctype = null;
                        try {ctype = CollectionSchema.valueOf(key);} catch (final Exception e) {
                            log.warn("unknown key for CollectionSchema: " + key);
                            continue jsonreader;
                        }
                        final List<Object> list = new ArrayList<>();
                        for (int i = 0; i < a.length(); i++) list.add(a.opt(i));
                        ctype.add(surrogate, list);
                    } else {
                        // first handle exceptional keys / maybe patch for other systems + other names
                        if (key.equals("url_s") || key.equals("sku")) {
                            // patch yacy grid altered schema (yacy grid does not have IDs any more, but they can be re-computed here)
                            final DigestURL durl = new DigestURL(o.toString());
                            final String id = ASCII.String(durl.hash());
                            surrogate.setField(CollectionSchema.sku.getSolrFieldName(), durl.toNormalform(true));
                            surrogate.setField(CollectionSchema.id.getSolrFieldName(), id);
                            surrogate.setField(CollectionSchema.host_id_s.getSolrFieldName(), id.substring(6));
                            continue jsonreader;
                        }
                        if (key.equals("referrer_url_s")) {
                            final DigestURL durl = new DigestURL(o.toString());
                            final String id = ASCII.String(durl.hash());
                            surrogate.setField(CollectionSchema.referrer_id_s.getSolrFieldName(), id);
                            continue jsonreader;
                        }

                        // prepare to read key type
                        CollectionSchema ctype = null;
                        try {ctype = CollectionSchema.valueOf(key);} catch (final Exception e) {
                            log.warn("unknown key for CollectionSchema: " + key);
                            continue jsonreader;
                        }
                        if (ctype != null && ctype.getType() == SolrType.date) {
                            // patch date into something that Solr can understand
                            final String d = o.toString(); // i.e. Wed Apr 01 02:00:00 CEST 2020
                            final Date dd = d == null || d.length() == 0 ? null : AbstractFormatter.parseAny(d);
                            if (dd != null) surrogate.setField(ctype.getSolrFieldName(), ISO8601Formatter.FORMATTER.format(dd)); // solr dateTime is ISO8601 format
                            continue jsonreader;
                        }

                        // regular situation, just read content of field
                        surrogate.setField(key, o.toString());
                    }
                }

                try {
                    sidQueue.put(surrogate);
                } catch (final InterruptedException e) {
                    Thread.currentThread().interrupt();
                    log.warn(e);
                }
                this.lineCount++;
                this.consumed += line.length();
            }
        } finally {
            // release the file handle even when parsing fails mid-stream
            try {br.close();} catch (final IOException e) {log.warn(e);}
        }

        // finish indexing threads by giving them poison
        for (int t = 0; t < indexer.length; t++) {
            try {sidQueue.put(SurrogateReader.POISON_DOCUMENT);} catch (final InterruptedException e) {}
        }
        // wait until indexer threads are finished
        for (int t = 0; t < indexer.length; t++) {
            try {indexer[t].join(10000);} catch (final InterruptedException e) {}
        }

        log.info("finished processing json surrogate: " + ((System.currentTimeMillis() - this.startTime) / 1000) + " seconds");
    }

    /** Request the import loop to stop after the current line. */
    public void quit() {
        this.abort = true;
    }

    /** @return the name of the source file */
    @Override
    public String source() {
        return this.name;
    }

    /** @return the number of json lines processed so far */
    @Override
    public int count() {
        return (int) this.lineCount;
    }

    /** @return processed lines per second; 0 while nothing was processed */
    @Override
    public int speed() {
        if (this.lineCount == 0) return 0;
        // clamp the elapsed time to at least one second to avoid division by zero
        return (int) (this.lineCount / Math.max(1L, runningTime()));
    }

    /** @return elapsed import time in seconds */
    @Override
    public long runningTime() {
        return (System.currentTimeMillis() - this.startTime) / 1000L;
    }

    /** @return estimated remaining seconds; 0 when no estimate is possible yet */
    @Override
    public long remainingTime() {
        final long elapsed = runningTime();
        if (this.consumed == 0 || elapsed == 0) return 0;
        final long speed = this.consumed / elapsed;
        if (speed == 0) return 0; // avoid division by zero while throughput is still negligible
        return (this.sourceSize - this.consumed) / speed;
    }

    @Override
    public String status() {
        return "";
    }

}
a/source/net/yacy/search/Switchboard.java b/source/net/yacy/search/Switchboard.java index 3b38970da..68ff7b426 100644 --- a/source/net/yacy/search/Switchboard.java +++ b/source/net/yacy/search/Switchboard.java @@ -55,7 +55,6 @@ import java.net.InetAddress; import java.net.MalformedURLException; import java.net.ServerSocket; import java.net.Socket; -import java.nio.charset.StandardCharsets; import java.security.NoSuchAlgorithmException; import java.security.PublicKey; import java.security.spec.InvalidKeySpecException; @@ -99,18 +98,12 @@ import org.apache.solr.common.SolrException; import org.apache.solr.common.SolrInputDocument; import org.apache.solr.core.SolrCore; import org.apache.solr.search.SyntaxError; -import org.json.JSONArray; -import org.json.JSONException; -import org.json.JSONObject; -import org.json.JSONTokener; import com.cybozu.labs.langdetect.DetectorFactory; import com.cybozu.labs.langdetect.LangDetectException; import com.google.common.io.Files; -import net.yacy.cora.date.AbstractFormatter; import net.yacy.cora.date.GenericFormatter; -import net.yacy.cora.date.ISO8601Formatter; import net.yacy.cora.document.WordCache; import net.yacy.cora.document.analysis.Classification; import net.yacy.cora.document.encoding.ASCII; @@ -123,12 +116,10 @@ import net.yacy.cora.document.id.DigestURL; import net.yacy.cora.document.id.MultiProtocolURL; import net.yacy.cora.federate.solr.FailCategory; import net.yacy.cora.federate.solr.Ranking; -import net.yacy.cora.federate.solr.SolrType; import net.yacy.cora.federate.solr.connector.ShardSelection; import net.yacy.cora.federate.solr.instance.EmbeddedInstance; import net.yacy.cora.federate.solr.instance.RemoteInstance; import net.yacy.cora.federate.yacy.CacheStrategy; -import net.yacy.cora.language.synonyms.SynonymLibrary; import net.yacy.cora.lod.vocabulary.Tagging; import net.yacy.cora.order.Base64Order; import net.yacy.cora.order.Digest; @@ -182,6 +173,7 @@ import net.yacy.document.Tokenizer; import 
net.yacy.document.VocabularyScraper; import net.yacy.document.content.DCEntry; import net.yacy.document.content.SurrogateReader; +import net.yacy.document.importer.JsonListImporter; import net.yacy.document.importer.OAIListFriendsLoader; import net.yacy.document.importer.WarcImporter; import net.yacy.document.parser.audioTagParser; @@ -2213,193 +2205,14 @@ public final class Switchboard extends serverSwitch { // parse a file that can be generated with yacy_grid_parser // see https://github.com/yacy/yacy_grid_parser/blob/master/README.md this.log.info("processing json surrogate " + infile); - final long starttime = System.currentTimeMillis(); - - boolean moved = false; - InputStream fis = null; - BufferedReader br = null; - - // start indexer threads which mostly care about tokenization and facet + synonym enrichment - final int concurrency = Runtime.getRuntime().availableProcessors(); - final BlockingQueue sidQueue = new ArrayBlockingQueue<>(concurrency * 2); - final Thread[] indexer = new Thread[concurrency]; - for (int t = 0; t < indexer.length; t++) { - indexer[t] = new Thread("Switchboard.processSurrogateJson-" + t) { - @Override - public void run() { - final VocabularyScraper scraper = new VocabularyScraper(); - SolrInputDocument sid; - try { - while ((sid = sidQueue.take()) != SurrogateReader.POISON_DOCUMENT ) { - // enrich the surrogate - final String id = (String) sid.getFieldValue(CollectionSchema.id.getSolrFieldName()); - final String text = (String) sid.getFieldValue(CollectionSchema.text_t.getSolrFieldName()); - DigestURL rootURL; - if (text != null && text.length() > 0 && id != null ) try { - if (SynonymLibrary.size() > 0 || !LibraryProvider.autotagging.isEmpty()) { - rootURL = new DigestURL((String) sid.getFieldValue(CollectionSchema.sku.getSolrFieldName()), ASCII.getBytes(id)); - // run the tokenizer on the text to get vocabularies and synonyms - final Tokenizer tokenizer = new Tokenizer(rootURL, text, LibraryProvider.dymLib, true, scraper); - final 
Map> facets = Document.computeGenericFacets(tokenizer.tags()); - // overwrite the given vocabularies and synonyms with new computed ones - Switchboard.this.index.fulltext().getDefaultConfiguration().enrich(sid, tokenizer.synonyms(), facets); - } - Switchboard.this.index.putDocument(sid); - } catch (final MalformedURLException e) {} - } - } catch (final InterruptedException e) { - } - } - }; - indexer[t].start(); - } - try { - fis = infile.getName().endsWith(".gz") ? new GZIPInputStream(new FileInputStream(infile)) : new FileInputStream(infile); - final InputStream bis = new BufferedInputStream(fis); - br = new BufferedReader(new InputStreamReader(bis, StandardCharsets.UTF_8)); - String line; - while ((line = br.readLine()) != null) { - final JSONTokener jt = new JSONTokener(line); - final JSONObject json = new JSONObject(jt); - if ((json.opt("index") != null && json.length() == 1) || json.length() == 0) continue; - final SolrInputDocument surrogate = new SolrInputDocument(); - jsonreader: for (final String key: json.keySet()) { - final Object o = json.opt(key); - if (o == null) continue; - if (o instanceof JSONArray) { - // transform this into a list - final JSONArray a = (JSONArray) o; - // patch altered yacy grid schema (yacy grid does not split url lists into protocol and urlstub) - if (key.equals("inboundlinks_sxt")) { - // compute inboundlinks_urlstub_sxt and inboundlinks_protocol_sxt - final List urlstub = new ArrayList<>(); - final List protocol = new ArrayList<>(); - for (int i = 0; i < a.length(); i++) { - final AnchorURL b = new AnchorURL((String) a.get(i)); - urlstub.add(b.urlstub(true, true)); - protocol.add(b.getProtocol()); - } - CollectionSchema.inboundlinks_urlstub_sxt.add(surrogate, urlstub); - CollectionSchema.inboundlinks_protocol_sxt.add(surrogate, protocol); - continue jsonreader; - } - if (key.equals("outboundlinks_sxt")) { - // compute outboundlinks_urlstub_sxt and outboundlinks_protocol_sxt - final List urlstub = new ArrayList<>(); - final 
List protocol = new ArrayList<>(); - for (int i = 0; i < a.length(); i++) { - final AnchorURL b = new AnchorURL((String) a.get(i)); - urlstub.add(b.urlstub(true, true)); - protocol.add(b.getProtocol()); - } - CollectionSchema.outboundlinks_urlstub_sxt.add(surrogate, urlstub); - CollectionSchema.outboundlinks_protocol_sxt.add(surrogate, protocol); - continue jsonreader; - } - if (key.equals("images_sxt")) { - // compute images_urlstub_sxt and images_protocol_sxt - final List urlstub = new ArrayList<>(); - final List protocol = new ArrayList<>(); - for (int i = 0; i < a.length(); i++) { - final AnchorURL b = new AnchorURL((String) a.get(i)); - urlstub.add(b.urlstub(true, true)); - protocol.add(b.getProtocol()); - } - CollectionSchema.images_urlstub_sxt.add(surrogate, urlstub); - CollectionSchema.images_protocol_sxt.add(surrogate, protocol); - continue jsonreader; - } - - // prepare to read key type - CollectionSchema ctype = null; - try {ctype = CollectionSchema.valueOf(key);} catch (final Exception e) { - this.log.warn("unknown key for CollectionSchema: " + key); - continue jsonreader; - } - final List list = new ArrayList<>(); - for (int i = 0; i < a.length(); i++) list.add(a.get(i)); - ctype.add(surrogate, list); - } else { - // first handle exceptional keys / maybe patch for other systems + other names - if (key.equals("url_s") || key.equals("sku")) { - // patch yacy grid altered schema (yacy grid does not have IDs any more, but they can be re-computed here) - final DigestURL durl = new DigestURL(o.toString()); - final String id = ASCII.String(durl.hash()); - surrogate.setField(CollectionSchema.sku.getSolrFieldName(), durl.toNormalform(true)); - surrogate.setField(CollectionSchema.id.getSolrFieldName(), id); - surrogate.setField(CollectionSchema.host_id_s.getSolrFieldName(), id.substring(6)); - continue jsonreader; - } - if (key.equals("referrer_url_s")) { - final DigestURL durl = new DigestURL(o.toString()); - final String id = ASCII.String(durl.hash()); - 
surrogate.setField(CollectionSchema.referrer_id_s.getSolrFieldName(), id); - continue jsonreader; - } - - // prepare to read key type - CollectionSchema ctype = null; - try {ctype = CollectionSchema.valueOf(key);} catch (final Exception e) { - this.log.warn("unknown key for CollectionSchema: " + key); - continue jsonreader; - } - if (ctype != null && ctype.getType() == SolrType.date) { - // patch date into something that Solr can understand - final String d = o.toString(); // i.e. Wed Apr 01 02:00:00 CEST 2020 - final Date dd = d == null || d.length() == 0 ? null : AbstractFormatter.parseAny(d); - if (dd != null) surrogate.setField(ctype.getSolrFieldName(), ISO8601Formatter.FORMATTER.format(dd)); // solr dateTime is ISO8601 format - continue jsonreader; - } - - // regular situation, just read content of field - surrogate.setField(key, o.toString()); - } - } - - try { - sidQueue.put(surrogate); - } catch (final InterruptedException e) { - e.printStackTrace(); - } - } - br.close(); - br = null; - fis = null; - - // finish indexing threads by giving them poison - for (int t = 0; t < indexer.length; t++) { - try {sidQueue.put(SurrogateReader.POISON_DOCUMENT);} catch (final InterruptedException e) {} - } - // wait until indexer threads are finished - for (int t = 0; t < indexer.length; t++) { - try {indexer[t].join(10000);} catch (final InterruptedException e) {} - } - - moved = infile.renameTo(outfile); - } catch (IOException | JSONException ex) { - this.log.warn("IO Error processing flatjson file " + infile); - } finally { - /* Properly release file system resources even in failure cases */ - if(br != null) { - /* buffered reader was successfully created : close it and its underlying streams */ - try { - br.close(); - } catch (final IOException e) { - this.log.warn("Could not close reader on file " + infile); - } - } else if(fis != null) { - /* no buffered reader : maybe a case of exhausted memory. Anyway file input stream has to be closed. 
*/ - try { - fis.close(); - } catch (final IOException e) { - this.log.warn("Could not close input stream on file " + infile); - } - } + final JsonListImporter importer = new JsonListImporter(infile); + importer.run(); + } catch (final IOException e) { + this.log.warn(e); } - this.log.info("finished processing json surrogate: " + ((System.currentTimeMillis() - starttime) / 1000) + " seconds"); - + final boolean moved = infile.renameTo(outfile); return moved; }