From 43614f1b368f56ba7a3c8e181546b31fdfb1d15d Mon Sep 17 00:00:00 2001 From: orbiter Date: Thu, 5 Oct 2006 23:47:08 +0000 Subject: [PATCH] bugfix in collection index. the index for collections was not created correctly The bugfix includes a migration function which starts automatically after startup of yacy. This applies only to you, if you are using the new collection index. git-svn-id: https://svn.berlios.de/svnroot/repos/yacy/trunk@2711 6c8d7289-2bf4-0310-a012-ef5d649a1542 --- .../htmlFilter/htmlFilterInputStream.java | 26 ++ .../kelondro/kelondroCollectionIndex.java | 274 ++++++++++++------ .../kelondro/kelondroFixedWidthArray.java | 13 +- .../kelondro/kelondroRowCollection.java | 2 +- source/de/anomic/plasma/plasmaDHTChunk.java | 27 +- source/de/anomic/plasma/plasmaDHTFlush.java | 2 +- .../de/anomic/plasma/plasmaSwitchboard.java | 2 +- 7 files changed, 237 insertions(+), 109 deletions(-) diff --git a/source/de/anomic/htmlFilter/htmlFilterInputStream.java b/source/de/anomic/htmlFilter/htmlFilterInputStream.java index 501754b00..8e9885542 100644 --- a/source/de/anomic/htmlFilter/htmlFilterInputStream.java +++ b/source/de/anomic/htmlFilter/htmlFilterInputStream.java @@ -1,3 +1,29 @@ +// htmlFilterInputStream.java +// (C) 2005, 2006 by Michael Peter Christen; mc@anomic.de, Frankfurt a. M., Germany +// first published 2005 on http://www.anomic.de +// +// This is a part of YaCy, a peer-to-peer based web search engine +// +// $LastChangedDate: 2006-04-02 22:40:07 +0200 (So, 02 Apr 2006) $ +// $LastChangedRevision: 1986 $ +// $LastChangedBy: orbiter $ +// +// LICENSE +// +// This program is free software; you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation; either version 2 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program; if not, write to the Free Software +// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + package de.anomic.htmlFilter; import java.io.BufferedInputStream; diff --git a/source/de/anomic/kelondro/kelondroCollectionIndex.java b/source/de/anomic/kelondro/kelondroCollectionIndex.java index 4acc4d07e..cb1528010 100644 --- a/source/de/anomic/kelondro/kelondroCollectionIndex.java +++ b/source/de/anomic/kelondro/kelondroCollectionIndex.java @@ -33,10 +33,12 @@ import java.util.Map; import java.util.Set; import de.anomic.server.serverFileUtils; +import de.anomic.server.logging.serverLog; public class kelondroCollectionIndex { protected kelondroIndex index; + int keylength; private File path; private String filenameStub; private int loadfactor; @@ -47,15 +49,19 @@ public class kelondroCollectionIndex { private static final int idx_col_key = 0; // the index private static final int idx_col_chunksize = 1; // chunksize (number of bytes in a single chunk, needed for migration option) private static final int idx_col_chunkcount = 2; // chunkcount (number of chunks in this collection) needed to identify array file that has the chunks - private static final int idx_col_indexpos = 3; // indexpos (position in index file) - private static final int idx_col_lastread = 4; // a time stamp, update time in days since 1.1.2000 - private static final int idx_col_lastwrote = 5; // a time stamp, update time in days since 1.1.2000 + private static final int idx_col_clusteridx = 3; // selector for right cluster file, must be >= arrayIndex(chunkcount) + private static final int idx_col_flags = 4; // flags (for future use) + private static final int idx_col_indexpos = 5; // indexpos (position in index file) + private static final int idx_col_lastread = 6; // a time stamp, update time in days since 1.1.2000 + private static final int idx_col_lastwrote = 7; // a time stamp, update time in days since 1.1.2000 - private static kelondroRow indexRow(int keylen) { + private kelondroRow indexRow() { return new kelondroRow( - "byte[] key-" + keylen + "," + + "byte[] key-" + keylength + "," + "int chunksize-4 {b256}," + "int chunkcount-4 {b256}," + + "byte clusteridx-1 {b256}," + + "byte flags-1 {b256}," + "int indexpos-4 {b256}," + "short lastread-2 {b256}, " + "short lastwrote-2 {b256}" @@ -74,11 +80,11 @@ public class kelondroCollectionIndex { String sn = fillZ(Integer.toHexString(serialNumber).toUpperCase(), 2); return new File(path, filenameStub + "." + lf + "." + cs + "." + pn + "." + sn + ".kca"); // kelondro collection array } - + private static File propertyFile(File path, String filenameStub, int loadfactor, int chunksize) { String lf = fillZ(Integer.toHexString(loadfactor).toUpperCase(), 2); String cs = fillZ(Integer.toHexString(chunksize).toUpperCase(), 4); - return new File(path, filenameStub + "." + lf + "." + cs + ".properties"); // kelondro collection array + return new File(path, filenameStub + "." + lf + "." + cs + ".properties"); } public kelondroCollectionIndex(File path, String filenameStub, int keyLength, kelondroOrder indexOrder, @@ -87,11 +93,87 @@ public class kelondroCollectionIndex { // the buffersize is number of bytes that are only used if the kelondroFlexTable is backed up with a kelondroTree this.path = path; this.filenameStub = filenameStub; + this.keylength = keyLength; this.playloadrow = rowdef; this.loadfactor = loadfactor; - // create index table - index = new kelondroFlexTable(path, filenameStub + ".index.table", buffersize, preloadTime, indexRow(keyLength), indexOrder); + boolean ramIndexGeneration = false; + boolean fileIndexGeneration = !(new File(path, filenameStub + ".index").exists()); + if (ramIndexGeneration) index = new kelondroRAMIndex(indexOrder, indexRow()); + if (fileIndexGeneration) index = new kelondroFlexTable(path, filenameStub + ".index", buffersize, preloadTime, indexRow(), indexOrder); + + // open array files + this.arrays = new HashMap(); // all entries will be dynamically created with getArray() + if (((fileIndexGeneration) || (ramIndexGeneration))) { + serverLog.logFine("STARTUP", "STARTED MIGRATION OF OLD COLLECION INDEX TO NEW COLLECTION INDEX. THIS WILL TAKE SOME TIME"); + openAllArrayFiles(((fileIndexGeneration) || (ramIndexGeneration)), indexOrder); + } + + // open/create index table + if (index == null) index = openIndexFile(path, filenameStub, indexOrder, buffersize, preloadTime, loadfactor, rowdef); + } + + private void openAllArrayFiles(boolean indexGeneration, kelondroOrder indexOrder) throws IOException { + String[] list = this.path.list(); + kelondroFixedWidthArray array; + + kelondroRow irow = indexRow(); + int t = kelondroRowCollection.daysSince2000(System.currentTimeMillis()); + for (int i = 0; i < list.length; i++) if (list[i].endsWith(".kca")) { + + // open array + int pos = list[i].indexOf('.'); + if (pos < 0) continue; + int chunksize = Integer.parseInt(list[i].substring(pos + 4, pos + 8), 16); + int partitionNumber = Integer.parseInt(list[i].substring(pos + 9, pos + 11), 16); + int serialNumber = Integer.parseInt(list[i].substring(pos + 12, pos + 14), 16); + try { + array = openArrayFile(partitionNumber, serialNumber, true); + } catch (IOException e) { + e.printStackTrace(); + continue; + } + + // remember that we opened the array + arrays.put(partitionNumber + "-" + chunksize, array); + + if ((index != null) && (indexGeneration)) { + // loop over all elements in array and create index entry for each row + kelondroRow.Entry aentry, ientry; + byte[] key; + long start = System.currentTimeMillis(); + long lastlog = start; + for (int j = 0; j < array.size(); j++) { + aentry = array.get(j); + key = aentry.getColBytes(0); + if (key == null) continue; // skip deleted entries + kelondroRowSet indexrows = new kelondroRowSet(this.playloadrow, aentry.getColBytes(1)); + ientry = irow.newEntry(); + ientry.setCol(idx_col_key, key); + ientry.setCol(idx_col_chunksize, chunksize); + ientry.setCol(idx_col_chunkcount, indexrows.size()); + ientry.setCol(idx_col_clusteridx, (byte) partitionNumber); + ientry.setCol(idx_col_flags, (byte) 0); + ientry.setCol(idx_col_indexpos, j); + ientry.setCol(idx_col_lastread, t); + ientry.setCol(idx_col_lastwrote, t); + index.put(ientry); + + // write a log + if (System.currentTimeMillis() - lastlog > 30000) { + serverLog.logFine("STARTUP", "created " + j + " RWI index entries. " + (((System.currentTimeMillis() - start) * (array.size() - j) / j) / 60000) + " minutes remaining for this array"); + lastlog = System.currentTimeMillis(); + } + } + } + } + } + + private kelondroIndex openIndexFile(File path, String filenameStub, kelondroOrder indexOrder, + long buffersize, long preloadTime, + int loadfactor, kelondroRow rowdef) throws IOException { + // open/create index table + kelondroFlexTable theindex = new kelondroFlexTable(path, filenameStub + ".index", buffersize, preloadTime, indexRow(), indexOrder); // save/check property file for this array File propfile = propertyFile(path, filenameStub, loadfactor, rowdef.objectsize()); @@ -108,19 +190,20 @@ public class kelondroCollectionIndex { props.put("rowdef", rowdef.toString()); serverFileUtils.saveMap(propfile, props, "CollectionIndex properties"); - // open array files - this.arrays = new HashMap(); // all entries will be dynamically created with getArray() + return theindex; } private kelondroFixedWidthArray openArrayFile(int partitionNumber, int serialNumber, boolean create) throws IOException { File f = arrayFile(path, filenameStub, loadfactor, playloadrow.objectsize(), partitionNumber, serialNumber); int load = arrayCapacity(partitionNumber); kelondroRow rowdef = new kelondroRow( - "byte[] key-" + index.row().width(0) + "," + + "byte[] key-" + keylength + "," + "byte[] collection-" + (kelondroRowCollection.exportOverheadSize + load * this.playloadrow.objectsize()) ); if ((!(f.exists())) && (!create)) return null; - return new kelondroFixedWidthArray(f, rowdef, 0); + kelondroFixedWidthArray a = new kelondroFixedWidthArray(f, rowdef, 0); + serverLog.logFine("STARTUP", "opened array file " + f + " with " + a.size() + " RWIs"); + return a; } private kelondroFixedWidthArray getArray(int partitionNumber, int serialNumber, int chunksize) { @@ -191,82 +274,86 @@ public class kelondroCollectionIndex { } return 0; } - // overwrite the old collection - // read old information - int oldchunksize = (int) oldindexrow.getColLong(idx_col_chunksize); // needed only for migration - int oldchunkcount = (int) oldindexrow.getColLong(idx_col_chunkcount); - int oldrownumber = (int) oldindexrow.getColLong(idx_col_indexpos); - int oldPartitionNumber = arrayIndex(oldchunkcount); - int oldSerialNumber = 0; - - if (merge) { - // load the old collection and join it with the old - kelondroRowSet oldcollection = getdelete(oldindexrow, false, false); - - // join with new collection - oldcollection.addAll(collection); - collection = oldcollection; - } - int removed = 0; - if (removekeys != null) { - // load the old collection and remove keys - kelondroRowSet oldcollection = getdelete(oldindexrow, false, false); - - // remove the keys from the set - Iterator i = removekeys.iterator(); - Object k; - while (i.hasNext()) { - k = i.next(); - if (k instanceof byte[]) {if (oldcollection.remove((byte[]) k) != null) removed++;} - if (k instanceof String) {if (oldcollection.remove(((String) k).getBytes()) != null) removed++;} - } - oldcollection.shape(); - collection = oldcollection; - } - - if (collection.size() == 0) { - if (deletecomplete) { - kelondroFixedWidthArray array = getArray(oldPartitionNumber, oldSerialNumber, oldchunksize); - array.remove(oldrownumber); - } - return removed; + // overwrite the old collection + // read old information + int oldchunksize = (int) oldindexrow.getColLong(idx_col_chunksize); // needed only for migration + int oldchunkcount = (int) oldindexrow.getColLong(idx_col_chunkcount); + int oldrownumber = (int) oldindexrow.getColLong(idx_col_indexpos); + int oldPartitionNumber = (int) oldindexrow.getColByte(idx_col_clusteridx); + assert (oldPartitionNumber >= arrayIndex(oldchunkcount)); + int oldSerialNumber = 0; + + if (merge) { + // load the old collection and join it + kelondroRowSet oldcollection = getdelete(oldindexrow, false, false); + + // join with new collection + oldcollection.addAll(collection); + collection = oldcollection; + } + + int removed = 0; + if (removekeys != null) { + // load the old collection and remove keys + kelondroRowSet oldcollection = getdelete(oldindexrow, false, false); + + // remove the keys from the set + Iterator i = removekeys.iterator(); + Object k; + while (i.hasNext()) { + k = i.next(); + if ((k instanceof byte[]) && (oldcollection.remove((byte[]) k) != null)) removed++; + if ((k instanceof String) && (oldcollection.remove(((String) k).getBytes()) != null)) removed++; } - - int newPartitionNumber = arrayIndex(collection.size()); - int newSerialNumber = 0; - - // see if we need new space or if we can overwrite the old space - if (oldPartitionNumber == newPartitionNumber) { - // we don't need a new slot, just write into the old one - - // find array file - kelondroFixedWidthArray array = getArray(newPartitionNumber, newSerialNumber, this.playloadrow.objectsize()); - - // define row - kelondroRow.Entry arrayEntry = array.row().newEntry(); - arrayEntry.setCol(0, key); - arrayEntry.setCol(1, collection.exportCollection()); - - // overwrite entry in this array - array.set(oldrownumber, arrayEntry); - - // update the index entry - oldindexrow.setCol(idx_col_chunkcount, collection.size()); - oldindexrow.setCol(idx_col_lastwrote, kelondroRowCollection.daysSince2000(System.currentTimeMillis())); - index.put(oldindexrow); - } else { - // we need a new slot, that means we must first delete the old entry - // find array file + oldcollection.shape(); + collection = oldcollection; + } + + if (collection.size() == 0) { + if (deletecomplete) { kelondroFixedWidthArray array = getArray(oldPartitionNumber, oldSerialNumber, oldchunksize); - - // delete old entry array.remove(oldrownumber); - - // write a new entry in the other array - overwrite(key, collection); } return removed; + } + + int newPartitionNumber = arrayIndex(collection.size()); + int newSerialNumber = 0; + + // see if we need new space or if we can overwrite the old space + if (oldPartitionNumber == newPartitionNumber) { + // we don't need a new slot, just write into the old one + + // find array file + kelondroFixedWidthArray array = getArray(newPartitionNumber, newSerialNumber, this.playloadrow.objectsize()); + + // define row + kelondroRow.Entry arrayEntry = array.row().newEntry(); + arrayEntry.setCol(0, key); + arrayEntry.setCol(1, collection.exportCollection()); + + // overwrite entry in this array + array.set(oldrownumber, arrayEntry); + + // update the index entry + oldindexrow.setCol(idx_col_chunkcount, collection.size()); + oldindexrow.setCol(idx_col_clusteridx, (byte) newPartitionNumber); + oldindexrow.setCol(idx_col_flags, (byte) 0); + oldindexrow.setCol(idx_col_lastwrote, kelondroRowCollection.daysSince2000(System.currentTimeMillis())); + index.put(oldindexrow); + } else { + // we need a new slot, that means we must first delete the old entry + // find array file + kelondroFixedWidthArray array = getArray(oldPartitionNumber, oldSerialNumber, oldchunksize); + + // delete old entry + array.remove(oldrownumber); + + // write a new entry in the other array + overwrite(key, collection); + } + return removed; } } @@ -275,7 +362,8 @@ public class kelondroCollectionIndex { // simply store a collection without check if the collection existed before // find array file - kelondroFixedWidthArray array = getArray(arrayIndex(collection.size()), 0, this.playloadrow.objectsize()); + int clusteridx = arrayIndex(collection.size()); + kelondroFixedWidthArray array = getArray(clusteridx, 0, this.playloadrow.objectsize()); // define row kelondroRow.Entry arrayEntry = array.row().newEntry(); @@ -290,6 +378,8 @@ public class kelondroCollectionIndex { indexEntry.setCol(idx_col_key, key); indexEntry.setCol(idx_col_chunksize, this.playloadrow.objectsize()); indexEntry.setCol(idx_col_chunkcount, collection.size()); + indexEntry.setCol(idx_col_clusteridx, (byte) clusteridx); + indexEntry.setCol(idx_col_flags, (byte) 0); indexEntry.setCol(idx_col_indexpos, (long) newRowNumber); indexEntry.setCol(idx_col_lastread, kelondroRowCollection.daysSince2000(System.currentTimeMillis())); indexEntry.setCol(idx_col_lastwrote, kelondroRowCollection.daysSince2000(System.currentTimeMillis())); @@ -326,10 +416,11 @@ public class kelondroCollectionIndex { // call this only within a synchronized(index) environment // read values - int chunksize = (int) indexrow.getColLong(idx_col_chunksize); - int chunkcount = (int) indexrow.getColLong(idx_col_chunkcount); - int rownumber = (int) indexrow.getColLong(idx_col_indexpos); - int partitionnumber = arrayIndex(chunkcount); + int chunksize = (int) indexrow.getColLong(idx_col_chunksize); + int chunkcount = (int) indexrow.getColLong(idx_col_chunkcount); + int rownumber = (int) indexrow.getColLong(idx_col_indexpos); + int partitionnumber = (int) indexrow.getColByte(idx_col_clusteridx); + assert(partitionnumber >= arrayIndex(chunkcount)); int serialnumber = 0; // open array entry @@ -347,6 +438,8 @@ public class kelondroCollectionIndex { indexEntry.setCol(idx_col_key, arrayrow.getColBytes(0)); indexEntry.setCol(idx_col_chunksize, this.playloadrow.objectsize()); indexEntry.setCol(idx_col_chunkcount, collection.size()); + indexEntry.setCol(idx_col_clusteridx, (byte) partitionnumber); + indexEntry.setCol(idx_col_flags, (byte) 0); indexEntry.setCol(idx_col_indexpos, (long) rownumber); indexEntry.setCol(idx_col_lastread, kelondroRowCollection.daysSince2000(System.currentTimeMillis())); indexEntry.setCol(idx_col_lastwrote, kelondroRowCollection.daysSince2000(System.currentTimeMillis())); @@ -455,10 +548,9 @@ public class kelondroCollectionIndex { collectionIndex.merge(("key-" + i).getBytes(), collection); } - collectionIndex.close(); - // printout of index - kelondroFlexTable index = new kelondroFlexTable(path, filenameStub + ".index", buffersize, preloadTime, indexRow(9), kelondroNaturalOrder.naturalOrder); + kelondroFlexTable index = new kelondroFlexTable(path, filenameStub + ".index", buffersize, preloadTime, collectionIndex.indexRow(), kelondroNaturalOrder.naturalOrder); + collectionIndex.close(); index.print(); index.close(); } catch (IOException e) { diff --git a/source/de/anomic/kelondro/kelondroFixedWidthArray.java b/source/de/anomic/kelondro/kelondroFixedWidthArray.java index 48ecbece5..bebbf5b17 100644 --- a/source/de/anomic/kelondro/kelondroFixedWidthArray.java +++ b/source/de/anomic/kelondro/kelondroFixedWidthArray.java @@ -128,7 +128,18 @@ public class kelondroFixedWidthArray extends kelondroRecords implements kelondro } public synchronized void remove(int index) throws IOException { - deleteNode(new Handle(index)); + if (index >= size()) throw new IOException("remove: index " + index + " out of bounds " + size()); + + // get the node at position index + Handle h = new Handle(index); + Node n = getNode(h); + + // erase the row + n.setValueRow(null); + n.commit(CP_NONE); + + // mark row as deleted so it can be re-used + deleteNode(h); } public void print() throws IOException { diff --git a/source/de/anomic/kelondro/kelondroRowCollection.java b/source/de/anomic/kelondro/kelondroRowCollection.java index 2fb0a4459..27a99f9d2 100644 --- a/source/de/anomic/kelondro/kelondroRowCollection.java +++ b/source/de/anomic/kelondro/kelondroRowCollection.java @@ -89,7 +89,7 @@ public class kelondroRowCollection { this.lastTimeRead = (exportedCollection.getColLong(exp_last_read) + 10957) * day; this.lastTimeWrote = (exportedCollection.getColLong(exp_last_wrote) + 10957) * day; String sortOrderKey = exportedCollection.getColString(exp_order_type, null); - if (sortOrderKey.equals("__")) { + if ((sortOrderKey == null) || (sortOrderKey.equals("__"))) { this.sortOrder = null; } else { this.sortOrder = kelondroNaturalOrder.bySignature(sortOrderKey); diff --git a/source/de/anomic/plasma/plasmaDHTChunk.java b/source/de/anomic/plasma/plasmaDHTChunk.java index fe35dc81f..27f515033 100644 --- a/source/de/anomic/plasma/plasmaDHTChunk.java +++ b/source/de/anomic/plasma/plasmaDHTChunk.java @@ -121,14 +121,14 @@ public class plasmaDHTChunk { return this.status; } - public plasmaDHTChunk(serverLog log, plasmaWordIndex wordIndex, plasmaCrawlLURL lurls, int minCount, int maxCount) { + public plasmaDHTChunk(serverLog log, plasmaWordIndex wordIndex, plasmaCrawlLURL lurls, int minCount, int maxCount, int maxtime) { try { this.log = log; this.wordIndex = wordIndex; this.lurls = lurls; this.startPointHash = selectTransferStart(); log.logFine("Selected hash " + this.startPointHash + " as start point for index distribution, distance = " + yacyDHTAction.dhtDistance(yacyCore.seedDB.mySeed.hash, this.startPointHash)); - selectTransferContainers(this.startPointHash, minCount, maxCount); + selectTransferContainers(this.startPointHash, minCount, maxCount, maxtime); // count the indexes, can be smaller as expected this.idxCount = indexCounter(); @@ -141,13 +141,13 @@ public class plasmaDHTChunk { } } - public plasmaDHTChunk(serverLog log, plasmaWordIndex wordIndex, plasmaCrawlLURL lurls, int minCount, int maxCount, String startHash) { + public plasmaDHTChunk(serverLog log, plasmaWordIndex wordIndex, plasmaCrawlLURL lurls, int minCount, int maxCount, int maxtime, String startHash) { try { this.log = log; this.wordIndex = wordIndex; this.lurls = lurls; log.logFine("Demanded hash " + startHash + " as start point for index distribution, distance = " + yacyDHTAction.dhtDistance(yacyCore.seedDB.mySeed.hash, this.startPointHash)); - selectTransferContainers(startHash, minCount, maxCount); + selectTransferContainers(startHash, minCount, maxCount, maxtime); // count the indexes, can be smaller as expected this.idxCount = indexCounter(); @@ -175,15 +175,15 @@ public class plasmaDHTChunk { return startPointHash; } - private void selectTransferContainers(String hash, int mincount, int maxcount) throws InterruptedException { + private void selectTransferContainers(String hash, int mincount, int maxcount, int maxtime) throws InterruptedException { try { this.selectionStartTime = System.currentTimeMillis(); - int refcountRAM = selectTransferContainersResource(hash, plasmaWordIndex.RL_RAMCACHE, maxcount); + int refcountRAM = selectTransferContainersResource(hash, plasmaWordIndex.RL_RAMCACHE, maxcount, maxtime); if (refcountRAM >= mincount) { log.logFine("DHT selection from RAM: " + refcountRAM + " entries"); return; } - int refcountFile = selectTransferContainersResource(hash, plasmaWordIndex.RL_WORDFILES, maxcount); + int refcountFile = selectTransferContainersResource(hash, plasmaWordIndex.RL_WORDFILES, maxcount, maxtime); log.logFine("DHT selection from FILE: " + refcountFile + " entries, RAM provided only " + refcountRAM + " entries"); return; } finally { @@ -191,7 +191,7 @@ public class plasmaDHTChunk { } } - private int selectTransferContainersResource(String hash, int resourceLevel, int maxcount) throws InterruptedException { + private int selectTransferContainersResource(String hash, int resourceLevel, int maxcount, int maxtime) throws InterruptedException { // the hash is a start hash from where the indexes are picked ArrayList tmpContainers = new ArrayList(maxcount); try { @@ -205,16 +205,15 @@ public class plasmaDHTChunk { urlCache = new HashMap(); double maximumDistance = ((double) peerRedundancy * 2) / ((double) yacyCore.seedDB.sizeConnected()); - + long timeout = (maxtime < 0) ? Long.MAX_VALUE : System.currentTimeMillis() + maxtime; while ( (maxcount > refcount) && (indexContainerIterator.hasNext()) && ((container = (indexContainer) indexContainerIterator.next()) != null) && (container.size() > 0) && - ( - (tmpContainers.size() == 0) || - (yacyDHTAction.dhtDistance(container.getWordHash(), ((indexContainer) tmpContainers.get(0)).getWordHash()) < maximumDistance) - ) + ((tmpContainers.size() == 0) || + (yacyDHTAction.dhtDistance(container.getWordHash(), ((indexContainer) tmpContainers.get(0)).getWordHash()) < maximumDistance)) && + (System.currentTimeMillis() < timeout) ) { // check for interruption if (Thread.currentThread().isInterrupted()) throw new InterruptedException("Shutdown in progress"); @@ -225,7 +224,7 @@ public class plasmaDHTChunk { wholesize = container.size(); urlIter = container.entries(); // iterate over indexes to fetch url entries and store them in the urlCache - while ((urlIter.hasNext()) && (maxcount > refcount)) { + while ((urlIter.hasNext()) && (maxcount > refcount) && (System.currentTimeMillis() < timeout)) { iEntry = (indexEntry) urlIter.next(); lurl = lurls.load(iEntry.urlHash(), iEntry); if ((lurl == null) || (lurl.url() == null)) { diff --git a/source/de/anomic/plasma/plasmaDHTFlush.java b/source/de/anomic/plasma/plasmaDHTFlush.java index db4a2ab42..9284bd254 100644 --- a/source/de/anomic/plasma/plasmaDHTFlush.java +++ b/source/de/anomic/plasma/plasmaDHTFlush.java @@ -169,7 +169,7 @@ public class plasmaDHTFlush extends Thread { // selecting 500 words to transfer this.status = "Running: Selecting chunk " + iteration; - newDHTChunk = new plasmaDHTChunk(this.log, this.wordIndex, this.sb.urlPool.loadedURL, this.chunkSize/3*2, this.chunkSize, this.startPointHash); + newDHTChunk = new plasmaDHTChunk(this.log, this.wordIndex, this.sb.urlPool.loadedURL, this.chunkSize/3*2, this.chunkSize, -1, this.startPointHash); /* If we havn't selected a word chunk this could be because of * a) no words are left in the index diff --git a/source/de/anomic/plasma/plasmaSwitchboard.java b/source/de/anomic/plasma/plasmaSwitchboard.java index 186906801..f4cace38a 100644 --- a/source/de/anomic/plasma/plasmaSwitchboard.java +++ b/source/de/anomic/plasma/plasmaSwitchboard.java @@ -1013,7 +1013,7 @@ public final class plasmaSwitchboard extends serverAbstractSwitch implements ser ) { // generate new chunk int minChunkSize = (int) getConfigLong("indexDistribution.minChunkSize", 30); - dhtTransferChunk = new plasmaDHTChunk(this.log, this.wordIndex, this.urlPool.loadedURL, minChunkSize, dhtTransferIndexCount); + dhtTransferChunk = new plasmaDHTChunk(this.log, this.wordIndex, this.urlPool.loadedURL, minChunkSize, dhtTransferIndexCount, 5000); doneSomething = true; }