CPU & IO reduce (Index Distribution)

git-svn-id: https://svn.berlios.de/svnroot/repos/yacy/trunk@3184 6c8d7289-2bf4-0310-a012-ef5d649a1542
pull/1/head
borg-0300 18 years ago
parent e150c20c53
commit 23e613b2ab

@ -1,322 +1,328 @@
// plasmaDHTChunk.java
// ------------------------------
// part of YaCy
// (C) by Michael Peter Christen; mc@anomic.de
// first published on http://www.anomic.de
// Frankfurt, Germany, 2006
// created: 18.02.2006
//
// $LastChangedDate$
// $LastChangedRevision$
// $LastChangedBy$
//
// This program is free software; you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation; either version 2 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
//
// Using this software in any meaning (reading, learning, copying, compiling,
// running) means that you agree that the Author(s) is (are) not responsible
// for cost, loss of data or any harm that may be caused directly or indirectly
// by usage of this softare or this documentation. The usage of this software
// is on your own risk. The installation and usage (starting/running) of this
// software may allow other people or application to access your computer and
// any attached devices and is highly dependent on the configuration of the
// software which must be done by the user of the software; the author(s) is
// (are) also not responsible for proper configuration and usage of the
// software, even if provoked by documentation provided together with
// the software.
//
// Any changes to this file according to the GPL as documented in the file
// gpl.txt aside this file in the shipment you received can be done to the
// lines that follows this copyright notice here, but changes must not be
// done inside the copyright notive above. A re-distribution must contain
// the intact and unchanged copyright notice.
// Contributions and changes to the program code must be marked as such.

package de.anomic.plasma;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;

import de.anomic.index.indexContainer;
import de.anomic.index.indexRWIEntry;
import de.anomic.index.indexRWIEntryNew;
import de.anomic.index.indexURLEntry;
import de.anomic.kelondro.kelondroBase64Order;
import de.anomic.kelondro.kelondroException;
import de.anomic.server.serverCodings;
import de.anomic.server.logging.serverLog;
import de.anomic.yacy.yacyCore;
import de.anomic.yacy.yacyDHTAction;
import de.anomic.yacy.yacySeedDB;

/**
 * A chunk of reverse-word-index entries selected from the local word index
 * for distribution to other peers (DHT index transfer).
 * <p>
 * A chunk is built either from a randomly chosen start hash
 * ({@link #plasmaDHTChunk(serverLog, plasmaWordIndex, int, int, int)}) or from
 * a caller-demanded start hash
 * ({@link #plasmaDHTChunk(serverLog, plasmaWordIndex, int, int, int, String)}).
 * The selection fills {@code indexContainers} with word containers and
 * {@code urlCache} with the URL entries referenced by those containers.
 * The chunk's life cycle is tracked via the {@code chunkStatus_*} constants.
 */
public class plasmaDHTChunk {

    // life-cycle states of a chunk
    public static final int chunkStatus_UNDEFINED = -1;
    public static final int chunkStatus_FAILED = 0;
    public static final int chunkStatus_FILLED = 1;
    public static final int chunkStatus_RUNNING = 2;
    public static final int chunkStatus_INTERRUPTED = 3;
    public static final int chunkStatus_COMPLETE = 4;

    // number of peers that shall redundantly hold each index entry
    public static final int peerRedundancy = 3;

    private plasmaWordIndex wordIndex;
    private serverLog log;

    private int status = chunkStatus_UNDEFINED;
    private String startPointHash;
    private indexContainer[] indexContainers = null;
    private HashMap urlCache; // String (url-hash) / plasmaCrawlLURL.Entry
    private int idxCount;

    // timing of the selection phase, for getSelectionTime()
    private long selectionStartTime = 0;
    private long selectionEndTime = 0;

    private int transferFailedCounter = 0;

    /** @return the first selected word container */
    public indexContainer firstContainer() {
        return indexContainers[0];
    }

    /** @return the last selected word container */
    public indexContainer lastContainer() {
        return indexContainers[indexContainers.length - 1];
    }

    /** @return all selected word containers (internal array, not a copy) */
    public indexContainer[] containers() {
        return indexContainers;
    }

    /** @return the number of selected word containers */
    public int containerSize() {
        return indexContainers.length;
    }

    /** @return the total number of index entries in this chunk, as counted at construction */
    public int indexCount() {
        return this.idxCount;
    }

    /** Sums the sizes of all selected containers. */
    private int indexCounter() {
        int c = 0;
        for (int i = 0; i < indexContainers.length; i++) {
            c += indexContainers[i].size();
        }
        return c;
    }

    /** @return map of url-hash (String) to plasmaCrawlLURL.Entry for all referenced URLs */
    public HashMap urlCacheMap() {
        return urlCache;
    }

    /** @param newStatus one of the chunkStatus_* constants */
    public void setStatus(int newStatus) {
        this.status = newStatus;
    }

    /** @return the current chunkStatus_* value */
    public int getStatus() {
        return this.status;
    }

    /**
     * Creates a chunk starting at an automatically selected start hash.
     *
     * @param minCount minimum number of index entries; fewer sets status to FAILED
     * @param maxCount maximum number of index entries to select
     * @param maxtime  maximum selection time in milliseconds, negative for no limit
     */
    public plasmaDHTChunk(serverLog log, plasmaWordIndex wordIndex, int minCount, int maxCount, int maxtime) {
        try {
            this.log = log;
            this.wordIndex = wordIndex;
            this.startPointHash = selectTransferStart();
            log.logFine("Selected hash " + this.startPointHash + " as start point for index distribution, distance = " + yacyDHTAction.dhtDistance(yacyCore.seedDB.mySeed.hash, this.startPointHash));
            selectTransferContainers(this.startPointHash, minCount, maxCount, maxtime);

            // count the indexes, can be smaller as expected
            this.idxCount = indexCounter();
            if (this.idxCount < minCount) {
                log.logFine("Too few (" + this.idxCount + ") indexes selected for transfer.");
                this.status = chunkStatus_FAILED;
            }
        } catch (InterruptedException e) {
            this.status = chunkStatus_INTERRUPTED;
        }
    }

    /**
     * Creates a chunk starting at a caller-demanded start hash.
     *
     * @param startHash the word hash where container selection shall begin
     * @param minCount  minimum number of index entries; fewer sets status to FAILED
     * @param maxCount  maximum number of index entries to select
     * @param maxtime   maximum selection time in milliseconds, negative for no limit
     */
    public plasmaDHTChunk(serverLog log, plasmaWordIndex wordIndex, int minCount, int maxCount, int maxtime, String startHash) {
        try {
            this.log = log;
            this.wordIndex = wordIndex;
            // fix: remember the demanded start hash; the original computed the logged
            // distance against the still-unassigned startPointHash field (null)
            this.startPointHash = startHash;
            log.logFine("Demanded hash " + startHash + " as start point for index distribution, distance = " + yacyDHTAction.dhtDistance(yacyCore.seedDB.mySeed.hash, startHash));
            selectTransferContainers(startHash, minCount, maxCount, maxtime);

            // count the indexes, can be smaller as expected
            this.idxCount = indexCounter();
            if (this.idxCount < minCount) {
                log.logFine("Too few (" + this.idxCount + ") indexes selected for transfer.");
                this.status = chunkStatus_FAILED;
            }
        } catch (InterruptedException e) {
            this.status = chunkStatus_INTERRUPTED;
        }
    }

    /**
     * Chooses a start hash for container selection.
     * First tries (with decreasing requirements) a random hash whose DHT distance
     * from our own peer hash is large enough; if that fails, falls back to a hash
     * derived from our own peer hash (usually avoided, since that leads to always
     * the same target peers).
     */
    private String selectTransferStart() {
        String startPointHash;
        // first try to select with increasing probality a good start point
        double minimumDistance = ((double) peerRedundancy) / ((double) yacyCore.seedDB.sizeConnected());
        if (Math.round(Math.random() * 6) != 4)
            for (int i = 9; i > 0; i--) {
                startPointHash = kelondroBase64Order.enhancedCoder.encode(serverCodings.encodeMD5Raw(Long.toString(i + System.currentTimeMillis()))).substring(2, 2 + yacySeedDB.commonHashLength);
                if (yacyDHTAction.dhtDistance(yacyCore.seedDB.mySeed.hash, startPointHash) > (minimumDistance + ((double) i / (double) 10)))
                    return startPointHash;
            }
        // if that fails, take simply the best start point (this is usually avoided, since that leads to always the same target peers)
        startPointHash = yacyCore.seedDB.mySeed.hash.substring(0, 11) + "z";
        return startPointHash;
    }

    /**
     * Fills the chunk from RAM first and, if that yields too few entries,
     * from the file-backed index. Records selection start/end time.
     *
     * @throws InterruptedException if a shutdown is in progress
     */
    private void selectTransferContainers(String hash, int mincount, int maxcount, int maxtime) throws InterruptedException {
        try {
            this.selectionStartTime = System.currentTimeMillis();
            int refcountRAM = selectTransferContainersResource(hash, true, maxcount, maxtime);
            if (refcountRAM >= mincount) {
                log.logFine("DHT selection from RAM: " + refcountRAM + " entries");
                return;
            }
            int refcountFile = selectTransferContainersResource(hash, false, maxcount, maxtime);
            log.logFine("DHT selection from FILE: " + refcountFile + " entries, RAM provided only " + refcountRAM + " entries");
            return;
        } finally {
            this.selectionEndTime = System.currentTimeMillis();
        }
    }

    /**
     * Selects containers and their URL entries from one index resource.
     *
     * @param hash     start hash from where the indexes are picked
     * @param ram      true to select from the RAM cache, false from the file index
     * @param maxcount maximum number of index entries (capped at 500)
     * @param maxtime  maximum selection time in milliseconds, negative for no limit
     * @return the number of selected index entries
     * @throws InterruptedException if a shutdown is in progress
     */
    private int selectTransferContainersResource(String hash, boolean ram, int maxcount, int maxtime) throws InterruptedException {
        if (maxcount > 500) { maxcount = 500; } // flooding & OOM reduce
        // the hash is a start hash from where the indexes are picked
        final ArrayList tmpContainers = new ArrayList(maxcount);
        try {
            final Iterator indexContainerIterator = wordIndex.indexContainerSet(hash, ram, true, maxcount).iterator();
            indexContainer container;
            Iterator urlIter;
            indexRWIEntryNew iEntry;
            indexURLEntry lurl;
            int refcount = 0;
            int wholesize;

            urlCache = new HashMap();
            final double maximumDistance = ((double) peerRedundancy * 2) / ((double) yacyCore.seedDB.sizeConnected());
            final long timeout = (maxtime < 0) ? Long.MAX_VALUE : System.currentTimeMillis() + maxtime;
            while (
                    (maxcount > refcount) &&
                    (indexContainerIterator.hasNext()) &&
                    ((container = (indexContainer) indexContainerIterator.next()) != null) &&
                    (container.size() > 0) &&
                    ((tmpContainers.size() == 0) ||
                     (yacyDHTAction.dhtDistance(container.getWordHash(), ((indexContainer) tmpContainers.get(0)).getWordHash()) < maximumDistance)) &&
                    (System.currentTimeMillis() < timeout)
                    ) {
                // check for interruption
                if (Thread.currentThread().isInterrupted()) throw new InterruptedException("Shutdown in progress");

                // make an on-the-fly entity and insert values
                int notBoundCounter = 0;
                try {
                    wholesize = container.size();
                    urlIter = container.entries();
                    // iterate over indexes to fetch url entries and store them in the urlCache
                    while ((urlIter.hasNext()) && (maxcount > refcount) && (System.currentTimeMillis() < timeout)) {
                        // CPU & IO reduce: throttle the selection; fix: let InterruptedException
                        // propagate (method declares it) instead of swallowing the interrupt
                        Thread.sleep(50);

                        iEntry = (indexRWIEntryNew) urlIter.next();
                        if ((iEntry == null) || (iEntry.urlHash() == null)) {
                            urlIter.remove();
                            continue;
                        }
                        lurl = wordIndex.loadedURL.load(iEntry.urlHash(), iEntry);
                        if ((lurl == null) || (lurl.comp().url() == null)) {
                            //yacyCore.log.logFine("DEBUG selectTransferContainersResource: not-bound url hash '" + iEntry.urlHash() + "' for word hash " + container.getWordHash());
                            notBoundCounter++;
                            urlIter.remove();
                            wordIndex.removeEntry(container.getWordHash(), iEntry.urlHash());
                        } else {
                            urlCache.put(iEntry.urlHash(), lurl);
                            //yacyCore.log.logFine("DEBUG selectTransferContainersResource: added url hash '" + iEntry.urlHash() + "' to urlCache for word hash " + container.getWordHash());
                            refcount++;
                        }
                    }

                    // remove all remaining; we have enough
                    while (urlIter.hasNext()) {
                        iEntry = (indexRWIEntryNew) urlIter.next();
                        urlIter.remove();
                    }

                    // use whats left
                    log.logFine("Selected partial index (" + container.size() + " from " + wholesize + " URLs, " + notBoundCounter + " not bound) for word " + container.getWordHash());
                    tmpContainers.add(container);
                } catch (kelondroException e) {
                    log.logSevere("plasmaWordIndexDistribution/2: deleted DB for word " + container.getWordHash(), e);
                    wordIndex.deleteContainer(container.getWordHash());
                }
            }
            // create result
            indexContainers = (indexContainer[]) tmpContainers.toArray(new indexContainer[tmpContainers.size()]);
            //[C[16GwGuFzwffp] has 1 entries, C[16hGKMAl0w97] has 9 entries, C[17A8cDPF6SfG] has 9 entries, C[17Kdj__WWnUy] has 1 entries, C[1
            if ((indexContainers == null) || (indexContainers.length == 0)) {
                log.logFine("No index available for index transfer, hash start-point " + startPointHash);
                this.status = chunkStatus_FAILED;
                return 0;
            }

            this.status = chunkStatus_FILLED;
            return refcount;
        } catch (kelondroException e) {
            log.logSevere("selectTransferIndexes database corrupted: " + e.getMessage(), e);
            indexContainers = new indexContainer[0];
            urlCache = new HashMap();
            this.status = chunkStatus_FAILED;
            return 0;
        }
    }

    /**
     * Deletes all index entries of this chunk from the local word index,
     * typically after a successful transfer to another peer.
     *
     * @return the count string returned by the last removeEntriesExpl call
     */
    public synchronized String deleteTransferIndexes() {
        Iterator urlIter;
        indexRWIEntry iEntry;
        HashSet urlHashes;
        String count = "0";

        for (int i = 0; i < this.indexContainers.length; i++) {
            // delete entries separately
            if (this.indexContainers[i] == null) {
                log.logFine("Deletion of partial index #" + i + " not possible, entry is null");
                continue;
            }
            int c = this.indexContainers[i].size();
            urlHashes = new HashSet(this.indexContainers[i].size());
            urlIter = this.indexContainers[i].entries();
            while (urlIter.hasNext()) {
                iEntry = (indexRWIEntry) urlIter.next();
                urlHashes.add(iEntry.urlHash());
            }
            String wordHash = indexContainers[i].getWordHash();
            count = wordIndex.removeEntriesExpl(this.indexContainers[i].getWordHash(), urlHashes);
            if (log.isFine())
                log.logFine("Deleted partial index (" + c + " URLs) for word " + wordHash + "; " + this.wordIndex.indexSize(wordHash) + " entries left");
            this.indexContainers[i] = null;
        }
        return count;
    }

    /** @return duration of the selection phase in milliseconds, or -1 if not measured */
    public long getSelectionTime() {
        if (this.selectionStartTime == 0 || this.selectionEndTime == 0) return -1;
        return this.selectionEndTime - this.selectionStartTime;
    }

    /** Records one failed transfer attempt for this chunk. */
    public void incTransferFailedCounter() {
        this.transferFailedCounter++;
    }

    /** @return the number of failed transfer attempts recorded so far */
    public int getTransferFailedCounter() {
        return transferFailedCounter;
    }
}

Loading…
Cancel
Save