Added a column index for tables in blob files. This index is heavily used
while receiving DHT submissions and when answering remote search
requests. Both events together may have caused I/O deadlocks, and this
commit is intended to fix that.
pull/1/head
Michael Peter Christen 13 years ago
parent ffb72249ea
commit 0b67a0a5d8

@ -0,0 +1,168 @@
/**
* MapColumnIndex
* Copyright 2012 by Michael Christen
* First released 01.02.2012 at http://yacy.net
*
* $LastChangedDate$
* $LastChangedRevision$
* $LastChangedBy$
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program in the file lgpl21.txt
* If not, see <http://www.gnu.org/licenses/>.
*/
package net.yacy.kelondro.blob;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;
import net.yacy.cora.document.ASCII;
import net.yacy.kelondro.order.NaturalOrder;
/**
 * A mapping from a column name to maps from the values of that column to the
 * primary keys of the rows in which the value occurs.
 *
 * A column is only tracked after {@link #init} was called for it. Lookups on
 * an untracked column fail with an {@link UnsupportedOperationException} so
 * that the caller can build the index on demand and retry.
 *
 * This class does no synchronization of its own.
 */
public class MapColumnIndex extends HashMap<String, Map<String, Collection<byte[]>>> implements Map<String, Map<String, Collection<byte[]>>> {

    private static final long serialVersionUID=-424741536889467566L;

    public MapColumnIndex() {
        super();
    }

    /**
     * look up the primary keys of all rows where the column whereKey has the value isValue
     * @param whereKey the name of the column
     * @param isValue the value that the column must have
     * @return the indexed primary keys; an empty collection if no row has that value.
     *         A non-empty result is the collection stored in the index itself, not a copy.
     * @throws UnsupportedOperationException if no index was created for whereKey so far
     */
    public Collection<byte[]> getIndex(final String whereKey, final String isValue) throws UnsupportedOperationException {
        final Map<String, Collection<byte[]>> references = this.get(whereKey);
        if (references == null) throw new UnsupportedOperationException("no index for column '" + whereKey + "'");
        final Collection<byte[]> indexes = references.get(isValue);
        if (indexes == null) return new ArrayList<byte[]>(0); // empty collection
        return indexes;
    }

    /**
     * create a full index for the whereKey
     * @param whereKey the name of the column to be indexed
     * @param isValue not used; the index always covers all values of the column
     * @param table an iterator over all rows of the table (primary key to row)
     */
    public void init(final String whereKey, final String isValue, final Iterator<Map.Entry<byte[], Map<String, String>>> table) {
        final Map<String, Collection<byte[]>> valueIdxMap = new HashMap<String, Collection<byte[]>>();
        while (table.hasNext()) {
            final Map.Entry<byte[], Map<String, String>> line = table.next();
            final Map<String, String> row = line.getValue();
            if (row == null) continue; // nothing to index for this key
            final String value = row.get(whereKey);
            if (value == null) continue; // we don't need to remember that
            indexupdate(line.getKey(), valueIdxMap, value);
        }
        // register the index only after the scan is complete: if the iteration
        // fails half-way, getIndex must not serve a partially filled index
        this.put(whereKey, valueIdxMap);
    }

    /**
     * update an index entry
     * For every tracked column the primary key is filed under the current value
     * of the row and removed from every other value of that column. Without the
     * removal a row whose value changed would still be found under its old value.
     * Note that the removal scans all values of each tracked column.
     * @param primarykey the primary key for the row that is updated
     * @param row the row that was updated (a mapping from column names to values)
     */
    public void update(final byte[] primarykey, final Map<String, String> row) {
        for (final Map.Entry<String, Map<String, Collection<byte[]>>> entry: this.entrySet()) {
            // maintain the index for all columns that we track
            final Map<String, Collection<byte[]>> valueIdxMap = entry.getValue();
            final String value = row.get(entry.getKey());
            // forget references that were stored for a previous value of this column
            removeReferences(primarykey, valueIdxMap, value);
            if (value == null) continue; // we don't need to remember that
            indexupdate(primarykey, valueIdxMap, value);
        }
    }

    /**
     * file the primary key under the given value unless it is already there
     */
    private void indexupdate(final byte[] primarykey, final Map<String, Collection<byte[]>> valueIdxMap, final String value) {
        Collection<byte[]> indexes = valueIdxMap.get(value);
        if (indexes == null) {
            // create a new index entry
            indexes = new ArrayList<byte[]>(1);
            indexes.add(primarykey);
            valueIdxMap.put(value, indexes);
        } else {
            // update the existing index entry
            // check if value already exist
            if (!net.yacy.kelondro.util.ByteBuffer.contains(indexes, primarykey)) {
                indexes.add(primarykey);
            }
        }
    }

    /**
     * delete all references to the primary key
     * @param primarykey
     */
    public void delete(final byte[] primarykey) {
        for (final Map.Entry<String, Map<String, Collection<byte[]>>> entry: this.entrySet()) {
            // we must check all index reference maps: iterate over entries
            indexdelete(primarykey, entry.getValue());
        }
    }

    /**
     * remove the primary key from all values of one column index
     */
    private void indexdelete(final byte[] index, final Map<String, Collection<byte[]>> valueIdxMap) {
        removeReferences(index, valueIdxMap, null);
    }

    /**
     * remove the primary key from every value entry of one column index, except
     * from the entry for keepValue. Entries that are empty afterwards are dropped.
     * @param primarykey the primary key to be removed
     * @param valueIdxMap the index of one column (value to primary keys)
     * @param keepValue the value whose entry is left untouched; null to clean all entries
     */
    private static void removeReferences(final byte[] primarykey, final Map<String, Collection<byte[]>> valueIdxMap, final String keepValue) {
        final Iterator<Map.Entry<String, Collection<byte[]>>> i = valueIdxMap.entrySet().iterator();
        while (i.hasNext()) {
            final Map.Entry<String, Collection<byte[]>> ref = i.next();
            if (keepValue != null && keepValue.equals(ref.getKey())) continue;
            net.yacy.kelondro.util.ByteBuffer.remove(ref.getValue(), primarykey);
            if (ref.getValue().isEmpty()) {
                i.remove(); // must go through the iterator, we are iterating the entry set
            }
        }
    }

    /**
     * demo helper: look up an index and create it from the table if it does not exist yet
     */
    private static Collection<byte[]> getIndexWithExceptionHandler(final MapColumnIndex idx, final String whereKey, final String isValue, Map<byte[], Map<String, String>> table) {
        try {
            return idx.getIndex(whereKey, isValue);
        } catch (final UnsupportedOperationException e) {
            // the column is not tracked yet: build the index and try again.
            // A second UnsupportedOperationException simply propagates to the caller.
            idx.init(whereKey, isValue, table.entrySet().iterator());
            return idx.getIndex(whereKey, isValue);
        }
    }

    /**
     * demo helper: print the primary keys of an index result to stdout
     */
    private static void printIndex(Collection<byte[]> index) {
        System.out.print("idx{");
        int c = 0;
        for (byte[] a: index) {
            if (c++ != 0) System.out.print(", ");
            System.out.print(ASCII.String(a));
        }
        System.out.print("}");
    }

    /**
     * small demonstration of on-demand index creation and index update
     */
    public static void main(String[] args) {
        Map<byte[], Map<String, String>> table = new TreeMap<byte[], Map<String, String>>(NaturalOrder.naturalOrder);
        Map<String, String> row;
        row = new HashMap<String, String>(); row.put("a", "1"); row.put("b", "2"); row.put("c", "2"); table.put("line1".getBytes(), row);
        row = new HashMap<String, String>(); row.put("a", "3"); row.put("b", "2"); row.put("c", "4"); table.put("line2".getBytes(), row);
        row = new HashMap<String, String>(); row.put("a", "5"); row.put("b", "2"); row.put("c", "4"); table.put("line3".getBytes(), row);
        row = new HashMap<String, String>(); row.put("a", "6"); row.put("b", "7"); row.put("c", "8"); table.put("line4".getBytes(), row);
        MapColumnIndex idx = new MapColumnIndex();
        System.out.print("colum b, value 2: "); printIndex(getIndexWithExceptionHandler(idx, "b", "2", table)); System.out.println();
        System.out.print("colum c, value 4: "); printIndex(getIndexWithExceptionHandler(idx, "c", "4", table)); System.out.println();
        // the label must name the value that is actually queried
        System.out.print("colum b, value 7: "); printIndex(getIndexWithExceptionHandler(idx, "b", "7", table)); System.out.println();
        System.out.print("colum d, value 0: "); printIndex(getIndexWithExceptionHandler(idx, "d", "0", table)); System.out.println();
        row = new HashMap<String, String>(); row.put("a", "9"); row.put("b", "9"); row.put("c", "4"); table.put("line5".getBytes(), row);
        idx.update("line5".getBytes(), row);
        System.out.print("colum c, value 4: "); printIndex(getIndexWithExceptionHandler(idx, "c", "4", table)); System.out.println();
    }
}

@ -29,9 +29,11 @@ package net.yacy.kelondro.blob;
import java.io.File;
import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import net.yacy.cora.document.UTF8;
@ -55,6 +57,7 @@ public class MapDataMining extends MapHeap {
private Map<String, ScoreMap<String>> sortClusterMap; // a String-kelondroMScoreCluster - relation
private Map<String, Long> accLong; // to store accumulations of Long cells
private Map<String, Float> accFloat; // to store accumulations of Float cells
private final MapColumnIndex columnIndex; // to store fast select-where indexes
@SuppressWarnings("unchecked")
public MapDataMining(final File heapFile,
@ -73,6 +76,8 @@ public class MapDataMining extends MapHeap {
this.longaccfields = longaccfields;
this.floataccfields = floataccfields;
this.columnIndex = new MapColumnIndex();
ScoreMap<String>[] cluster = null;
if (sortfields == null) this.sortClusterMap = null; else {
this.sortClusterMap = new ConcurrentHashMap<String, ScoreMap<String>>();
@ -192,6 +197,8 @@ public class MapDataMining extends MapHeap {
this.accFloat.put(floataccfield, FLOAT0);
}
}
this.columnIndex.clear();
}
@Override
@ -216,6 +223,8 @@ public class MapDataMining extends MapHeap {
// update sortCluster
if (this.sortClusterMap != null) updateSortCluster(UTF8.String(key), newMap);
this.columnIndex.update(key, newMap);
}
private void updateAcc(final Map<String, String> map, final boolean add) {
@ -294,6 +303,8 @@ public class MapDataMining extends MapHeap {
}
}
super.delete(key);
this.columnIndex.delete(key);
}
private void deleteSortCluster(final String key) {
@ -315,6 +326,10 @@ public class MapDataMining extends MapHeap {
return new string2bytearrayIterator(cluster.keys(up));
}
/**
 * Returns the key iterator of the superclass, requested with the direction
 * flag set to true and without a start key.
 * @return the iterator produced by {@code super.keys(true, null)}
 * @throws IOException if the superclass call fails
 */
private synchronized Iterator<byte[]> keys() throws IOException {
    final boolean up = true;
    final byte[] firstKey = null; // no start key
    return super.keys(up, firstKey);
}
private static class string2bytearrayIterator implements Iterator<byte[]> {
private final Iterator<String> s;
@ -342,15 +357,35 @@ public class MapDataMining extends MapHeap {
}
@Override
public synchronized Iterator<Map.Entry<byte[], Map<String, String>>> entries(final String whereKey, final String isValue) throws IOException {
return super.entries(whereKey, isValue);
Collection<byte[]> idx = null;
try {
idx = this.columnIndex.getIndex(whereKey, isValue);
} catch (UnsupportedOperationException e) {
this.columnIndex.init(whereKey, isValue, new FullMapIterator(keys()));
try {
idx = this.columnIndex.getIndex(whereKey, isValue);
} catch (UnsupportedOperationException ee) {
throw ee;
}
}
Map<byte[], Map<String, String>> resultMap = new TreeMap<byte[], Map<String, String>>(this.ordering());
for (byte[] pk: idx) {
try {
resultMap.put(pk, this.get(pk));
} catch (final IOException e) {
Log.logException(e);
} catch (final RowSpaceExceededException e) {
Log.logException(e);
}
}
return resultMap.entrySet().iterator();
}
public synchronized Iterator<Map.Entry<byte[], Map<String, String>>> entries(final boolean up, final String field) {
return new MapIterator(keys(up, field), null, null);
return new FullMapIterator(keys(up, field));
}
public synchronized long getLongAcc(final String field) {
final Long accumulator = this.accLong.get(field);
if (accumulator == null) return -1;

@ -82,6 +82,14 @@ public class MapHeap implements Map<byte[], Map<String, String>> {
return this.blob.keylength();
}
/**
 * get the ordering of the primary keys
 * @return the ordering reported by the underlying blob
 */
public ByteOrder ordering() {
    final ByteOrder keyOrder = this.blob.ordering();
    return keyOrder;
}
/**
* clears the content of the database
* @throws IOException
@ -366,6 +374,10 @@ public class MapHeap implements Map<byte[], Map<String, String>> {
return new KeyIterator(up, rotating, firstKey, secondKey);
}
/**
 * Returns a key iterator obtained directly from the underlying blob.
 * @param up passed through to the blob; presumably selects ascending order (confirm against the blob implementation)
 * @param firstKey passed through to the blob; presumably the key to start at, may be null (confirm)
 * @return the iterator produced by {@code this.blob.keys(up, firstKey)}
 * @throws IOException if the blob call fails
 */
public synchronized CloneableIterator<byte[]> keys(boolean up, byte[] firstKey) throws IOException {
    final CloneableIterator<byte[]> blobKeys = this.blob.keys(up, firstKey);
    return blobKeys;
}
public class KeyIterator implements CloneableIterator<byte[]>, Iterator<byte[]> {
final boolean up, rotating;
@ -406,17 +418,13 @@ public class MapHeap implements Map<byte[], Map<String, String>> {
}
}
public synchronized Iterator<Map.Entry<byte[], Map<String, String>>> entries(final String whereKey, final String isValue) throws IOException {
return new MapIterator(this.blob.keys(true, null), whereKey, isValue);
}
public synchronized Iterator<Map.Entry<byte[], Map<String, String>>> entries(final boolean up, final boolean rotating) throws IOException {
return new MapIterator(keys(up, rotating), null, null);
return new FullMapIterator(keys(up, rotating));
}
public synchronized Iterator<Map.Entry<byte[], Map<String, String>>> entries(final boolean up, final boolean rotating, final byte[] firstKey, final byte[] secondKey) throws IOException {
return new MapIterator(keys(up, rotating, firstKey, secondKey), null, null);
return new FullMapIterator(keys(up, rotating, firstKey, secondKey));
}
/**
@ -448,18 +456,15 @@ public class MapHeap implements Map<byte[], Map<String, String>> {
public void finalize() {
close();
}
public class MapIterator extends LookAheadIterator<Map.Entry<byte[], Map<String, String>>> implements Iterator<Map.Entry<byte[], Map<String, String>>> {
protected class FullMapIterator extends LookAheadIterator<Map.Entry<byte[], Map<String, String>>> implements Iterator<Map.Entry<byte[], Map<String, String>>> {
// enumerates Map-Type elements
// the key is also included in every map that is returned; it's key is 'key'
private final Iterator<byte[]> keyIterator;
private final String whereKey, isValue;
MapIterator(final Iterator<byte[]> keyIterator, final String whereKey, final String isValue) {
FullMapIterator(final Iterator<byte[]> keyIterator) {
this.keyIterator = keyIterator;
this.whereKey = whereKey;
this.isValue = isValue;
}
@Override
@ -479,19 +484,14 @@ public class MapHeap implements Map<byte[], Map<String, String>> {
continue;
}
if (map == null) continue; // circumvention of a modified exception
// check if the where case holds
if (this.whereKey != null && this.isValue != null) {
String v = map.get(this.whereKey);
if (v == null) continue;
if (!v.equals(this.isValue)) continue;
}
// produce entry
Map.Entry<byte[], Map<String, String>> entry = new AbstractMap.SimpleImmutableEntry<byte[], Map<String, String>>(nextKey, map);
return entry;
}
return null;
}
} // class mapIterator
} // class FullMapIterator
@Override
public void putAll(final Map<? extends byte[], ? extends Map<String, String>> map) {

@ -32,6 +32,7 @@ import java.io.OutputStream;
import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
@ -140,6 +141,7 @@ public final class ByteBuffer extends OutputStream {
this.offset = 0;
}
/**
 * Writes the low-order eight bits of the given int by delegating to the byte variant of write.
 * @param b the value whose lowest byte is written
 */
@Override
public void write(final int b) {
    final byte lowByte = (byte) (b & 0xff);
    write(lowByte);
}
@ -518,6 +520,20 @@ public final class ByteBuffer extends OutputStream {
return false;
}
/**
 * Removes every element of the collection that is equal to the given key,
 * where equality is decided by this class's equals(byte[], byte[]).
 * Removal goes through the collection's iterator.
 * @param collection the collection to remove from
 * @param key the byte array to look for
 * @return the number of elements that were removed
 */
public static int remove(final Collection<byte[]> collection, final byte[] key) {
    int removed = 0;
    for (final Iterator<byte[]> it = collection.iterator(); it.hasNext();) {
        final byte[] candidate = it.next();
        if (!equals(candidate, key)) continue;
        it.remove();
        removed++;
    }
    return removed;
}
public static List<byte[]> split(final byte[] b, final byte s) {
final ArrayList<byte[]> a = new ArrayList<byte[]>();
int c = 0;

@ -29,10 +29,8 @@ import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;
import java.lang.ref.SoftReference;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
@ -98,8 +96,6 @@ public final class SeedDB implements AlternativeDomainNames {
private Seed mySeed; // my own seed
private final Set<String> myBotIDs; // list of id's that this bot accepts as robots.txt identification
private final Map<String, String> nameLookupCache; // a name-to-hash relation
private final Map<InetAddress, SoftReference<Seed>> ipLookupCache;
public SeedDB(
final File networkRoot,
@ -128,12 +124,6 @@ public final class SeedDB implements AlternativeDomainNames {
this.seedPassiveDB = openSeedTable(this.seedPassiveDBFile);
this.seedPotentialDB = openSeedTable(this.seedPotentialDBFile);
// start our virtual DNS service for yacy peers with empty cache
this.nameLookupCache = new HashMap<String, String>();
// cache for reverse name lookup
this.ipLookupCache = new HashMap<InetAddress, SoftReference<Seed>>();
// check if we are in the seedCaches: this can happen if someone else published our seed
removeMySeed();
@ -184,12 +174,6 @@ public final class SeedDB implements AlternativeDomainNames {
this.seedPassiveDB = openSeedTable(this.seedPassiveDBFile);
this.seedPotentialDB = openSeedTable(this.seedPotentialDBFile);
// start our virtual DNS service for yacy peers with empty cache
this.nameLookupCache.clear();
// cache for reverse name lookup
this.ipLookupCache.clear();
// check if we are in the seedCaches: this can happen if someone else published our seed
removeMySeed();
@ -497,7 +481,6 @@ public final class SeedDB implements AlternativeDomainNames {
//seed.put(yacySeed.LASTSEEN, yacyCore.shortFormatter.format(new Date(yacyCore.universalTime())));
synchronized (this) {
try {
this.nameLookupCache.put(seed.getName(), seed.hash);
final ConcurrentMap<String, String> seedPropMap = seed.getMap();
this.seedActiveDB.insert(ASCII.getBytes(seed.hash), seedPropMap);
this.seedPassiveDB.delete(ASCII.getBytes(seed.hash));
@ -513,7 +496,6 @@ public final class SeedDB implements AlternativeDomainNames {
if (seed.isProper(false) != null) return;
synchronized (this) {
try {
this.nameLookupCache.remove(seed.getName());
this.seedActiveDB.delete(ASCII.getBytes(seed.hash));
this.seedPotentialDB.delete(ASCII.getBytes(seed.hash));
} catch (final Exception e) { Log.logWarning("yacySeedDB", "could not remove hash ("+ e.getClass() +"): "+ e.getMessage()); }
@ -532,7 +514,6 @@ public final class SeedDB implements AlternativeDomainNames {
if (seed.isProper(false) != null) return;
synchronized (this) {
try {
this.nameLookupCache.remove(seed.getName());
this.seedActiveDB.delete(ASCII.getBytes(seed.hash));
this.seedPassiveDB.delete(ASCII.getBytes(seed.hash));
} catch (final Exception e) { Log.logWarning("yacySeedDB", "could not remove hash ("+ e.getClass() +"): "+ e.getMessage()); }
@ -637,17 +618,8 @@ public final class SeedDB implements AlternativeDomainNames {
return this.mySeed;
}
// then try to use the cache
peerName = peerName.toLowerCase();
final String seedhash = this.nameLookupCache.get(peerName);
Seed seed;
if (seedhash != null) {
seed = this.get(seedhash);
if (seed != null) {
//System.out.println("*** found lookupByName in cache: " + peerName);
return seed;
}
}
// enumerate the cache
String name = Seed.checkPeerName(peerName);
@ -659,7 +631,6 @@ public final class SeedDB implements AlternativeDomainNames {
if (entry == null) break;
seed = this.getConnected(ASCII.String(entry.getKey()));
if (seed == null) continue;
if (seed.isProper(false) == null) this.nameLookupCache.put(seed.getName().toLowerCase(), seed.hash);
//System.out.println("*** found lookupByName in seedActiveDB: " + peerName);
return seed;
}
@ -672,7 +643,6 @@ public final class SeedDB implements AlternativeDomainNames {
if (entry == null) break;
seed = this.getConnected(ASCII.String(entry.getKey()));
if (seed == null) continue;
if (seed.isProper(false) == null) this.nameLookupCache.put(seed.getName().toLowerCase(), seed.hash);
//System.out.println("*** found lookupByName in seedPassiveDB: " + peerName);
return seed;
}
@ -682,7 +652,6 @@ public final class SeedDB implements AlternativeDomainNames {
// check local seed
if (this.mySeed == null) initMySeed();
name = this.mySeed.getName().toLowerCase();
if (this.mySeed.isProper(false) == null) this.nameLookupCache.put(name, this.mySeed.hash);
if (name.equals(peerName)) return this.mySeed;
// nothing found
return null;
@ -705,16 +674,7 @@ public final class SeedDB implements AlternativeDomainNames {
}
// then try to use the cache
final SoftReference<Seed> ref = this.ipLookupCache.get(peerIP);
Seed seed = null;
if (ref != null) {
seed = ref.get();
if (seed != null) {
//System.out.println("*** found lookupByIP in cache: " + peerIP.toString() + " -> " + this.mySeed.getName());
return seed;
}
}
String ipString = peerIP.getHostAddress();
Map.Entry<byte[], Map<String, String>> entry;
@ -729,7 +689,6 @@ public final class SeedDB implements AlternativeDomainNames {
if (port > 0 && Integer.parseInt(p) != port) continue;
seed = this.getConnected(ASCII.String(entry.getKey()));
if (seed == null) continue;
this.ipLookupCache.put(peerIP, new SoftReference<Seed>(seed));
//System.out.println("*** found lookupByIP in connected: " + peerIP.toString() + " -> " + seed.getName());
return seed;
}
@ -748,7 +707,6 @@ public final class SeedDB implements AlternativeDomainNames {
if (port > 0 && Integer.parseInt(p) != port) continue;
seed = this.getDisconnected(ASCII.String(entry.getKey()));
if (seed == null) continue;
this.ipLookupCache.put(peerIP, new SoftReference<Seed>(seed));
//System.out.println("*** found lookupByIP in disconnected: " + peerIP.toString() + " -> " + seed.getName());
return seed;
}
@ -767,7 +725,6 @@ public final class SeedDB implements AlternativeDomainNames {
if (port > 0 && Integer.parseInt(p) != port) continue;
seed = this.getPotential(ASCII.String(entry.getKey()));
if (seed == null) continue;
this.ipLookupCache.put(peerIP, new SoftReference<Seed>(seed));
//System.out.println("*** found lookupByIP in potential: " + peerIP.toString() + " -> " + seed.getName());
return seed;
}

Loading…
Cancel
Save