From b43811d38c426ce5237df7d37c88285644e6a544 Mon Sep 17 00:00:00 2001 From: Michael Peter Christen Date: Sat, 30 May 2015 13:19:59 +0200 Subject: [PATCH] added surrogate import process for exported solr dumps. Just throw your solr dump file into DATA/SURROGATES/in/ and it will be imported! --- htroot/api/ymarks/import_ymark.java | 7 +- .../federate/solr/SchemaConfiguration.java | 1 - .../document/content/SurrogateReader.java | 138 ++++++++++++------ source/net/yacy/search/Switchboard.java | 40 +---- source/net/yacy/search/index/Fulltext.java | 2 - 5 files changed, 100 insertions(+), 88 deletions(-) diff --git a/htroot/api/ymarks/import_ymark.java b/htroot/api/ymarks/import_ymark.java index 0558c284b..9dc4a7de9 100644 --- a/htroot/api/ymarks/import_ymark.java +++ b/htroot/api/ymarks/import_ymark.java @@ -30,7 +30,6 @@ import net.yacy.data.ymark.YMarkTables; import net.yacy.data.ymark.YMarkUtil; import net.yacy.data.ymark.YMarkXBELImporter; import net.yacy.document.Parser.Failure; -import net.yacy.document.content.SurrogateReader; import net.yacy.kelondro.blob.Tables; import net.yacy.kelondro.workflow.InstantBusyThread; import net.yacy.search.Switchboard; @@ -39,8 +38,6 @@ import net.yacy.server.serverSwitch; import org.xml.sax.SAXException; - - public class import_ymark { public static serverObjects respond(final RequestHeader header, final serverObjects post, final serverSwitch env) { @@ -92,9 +89,10 @@ public class import_ymark { final byte[] bytes = UTF8.getBytes(post.get("bmkfile$file")); stream = new ByteArrayInputStream(bytes); if(post.get("importer").equals("surro") && stream != null) { + /** SurrogateReader surrogateReader; try { - surrogateReader = new SurrogateReader(stream, queueSize); + surrogateReader = new SurrogateReader(stream, queueSize, sb.crawlStacker, sb.index.fulltext().getDefaultConfiguration()); } catch (final IOException e) { //TODO: display an error message ConcurrentLog.logException(e); @@ -106,6 +104,7 @@ public class import_ymark { putBookmark(sb, bmk_user, bmk, autoTaggingQueue, autotag, empty, indexing, medialink); } prop.put("status", "1"); + */ } else { MonitoredReader reader = null; try { diff --git a/source/net/yacy/cora/federate/solr/SchemaConfiguration.java b/source/net/yacy/cora/federate/solr/SchemaConfiguration.java index 0f1ba91f8..9c563bf07 100644 --- a/source/net/yacy/cora/federate/solr/SchemaConfiguration.java +++ b/source/net/yacy/cora/federate/solr/SchemaConfiguration.java @@ -32,7 +32,6 @@ import org.apache.solr.common.SolrDocument; import org.apache.solr.common.SolrInputDocument; import org.apache.solr.common.SolrInputField; import org.apache.solr.common.params.MultiMapSolrParams; -import org.apache.solr.common.util.NamedList; import net.yacy.cora.storage.Configuration; import net.yacy.cora.util.ConcurrentLog; diff --git a/source/net/yacy/document/content/SurrogateReader.java b/source/net/yacy/document/content/SurrogateReader.java index 7f9f2db63..0bb47b8f3 100644 --- a/source/net/yacy/document/content/SurrogateReader.java +++ b/source/net/yacy/document/content/SurrogateReader.java @@ -24,26 +24,32 @@ package net.yacy.document.content; -import java.io.BufferedInputStream; import java.io.BufferedReader; -import java.io.File; -import java.io.FileInputStream; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; +import java.io.PushbackInputStream; import java.io.Reader; +import java.io.StringReader; +import java.net.MalformedURLException; import java.util.Map; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; -import java.util.zip.GZIPInputStream; import javax.xml.parsers.ParserConfigurationException; import javax.xml.parsers.SAXParser; import javax.xml.parsers.SAXParserFactory; - +import net.yacy.cora.document.encoding.UTF8; +import net.yacy.cora.document.id.DigestURL; import net.yacy.cora.util.ConcurrentLog; +import net.yacy.crawler.CrawlStacker; +import net.yacy.search.schema.CollectionConfiguration; +import org.apache.solr.client.solrj.impl.XMLResponseParser; +import org.apache.solr.common.SolrDocument; +import org.apache.solr.common.SolrInputDocument; +import org.apache.solr.common.util.NamedList; import org.xml.sax.Attributes; import org.xml.sax.InputSource; import org.xml.sax.SAXException; @@ -63,16 +69,19 @@ public class SurrogateReader extends DefaultHandler implements Runnable { " xmlns:geo=\"http://www.w3.org/2003/01/geo/wgs84_pos#\">"; public final static String SURROGATES_MAIN_ELEMENT_CLOSE = ""; + public final static SolrInputDocument POISON_DOCUMENT = new SolrInputDocument(); // class variables private final StringBuilder buffer; private boolean parsingValue; - private DCEntry surrogate; + private DCEntry dcEntry; private String elementName; - private final BlockingQueue surrogates; + private final BlockingQueue surrogates; private SAXParser saxParser; private final InputSource inputSource; - private final InputStream inputStream; + private final PushbackInputStream inputStream; + private final CrawlStacker crawlStacker; + private final CollectionConfiguration configuration; private static final ThreadLocal tlSax = new ThreadLocal(); private static SAXParser getParser() throws SAXException { @@ -87,13 +96,19 @@ public class SurrogateReader extends DefaultHandler implements Runnable { } return parser; } + + public SurrogateReader(final InputStream stream, int queueSize, CrawlStacker crawlStacker, CollectionConfiguration configuration) throws IOException { + this(new PushbackInputStream(stream, 200), queueSize, crawlStacker, configuration); + } - public SurrogateReader(final InputStream stream, int queueSize) throws IOException { + public SurrogateReader(final PushbackInputStream stream, int queueSize, CrawlStacker crawlStacker, CollectionConfiguration configuration) throws IOException { + this.crawlStacker = crawlStacker; + this.configuration = configuration; this.buffer = new StringBuilder(300); this.parsingValue = false; - this.surrogate = null; + this.dcEntry = null; this.elementName = null; - this.surrogates = new ArrayBlockingQueue(queueSize); + this.surrogates = new ArrayBlockingQueue<>(queueSize); Reader reader = new BufferedReader(new InputStreamReader(stream, "UTF-8")); this.inputSource = new InputSource(reader); @@ -110,8 +125,37 @@ public class SurrogateReader extends DefaultHandler implements Runnable { @Override public void run() { + // test the syntax of the stream by reading parts of the beginning try { - this.saxParser.parse(this.inputSource, this); + if (isSolrDump()) { + BufferedReader br = new BufferedReader(new InputStreamReader(this.inputStream, "UTF-8")); + String line; + while ((line = br.readLine()) != null) { + if (!line.startsWith("")) continue; + try { + NamedList nl = new XMLResponseParser().processResponse(new StringReader("" + line + "")); // + SolrDocument doc = (SolrDocument) nl.iterator().next().getValue(); + + // check if url is in accepted domain + String u = (String) doc.getFieldValue("sku"); + if (u != null) { + try { + DigestURL url = new DigestURL(u); + final String urlRejectReason = this.crawlStacker.urlInAcceptedDomain(url); + if ( urlRejectReason == null ) { + // convert DCEntry to SolrInputDocument + this.surrogates.put(this.configuration.toSolrInputDocument(doc)); + } + } catch (MalformedURLException e) { + } + } + } catch (Throwable ee) { + // bad line + } + } + } else { + this.saxParser.parse(this.inputSource, this); + } } catch (final SAXParseException e) { ConcurrentLog.logException(e); } catch (final SAXException e) { @@ -120,7 +164,7 @@ public class SurrogateReader extends DefaultHandler implements Runnable { ConcurrentLog.logException(e); } finally { try { - this.surrogates.put(DCEntry.poison); + this.surrogates.put(POISON_DOCUMENT); } catch (final InterruptedException e1) { ConcurrentLog.logException(e1); } @@ -132,12 +176,35 @@ public class SurrogateReader extends DefaultHandler implements Runnable { } } + private boolean isSolrDump() { + try { + if (this.inputStream.available() < 60) return false; + byte[] b = new byte[100]; + this.inputStream.read(b); + try { + String s = UTF8.String(b); + if (s.contains("") && s.contains("")) { + this.inputStream.unread(b); + return true; + } + } catch (IOException e) { + ConcurrentLog.logException(e); + this.inputStream.unread(b); + return false; + } + } catch (IOException e) { + ConcurrentLog.logException(e); + return false; + } + return false; + } + @Override public void startElement(final String uri, final String name, String tag, final Attributes atts) throws SAXException { if (tag == null) return; tag = tag.toLowerCase(); if ("record".equals(tag) || "document".equals(tag) || "doc".equals(tag)) { - this.surrogate = new DCEntry(); + this.dcEntry = new DCEntry(); } else if ("element".equals(tag) || "str".equals(tag) || "int".equals(tag) || "bool".equals(tag) || "long".equals(tag)) { this.elementName = atts.getValue("name"); this.parsingValue = true; @@ -158,12 +225,17 @@ public class SurrogateReader extends DefaultHandler implements Runnable { if ("record".equals(tag) || "document".equals(tag) || "doc".equals(tag)) { //System.out.println("A Title: " + this.surrogate.title()); try { - this.surrogates.put(this.surrogate); + // check if url is in accepted domain + final String urlRejectReason = this.crawlStacker.urlInAcceptedDomain(this.dcEntry.getIdentifier(true)); + if ( urlRejectReason == null ) { + // convert DCEntry to SolrInputDocument + this.surrogates.put(this.configuration.toSolrInputDocument(this.dcEntry)); + } } catch (final InterruptedException e) { ConcurrentLog.logException(e); } finally { //System.out.println("B Title: " + this.surrogate.title()); - this.surrogate = null; + this.dcEntry = null; this.buffer.setLength(0); this.parsingValue = false; } @@ -173,7 +245,7 @@ public class SurrogateReader extends DefaultHandler implements Runnable { } else if ("str".equals(tag) || "int".equals(tag) || "bool".equals(tag) || "long".equals(tag)){ final String value = buffer.toString().trim(); if (this.elementName != null) { - this.surrogate.getMap().put(this.elementName, new String[]{value}); + this.dcEntry.getMap().put(this.elementName, new String[]{value}); } this.buffer.setLength(0); this.parsingValue = false; @@ -181,14 +253,14 @@ public class SurrogateReader extends DefaultHandler implements Runnable { //System.out.println("BUFFER-SIZE=" + buffer.length()); final String value = buffer.toString().trim(); if (this.elementName != null) { - this.surrogate.getMap().put(this.elementName, new String[]{value}); + this.dcEntry.getMap().put(this.elementName, new String[]{value}); } this.buffer.setLength(0); this.parsingValue = false; } else if (tag.startsWith("dc:") || tag.startsWith("geo:") || tag.startsWith("md:")) { final String value = buffer.toString().trim(); if (this.elementName != null && tag.equals(this.elementName)) { - Map map = this.surrogate.getMap(); + Map map = this.dcEntry.getMap(); String[] oldcontent = map.get(this.elementName); if (oldcontent == null || oldcontent.length == 0) { map.put(this.elementName, new String[]{value}); @@ -211,7 +283,7 @@ public class SurrogateReader extends DefaultHandler implements Runnable { } } - public DCEntry take() { + public SolrInputDocument take() { try { return this.surrogates.take(); } catch (final InterruptedException e) { @@ -219,29 +291,5 @@ public class SurrogateReader extends DefaultHandler implements Runnable { return null; } } - - public static void main(String[] args) { - File f = new File(args[0]); - SurrogateReader sr; - try { - InputStream is = new BufferedInputStream(new FileInputStream(f)); - if (f.getName().endsWith(".gz")) is = new GZIPInputStream(is); - sr = new SurrogateReader(is, 1); - - Thread t = new Thread(sr, "Surrogate-Reader " + f.getAbsolutePath()); - t.start(); - DCEntry s; - while ((s = sr.take()) != DCEntry.poison) { - System.out.println("Title: " + s.getTitle()); - System.out.println("Date: " + s.getDate()); - System.out.println("Creator: " + s.getCreator()); - System.out.println("Publisher: " + s.getPublisher()); - System.out.println("URL: " + s.getIdentifier(true)); - System.out.println("Language: " + s.getLanguage()); - System.out.println("Body: " + s.getDescriptions().toString()); - } - } catch (final IOException e) { - ConcurrentLog.logException(e); - } - } + } diff --git a/source/net/yacy/search/Switchboard.java b/source/net/yacy/search/Switchboard.java index b68dc4ce3..57540271c 100644 --- a/source/net/yacy/search/Switchboard.java +++ b/source/net/yacy/search/Switchboard.java @@ -155,7 +155,6 @@ import net.yacy.document.LibraryProvider; import net.yacy.document.Parser; import net.yacy.document.TextParser; import net.yacy.document.Parser.Failure; -import net.yacy.document.content.DCEntry; import net.yacy.document.content.SurrogateReader; import net.yacy.document.importer.OAIListFriendsLoader; import net.yacy.document.parser.audioTagParser; @@ -1984,46 +1983,15 @@ public final class Switchboard extends serverSwitch { } public void processSurrogate(final InputStream is, final String name) throws IOException { - final SurrogateReader reader = new SurrogateReader(is, 100); + final SurrogateReader reader = new SurrogateReader(is, 100, this.crawlStacker, this.index.fulltext().getDefaultConfiguration()); final Thread readerThread = new Thread(reader, name); readerThread.start(); - DCEntry surrogate; - Response response; - while ( (surrogate = reader.take()) != DCEntry.poison ) { + SolrInputDocument surrogate; + while ((surrogate = reader.take()) != SurrogateReader.POISON_DOCUMENT ) { // check if url is in accepted domain assert surrogate != null; assert this.crawlStacker != null; - final String urlRejectReason = - this.crawlStacker.urlInAcceptedDomain(surrogate.getIdentifier(true)); - if ( urlRejectReason != null ) { - this.log.warn("Rejected URL '" - + surrogate.getIdentifier(true) - + "': " - + urlRejectReason); - continue; - } - - if (surrogate.get("text_t") == null) { - // create a queue entry - final Document document = surrogate.document(); - final Request request = - new Request( - ASCII.getBytes(this.peers.mySeed().hash), - surrogate.getIdentifier(true), - null, - "", - surrogate.getDate(), - this.crawler.defaultSurrogateProfile.handle(), - 0, - this.crawler.defaultSurrogateProfile.timezoneOffset()); - response = new Response(request, null, null, this.crawler.defaultSurrogateProfile, false, null); - final IndexingQueueEntry queueEntry = - new IndexingQueueEntry(response, new Document[] {document}, null); - - this.indexingCondensementProcessor.enQueue(queueEntry); - } else { - this.index.putDocument(this.index.fulltext().getDefaultConfiguration().toSolrInputDocument(surrogate)); - } + this.index.putDocument(surrogate); if (shallTerminate()) break; } } diff --git a/source/net/yacy/search/index/Fulltext.java b/source/net/yacy/search/index/Fulltext.java index 6d70297e6..81dca80f6 100644 --- a/source/net/yacy/search/index/Fulltext.java +++ b/source/net/yacy/search/index/Fulltext.java @@ -27,7 +27,6 @@ import java.io.File; import java.io.FileOutputStream; import java.io.IOException; import java.io.PrintWriter; -import java.io.StringWriter; import java.lang.reflect.Array; import java.net.MalformedURLException; import java.util.ArrayList; @@ -78,7 +77,6 @@ import org.apache.solr.common.SolrDocument; import org.apache.solr.common.SolrException; import org.apache.solr.common.SolrInputDocument; import org.apache.solr.core.SolrInfoMBean; -import org.apache.commons.io.output.StringBuilderWriter; import org.apache.lucene.util.Version; public final class Fulltext {