added concurrency to enhance indexing speed during json surrogate import

pull/408/head
Michael Peter Christen 4 years ago
parent f8cbaeef93
commit 8f876a8c72

@ -78,6 +78,8 @@ import java.util.Set;
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
@ -97,7 +99,6 @@ import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.core.SolrCore;
import org.apache.solr.search.SyntaxError;
import org.eclipse.jetty.http.DateParser;
import org.json.JSONArray;
import org.json.JSONException;
import org.json.JSONObject;
@ -129,6 +130,7 @@ import net.yacy.cora.federate.solr.connector.ShardSelection;
import net.yacy.cora.federate.solr.instance.EmbeddedInstance;
import net.yacy.cora.federate.solr.instance.RemoteInstance;
import net.yacy.cora.federate.yacy.CacheStrategy;
import net.yacy.cora.language.synonyms.SynonymLibrary;
import net.yacy.cora.lod.vocabulary.Tagging;
import net.yacy.cora.order.Base64Order;
import net.yacy.cora.order.Digest;
@ -2168,7 +2170,7 @@ public final class Switchboard extends serverSwitch {
baos.write(buffer, 0, size);
}
baos.flush();
processSurrogate(new ByteArrayInputStream(baos.toByteArray()), entry.getName());
processSurrogateXML(new ByteArrayInputStream(baos.toByteArray()), entry.getName());
baos.close();
if (shallTerminate()) break;
}
@ -2196,138 +2198,13 @@ public final class Switchboard extends serverSwitch {
}
return moved;
} else if (s.endsWith(".jsonlist") || s.endsWith(".flatjson")) {
// parse a file that can be generated with yacy_grid_parser
// see https://github.com/yacy/yacy_grid_parser/blob/master/README.md
FileInputStream fis = null;
BufferedReader br = null;
VocabularyScraper scraper = new VocabularyScraper();
try {
fis = new FileInputStream(infile);
InputStream is = new BufferedInputStream(fis);
br = new BufferedReader(new InputStreamReader(is, StandardCharsets.UTF_8));
String line;
while ((line = br.readLine()) != null) {
JSONTokener jt = new JSONTokener(line);
JSONObject json = new JSONObject(jt);
if ((json.opt("index") != null && json.length() == 1) || json.length() == 0) continue;
SolrInputDocument surrogate = new SolrInputDocument();
for (String key: json.keySet()) {
Object o = json.opt(key);
if (o == null) continue;
if (o instanceof JSONArray) {
// transform this into a list
JSONArray a = (JSONArray) o;
// patch altered yacy grid schema (yacy grid does not split url lists into protocol and urlstub)
if (key.equals("inboundlinks_sxt")) {
// compute inboundlinks_urlstub_sxt and inboundlinks_protocol_sxt
List<Object> urlstub = new ArrayList<>();
List<Object> protocol = new ArrayList<>();
for (int i = 0; i < a.length(); i++) {
AnchorURL b = new AnchorURL((String) a.get(i));
urlstub.add(b.urlstub(true, true));
protocol.add(b.getProtocol());
}
CollectionSchema.inboundlinks_urlstub_sxt.add(surrogate, urlstub);
CollectionSchema.inboundlinks_protocol_sxt.add(surrogate, protocol);
} else if (key.equals("outboundlinks_sxt")) {
// compute outboundlinks_urlstub_sxt and outboundlinks_protocol_sxt
List<Object> urlstub = new ArrayList<>();
List<Object> protocol = new ArrayList<>();
for (int i = 0; i < a.length(); i++) {
AnchorURL b = new AnchorURL((String) a.get(i));
urlstub.add(b.urlstub(true, true));
protocol.add(b.getProtocol());
}
CollectionSchema.outboundlinks_urlstub_sxt.add(surrogate, urlstub);
CollectionSchema.outboundlinks_protocol_sxt.add(surrogate, protocol);
} else if (key.equals("images_sxt")) {
// compute images_urlstub_sxt and images_protocol_sxt
List<Object> urlstub = new ArrayList<>();
List<Object> protocol = new ArrayList<>();
for (int i = 0; i < a.length(); i++) {
AnchorURL b = new AnchorURL((String) a.get(i));
urlstub.add(b.urlstub(true, true));
protocol.add(b.getProtocol());
}
CollectionSchema.images_urlstub_sxt.add(surrogate, urlstub);
CollectionSchema.images_protocol_sxt.add(surrogate, protocol);
} else {
List<Object> list = new ArrayList<>();
for (int i = 0; i < a.length(); i++) list.add(a.get(i));
CollectionSchema schema = CollectionSchema.valueOf(key);
schema.add(surrogate, list);
}
} else {
CollectionSchema ctype = null;
try {ctype = CollectionSchema.valueOf(key);} catch (IllegalArgumentException e) {}
if (key.equals("url_s") || key.equals("sku")) {
ctype = CollectionSchema.sku;
// patch yacy grid altered schema (yacy grid does not have IDs any more, but they can be re-computed here)
DigestURL durl = new DigestURL(o.toString());
String id = ASCII.String(durl.hash());
surrogate.setField(CollectionSchema.sku.getSolrFieldName(), durl.toNormalform(true));
surrogate.setField(CollectionSchema.id.getSolrFieldName(), id);
surrogate.setField(CollectionSchema.host_id_s.getSolrFieldName(), id.substring(6));
} else if (key.equals("referrer_url_s")) {
DigestURL durl = new DigestURL(o.toString());
String id = ASCII.String(durl.hash());
surrogate.setField(CollectionSchema.referrer_id_s.getSolrFieldName(), id);
} else if (ctype != null && ctype.getType() == SolrType.date) {
// patch date into something that Solr can understand
String d = o.toString(); // i.e. Wed Apr 01 02:00:00 CEST 2020
Date dd = d == null || d.length() == 0 ? null : AbstractFormatter.parseAny(d);
if (dd != null) surrogate.setField(ctype.getSolrFieldName(), ISO8601Formatter.FORMATTER.format(dd)); // solr dateTime is ISO8601 format
} else {
surrogate.setField(key, o.toString());
}
}
}
// enrich the surrogate
final String id = (String) surrogate.getFieldValue(CollectionSchema.id.getSolrFieldName());
final String text = (String) surrogate.getFieldValue(CollectionSchema.text_t.getSolrFieldName());
final DigestURL rootURL = new DigestURL((String) surrogate.getFieldValue(CollectionSchema.sku.getSolrFieldName()), ASCII.getBytes(id));
if (text != null && text.length() > 0 && id != null ) {
// run the tokenizer on the text to get vocabularies and synonyms
final Tokenizer tokenizer = new Tokenizer(rootURL, text, LibraryProvider.dymLib, true, scraper);
final Map<String, Set<String>> facets = Document.computeGenericFacets(tokenizer.tags());
// overwrite the given vocabularies and synonyms with new computed ones
Switchboard.this.index.fulltext().getDefaultConfiguration().enrich(surrogate, tokenizer.synonyms(), facets);
}
Switchboard.this.index.putDocument(surrogate);
}
br.close();
br = null;
fis = null;
moved = infile.renameTo(outfile);
} catch (IOException | JSONException ex) {
log.warn("IO Error processing flatjson file " + infile);
} finally {
/* Properly release file system resources even in failure cases */
if(br != null) {
/* buffered reader was successfully created : close it and its underlying streams */
try {
br.close();
} catch (IOException e) {
log.warn("Could not close reader on file " + infile);
}
} else if(fis != null) {
/* no buffered reader : maybe a case of exhausted memory. Anyway file input stream has to be closed. */
try {
fis.close();
} catch (IOException e) {
log.warn("Could not close input stream on file " + infile);
}
}
}
return moved;
return processSurrogateJson(infile, outfile);
}
InputStream is = null;
try {
is = new BufferedInputStream(new FileInputStream(infile));
if (s.endsWith(".gz")) is = new GZIPInputStream(is, 65535);
processSurrogate(is, infile.getName());
processSurrogateXML(is, infile.getName());
} catch (final IOException e ) {
ConcurrentLog.logException(e);
} finally {
@ -2366,7 +2243,180 @@ public final class Switchboard extends serverSwitch {
return moved;
}
public void processSurrogate(final InputStream is, final String name) throws IOException {
private boolean processSurrogateJson(File infile, File outfile) {
// parse a file that can be generated with yacy_grid_parser
// see https://github.com/yacy/yacy_grid_parser/blob/master/README.md
log.info("processing json surrogate " + infile);
long starttime = System.currentTimeMillis();
boolean moved = false;
FileInputStream fis = null;
BufferedReader br = null;
// start indexer threads which mostly care about tokenization and facet + synonym enrichment
final int concurrency = Runtime.getRuntime().availableProcessors();
final BlockingQueue<SolrInputDocument> sidQueue = new ArrayBlockingQueue<>(concurrency * 2);
final Thread[] indexer = new Thread[concurrency];
for (int t = 0; t < indexer.length; t++) {
indexer[t] = new Thread("Switchboard.processSurrogateJson-" + t) {
@Override
public void run() {
VocabularyScraper scraper = new VocabularyScraper();
SolrInputDocument sid;
try {
while ((sid = sidQueue.take()) != SurrogateReader.POISON_DOCUMENT ) {
// enrich the surrogate
final String id = (String) sid.getFieldValue(CollectionSchema.id.getSolrFieldName());
final String text = (String) sid.getFieldValue(CollectionSchema.text_t.getSolrFieldName());
DigestURL rootURL;
if (text != null && text.length() > 0 && id != null ) try {
if (SynonymLibrary.size() > 0 || !LibraryProvider.autotagging.isEmpty()) {
rootURL = new DigestURL((String) sid.getFieldValue(CollectionSchema.sku.getSolrFieldName()), ASCII.getBytes(id));
// run the tokenizer on the text to get vocabularies and synonyms
final Tokenizer tokenizer = new Tokenizer(rootURL, text, LibraryProvider.dymLib, true, scraper);
final Map<String, Set<String>> facets = Document.computeGenericFacets(tokenizer.tags());
// overwrite the given vocabularies and synonyms with new computed ones
Switchboard.this.index.fulltext().getDefaultConfiguration().enrich(sid, tokenizer.synonyms(), facets);
}
Switchboard.this.index.putDocument(sid);
} catch (MalformedURLException e) {}
}
} catch (InterruptedException e) {
}
}
};
indexer[t].start();
}
try {
fis = new FileInputStream(infile);
InputStream is = new BufferedInputStream(fis);
br = new BufferedReader(new InputStreamReader(is, StandardCharsets.UTF_8));
String line;
while ((line = br.readLine()) != null) {
JSONTokener jt = new JSONTokener(line);
JSONObject json = new JSONObject(jt);
if ((json.opt("index") != null && json.length() == 1) || json.length() == 0) continue;
SolrInputDocument surrogate = new SolrInputDocument();
for (String key: json.keySet()) {
Object o = json.opt(key);
if (o == null) continue;
if (o instanceof JSONArray) {
// transform this into a list
JSONArray a = (JSONArray) o;
// patch altered yacy grid schema (yacy grid does not split url lists into protocol and urlstub)
if (key.equals("inboundlinks_sxt")) {
// compute inboundlinks_urlstub_sxt and inboundlinks_protocol_sxt
List<Object> urlstub = new ArrayList<>();
List<Object> protocol = new ArrayList<>();
for (int i = 0; i < a.length(); i++) {
AnchorURL b = new AnchorURL((String) a.get(i));
urlstub.add(b.urlstub(true, true));
protocol.add(b.getProtocol());
}
CollectionSchema.inboundlinks_urlstub_sxt.add(surrogate, urlstub);
CollectionSchema.inboundlinks_protocol_sxt.add(surrogate, protocol);
} else if (key.equals("outboundlinks_sxt")) {
// compute outboundlinks_urlstub_sxt and outboundlinks_protocol_sxt
List<Object> urlstub = new ArrayList<>();
List<Object> protocol = new ArrayList<>();
for (int i = 0; i < a.length(); i++) {
AnchorURL b = new AnchorURL((String) a.get(i));
urlstub.add(b.urlstub(true, true));
protocol.add(b.getProtocol());
}
CollectionSchema.outboundlinks_urlstub_sxt.add(surrogate, urlstub);
CollectionSchema.outboundlinks_protocol_sxt.add(surrogate, protocol);
} else if (key.equals("images_sxt")) {
// compute images_urlstub_sxt and images_protocol_sxt
List<Object> urlstub = new ArrayList<>();
List<Object> protocol = new ArrayList<>();
for (int i = 0; i < a.length(); i++) {
AnchorURL b = new AnchorURL((String) a.get(i));
urlstub.add(b.urlstub(true, true));
protocol.add(b.getProtocol());
}
CollectionSchema.images_urlstub_sxt.add(surrogate, urlstub);
CollectionSchema.images_protocol_sxt.add(surrogate, protocol);
} else {
List<Object> list = new ArrayList<>();
for (int i = 0; i < a.length(); i++) list.add(a.get(i));
CollectionSchema schema = CollectionSchema.valueOf(key);
schema.add(surrogate, list);
}
} else {
CollectionSchema ctype = null;
try {ctype = CollectionSchema.valueOf(key);} catch (IllegalArgumentException e) {}
if (key.equals("url_s") || key.equals("sku")) {
ctype = CollectionSchema.sku;
// patch yacy grid altered schema (yacy grid does not have IDs any more, but they can be re-computed here)
DigestURL durl = new DigestURL(o.toString());
String id = ASCII.String(durl.hash());
surrogate.setField(CollectionSchema.sku.getSolrFieldName(), durl.toNormalform(true));
surrogate.setField(CollectionSchema.id.getSolrFieldName(), id);
surrogate.setField(CollectionSchema.host_id_s.getSolrFieldName(), id.substring(6));
} else if (key.equals("referrer_url_s")) {
DigestURL durl = new DigestURL(o.toString());
String id = ASCII.String(durl.hash());
surrogate.setField(CollectionSchema.referrer_id_s.getSolrFieldName(), id);
} else if (ctype != null && ctype.getType() == SolrType.date) {
// patch date into something that Solr can understand
String d = o.toString(); // i.e. Wed Apr 01 02:00:00 CEST 2020
Date dd = d == null || d.length() == 0 ? null : AbstractFormatter.parseAny(d);
if (dd != null) surrogate.setField(ctype.getSolrFieldName(), ISO8601Formatter.FORMATTER.format(dd)); // solr dateTime is ISO8601 format
} else {
surrogate.setField(key, o.toString());
}
}
}
try {
sidQueue.put(surrogate);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
br.close();
br = null;
fis = null;
// finish indexing threads by giving them poison
for (int t = 0; t < indexer.length; t++) {
try {sidQueue.put(SurrogateReader.POISON_DOCUMENT);} catch (InterruptedException e) {}
}
// wait until indexer threads are finished
for (int t = 0; t < indexer.length; t++) {
try {indexer[t].join(10000);} catch (InterruptedException e) {}
}
moved = infile.renameTo(outfile);
} catch (IOException | JSONException ex) {
log.warn("IO Error processing flatjson file " + infile);
} finally {
/* Properly release file system resources even in failure cases */
if(br != null) {
/* buffered reader was successfully created : close it and its underlying streams */
try {
br.close();
} catch (IOException e) {
log.warn("Could not close reader on file " + infile);
}
} else if(fis != null) {
/* no buffered reader : maybe a case of exhausted memory. Anyway file input stream has to be closed. */
try {
fis.close();
} catch (IOException e) {
log.warn("Could not close input stream on file " + infile);
}
}
}
log.info("finished processing json surrogate: " + ((System.currentTimeMillis() - starttime) / 1000) + " seconds");
return moved;
}
private void processSurrogateXML(final InputStream is, final String name) throws IOException {
final int concurrency = Runtime.getRuntime().availableProcessors();
// start reader thread
@ -2379,7 +2429,7 @@ public final class Switchboard extends serverSwitch {
assert this.crawlStacker != null;
Thread[] indexer = new Thread[concurrency];
for (int t = 0; t < concurrency; t++) {
indexer[t] = new Thread("Switchboard.processSurrogate-" + t) {
indexer[t] = new Thread("Switchboard.processSurrogateXML-" + t) {
@Override
public void run() {
VocabularyScraper scraper = new VocabularyScraper();

Loading…
Cancel
Save