fixed a problem in warc importer: do not fail if single WARC entries are

faulty
pull/402/head
Michael Peter Christen 4 years ago
parent 3078b74e1d
commit d3526c52af

@ -24,7 +24,6 @@ package net.yacy.document.importer;
import java.io.File; import java.io.File;
import java.io.FileInputStream; import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.util.zip.GZIPInputStream; import java.util.zip.GZIPInputStream;
@ -121,49 +120,55 @@ public class WarcImporter extends Thread implements Importer {
InputStream istream = wrec.getPayloadContent(); InputStream istream = wrec.getPayloadContent();
hl = http.getHeader(HeaderFramework.TRANSFER_ENCODING); hl = http.getHeader(HeaderFramework.TRANSFER_ENCODING);
if (hl != null && hl.value.contains("chunked")) { content = null;
// because chunked stream.read doesn't read source fully, make sure all chunks are read try {
istream = new ChunkedInputStream(istream); if (hl != null && hl.value.contains("chunked")) {
final ByteBuffer bbuffer = new ByteBuffer(); // because chunked stream.read doesn't read source fully, make sure all chunks are read
int c; istream = new ChunkedInputStream(istream);
while ((c = istream.read()) >= 0) { final ByteBuffer bbuffer = new ByteBuffer();
bbuffer.append(c); int c;
while ((c = istream.read()) >= 0) {
bbuffer.append(c);
}
content = bbuffer.getBytes();
} else {
content = new byte[(int) http.getPayloadLength()];
istream.read(content, 0, content.length);
} }
content = bbuffer.getBytes();
} else {
content = new byte[(int) http.getPayloadLength()];
istream.read(content, 0, content.length);
}
istream.close();
RequestHeader requestHeader = new RequestHeader(); RequestHeader requestHeader = new RequestHeader();
ResponseHeader responseHeader = new ResponseHeader(http.statusCode);
for (HeaderLine hx : http.getHeaderList()) { // include all original response headers for parser
responseHeader.put(hx.name, hx.value);
}
final Request request = new Request(
ASCII.getBytes(Switchboard.getSwitchboard().peers.mySeed().hash),
location,
requestHeader.referer() == null ? null : requestHeader.referer().hash(),
"warc",
responseHeader.lastModified(),
Switchboard.getSwitchboard().crawler.defaultSurrogateProfile.handle(),
0,
Switchboard.getSwitchboard().crawler.defaultSurrogateProfile.timezoneOffset());
final Response response = new Response(
request,
requestHeader,
responseHeader,
Switchboard.getSwitchboard().crawler.defaultSurrogateProfile,
false,
content
);
ResponseHeader responseHeader = new ResponseHeader(http.statusCode); String error = Switchboard.getSwitchboard().toIndexer(response);
for (HeaderLine hx : http.getHeaderList()) { // include all original response headers for parser if (error != null) ConcurrentLog.info("WarcImporter", "error parsing: " + error);
responseHeader.put(hx.name, hx.value); } catch (IOException e) {
ConcurrentLog.info("WarcImporter", "error reading: " + e.getMessage());
} finally {
try {istream.close();} catch (IOException e) {}
} }
final Request request = new Request(
ASCII.getBytes(Switchboard.getSwitchboard().peers.mySeed().hash),
location,
requestHeader.referer() == null ? null : requestHeader.referer().hash(),
"warc",
responseHeader.lastModified(),
Switchboard.getSwitchboard().crawler.defaultSurrogateProfile.handle(),
0,
Switchboard.getSwitchboard().crawler.defaultSurrogateProfile.timezoneOffset());
final Response response = new Response(
request,
requestHeader,
responseHeader,
Switchboard.getSwitchboard().crawler.defaultSurrogateProfile,
false,
content
);
String error = Switchboard.getSwitchboard().toIndexer(response);
if (error != null) ConcurrentLog.info("WarcImporter", "error: " + error);
recordCnt++; recordCnt++;
} }
} }

Loading…
Cancel
Save