diff --git a/build.properties b/build.properties index 5d8176427..f5e7c0fd0 100644 --- a/build.properties +++ b/build.properties @@ -3,7 +3,7 @@ javacSource=1.5 javacTarget=1.5 # Release Configuration -releaseVersion=0.575 +releaseVersion=0.576 stdReleaseFile=yacy_v${releaseVersion}_${DSTAMP}_${releaseNr}.tar.gz embReleaseFile=yacy_emb_v${releaseVersion}_${DSTAMP}_${releaseNr}.tar.gz proReleaseFile=yacy_pro_v${releaseVersion}_${DSTAMP}_${releaseNr}.tar.gz diff --git a/htroot/IndexCreateIndexingQueue_p.java b/htroot/IndexCreateIndexingQueue_p.java index c7a5099bc..8ec803f27 100644 --- a/htroot/IndexCreateIndexingQueue_p.java +++ b/htroot/IndexCreateIndexingQueue_p.java @@ -84,7 +84,7 @@ public class IndexCreateIndexingQueue_p { if (post.containsKey("clearIndexingQueue")) { try { synchronized (switchboard.sbQueue) { - plasmaSwitchboardQueue.Entry entry = null; + plasmaSwitchboardQueue.QueueEntry entry = null; while ((entry = switchboard.sbQueue.pop()) != null) { if ((entry != null) && (entry.profile() != null) && (!(entry.profile().storeHTCache()))) { plasmaHTCache.deleteURLfromCache(entry.url()); @@ -106,26 +106,24 @@ public class IndexCreateIndexingQueue_p { yacySeed initiator; boolean dark; - if ((switchboard.sbQueue.size() == 0) && (switchboard.indexingTasksInProcess.size() == 0)) { + if ((switchboard.sbQueue.size() == 0) && (switchboard.sbQueue.getActiveQueueSize() == 0)) { prop.put("indexing-queue", "0"); //is empty } else { prop.put("indexing-queue", "1"); // there are entries in the queue or in process dark = true; - plasmaSwitchboardQueue.Entry pcentry; - int inProcessCount = 0, entryCount = 0, totalCount = 0; + plasmaSwitchboardQueue.QueueEntry pcentry; + int entryCount = 0, totalCount = 0; long totalSize = 0; - ArrayList entryList = new ArrayList(); // getting all entries that are currently in process - synchronized (switchboard.indexingTasksInProcess) { - inProcessCount = switchboard.indexingTasksInProcess.size(); - entryList.addAll(switchboard.indexingTasksInProcess.values()); - } + ArrayList entryList = new ArrayList(); + entryList.addAll(switchboard.sbQueue.getActiveQueueEntries()); + int inProcessCount = entryList.size(); // getting all enqueued entries if ((switchboard.sbQueue.size() > 0)) { - Iterator i = switchboard.sbQueue.entryIterator(false); + Iterator i = switchboard.sbQueue.entryIterator(false); while (i.hasNext()) entryList.add(i.next()); } @@ -134,7 +132,7 @@ public class IndexCreateIndexingQueue_p { for (int i = 0; (i < count) && (entryCount < showLimit); i++) { boolean inProcess = i < inProcessCount; - pcentry = (plasmaSwitchboardQueue.Entry) entryList.get(i); + pcentry = (plasmaSwitchboardQueue.QueueEntry) entryList.get(i); if ((pcentry != null)&&(pcentry.url() != null)) { long entrySize = pcentry.size(); totalSize += entrySize; diff --git a/htroot/Status.java b/htroot/Status.java index fa98918c5..b99f9e3ef 100644 --- a/htroot/Status.java +++ b/htroot/Status.java @@ -305,7 +305,7 @@ public class Status { prop.putNum("connectionsMax", httpd.getMaxSessionCount()); // Queue information - int indexingJobCount = sb.getThread("80_indexing").getJobCount()+sb.indexingTasksInProcess.size(); + int indexingJobCount = sb.getThread("80_indexing").getJobCount() + sb.sbQueue.getActiveQueueSize(); int indexingMaxCount = (int) sb.getConfigLong(plasmaSwitchboard.INDEXER_SLOTS, 30); int indexingPercent = (indexingMaxCount==0)?0:indexingJobCount*100/indexingMaxCount; prop.putNum("indexingQueueSize", indexingJobCount); diff --git a/htroot/xml/queues_p.java b/htroot/xml/queues_p.java index d0b40e6a7..23aaf1b46 100644 --- a/htroot/xml/queues_p.java +++ b/htroot/xml/queues_p.java @@ -87,28 +87,25 @@ public class queues_p { yacySeed initiator; //indexing queue - prop.putNum("indexingSize", sb.getThread(plasmaSwitchboard.INDEXER).getJobCount()+sb.indexingTasksInProcess.size()); + prop.putNum("indexingSize", sb.getThread(plasmaSwitchboard.INDEXER).getJobCount() + sb.sbQueue.getActiveQueueSize()); prop.putNum("indexingMax", (int) sb.getConfigLong(plasmaSwitchboard.INDEXER_SLOTS, 30)); prop.putNum("urlpublictextSize", sb.wordIndex.countURL()); prop.putNum("rwipublictextSize", sb.wordIndex.size()); - if ((sb.sbQueue.size() == 0) && (sb.indexingTasksInProcess.size() == 0)) { + if ((sb.sbQueue.size() == 0) && (sb.sbQueue.getActiveQueueSize() == 0)) { prop.put("list", "0"); //is empty } else { - plasmaSwitchboardQueue.Entry pcentry; - int inProcessCount = 0; + plasmaSwitchboardQueue.QueueEntry pcentry; long totalSize = 0; int i=0; //counter - ArrayList entryList = new ArrayList(); // getting all entries that are currently in process - synchronized (sb.indexingTasksInProcess) { - inProcessCount = sb.indexingTasksInProcess.size(); - entryList.addAll(sb.indexingTasksInProcess.values()); - } + ArrayList entryList = new ArrayList(); + entryList.addAll(sb.sbQueue.getActiveQueueEntries()); + int inProcessCount = entryList.size(); // getting all enqueued entries if ((sb.sbQueue.size() > 0)) { - Iterator i1 = sb.sbQueue.entryIterator(false); + Iterator i1 = sb.sbQueue.entryIterator(false); while (i1.hasNext()) entryList.add(i1.next()); } @@ -118,8 +115,8 @@ public class queues_p { int ok = 0; for (i = 0; i < size; i++) { boolean inProcess = i < inProcessCount; - pcentry = (plasmaSwitchboardQueue.Entry) entryList.get(i); - if ((pcentry != null)&&(pcentry.url() != null)) { + pcentry = entryList.get(i); + if ((pcentry != null) && (pcentry.url() != null)) { long entrySize = pcentry.size(); totalSize += entrySize; initiator = yacyCore.seedDB.getConnected(pcentry.initiator()); diff --git a/source/de/anomic/kelondro/kelondroBufferedEcoFS.java b/source/de/anomic/kelondro/kelondroBufferedEcoFS.java index 4e6511195..b54fc67ab 100644 --- a/source/de/anomic/kelondro/kelondroBufferedEcoFS.java +++ b/source/de/anomic/kelondro/kelondroBufferedEcoFS.java @@ -48,6 +48,7 @@ public class kelondroBufferedEcoFS { } private void flushBuffer() throws IOException { + if (efs == null) return; Iterator> i = buffer.entrySet().iterator(); Map.Entry entry; while (i.hasNext()) { @@ -71,7 +72,7 @@ public class kelondroBufferedEcoFS { } catch (IOException e) { e.printStackTrace(); } - efs.close(); + if (efs != null) efs.close(); efs = null; } diff --git a/source/de/anomic/plasma/plasmaProfiling.java b/source/de/anomic/plasma/plasmaProfiling.java index d0b0b1d99..4f53991ad 100644 --- a/source/de/anomic/plasma/plasmaProfiling.java +++ b/source/de/anomic/plasma/plasmaProfiling.java @@ -41,7 +41,7 @@ public class plasmaProfiling { public static long lastPPMUpdate = System.currentTimeMillis()- 30000; - public static void updateIndexedPage(plasmaSwitchboardQueue.Entry entry) { + public static void updateIndexedPage(plasmaSwitchboardQueue.QueueEntry entry) { if (System.currentTimeMillis() - lastPPMUpdate > 30000) { // we don't want to do this too often yacyCore.peerActions.updateMySeed(); diff --git a/source/de/anomic/plasma/plasmaSwitchboard.java b/source/de/anomic/plasma/plasmaSwitchboard.java index 1582b0b71..51cc13c4e 100644 --- a/source/de/anomic/plasma/plasmaSwitchboard.java +++ b/source/de/anomic/plasma/plasmaSwitchboard.java @@ -156,7 +156,7 @@ import de.anomic.yacy.yacySeed; import de.anomic.yacy.yacyURL; import de.anomic.yacy.yacyVersion; -public final class plasmaSwitchboard extends serverAbstractSwitch implements serverSwitch { +public final class plasmaSwitchboard extends serverAbstractSwitch implements serverSwitch { // load slots public static int xstackCrawlSlots = 2000; @@ -216,7 +216,6 @@ public final class plasmaSwitchboard extends serverAbstractSwitch indexingTasksInProcess; public userDB userDB; public bookmarksDB bookmarksDB; public plasmaWebStructure webStructure; @@ -365,6 +364,21 @@ public final class plasmaSwitchboard extends serverAbstractSwitchpublic static final String INDEXER = "80_indexing"

+ *

Name of the indexer thread, performing the actual indexing of a website

+ */ + public static final String PARSER = "74_indexing"; + public static final String PARSER_MEMPREREQ = "74_indexing_memprereq"; + public static final String PARSER_IDLESLEEP = "74_indexing_idlesleep"; + public static final String PARSER_BUSYSLEEP = "74_indexing_busysleep"; + public static final String PARSER_METHOD_START = "deQueueProcess"; + public static final String PARSER_METHOD_JOBCOUNT = "queueSize"; + public static final String PARSER_METHOD_FREEMEM = "deQueueFreeMem"; + + // 80_indexing /** *

public static final String INDEXER = "80_indexing"

@@ -1122,13 +1136,10 @@ public final class plasmaSwitchboard extends serverAbstractSwitch(); - // going through the sbQueue Entries and registering all content files as in use int count = 0; - plasmaSwitchboardQueue.Entry queueEntry; - Iterator i1 = sbQueue.entryIterator(true); + plasmaSwitchboardQueue.QueueEntry queueEntry; + Iterator i1 = sbQueue.entryIterator(true); while (i1.hasNext()) { queueEntry = i1.next(); if ((queueEntry != null) && (queueEntry.url() != null) && (queueEntry.cacheFile().exists())) { @@ -1284,8 +1295,12 @@ public final class plasmaSwitchboard extends serverAbstractSwitch * @param newCacheSize in MB @@ -1743,14 +1756,14 @@ public final class plasmaSwitchboard extends serverAbstractSwitch hl = document.getHyperlinks(); - Iterator> i = hl.entrySet().iterator(); - yacyURL nextUrl; - Map.Entry nextEntry; - while (i.hasNext()) { - // check for interruption - checkInterruption(); - - // fetching the next hyperlink - nextEntry = i.next(); - nextUrl = nextEntry.getKey(); - // enqueue the hyperlink into the pre-notice-url db - crawlStacker.enqueueEntry(nextUrl, entry.urlHash(), entry.initiator(), nextEntry.getValue(), docDate, entry.depth() + 1, entry.profile()); - } - if (log.isInfo()) log.logInfo("CRAWL: ADDED " + hl.size() + " LINKS FROM " + entry.url().toNormalform(false, true) + - ", NEW CRAWL STACK SIZE IS " + crawlQueues.noticeURL.stackSize(plasmaCrawlNURL.STACK_TYPE_CORE) + - ", STACKING TIME = " + (stackEndTime-stackStartTime) + - ", PARSING TIME = " + (parsingEndTime-parsingStartTime)); - } - stackEndTime = System.currentTimeMillis(); - } catch (Exception e) { - if (e instanceof InterruptedException) throw (InterruptedException)e; - this.log.logSevere("Unexpected exception while parsing/indexing URL ",e); - } catch (Error e) { - this.log.logSevere("Unexpected exception while parsing/indexing URL ",e); - } finally { - checkInterruption(); - - // The following code must be into the finally block, otherwise it will not be executed - // on errors! - - // removing current entry from in process list - synchronized (this.indexingTasksInProcess) { - this.indexingTasksInProcess.remove(entry.urlHash()); + // parse the document + document = parser.parseSource(entry.url(), entry.getMimeType(), entry.getCharacterEncoding(), entry.cacheFile()); + assert(document != null) : "Unexpected error. Parser returned null."; + if (document == null) return null; + } catch (ParserException e) { + this.log.logInfo("Unable to parse the resource '" + entry.url() + "'. " + e.getMessage()); + addURLtoErrorDB(entry.url(), entry.referrerHash(), entry.initiator(), entry.anchorName(), e.getErrorCode(), new kelondroBitfield()); + if (document != null) { + document.close(); + document = null; } - - // explicit delete/free resources - if ((entry != null) && (entry.profile() != null) && (!(entry.profile().storeHTCache()))) { - plasmaHTCache.filesInUse.remove(entry.cacheFile()); - //plasmaHTCache.deleteURLfromCache(entry.url()); + return null; + } + + long parsingEndTime = System.currentTimeMillis(); + + // get the document date + Date docDate = entry.getModificationDate(); + + // put anchors on crawl stack + long stackStartTime = System.currentTimeMillis(); + if ( + ((processCase == PROCESSCASE_4_PROXY_LOAD) || (processCase == PROCESSCASE_5_LOCAL_CRAWLING)) && + ((entry.profile() == null) || (entry.depth() < entry.profile().generalDepth())) + ) { + Map hl = document.getHyperlinks(); + Iterator> i = hl.entrySet().iterator(); + yacyURL nextUrl; + Map.Entry nextEntry; + while (i.hasNext()) { + // check for interruption + checkInterruption(); + + // fetching the next hyperlink + nextEntry = i.next(); + nextUrl = nextEntry.getKey(); + // enqueue the hyperlink into the pre-notice-url db + crawlStacker.enqueueEntry(nextUrl, entry.urlHash(), entry.initiator(), nextEntry.getValue(), docDate, entry.depth() + 1, entry.profile()); } - entry = null; - - if (document != null) try { document.close(); } catch (Exception e) {} + long stackEndTime = System.currentTimeMillis(); + if (log.isInfo()) log.logInfo("CRAWL: ADDED " + hl.size() + " LINKS FROM " + entry.url().toNormalform(false, true) + + ", NEW CRAWL STACK SIZE IS " + crawlQueues.noticeURL.stackSize(plasmaCrawlNURL.STACK_TYPE_CORE) + + ", STACKING TIME = " + (stackEndTime-stackStartTime) + + ", PARSING TIME = " + (parsingEndTime-parsingStartTime)); } return document; } - private plasmaCondenser condenseDocument(plasmaSwitchboardQueue.Entry entry, plasmaParserDocument document) throws InterruptedException { + private plasmaCondenser condenseDocument(plasmaSwitchboardQueue.QueueEntry entry, plasmaParserDocument document) throws InterruptedException { // CREATE INDEX String dc_title = document.dc_title(); yacyURL referrerURL = entry.referrerURL(); @@ -2229,7 +2243,7 @@ public final class plasmaSwitchboard extends serverAbstractSwitch queueInProcess; public plasmaSwitchboardQueue(plasmaWordIndex index, File sbQueueStackPath, plasmaCrawlProfile profiles) { this.sbQueueStackPath = sbQueueStackPath; this.profiles = profiles; this.index = index; + this.queueInProcess = new ConcurrentHashMap(); initQueueStack(); } @@ -104,7 +108,7 @@ public class plasmaSwitchboardQueue { return sbQueueStack.size(); } - public synchronized void push(Entry entry) throws IOException { + public synchronized void push(QueueEntry entry) throws IOException { if (entry == null) return; sbQueueStack.push(sbQueueStack.row().newEntry(new byte[][]{ entry.url.toString().getBytes(), @@ -118,20 +122,20 @@ public class plasmaSwitchboardQueue { })); } - public synchronized Entry pop() throws IOException { + public synchronized QueueEntry pop() throws IOException { if (sbQueueStack.size() == 0) return null; kelondroRow.Entry b = sbQueueStack.pot(); if (b == null) return null; - return new Entry(b); + return new QueueEntry(b); } - public synchronized Entry remove(String urlHash) { + public synchronized QueueEntry remove(String urlHash) { Iterator i = sbQueueStack.stackIterator(true); kelondroRow.Entry rowentry; - Entry entry; + QueueEntry entry; while (i.hasNext()) { rowentry = (kelondroRow.Entry) i.next(); - entry = new Entry(rowentry); + entry = new QueueEntry(rowentry); if (entry.urlHash().equals(urlHash)) { i.remove(); return entry; @@ -160,13 +164,13 @@ public class plasmaSwitchboardQueue { super.finalize(); } - public Iterator entryIterator(boolean up) { + public Iterator entryIterator(boolean up) { // iterates the elements in an ordered way. // returns plasmaSwitchboardQueue.Entry - type Objects return new entryIterator(up); } - public class entryIterator implements Iterator { + public class entryIterator implements Iterator { Iterator rows; @@ -178,8 +182,8 @@ public class plasmaSwitchboardQueue { return rows.hasNext(); } - public Entry next() { - return new Entry((kelondroRow.Entry) rows.next()); + public QueueEntry next() { + return new QueueEntry((kelondroRow.Entry) rows.next()); } public void remove() { @@ -187,12 +191,43 @@ public class plasmaSwitchboardQueue { } } - public Entry newEntry(yacyURL url, String referrer, Date ifModifiedSince, boolean requestWithCookie, + public QueueEntry newEntry(yacyURL url, String referrer, Date ifModifiedSince, boolean requestWithCookie, String initiator, int depth, String profilehandle, String anchorName) { - return new Entry(url, referrer, ifModifiedSince, requestWithCookie, initiator, depth, profilehandle, anchorName); + return new QueueEntry(url, referrer, ifModifiedSince, requestWithCookie, initiator, depth, profilehandle, anchorName); } - - public class Entry { + + public void enQueueToActive(QueueEntry entry) { + queueInProcess.put(entry.urlHash(), entry); + } + + public QueueEntry getActiveEntry(String urlhash) { + // show one entry from the queue + return this.queueInProcess.get(urlhash); + } + + public int getActiveQueueSize() { + return this.queueInProcess.size(); + } + + public Collection getActiveQueueEntries() { + return this.queueInProcess.values(); + } + + public static final int QUEUE_STATE_FRESH = 0; + public static final int QUEUE_STATE_PARSING_WAITING = 1; + public static final int QUEUE_STATE_PARSING_RUNNING = 2; + public static final int QUEUE_STATE_PARSING_COMPLETE = 3; + public static final int QUEUE_STATE_CONDENSING_WAITING = 4; + public static final int QUEUE_STATE_CONDENSING_RUNNING = 5; + public static final int QUEUE_STATE_CONDENSING_COMPLETE = 6; + public static final int QUEUE_STATE_STRUCTUREANALYSIS_WAITING = 7; + public static final int QUEUE_STATE_STRUCTUREANALYSIS_RUNNING = 8; + public static final int QUEUE_STATE_STRUCTUREANALYSIS_COMPLETE = 9; + public static final int QUEUE_STATE_INDEXSTORAGE_WAITING = 10; + public static final int QUEUE_STATE_INDEXSTORAGE_RUNNING = 11; + public static final int QUEUE_STATE_INDEXSTORAGE_COMPLETE = 12; + + public class QueueEntry { yacyURL url; // plasmaURL.urlStringLength String referrerHash; // plasmaURL.urlHashLength Date ifModifiedSince; // 6 @@ -201,13 +236,14 @@ public class plasmaSwitchboardQueue { int depth; // plasmaURL.urlCrawlDepthLength String profileHandle; // plasmaURL.urlCrawlProfileHandleLength String anchorName; // plasmaURL.urlDescrLength - + int status; + // computed values private plasmaCrawlProfile.entry profileEntry; private IResourceInfo contentInfo; private yacyURL referrerURL; - public Entry(yacyURL url, String referrer, Date ifModifiedSince, boolean requestWithCookie, + public QueueEntry(yacyURL url, String referrer, Date ifModifiedSince, boolean requestWithCookie, String initiator, int depth, String profileHandle, String anchorName) { this.url = url; this.referrerHash = referrer; @@ -221,9 +257,10 @@ public class plasmaSwitchboardQueue { this.profileEntry = null; this.contentInfo = null; this.referrerURL = null; + this.status = QUEUE_STATE_FRESH; } - public Entry(kelondroRow.Entry row) { + public QueueEntry(kelondroRow.Entry row) { long ims = row.getColLong(2); byte flags = row.getColByte(3); try { @@ -242,9 +279,10 @@ public class plasmaSwitchboardQueue { this.profileEntry = null; this.contentInfo = null; this.referrerURL = null; + this.status = QUEUE_STATE_FRESH; } - public Entry(byte[][] row) throws IOException { + public QueueEntry(byte[][] row) throws IOException { long ims = (row[2] == null) ? 0 : kelondroBase64Order.enhancedCoder.decodeLong(new String(row[2], "UTF-8")); byte flags = (row[3] == null) ? 0 : row[3][0]; try { @@ -263,6 +301,19 @@ public class plasmaSwitchboardQueue { this.profileEntry = null; this.contentInfo = null; this.referrerURL = null; + this.status = QUEUE_STATE_FRESH; + } + + public void updateStatus(int newStatus) { + this.status = newStatus; + } + + public void close() { + queueInProcess.remove(this.url.hash()); + } + + public void finalize() { + this.close(); } public yacyURL url() { diff --git a/source/de/anomic/plasma/plasmaWordIndex.java b/source/de/anomic/plasma/plasmaWordIndex.java index 92cad1f0b..0265607ad 100644 --- a/source/de/anomic/plasma/plasmaWordIndex.java +++ b/source/de/anomic/plasma/plasmaWordIndex.java @@ -605,7 +605,7 @@ public final class plasmaWordIndex implements indexRI { return containers; // this may return less containers as demanded } - public indexURLReference storeDocument(plasmaSwitchboardQueue.Entry entry, plasmaParserDocument document, plasmaCondenser condenser) throws IOException { + public indexURLReference storeDocument(plasmaSwitchboardQueue.QueueEntry entry, plasmaParserDocument document, plasmaCondenser condenser) throws IOException { long startTime = System.currentTimeMillis(); // CREATE INDEX