better handling of RWI cache for concurrency and less overhead when writing new entries -> even more indexing speed

git-svn-id: https://svn.berlios.de/svnroot/repos/yacy/trunk@5924 6c8d7289-2bf4-0310-a012-ef5d649a1542
pull/1/head
orbiter 16 years ago
parent ae334c7b83
commit e6773cbb33

@ -105,13 +105,18 @@ public class ReferenceContainer<ReferenceType extends Reference> extends RowSet
return new ReferenceContainer<ReferenceType>(this.factory, this.termHash, super.merge(c)); return new ReferenceContainer<ReferenceType>(this.factory, this.termHash, super.merge(c));
} }
public Reference put(final Reference entry) { public Reference replace(final Reference entry) {
assert entry.toKelondroEntry().objectsize() == super.rowdef.objectsize; assert entry.toKelondroEntry().objectsize() == super.rowdef.objectsize;
final Row.Entry r = super.replace(entry.toKelondroEntry()); final Row.Entry r = super.replace(entry.toKelondroEntry());
if (r == null) return null; if (r == null) return null;
return new WordReferenceRow(r); return new WordReferenceRow(r);
} }
/**
 * Stores the given reference without reporting any previously stored row.
 *
 * @param entry the reference to store; its row size must match this container's row definition
 */
public void put(final Reference entry) {
    assert entry.toKelondroEntry().objectsize() == super.rowdef.objectsize;
    final Row.Entry row = entry.toKelondroEntry();
    super.put(row);
}
public boolean putRecent(final Reference entry) { public boolean putRecent(final Reference entry) {
assert entry.toKelondroEntry().objectsize() == super.rowdef.objectsize; assert entry.toKelondroEntry().objectsize() == super.rowdef.objectsize;
// returns true if the new entry was added, false if it already existed // returns true if the new entry was added, false if it already existed

@ -29,13 +29,12 @@ package de.anomic.kelondro.text;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator; import java.util.Iterator;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.SortedMap; import java.util.SortedMap;
import java.util.TreeMap; import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import de.anomic.kelondro.blob.HeapReader; import de.anomic.kelondro.blob.HeapReader;
import de.anomic.kelondro.blob.HeapWriter; import de.anomic.kelondro.blob.HeapWriter;
@ -87,7 +86,7 @@ public final class ReferenceContainerCache<ReferenceType extends Reference> exte
* another dump reading afterwards is not possible * another dump reading afterwards is not possible
*/ */
public void initWriteMode() { public void initWriteMode() {
this.cache = Collections.synchronizedMap(new HashMap<ByteArray, ReferenceContainer<ReferenceType>>()); this.cache = new ConcurrentHashMap<ByteArray, ReferenceContainer<ReferenceType>>();
} }
/** /**
@ -99,7 +98,7 @@ public final class ReferenceContainerCache<ReferenceType extends Reference> exte
Log.logInfo("indexContainerRAMHeap", "restoring rwi blob dump '" + blobFile.getName() + "'"); Log.logInfo("indexContainerRAMHeap", "restoring rwi blob dump '" + blobFile.getName() + "'");
final long start = System.currentTimeMillis(); final long start = System.currentTimeMillis();
//this.cache = Collections.synchronizedSortedMap(new TreeMap<byte[], ReferenceContainer<ReferenceType>>(this.termOrder)); //this.cache = Collections.synchronizedSortedMap(new TreeMap<byte[], ReferenceContainer<ReferenceType>>(this.termOrder));
this.cache = new HashMap<ByteArray, ReferenceContainer<ReferenceType>>(); this.cache = new ConcurrentHashMap<ByteArray, ReferenceContainer<ReferenceType>>();
int urlCount = 0; int urlCount = 0;
synchronized (cache) { synchronized (cache) {
for (final ReferenceContainer<ReferenceType> container : new blobFileEntries<ReferenceType>(blobFile, factory, this.payloadrow)) { for (final ReferenceContainer<ReferenceType> container : new blobFileEntries<ReferenceType>(blobFile, factory, this.payloadrow)) {
@ -245,7 +244,7 @@ public final class ReferenceContainerCache<ReferenceType extends Reference> exte
} }
} }
public synchronized int maxReferences() { public int maxReferences() {
// iterate to find the max score // iterate to find the max score
int max = 0; int max = 0;
for (ReferenceContainer<ReferenceType> container : cache.values()) { for (ReferenceContainer<ReferenceType> container : cache.values()) {
@ -254,7 +253,7 @@ public final class ReferenceContainerCache<ReferenceType extends Reference> exte
return max; return max;
} }
public synchronized byte[] maxReferencesHash() { public byte[] maxReferencesHash() {
// iterate to find the max score // iterate to find the max score
int max = 0; int max = 0;
byte[] hash = null; byte[] hash = null;
@ -267,7 +266,7 @@ public final class ReferenceContainerCache<ReferenceType extends Reference> exte
return hash; return hash;
} }
public synchronized ArrayList<byte[]> maxReferencesHash(int bound) { public ArrayList<byte[]> maxReferencesHash(int bound) {
// iterate to find the max score // iterate to find the max score
ArrayList<byte[]> hashes = new ArrayList<byte[]>(); ArrayList<byte[]> hashes = new ArrayList<byte[]>();
for (ReferenceContainer<ReferenceType> container : cache.values()) { for (ReferenceContainer<ReferenceType> container : cache.values()) {
@ -278,7 +277,7 @@ public final class ReferenceContainerCache<ReferenceType extends Reference> exte
return hashes; return hashes;
} }
public synchronized ReferenceContainer<ReferenceType> latest() { public ReferenceContainer<ReferenceType> latest() {
ReferenceContainer<ReferenceType> c = null; ReferenceContainer<ReferenceType> c = null;
for (ReferenceContainer<ReferenceType> container : cache.values()) { for (ReferenceContainer<ReferenceType> container : cache.values()) {
if (c == null) {c = container; continue;} if (c == null) {c = container; continue;}
@ -287,7 +286,7 @@ public final class ReferenceContainerCache<ReferenceType extends Reference> exte
return c; return c;
} }
public synchronized ReferenceContainer<ReferenceType> first() { public ReferenceContainer<ReferenceType> first() {
ReferenceContainer<ReferenceType> c = null; ReferenceContainer<ReferenceType> c = null;
for (ReferenceContainer<ReferenceType> container : cache.values()) { for (ReferenceContainer<ReferenceType> container : cache.values()) {
if (c == null) {c = container; continue;} if (c == null) {c = container; continue;}
@ -296,7 +295,7 @@ public final class ReferenceContainerCache<ReferenceType extends Reference> exte
return c; return c;
} }
public synchronized ArrayList<byte[]> overAge(long maxage) { public ArrayList<byte[]> overAge(long maxage) {
ArrayList<byte[]> hashes = new ArrayList<byte[]>(); ArrayList<byte[]> hashes = new ArrayList<byte[]>();
long limit = System.currentTimeMillis() - maxage; long limit = System.currentTimeMillis() - maxage;
for (ReferenceContainer<ReferenceType> container : cache.values()) { for (ReferenceContainer<ReferenceType> container : cache.values()) {
@ -426,42 +425,46 @@ public final class ReferenceContainerCache<ReferenceType extends Reference> exte
* @param wordHash * @param wordHash
* @return the indexContainer if the cache contained the container, null otherwise * @return the indexContainer if the cache contained the container, null otherwise
*/ */
public synchronized ReferenceContainer<ReferenceType> delete(final byte[] termHash) { public ReferenceContainer<ReferenceType> delete(final byte[] termHash) {
// returns the index that had been deleted // returns the index that had been deleted
assert this.cache != null; assert this.cache != null;
return cache.remove(new ByteArray(termHash)); return cache.remove(new ByteArray(termHash));
} }
public synchronized boolean remove(final byte[] termHash, final String urlHash) { public boolean remove(final byte[] termHash, final String urlHash) {
assert this.cache != null; assert this.cache != null;
ByteArray tha = new ByteArray(termHash); ByteArray tha = new ByteArray(termHash);
final ReferenceContainer<ReferenceType> c = cache.get(tha); synchronized (cache) {
if ((c != null) && (c.remove(urlHash) != null)) { final ReferenceContainer<ReferenceType> c = cache.get(tha);
// removal successful if ((c != null) && (c.remove(urlHash) != null)) {
if (c.size() == 0) { // removal successful
delete(termHash); if (c.size() == 0) {
} else { delete(termHash);
cache.put(tha, c); } else {
} cache.put(tha, c);
return true; }
return true;
}
} }
return false; return false;
} }
public synchronized int remove(final byte[] termHash, final Set<String> urlHashes) { public int remove(final byte[] termHash, final Set<String> urlHashes) {
assert this.cache != null; assert this.cache != null;
if (urlHashes.size() == 0) return 0; if (urlHashes.size() == 0) return 0;
ByteArray tha = new ByteArray(termHash); ByteArray tha = new ByteArray(termHash);
final ReferenceContainer<ReferenceType> c = cache.get(tha);
int count; int count;
if ((c != null) && ((count = c.removeEntries(urlHashes)) > 0)) { synchronized (cache) {
// removal successful final ReferenceContainer<ReferenceType> c = cache.get(tha);
if (c.size() == 0) { if ((c != null) && ((count = c.removeEntries(urlHashes)) > 0)) {
delete(termHash); // removal successful
} else { if (c.size() == 0) {
cache.put(tha, c); delete(termHash);
} } else {
return count; cache.put(tha, c);
}
return count;
}
} }
return 0; return 0;
} }
@ -473,9 +476,9 @@ public final class ReferenceContainerCache<ReferenceType extends Reference> exte
// put new words into cache // put new words into cache
ByteArray tha = new ByteArray(container.getTermHash()); ByteArray tha = new ByteArray(container.getTermHash());
synchronized (this) { int added = 0;
synchronized (cache) {
ReferenceContainer<ReferenceType> entries = cache.get(tha); // null pointer exception? wordhash != null! must be cache==null ReferenceContainer<ReferenceType> entries = cache.get(tha); // null pointer exception? wordhash != null! must be cache==null
int added = 0;
if (entries == null) { if (entries == null) {
entries = container.topLevelClone(); entries = container.topLevelClone();
added = entries.size(); added = entries.size();
@ -493,11 +496,27 @@ public final class ReferenceContainerCache<ReferenceType extends Reference> exte
public void add(final byte[] termHash, final ReferenceType newEntry) { public void add(final byte[] termHash, final ReferenceType newEntry) {
assert this.cache != null; assert this.cache != null;
ByteArray tha = new ByteArray(termHash); ByteArray tha = new ByteArray(termHash);
synchronized (this) {
ReferenceContainer<ReferenceType> container = cache.get(tha); // first synchronization: check if the entry is empty, and quickly set the entry
if (container == null) container = new ReferenceContainer<ReferenceType>(factory, termHash, this.payloadrow, 1); ReferenceContainer<ReferenceType> container = null;
container.put(newEntry); synchronized (cache) {
cache.put(tha, container); container = cache.remove(tha);
if (container == null) {
container = new ReferenceContainer<ReferenceType>(factory, termHash, this.payloadrow, 1);
container.put(newEntry);
cache.put(tha, container);
return;
}
}
// if the entry must be merged, first release the synchronization to do the merge concurrently
container.put(newEntry);
// then get a new lock. If the entry was written in the meantime, merge again
synchronized (cache) {
ReferenceContainer<ReferenceType> containerNew = cache.get(tha);
if (container != null) container.putAllRecent(containerNew);
cache.put(tha, container);
} }
} }

Loading…
Cancel
Save