diff --git a/htroot/IndexControl_p.java b/htroot/IndexControl_p.java index 9d7fd89c6..5d400b009 100644 --- a/htroot/IndexControl_p.java +++ b/htroot/IndexControl_p.java @@ -117,18 +117,14 @@ public class IndexControl_p { if (post.containsKey("setIndexTransmission")) { if (post.get("indexDistribute", "").equals("on")) { switchboard.setConfig("allowDistributeIndex", "true"); - switchboard.indexDistribution.enable(); } else { switchboard.setConfig("allowDistributeIndex", "false"); - switchboard.indexDistribution.disable(); } if (post.containsKey("indexDistributeWhileCrawling")) { switchboard.setConfig("allowDistributeIndexWhileCrawling", "true"); - switchboard.indexDistribution.enableWhileCrawling(); } else { switchboard.setConfig("allowDistributeIndexWhileCrawling", "false"); - switchboard.indexDistribution.disableWhileCrawling(); } if (post.get("indexReceive", "").equals("on")) { diff --git a/source/de/anomic/plasma/plasmaSwitchboard.java b/source/de/anomic/plasma/plasmaSwitchboard.java index 5878c7f13..24d75c5a2 100644 --- a/source/de/anomic/plasma/plasmaSwitchboard.java +++ b/source/de/anomic/plasma/plasmaSwitchboard.java @@ -153,12 +153,14 @@ import de.anomic.yacy.yacyNewsPool; public final class plasmaSwitchboard extends serverAbstractSwitch implements serverSwitch { // load slots - public static int crawlSlots = 10; - public static int indexingSlots = 100; - public static int stackCrawlSlots = 10000; + public static int crawlSlots = 10; + public static int indexingSlots = 100; + public static int stackCrawlSlots = 10000; + + public static int maxCRLDump = 500000; + public static int maxCRGDump = 200000; + private int dhtTransferIndexCount = 150; - public static int maxCRLDump = 500000; - public static int maxCRGDump = 200000; // couloured list management public static TreeSet badwords = null; @@ -187,7 +189,6 @@ public final class plasmaSwitchboard extends serverAbstractSwitch implements ser public plasmaCrawlProfile profiles; public plasmaCrawlProfile.entry defaultProxyProfile; public plasmaCrawlProfile.entry defaultRemoteProfile; - public plasmaWordIndexDistribution indexDistribution; public boolean rankingOn; public plasmaRankingDistribution rankingOwnDistribution; public plasmaRankingDistribution rankingOtherDistribution; @@ -551,21 +552,10 @@ public final class plasmaSwitchboard extends serverAbstractSwitch implements ser peerPing = new serverInstantThread(yc, "peerPing", null), 2000); peerPing.setSyncObject(new Object()); - this.indexDistribution = new plasmaWordIndexDistribution( - this.urlPool, - this.wordIndex, - this.log, - getConfig("allowDistributeIndex", "false").equalsIgnoreCase("true"), - getConfig("allowDistributeIndexWhileCrawling","false").equalsIgnoreCase("true"), - getConfig("indexDistribution.gzipBody","false").equalsIgnoreCase("true"), - (int)getConfigLong("indexDistribution.timeout",60000) /*, - (int)getConfigLong("indexDistribution.maxOpenFiles",800)*/ - ); - indexDistribution.setCounts(150, 1, 3, 10000); getConfig("20_dhtdistribution_threads","1"); for(int i=0; i<(int)getConfigLong("20_dhtdistribution_threads",1);i++) { deployThread("20_dhtdistribution_"+i, "DHT Distribution", "selection, transfer and deletion of index entries that are not searched on your peer, but on others", null, - new serverInstantThread(indexDistribution, "job", null), 60000 + i*5000, + new serverInstantThread(this, "dhtTransferJob", null), 60000 + i*5000, Long.parseLong(getConfig("20_dhtdistribution_idlesleep" , "5000")), Long.parseLong(getConfig("20_dhtdistribution_busysleep" , "0")), Long.parseLong(getConfig("20_dhtdistribution_memprereq" , "1000000"))); @@ -796,7 +786,6 @@ public final class plasmaSwitchboard extends serverAbstractSwitch implements ser log.logConfig("SWITCHBOARD SHUTDOWN STEP 2: sending termination signal to threaded indexing"); // closing all still running db importer jobs this.dbImportManager.close(); - indexDistribution.close(); cacheLoader.close(); wikiDB.close(); userDB.close(); @@ -1942,9 +1931,105 @@ public final class plasmaSwitchboard extends serverAbstractSwitch implements ser } catch (InterruptedException e) { } transferIdxThread = null; } - } - + } + public boolean dhtTransferJob() { + + if (yacyCore.seedDB == null) { + log.logFine("no DHT distribution: seedDB == null"); + return false; + } + if (yacyCore.seedDB.mySeed == null) { + log.logFine("no DHT distribution: mySeed == null"); + return false; + } + if (yacyCore.seedDB.mySeed.isVirgin()) { + log.logFine("no DHT distribution: status is virgin"); + return false; + } + if (getConfig("allowDistributeIndex","false").equalsIgnoreCase("false")) { + log.logFine("no DHT distribution: not enabled"); + return false; + } + if (urlPool.loadedURL.size() < 10) { + log.logFine("no DHT distribution: loadedURL.size() = " + urlPool.loadedURL.size()); + return false; + } + if (wordIndex.size() < 100) { + log.logFine("no DHT distribution: not enough words - wordIndex.size() = " + wordIndex.size()); + return false; + } + if ((getConfig("allowDistributeIndexWhileCrawling","false").equalsIgnoreCase("false")) && (urlPool.noticeURL.stackSize() > 0)) { + log.logFine("no DHT distribution: crawl in progress - noticeURL.stackSize() = " + urlPool.noticeURL.stackSize()); + return false; + } + + // do the transfer + int peerCount = (yacyCore.seedDB.mySeed.isJunior()) ? 1 : 3; + long starttime = System.currentTimeMillis(); + plasmaDHTChunk dhtChunk = new plasmaDHTChunk(this.log, this.wordIndex, this.urlPool.loadedURL, 30, dhtTransferIndexCount); + boolean ok = dhtTransferProcess(dhtChunk, peerCount, true); + + if (!ok) { + log.logFine("no word distribution: transfer failed"); + return false; + } + + // adopt transfer count + if ((System.currentTimeMillis() - starttime) > (10000 * peerCount)) + dhtTransferIndexCount--; + else + dhtTransferIndexCount++; + if (dhtTransferIndexCount < 50) dhtTransferIndexCount = 50; + + // show success + return true; + + } + + public boolean dhtTransferProcess(plasmaDHTChunk dhtChunk, int peerCount, boolean delete) { + if ((yacyCore.seedDB == null) || (yacyCore.seedDB.sizeConnected() == 0)) return false; + + // find a list of DHT-peers + yacySeed[] seeds = yacyCore.dhtAgent.getDHTTargets(log, peerCount, 10, dhtChunk.firstContainer().wordHash(), dhtChunk.lastContainer().wordHash(), 0.4); + + if (seeds.length < peerCount) { + log.logWarning("found not enough (" + seeds.length + ") peers for distribution"); + return false; + } + + // send away the indexes to all these peers + String peerNames = ""; + int hc1 = 0; + plasmaDHTTransfer transfer = null; + for (int i = 0; i < seeds.length; i++) { + transfer = new plasmaDHTTransfer(log, seeds[i], dhtChunk, + getConfig("indexDistribution.gzipBody","false").equalsIgnoreCase("true"), + (int)getConfigLong("indexDistribution.timeout",60000), 0); + try {transfer.uploadIndex();} catch (InterruptedException e) {} + + if (transfer.dhtChunk.getStatus() == plasmaDHTChunk.chunkStatus_COMPLETE) { + peerNames += ", " + seeds[i].getName(); + hc1++; + } + if (hc1 >= peerCount) break; + } + if (peerNames.length() > 0) peerNames = peerNames.substring(2); // remove comma + + // clean up and finish with deletion of indexes + if (hc1 >= peerCount) { + // success + if (delete) { + int deletedURLs = dhtChunk.deleteTransferIndexes(); + this.log.logFine("Deleted from " + dhtChunk.containers().length + " transferred RWIs locally, removed " + deletedURLs + " URL references"); + } + dhtChunk.setStatus(plasmaDHTChunk.chunkStatus_COMPLETE); + return true; + } + this.log.logSevere("Index distribution failed. Too few peers (" + hc1 + ") received the index, not deleted locally."); + return false; + } + public void terminate() { this.terminate = true; this.shutdownSync.V(); diff --git a/source/de/anomic/plasma/plasmaWordIndexDistribution.java b/source/de/anomic/plasma/plasmaWordIndexDistribution.java deleted file mode 100644 index 9dab724d9..000000000 --- a/source/de/anomic/plasma/plasmaWordIndexDistribution.java +++ /dev/null @@ -1,244 +0,0 @@ -//plasmaWordIndexDistribution.java -//------------------------------------------- -//(C) by Michael Peter Christen; mc@anomic.de -//first published on http://www.anomic.de -//Frankfurt, Germany, 2005 -// -// $LastChangedDate$ -// $LastChangedRevision$ -// $LastChangedBy$ -// -//This program is free software; you can redistribute it and/or modify -//it under the terms of the GNU General Public License as published by -//the Free Software Foundation; either version 2 of the License, or -//(at your option) any later version. -// -//This program is distributed in the hope that it will be useful, -//but WITHOUT ANY WARRANTY; without even the implied warranty of -//MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -//GNU General Public License for more details. -// -//You should have received a copy of the GNU General Public License -//along with this program; if not, write to the Free Software -//Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA -// -//Using this software in any meaning (reading, learning, copying, compiling, -//running) means that you agree that the Author(s) is (are) not responsible -//for cost, loss of data or any harm that may be caused directly or indirectly -//by usage of this softare or this documentation. The usage of this software -//is on your own risk. The installation and usage (starting/running) of this -//software may allow other people or application to access your computer and -//any attached devices and is highly dependent on the configuration of the -//software which must be done by the user of the software; the author(s) is -//(are) also not responsible for proper configuration and usage of the -//software, even if provoked by documentation provided together with -//the software. -// -//Any changes to this file according to the GPL as documented in the file -//gpl.txt aside this file in the shipment you received can be done to the -//lines that follows this copyright notice here, but changes must not be -//done inside the copyright notive above. A re-distribution must contain -//the intact and unchanged copyright notice. -//Contributions and changes to the program code must be marked as such. - -package de.anomic.plasma; - -import de.anomic.yacy.yacyCore; -import de.anomic.yacy.yacySeed; -import de.anomic.server.logging.serverLog; - -public final class plasmaWordIndexDistribution { - - // distributes parts of the index to other peers - // stops as soon as an error occurrs - - private int indexCount; - private int juniorPeerCount, seniorPeerCount; - private long maxTime; - - private final plasmaURLPool urlPool; - private final plasmaWordIndex wordIndex; - final serverLog log; - boolean paused = false; - private boolean enabled; - private boolean enabledWhileCrawling; - private boolean closed; - private boolean gzipBody4Distribution; - private int timeout4Distribution; - - public plasmaWordIndexDistribution( - plasmaURLPool urlPool, - plasmaWordIndex wordIndex, - serverLog log, - boolean enable, - boolean enabledWhileCrawling, - boolean gzipBody, - int timeout - ) { - this.urlPool = urlPool; - this.wordIndex = wordIndex; - this.enabled = enable; - this.enabledWhileCrawling = enabledWhileCrawling; - this.log = log; - this.closed = false; - setCounts(100 /*indexCount*/, 1 /*juniorPeerCount*/, 3 /*seniorPeerCount*/, 8000); - this.gzipBody4Distribution = gzipBody; - this.timeout4Distribution = timeout; - } - - public void enable() { - enabled = true; - } - - public void disable() { - enabled = false; - } - - public void enableWhileCrawling() { - this.enabledWhileCrawling = true; - } - - public void disableWhileCrawling() { - this.enabledWhileCrawling = false; - } - - public void close() { - closed = true; - - } - - private boolean isClosed() { - return (this.closed || Thread.currentThread().isInterrupted()); - } - - public boolean job() { - - if (this.closed) { - log.logFine("no DHT distribution: closed"); - return false; - } - if (yacyCore.seedDB == null) { - log.logFine("no DHT distribution: seedDB == null"); - return false; - } - if (yacyCore.seedDB.mySeed == null) { - log.logFine("no DHT distribution: mySeed == null"); - return false; - } - if (yacyCore.seedDB.mySeed.isVirgin()) { - log.logFine("no DHT distribution: status is virgin"); - return false; - } - if (!(enabled)) { - log.logFine("no DHT distribution: not enabled"); - return false; - } - if (paused) { - log.logFine("no DHT distribution: paused"); - return false; - } - if (urlPool.loadedURL.size() < 10) { - log.logFine("no DHT distribution: loadedURL.size() = " + urlPool.loadedURL.size()); - return false; - } - if (wordIndex.size() < 100) { - log.logFine("no DHT distribution: not enough words - wordIndex.size() = " + wordIndex.size()); - return false; - } - if ((!enabledWhileCrawling) && (urlPool.noticeURL.stackSize() > 0)) { - log.logFine("no DHT distribution: crawl in progress - noticeURL.stackSize() = " + urlPool.noticeURL.stackSize()); - return false; - } - - // do the transfer - int peerCount = (yacyCore.seedDB.mySeed.isJunior()) ? juniorPeerCount : seniorPeerCount; - long starttime = System.currentTimeMillis(); - int transferred = performTransferIndex(indexCount, peerCount, true); - - if (transferred <= 0) { - log.logFine("no word distribution: transfer failed"); - return false; - } - - // adopt transfer count - if ((System.currentTimeMillis() - starttime) > (maxTime * peerCount)) - indexCount--; - else - indexCount++; - if (indexCount < 50) indexCount = 50; - - // show success - return true; - - } - - public void setCounts(int indexCount, int juniorPeerCount, int seniorPeerCount, long maxTimePerTransfer) { - this.maxTime = maxTimePerTransfer; - this.indexCount = indexCount; - if (indexCount < 30) indexCount = 30; - this.juniorPeerCount = juniorPeerCount; - this.seniorPeerCount = seniorPeerCount; - } - - public int performTransferIndex(int indexCount, int peerCount, boolean delete) { - if ((yacyCore.seedDB == null) || (yacyCore.seedDB.sizeConnected() == 0)) return -1; - - // collect index - plasmaDHTChunk dhtChunk = new plasmaDHTChunk(this.log, this.wordIndex, this.urlPool.loadedURL, 30, indexCount); - - try { - // find a list of DHT-peers - yacySeed[] seeds = yacyCore.dhtAgent.getDHTTargets(log, peerCount, 10, dhtChunk.firstContainer().wordHash(), dhtChunk.lastContainer().wordHash(), 0.4); - - if (seeds.length < peerCount) { - log.logWarning("found not enough (" + seeds.length + ") peers for distribution"); - return -1; - } - - // send away the indexes to all these peers - String peerNames = ""; - int hc1 = 0; - plasmaDHTTransfer transfer = null; - for (int i = 0; i < seeds.length; i++) { - if (this.isClosed()) { - this.log.logSevere("Index distribution interrupted by close, nothing deleted locally."); - return -1; // interrupted - } - transfer = new plasmaDHTTransfer(log, seeds[i], dhtChunk, this.gzipBody4Distribution, this.timeout4Distribution, 0); - try {transfer.uploadIndex();} catch (InterruptedException e) {} - - if (transfer.dhtChunk.getStatus() == plasmaDHTChunk.chunkStatus_COMPLETE) { - peerNames += ", " + seeds[i].getName(); - hc1++; - } - if (hc1 >= peerCount) break; - } - if (peerNames.length() > 0) peerNames = peerNames.substring(2); // remove comma - - // clean up and finish with deletion of indexes - if (hc1 >= peerCount) { - // success - if (delete) { - int deletedURLs = dhtChunk.deleteTransferIndexes(); - this.log.logFine("Deleted from " + dhtChunk.containers().length + " transferred RWIs locally, removed " + deletedURLs + " URL references"); - return indexCount; - } - return indexCount; - } - this.log.logSevere("Index distribution failed. Too few peers (" + hc1 + ") received the index, not deleted locally."); - return -1; - } finally { - if (dhtChunk.containers() != null) { - // simply close the indexEntities - closeTransferIndexes(dhtChunk.containers()); - } - } - } - - void closeTransferIndexes(plasmaWordIndexEntryContainer[] indexContainers) { - for (int i = 0; i < indexContainers.length; i++) { - indexContainers[i] = null; - } - } - -}