recover sax fatal error on OAI-PMH import of xml with entity error

this allows to continue loading next resumptionToken even if import file caused sax parser error
fix http://mantis.tokeek.de/view.php?id=63
pull/1/head
reger 11 years ago
parent 81dc2aa536
commit 121d25be38

@ -47,7 +47,7 @@ public class IndexImportOAIPMHList_p {
if (post != null && post.containsKey("source")) { if (post != null && post.containsKey("source")) {
ClientIdentification.Agent agent = ClientIdentification.getAgent(post.get("agentName", ClientIdentification.yacyInternetCrawlerAgentName)); ClientIdentification.Agent agent = ClientIdentification.getAgent(post.get("agentName", ClientIdentification.yacyInternetCrawlerAgentName));
final Set<String> oaiRoots = OAIListFriendsLoader.getListFriends(sb.loader, agent).keySet(); final Set<String> oaiRoots = new OAIListFriendsLoader().getListFriends(sb.loader, agent).keySet();
boolean dark = false; boolean dark = false;
int count = 0; int count = 0;

@ -83,7 +83,7 @@ public class OAIListFriendsLoader implements Serializable {
} }
public static Map<String, String> getListFriends(final LoaderDispatcher loader, final ClientIdentification.Agent agent) { public Map<String, String> getListFriends(final LoaderDispatcher loader, final ClientIdentification.Agent agent) {
final Map<String, String> map = new TreeMap<String, String>(); final Map<String, String> map = new TreeMap<String, String>();
Map<String, String> m; Map<String, String> m;
for (final Map.Entry<String, File> oaiFriend: listFriends.entrySet()) try { for (final Map.Entry<String, File> oaiFriend: listFriends.entrySet()) try {
@ -105,7 +105,7 @@ public class OAIListFriendsLoader implements Serializable {
} }
private static final ThreadLocal<SAXParser> tlSax = new ThreadLocal<SAXParser>(); private static final ThreadLocal<SAXParser> tlSax = new ThreadLocal<SAXParser>();
private static SAXParser getParser() throws SAXException { private SAXParser getParser() throws SAXException {
SAXParser parser = tlSax.get(); SAXParser parser = tlSax.get();
if (parser == null) { if (parser == null) {
try { try {
@ -119,7 +119,7 @@ public class OAIListFriendsLoader implements Serializable {
} }
// get a resumption token using a SAX xml parser from am input stream // get a resumption token using a SAX xml parser from am input stream
public static class Parser extends DefaultHandler { private class Parser extends DefaultHandler {
// class variables // class variables
private final StringBuilder buffer; private final StringBuilder buffer;

@ -29,7 +29,6 @@ import java.text.ParseException;
import java.util.Date; import java.util.Date;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import net.yacy.cora.date.GenericFormatter; import net.yacy.cora.date.GenericFormatter;
@ -46,7 +45,7 @@ import net.yacy.search.Switchboard;
public class OAIPMHImporter extends Thread implements Importer, Comparable<OAIPMHImporter> { public class OAIPMHImporter extends Thread implements Importer, Comparable<OAIPMHImporter> {
private static int importerCounter = Integer.MAX_VALUE; private static int importerCounter = Integer.MAX_VALUE;
private static Object N = new Object(); private static final Object N = new Object();
public static ConcurrentHashMap<OAIPMHImporter, Object> startedJobs = new ConcurrentHashMap<OAIPMHImporter, Object>(); public static ConcurrentHashMap<OAIPMHImporter, Object> startedJobs = new ConcurrentHashMap<OAIPMHImporter, Object>();
public static ConcurrentHashMap<OAIPMHImporter, Object> runningJobs = new ConcurrentHashMap<OAIPMHImporter, Object>(); public static ConcurrentHashMap<OAIPMHImporter, Object> runningJobs = new ConcurrentHashMap<OAIPMHImporter, Object>();
@ -137,16 +136,16 @@ public class OAIPMHImporter extends Thread implements Importer, Comparable<OAIPM
this.message = "loading first part of records"; this.message = "loading first part of records";
while (true) { while (true) {
try { try {
OAIPMHLoader loader = new OAIPMHLoader(this.loader, this.source, Switchboard.getSwitchboard().surrogatesInPath, this.agent); OAIPMHLoader oailoader = new OAIPMHLoader(this.loader, this.source, Switchboard.getSwitchboard().surrogatesInPath, this.agent);
this.completeListSize = Math.max(this.completeListSize, loader.getResumptionToken().getCompleteListSize()); this.completeListSize = Math.max(this.completeListSize, oailoader.getResumptionToken().getCompleteListSize());
this.chunkCount++; this.chunkCount++;
this.recordsCount += loader.getResumptionToken().getRecordCounter(); this.recordsCount += oailoader.getResumptionToken().getRecordCounter();
this.source = loader.getResumptionToken().resumptionURL(); this.source = oailoader.getResumptionToken().resumptionURL();
if (this.source == null) { if (this.source == null) {
this.message = "import terminated with source = null"; this.message = "import terminated with source = null";
break; break;
} }
this.message = "loading next resumption fragment, cursor = " + loader.getResumptionToken().getCursor(); this.message = "loading next resumption fragment, cursor = " + oailoader.getResumptionToken().getCursor();
} catch (final IOException e) { } catch (final IOException e) {
this.message = e.getMessage(); this.message = e.getMessage();
break; break;
@ -182,21 +181,6 @@ public class OAIPMHImporter extends Thread implements Importer, Comparable<OAIPM
return 0; return 0;
} }
public static Set<String> getUnloadedOAIServer(
LoaderDispatcher loader,
File surrogatesIn,
File surrogatesOut,
long staleLimit,
ClientIdentification.Agent agent) {
Set<String> plainList = OAIListFriendsLoader.getListFriends(loader, agent).keySet();
Map<String, Date> loaded = getLoadedOAIServer(surrogatesIn, surrogatesOut);
long limit = System.currentTimeMillis() - staleLimit;
for (Map.Entry<String, Date> a: loaded.entrySet()) {
if (a.getValue().getTime() > limit) plainList.remove(a.getKey());
}
return plainList;
}
/** /**
* get a map for already loaded oai-pmh servers and their latest access date * get a map for already loaded oai-pmh servers and their latest access date
* @param surrogatesIn * @param surrogatesIn

@ -82,61 +82,6 @@ public class OAIPMHLoader {
public String source() { public String source() {
return this.source.toNormalform(true); return this.source.toNormalform(true);
} }
public static StringBuilder escape(final String s) {
final int len = s.length();
final StringBuilder sbuf = new StringBuilder(len + 10);
for (int i = 0; i < len; i++) {
final int ch = s.charAt(i);
if ('A' <= ch && ch <= 'Z') { // 'A'..'Z'
sbuf.append((char)ch);
} else if ('a' <= ch && ch <= 'z') { // 'a'..'z'
sbuf.append((char)ch);
} else if ('0' <= ch && ch <= '9') { // '0'..'9'
sbuf.append((char)ch);
} else if (ch == ' ') { // space
sbuf.append("%20");
} else if (ch == '&' || ch == ':' // unreserved
|| ch == '-' || ch == '_'
|| ch == '.' || ch == '!'
|| ch == '~' || ch == '*'
|| ch == '\'' || ch == '('
|| ch == ')' || ch == ';') {
sbuf.append((char)ch);
}
}
return sbuf;
}
public static String unescape(final String s) {
final int l = s.length();
final StringBuilder sbuf = new StringBuilder(l);
int ch = -1;
int b;
for (int i = 0; i < l; i++) {
/* Get next byte b from URL segment s */
switch (ch = s.charAt(i)) {
case '%':
if (i + 2 < l) {
ch = s.charAt(++i);
final int hb = (Character.isDigit ((char) ch) ? ch - '0' : 10 + Character.toLowerCase((char) ch) - 'a') & 0xF;
ch = s.charAt(++i);
final int lb = (Character.isDigit ((char) ch) ? ch - '0' : 10 + Character.toLowerCase ((char) ch) - 'a') & 0xF;
b = (hb << 4) | lb;
} else {
b = ch;
}
break;
case '+':
b = ' ';
break;
default:
b = ch;
}
sbuf.append(b);
}
return sbuf.toString();
}
} }
/* /*

@ -55,7 +55,7 @@ public class ResumptionToken extends TreeMap<String, String> {
insensitiveCollator.setDecomposition(Collator.NO_DECOMPOSITION); insensitiveCollator.setDecomposition(Collator.NO_DECOMPOSITION);
} }
int recordCounter; private int recordCounter;
private final DigestURL source; private final DigestURL source;
@ -66,40 +66,6 @@ public class ResumptionToken extends TreeMap<String, String> {
new Parser(b); new Parser(b);
} }
/*
public ResumptionToken(
DigestURI source,
Date expirationDate,
int completeListSize,
int cursor,
String token
) {
super((Collator) insensitiveCollator.clone());
this.source = source;
this.recordCounter = 0;
this.put("expirationDate", DateFormatter.formatISO8601(expirationDate));
this.put("completeListSize", Integer.toString(completeListSize));
this.put("cursor", Integer.toString(cursor));
this.put("token", token);
}
public ResumptionToken(
DigestURI source,
String expirationDate,
int completeListSize,
int cursor,
String token
) {
super((Collator) insensitiveCollator.clone());
this.source = source;
this.recordCounter = 0;
this.put("expirationDate", expirationDate);
this.put("completeListSize", Integer.toString(completeListSize));
this.put("cursor", Integer.toString(cursor));
this.put("token", token);
}
*/
/** /**
* truncate the given url at the '?' * truncate the given url at the '?'
* @param url * @param url
@ -150,7 +116,7 @@ public class ResumptionToken extends TreeMap<String, String> {
return new DigestURL(u); return new DigestURL(u);
} }
public static StringBuilder escape(final String s) { private StringBuilder escape(final String s) {
final int len = s.length(); final int len = s.length();
final StringBuilder sbuf = new StringBuilder(len + 10); final StringBuilder sbuf = new StringBuilder(len + 10);
for (int i = 0; i < len; i++) { for (int i = 0; i < len; i++) {
@ -260,7 +226,7 @@ public class ResumptionToken extends TreeMap<String, String> {
} }
return parser; return parser;
} }
// get a resumption token using a SAX xml parser from am input stream // get a resumption token using a SAX xml parser from am input stream
private class Parser extends DefaultHandler { private class Parser extends DefaultHandler {
@ -268,27 +234,49 @@ public class ResumptionToken extends TreeMap<String, String> {
private final StringBuilder buffer; private final StringBuilder buffer;
private boolean parsingValue; private boolean parsingValue;
private SAXParser saxParser; private SAXParser saxParser;
private final InputStream stream;
private Attributes atts; private Attributes atts;
public Parser(final byte[] b) throws IOException { public Parser(final byte[] b) throws IOException {
this.buffer = new StringBuilder(); this.buffer = new StringBuilder();
this.parsingValue = false; this.parsingValue = false;
this.atts = null; this.atts = null;
this.stream = new ByteArrayInputStream(b); InputStream stream = new ByteArrayInputStream(b);
try { try {
this.saxParser = getParser(); this.saxParser = getParser();
this.saxParser.parse(this.stream, this); this.saxParser.parse(stream, this);
} catch (final SAXException e) { } catch (final SAXException e) {
ConcurrentLog.logException(e); // some received xml are not valid, common error is '&' sign in xml text
ConcurrentLog.warn("ResumptionToken", "token was not parsed (1):\n" + UTF8.String(b)); // causing a fatal sax error before <resumptionToken> entry is reached
// this patch tries to extract the <resumptionToken> only to allow loading
// the next oai-pmh xml.
// extract and parse only resumptionToken line
String in = UTF8.String(b);
final int istart = in.lastIndexOf("<resumptionToken");
if (istart >= 0) {
final int iend = in.indexOf("</resumptionToken",istart);
if (iend >= 0) {
in = in.substring(istart, iend) + "</resumptionToken>";
stream = new ByteArrayInputStream(UTF8.getBytes(in));
try {
this.saxParser.parse(stream, this);
ResumptionToken.this.recordCounter++;
} catch (final SAXException e2) {
ConcurrentLog.warn("ResumptionToken", "token was not parsed (invalid resumption token): " + in);
}
ConcurrentLog.warn("ResumptionToken", "input file with error: " + e.getMessage());
}
} else {
ConcurrentLog.logException(e);
ConcurrentLog.warn("ResumptionToken", "token was not parsed (parser error):\n" + UTF8.String(b));
}
} catch (final IOException e) { } catch (final IOException e) {
ConcurrentLog.logException(e); ConcurrentLog.logException(e);
ConcurrentLog.warn("ResumptionToken", "token was not parsed (2):\n" + UTF8.String(b)); ConcurrentLog.warn("ResumptionToken", "token was not parsed (IO error)");
throw new IOException(e.getMessage()); throw new IOException(e.getMessage());
} finally { } finally {
try { try {
this.stream.close(); stream.close();
} catch (final IOException e) { } catch (final IOException e) {
ConcurrentLog.logException(e); ConcurrentLog.logException(e);
} }

Loading…
Cancel
Save