From 6e0dc39a7dd3b3ec7a61dd9d3e7574178404588c Mon Sep 17 00:00:00 2001 From: orbiter Date: Tue, 6 Oct 2009 21:52:55 +0000 Subject: [PATCH] - some fixes to prevent blocking situations - better logging for the crawler - better default values for the crawler git-svn-id: https://svn.berlios.de/svnroot/repos/yacy/trunk@6377 6c8d7289-2bf4-0310-a012-ef5d649a1542 --- defaults/yacy.init | 4 ++-- source/de/anomic/crawler/CrawlQueues.java | 10 ++++---- source/de/anomic/kelondro/blob/MapView.java | 6 +++-- .../de/anomic/kelondro/text/IODispatcher.java | 23 ++++++++++++++----- source/de/anomic/kelondro/text/IndexCell.java | 9 ++++++-- source/de/anomic/search/Switchboard.java | 17 +++++++++----- 6 files changed, 46 insertions(+), 23 deletions(-) diff --git a/defaults/yacy.init b/defaults/yacy.init index a0e1ee316..88acfd11d 100644 --- a/defaults/yacy.init +++ b/defaults/yacy.init @@ -570,7 +570,7 @@ filterOutStopwordsFromTopwords=true 40_peerseedcycle_busysleep=1200000 40_peerseedcycle_memprereq=4194304 50_localcrawl_idlesleep=2000 -50_localcrawl_busysleep=30 +50_localcrawl_busysleep=20 50_localcrawl_memprereq=12582912 50_localcrawl_isPaused=false 60_remotecrawlloader_idlesleep=60000 @@ -694,7 +694,7 @@ crawler.http.maxFileSize=1048576 crawler.ftp.maxFileSize=1048576 # maximum number of crawler threads -crawler.MaxActiveThreads = 50 +crawler.MaxActiveThreads = 200 # maximum size of indexing queue indexer.slots = 100 diff --git a/source/de/anomic/crawler/CrawlQueues.java b/source/de/anomic/crawler/CrawlQueues.java index 5e92fbce9..a3998fd2d 100644 --- a/source/de/anomic/crawler/CrawlQueues.java +++ b/source/de/anomic/crawler/CrawlQueues.java @@ -212,12 +212,12 @@ public class CrawlQueues { String queueCheck = crawlIsPossible(NoticedURL.STACK_TYPE_CORE, "Core"); if (queueCheck != null) { - if (log.isFinest()) log.logFinest("omitting de-queue/local: " + queueCheck); + log.logInfo("omitting de-queue/local: " + queueCheck); return false; } if 
(isPaused(SwitchboardConstants.CRAWLJOB_LOCAL_CRAWL)) { - if (log.isFinest()) log.logFinest("omitting de-queue/local: paused"); + log.logInfo("omitting de-queue/local: paused"); return false; } @@ -569,9 +569,9 @@ public class CrawlQueues { result = "no content (possibly caused by cache policy)"; } else { request.setStatus("loaded", serverProcessorJob.STATUS_RUNNING); - final boolean stored = sb.toIndexer(response); - request.setStatus("enqueued-" + ((stored) ? "ok" : "fail"), serverProcessorJob.STATUS_FINISHED); - result = (stored) ? null : "not enqueued to indexer"; + final String storedFailMessage = sb.toIndexer(response); + request.setStatus("enqueued-" + ((storedFailMessage == null) ? "ok" : "fail"), serverProcessorJob.STATUS_FINISHED); + result = (storedFailMessage == null) ? null : "not enqueued to indexer: " + storedFailMessage; } } catch (IOException e) { request.setStatus("error", serverProcessorJob.STATUS_FINISHED); diff --git a/source/de/anomic/kelondro/blob/MapView.java b/source/de/anomic/kelondro/blob/MapView.java index 9c69e127f..52208f9cf 100644 --- a/source/de/anomic/kelondro/blob/MapView.java +++ b/source/de/anomic/kelondro/blob/MapView.java @@ -181,10 +181,11 @@ public class MapView { assert key != null; if (cache == null) return false; // case may appear during shutdown key = normalizeKey(key); + boolean h = false; synchronized (this) { - if (this.cache.containsKey(key)) return true; - return this.blob.has(key.getBytes()); + h = this.cache.containsKey(key) || this.blob.has(key.getBytes()); } + return h; } /** @@ -199,6 +200,7 @@ public class MapView { } private String normalizeKey(String key) { + if (blob == null) return key; if (key.length() > blob.keylength()) key = key.substring(0, blob.keylength()); while (key.length() < blob.keylength()) key += fillchar; return key; diff --git a/source/de/anomic/kelondro/text/IODispatcher.java b/source/de/anomic/kelondro/text/IODispatcher.java index 4d24458a0..cb14c970a 100644 --- 
a/source/de/anomic/kelondro/text/IODispatcher.java +++ b/source/de/anomic/kelondro/text/IODispatcher.java @@ -86,9 +86,15 @@ public class IODispatcher extends Thread { } else { DumpJob job = (DumpJob)new DumpJob(cache, file, array); try { - this.dumpQueue.put(job); - this.controlQueue.release(); - Log.logInfo("IODispatcher", "appended dump job for file " + file.getName()); + // check if the dispatcher is running + if (this.isAlive()) { + this.dumpQueue.put(job); + this.controlQueue.release(); + Log.logInfo("IODispatcher", "appended dump job for file " + file.getName()); + } else { + job.dump(); + Log.logWarning("IODispatcher", "dispatcher is not alive, just dumped file " + file.getName()); + } } catch (InterruptedException e) { e.printStackTrace(); cache.dump(file, (int) Math.min(MemoryControl.available() / 3, writeBufferSize)); @@ -111,9 +117,14 @@ public class IODispatcher extends Thread { } else { MergeJob job = new MergeJob(f1, f2, factory, array, payloadrow, newFile); try { - this.mergeQueue.put(job); - this.controlQueue.release(); - Log.logInfo("IODispatcher", "appended merge job of files " + f1.getName() + ", " + f2.getName() + " to " + newFile.getName()); + if (this.isAlive()) { + this.mergeQueue.put(job); + this.controlQueue.release(); + Log.logInfo("IODispatcher", "appended merge job of files " + f1.getName() + ", " + f2.getName() + " to " + newFile.getName()); + } else { + job.merge(); + Log.logWarning("IODispatcher", "dispatcher not running, merged files " + f1.getName() + ", " + f2.getName() + " to " + newFile.getName()); + } } catch (InterruptedException e) { Log.logWarning("IODispatcher", "interrupted: " + e.getMessage(), e); try { diff --git a/source/de/anomic/kelondro/text/IndexCell.java b/source/de/anomic/kelondro/text/IndexCell.java index f081796d9..4b56e7773 100644 --- a/source/de/anomic/kelondro/text/IndexCell.java +++ b/source/de/anomic/kelondro/text/IndexCell.java @@ -65,6 +65,7 @@ public final class IndexCell extends AbstractBu private final 
long targetFileSize, maxFileSize; private final int writeBufferSize; private final SimpleARC countCache; + private boolean cleanerRunning = false; public IndexCell( final File cellPath, @@ -340,15 +341,19 @@ public final class IndexCell extends AbstractBu } // clean-up the cache - if (this.array.entries() > 50 || (this.lastCleanup + cleanupCycle < System.currentTimeMillis())) synchronized (this) { - if (this.array.entries() > 50 || (this.lastCleanup + cleanupCycle < System.currentTimeMillis())) { + if (!this.cleanerRunning && (this.array.entries() > 50 || this.lastCleanup + cleanupCycle < System.currentTimeMillis())) synchronized (this) { + if (this.array.entries() > 50 || (this.lastCleanup + cleanupCycle < System.currentTimeMillis())) try { + this.cleanerRunning = true; //System.out.println("----cleanup check"); this.array.shrink(this.targetFileSize, this.maxFileSize); this.lastCleanup = System.currentTimeMillis(); + } finally { + this.cleanerRunning = false; } } } + public File newContainerBLOBFile() { // for migration of cache files return this.array.newContainerBLOBFile(); diff --git a/source/de/anomic/search/Switchboard.java b/source/de/anomic/search/Switchboard.java index cf5e2383e..215e4f770 100644 --- a/source/de/anomic/search/Switchboard.java +++ b/source/de/anomic/search/Switchboard.java @@ -1128,17 +1128,22 @@ public final class Switchboard extends serverAbstractSwitch implements serverSwi log.logConfig("SWITCHBOARD SHUTDOWN TERMINATED"); } - public boolean toIndexer(final Response response) { + /** + * pass a response to the indexer + * @param response + * @return null if successful, an error message otherwise + */ + public String toIndexer(final Response response) { assert response != null; // get next queue entry and start a queue processing if (response == null) { if (this.log.isFine()) log.logFine("deQueue: queue entry is null"); - return false; + return "queue entry is null"; } if (response.profile() == null) { if (this.log.isFine()) 
log.logFine("deQueue: profile is null"); - return false; + return "profile is null"; } // check if the document should be indexed based on proxy/crawler rules @@ -1176,17 +1181,17 @@ public final class Switchboard extends serverAbstractSwitch implements serverSwi if (log.isFine()) log.logFine("deQueue: not indexed any word in URL " + response.url() + "; cause: " + noIndexReason); addURLtoErrorDB(response.url(), (referrerURL == null) ? "" : referrerURL.hash(), response.initiator(), response.name(), noIndexReason); // finish this entry - return false; + return "not indexed any word in URL " + response.url() + "; cause: " + noIndexReason; } // put document into the concurrent processing queue if (log.isFinest()) log.logFinest("deQueue: passing to indexing queue: " + response.url().toNormalform(true, false)); try { this.indexingDocumentProcessor.enQueue(new indexingQueueEntry(response, null, null)); - return true; + return null; } catch (InterruptedException e) { e.printStackTrace(); - return false; + return "interrupted: " + e.getMessage(); } }