bugfix in collection index. the index for collections was not created correctly

The bugfix includes a migration function which starts automatically
after startup of yacy.
This applies only to you, if you are using the new collection index.

git-svn-id: https://svn.berlios.de/svnroot/repos/yacy/trunk@2711 6c8d7289-2bf4-0310-a012-ef5d649a1542
orbiter 19 years ago
parent 07155ef3b0
commit 43614f1b36

@ -1,3 +1,29 @@
// htmlFilterInputStream.java
// (C) 2005, 2006 by Michael Peter Christen; mc@anomic.de, Frankfurt a. M., Germany
// first published 2005 on http://www.anomic.de
// This is a part of YaCy, a peer-to-peer based web search engine
// $LastChangedDate: 2006-04-02 22:40:07 +0200 (So, 02 Apr 2006) $
// $LastChangedRevision: 1986 $
// $LastChangedBy: orbiter $
// This program is free software; you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation; either version 2 of the License, or
// (at your option) any later version.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
package de.anomic.htmlFilter;
import java.io.BufferedInputStream;

@ -33,10 +33,12 @@ import java.util.Map;
import java.util.Set;
import de.anomic.server.serverFileUtils;
import de.anomic.server.logging.serverLog;
public class kelondroCollectionIndex {
protected kelondroIndex index;
int keylength;
private File path;
private String filenameStub;
private int loadfactor;
@ -47,15 +49,19 @@ public class kelondroCollectionIndex {
private static final int idx_col_key = 0; // the index
private static final int idx_col_chunksize = 1; // chunksize (number of bytes in a single chunk, needed for migration option)
private static final int idx_col_chunkcount = 2; // chunkcount (number of chunks in this collection) needed to identify array file that has the chunks
private static final int idx_col_indexpos = 3; // indexpos (position in index file)
private static final int idx_col_lastread = 4; // a time stamp, update time in days since 1.1.2000
private static final int idx_col_lastwrote = 5; // a time stamp, update time in days since 1.1.2000
private static final int idx_col_clusteridx = 3; // selector for right cluster file, must be >= arrayIndex(chunkcount)
private static final int idx_col_flags = 4; // flags (for future use)
private static final int idx_col_indexpos = 5; // indexpos (position in index file)
private static final int idx_col_lastread = 6; // a time stamp, update time in days since 1.1.2000
private static final int idx_col_lastwrote = 7; // a time stamp, update time in days since 1.1.2000
private static kelondroRow indexRow(int keylen) {
private kelondroRow indexRow() {
return new kelondroRow(
"byte[] key-" + keylen + "," +
"byte[] key-" + keylength + "," +
"int chunksize-4 {b256}," +
"int chunkcount-4 {b256}," +
"byte clusteridx-1 {b256}," +
"byte flags-1 {b256}," +
"int indexpos-4 {b256}," +
"short lastread-2 {b256}, " +
"short lastwrote-2 {b256}"
@ -78,7 +84,7 @@ public class kelondroCollectionIndex {
private static File propertyFile(File path, String filenameStub, int loadfactor, int chunksize) {
String lf = fillZ(Integer.toHexString(loadfactor).toUpperCase(), 2);
String cs = fillZ(Integer.toHexString(chunksize).toUpperCase(), 4);
return new File(path, filenameStub + "." + lf + "." + cs + ".properties"); // kelondro collection array
return new File(path, filenameStub + "." + lf + "." + cs + ".properties");
public kelondroCollectionIndex(File path, String filenameStub, int keyLength, kelondroOrder indexOrder,
@ -87,11 +93,87 @@ public class kelondroCollectionIndex {
// the buffersize is number of bytes that are only used if the kelondroFlexTable is backed up with a kelondroTree
this.path = path;
this.filenameStub = filenameStub;
this.keylength = keyLength;
this.playloadrow = rowdef;
this.loadfactor = loadfactor;
// create index table
index = new kelondroFlexTable(path, filenameStub + ".index.table", buffersize, preloadTime, indexRow(keyLength), indexOrder);
boolean ramIndexGeneration = false;
boolean fileIndexGeneration = !(new File(path, filenameStub + ".index").exists());
if (ramIndexGeneration) index = new kelondroRAMIndex(indexOrder, indexRow());
if (fileIndexGeneration) index = new kelondroFlexTable(path, filenameStub + ".index", buffersize, preloadTime, indexRow(), indexOrder);
// open array files
this.arrays = new HashMap(); // all entries will be dynamically created with getArray()
if (((fileIndexGeneration) || (ramIndexGeneration))) {
openAllArrayFiles(((fileIndexGeneration) || (ramIndexGeneration)), indexOrder);
// open/create index table
if (index == null) index = openIndexFile(path, filenameStub, indexOrder, buffersize, preloadTime, loadfactor, rowdef);
private void openAllArrayFiles(boolean indexGeneration, kelondroOrder indexOrder) throws IOException {
String[] list = this.path.list();
kelondroFixedWidthArray array;
kelondroRow irow = indexRow();
int t = kelondroRowCollection.daysSince2000(System.currentTimeMillis());
for (int i = 0; i < list.length; i++) if (list[i].endsWith(".kca")) {
// open array
int pos = list[i].indexOf('.');
if (pos < 0) continue;
int chunksize = Integer.parseInt(list[i].substring(pos + 4, pos + 8), 16);
int partitionNumber = Integer.parseInt(list[i].substring(pos + 9, pos + 11), 16);
int serialNumber = Integer.parseInt(list[i].substring(pos + 12, pos + 14), 16);
try {
array = openArrayFile(partitionNumber, serialNumber, true);
} catch (IOException e) {
// remember that we opened the array
arrays.put(partitionNumber + "-" + chunksize, array);
if ((index != null) && (indexGeneration)) {
// loop over all elements in array and create index entry for each row
kelondroRow.Entry aentry, ientry;
byte[] key;
long start = System.currentTimeMillis();
long lastlog = start;
for (int j = 0; j < array.size(); j++) {
aentry = array.get(j);
key = aentry.getColBytes(0);
if (key == null) continue; // skip deleted entries
kelondroRowSet indexrows = new kelondroRowSet(this.playloadrow, aentry.getColBytes(1));
ientry = irow.newEntry();
ientry.setCol(idx_col_key, key);
ientry.setCol(idx_col_chunksize, chunksize);
ientry.setCol(idx_col_chunkcount, indexrows.size());
ientry.setCol(idx_col_clusteridx, (byte) partitionNumber);
ientry.setCol(idx_col_flags, (byte) 0);
ientry.setCol(idx_col_indexpos, j);
ientry.setCol(idx_col_lastread, t);
ientry.setCol(idx_col_lastwrote, t);
// write a log
if (System.currentTimeMillis() - lastlog > 30000) {
serverLog.logFine("STARTUP", "created " + j + " RWI index entries. " + (((System.currentTimeMillis() - start) * (array.size() - j) / j) / 60000) + " minutes remaining for this array");
lastlog = System.currentTimeMillis();
private kelondroIndex openIndexFile(File path, String filenameStub, kelondroOrder indexOrder,
long buffersize, long preloadTime,
int loadfactor, kelondroRow rowdef) throws IOException {
// open/create index table
kelondroFlexTable theindex = new kelondroFlexTable(path, filenameStub + ".index", buffersize, preloadTime, indexRow(), indexOrder);
// save/check property file for this array
File propfile = propertyFile(path, filenameStub, loadfactor, rowdef.objectsize());
@ -108,19 +190,20 @@ public class kelondroCollectionIndex {
props.put("rowdef", rowdef.toString());
serverFileUtils.saveMap(propfile, props, "CollectionIndex properties");
// open array files
this.arrays = new HashMap(); // all entries will be dynamically created with getArray()
return theindex;
private kelondroFixedWidthArray openArrayFile(int partitionNumber, int serialNumber, boolean create) throws IOException {
File f = arrayFile(path, filenameStub, loadfactor, playloadrow.objectsize(), partitionNumber, serialNumber);
int load = arrayCapacity(partitionNumber);
kelondroRow rowdef = new kelondroRow(
"byte[] key-" + index.row().width(0) + "," +
"byte[] key-" + keylength + "," +
"byte[] collection-" + (kelondroRowCollection.exportOverheadSize + load * this.playloadrow.objectsize())
if ((!(f.exists())) && (!create)) return null;
return new kelondroFixedWidthArray(f, rowdef, 0);
kelondroFixedWidthArray a = new kelondroFixedWidthArray(f, rowdef, 0);
serverLog.logFine("STARTUP", "opened array file " + f + " with " + a.size() + " RWIs");
return a;
private kelondroFixedWidthArray getArray(int partitionNumber, int serialNumber, int chunksize) {
@ -191,82 +274,86 @@ public class kelondroCollectionIndex {
return 0;
// overwrite the old collection
// read old information
int oldchunksize = (int) oldindexrow.getColLong(idx_col_chunksize); // needed only for migration
int oldchunkcount = (int) oldindexrow.getColLong(idx_col_chunkcount);
int oldrownumber = (int) oldindexrow.getColLong(idx_col_indexpos);
int oldPartitionNumber = arrayIndex(oldchunkcount);
int oldSerialNumber = 0;
if (merge) {
// load the old collection and join it with the old
kelondroRowSet oldcollection = getdelete(oldindexrow, false, false);
// join with new collection
collection = oldcollection;
int removed = 0;
if (removekeys != null) {
// load the old collection and remove keys
kelondroRowSet oldcollection = getdelete(oldindexrow, false, false);
// remove the keys from the set
Iterator i = removekeys.iterator();
Object k;
while (i.hasNext()) {
k = i.next();
if (k instanceof byte[]) {if (oldcollection.remove((byte[]) k) != null) removed++;}
if (k instanceof String) {if (oldcollection.remove(((String) k).getBytes()) != null) removed++;}
collection = oldcollection;
// overwrite the old collection
// read old information
int oldchunksize = (int) oldindexrow.getColLong(idx_col_chunksize); // needed only for migration
int oldchunkcount = (int) oldindexrow.getColLong(idx_col_chunkcount);
int oldrownumber = (int) oldindexrow.getColLong(idx_col_indexpos);
int oldPartitionNumber = (int) oldindexrow.getColByte(idx_col_clusteridx);
assert (oldPartitionNumber >= arrayIndex(oldchunkcount));
int oldSerialNumber = 0;
if (merge) {
// load the old collection and join it
kelondroRowSet oldcollection = getdelete(oldindexrow, false, false);
// join with new collection
collection = oldcollection;
int removed = 0;
if (removekeys != null) {
// load the old collection and remove keys
kelondroRowSet oldcollection = getdelete(oldindexrow, false, false);
// remove the keys from the set
Iterator i = removekeys.iterator();
Object k;
while (i.hasNext()) {
k = i.next();
if ((k instanceof byte[]) && (oldcollection.remove((byte[]) k) != null)) removed++;
if ((k instanceof String) && (oldcollection.remove(((String) k).getBytes()) != null)) removed++;
collection = oldcollection;
if (collection.size() == 0) {
if (deletecomplete) {
kelondroFixedWidthArray array = getArray(oldPartitionNumber, oldSerialNumber, oldchunksize);
return removed;
if (collection.size() == 0) {
if (deletecomplete) {
kelondroFixedWidthArray array = getArray(oldPartitionNumber, oldSerialNumber, oldchunksize);
return removed;
int newPartitionNumber = arrayIndex(collection.size());
int newSerialNumber = 0;
int newPartitionNumber = arrayIndex(collection.size());
int newSerialNumber = 0;
// see if we need new space or if we can overwrite the old space
if (oldPartitionNumber == newPartitionNumber) {
// we don't need a new slot, just write into the old one
// see if we need new space or if we can overwrite the old space
if (oldPartitionNumber == newPartitionNumber) {
// we don't need a new slot, just write into the old one
// find array file
kelondroFixedWidthArray array = getArray(newPartitionNumber, newSerialNumber, this.playloadrow.objectsize());
// find array file
kelondroFixedWidthArray array = getArray(newPartitionNumber, newSerialNumber, this.playloadrow.objectsize());
// define row
kelondroRow.Entry arrayEntry = array.row().newEntry();
arrayEntry.setCol(0, key);
arrayEntry.setCol(1, collection.exportCollection());
// define row
kelondroRow.Entry arrayEntry = array.row().newEntry();
arrayEntry.setCol(0, key);
arrayEntry.setCol(1, collection.exportCollection());
// overwrite entry in this array
array.set(oldrownumber, arrayEntry);
// overwrite entry in this array
array.set(oldrownumber, arrayEntry);
// update the index entry
oldindexrow.setCol(idx_col_chunkcount, collection.size());
oldindexrow.setCol(idx_col_lastwrote, kelondroRowCollection.daysSince2000(System.currentTimeMillis()));
} else {
// we need a new slot, that means we must first delete the old entry
// find array file
kelondroFixedWidthArray array = getArray(oldPartitionNumber, oldSerialNumber, oldchunksize);
// update the index entry
oldindexrow.setCol(idx_col_chunkcount, collection.size());
oldindexrow.setCol(idx_col_clusteridx, (byte) newPartitionNumber);
oldindexrow.setCol(idx_col_flags, (byte) 0);
oldindexrow.setCol(idx_col_lastwrote, kelondroRowCollection.daysSince2000(System.currentTimeMillis()));
} else {
// we need a new slot, that means we must first delete the old entry
// find array file
kelondroFixedWidthArray array = getArray(oldPartitionNumber, oldSerialNumber, oldchunksize);
// delete old entry
// delete old entry
// write a new entry in the other array
overwrite(key, collection);
return removed;
// write a new entry in the other array
overwrite(key, collection);
return removed;
@ -275,7 +362,8 @@ public class kelondroCollectionIndex {
// simply store a collection without check if the collection existed before
// find array file
kelondroFixedWidthArray array = getArray(arrayIndex(collection.size()), 0, this.playloadrow.objectsize());
int clusteridx = arrayIndex(collection.size());
kelondroFixedWidthArray array = getArray(clusteridx, 0, this.playloadrow.objectsize());
// define row
kelondroRow.Entry arrayEntry = array.row().newEntry();
@ -290,6 +378,8 @@ public class kelondroCollectionIndex {
indexEntry.setCol(idx_col_key, key);
indexEntry.setCol(idx_col_chunksize, this.playloadrow.objectsize());
indexEntry.setCol(idx_col_chunkcount, collection.size());
indexEntry.setCol(idx_col_clusteridx, (byte) clusteridx);
indexEntry.setCol(idx_col_flags, (byte) 0);
indexEntry.setCol(idx_col_indexpos, (long) newRowNumber);
indexEntry.setCol(idx_col_lastread, kelondroRowCollection.daysSince2000(System.currentTimeMillis()));
indexEntry.setCol(idx_col_lastwrote, kelondroRowCollection.daysSince2000(System.currentTimeMillis()));
@ -326,10 +416,11 @@ public class kelondroCollectionIndex {
// call this only within a synchronized(index) environment
// read values
int chunksize = (int) indexrow.getColLong(idx_col_chunksize);
int chunkcount = (int) indexrow.getColLong(idx_col_chunkcount);
int rownumber = (int) indexrow.getColLong(idx_col_indexpos);
int partitionnumber = arrayIndex(chunkcount);
int chunksize = (int) indexrow.getColLong(idx_col_chunksize);
int chunkcount = (int) indexrow.getColLong(idx_col_chunkcount);
int rownumber = (int) indexrow.getColLong(idx_col_indexpos);
int partitionnumber = (int) indexrow.getColByte(idx_col_clusteridx);
assert(partitionnumber >= arrayIndex(chunkcount));
int serialnumber = 0;
// open array entry
@ -347,6 +438,8 @@ public class kelondroCollectionIndex {
indexEntry.setCol(idx_col_key, arrayrow.getColBytes(0));
indexEntry.setCol(idx_col_chunksize, this.playloadrow.objectsize());
indexEntry.setCol(idx_col_chunkcount, collection.size());
indexEntry.setCol(idx_col_clusteridx, (byte) partitionnumber);
indexEntry.setCol(idx_col_flags, (byte) 0);
indexEntry.setCol(idx_col_indexpos, (long) rownumber);
indexEntry.setCol(idx_col_lastread, kelondroRowCollection.daysSince2000(System.currentTimeMillis()));
indexEntry.setCol(idx_col_lastwrote, kelondroRowCollection.daysSince2000(System.currentTimeMillis()));
@ -455,10 +548,9 @@ public class kelondroCollectionIndex {
collectionIndex.merge(("key-" + i).getBytes(), collection);
// printout of index
kelondroFlexTable index = new kelondroFlexTable(path, filenameStub + ".index", buffersize, preloadTime, indexRow(9), kelondroNaturalOrder.naturalOrder);
kelondroFlexTable index = new kelondroFlexTable(path, filenameStub + ".index", buffersize, preloadTime, collectionIndex.indexRow(), kelondroNaturalOrder.naturalOrder);
} catch (IOException e) {

@ -128,7 +128,18 @@ public class kelondroFixedWidthArray extends kelondroRecords implements kelondro
public synchronized void remove(int index) throws IOException {
deleteNode(new Handle(index));
if (index >= size()) throw new IOException("remove: index " + index + " out of bounds " + size());
// get the node at position index
Handle h = new Handle(index);
Node n = getNode(h);
// erase the row
// mark row as deleted so it can be re-used
public void print() throws IOException {

@ -89,7 +89,7 @@ public class kelondroRowCollection {
this.lastTimeRead = (exportedCollection.getColLong(exp_last_read) + 10957) * day;
this.lastTimeWrote = (exportedCollection.getColLong(exp_last_wrote) + 10957) * day;
String sortOrderKey = exportedCollection.getColString(exp_order_type, null);
if (sortOrderKey.equals("__")) {
if ((sortOrderKey == null) || (sortOrderKey.equals("__"))) {
this.sortOrder = null;
} else {
this.sortOrder = kelondroNaturalOrder.bySignature(sortOrderKey);

@ -121,14 +121,14 @@ public class plasmaDHTChunk {
return this.status;
public plasmaDHTChunk(serverLog log, plasmaWordIndex wordIndex, plasmaCrawlLURL lurls, int minCount, int maxCount) {
public plasmaDHTChunk(serverLog log, plasmaWordIndex wordIndex, plasmaCrawlLURL lurls, int minCount, int maxCount, int maxtime) {
try {
this.log = log;
this.wordIndex = wordIndex;
this.lurls = lurls;
this.startPointHash = selectTransferStart();
log.logFine("Selected hash " + this.startPointHash + " as start point for index distribution, distance = " + yacyDHTAction.dhtDistance(yacyCore.seedDB.mySeed.hash, this.startPointHash));
selectTransferContainers(this.startPointHash, minCount, maxCount);
selectTransferContainers(this.startPointHash, minCount, maxCount, maxtime);
// count the indexes, can be smaller as expected
this.idxCount = indexCounter();
@ -141,13 +141,13 @@ public class plasmaDHTChunk {
public plasmaDHTChunk(serverLog log, plasmaWordIndex wordIndex, plasmaCrawlLURL lurls, int minCount, int maxCount, String startHash) {
public plasmaDHTChunk(serverLog log, plasmaWordIndex wordIndex, plasmaCrawlLURL lurls, int minCount, int maxCount, int maxtime, String startHash) {
try {
this.log = log;
this.wordIndex = wordIndex;
this.lurls = lurls;
log.logFine("Demanded hash " + startHash + " as start point for index distribution, distance = " + yacyDHTAction.dhtDistance(yacyCore.seedDB.mySeed.hash, this.startPointHash));
selectTransferContainers(startHash, minCount, maxCount);
selectTransferContainers(startHash, minCount, maxCount, maxtime);
// count the indexes, can be smaller as expected
this.idxCount = indexCounter();
@ -175,15 +175,15 @@ public class plasmaDHTChunk {
return startPointHash;
private void selectTransferContainers(String hash, int mincount, int maxcount) throws InterruptedException {
private void selectTransferContainers(String hash, int mincount, int maxcount, int maxtime) throws InterruptedException {
try {
this.selectionStartTime = System.currentTimeMillis();
int refcountRAM = selectTransferContainersResource(hash, plasmaWordIndex.RL_RAMCACHE, maxcount);
int refcountRAM = selectTransferContainersResource(hash, plasmaWordIndex.RL_RAMCACHE, maxcount, maxtime);
if (refcountRAM >= mincount) {
log.logFine("DHT selection from RAM: " + refcountRAM + " entries");
int refcountFile = selectTransferContainersResource(hash, plasmaWordIndex.RL_WORDFILES, maxcount);
int refcountFile = selectTransferContainersResource(hash, plasmaWordIndex.RL_WORDFILES, maxcount, maxtime);
log.logFine("DHT selection from FILE: " + refcountFile + " entries, RAM provided only " + refcountRAM + " entries");
} finally {
@ -191,7 +191,7 @@ public class plasmaDHTChunk {
private int selectTransferContainersResource(String hash, int resourceLevel, int maxcount) throws InterruptedException {
private int selectTransferContainersResource(String hash, int resourceLevel, int maxcount, int maxtime) throws InterruptedException {
// the hash is a start hash from where the indexes are picked
ArrayList tmpContainers = new ArrayList(maxcount);
try {
@ -205,16 +205,15 @@ public class plasmaDHTChunk {
urlCache = new HashMap();
double maximumDistance = ((double) peerRedundancy * 2) / ((double) yacyCore.seedDB.sizeConnected());
long timeout = (maxtime < 0) ? Long.MAX_VALUE : System.currentTimeMillis() + maxtime;
while (
(maxcount > refcount) &&
(indexContainerIterator.hasNext()) &&
((container = (indexContainer) indexContainerIterator.next()) != null) &&
(container.size() > 0) &&
(tmpContainers.size() == 0) ||
(yacyDHTAction.dhtDistance(container.getWordHash(), ((indexContainer) tmpContainers.get(0)).getWordHash()) < maximumDistance)
((tmpContainers.size() == 0) ||
(yacyDHTAction.dhtDistance(container.getWordHash(), ((indexContainer) tmpContainers.get(0)).getWordHash()) < maximumDistance)) &&
(System.currentTimeMillis() < timeout)
) {
// check for interruption
if (Thread.currentThread().isInterrupted()) throw new InterruptedException("Shutdown in progress");
@ -225,7 +224,7 @@ public class plasmaDHTChunk {
wholesize = container.size();
urlIter = container.entries();
// iterate over indexes to fetch url entries and store them in the urlCache
while ((urlIter.hasNext()) && (maxcount > refcount)) {
while ((urlIter.hasNext()) && (maxcount > refcount) && (System.currentTimeMillis() < timeout)) {
iEntry = (indexEntry) urlIter.next();
lurl = lurls.load(iEntry.urlHash(), iEntry);
if ((lurl == null) || (lurl.url() == null)) {

@ -169,7 +169,7 @@ public class plasmaDHTFlush extends Thread {
// selecting 500 words to transfer
this.status = "Running: Selecting chunk " + iteration;
newDHTChunk = new plasmaDHTChunk(this.log, this.wordIndex, this.sb.urlPool.loadedURL, this.chunkSize/3*2, this.chunkSize, this.startPointHash);
newDHTChunk = new plasmaDHTChunk(this.log, this.wordIndex, this.sb.urlPool.loadedURL, this.chunkSize/3*2, this.chunkSize, -1, this.startPointHash);
/* If we havn't selected a word chunk this could be because of
* a) no words are left in the index

@ -1013,7 +1013,7 @@ public final class plasmaSwitchboard extends serverAbstractSwitch implements ser
) {
// generate new chunk
int minChunkSize = (int) getConfigLong("indexDistribution.minChunkSize", 30);
dhtTransferChunk = new plasmaDHTChunk(this.log, this.wordIndex, this.urlPool.loadedURL, minChunkSize, dhtTransferIndexCount);
dhtTransferChunk = new plasmaDHTChunk(this.log, this.wordIndex, this.urlPool.loadedURL, minChunkSize, dhtTransferIndexCount, 5000);
doneSomething = true;
