From 85ac7d8386cf198742f5629f5d9c9d5736a7a82b Mon Sep 17 00:00:00 2001 From: orbiter Date: Sun, 19 Feb 2006 01:36:42 +0000 Subject: [PATCH] * moved DHT transfer thread to own class file, needed for further modularization * changed status handling * added forced cache flush when cache has containers with too high number of index entries git-svn-id: https://svn.berlios.de/svnroot/repos/yacy/trunk@1702 6c8d7289-2bf4-0310-a012-ef5d649a1542 --- htroot/IndexTransfer_p.java | 6 +- source/de/anomic/plasma/plasmaDHTChunk.java | 54 ++++ .../de/anomic/plasma/plasmaDHTTransfer.java | 210 +++++++++++++++ source/de/anomic/plasma/plasmaWordIndex.java | 5 + .../plasma/plasmaWordIndexDistribution.java | 240 ++---------------- 5 files changed, 292 insertions(+), 223 deletions(-) create mode 100644 source/de/anomic/plasma/plasmaDHTTransfer.java diff --git a/htroot/IndexTransfer_p.java b/htroot/IndexTransfer_p.java index b921468ea..508b99e45 100644 --- a/htroot/IndexTransfer_p.java +++ b/htroot/IndexTransfer_p.java @@ -100,7 +100,7 @@ public final class IndexTransfer_p { if (transfThread != null) { String[] status = transfThread.getStatus(); String[] range = transfThread.getRange(); - int[] chunk = transfThread.getChunkSize(); + int[] chunk = transfThread.getIndexCount(); prop.put("running_selection.status",status[0]); prop.put("running_selection.twrange", range[0]); @@ -111,9 +111,9 @@ public final class IndexTransfer_p { prop.put("running_transfer.twchunk", Integer.toString(chunk[1])); - prop.put("running_twEntityCount",transfThread.getTransferedEntityCount()); + //prop.put("running_twEntityCount",transfThread.getTransferedEntityCount()); prop.put("running_twEntryCount",transfThread.getTransferedEntryCount()); - prop.put("running_twEntityPercent",Float.toString(transfThread.getTransferedEntityPercent())); + //prop.put("running_twEntityPercent",Float.toString(transfThread.getTransferedEntityPercent())); prop.put("running_twEntitySpeed",Integer.toString(transfThread.getTransferedEntitySpeed())); prop.put("running_deleteIndex", transfThread.deleteIndex()?1:0); diff --git a/source/de/anomic/plasma/plasmaDHTChunk.java b/source/de/anomic/plasma/plasmaDHTChunk.java index 358f295cc..64632ca24 100644 --- a/source/de/anomic/plasma/plasmaDHTChunk.java +++ b/source/de/anomic/plasma/plasmaDHTChunk.java @@ -1,3 +1,44 @@ +// plasmaDHTChunk.java +// ------------------------------ +// part of YaCy +// (C) by Michael Peter Christen; mc@anomic.de +// first published on http://www.anomic.de +// Frankfurt, Germany, 2006 +// created: 18.02.2006 +// +// This program is free software; you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation; either version 2 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program; if not, write to the Free Software +// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +// +// Using this software in any meaning (reading, learning, copying, compiling, +// running) means that you agree that the Author(s) is (are) not responsible +// for cost, loss of data or any harm that may be caused directly or indirectly +// by usage of this softare or this documentation. The usage of this software +// is on your own risk. The installation and usage (starting/running) of this +// software may allow other people or application to access your computer and +// any attached devices and is highly dependent on the configuration of the +// software which must be done by the user of the software; the author(s) is +// (are) also not responsible for proper configuration and usage of the +// software, even if provoked by documentation provided together with +// the software. +// +// Any changes to this file according to the GPL as documented in the file +// gpl.txt aside this file in the shipment you received can be done to the +// lines that follows this copyright notice here, but changes must not be +// done inside the copyright notive above. A re-distribution must contain +// the intact and unchanged copyright notice. +// Contributions and changes to the program code must be marked as such. + package de.anomic.plasma; import java.io.IOException; @@ -27,6 +68,7 @@ public class plasmaDHTChunk { private plasmaCrawlLURL lurls; private int status = chunkStatus_UNDEFINED; + private String statusMessage = ""; private String startPointHash; private plasmaWordIndexEntryContainer[] indexContainers = null; private HashMap urlCache; // String (url-hash) / plasmaCrawlLURL.Entry @@ -64,10 +106,22 @@ public class plasmaDHTChunk { return urlCache; } + public void setStatus(int newStatus) { + this.status = newStatus; + } + public int getStatus() { return this.status; } + public void setStatusMessage(String message) { + this.statusMessage = message; + } + + public String getStatusMessage() { + return statusMessage; + } + public plasmaDHTChunk(serverLog log, plasmaWordIndex wordIndex, plasmaCrawlLURL lurls, int minCount, int maxCount) { this.log = log; this.wordIndex = wordIndex; diff --git a/source/de/anomic/plasma/plasmaDHTTransfer.java b/source/de/anomic/plasma/plasmaDHTTransfer.java new file mode 100644 index 000000000..68a8a795d --- /dev/null +++ b/source/de/anomic/plasma/plasmaDHTTransfer.java @@ -0,0 +1,210 @@ +// plasmaDHTTransfer.java +// ------------------------------ +// part of YaCy +// (C) by Michael Peter Christen; mc@anomic.de +// first published on http://www.anomic.de +// Frankfurt, Germany, 2006 +// created: 19.02.2006 +// +// This program is free software; you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation; either version 2 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program; if not, write to the Free Software +// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +// +// Using this software in any meaning (reading, learning, copying, compiling, +// running) means that you agree that the Author(s) is (are) not responsible +// for cost, loss of data or any harm that may be caused directly or indirectly +// by usage of this softare or this documentation. The usage of this software +// is on your own risk. The installation and usage (starting/running) of this +// software may allow other people or application to access your computer and +// any attached devices and is highly dependent on the configuration of the +// software which must be done by the user of the software; the author(s) is +// (are) also not responsible for proper configuration and usage of the +// software, even if provoked by documentation provided together with +// the software. +// +// Any changes to this file according to the GPL as documented in the file +// gpl.txt aside this file in the shipment you received can be done to the +// lines that follows this copyright notice here, but changes must not be +// done inside the copyright notive above. A re-distribution must contain +// the intact and unchanged copyright notice. +// Contributions and changes to the program code must be marked as such. + +package de.anomic.plasma; + +import java.util.HashMap; + +import de.anomic.server.logging.serverLog; +import de.anomic.yacy.yacyClient; +import de.anomic.yacy.yacyCore; +import de.anomic.yacy.yacySeed; + +public class plasmaDHTTransfer extends Thread { + + // connection properties + private boolean gzipBody4Transfer = false; + private int timeout4Transfer = 60000; + + // status fields + private boolean finished = false; + boolean success = false; + private long iteration = 0; + private long transferTime = 0; + + // delivery destination + yacySeed seed = null; + + // word chunk + private String endPointHash; + private String startPointHash; + plasmaDHTChunk dhtChunk; + + // other fields + HashMap urlCache; + serverLog log; + + public plasmaDHTTransfer(serverLog log, yacySeed seed, plasmaDHTChunk dhtChunk, + boolean gzipBody, int timeout, long iteration, String endPointHash, String startPointHash) { + super(new ThreadGroup("TransferIndexThreadGroup"), "TransferIndexWorker_" + seed.getName()); + this.log = log; + this.gzipBody4Transfer = gzipBody; + this.timeout4Transfer = timeout; + this.iteration = iteration; + this.seed = seed; + this.dhtChunk = dhtChunk; + this.startPointHash = startPointHash; + this.endPointHash = endPointHash; + } + + public void run() { + try { + uploadIndex(); + } catch (InterruptedException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } finally { + + } + } + + public int getStatus() { + return dhtChunk.getStatus(); + } + + public String getStatusMessage() { + return dhtChunk.getStatusMessage(); + } + + public boolean success() { + return this.success; + } + + public int getIndexCount() { + return this.dhtChunk.indexCount(); + } + + private boolean isAborted() { + if (finished || Thread.currentThread().isInterrupted()) { + return true; + } + return false; + } + + public void stopIt() { + this.finished = true; + } + + public String getRange() { + return "[" + startPointHash + ".." + endPointHash + "]"; + } + + public long getTransferTime() { + return this.transferTime; + } + + private void uploadIndex() throws InterruptedException { + + /* loop until we + * - have successfully transfered the words list or + * - the retry counter limit was exceeded + */ + long retryCount = 0, start = System.currentTimeMillis(); + while (true) { + // testing if we wer aborted + if (isAborted()) return; + + // transfering seleted words to remote peer + dhtChunk.setStatusMessage("Running: Transfering chunk " + iteration); + dhtChunk.setStatus(plasmaDHTChunk.chunkStatus_RUNNING); + String error = yacyClient.transferIndex(seed, dhtChunk.containers(), urlCache, gzipBody4Transfer, timeout4Transfer); + if (error == null) { + // words successfully transfered + transferTime = System.currentTimeMillis() - start; + this.log.logInfo("Index transfer of " + dhtChunk.indexCount() + " words [" + dhtChunk.firstContainer().wordHash() + " .. " + dhtChunk.lastContainer().wordHash() + "]" + " to peer " + seed.getName() + ":" + seed.hash + " in " + (transferTime / 1000) + " seconds successfull (" + + (1000 * dhtChunk.indexCount() / (transferTime + 1)) + " words/s)"); + retryCount = 0; + + this.success = true; + dhtChunk.setStatusMessage("Finished: Transfer of chunk " + iteration); + dhtChunk.setStatus(plasmaDHTChunk.chunkStatus_COMPLETE); + break; + } else { + // worts transfer failed + + // inc retry counter + retryCount++; + + // testing if we were aborted ... + if (isAborted()) + return; + + // we have lost the connection to the remote peer. Adding peer to disconnected list + this.log.logWarning("Index transfer to peer " + seed.getName() + ":" + seed.hash + " failed:'" + error + "', disconnecting peer"); + yacyCore.peerActions.peerDeparture(seed); + + // if the retry counter limit was not exceeded we'll retry it in a few seconds + dhtChunk.setStatusMessage("Disconnected peer: " + ((retryCount > 5) ? error + ". Transfer aborted" : "Retry " + retryCount)); + if (retryCount > 5) { + dhtChunk.setStatus(plasmaDHTChunk.chunkStatus_FAILED); + return; + } + Thread.sleep(retryCount * 5000); + + /* loop until + * - we have successfully done a peer ping or + * - the retry counter limit was exceeded + */ + while (true) { + // testing if we were aborted ... + if (isAborted()) + return; + + // doing a peer ping to the remote seed + int added = yacyClient.publishMySeed(seed.getAddress(), seed.hash); + if (added < 0) { + // inc. retry counter + retryCount++; + dhtChunk.setStatusMessage("Disconnected peer: Peer ping failed. " + ((retryCount > 5) ? "Transfer aborted." : "Retry " + retryCount)); + if (retryCount > 5) return; + Thread.sleep(retryCount * 5000); + continue; + } else { + yacyCore.seedDB.getConnected(seed.hash); + dhtChunk.setStatusMessage("running"); + break; + } + } + } + } + } + +} diff --git a/source/de/anomic/plasma/plasmaWordIndex.java b/source/de/anomic/plasma/plasmaWordIndex.java index fd2795e54..4b317d9da 100644 --- a/source/de/anomic/plasma/plasmaWordIndex.java +++ b/source/de/anomic/plasma/plasmaWordIndex.java @@ -118,6 +118,11 @@ public final class plasmaWordIndex { int added = ramCache.addEntries(entries, updateTime, highPriority); // force flush + while (ramCache.maxURLinWordCache() > 150) { + try { Thread.sleep(10); } catch (InterruptedException e) { } + flushCacheToBackend(ramCache.bestFlushWordHash()); + } + if (highPriority) { if (ramCache.size() > ramCache.getMaxWordsHigh()) { while (ramCache.size() + 500 > ramCache.getMaxWordsHigh()) { diff --git a/source/de/anomic/plasma/plasmaWordIndexDistribution.java b/source/de/anomic/plasma/plasmaWordIndexDistribution.java index 21125fb90..8b4787856 100644 --- a/source/de/anomic/plasma/plasmaWordIndexDistribution.java +++ b/source/de/anomic/plasma/plasmaWordIndexDistribution.java @@ -43,9 +43,7 @@ package de.anomic.plasma; -import java.io.IOException; import java.util.Enumeration; -import java.util.HashMap; import de.anomic.yacy.yacyCore; import de.anomic.yacy.yacySeed; import de.anomic.yacy.yacyClient; @@ -192,10 +190,9 @@ public final class plasmaWordIndexDistribution { if ((yacyCore.seedDB == null) || (yacyCore.seedDB.sizeConnected() == 0)) return -1; // collect index - plasmaDHTChunk dhtChunk = new plasmaDHTChunk(this.log, this.wordIndex, this.urlPool.loadedURL, 30, indexCount); + try { - // find start point for DHT-selection String keyhash = dhtChunk.lastContainer().wordHash(); // DHT targets must have greater hashes @@ -275,34 +272,6 @@ public final class plasmaWordIndexDistribution { } } - void closeTransferIndex(plasmaWordIndexEntity indexEntity) throws IOException { - Object migrationStatus; - indexEntity.close(); - try { - String wordhash = indexEntity.wordHash(); - migrationStatus = wordIndex.migrateWords2Assortment(wordhash); - if (migrationStatus instanceof Integer) { - int migrationCount = ((Integer) migrationStatus).intValue(); - if (migrationCount == 0) - log.logFine("SKIPPED " + wordhash + ": empty"); - else if (migrationCount > 0) - log.logFine("MIGRATED " + wordhash + ": " + migrationCount + " entries"); - else - log.logFine("REVERSED " + wordhash + ": " + (-migrationCount) + " entries"); - } else if (migrationStatus instanceof String) { - log.logFine("SKIPPED " + wordhash + ": " + migrationStatus); - } - } catch (Exception e) { - log.logWarning("EXCEPTION: ", e); - } - } - - void closeTransferIndexes(plasmaWordIndexEntity[] indexEntities) { - for (int i = 0; i < indexEntities.length; i++) try { - closeTransferIndex(indexEntities[i]); - } catch (IOException ee) {} - } - void closeTransferIndexes(plasmaWordIndexEntryContainer[] indexContainers) { for (int i = 0; i < indexContainers.length; i++) { indexContainers[i] = null; @@ -334,174 +303,6 @@ public final class plasmaWordIndexDistribution { } } - private class transferIndexWorkerThread extends Thread{ - // connection properties - private boolean gzipBody4Transfer = false; - private int timeout4Transfer = 60000; - - - // status fields - private boolean finished = false; - boolean success = false; - private String status = "running"; - private long iteration = 0; - private long transferTime = 0; - private int idxCount = 0; - private int chunkSize = 0; - - // delivery destination - yacySeed seed = null; - - // word chunk - private String endPointHash; - private String startPointHash; - plasmaWordIndexEntryContainer[] indexContainers; - - // other fields - HashMap urlCache; - - public transferIndexWorkerThread( - yacySeed seed, - plasmaWordIndexEntryContainer[] indexContainers, - HashMap urlCache, - boolean gzipBody, - int timeout, - long iteration, - int idxCount, - int chunkSize, - String endPointHash, - String startPointHash) { - super(new ThreadGroup("TransferIndexThreadGroup"),"TransferIndexWorker_" + seed.getName()); - this.gzipBody4Transfer = gzipBody; - this.timeout4Transfer = timeout; - this.iteration = iteration; - this.seed = seed; - this.indexContainers = indexContainers; - this.urlCache = urlCache; - this.idxCount = idxCount; - this.chunkSize = chunkSize; - this.startPointHash = startPointHash; - this.endPointHash = endPointHash; - } - - public void run() { - try { - uploadIndex(); - } catch (InterruptedException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } finally { - - } - } - - public boolean success() { - return this.success; - } - - public int getChunkSize() { - return this.chunkSize; - } - - public String getStatus() { - return this.status; - } - - private boolean isAborted() { - if (finished || Thread.currentThread().isInterrupted()) { - return true; - } - return false; - } - - public void stopIt() { - this.finished = true; - } - - public String getRange() { - return "[" + startPointHash + ".." + endPointHash + "]"; - } - - public long getTransferTime() { - return this.transferTime; - } - - private void uploadIndex() throws InterruptedException { - - /* loop until we - * - have successfully transfered the words list or - * - the retry counter limit was exceeded - */ - long retryCount = 0, start = System.currentTimeMillis(); - while (true) { - // testing if we wer aborted - if (isAborted()) return; - - // transfering seleted words to remote peer - this.status = "Running: Transfering chunk " + iteration; - String error = yacyClient.transferIndex(seed, indexContainers, urlCache, gzipBody4Transfer, timeout4Transfer); - if (error == null) { - // words successfully transfered - transferTime = System.currentTimeMillis() - start; - plasmaWordIndexDistribution.this.log.logInfo("Index transfer of " + idxCount + " words [" + indexContainers[0].wordHash() + " .. " + indexContainers[indexContainers.length-1].wordHash() + "]" + - " to peer " + seed.getName() + ":" + seed.hash + " in " + (transferTime/1000) + " seconds successfull (" + - (1000 * idxCount / (transferTime + 1)) + " words/s)"); - retryCount = 0; - -// if (transferTime > 30000) { -// if (chunkSize>100) chunkSize-=50; -// } else { -// chunkSize+=50; -// } - this.success = true; - this.status = "Finished: Transfer of chunk " + iteration; - break; - } else { - // worts transfer failed - - // inc retry counter - retryCount++; - - // testing if we were aborted ... - if (isAborted()) return; - - // we have lost the connection to the remote peer. Adding peer to disconnected list - plasmaWordIndexDistribution.this.log.logWarning("Index transfer to peer " + seed.getName() + ":" + seed.hash + " failed:'" + error + "', disconnecting peer"); - yacyCore.peerActions.peerDeparture(seed); - - // if the retry counter limit was not exceeded we'll retry it in a few seconds - this.status = "Disconnected peer: " + ((retryCount > 5)? error + ". Transfer aborted":"Retry " + retryCount); - if (retryCount > 5) return; - Thread.sleep(retryCount*5000); - - /* loop until - * - we have successfully done a peer ping or - * - the retry counter limit was exceeded - */ - while (true) { - // testing if we were aborted ... - if (isAborted()) return; - - // doing a peer ping to the remote seed - int added = yacyClient.publishMySeed(seed.getAddress(), seed.hash); - if (added < 0) { - // inc. retry counter - retryCount++; - this.status = "Disconnected peer: Peer ping failed. " + ((retryCount > 5)?"Transfer aborted.":"Retry " + retryCount); - if (retryCount > 5) return; - Thread.sleep(retryCount*5000); - continue; - } else { - yacyCore.seedDB.getConnected(seed.hash); - this.status = "running"; - break; - } - } - } - } - } - } - public class transferIndexThread extends Thread { private yacySeed seed = null; private boolean delete = false; @@ -509,14 +310,14 @@ public final class plasmaWordIndexDistribution { private boolean gzipBody4Transfer = false; private int timeout4Transfer = 60000; private int transferedEntryCount = 0; - private int transferedEntityCount = 0; + private int transferedContainerCount = 0; private String status = "Running"; private String oldStartingPointHash = "------------", startPointHash = "------------"; private int initialWordsDBSize = 0; private int chunkSize = 500; private final long startingTime = System.currentTimeMillis(); private final plasmaSwitchboard sb; - private transferIndexWorkerThread worker = null; + private plasmaDHTTransfer worker = null; public transferIndexThread(yacySeed seed, boolean delete) { super(new ThreadGroup("TransferIndexThreadGroup"),"TransferIndex_" + seed.getName()); @@ -546,28 +347,28 @@ public final class plasmaWordIndexDistribution { return this.delete; } - public int[] getChunkSize() { - transferIndexWorkerThread workerThread = this.worker; + public int[] getIndexCount() { + plasmaDHTTransfer workerThread = this.worker; if (workerThread != null) { - return new int[]{this.chunkSize,workerThread.getChunkSize()}; + return new int[]{this.chunkSize, workerThread.getIndexCount()}; } - return new int[]{this.chunkSize,500}; + return new int[]{this.chunkSize, 500}; } public int getTransferedEntryCount() { return this.transferedEntryCount; } - public int getTransferedEntityCount() { - return this.transferedEntityCount; + public int getTransferedContainerCount() { + return this.transferedContainerCount; } - public float getTransferedEntityPercent() { + public float getTransferedContainerPercent() { long currentWordsDBSize = sb.wordIndex.size(); if (initialWordsDBSize == 0) return 100; else if (currentWordsDBSize >= initialWordsDBSize) return 0; //else return (float) ((initialWordsDBSize-currentWordsDBSize)/(initialWordsDBSize/100)); - else return (float)(this.transferedEntityCount*100/initialWordsDBSize); + else return (float)(this.transferedContainerCount*100/initialWordsDBSize); } public int getTransferedEntitySpeed() { @@ -581,15 +382,15 @@ public final class plasmaWordIndexDistribution { } public String[] getStatus() { - transferIndexWorkerThread workerThread = this.worker; + plasmaDHTTransfer workerThread = this.worker; if (workerThread != null) { - return new String[]{this.status,workerThread.getStatus()}; + return new String[]{this.status,workerThread.getStatusMessage()}; } return new String[]{this.status,"Not running"}; } public String[] getRange() { - transferIndexWorkerThread workerThread = this.worker; + plasmaDHTTransfer workerThread = this.worker; if (workerThread != null) { return new String[]{"[" + oldStartingPointHash + ".." + startPointHash + "]",workerThread.getRange()}; } @@ -670,7 +471,7 @@ public final class plasmaWordIndexDistribution { * Addintionally we recalculate the chunk size to optimize performance */ - this.chunkSize = worker.getChunkSize(); + this.chunkSize = worker.getIndexCount(); long transferTime = worker.getTransferTime(); //TODO: only increase chunk Size if there is free memory left on the server @@ -691,12 +492,12 @@ public final class plasmaWordIndexDistribution { if (delete) { this.status = "Running: Deleting chunk " + iteration; transferedEntryCount += oldDHTChunk.indexCount(); - transferedEntityCount += oldDHTChunk.containerSize(); + transferedContainerCount += oldDHTChunk.containerSize(); int urlReferences = oldDHTChunk.deleteTransferIndexes(); plasmaWordIndexDistribution.this.log.logFine("Deleted from " + oldDHTChunk.containerSize() + " transferred RWIs locally " + urlReferences + " URL references"); } else { transferedEntryCount += oldDHTChunk.indexCount(); - transferedEntityCount += oldDHTChunk.containerSize(); + transferedContainerCount += oldDHTChunk.containerSize(); } oldDHTChunk = null; } @@ -707,10 +508,9 @@ public final class plasmaWordIndexDistribution { if ((newDHTChunk != null) && (newDHTChunk.containerSize() > 0) || (newDHTChunk.getStatus() == plasmaDHTChunk.chunkStatus_FILLED)) { - worker = new transferIndexWorkerThread(seed, newDHTChunk.containers(), newDHTChunk.urlCacheMap(), - gzipBody4Transfer, timeout4Transfer, iteration, - newDHTChunk.indexCount(), newDHTChunk.indexCount(), - startPointHash, oldStartingPointHash); + worker = new plasmaDHTTransfer(log, seed, newDHTChunk, + gzipBody4Transfer, timeout4Transfer, iteration, + startPointHash, oldStartingPointHash); worker.start(); } }