diff --git a/htroot/QuickCrawlLink_p.java b/htroot/QuickCrawlLink_p.java
index 0ce745935..16ed0a9bd 100644
--- a/htroot/QuickCrawlLink_p.java
+++ b/htroot/QuickCrawlLink_p.java
@@ -184,15 +184,20 @@ public class QuickCrawlLink_p {
         }
 
         // stack URL
-        String reasonString = switchboard.sbStackCrawlThread.stackCrawl(
-                crawlingStart,
-                null,
-                yacyCore.seedDB.mySeed.hash,
-                (title==null)?"CRAWLING-ROOT":title,
-                new Date(),
-                0,
-                pe
-        );
+        String reasonString = null;
+        try {
+            reasonString = switchboard.sbStackCrawlThread.stackCrawl(
+                    crawlingStart,
+                    null,
+                    yacyCore.seedDB.mySeed.hash,
+                    (title==null)?"CRAWLING-ROOT":title,
+                    new Date(),
+                    0,
+                    pe
+            );
+        } catch (InterruptedException e) {
+            reasonString = "Server shutdown in progress";
+        }
 
         // validate rejection reason
         if (reasonString == null) {
diff --git a/htroot/yacy/crawlOrder.java b/htroot/yacy/crawlOrder.java
index e17de4754..984604229 100644
--- a/htroot/yacy/crawlOrder.java
+++ b/htroot/yacy/crawlOrder.java
@@ -234,7 +234,12 @@ public final class crawlOrder {
         String response, reason, lurl;
         // stack url
         switchboard.getLog().logFinest("crawlOrder: stack: url='" + url + "'");
-        String reasonString = switchboard.sbStackCrawlThread.stackCrawl(url, referrer, iam, "REMOTE-CRAWLING", new Date(), 0, switchboard.defaultRemoteProfile);
+        String reasonString = null;
+        try {
+            reasonString = switchboard.sbStackCrawlThread.stackCrawl(url, referrer, iam, "REMOTE-CRAWLING", new Date(), 0, switchboard.defaultRemoteProfile);
+        } catch (InterruptedException e) {
+            reasonString = "Shutdown in progress";
+        }
         if (reasonString == null) {
             // liftoff!
             response = "stacked";
diff --git a/source/de/anomic/data/robotsParser.java b/source/de/anomic/data/robotsParser.java
index a84f03ce2..169ae2b05 100644
--- a/source/de/anomic/data/robotsParser.java
+++ b/source/de/anomic/data/robotsParser.java
@@ -327,7 +327,13 @@ public final class robotsParser{
         }
 
+        // sending the get request
         httpc.response res = con.GET(robotsURL.getFile(), reqHeaders);
+
+        // check for interruption
+        if (Thread.currentThread().isInterrupted()) throw new InterruptedException("Shutdown in progress.");
+
+        // check the response status
         if (res.status.startsWith("2")) {
             if (!res.responseHeader.mime().startsWith("text/plain")) {
                 robotsTxt = null;
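The three hunks above handle the same situation at different request boundaries: a blocking call that can be cut short by a server shutdown either maps the resulting InterruptedException onto the existing rejection-reason channel (QuickCrawlLink_p, crawlOrder) or raises it explicitly right after the blocking network call returns (robotsParser). A condensed sketch of the mapping idiom, reusing the stackCrawl signature from this patch; the method name and surrounding context are illustrative only:

    // Sketch: how a servlet turns a shutdown interrupt into an ordinary
    // rejection reason instead of letting it escape the request handler.
    String stackWithShutdownGuard(plasmaCrawlStacker stacker, String url, String peerHash, plasmaCrawlProfile.entry profile) {
        try {
            // may block, and may be interrupted by shutdown
            return stacker.stackCrawl(url, null, peerHash, "CRAWLING-ROOT", new java.util.Date(), 0, profile);
        } catch (InterruptedException e) {
            return "Server shutdown in progress"; // surfaces like any other rejection
        }
    }

A null return still means the URL was accepted onto the crawl stack; any non-null string, including the shutdown message, is reported to the caller like a normal rejection.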
diff --git a/source/de/anomic/plasma/parser/AbstractParser.java b/source/de/anomic/plasma/parser/AbstractParser.java
index 2d07e8b8c..e8632ce93 100644
--- a/source/de/anomic/plasma/parser/AbstractParser.java
+++ b/source/de/anomic/plasma/parser/AbstractParser.java
@@ -53,6 +53,7 @@ import java.io.InputStream;
 
 import de.anomic.net.URL;
 import de.anomic.plasma.plasmaParserDocument;
+import de.anomic.server.serverThread;
 import de.anomic.server.logging.serverLog;
 
 /**
@@ -92,6 +93,12 @@ public abstract class AbstractParser implements Parser{
         this.libxDependencies = libxDependencies;
     }
 
+    public static final void checkInterruption() throws InterruptedException {
+        Thread currentThread = Thread.currentThread();
+        if ((currentThread instanceof serverThread) && ((serverThread)currentThread).shutdownInProgress()) throw new InterruptedException("Shutdown in progress ...");
+        if (currentThread.isInterrupted()) throw new InterruptedException("Shutdown in progress ...");
+    }
+
     /**
      * Parsing a document available as byte array.
      * @param location the origin of the document
@@ -107,7 +114,7 @@ public abstract class AbstractParser implements Parser{
             URL location,
             String mimeType,
             byte[] source
-    ) throws ParserException {
+    ) throws ParserException, InterruptedException {
         ByteArrayInputStream contentInputStream = null;
         try {
             contentInputStream = new ByteArrayInputStream(source);
@@ -134,7 +141,7 @@ public abstract class AbstractParser implements Parser{
      * @see de.anomic.plasma.parser.Parser#parse(de.anomic.net.URL, java.lang.String, java.io.File)
      */
     public plasmaParserDocument parse(URL location, String mimeType,
-            File sourceFile) throws ParserException {
+            File sourceFile) throws ParserException, InterruptedException {
         BufferedInputStream contentInputStream = null;
         try {
             contentInputStream = new BufferedInputStream(new FileInputStream(sourceFile));
@@ -158,7 +165,7 @@ public abstract class AbstractParser implements Parser{
      * @see de.anomic.plasma.parser.Parser#parse(de.anomic.net.URL, java.lang.String, java.io.InputStream)
      */
     public abstract plasmaParserDocument parse(URL location, String mimeType,
-            InputStream source) throws ParserException;
+            InputStream source) throws ParserException, InterruptedException;
 
     /**
      * @return Returns a list of library names that are needed by this parser
diff --git a/source/de/anomic/plasma/parser/Parser.java b/source/de/anomic/plasma/parser/Parser.java
index 9e2e4d343..c44b1d84c 100644
--- a/source/de/anomic/plasma/parser/Parser.java
+++ b/source/de/anomic/plasma/parser/Parser.java
@@ -71,7 +71,7 @@ public interface Parser {
      * @throws ParserException if the content could not be parsed properly
      */
    public plasmaParserDocument parse(URL location, String mimeType, byte[] source)
-        throws ParserException;
+        throws ParserException, InterruptedException;
 
     /**
      * Parsing a document stored in a {@link File}
@@ -84,7 +84,7 @@ public interface Parser {
      * @throws ParserException if the content could not be parsed properly
      */
     public plasmaParserDocument parse(URL location, String mimeType, File sourceFile)
-        throws ParserException;
+        throws ParserException, InterruptedException;
 
     /**
      * Parsing a document available as {@link InputStream}
@@ -97,7 +97,7 @@ public interface Parser {
      * @throws ParserException if the content could not be parsed properly
      */
     public plasmaParserDocument parse(URL location, String mimeType, InputStream source)
-        throws ParserException;
+        throws ParserException, InterruptedException;
 
     /**
      * Can be used to determine the MimeType(s) that are supported by the parser
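Together, the AbstractParser and Parser changes above establish the cooperative-cancellation contract for the whole parser subsystem: every parse(...) overload may now throw InterruptedException, and checkInterruption() gives subclasses a single safe point that recognizes both a plain thread interrupt and a serverThread whose shutdownInProgress() flag is set. A self-contained, JDK-only sketch of that contract; class and method names are illustrative, not YaCy code:

    public final class CancellationDemo {
        // simplified stand-in for AbstractParser.checkInterruption()
        static void checkInterruption() throws InterruptedException {
            if (Thread.currentThread().isInterrupted())
                throw new InterruptedException("Shutdown in progress ...");
        }

        // a long-running "parse": one safe-point check per unit of work
        static void parseManyEntries(int entries) throws InterruptedException {
            for (int i = 0; i < entries; i++) {
                checkInterruption();
                // ... process entry i ...
            }
        }

        public static void main(String[] args) throws Exception {
            Thread worker = new Thread(new Runnable() {
                public void run() {
                    try {
                        parseManyEntries(1000000);
                    } catch (InterruptedException e) {
                        System.out.println("worker stopped: " + e.getMessage());
                    }
                }
            });
            worker.start();
            worker.interrupt(); // simulate a shutdown request
            worker.join();
        }
    }

Because isInterrupted() does not clear the interrupt status, repeated safe points keep firing until the thread actually unwinds.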
diff --git a/source/de/anomic/plasma/parser/bzip/bzipParser.java b/source/de/anomic/plasma/parser/bzip/bzipParser.java
index 81dc8f85f..7ce87893f 100644
--- a/source/de/anomic/plasma/parser/bzip/bzipParser.java
+++ b/source/de/anomic/plasma/parser/bzip/bzipParser.java
@@ -87,7 +87,7 @@ public class bzipParser extends AbstractParser implements Parser {
         return SUPPORTED_MIME_TYPES;
     }
 
-    public plasmaParserDocument parse(URL location, String mimeType, InputStream source) throws ParserException {
+    public plasmaParserDocument parse(URL location, String mimeType, InputStream source) throws ParserException, InterruptedException {
 
         File tempFile = null;
         try {
@@ -115,17 +115,20 @@ public class bzipParser extends AbstractParser implements Parser {
             FileOutputStream out = new FileOutputStream(tempFile);
 
             // reading gzip file and store it uncompressed
-            while((read = zippedContent.read(data, 0, 1024)) != -1)
-            {
+            while((read = zippedContent.read(data, 0, 1024)) != -1) {
                 out.write(data, 0, read);
             }
             zippedContent.close();
             out.close();
 
+            // check for interruption
+            checkInterruption();
+
             // creating a new parser class to parse the unzipped content
             plasmaParser theParser = new plasmaParser();
             return theParser.parseSource(location,null,tempFile);
-        } catch (Exception e) {
+        } catch (Exception e) {
+            if (e instanceof InterruptedException) throw (InterruptedException) e;
             throw new ParserException("Unable to parse the gzip content. " + e.getMessage());
         } finally {
             if (tempFile != null) tempFile.delete();
diff --git a/source/de/anomic/plasma/parser/doc/docParser.java b/source/de/anomic/plasma/parser/doc/docParser.java
index e17910736..8cf1bb32c 100644
--- a/source/de/anomic/plasma/parser/doc/docParser.java
+++ b/source/de/anomic/plasma/parser/doc/docParser.java
@@ -79,7 +79,7 @@ implements Parser {
     }
 
     public plasmaParserDocument parse(URL location, String mimeType,
-            InputStream source) throws ParserException {
+            InputStream source) throws ParserException, InterruptedException {
 
         try {
@@ -103,8 +103,8 @@ implements Parser {
                 null);
 
             return theDoc;
-        }
-        catch (Exception e) {
+        } catch (Exception e) {
+            if (e instanceof InterruptedException) throw (InterruptedException) e;
             throw new ParserException("Unable to parse the doc content. " + e.getMessage());
         }
     }
diff --git a/source/de/anomic/plasma/parser/gzip/gzipParser.java b/source/de/anomic/plasma/parser/gzip/gzipParser.java
index 5153004db..b9db9827b 100644
--- a/source/de/anomic/plasma/parser/gzip/gzipParser.java
+++ b/source/de/anomic/plasma/parser/gzip/gzipParser.java
@@ -83,7 +83,7 @@ public class gzipParser extends AbstractParser implements Parser {
         return SUPPORTED_MIME_TYPES;
     }
 
-    public plasmaParserDocument parse(URL location, String mimeType, InputStream source) throws ParserException {
+    public plasmaParserDocument parse(URL location, String mimeType, InputStream source) throws ParserException, InterruptedException {
 
         File tempFile = null;
         try {
@@ -105,10 +105,14 @@ public class gzipParser extends AbstractParser implements Parser {
             zippedContent.close();
             out.close();
 
+            // check for interruption
+            checkInterruption();
+
             // creating a new parser class to parse the unzipped content
             plasmaParser theParser = new plasmaParser();
             return theParser.parseSource(location,null,tempFile);
-        } catch (Exception e) {
+        } catch (Exception e) {
+            if (e instanceof InterruptedException) throw (InterruptedException) e;
             throw new ParserException("Unable to parse the gzip content. " + e.getMessage());
         } finally {
             if (tempFile != null) tempFile.delete();
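The bzip, doc, and gzip parsers above (and the remaining parsers below) all guard their blanket catch (Exception e) with the same two-line idiom. The guard matters because these catch clauses wrap anything thrown into a ParserException; without the instanceof test, a shutdown interrupt raised inside the try block would be reported as an ordinary parse failure and the cancellation signal would be lost. A sketch of the idiom in isolation; doParse is a hypothetical helper:

    public plasmaParserDocument parse(URL location, String mimeType, InputStream source)
            throws ParserException, InterruptedException {
        try {
            checkInterruption();              // safe point before the expensive work
            return doParse(location, source); // hypothetical helper; may be interrupted
        } catch (Exception e) {
            // let a cancellation request pass through untouched ...
            if (e instanceof InterruptedException) throw (InterruptedException) e;
            // ... and wrap everything else as a parser failure, as before
            throw new ParserException("Unable to parse the content. " + e.getMessage());
        }
    }

A dedicated catch (InterruptedException e) clause placed before the generic one would work equally well; the instanceof form has the advantage of leaving the existing catch structure untouched.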
diff --git a/source/de/anomic/plasma/parser/mimeType/mimeTypeParser.java b/source/de/anomic/plasma/parser/mimeType/mimeTypeParser.java
index 2e83f1622..738018dd2 100644
--- a/source/de/anomic/plasma/parser/mimeType/mimeTypeParser.java
+++ b/source/de/anomic/plasma/parser/mimeType/mimeTypeParser.java
@@ -120,12 +120,12 @@ implements Parser {
                 return mimeType;
             }
         } catch (Exception e) {
-
+            /* ignore this */
         }
         return null;
     }
 
-    public plasmaParserDocument parse(URL location, String mimeType, File sourceFile) throws ParserException {
+    public plasmaParserDocument parse(URL location, String mimeType, File sourceFile) throws ParserException, InterruptedException {
 
         String orgMimeType = mimeType;
 
@@ -162,13 +162,18 @@ implements Parser {
                 // to avoid loops we have to test if the mimetype has changed ...
                 if (this.getSupportedMimeTypes().containsKey(mimeType)) return null;
                 if (orgMimeType.equals(mimeType)) return null;
+
+                // check for interruption
+                checkInterruption();
+
                 // parsing the content using the determined mimetype
                 plasmaParser theParser = new plasmaParser();
                 return theParser.parseSource(location,mimeType,sourceFile);
             }
             return null;
         } catch (Exception e) {
+            if (e instanceof InterruptedException) throw (InterruptedException) e;
             return null;
         } finally {
             Integer loopDepth = (Integer) threadLoopDetection.get(Thread.currentThread());
diff --git a/source/de/anomic/plasma/parser/odt/odtParser.java b/source/de/anomic/plasma/parser/odt/odtParser.java
index a9a93f63b..5089bf6a5 100644
--- a/source/de/anomic/plasma/parser/odt/odtParser.java
+++ b/source/de/anomic/plasma/parser/odt/odtParser.java
@@ -91,7 +91,7 @@ public class odtParser extends AbstractParser implements Parser {
         return SUPPORTED_MIME_TYPES;
     }
 
-    public plasmaParserDocument parse(URL location, String mimeType, File dest) throws ParserException {
+    public plasmaParserDocument parse(URL location, String mimeType, File dest) throws ParserException, InterruptedException {
 
         try {
             byte[] docContent = null;
@@ -106,6 +106,10 @@ public class odtParser extends AbstractParser implements Parser {
 
             // looping through all containing files
             while (zipEnum.hasMoreElements()) {
+                // check for interruption
+                checkInterruption();
+
+                // getting the next zip file entry
                 ZipEntry zipEntry= (ZipEntry) zipEnum.nextElement();
                 String entryName = zipEntry.getName();
 
@@ -129,7 +133,7 @@ public class odtParser extends AbstractParser implements Parser {
             if (docLongTitle == null) {
                 if (docShortTitle != null) {
                     docLongTitle = docShortTitle;
-                } else if (docContent.length <= 80) {
+                } else if (docContent != null && docContent.length <= 80) {
                     docLongTitle = new String(docContent, "UTF-8");
                 } else {
                     byte[] title = new byte[80];
@@ -157,7 +161,7 @@ public class odtParser extends AbstractParser implements Parser {
                 null,
                 null);
         } catch (Exception e) {
-            e.printStackTrace();
+            if (e instanceof InterruptedException) throw (InterruptedException) e;
             throw new ParserException("Unable to parse the odt content. " + e.getMessage());
         } catch (Error e) {
             throw new ParserException("Unable to parse the odt content. " + e.getMessage());
diff --git a/source/de/anomic/plasma/parser/pdf/pdfParser.java b/source/de/anomic/plasma/parser/pdf/pdfParser.java
index 00a4d0988..c513aee76 100644
--- a/source/de/anomic/plasma/parser/pdf/pdfParser.java
+++ b/source/de/anomic/plasma/parser/pdf/pdfParser.java
@@ -85,7 +85,7 @@ public class pdfParser extends AbstractParser implements Parser {
         return SUPPORTED_MIME_TYPES;
     }
 
-    public plasmaParserDocument parse(URL location, String mimeType, InputStream source) throws ParserException {
+    public plasmaParserDocument parse(URL location, String mimeType, InputStream source) throws ParserException, InterruptedException {
 
         PDDocument theDocument = null;
 
@@ -100,9 +100,17 @@ public class pdfParser extends AbstractParser implements Parser {
             String docTitle = null, docSubject = null, /*docAuthor = null,*/ docKeyWords = null;
 
+            // check for interruption
+            checkInterruption();
+
+            // creating a pdf parser
             PDFParser parser = new PDFParser(source);
             parser.parse();
+
+            // check for interruption
+            checkInterruption();
+
             // creating a text stripper
             PDFTextStripper stripper = new PDFTextStripper();
             theDocument = parser.getPDDocument();
@@ -155,7 +163,8 @@ public class pdfParser extends AbstractParser implements Parser {
 
             return theDoc;
         }
-        catch (Exception e) {
+        catch (Exception e) {
+            if (e instanceof InterruptedException) throw (InterruptedException) e;
             throw new ParserException("Unable to parse the pdf content. " + e.getMessage(),e);
         } finally {
             if (theDocument != null) try { theDocument.close(); } catch (Exception e) {}
diff --git a/source/de/anomic/plasma/parser/rpm/rpmParser.java b/source/de/anomic/plasma/parser/rpm/rpmParser.java
index 864b051cf..5070007bf 100644
--- a/source/de/anomic/plasma/parser/rpm/rpmParser.java
+++ b/source/de/anomic/plasma/parser/rpm/rpmParser.java
@@ -105,7 +105,7 @@ public class rpmParser extends AbstractParser implements Parser {
         }
     }
 
-    public plasmaParserDocument parse(URL location, String mimeType, File sourceFile) throws ParserException {
+    public plasmaParserDocument parse(URL location, String mimeType, File sourceFile) throws ParserException, InterruptedException {
         RPMFile rpmFile = null;
         try {
             String summary = null, description = null, name = sourceFile.getName();
@@ -121,6 +121,10 @@ public class rpmParser extends AbstractParser implements Parser {
             // getting all header names
             String[] headerNames = rpmFile.getTagNames();
             for (int i=0; i<headerNames.length; i++) {
diff --git a/source/de/anomic/plasma/parser/tar/tarParser.java b/source/de/anomic/plasma/parser/tar/tarParser.java
                     ((idx>-1)?entryName.substring(0,idx):entryName), (entryExt.length()>0)?"."+entryExt:entryExt);
             serverFileUtils.write(buf, tempFile);
-            // parsing the content
+            // check for interruption
+            checkInterruption();
+            // parsing the content
             theDoc = theParser.parseSource(new URL(tempFile),entryMime,tempFile);
         } finally {
             if (tempFile != null) try {tempFile.delete(); } catch(Exception ex){}
@@ -194,7 +197,8 @@ public class tarParser extends AbstractParser implements Parser {
                 docText.toByteArray(),
                 docAnchors,
                 docImages);
-        } catch (Exception e) {
+        } catch (Exception e) {
+            if (e instanceof InterruptedException) throw (InterruptedException) e;
             throw new ParserException("Unable to parse the zip content. " + e.getMessage());
         }
     }
diff --git a/source/de/anomic/plasma/parser/vcf/vcfParser.java b/source/de/anomic/plasma/parser/vcf/vcfParser.java
index 549adc022..ecd16d8be 100644
--- a/source/de/anomic/plasma/parser/vcf/vcfParser.java
+++ b/source/de/anomic/plasma/parser/vcf/vcfParser.java
@@ -97,7 +97,7 @@ public class vcfParser extends AbstractParser implements Parser {
         return SUPPORTED_MIME_TYPES;
     }
 
-    public plasmaParserDocument parse(URL location, String mimeType, InputStream source) throws ParserException {
+    public plasmaParserDocument parse(URL location, String mimeType, InputStream source) throws ParserException, InterruptedException {
 
         try {
             StringBuffer parsedTitle = new StringBuffer();
@@ -111,6 +111,10 @@ public class vcfParser extends AbstractParser implements Parser {
             String line = null;
             BufferedReader inputReader = new BufferedReader(new InputStreamReader(source));
             while (true) {
+                // check for interruption
+                checkInterruption();
+
+                // getting the next line
                 if (!useLastLine) {
                     line = inputReader.readLine();
                 } else {
@@ -244,7 +248,8 @@ public class vcfParser extends AbstractParser implements Parser {
                 anchors,
                 null);
             return theDoc;
-        } catch (Exception e) {
+        } catch (Exception e) {
+            if (e instanceof InterruptedException) throw (InterruptedException) e;
             throw new ParserException("Unable to parse the vcard content. " + e.getMessage());
         } finally {
         }
diff --git a/source/de/anomic/plasma/parser/zip/zipParser.java b/source/de/anomic/plasma/parser/zip/zipParser.java
index 742ef68a2..f1da328c7 100644
--- a/source/de/anomic/plasma/parser/zip/zipParser.java
+++ b/source/de/anomic/plasma/parser/zip/zipParser.java
@@ -91,7 +91,7 @@ public class zipParser extends AbstractParser implements Parser {
         return SUPPORTED_MIME_TYPES;
     }
 
-    public plasmaParserDocument parse(URL location, String mimeType, InputStream source) throws ParserException {
+    public plasmaParserDocument parse(URL location, String mimeType, InputStream source) throws ParserException, InterruptedException {
 
         try {
             StringBuffer docKeywords = new StringBuffer();
@@ -110,7 +110,7 @@ public class zipParser extends AbstractParser implements Parser {
             ZipEntry entry;
             ZipInputStream zippedContent = new ZipInputStream(source);
             while ((entry = zippedContent.getNextEntry()) !=null) {
-
+                // skip directories
                 if (entry.isDirectory()) continue;
 
                 // Get the entry name
@@ -128,6 +128,9 @@ public class zipParser extends AbstractParser implements Parser {
                 bos.write(buf);
                 byte[] ut = bos.toByteArray();
 
+                // check for interruption
+                checkInterruption();
+
                 // parsing the content
                 plasmaParserDocument theDoc = theParser.parseSource(location,entryMime,ut);
                 if (theDoc == null) continue;
@@ -170,7 +173,8 @@ public class zipParser extends AbstractParser implements Parser {
                 docText.toByteArray(),
                 docAnchors,
                 docImages);
-        } catch (Exception e) {
+        } catch (Exception e) {
+            if (e instanceof InterruptedException) throw (InterruptedException) e;
             throw new ParserException("Unable to parse the zip content. " + e.getMessage());
         } catch (Error e) {
             throw new ParserException("Unable to parse the zip content. " + e.getMessage());
diff --git a/source/de/anomic/plasma/plasmaCrawlStacker.java b/source/de/anomic/plasma/plasmaCrawlStacker.java
index 627cdc44e..5af0710ad 100644
--- a/source/de/anomic/plasma/plasmaCrawlStacker.java
+++ b/source/de/anomic/plasma/plasmaCrawlStacker.java
@@ -69,6 +69,7 @@ import de.anomic.kelondro.kelondroTree;
 import de.anomic.plasma.plasmaCrawlEURL;
 import de.anomic.plasma.urlPattern.plasmaURLPattern;
 import de.anomic.server.serverSemaphore;
+import de.anomic.server.serverThread;
 import de.anomic.server.logging.serverLog;
 import de.anomic.tools.bitfield;
 import de.anomic.yacy.yacyCore;
@@ -168,16 +169,16 @@ public final class plasmaCrawlStacker {
 
     public long[] cacheObjectStatus() {
         return this.queue.cacheObjectStatus();
-    }
+    }
 
     public void job() {
         try {
             // getting a new message from the crawler queue
-            if (Thread.currentThread().isInterrupted()) return;
+            checkInterruption();
             stackCrawlMessage theMsg = this.queue.waitForMessage();
 
             // getting a free session thread from the pool
-            if (Thread.currentThread().isInterrupted()) return;
+            checkInterruption();
             Worker worker = (Worker) this.theWorkerPool.borrowObject();
 
             // processing the new request
@@ -221,7 +222,7 @@ public final class plasmaCrawlStacker {
         }
     }
 
-    public String dequeue(stackCrawlMessage theMsg) {
+    public String dequeue(stackCrawlMessage theMsg) throws InterruptedException {
 
         plasmaCrawlProfile.entry profile = this.sb.profiles.getEntry(theMsg.profileHandle());
         if (profile == null) {
@@ -240,7 +241,12 @@ public final class plasmaCrawlStacker {
                 profile);
     }
 
-    public String stackCrawl(String nexturlString, String referrerString, String initiatorHash, String name, Date loadDate, int currentdepth, plasmaCrawlProfile.entry profile) {
+    public void checkInterruption() throws InterruptedException {
+        Thread curThread = Thread.currentThread();
+        if (curThread.isInterrupted()) throw new InterruptedException("Shutdown in progress ...");
+    }
+
+    public String stackCrawl(String nexturlString, String referrerString, String initiatorHash, String name, Date loadDate, int currentdepth, plasmaCrawlProfile.entry profile) throws InterruptedException {
         // stacks a crawl item. The position can also be remote
         // returns null if successful, a reason string if not successful
         this.log.logFinest("stackCrawl: nexturlString='" + nexturlString + "'");
@@ -254,15 +260,12 @@ public final class plasmaCrawlStacker {
             this.log.logSevere("Wrong URL in stackCrawl: url=null");
             return reason;
         }
-        /*
-        if (profile == null) {
-            reason = "denied_(profile_null)";
-            log.logError("Wrong Profile for stackCrawl: profile=null");
-            return reason;
-        }
-        */
-        URL nexturl = null, referrerURL = null;
+
+        // getting the initiator peer hash
         if ((initiatorHash == null) || (initiatorHash.length() == 0)) initiatorHash = indexURL.dummyHash;
+
+        // getting the referrer url and url hash
+        URL nexturl = null, referrerURL = null;
         if (referrerString != null) {
             try {
                 referrerURL = new URL(referrerString);
@@ -272,6 +275,8 @@ public final class plasmaCrawlStacker {
             }
         }
         String referrerHash = (referrerString==null)?null:indexURL.urlHash(referrerString);
+
+        // check for malformed urls
         try {
             nexturl = new URL(nexturlString);
         } catch (MalformedURLException e) {
@@ -282,6 +287,7 @@ public final class plasmaCrawlStacker {
         }
 
         // check if ip is local ip address
+        checkInterruption();
         InetAddress hostAddress = httpc.dnsResolve(nexturl.getHost());
         if (hostAddress == null) {
             reason = plasmaCrawlEURL.DENIED_UNKNOWN_HOST;
@@ -301,6 +307,7 @@ public final class plasmaCrawlStacker {
         }
 
         // check blacklist
+        checkInterruption();
         if (plasmaSwitchboard.urlBlacklist.isListed(plasmaURLPattern.BLACKLIST_CRAWLER,nexturl)) {
             reason = plasmaCrawlEURL.DENIED_URL_IN_BLACKLIST;
             this.log.logFine("URL '" + nexturlString + "' is in blacklist. " +
@@ -311,9 +318,7 @@ public final class plasmaCrawlStacker {
         // filter deny
         if ((currentdepth > 0) && (profile != null) && (!(nexturlString.matches(profile.generalFilter())))) {
             reason = plasmaCrawlEURL.DENIED_URL_DOES_NOT_MATCH_FILTER;
-            /*
-            urlPool.errorURL.newEntry(nexturl, referrerHash, initiatorHash, yacyCore.seedDB.mySeed.hash,
-                    name, reason, new bitfield(plasmaURL.urlFlagLength), false);*/
+
             this.log.logFine("URL '" + nexturlString + "' does not match crawling filter '" + profile.generalFilter() + "'. " +
                              "Stack processing time: " + (System.currentTimeMillis()-startTime) + "ms");
             return reason;
@@ -322,9 +327,7 @@ public final class plasmaCrawlStacker {
         // deny cgi
         if (plasmaHTCache.isCGI(nexturlString)) {
             reason = plasmaCrawlEURL.DENIED_CGI_URL;
-            /*
-            urlPool.errorURL.newEntry(nexturl, referrerHash, initiatorHash, yacyCore.seedDB.mySeed.hash,
-                    name, reason, new bitfield(plasmaURL.urlFlagLength), false);*/
+
             this.log.logFine("URL '" + nexturlString + "' is CGI URL. " +
                              "Stack processing time: " + (System.currentTimeMillis()-startTime) + "ms");
             return reason;
@@ -333,9 +336,7 @@ public final class plasmaCrawlStacker {
         // deny post properties
         if ((plasmaHTCache.isPOST(nexturlString)) && (profile != null) && (!(profile.crawlingQ()))) {
             reason = plasmaCrawlEURL.DENIED_POST_URL;
-            /*
-            urlPool.errorURL.newEntry(nexturl, referrerHash, initiatorHash, yacyCore.seedDB.mySeed.hash,
-                    name, reason, new bitfield(plasmaURL.urlFlagLength), false);*/
+
             this.log.logFine("URL '" + nexturlString + "' is post URL. " +
                              "Stack processing time: " + (System.currentTimeMillis()-startTime) + "ms");
             return reason;
@@ -362,6 +363,8 @@ public final class plasmaCrawlStacker {
             return reason;
         }
 
+        // check if the url is double registered
+        checkInterruption();
         String nexturlhash = indexURL.urlHash(nexturl);
         String dbocc = this.sb.urlPool.exists(nexturlhash);
         plasmaCrawlLURL.Entry oldEntry = null;
@@ -372,20 +375,17 @@ public final class plasmaCrawlStacker {
                 (((System.currentTimeMillis() - oldEntry.loaddate().getTime()) / 60000) > profile.recrawlIfOlder());
         if ((dbocc != null) && (!(recrawl))) {
             reason = plasmaCrawlEURL.DOUBLE_REGISTERED + dbocc + ")";
-            /*
-            urlPool.errorURL.newEntry(nexturl, referrerHash, initiatorHash, yacyCore.seedDB.mySeed.hash,
-                    name, reason, new bitfield(plasmaURL.urlFlagLength), false);*/
+
             this.log.logFine("URL '" + nexturlString + "' is double registered in '" + dbocc + "'. " +
                              "Stack processing time: " + (System.currentTimeMillis()-startTime) + "ms");
             return reason;
         }
 
         // checking robots.txt
+        checkInterruption();
         if (robotsParser.isDisallowed(nexturl)) {
             reason = plasmaCrawlEURL.DENIED_ROBOTS_TXT;
-            /*
-            urlPool.errorURL.newEntry(nexturl, referrerHash, initiatorHash, yacyCore.seedDB.mySeed.hash,
-                    name, reason, new bitfield(plasmaURL.urlFlagLength), false);*/
+
             this.log.logFine("Crawling of URL '" + nexturlString + "' disallowed by robots.txt. " +
                              "Stack processing time: " + (System.currentTimeMillis()-startTime) + "ms");
             return reason;
@@ -413,6 +413,8 @@ public final class plasmaCrawlStacker {
             this.log.logSevere("URL '" + nexturlString + "' can neither be crawled local nor global.");
         }
 
+        // add the url into the crawling queue
+        checkInterruption();
         plasmaCrawlNURL.Entry ne = this.sb.urlPool.noticeURL.newEntry(initiatorHash, /* initiator, needed for p2p-feedback */
                 nexturl, /* url clear text string */
                 loadDate, /* load date */
@@ -998,11 +1000,15 @@ public final class plasmaCrawlStacker {
             }
         }
 
-        private void execute() {
+        private void execute() throws InterruptedException {
             try {
                 this.setName("stackCrawlThread_" + this.theMsg.url);
                 String rejectReason = dequeue(this.theMsg);
 
+                // check for interruption
+                checkInterruption();
+
+                // if the url was rejected we store it into the error URL db
                 if (rejectReason != null) {
                     plasmaCrawlEURL.Entry ee = sb.urlPool.errorURL.newEntry(
                             new URL(this.theMsg.url()),
@@ -1017,6 +1023,7 @@ public final class plasmaCrawlStacker {
                     sb.urlPool.errorURL.stackPushEntry(ee);
                 }
             } catch (Exception e) {
+                if (e instanceof InterruptedException) throw (InterruptedException) e;
                 plasmaCrawlStacker.this.log.logWarning("Error while processing stackCrawl entry.\n" +
                         "Entry: " + this.theMsg.toString() +
                         "Error: " + e.toString(),e);
diff --git a/source/de/anomic/plasma/plasmaDHTChunk.java b/source/de/anomic/plasma/plasmaDHTChunk.java
index e67dc47bb..9f952030d 100644
--- a/source/de/anomic/plasma/plasmaDHTChunk.java
+++ b/source/de/anomic/plasma/plasmaDHTChunk.java
@@ -122,33 +122,41 @@ public class plasmaDHTChunk {
     }
 
     public plasmaDHTChunk(serverLog log, plasmaWordIndex wordIndex, plasmaCrawlLURL lurls, int minCount, int maxCount) {
-        this.log = log;
-        this.wordIndex = wordIndex;
-        this.lurls = lurls;
-        startPointHash = selectTransferStart();
-        log.logFine("Selected hash " + startPointHash + " as start point for index distribution, distance = " + yacyDHTAction.dhtDistance(yacyCore.seedDB.mySeed.hash, startPointHash));
-        selectTransferContainers(startPointHash, minCount, maxCount);
+        try {
+            this.log = log;
+            this.wordIndex = wordIndex;
+            this.lurls = lurls;
+            this.startPointHash = selectTransferStart();
+            log.logFine("Selected hash " + this.startPointHash + " as start point for index distribution, distance = " + yacyDHTAction.dhtDistance(yacyCore.seedDB.mySeed.hash, this.startPointHash));
+            selectTransferContainers(this.startPointHash, minCount, maxCount);
 
-        // count the indexes, can be smaller as expected
-        this.idxCount = indexCounter();
-        if (idxCount < minCount) {
-            log.logFine("Too few (" + idxCount + ") indexes selected for transfer.");
-            this.status = chunkStatus_FAILED;
+            // count the indexes, can be smaller than expected
+            this.idxCount = indexCounter();
+            if (this.idxCount < minCount) {
+                log.logFine("Too few (" + this.idxCount + ") indexes selected for transfer.");
+                this.status = chunkStatus_FAILED;
+            }
+        } catch (InterruptedException e) {
+            this.status = chunkStatus_INTERRUPTED;
         }
     }
 
     public plasmaDHTChunk(serverLog log, plasmaWordIndex wordIndex, plasmaCrawlLURL lurls, int minCount, int maxCount, String startHash) {
-        this.log = log;
-        this.wordIndex = wordIndex;
-        this.lurls = lurls;
-        log.logFine("Demanded hash " + startHash + " as start point for index distribution, distance = " + yacyDHTAction.dhtDistance(yacyCore.seedDB.mySeed.hash, startPointHash));
-        selectTransferContainers(startHash, minCount, maxCount);
+        try {
+            this.log = log;
+            this.wordIndex = wordIndex;
+            this.lurls = lurls;
+            log.logFine("Demanded hash " + startHash + " as start point for index distribution, distance = " + yacyDHTAction.dhtDistance(yacyCore.seedDB.mySeed.hash, this.startPointHash));
+            selectTransferContainers(startHash, minCount, maxCount);
 
-        // count the indexes, can be smaller as expected
-        this.idxCount = indexCounter();
-        if (idxCount < minCount) {
-            log.logFine("Too few (" + idxCount + ") indexes selected for transfer.");
-            this.status = chunkStatus_FAILED;
+            // count the indexes, can be smaller than expected
+            this.idxCount = indexCounter();
+            if (this.idxCount < minCount) {
+                log.logFine("Too few (" + this.idxCount + ") indexes selected for transfer.");
+                this.status = chunkStatus_FAILED;
+            }
+        } catch (InterruptedException e) {
+            this.status = chunkStatus_INTERRUPTED;
         }
     }
 
@@ -167,7 +175,7 @@ public class plasmaDHTChunk {
         return startPointHash;
     }
 
-    private void selectTransferContainers(String hash, int mincount, int maxcount) {
+    private void selectTransferContainers(String hash, int mincount, int maxcount) throws InterruptedException {
         try {
             this.selectionStartTime = System.currentTimeMillis();
             int refcountRAM = selectTransferContainersResource(hash, plasmaWordIndex.RL_RAMCACHE, maxcount);
@@ -183,7 +191,7 @@ public class plasmaDHTChunk {
         }
     }
 
-    private int selectTransferContainersResource(String hash, int resourceLevel, int maxcount) {
+    private int selectTransferContainersResource(String hash, int resourceLevel, int maxcount) throws InterruptedException {
         // the hash is a start hash from where the indexes are picked
         ArrayList tmpContainers = new ArrayList(maxcount);
         try {
@@ -198,8 +206,19 @@ public class plasmaDHTChunk {
             urlCache = new HashMap();
             double maximumDistance = ((double) peerRedundancy * 2) / ((double) yacyCore.seedDB.sizeConnected());
-            while ((maxcount > refcount) && (indexContainerIterator.hasNext()) && ((container = (indexContainer) indexContainerIterator.next()) != null) && (container.size() > 0)
-                    && ((tmpContainers.size() == 0) || (yacyDHTAction.dhtDistance(container.getWordHash(), ((indexContainer) tmpContainers.get(0)).getWordHash()) < maximumDistance))) {
+            while (
+                    (maxcount > refcount) &&
+                    (indexContainerIterator.hasNext()) &&
+                    ((container = (indexContainer) indexContainerIterator.next()) != null) &&
+                    (container.size() > 0) &&
+                    (
+                        (tmpContainers.size() == 0) ||
+                        (yacyDHTAction.dhtDistance(container.getWordHash(), ((indexContainer) tmpContainers.get(0)).getWordHash()) < maximumDistance)
+                    )
+            ) {
+                // check for interruption
+                if (Thread.currentThread().isInterrupted()) throw new InterruptedException("Shutdown in progress");
+
                 // make an on-the-fly entity and insert values
                 int notBoundCounter = 0;
                 try {
diff --git a/source/de/anomic/plasma/plasmaParser.java b/source/de/anomic/plasma/plasmaParser.java
index bd085f096..2c921a78b 100644
--- a/source/de/anomic/plasma/plasmaParser.java
+++ b/source/de/anomic/plasma/plasmaParser.java
@@ -203,6 +203,10 @@ public final class plasmaParser {
 
     private serverLog theLogger = new serverLog("PARSER");
 
+    public serverLog getLogger() {
+        return this.theLogger;
+    }
+
     public static HashMap getParserConfigList() {
         return parserConfigList;
     }
@@ -461,13 +465,14 @@ public final class plasmaParser {
         } catch (Exception e) {
         }
     }
 
-    public plasmaParserDocument parseSource(URL location, String mimeType, byte[] source) {
+    public plasmaParserDocument parseSource(URL location, String mimeType, byte[] source) throws InterruptedException {
         File tempFile = null;
         try {
             tempFile = File.createTempFile("parseSource", ".tmp");
             serverFileUtils.write(source, tempFile);
             return parseSource(location, mimeType, tempFile);
         } catch (Exception e) {
+            if (e instanceof InterruptedException) throw (InterruptedException) e;
             serverLog.logSevere("PARSER", "parseSource1: " + e.getMessage(), e);
             return null;
         } finally {
@@ -476,7 +481,7 @@ public final class plasmaParser {
         }
     }
 
-    public plasmaParserDocument parseSource(URL location, String mimeType, File sourceFile) {
+    public plasmaParserDocument parseSource(URL location, String mimeType, File sourceFile) throws InterruptedException {
         Parser theParser = null;
         try {
@@ -554,6 +559,7 @@ public final class plasmaParser {
                 return null;
             }
         } catch (Exception e) {
+            if (e instanceof InterruptedException) throw (InterruptedException) e;
             serverLog.logSevere("PARSER", "parseSource2: " + e.getMessage(), e);
             return null;
         } finally {
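With parseSource(...) now declaring InterruptedException, every call site has to pick a policy: propagate the exception so the whole task unwinds (the indexing path in plasmaSwitchboard further below), or absorb it and degrade gracefully (plasmaSnippetCache in the next hunk, where a lost snippet is an acceptable price during shutdown). A sketch of the absorbing variant; the method name is hypothetical:

    plasmaParserDocument parseForSnippet(plasmaParser parser, URL url, String mime, byte[] resource) {
        try {
            return parser.parseSource(url, mime, resource); // now throws InterruptedException
        } catch (InterruptedException e) {
            // absorb: no snippet, but no error either
            return null;
        }
    }

Note that the patch's own safe points test isInterrupted() without clearing the flag, so when the exception originates from one of them, absorbing it here does not stop later checks in the same thread from firing.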
(resource == null) return null; + + // try to get the header from the htcache directory + if (header == null) { + try { + header = this.cacheManager.getCachedResponse(indexURL.urlHash(url)); + } catch (IOException e) {} + } - return this.parser.parseSource(url, supposedMime, resource); - } else { + if (header == null) { + String filename = this.cacheManager.getCachePath(url).getName(); + int p = filename.lastIndexOf('.'); + if ( // if no extension is available + (p < 0) || + // or the extension is supported by one of the parsers + ((p >= 0) && (plasmaParser.supportedFileExtContains(filename.substring(p + 1)))) + ) { + String supposedMime = "text/html"; + + // if the mimeType Parser is installed we can set the mimeType to null to force + // a mimetype detection + if (plasmaParser.supportedMimeTypesContains("application/octet-stream")) { + supposedMime = null; + } else if (p != -1){ + // otherwise we try to determine the mimeType per file Extension + supposedMime = plasmaParser.getMimeTypeByFileExt(filename.substring(p + 1)); + } + + return this.parser.parseSource(url, supposedMime, resource); + } return null; } - } else { if (plasmaParser.supportedMimeTypesContains(header.mime())) { return this.parser.parseSource(url, header.mime(), resource); - } else { - return null; } + return null; + } catch (InterruptedException e) { + // interruption of thread detected + return null; } } diff --git a/source/de/anomic/plasma/plasmaSwitchboard.java b/source/de/anomic/plasma/plasmaSwitchboard.java index c7a5cb17c..67cf41cf4 100644 --- a/source/de/anomic/plasma/plasmaSwitchboard.java +++ b/source/de/anomic/plasma/plasmaSwitchboard.java @@ -122,7 +122,6 @@ import java.util.Iterator; import java.util.Map; import java.util.Set; import java.util.TreeSet; -import java.util.logging.Level; import de.anomic.data.blogBoard; import de.anomic.data.bookmarksDB; @@ -155,6 +154,7 @@ import de.anomic.server.serverObjects; import de.anomic.server.serverSemaphore; import de.anomic.server.serverSwitch; import de.anomic.server.serverFileUtils; +import de.anomic.server.serverThread; import de.anomic.server.logging.serverLog; import de.anomic.tools.bitfield; import de.anomic.tools.crypt; @@ -176,6 +176,21 @@ public final class plasmaSwitchboard extends serverAbstractSwitch implements ser private int dhtTransferIndexCount = 50; + // we must distinguish the following cases: resource-load was initiated by + // 1) global crawling: the index is extern, not here (not possible here) + // 2) result of search queries, some indexes are here (not possible here) + // 3) result of index transfer, some of them are here (not possible here) + // 4) proxy-load (initiator is "------------") + // 5) local prefetch/crawling (initiator is own seedHash) + // 6) local fetching for global crawling (other known or unknwon initiator) + public static final int PROCESSCASE_0_UNKNOWN = 0; + public static final int PROCESSCASE_1_GLOBAL_CRAWLING = 1; + public static final int PROCESSCASE_2_SEARCH_QUERY_RESULT = 2; + public static final int PROCESSCASE_3_INDEX_TRANSFER_RESULT = 3; + public static final int PROCESSCASE_4_PROXY_LOAD = 4; + public static final int PROCESSCASE_5_LOCAL_CRAWLING = 5; + public static final int PROCESSCASE_6_GLOBAL_CRAWLING = 6; + // couloured list management public static TreeSet badwords = null; public static TreeSet blueList = null; @@ -958,92 +973,106 @@ public final class plasmaSwitchboard extends serverAbstractSwitch implements ser } public boolean deQueue() { - // work off fresh entries from the proxy or from the crawler - 
if (onlineCaution()) { - log.logFine("deQueue: online caution, omitting resource stack processing"); - return false; - } + try { + // work off fresh entries from the proxy or from the crawler + if (onlineCaution()) { + log.logFine("deQueue: online caution, omitting resource stack processing"); + return false; + } - // flush some entries from the RAM cache - // (new permanent cache flushing) - wordIndex.flushCacheSome(sbQueue.size() != 0); - - boolean doneSomething = false; - - // possibly delete entries from last chunk - if ((this.dhtTransferChunk != null) && - (this.dhtTransferChunk.getStatus() == plasmaDHTChunk.chunkStatus_COMPLETE)) { - int deletedURLs = this.dhtTransferChunk.deleteTransferIndexes(); - this.log.logFine("Deleted from " + this.dhtTransferChunk.containers().length + " transferred RWIs locally, removed " + deletedURLs + " URL references"); - this.dhtTransferChunk = null; - } - - // generate a dht chunk - if ((dhtShallTransfer() == null) && - ((this.dhtTransferChunk == null) || - (this.dhtTransferChunk.getStatus() == plasmaDHTChunk.chunkStatus_UNDEFINED) || - // (this.dhtTransferChunk.getStatus() == plasmaDHTChunk.chunkStatus_COMPLETE) || - (this.dhtTransferChunk.getStatus() == plasmaDHTChunk.chunkStatus_FAILED))) { - // generate new chunk - int minChunkSize = (int) getConfigLong("indexDistribution.minChunkSize", 30); - dhtTransferChunk = new plasmaDHTChunk(this.log, this.wordIndex, this.urlPool.loadedURL, minChunkSize, dhtTransferIndexCount); - doneSomething = true; - } + // flush some entries from the RAM cache + // (new permanent cache flushing) + wordIndex.flushCacheSome(sbQueue.size() != 0); - synchronized (sbQueue) { + boolean doneSomething = false; - if (sbQueue.size() == 0) { - //log.logFine("deQueue: nothing to do, queue is emtpy"); - return doneSomething; // nothing to do + // possibly delete entries from last chunk + if ((this.dhtTransferChunk != null) && + (this.dhtTransferChunk.getStatus() == plasmaDHTChunk.chunkStatus_COMPLETE)) { + int deletedURLs = this.dhtTransferChunk.deleteTransferIndexes(); + this.log.logFine("Deleted from " + this.dhtTransferChunk.containers().length + " transferred RWIs locally, removed " + deletedURLs + " URL references"); + this.dhtTransferChunk = null; } - /* - if (wordIndex.wordCacheRAMSize() + 1000 > (int) getConfigLong("wordCacheMaxLow", 8000)) { - log.logFine("deQueue: word index ram cache too full (" + ((int) getConfigLong("wordCacheMaxLow", 8000) - wordIndex.wordCacheRAMSize()) + " slots left); dismissed to omit ram flush lock"); - return false; - } - */ - - int stackCrawlQueueSize; - if ((stackCrawlQueueSize = sbStackCrawlThread.size()) >= stackCrawlSlots) { - log.logFine("deQueue: too many processes in stack crawl thread queue, dismissed to protect emergency case (" + "stackCrawlQueue=" + stackCrawlQueueSize + ")"); - return doneSomething; + // generate a dht chunk + if ( + (dhtShallTransfer() == null) && + ( + (this.dhtTransferChunk == null) || + (this.dhtTransferChunk.getStatus() == plasmaDHTChunk.chunkStatus_UNDEFINED) || + // (this.dhtTransferChunk.getStatus() == plasmaDHTChunk.chunkStatus_COMPLETE) || + (this.dhtTransferChunk.getStatus() == plasmaDHTChunk.chunkStatus_FAILED) + ) + ) { + // generate new chunk + int minChunkSize = (int) getConfigLong("indexDistribution.minChunkSize", 30); + dhtTransferChunk = new plasmaDHTChunk(this.log, this.wordIndex, this.urlPool.loadedURL, minChunkSize, dhtTransferIndexCount); + doneSomething = true; } - plasmaSwitchboardQueue.Entry nextentry; - - // if we were interrupted we should 
return now - if (Thread.currentThread().isInterrupted()) { - log.logFine("deQueue: thread was interrupted"); - return false; - } - - // do one processing step - log.logFine("DEQUEUE: sbQueueSize=" + sbQueue.size() + - ", coreStackSize=" + urlPool.noticeURL.stackSize(plasmaCrawlNURL.STACK_TYPE_CORE) + - ", limitStackSize=" + urlPool.noticeURL.stackSize(plasmaCrawlNURL.STACK_TYPE_LIMIT) + - ", overhangStackSize=" + urlPool.noticeURL.stackSize(plasmaCrawlNURL.STACK_TYPE_OVERHANG) + - ", remoteStackSize=" + urlPool.noticeURL.stackSize(plasmaCrawlNURL.STACK_TYPE_REMOTE)); - try { - nextentry = sbQueue.pop(); - if (nextentry == null) { - log.logFine("deQueue: null entry on queue stack"); + // check for interruption + checkInterruption(); + + // getting the next entry from the indexing queue + synchronized (sbQueue) { + + if (sbQueue.size() == 0) { + //log.logFine("deQueue: nothing to do, queue is emtpy"); + return doneSomething; // nothing to do + } + + /* + if (wordIndex.wordCacheRAMSize() + 1000 > (int) getConfigLong("wordCacheMaxLow", 8000)) { + log.logFine("deQueue: word index ram cache too full (" + ((int) getConfigLong("wordCacheMaxLow", 8000) - wordIndex.wordCacheRAMSize()) + " slots left); dismissed to omit ram flush lock"); return false; } - } catch (IOException e) { - log.logSevere("IOError in plasmaSwitchboard.deQueue: " + e.getMessage(), e); - return doneSomething; - } - - synchronized (this.indexingTasksInProcess) { - this.indexingTasksInProcess.put(nextentry.urlHash(), nextentry); + */ + + int stackCrawlQueueSize; + if ((stackCrawlQueueSize = sbStackCrawlThread.size()) >= stackCrawlSlots) { + log.logFine("deQueue: too many processes in stack crawl thread queue, dismissed to protect emergency case (" + "stackCrawlQueue=" + stackCrawlQueueSize + ")"); + return doneSomething; + } + + plasmaSwitchboardQueue.Entry nextentry; + + // if we were interrupted we should return now + if (Thread.currentThread().isInterrupted()) { + log.logFine("deQueue: thread was interrupted"); + return false; + } + + // do one processing step + log.logFine("DEQUEUE: sbQueueSize=" + sbQueue.size() + + ", coreStackSize=" + urlPool.noticeURL.stackSize(plasmaCrawlNURL.STACK_TYPE_CORE) + + ", limitStackSize=" + urlPool.noticeURL.stackSize(plasmaCrawlNURL.STACK_TYPE_LIMIT) + + ", overhangStackSize=" + urlPool.noticeURL.stackSize(plasmaCrawlNURL.STACK_TYPE_OVERHANG) + + ", remoteStackSize=" + urlPool.noticeURL.stackSize(plasmaCrawlNURL.STACK_TYPE_REMOTE)); + try { + nextentry = sbQueue.pop(); + if (nextentry == null) { + log.logFine("deQueue: null entry on queue stack"); + return false; + } + } catch (IOException e) { + log.logSevere("IOError in plasmaSwitchboard.deQueue: " + e.getMessage(), e); + return doneSomething; + } + + synchronized (this.indexingTasksInProcess) { + this.indexingTasksInProcess.put(nextentry.urlHash(), nextentry); + } + + // parse and index the resource + processResourceStack(nextentry); } - processResourceStack(nextentry); + // ready & finished + return true; + } catch (InterruptedException e) { + log.logInfo("DEQUEUE: Shutdown detected."); + return false; } - - // ready & finished - return true; } public int cleanupJobSize() { @@ -1350,7 +1379,44 @@ public final class plasmaSwitchboard extends serverAbstractSwitch implements ser } } - private void processResourceStack(plasmaSwitchboardQueue.Entry entry) { + private plasmaParserDocument parseResource(plasmaSwitchboardQueue.Entry entry, String initiatorHash) throws InterruptedException { + plasmaParserDocument document = null; + + // the http 
header that belongs to this entry + httpHeader entryRespHeader = entry.responseHeader(); + + // the mimetype of this entry + String mimeType = (entryRespHeader == null)?null:entryRespHeader.mime(); + + // the parser logger + serverLog parserLogger = parser.getLogger(); + + // if the document content is supported we can start to parse the content + if (plasmaParser.supportedContent( + entry.url(), + mimeType) + ){ + if ((entry.cacheFile().exists()) && (entry.cacheFile().length() > 0)) { + parserLogger.logFine("'" + entry.normalizedURLString() + "' is not parsed yet, parsing now from File"); + document = parser.parseSource(entry.url(), mimeType, entry.cacheFile()); + } else { + parserLogger.logFine("'" + entry.normalizedURLString() + "' cannot be parsed, no resource available"); + addURLtoErrorDB(entry.url(), entry.referrerHash(), initiatorHash, entry.anchorName(), plasmaCrawlEURL.DENIED_NOT_PARSEABLE_NO_CONTENT, new bitfield(indexURL.urlFlagLength)); + } + if (document == null) { + parserLogger.logSevere("'" + entry.normalizedURLString() + "' parse failure"); + addURLtoErrorDB(entry.url(), entry.referrerHash(), initiatorHash, entry.anchorName(), plasmaCrawlEURL.DENIED_PARSER_ERROR, new bitfield(indexURL.urlFlagLength)); + } + } else { + parserLogger.logFine("'" + entry.normalizedURLString() + "'. Unsupported mimeType '" + ((mimeType == null) ? "null" : mimeType) + "'."); + addURLtoErrorDB(entry.url(), entry.referrerHash(), initiatorHash, entry.anchorName(), plasmaCrawlEURL.DENIED_WRONG_MIMETYPE_OR_EXT, new bitfield(indexURL.urlFlagLength)); + } + + checkInterruption(); + return document; + } + + private void processResourceStack(plasmaSwitchboardQueue.Entry entry) throws InterruptedException { try { // work off one stack entry with a fresh resource long stackStartTime = 0, stackEndTime = 0, @@ -1365,58 +1431,41 @@ public final class plasmaSwitchboard extends serverAbstractSwitch implements ser // 4) proxy-load (initiator is "------------") // 5) local prefetch/crawling (initiator is own seedHash) // 6) local fetching for global crawling (other known or unknwon initiator) - int processCase = 0; - yacySeed initiator = null; - String initiatorHash = (entry.proxy()) ? indexURL.dummyHash : entry.initiator(); - if (initiatorHash.equals(indexURL.dummyHash)) { + int processCase = PROCESSCASE_0_UNKNOWN; + yacySeed initiatorPeer = null; + String initiatorPeerHash = (entry.proxy()) ? indexURL.dummyHash : entry.initiator(); + if (initiatorPeerHash.equals(indexURL.dummyHash)) { // proxy-load - processCase = 4; - } else if (initiatorHash.equals(yacyCore.seedDB.mySeed.hash)) { + processCase = PROCESSCASE_4_PROXY_LOAD; + } else if (initiatorPeerHash.equals(yacyCore.seedDB.mySeed.hash)) { // normal crawling - processCase = 5; + processCase = PROCESSCASE_5_LOCAL_CRAWLING; } else { // this was done for remote peer (a global crawl) - initiator = yacyCore.seedDB.getConnected(initiatorHash); - processCase = 6; + initiatorPeer = yacyCore.seedDB.getConnected(initiatorPeerHash); + processCase = PROCESSCASE_6_GLOBAL_CRAWLING; } log.logFine("processResourceStack processCase=" + processCase + ", depth=" + entry.depth() + ", maxDepth=" + ((entry.profile() == null) ? "null" : Integer.toString(entry.profile().generalDepth())) + ", filter=" + ((entry.profile() == null) ? "null" : entry.profile().generalFilter()) + - ", initiatorHash=" + initiatorHash + + ", initiatorHash=" + initiatorPeerHash + ", responseHeader=" + ((entry.responseHeader() == null) ? 
"null" : entry.responseHeader().toString()) + ", url=" + entry.url()); // DEBUG - // parse content - parsingStartTime = System.currentTimeMillis(); + /* ========================================================================= + * PARSE CONTENT + * ========================================================================= */ plasmaParserDocument document = null; - httpHeader entryRespHeader = entry.responseHeader(); - String mimeType = (entryRespHeader == null)?null:entryRespHeader.mime(); - if (plasmaParser.supportedContent( - entry.url(), - mimeType) - ){ - if ((entry.cacheFile().exists()) && (entry.cacheFile().length() > 0)) { - log.logFine("(Parser) '" + entry.normalizedURLString() + "' is not parsed yet, parsing now from File"); - document = parser.parseSource(entry.url(), mimeType, entry.cacheFile()); - } else { - log.logFine("(Parser) '" + entry.normalizedURLString() + "' cannot be parsed, no resource available"); - addURLtoErrorDB(entry.url(), entry.referrerHash(), initiatorHash, entry.anchorName(), plasmaCrawlEURL.DENIED_NOT_PARSEABLE_NO_CONTENT, new bitfield(indexURL.urlFlagLength)); - return; - } - if (document == null) { - log.logSevere("(Parser) '" + entry.normalizedURLString() + "' parse failure"); - addURLtoErrorDB(entry.url(), entry.referrerHash(), initiatorHash, entry.anchorName(), plasmaCrawlEURL.DENIED_PARSER_ERROR, new bitfield(indexURL.urlFlagLength)); - return; - } - } else { - log.logFine("(Parser) '" + entry.normalizedURLString() + "'. Unsupported mimeType '" + ((mimeType == null) ? "null" : mimeType) + "'."); - addURLtoErrorDB(entry.url(), entry.referrerHash(), initiatorHash, entry.anchorName(), plasmaCrawlEURL.DENIED_WRONG_MIMETYPE_OR_EXT, new bitfield(indexURL.urlFlagLength)); - return; - } - parsingEndTime = System.currentTimeMillis(); + parsingStartTime = System.currentTimeMillis(); + + document = this.parseResource(entry, initiatorPeerHash); + if (document == null) return; + + parsingEndTime = System.currentTimeMillis(); + // getting the document date Date docDate = null; if (entry.responseHeader() != null) { docDate = entry.responseHeader().lastModified(); @@ -1424,103 +1473,152 @@ public final class plasmaSwitchboard extends serverAbstractSwitch implements ser } if (docDate == null) docDate = new Date(); - // put anchors on crawl stack + /* ========================================================================= + * put anchors on crawl stack + * ========================================================================= */ stackStartTime = System.currentTimeMillis(); - if (((processCase == 4) || (processCase == 5)) && - ((entry.profile() == null) || (entry.depth() < entry.profile().generalDepth()))) { + if ( + ((processCase == PROCESSCASE_4_PROXY_LOAD) || (processCase == PROCESSCASE_5_LOCAL_CRAWLING)) && + ((entry.profile() == null) || (entry.depth() < entry.profile().generalDepth())) + ) { Map hl = document.getHyperlinks(); Iterator i = hl.entrySet().iterator(); - String nexturlstring; - //String rejectReason; - Map.Entry e; + String nextUrlString; + Map.Entry nextEntry; while (i.hasNext()) { - e = (Map.Entry) i.next(); - nexturlstring = (String) e.getKey(); - try {nexturlstring = new URL(nexturlstring).toNormalform();} catch (MalformedURLException e1) {} + // check for interruption + checkInterruption(); - sbStackCrawlThread.enqueue(nexturlstring, entry.url().toString(), initiatorHash, (String) e.getValue(), docDate, entry.depth() + 1, entry.profile()); - - // rejectReason = stackCrawl(nexturlstring, entry.normalizedURLString(), initiatorHash, (String) 
e.getValue(), loadDate, entry.depth() + 1, entry.profile()); - // if (rejectReason == null) { c++; } else { - // urlPool.errorURL.newEntry(new URL(nexturlstring), entry.normalizedURLString(), entry.initiator(), yacyCore.seedDB.mySeed.hash, - // (String) e.getValue(), rejectReason, new bitfield(indexURL.urlFlagLength), false); - // } + // fetching the next hyperlink + nextEntry = (Map.Entry) i.next(); + nextUrlString = (String) nextEntry.getKey(); + try { + nextUrlString = new URL(nextUrlString).toNormalform(); + + // enqueue the hyperlink into the pre-notice-url db + sbStackCrawlThread.enqueue(nextUrlString, entry.url().toString(), initiatorPeerHash, (String) nextEntry.getValue(), docDate, entry.depth() + 1, entry.profile()); + } catch (MalformedURLException e1) {} } log.logInfo("CRAWL: ADDED " + hl.size() + " LINKS FROM " + entry.normalizedURLString() + ", NEW CRAWL STACK SIZE IS " + urlPool.noticeURL.stackSize(plasmaCrawlNURL.STACK_TYPE_CORE)); } stackEndTime = System.currentTimeMillis(); - // create index - String descr = document.getMainLongTitle(); - String referrerHash; + /* ========================================================================= + * CREATE INDEX + * ========================================================================= */ + String docDescription = document.getMainLongTitle(); URL referrerURL = entry.referrerURL(); - referrerHash = indexURL.urlHash(referrerURL); - if (referrerHash == null) referrerHash = indexURL.dummyHash; + String referrerUrlHash = indexURL.urlHash(referrerURL); + if (referrerUrlHash == null) referrerUrlHash = indexURL.dummyHash; String noIndexReason = plasmaCrawlEURL.DENIED_UNSPECIFIED_INDEXING_ERROR; - if (processCase == 4) { + if (processCase == PROCESSCASE_4_PROXY_LOAD) { // proxy-load noIndexReason = entry.shallIndexCacheForProxy(); } else { // normal crawling noIndexReason = entry.shallIndexCacheForCrawler(); } + if (noIndexReason == null) { // strip out words indexingStartTime = System.currentTimeMillis(); + + checkInterruption(); log.logFine("Condensing for '" + entry.normalizedURLString() + "'"); plasmaCondenser condenser = new plasmaCondenser(new ByteArrayInputStream(document.getText())); // generate citation reference Integer[] ioLinks = generateCitationReference(entry.urlHash(), docDate, document, condenser); - //log.logInfo("INDEXING HEADLINE:" + descr); - try { - //log.logDebug("Create LURL-Entry for '" + entry.normalizedURLString() + "', " + - // "responseHeader=" + entry.responseHeader().toString()); + try { + // check for interruption + checkInterruption(); + // create a new loaded URL db entry plasmaCrawlLURL.Entry newEntry = urlPool.loadedURL.newEntry( - entry.url(), descr, docDate, new Date(), - referrerHash, - 0, true, - condenser.RESULT_WORD_ENTROPHY, - indexEntryAttribute.language(entry.url()), - indexEntryAttribute.docType(document.getMimeType()), - (int) entry.size(), - condenser.RESULT_NUMB_WORDS + entry.url(), // URL + docDescription, // document description + docDate, // modification date + new Date(), // loaded date + referrerUrlHash, // referer hash + 0, // copy count + true, // local need + condenser.RESULT_WORD_ENTROPHY, // quality + indexEntryAttribute.language(entry.url()), // language + indexEntryAttribute.docType(document.getMimeType()), // doctype + (int) entry.size(), // size + condenser.RESULT_NUMB_WORDS // word count ); + + /* ======================================================================== + * STORE URL TO LOADED-URL-DB + * ======================================================================== 
*/ newEntry.store(); urlPool.loadedURL.stackEntry( - newEntry, - initiatorHash, - yacyCore.seedDB.mySeed.hash, - processCase - ); - String urlHash = newEntry.hash(); + newEntry, // loaded url db entry + initiatorPeerHash, // initiator peer hash + yacyCore.seedDB.mySeed.hash, // executor peer hash + processCase // process case + ); - if (((processCase == 4) || (processCase == 5) || (processCase == 6)) && (entry.profile().localIndexing())) { - // remove stopwords + // check for interruption + checkInterruption(); + + /* ======================================================================== + * STORE WORD INDEX + * ======================================================================== */ + if ( + ( + (processCase == PROCESSCASE_4_PROXY_LOAD) || + (processCase == PROCESSCASE_5_LOCAL_CRAWLING) || + (processCase == PROCESSCASE_6_GLOBAL_CRAWLING) + ) && + (entry.profile().localIndexing()) + ) { + String urlHash = newEntry.hash(); + + // remove stopwords log.logInfo("Excluded " + condenser.excludeWords(stopwords) + " words in URL " + entry.url()); indexingEndTime = System.currentTimeMillis(); - // do indexing - //log.logDebug("Create Index for '" + entry.normalizedURLString() + "'"); storageStartTime = System.currentTimeMillis(); int words = 0; String storagePeerHash; yacySeed seed; - if (((storagePeerHash = getConfig("storagePeerHash",null))== null) || + + if ( + ((storagePeerHash = getConfig("storagePeerHash",null))== null) || (storagePeerHash.trim().length() == 0) || - ((seed = yacyCore.seedDB.getConnected(storagePeerHash))==null)){ - words = wordIndex.addPageIndex(entry.url(), urlHash, docDate, (int) entry.size(), document, condenser, - indexEntryAttribute.language(entry.url()), indexEntryAttribute.docType(document.getMimeType()), - ioLinks[0].intValue(), ioLinks[1].intValue()); + ((seed = yacyCore.seedDB.getConnected(storagePeerHash))==null) + ){ + + /* ======================================================================== + * STORE PAGE INDEX INTO WORD INDEX DB + * ======================================================================== */ + words = wordIndex.addPageIndex( + entry.url(), // document url + urlHash, // document url hash + docDate, // document mod date + (int) entry.size(), // document size + document, // document content + condenser, // document condenser + indexEntryAttribute.language(entry.url()), // document language + indexEntryAttribute.docType(document.getMimeType()), // document type + ioLinks[0].intValue(), // outlinkSame + ioLinks[1].intValue() // outlinkOthers + ); } else { + /* ======================================================================== + * SEND PAGE INDEX TO STORAGE PEER + * ======================================================================== */ HashMap urlCache = new HashMap(1); urlCache.put(newEntry.hash(),newEntry); + ArrayList tmpContainers = new ArrayList(condenser.RESULT_SIMI_WORDS); - String language = indexEntryAttribute.language(entry.url()); + + String language = indexEntryAttribute.language(entry.url()); char doctype = indexEntryAttribute.docType(document.getMimeType()); int urlLength = newEntry.url().toString().length(); int urlComps = htmlFilterContentScraper.urlComps(newEntry.url().toString()).length; @@ -1535,56 +1633,71 @@ public final class plasmaSwitchboard extends serverAbstractSwitch implements ser wordStat = (plasmaCondenser.wordStatProp) wentry.getValue(); String wordHash = indexEntryAttribute.word2hash(word); indexContainer wordIdxContainer = new indexRowSetContainer(wordHash); - indexEntry wordIdxEntry = new 
@@ -1594,46 +1707,59 @@ public final class plasmaSwitchboard extends serverAbstractSwitch implements ser
                                     "StorageTime: " + (storageEndTime-storageStartTime) + " ms");
                         }

+                        // check for interruption
+                        checkInterruption();
+
                         // if this was performed for a remote crawl request, notify requester
-                        if ((processCase == 6) && (initiator != null)) {
-                            log.logInfo("Sending crawl receipt for '" + entry.normalizedURLString() + "' to " + initiator.getName());
-                            yacyClient.crawlReceipt(initiator, "crawl", "fill", "indexed", newEntry, "");
+                        if ((processCase == PROCESSCASE_6_GLOBAL_CRAWLING) && (initiatorPeer != null)) {
+                            log.logInfo("Sending crawl receipt for '" + entry.normalizedURLString() + "' to " + initiatorPeer.getName());
+                            yacyClient.crawlReceipt(initiatorPeer, "crawl", "fill", "indexed", newEntry, "");
                         }
                     } else {
                         log.logFine("Not Indexed Resource '" + entry.normalizedURLString() + "': process case=" + processCase);
-                        addURLtoErrorDB(entry.url(), referrerHash, initiatorHash, descr, plasmaCrawlEURL.DENIED_UNKNOWN_INDEXING_PROCESS_CASE, new bitfield(indexURL.urlFlagLength));
+                        addURLtoErrorDB(entry.url(), referrerUrlHash, initiatorPeerHash, docDescription, plasmaCrawlEURL.DENIED_UNKNOWN_INDEXING_PROCESS_CASE, new bitfield(indexURL.urlFlagLength));
                     }
                 } catch (Exception ee) {
+                    if (ee instanceof InterruptedException) throw (InterruptedException)ee;
+
+                    // check for interruption
+                    checkInterruption();
+
                     log.logSevere("Could not index URL " + entry.url() + ": " + ee.getMessage(), ee);
-                    if ((processCase == 6) && (initiator != null)) {
-                        yacyClient.crawlReceipt(initiator, "crawl", "exception", ee.getMessage(), null, "");
+                    if ((processCase == PROCESSCASE_6_GLOBAL_CRAWLING) && (initiatorPeer != null)) {
+                        yacyClient.crawlReceipt(initiatorPeer, "crawl", "exception", ee.getMessage(), null, "");
                     }
-                    addURLtoErrorDB(entry.url(), referrerHash, initiatorHash, descr, plasmaCrawlEURL.DENIED_UNSPECIFIED_INDEXING_ERROR, new bitfield(indexURL.urlFlagLength));
+                    addURLtoErrorDB(entry.url(), referrerUrlHash, initiatorPeerHash, docDescription, plasmaCrawlEURL.DENIED_UNSPECIFIED_INDEXING_ERROR, new bitfield(indexURL.urlFlagLength));
                 }
             } else {
+                // check for interruption
+                checkInterruption();
+
                 log.logInfo("Not indexed any word in URL " + entry.url() + "; cause: " + noIndexReason);
-                addURLtoErrorDB(entry.url(), referrerHash, initiatorHash, descr, noIndexReason, new bitfield(indexURL.urlFlagLength));
-                if ((processCase == 6) && (initiator != null)) {
-                    yacyClient.crawlReceipt(initiator, "crawl", "rejected", noIndexReason, null, "");
+                addURLtoErrorDB(entry.url(), referrerUrlHash, initiatorPeerHash, docDescription, noIndexReason, new bitfield(indexURL.urlFlagLength));
+                if ((processCase == PROCESSCASE_6_GLOBAL_CRAWLING) && (initiatorPeer != null)) {
+                    yacyClient.crawlReceipt(initiatorPeer, "crawl", "rejected", noIndexReason, null, "");
                 }
             }
             document = null;
         } finally {
+            checkInterruption();
+
             // The following code must be into the finally block, otherwise it will not be executed
             // on errors!
-
+
             // removing current entry from in process list
             synchronized (this.indexingTasksInProcess) {
                 this.indexingTasksInProcess.remove(entry.urlHash());
             }
-
+
             // removing current entry from notice URL queue
             boolean removed = urlPool.noticeURL.remove(entry.urlHash()); // worked-off
             if (!removed) {
                 log.logFinest("Unable to remove indexed URL " + entry.url() + " from Crawler Queue. This could be because of an URL redirect.");
             }
-
+
             // explicit delete/free resources
             if ((entry != null) && (entry.profile() != null) && (!(entry.profile().storeHTCache()))) {
                 plasmaHTCache.filesInUse.remove(entry.cacheFile());
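The catch block above guards a broad catch (Exception ee) against swallowing interrupts: an InterruptedException is re-thrown immediately, so the shutdown signal propagates instead of being logged and buried as an ordinary indexing failure, while the finally block still runs its queue and cache cleanup on every exit path. The same idiom in isolation (a sketch with hypothetical doWork/releaseResources helpers, not the project's code):

    public class InterruptRethrowSketch {
        public void process() throws InterruptedException {
            try {
                doWork(); // may throw many unrelated exceptions
            } catch (Exception e) {
                // never treat a shutdown signal as an ordinary failure
                if (e instanceof InterruptedException) throw (InterruptedException) e;
                System.err.println("Could not process: " + e.getMessage());
            } finally {
                // cleanup here runs on success, failure, and interruption alike
                releaseResources();
            }
        }

        private void doWork() throws Exception {}
        private void releaseResources() {}
    }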
@@ -2242,10 +2368,13 @@ public final class plasmaSwitchboard extends serverAbstractSwitch implements ser
             Iterator seedIter = seeds.iterator();
             ArrayList transfer = new ArrayList(peerCount);
             while (hc1 < peerCount && (transfer.size() > 0 || seedIter.hasNext())) {
-
+
                 // starting up some transfer threads
                 int transferThreadCount = transfer.size();
                 for (int i=0; i < peerCount-hc1-transferThreadCount; i++) {
+                    // check for interruption
+                    checkInterruption();
+
                     if (seedIter.hasNext()) {
                         plasmaDHTTransfer t = new plasmaDHTTransfer(log, (yacySeed)seedIter.next(), dhtChunk,gzipBody,timeout,retries);
                         t.start();
@@ -2258,6 +2387,9 @@ public final class plasmaSwitchboard extends serverAbstractSwitch implements ser
                 // waiting for the transfer threads to finish
                 Iterator transferIter = transfer.iterator();
                 while (transferIter.hasNext()) {
+                    // check for interruption
+                    checkInterruption();
+
                     plasmaDHTTransfer t = (plasmaDHTTransfer)transferIter.next();
                     if (!t.isAlive()) {
                         // remove finished thread from the list
@@ -2311,6 +2443,12 @@ public final class plasmaSwitchboard extends serverAbstractSwitch implements ser
         this.urlPool.errorURL.stackPushEntry(ee);
     }

+    public void checkInterruption() throws InterruptedException {
+        Thread curThread = Thread.currentThread();
+        if ((curThread instanceof serverThread) && ((serverThread)curThread).shutdownInProgress()) throw new InterruptedException("Shutdown in progress ...");
+        else if (this.terminate || curThread.isInterrupted()) throw new InterruptedException("Shutdown in progress ...");
+    }
+
     public void terminate(long delay) {
         if (delay <= 0) throw new IllegalArgumentException("The shutdown delay must be greater than 0.");
         (new delayedShutdown(this,delay)).start();
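Both DHT loops above now call checkInterruption() once per iteration, so a shutdown can cancel the transfer between individual peers rather than only after every thread has been started or joined. A condensed sketch of that per-iteration cancellation, with a hypothetical volatile terminate flag standing in for the switchboard state and plain Thread objects standing in for plasmaDHTTransfer:

    import java.util.List;

    public class DhtLoopSketch {
        private volatile boolean terminate = false;

        public void checkInterruption() throws InterruptedException {
            if (terminate || Thread.currentThread().isInterrupted())
                throw new InterruptedException("Shutdown in progress ...");
        }

        public void runTransfers(List<Thread> transfers) throws InterruptedException {
            for (Thread t : transfers) {
                checkInterruption(); // abort before launching the next worker
                t.start();
            }
            for (Thread t : transfers) {
                checkInterruption(); // abort while waiting, too
                t.join();
            }
        }
    }

Polling at loop granularity is the usual compromise: fine enough that shutdown latency is bounded by one peer transfer, coarse enough that the check itself costs nothing measurable.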
This could be because of an URL redirect."); } - + // explicit delete/free resources if ((entry != null) && (entry.profile() != null) && (!(entry.profile().storeHTCache()))) { plasmaHTCache.filesInUse.remove(entry.cacheFile()); @@ -2242,10 +2368,13 @@ public final class plasmaSwitchboard extends serverAbstractSwitch implements ser Iterator seedIter = seeds.iterator(); ArrayList transfer = new ArrayList(peerCount); while (hc1 < peerCount && (transfer.size() > 0 || seedIter.hasNext())) { - + // starting up some transfer threads int transferThreadCount = transfer.size(); for (int i=0; i < peerCount-hc1-transferThreadCount; i++) { + // check for interruption + checkInterruption(); + if (seedIter.hasNext()) { plasmaDHTTransfer t = new plasmaDHTTransfer(log, (yacySeed)seedIter.next(), dhtChunk,gzipBody,timeout,retries); t.start(); @@ -2258,6 +2387,9 @@ public final class plasmaSwitchboard extends serverAbstractSwitch implements ser // waiting for the transfer threads to finish Iterator transferIter = transfer.iterator(); while (transferIter.hasNext()) { + // check for interruption + checkInterruption(); + plasmaDHTTransfer t = (plasmaDHTTransfer)transferIter.next(); if (!t.isAlive()) { // remove finished thread from the list @@ -2311,6 +2443,12 @@ public final class plasmaSwitchboard extends serverAbstractSwitch implements ser this.urlPool.errorURL.stackPushEntry(ee); } + public void checkInterruption() throws InterruptedException { + Thread curThread = Thread.currentThread(); + if ((curThread instanceof serverThread) && ((serverThread)curThread).shutdownInProgress()) throw new InterruptedException("Shutdown in progress ..."); + else if (this.terminate || curThread.isInterrupted()) throw new InterruptedException("Shutdown in progress ..."); + } + public void terminate(long delay) { if (delay <= 0) throw new IllegalArgumentException("The shutdown delay must be greater than 0."); (new delayedShutdown(this,delay)).start(); diff --git a/source/de/anomic/server/serverAbstractThread.java b/source/de/anomic/server/serverAbstractThread.java index cae6ed1c0..3183d4cad 100644 --- a/source/de/anomic/server/serverAbstractThread.java +++ b/source/de/anomic/server/serverAbstractThread.java @@ -174,6 +174,11 @@ public abstract class serverAbstractThread extends Thread implements serverThrea this.log = log; } + + public boolean shutdownInProgress() { + return !this.running || Thread.currentThread().isInterrupted(); + } + public void intermission(long pause) { if (pause == Long.MAX_VALUE) this.intermission = Long.MAX_VALUE; diff --git a/source/de/anomic/server/serverThread.java b/source/de/anomic/server/serverThread.java index 530cccb81..f6ad9d8fe 100644 --- a/source/de/anomic/server/serverThread.java +++ b/source/de/anomic/server/serverThread.java @@ -112,6 +112,8 @@ public interface serverThread { // the thread is forced to pause for a specific time // if the thread is busy meanwhile, the pause is ommitted + public boolean shutdownInProgress(); + public void terminate(boolean waitFor); // after calling this method, the thread shall terminate // if waitFor is true, the method waits until the process has died