From 2cb084d426defd82272c228f1eee2e4d069a5c1d Mon Sep 17 00:00:00 2001
From: theli
Date: Mon, 12 Sep 2005 10:37:16 +0000
Subject: [PATCH] *) Complete Index Transfer See:
http://www.yacy-forum.de/viewtopic.php?p=9622
git-svn-id: https://svn.berlios.de/svnroot/repos/yacy/trunk@707 6c8d7289-2bf4-0310-a012-ef5d649a1542
---
htroot/IndexTransfer_p.html | 55 +++++++
htroot/IndexTransfer_p.java | 126 ++++++++++++++
.../plasma/plasmaWordIndexDistribution.java | 154 +++++++++++++++++-
source/de/anomic/yacy/yacyClient.java | 8 +-
4 files changed, 337 insertions(+), 6 deletions(-)
create mode 100644 htroot/IndexTransfer_p.html
create mode 100644 htroot/IndexTransfer_p.java
diff --git a/htroot/IndexTransfer_p.html b/htroot/IndexTransfer_p.html
new file mode 100644
index 000000000..973b812b6
--- /dev/null
+++ b/htroot/IndexTransfer_p.html
@@ -0,0 +1,55 @@
+
+
+
+YaCy '#[clientname]#': Index Control
+#[metas]#
+
+
+#[header]#
+
+Index Transfer
+
+
+::
+
+ #[status]# |
+ #[twcount]# (#[twpercent]#%) |
+ #[twrange]# |
+ #[peerName]# |
+ #(stopped)#::
+ #(/stopped)# |
+
+#(/running)#
+
+
+
+#[footer]#
+
+
diff --git a/htroot/IndexTransfer_p.java b/htroot/IndexTransfer_p.java
new file mode 100644
index 000000000..0ddcc14ea
--- /dev/null
+++ b/htroot/IndexTransfer_p.java
@@ -0,0 +1,126 @@
+//IndexControl_p.java
+//-----------------------
+//part of the AnomicHTTPD caching proxy
+//(C) by Michael Peter Christen; mc@anomic.de
+//first published on http://www.anomic.de
+//Frankfurt, Germany, 2004
+//last change: 02.05.2004
+//
+//This program is free software; you can redistribute it and/or modify
+//it under the terms of the GNU General Public License as published by
+//the Free Software Foundation; either version 2 of the License, or
+//(at your option) any later version.
+//
+//This program is distributed in the hope that it will be useful,
+//but WITHOUT ANY WARRANTY; without even the implied warranty of
+//MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+//GNU General Public License for more details.
+//
+//You should have received a copy of the GNU General Public License
+//along with this program; if not, write to the Free Software
+//Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+//
+//Using this software in any meaning (reading, learning, copying, compiling,
+//running) means that you agree that the Author(s) is (are) not responsible
+//for cost, loss of data or any harm that may be caused directly or indirectly
+//by usage of this softare or this documentation. The usage of this software
+//is on your own risk. The installation and usage (starting/running) of this
+//software may allow other people or application to access your computer and
+//any attached devices and is highly dependent on the configuration of the
+//software which must be done by the user of the software; the author(s) is
+//(are) also not responsible for proper configuration and usage of the
+//software, even if provoked by documentation provided together with
+//the software.
+//
+//Any changes to this file according to the GPL as documented in the file
+//gpl.txt aside this file in the shipment you received can be done to the
+//lines that follows this copyright notice here, but changes must not be
+//done inside the copyright notive above. A re-distribution must contain
+//the intact and unchanged copyright notice.
+//Contributions and changes to the program code must be marked as such.
+
+//You must compile this file with
+//javac -classpath .:../Classes IndexTransfer_p.java
+//if the shell's current path is HTROOT
+
+import java.util.Enumeration;
+import java.util.NoSuchElementException;
+import java.util.TreeMap;
+
+import de.anomic.http.httpHeader;
+import de.anomic.plasma.plasmaSwitchboard;
+import de.anomic.plasma.plasmaWordIndexDistribution;
+import de.anomic.server.serverObjects;
+import de.anomic.server.serverSwitch;
+import de.anomic.yacy.yacyCore;
+import de.anomic.yacy.yacySeed;
+
+/**
+ * Servlet class behind the IndexTransfer_p.html template: it starts, stops or
+ * resets a transfer of the whole local word index to another peer and fills in
+ * the template properties that describe the transfer state and the candidate
+ * target peers.
+ */
+public class IndexTransfer_p {
+
+    /**
+     * Handles one page request.
+     *
+     * @param header the HTTP request header (not read by this method)
+     * @param post   the submitted form values, or <code>null</code> if the page
+     *               was requested without parameters
+     * @param env    the server environment; cast to {@link plasmaSwitchboard}
+     * @return the properties used to fill the template
+     */
+    public static serverObjects respond(httpHeader header, serverObjects post, serverSwitch env) {
+        plasmaSwitchboard switchboard = (plasmaSwitchboard) env;
+        // return variable that accumulates replacements
+        serverObjects prop = new serverObjects();
+
+        if (post != null) {
+            if (post.containsKey("startIndexTransfer")) {
+                yacySeed target = yacyCore.seedDB.getConnected(post.get("hostHash", ""));
+                // If the selected peer is not connected, no transfer is started,
+                // but the page is still rendered with all of its values instead
+                // of returning an empty property set.
+                if (target != null) {
+                    switchboard.indexDistribution.startTransferWholeIndex(target, true);
+                }
+            } else if (post.containsKey("stopIndexTransfer")) {
+                switchboard.indexDistribution.stopTransferWholeIndex();
+            } else if (post.containsKey("newIndexTransfer")) {
+                switchboard.indexDistribution.abortTransferWholeIndex();
+            }
+        }
+
+        // insert constants
+        // read the thread reference once so that all values below describe the same transfer
+        plasmaWordIndexDistribution.transferIndexThread transfThread = switchboard.indexDistribution.transferIdxThread;
+        int wcount = switchboard.wordIndex.size();
+        prop.put("wcount", Integer.toString(wcount));
+        prop.put("ucount", Integer.toString(switchboard.urlPool.loadedURL.size()));
+        prop.put("running", (transfThread == null) ? 0 : 1);
+        if (transfThread != null) {
+            int transferedIdxCount = transfThread.getTransferedIndexCount();
+            // float arithmetic keeps the fraction and avoids int overflow;
+            // an empty word index must not cause a division by zero
+            float percent = (wcount == 0) ? 0f : (transferedIdxCount * 100f) / wcount;
+            boolean stopped = transfThread.isFinished() || !transfThread.isAlive();
+            prop.put("running_status", transfThread.getStatus());
+            prop.put("running_twcount", transferedIdxCount);
+            prop.put("running_twpercent", Float.toString(percent));
+            prop.put("running_twrange", transfThread.getRange());
+            prop.put("running_peerName", transfThread.getSeed().getName());
+            prop.put("running_stopped", stopped ? 1 : 0);
+        }
+
+        // List known hosts, sorted by peer name
+        int hc = 0;
+        if ((yacyCore.seedDB != null) && (yacyCore.seedDB.sizeConnected() > 0)) {
+            Enumeration e = yacyCore.dhtAgent.getAcceptRemoteIndexSeeds("------------");
+            TreeMap hostList = new TreeMap();
+            while (e.hasMoreElements()) {
+                yacySeed seed = (yacySeed) e.nextElement();
+                if (seed != null) hostList.put(seed.get("Name", "nameless"), seed.hash);
+            }
+
+            // drain the map in key order; the emptiness test replaces the former
+            // reliance on NoSuchElementException to end the loop
+            while (!hostList.isEmpty()) {
+                String hostName = (String) hostList.firstKey();
+                prop.put("running_hosts_" + hc + "_hosthash", hostList.get(hostName));
+                prop.put("running_hosts_" + hc + "_hostname", hostName);
+                hc++;
+                hostList.remove(hostName);
+            }
+        }
+        prop.put("running_hosts", Integer.toString(hc));
+
+        return prop;
+    }
+
+}
diff --git a/source/de/anomic/plasma/plasmaWordIndexDistribution.java b/source/de/anomic/plasma/plasmaWordIndexDistribution.java
index 3fd37305b..8c8b6d7ce 100644
--- a/source/de/anomic/plasma/plasmaWordIndexDistribution.java
+++ b/source/de/anomic/plasma/plasmaWordIndexDistribution.java
@@ -29,11 +29,14 @@ public class plasmaWordIndexDistribution {
private plasmaURLPool urlPool;
private plasmaWordIndex wordIndex;
- private serverLog log;
+ serverLog log;
+ boolean paused = false;
private boolean enabled;
private boolean enabledWhileCrawling;
private boolean closed;
+ public transferIndexThread transferIdxThread = null;
+
public plasmaWordIndexDistribution(plasmaURLPool urlPool, plasmaWordIndex wordIndex, serverLog log,
boolean enable, boolean enabledWhileCrawling) {
this.urlPool = urlPool;
@@ -63,6 +66,9 @@ public class plasmaWordIndexDistribution {
public void close() {
closed = true;
+ if (transferIdxThread != null) {
+ stopTransferWholeIndex();
+ }
}
public boolean job() {
@@ -87,6 +93,10 @@ public class plasmaWordIndexDistribution {
log.logFine("no word distribution: not enabled");
return false;
}
+ if (paused) {
+ log.logFine("no word distribution: paused");
+ return false;
+ }
if (urlPool.loadedURL.size() < 10) {
log.logFine("no word distribution: loadedURL.size() = " + urlPool.loadedURL.size());
return false;
@@ -227,7 +237,7 @@ public class plasmaWordIndexDistribution {
return startPointHash;
}
- private Object[] /* of {plasmaWordIndexEntity[], HashMap(String, plasmaCrawlLURL.Entry)}*/
+ Object[] /* of {plasmaWordIndexEntity[], HashMap(String, plasmaCrawlLURL.Entry)}*/
selectTransferIndexes(String hash, int count) {
// the hash is a start hash from where the indexes are picked
Vector tmpEntities = new Vector();
@@ -332,7 +342,7 @@ public class plasmaWordIndexDistribution {
}
}
- private boolean deleteTransferIndexes(plasmaWordIndexEntity[] indexEntities) throws IOException {
+ boolean deleteTransferIndexes(plasmaWordIndexEntity[] indexEntities) throws IOException {
String wordhash;
Enumeration urlEnum;
plasmaWordIndexEntry indexEntry;
@@ -381,4 +391,142 @@ public class plasmaWordIndexDistribution {
}
return success;
}
+
+
+    /**
+     * Creates and starts a whole-index transfer thread for the given peer.
+     * Does nothing while a transfer thread is still registered in
+     * {@link #transferIdxThread}.
+     *
+     * @param seed   the peer that is handed to the transfer thread as target
+     * @param delete handed to the transfer thread; if <code>true</code> the
+     *               thread deletes transferred indexes locally
+     */
+    public void startTransferWholeIndex(yacySeed seed, boolean delete) {
+        if (this.transferIdxThread != null) {
+            // a transfer thread is already registered
+            return;
+        }
+        this.transferIdxThread = new transferIndexThread(seed, delete);
+        this.transferIdxThread.start();
+    }
+
+    /**
+     * Asks the registered transfer thread to stop, if there is one and it is
+     * not already marked as finished. The thread stays registered in
+     * {@link #transferIdxThread}.
+     */
+    public void stopTransferWholeIndex() {
+        if (this.transferIdxThread == null) {
+            return;
+        }
+        if (this.transferIdxThread.isFinished()) {
+            return;
+        }
+        this.transferIdxThread.stopIt();
+    }
+
+ public void abortTransferWholeIndex() {
+ if (transferIdxThread != null) {
+ if (!transferIdxThread.isFinished()) this.transferIdxThread.stopIt();
+ transferIdxThread = null;
+ }
+ }
+
+
+ public class transferIndexThread extends Thread {
+
+ private yacySeed seed = null;
+ private boolean delete = false;
+ private boolean finished = false;
+ private int transferedIndexCount = 0;
+ private String status = "running";
+ private String startPointHash = "------------";
+
+ public transferIndexThread(yacySeed seed, boolean delete) {
+ this.seed = seed;
+ this.delete = delete;
+ }
+
+ public void run() {
+ performTransferWholeIndex();
+ }
+
+ public void stopIt() {
+ this.finished = true;
+ }
+
+ public boolean isFinished() {
+ return this.finished;
+ }
+
+ public int getTransferedIndexCount() {
+ return this.transferedIndexCount;
+ }
+
+ public yacySeed getSeed() {
+ return this.seed;
+ }
+
+ public String getStatus() {
+ return this.status;
+ }
+
+ public String getRange() {
+ return "[------------ .. " + startPointHash + "]";
+ }
+
+ public void performTransferWholeIndex() {
+ try {
+ plasmaWordIndexDistribution.this.paused = true;
+
+ // collect index
+ plasmaWordIndexDistribution.this.log.logFine("Selected hash " + startPointHash + " as start point for index distribution of whole index");
+
+
+ while (!finished && !Thread.currentThread().isInterrupted()) {
+ Object[] selectResult = selectTransferIndexes(startPointHash, 500);
+ plasmaWordIndexEntity[] indexEntities = (plasmaWordIndexEntity[]) selectResult[0];
+ if (finished || Thread.currentThread().isInterrupted()) {
+ this.status = "aborted";
+ return;
+ }
+
+ HashMap urlCache = (HashMap) selectResult[1]; // String (url-hash) / plasmaCrawlLURL.Entry
+ if ((indexEntities == null) || (indexEntities.length == 0)) {
+ plasmaWordIndexDistribution.this.log.logFine("No index available for index transfer, hash start-point " + startPointHash);
+ this.status = "finished.";
+ return;
+ }
+ // count the indexes again, can be smaller as expected
+ int idxCount = 0;
+ for (int i = 0; i < indexEntities.length; i++) {
+ idxCount += indexEntities[i].size();
+ }
+
+ // find start point for DHT-selection
+ startPointHash = indexEntities[indexEntities.length - 1].wordHash(); // DHT targets must have greater hashes
+
+ String error;
+ long start;
+
+ start = System.currentTimeMillis();
+ error = yacyClient.transferIndex(seed, indexEntities, urlCache);
+ if (error == null) {
+ plasmaWordIndexDistribution.this.log.logInfo("Index transfer of " + idxCount + " words [" + indexEntities[0].wordHash() + " .. " + indexEntities[indexEntities.length-1].wordHash() + "]" +
+ " to peer " + seed.getName() + ":" + seed.hash + " in " +
+ ((System.currentTimeMillis() - start) / 1000) + " seconds successfull (" +
+ (1000 * idxCount / (System.currentTimeMillis() - start + 1)) + " words/s)");
+ } else {
+ plasmaWordIndexDistribution.this.log.logWarning("Index transfer to peer " + seed.getName() + ":" + seed.hash + " failed:'" + error + "', disconnecting peer");
+ yacyCore.peerActions.peerDeparture(seed);
+ this.status = "Disconnected peer";
+ return;
+ }
+
+ if (delete) {
+ try {
+ if (deleteTransferIndexes(indexEntities)) {
+ plasmaWordIndexDistribution.this.log.logFine("Deleted all " + indexEntities.length + " transferred whole-word indexes locally");
+ transferedIndexCount += idxCount;
+ } else {
+ plasmaWordIndexDistribution.this.log.logSevere("Deleted not all transferred whole-word indexes");
+ }
+ } catch (IOException ee) {
+ plasmaWordIndexDistribution.this.log.logSevere("Deletion of indexes not possible:" + ee.getMessage(), ee);
+ }
+ } else {
+ // simply close the indexEntities
+ for (int i = 0; i < indexEntities.length; i++) try {
+ indexEntities[i].close();
+ } catch (IOException ee) {}
+ transferedIndexCount += idxCount;
+ }
+ }
+ this.status = "aborted";
+ } finally {
+ plasmaWordIndexDistribution.this.paused = false;
+ }
+ }
+ }
}
diff --git a/source/de/anomic/yacy/yacyClient.java b/source/de/anomic/yacy/yacyClient.java
index 467bbf158..9dc48e0c6 100644
--- a/source/de/anomic/yacy/yacyClient.java
+++ b/source/de/anomic/yacy/yacyClient.java
@@ -588,14 +588,16 @@ public class yacyClient {
post.put("youare", targetSeed.hash);
post.put("wordc", Integer.toString(indexes.length));
int indexcount = 0;
- String entrypost = "";
+ StringBuffer entrypost = new StringBuffer(indexes.length*73);
Enumeration eenum;
plasmaWordIndexEntry entry;
for (int i = 0; i < indexes.length; i++) {
eenum = indexes[i].elements(true);
while (eenum.hasMoreElements()) {
entry = (plasmaWordIndexEntry) eenum.nextElement();
- entrypost += indexes[i].wordHash() + entry.toExternalForm() + serverCore.crlfString;
+ entrypost.append(indexes[i].wordHash())
+ .append(entry.toExternalForm())
+ .append(serverCore.crlfString);
indexcount++;
}
}
@@ -609,7 +611,7 @@ public class yacyClient {
}
post.put("entryc", Integer.toString(indexcount));
- post.put("indexes", entrypost);
+ post.put("indexes", entrypost.toString());
try {
Vector v = httpc.wput(new URL("http://" + address + "/yacy/transferRWI.html"), 60000, null, null,
yacyCore.seedDB.sb.remoteProxyHost, yacyCore.seedDB.sb.remoteProxyPort, post);