added surrogate import process for exported solr dumps.

Just throw your solr dump file into DATA/SURROGATES/in/ and it will be
imported!
pull/8/head
Michael Peter Christen 10 years ago
parent b77537294d
commit b43811d38c

@ -30,7 +30,6 @@ import net.yacy.data.ymark.YMarkTables;
import net.yacy.data.ymark.YMarkUtil;
import net.yacy.data.ymark.YMarkXBELImporter;
import net.yacy.document.Parser.Failure;
import net.yacy.document.content.SurrogateReader;
import net.yacy.kelondro.blob.Tables;
import net.yacy.kelondro.workflow.InstantBusyThread;
import net.yacy.search.Switchboard;
@ -39,8 +38,6 @@ import net.yacy.server.serverSwitch;
import org.xml.sax.SAXException;
public class import_ymark {
public static serverObjects respond(final RequestHeader header, final serverObjects post, final serverSwitch env) {
@ -92,9 +89,10 @@ public class import_ymark {
final byte[] bytes = UTF8.getBytes(post.get("bmkfile$file"));
stream = new ByteArrayInputStream(bytes);
if(post.get("importer").equals("surro") && stream != null) {
/**
SurrogateReader surrogateReader;
try {
surrogateReader = new SurrogateReader(stream, queueSize);
surrogateReader = new SurrogateReader(stream, queueSize, sb.crawlStacker, sb.index.fulltext().getDefaultConfiguration());
} catch (final IOException e) {
//TODO: display an error message
ConcurrentLog.logException(e);
@ -106,6 +104,7 @@ public class import_ymark {
putBookmark(sb, bmk_user, bmk, autoTaggingQueue, autotag, empty, indexing, medialink);
}
prop.put("status", "1");
*/
} else {
MonitoredReader reader = null;
try {

@ -32,7 +32,6 @@ import org.apache.solr.common.SolrDocument;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.SolrInputField;
import org.apache.solr.common.params.MultiMapSolrParams;
import org.apache.solr.common.util.NamedList;
import net.yacy.cora.storage.Configuration;
import net.yacy.cora.util.ConcurrentLog;

@ -24,26 +24,32 @@
package net.yacy.document.content;
import java.io.BufferedInputStream;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.PushbackInputStream;
import java.io.Reader;
import java.io.StringReader;
import java.net.MalformedURLException;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.zip.GZIPInputStream;
import javax.xml.parsers.ParserConfigurationException;
import javax.xml.parsers.SAXParser;
import javax.xml.parsers.SAXParserFactory;
import net.yacy.cora.document.encoding.UTF8;
import net.yacy.cora.document.id.DigestURL;
import net.yacy.cora.util.ConcurrentLog;
import net.yacy.crawler.CrawlStacker;
import net.yacy.search.schema.CollectionConfiguration;
import org.apache.solr.client.solrj.impl.XMLResponseParser;
import org.apache.solr.common.SolrDocument;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.util.NamedList;
import org.xml.sax.Attributes;
import org.xml.sax.InputSource;
import org.xml.sax.SAXException;
@ -63,16 +69,19 @@ public class SurrogateReader extends DefaultHandler implements Runnable {
" xmlns:geo=\"http://www.w3.org/2003/01/geo/wgs84_pos#\">";
public final static String SURROGATES_MAIN_ELEMENT_CLOSE =
"</" + SURROGATES_MAIN_ELEMENT_NAME + ">";
public final static SolrInputDocument POISON_DOCUMENT = new SolrInputDocument();
// class variables
private final StringBuilder buffer;
private boolean parsingValue;
private DCEntry surrogate;
private DCEntry dcEntry;
private String elementName;
private final BlockingQueue<DCEntry> surrogates;
private final BlockingQueue<SolrInputDocument> surrogates;
private SAXParser saxParser;
private final InputSource inputSource;
private final InputStream inputStream;
private final PushbackInputStream inputStream;
private final CrawlStacker crawlStacker;
private final CollectionConfiguration configuration;
private static final ThreadLocal<SAXParser> tlSax = new ThreadLocal<SAXParser>();
private static SAXParser getParser() throws SAXException {
@ -87,13 +96,19 @@ public class SurrogateReader extends DefaultHandler implements Runnable {
}
return parser;
}
public SurrogateReader(final InputStream stream, int queueSize, CrawlStacker crawlStacker, CollectionConfiguration configuration) throws IOException {
this(new PushbackInputStream(stream, 200), queueSize, crawlStacker, configuration);
}
public SurrogateReader(final InputStream stream, int queueSize) throws IOException {
public SurrogateReader(final PushbackInputStream stream, int queueSize, CrawlStacker crawlStacker, CollectionConfiguration configuration) throws IOException {
this.crawlStacker = crawlStacker;
this.configuration = configuration;
this.buffer = new StringBuilder(300);
this.parsingValue = false;
this.surrogate = null;
this.dcEntry = null;
this.elementName = null;
this.surrogates = new ArrayBlockingQueue<DCEntry>(queueSize);
this.surrogates = new ArrayBlockingQueue<>(queueSize);
Reader reader = new BufferedReader(new InputStreamReader(stream, "UTF-8"));
this.inputSource = new InputSource(reader);
@ -110,8 +125,37 @@ public class SurrogateReader extends DefaultHandler implements Runnable {
@Override
public void run() {
// test the syntax of the stream by reading parts of the beginning
try {
this.saxParser.parse(this.inputSource, this);
if (isSolrDump()) {
BufferedReader br = new BufferedReader(new InputStreamReader(this.inputStream, "UTF-8"));
String line;
while ((line = br.readLine()) != null) {
if (!line.startsWith("<doc>")) continue;
try {
NamedList<Object> nl = new XMLResponseParser().processResponse(new StringReader("<result>" + line + "</result>")); //
SolrDocument doc = (SolrDocument) nl.iterator().next().getValue();
// check if url is in accepted domain
String u = (String) doc.getFieldValue("sku");
if (u != null) {
try {
DigestURL url = new DigestURL(u);
final String urlRejectReason = this.crawlStacker.urlInAcceptedDomain(url);
if ( urlRejectReason == null ) {
// convert DCEntry to SolrInputDocument
this.surrogates.put(this.configuration.toSolrInputDocument(doc));
}
} catch (MalformedURLException e) {
}
}
} catch (Throwable ee) {
// bad line
}
}
} else {
this.saxParser.parse(this.inputSource, this);
}
} catch (final SAXParseException e) {
ConcurrentLog.logException(e);
} catch (final SAXException e) {
@ -120,7 +164,7 @@ public class SurrogateReader extends DefaultHandler implements Runnable {
ConcurrentLog.logException(e);
} finally {
try {
this.surrogates.put(DCEntry.poison);
this.surrogates.put(POISON_DOCUMENT);
} catch (final InterruptedException e1) {
ConcurrentLog.logException(e1);
}
@ -132,12 +176,35 @@ public class SurrogateReader extends DefaultHandler implements Runnable {
}
}
private boolean isSolrDump() {
try {
if (this.inputStream.available() < 60) return false;
byte[] b = new byte[100];
this.inputStream.read(b);
try {
String s = UTF8.String(b);
if (s.contains("<response>") && s.contains("<result>")) {
this.inputStream.unread(b);
return true;
}
} catch (IOException e) {
ConcurrentLog.logException(e);
this.inputStream.unread(b);
return false;
}
} catch (IOException e) {
ConcurrentLog.logException(e);
return false;
}
return false;
}
@Override
public void startElement(final String uri, final String name, String tag, final Attributes atts) throws SAXException {
if (tag == null) return;
tag = tag.toLowerCase();
if ("record".equals(tag) || "document".equals(tag) || "doc".equals(tag)) {
this.surrogate = new DCEntry();
this.dcEntry = new DCEntry();
} else if ("element".equals(tag) || "str".equals(tag) || "int".equals(tag) || "bool".equals(tag) || "long".equals(tag)) {
this.elementName = atts.getValue("name");
this.parsingValue = true;
@ -158,12 +225,17 @@ public class SurrogateReader extends DefaultHandler implements Runnable {
if ("record".equals(tag) || "document".equals(tag) || "doc".equals(tag)) {
//System.out.println("A Title: " + this.surrogate.title());
try {
this.surrogates.put(this.surrogate);
// check if url is in accepted domain
final String urlRejectReason = this.crawlStacker.urlInAcceptedDomain(this.dcEntry.getIdentifier(true));
if ( urlRejectReason == null ) {
// convert DCEntry to SolrInputDocument
this.surrogates.put(this.configuration.toSolrInputDocument(this.dcEntry));
}
} catch (final InterruptedException e) {
ConcurrentLog.logException(e);
} finally {
//System.out.println("B Title: " + this.surrogate.title());
this.surrogate = null;
this.dcEntry = null;
this.buffer.setLength(0);
this.parsingValue = false;
}
@ -173,7 +245,7 @@ public class SurrogateReader extends DefaultHandler implements Runnable {
} else if ("str".equals(tag) || "int".equals(tag) || "bool".equals(tag) || "long".equals(tag)){
final String value = buffer.toString().trim();
if (this.elementName != null) {
this.surrogate.getMap().put(this.elementName, new String[]{value});
this.dcEntry.getMap().put(this.elementName, new String[]{value});
}
this.buffer.setLength(0);
this.parsingValue = false;
@ -181,14 +253,14 @@ public class SurrogateReader extends DefaultHandler implements Runnable {
//System.out.println("BUFFER-SIZE=" + buffer.length());
final String value = buffer.toString().trim();
if (this.elementName != null) {
this.surrogate.getMap().put(this.elementName, new String[]{value});
this.dcEntry.getMap().put(this.elementName, new String[]{value});
}
this.buffer.setLength(0);
this.parsingValue = false;
} else if (tag.startsWith("dc:") || tag.startsWith("geo:") || tag.startsWith("md:")) {
final String value = buffer.toString().trim();
if (this.elementName != null && tag.equals(this.elementName)) {
Map<String,String[]> map = this.surrogate.getMap();
Map<String,String[]> map = this.dcEntry.getMap();
String[] oldcontent = map.get(this.elementName);
if (oldcontent == null || oldcontent.length == 0) {
map.put(this.elementName, new String[]{value});
@ -211,7 +283,7 @@ public class SurrogateReader extends DefaultHandler implements Runnable {
}
}
public DCEntry take() {
public SolrInputDocument take() {
try {
return this.surrogates.take();
} catch (final InterruptedException e) {
@ -219,29 +291,5 @@ public class SurrogateReader extends DefaultHandler implements Runnable {
return null;
}
}
public static void main(String[] args) {
File f = new File(args[0]);
SurrogateReader sr;
try {
InputStream is = new BufferedInputStream(new FileInputStream(f));
if (f.getName().endsWith(".gz")) is = new GZIPInputStream(is);
sr = new SurrogateReader(is, 1);
Thread t = new Thread(sr, "Surrogate-Reader " + f.getAbsolutePath());
t.start();
DCEntry s;
while ((s = sr.take()) != DCEntry.poison) {
System.out.println("Title: " + s.getTitle());
System.out.println("Date: " + s.getDate());
System.out.println("Creator: " + s.getCreator());
System.out.println("Publisher: " + s.getPublisher());
System.out.println("URL: " + s.getIdentifier(true));
System.out.println("Language: " + s.getLanguage());
System.out.println("Body: " + s.getDescriptions().toString());
}
} catch (final IOException e) {
ConcurrentLog.logException(e);
}
}
}

@ -155,7 +155,6 @@ import net.yacy.document.LibraryProvider;
import net.yacy.document.Parser;
import net.yacy.document.TextParser;
import net.yacy.document.Parser.Failure;
import net.yacy.document.content.DCEntry;
import net.yacy.document.content.SurrogateReader;
import net.yacy.document.importer.OAIListFriendsLoader;
import net.yacy.document.parser.audioTagParser;
@ -1984,46 +1983,15 @@ public final class Switchboard extends serverSwitch {
}
public void processSurrogate(final InputStream is, final String name) throws IOException {
final SurrogateReader reader = new SurrogateReader(is, 100);
final SurrogateReader reader = new SurrogateReader(is, 100, this.crawlStacker, this.index.fulltext().getDefaultConfiguration());
final Thread readerThread = new Thread(reader, name);
readerThread.start();
DCEntry surrogate;
Response response;
while ( (surrogate = reader.take()) != DCEntry.poison ) {
SolrInputDocument surrogate;
while ((surrogate = reader.take()) != SurrogateReader.POISON_DOCUMENT ) {
// check if url is in accepted domain
assert surrogate != null;
assert this.crawlStacker != null;
final String urlRejectReason =
this.crawlStacker.urlInAcceptedDomain(surrogate.getIdentifier(true));
if ( urlRejectReason != null ) {
this.log.warn("Rejected URL '"
+ surrogate.getIdentifier(true)
+ "': "
+ urlRejectReason);
continue;
}
if (surrogate.get("text_t") == null) {
// create a queue entry
final Document document = surrogate.document();
final Request request =
new Request(
ASCII.getBytes(this.peers.mySeed().hash),
surrogate.getIdentifier(true),
null,
"",
surrogate.getDate(),
this.crawler.defaultSurrogateProfile.handle(),
0,
this.crawler.defaultSurrogateProfile.timezoneOffset());
response = new Response(request, null, null, this.crawler.defaultSurrogateProfile, false, null);
final IndexingQueueEntry queueEntry =
new IndexingQueueEntry(response, new Document[] {document}, null);
this.indexingCondensementProcessor.enQueue(queueEntry);
} else {
this.index.putDocument(this.index.fulltext().getDefaultConfiguration().toSolrInputDocument(surrogate));
}
this.index.putDocument(surrogate);
if (shallTerminate()) break;
}
}

@ -27,7 +27,6 @@ import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.lang.reflect.Array;
import java.net.MalformedURLException;
import java.util.ArrayList;
@ -78,7 +77,6 @@ import org.apache.solr.common.SolrDocument;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.core.SolrInfoMBean;
import org.apache.commons.io.output.StringBuilderWriter;
import org.apache.lucene.util.Version;
public final class Fulltext {

Loading…
Cancel
Save