From 461374e17530dbafe2f2b4a95b1b5cb8ed85fed6 Mon Sep 17 00:00:00 2001 From: theli Date: Wed, 12 Oct 2005 09:38:40 +0000 Subject: [PATCH] *) Restrict the number of files that YaCy is allowed to open during index transfer/distribution. This option is configurable via the config file and defaults to 800. See: http://www.yacy-forum.de/viewtopic.php?p=11137#11137 git-svn-id: https://svn.berlios.de/svnroot/repos/yacy/trunk@918 6c8d7289-2bf4-0310-a012-ef5d649a1542 --- .../de/anomic/plasma/plasmaSwitchboard.java | 15 +++-- .../plasma/plasmaWordIndexDistribution.java | 56 +++++++++++++------ yacy.init | 4 ++ 3 files changed, 52 insertions(+), 23 deletions(-) diff --git a/source/de/anomic/plasma/plasmaSwitchboard.java b/source/de/anomic/plasma/plasmaSwitchboard.java index 46a1c6cf7..6604c095f 100644 --- a/source/de/anomic/plasma/plasmaSwitchboard.java +++ b/source/de/anomic/plasma/plasmaSwitchboard.java @@ -478,11 +478,16 @@ public final class plasmaSwitchboard extends serverAbstractSwitch implements ser peerPing = new serverInstantThread(yc, "peerPing", null), 2000); peerPing.setSyncObject(new Object()); - indexDistribution = new plasmaWordIndexDistribution(urlPool, wordIndex, log, - getConfig("allowDistributeIndex", "false").equalsIgnoreCase("true"), - getConfig("allowDistributeIndexWhileCrawling","false").equalsIgnoreCase("true"), - getConfig("indexDistribution.gzipBody","false").equalsIgnoreCase("true"), - (int)getConfigLong("indexDistribution.timeout",60000)); + this.indexDistribution = new plasmaWordIndexDistribution( + this.urlPool, + this.wordIndex, + this.log, + getConfig("allowDistributeIndex", "false").equalsIgnoreCase("true"), + getConfig("allowDistributeIndexWhileCrawling","false").equalsIgnoreCase("true"), + getConfig("indexDistribution.gzipBody","false").equalsIgnoreCase("true"), + (int)getConfigLong("indexDistribution.timeout",60000), + (int)getConfigLong("indexDistribution.maxOpenFiles",800) + ); indexDistribution.setCounts(150, 1, 3, 10000); 
deployThread("20_dhtdistribution", "DHT Distribution", "selection, transfer and deletion of index entries that are not searched on your peer, but on others", null, new serverInstantThread(indexDistribution, "job", null), 600000); diff --git a/source/de/anomic/plasma/plasmaWordIndexDistribution.java b/source/de/anomic/plasma/plasmaWordIndexDistribution.java index 81cb6c4d1..01aab547d 100644 --- a/source/de/anomic/plasma/plasmaWordIndexDistribution.java +++ b/source/de/anomic/plasma/plasmaWordIndexDistribution.java @@ -76,11 +76,20 @@ public final class plasmaWordIndexDistribution { private boolean closed; private boolean gzipBody; private int timeout; + private int maxOpenFiles; public transferIndexThread transferIdxThread = null; - public plasmaWordIndexDistribution(plasmaURLPool urlPool, plasmaWordIndex wordIndex, serverLog log, - boolean enable, boolean enabledWhileCrawling, boolean gzipBody, int timeout) { + public plasmaWordIndexDistribution( + plasmaURLPool urlPool, + plasmaWordIndex wordIndex, + serverLog log, + boolean enable, + boolean enabledWhileCrawling, + boolean gzipBody, + int timeout, + int maxOpenFiles + ) { this.urlPool = urlPool; this.wordIndex = wordIndex; this.enabled = enable; @@ -90,6 +99,7 @@ public final class plasmaWordIndexDistribution { setCounts(100 /*indexCount*/, 1 /*juniorPeerCount*/, 3 /*seniorPeerCount*/, 8000); this.gzipBody = gzipBody; this.timeout = timeout; + this.maxOpenFiles = maxOpenFiles; } public void enable() { @@ -190,8 +200,9 @@ public final class plasmaWordIndexDistribution { // collect index String startPointHash = selectTransferStart(); log.logFine("Selected hash " + startPointHash + " as start point for index distribution, distance = " + yacyDHTAction.dhtDistance(yacyCore.seedDB.mySeed.hash, startPointHash)); - Object[] selectResult = selectTransferIndexes(startPointHash, indexCount); + Object[] selectResult = selectTransferIndexes(startPointHash, indexCount, this.maxOpenFiles); plasmaWordIndexEntity[] indexEntities 
= (plasmaWordIndexEntity[]) selectResult[0]; + Integer openedFiles = (Integer) selectResult[2]; HashMap urlCache = (HashMap) selectResult[1]; // String (url-hash) / plasmaCrawlLURL.Entry if ((indexEntities == null) || (indexEntities.length == 0)) { log.logFine("No index available for index transfer, hash start-point " + startPointHash); @@ -282,12 +293,13 @@ public final class plasmaWordIndexDistribution { } Object[] /* of {plasmaWordIndexEntity[], HashMap(String, plasmaCrawlLURL.Entry)}*/ - selectTransferIndexes(String hash, int count) { + selectTransferIndexes(String hash, int count, int maxOpenFiles) { // the hash is a start hash from where the indexes are picked ArrayList tmpEntities = new ArrayList(count); String nexthash = ""; try { - Iterator wordHashIterator = wordIndex.wordHashes(hash, true, true); + int currOpenFiles = 0; + Iterator wordHashIterator = this.wordIndex.wordHashes(hash, true, true); plasmaWordIndexEntity indexEntity, tmpEntity; Enumeration urlEnum; Iterator hashIter; @@ -295,9 +307,14 @@ public final class plasmaWordIndexDistribution { plasmaCrawlLURL.Entry lurl; final HashSet unknownURLEntries = new HashSet(); final HashMap knownURLs = new HashMap(); - while ((count > 0) && (wordHashIterator.hasNext()) && - ((nexthash = (String) wordHashIterator.next()) != null) && (nexthash.trim().length() > 0)) { - indexEntity = wordIndex.getEntity(nexthash, true); + while ( + (count > 0) && + (currOpenFiles <= maxOpenFiles) && + (wordHashIterator.hasNext()) && + ((nexthash = (String) wordHashIterator.next()) != null) && + (nexthash.trim().length() > 0) + ) { + indexEntity = this.wordIndex.getEntity(nexthash, true); if (indexEntity.size() == 0) { indexEntity.deleteComplete(); } else if ((indexEntity.size() <= count)|| // if we havn't exceeded the limit @@ -309,12 +326,12 @@ public final class plasmaWordIndexDistribution { unknownURLEntries.clear(); while (urlEnum.hasMoreElements()) { indexEntry = (plasmaWordIndexEntry) urlEnum.nextElement(); - lurl = 
urlPool.loadedURL.getEntry(indexEntry.getUrlHash()); + lurl = this.urlPool.loadedURL.getEntry(indexEntry.getUrlHash()); if ((lurl == null) || (lurl.toString() == null)) { unknownURLEntries.add(indexEntry.getUrlHash()); } else { if (lurl.toString() == null) { - urlPool.loadedURL.remove(indexEntry.getUrlHash()); + this.urlPool.loadedURL.remove(indexEntry.getUrlHash()); unknownURLEntries.add(indexEntry.getUrlHash()); } else { knownURLs.put(indexEntry.getUrlHash(), lurl); @@ -334,6 +351,7 @@ public final class plasmaWordIndexDistribution { tmpEntities.add(indexEntity); this.log.logFine("Selected whole index (" + indexEntity.size() + " URLs, " + unknownURLEntries.size() + " not bound) for word " + indexEntity.wordHash()); count -= indexEntity.size(); + currOpenFiles++; } } catch (kelondroException e) { this.log.logSevere("plasmaWordIndexDistribution/1: deleted DB for word " + indexEntity.wordHash(), e); @@ -347,12 +365,12 @@ public final class plasmaWordIndexDistribution { unknownURLEntries.clear(); while ((urlEnum.hasMoreElements()) && (count > 0)) { indexEntry = (plasmaWordIndexEntry) urlEnum.nextElement(); - lurl = urlPool.loadedURL.getEntry(indexEntry.getUrlHash()); + lurl = this.urlPool.loadedURL.getEntry(indexEntry.getUrlHash()); if (lurl == null) { unknownURLEntries.add(indexEntry.getUrlHash()); } else { if (lurl.toString() == null) { - urlPool.loadedURL.remove(indexEntry.getUrlHash()); + this.urlPool.loadedURL.remove(indexEntry.getUrlHash()); unknownURLEntries.add(indexEntry.getUrlHash()); } else { knownURLs.put(indexEntry.getUrlHash(), lurl); @@ -367,10 +385,10 @@ public final class plasmaWordIndexDistribution { indexEntity.removeEntry((String) hashIter.next(), true); } // use whats remaining - log.logFine("Selected partial index (" + tmpEntity.size() + " from " + indexEntity.size() +" URLs, " + unknownURLEntries.size() + " not bound) for word " + tmpEntity.wordHash()); + this.log.logFine("Selected partial index (" + tmpEntity.size() + " from " + 
indexEntity.size() +" URLs, " + unknownURLEntries.size() + " not bound) for word " + tmpEntity.wordHash()); tmpEntities.add(tmpEntity); } catch (kelondroException e) { - log.logSevere("plasmaWordIndexDistribution/2: deleted DB for word " + indexEntity.wordHash(), e); + this.log.logSevere("plasmaWordIndexDistribution/2: deleted DB for word " + indexEntity.wordHash(), e); try {indexEntity.deleteComplete();} catch (IOException ee) {} } indexEntity.close(); // important: is not closed elswhere and cannot be deleted afterwards @@ -380,12 +398,12 @@ public final class plasmaWordIndexDistribution { } // transfer to array plasmaWordIndexEntity[] indexEntities = (plasmaWordIndexEntity[]) tmpEntities.toArray(new plasmaWordIndexEntity[tmpEntities.size()]); - return new Object[]{indexEntities, knownURLs}; + return new Object[]{indexEntities, knownURLs, new Integer(currOpenFiles)}; } catch (IOException e) { - log.logSevere("selectTransferIndexes IO-Error (hash=" + nexthash + "): " + e.getMessage(), e); + this.log.logSevere("selectTransferIndexes IO-Error (hash=" + nexthash + "): " + e.getMessage(), e); return new Object[]{new plasmaWordIndexEntity[0], new HashMap(0)}; } catch (kelondroException e) { - log.logSevere("selectTransferIndexes database corrupted: " + e.getMessage(), e); + this.log.logSevere("selectTransferIndexes database corrupted: " + e.getMessage(), e); return new Object[]{new plasmaWordIndexEntity[0], new HashMap(0)}; } } @@ -641,6 +659,7 @@ public final class plasmaWordIndexDistribution { private boolean finished = false; private boolean gzipBody = false; private int timeout = 60000; + private int maxOpenFiles = 800; private int transferedIndexCount = 0; private String status = "Running"; private String oldStartingPointHash = "------------", startPointHash = "------------"; @@ -659,6 +678,7 @@ public final class plasmaWordIndexDistribution { this.wordsDBSize = sb.wordIndex.size(); this.gzipBody = 
"true".equalsIgnoreCase(sb.getConfig("indexTransfer.gzipBody","false")); this.timeout = (int) sb.getConfigLong("indexTransfer.timeout",60000); + this.maxOpenFiles = (int) sb.getConfigLong("indexTransfer.maxOpenFiles",800); } public void run() { @@ -742,7 +762,7 @@ public final class plasmaWordIndexDistribution { // selecting 500 words to transfer this.status = "Running: Selecting chunk " + iteration; - Object[] selectResult = selectTransferIndexes(startPointHash, chunkSize); + Object[] selectResult = selectTransferIndexes(this.startPointHash, this.chunkSize, this.maxOpenFiles); newIndexEntities = (plasmaWordIndexEntity[]) selectResult[0]; HashMap urlCache = (HashMap) selectResult[1]; // String (url-hash) / plasmaCrawlLURL.Entry diff --git a/yacy.init b/yacy.init index c19514a2c..f8275961d 100644 --- a/yacy.init +++ b/yacy.init @@ -573,5 +573,9 @@ indexControl.timeout = 60000 indexDistribution.timeout = 60000 indexTransfer.timeout = 120000 +# defining max. allowed amount of open files during index- transfer/distribution +indexDistribution.maxOpenFiles = 800 +indexTransfer.maxOpenFiles = 800 + # storagePeerHash =