removed the semaphores from the cache dump process because I believe some of the semaphores may be lost somewhere which then causes that the cache is never flushed and then the peer dies from a OOM. The re-introduced synchronization may not be the best solution but should ensure that the caches are flushed.

git-svn-id: https://svn.berlios.de/svnroot/repos/yacy/trunk@7802 6c8d7289-2bf4-0310-a012-ef5d649a1542
pull/1/head
orbiter 14 years ago
parent 6d9e5865ee
commit 267290a821

@ -9,7 +9,7 @@
// $LastChangedBy$
//
// LICENSE
//
//
// This program is free software; you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation; either version 2 of the License, or
@ -31,7 +31,6 @@ import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.Semaphore;
import net.yacy.cora.storage.ComparableARC;
import net.yacy.kelondro.data.meta.URIMetadataRow;
@ -61,7 +60,7 @@ public final class IndexCell<ReferenceType extends Reference> extends AbstractBu
private static final long cleanupCycle = 60000;
private static final long dumpCycle = 600000;
// class variables
private final ReferenceContainerArray<ReferenceType> array;
private ReferenceContainerCache<ReferenceType> ram;
@ -71,11 +70,9 @@ public final class IndexCell<ReferenceType extends Reference> extends AbstractBu
private long lastCleanup, lastDump;
private final long targetFileSize, maxFileSize;
private final int writeBufferSize;
private Semaphore dumperSemaphore = new Semaphore(1);
private Semaphore cleanerSemaphore = new Semaphore(1);
private final Map<byte[], HandleSet> removeDelayedURLs; // mapping from word hashes to a list of url hashes
public IndexCell(
final File cellPath,
final String prefix,
@ -85,11 +82,11 @@ public final class IndexCell<ReferenceType extends Reference> extends AbstractBu
final int maxRamEntries,
final long targetFileSize,
final long maxFileSize,
IODispatcher merger,
int writeBufferSize
final IODispatcher merger,
final int writeBufferSize
) throws IOException {
super(factory);
this.array = new ReferenceContainerArray<ReferenceType>(cellPath, prefix, factory, termOrder, termSize, merger);
this.ram = new ReferenceContainerCache<ReferenceType>(factory, termOrder, termSize);
this.countCache = new ComparableARC<byte[], Integer>(1000, termOrder);
@ -103,7 +100,7 @@ public final class IndexCell<ReferenceType extends Reference> extends AbstractBu
this.removeDelayedURLs = new TreeMap<byte[], HandleSet>(URIMetadataRow.rowdef.objectOrder);
}
/*
* methods to implement Index
*/
@ -115,37 +112,37 @@ public final class IndexCell<ReferenceType extends Reference> extends AbstractBu
public int termKeyLength() {
return this.ram.termKeyLength();
}
/**
* add entries to the cell: this adds the new entries always to the RAM part, never to BLOBs
* @throws IOException
* @throws IOException
* @throws RowSpaceExceededException
*/
public void add(ReferenceContainer<ReferenceType> newEntries) throws IOException, RowSpaceExceededException {
public void add(final ReferenceContainer<ReferenceType> newEntries) throws IOException, RowSpaceExceededException {
try {
this.ram.add(newEntries);
long t = System.currentTimeMillis();
final long t = System.currentTimeMillis();
if (this.ram.size() % 1000 == 0 || this.lastCleanup + cleanupCycle < t || this.lastDump + dumpCycle < t) {
EventTracker.update(EventTracker.EClass.WORDCACHE, Long.valueOf(this.ram.size()), true);
cleanCache();
}
} catch (RowSpaceExceededException e) {
} catch (final RowSpaceExceededException e) {
EventTracker.update(EventTracker.EClass.WORDCACHE, Long.valueOf(this.ram.size()), true);
cleanCache();
this.ram.add(newEntries);
}
}
public void add(byte[] termHash, ReferenceType entry) throws IOException, RowSpaceExceededException {
public void add(final byte[] termHash, final ReferenceType entry) throws IOException, RowSpaceExceededException {
try {
this.ram.add(termHash, entry);
long t = System.currentTimeMillis();
final long t = System.currentTimeMillis();
if (this.ram.size() % 1000 == 0 || this.lastCleanup + cleanupCycle < t || this.lastDump + dumpCycle < t) {
EventTracker.update(EventTracker.EClass.WORDCACHE, Long.valueOf(this.ram.size()), true);
cleanCache();
}
} catch (RowSpaceExceededException e) {
} catch (final RowSpaceExceededException e) {
EventTracker.update(EventTracker.EClass.WORDCACHE, Long.valueOf(this.ram.size()), true);
cleanCache();
this.ram.add(termHash, entry);
@ -155,7 +152,7 @@ public final class IndexCell<ReferenceType extends Reference> extends AbstractBu
/**
* checks if there is any container for this termHash, either in RAM or any BLOB
*/
public boolean has(byte[] termHash) {
public boolean has(final byte[] termHash) {
if (this.ram.has(termHash)) return true;
return this.array.has(termHash);
}
@ -164,26 +161,26 @@ public final class IndexCell<ReferenceType extends Reference> extends AbstractBu
* count number of references for a given term
* this method may cause strong IO load if called too frequently.
*/
public int count(byte[] termHash) {
Integer cachedCount = this.countCache.get(termHash);
public int count(final byte[] termHash) {
final Integer cachedCount = this.countCache.get(termHash);
if (cachedCount != null) return cachedCount.intValue();
int countFile = 0;
// read fresh values from file
try {
countFile = this.array.count(termHash);
} catch (Exception e) {
} catch (final Exception e) {
Log.logException(e);
}
assert countFile >= 0;
// count from container in ram
ReferenceContainer<ReferenceType> countRam = this.ram.get(termHash, null);
final ReferenceContainer<ReferenceType> countRam = this.ram.get(termHash, null);
assert countRam == null || countRam.size() >= 0;
int c = countRam == null ? countFile : countFile + countRam.size();
// exclude entries from delayed remove
synchronized (this.removeDelayedURLs) {
HandleSet s = this.removeDelayedURLs.get(termHash);
final HandleSet s = this.removeDelayedURLs.get(termHash);
if (s != null) c -= s.size();
if (c < 0) c = 0;
}
@ -192,7 +189,7 @@ public final class IndexCell<ReferenceType extends Reference> extends AbstractBu
this.countCache.insert(termHash, c);
return c;
}
/**
* all containers in the BLOBs and the RAM are merged and returned.
* Please be aware that the returned values may be top-level cloned ReferenceContainers or direct links to containers
@ -200,23 +197,23 @@ public final class IndexCell<ReferenceType extends Reference> extends AbstractBu
* @throws IOException
* @return a container with merged ReferenceContainer from RAM and the file array or null if there is no data to be returned
*/
public ReferenceContainer<ReferenceType> get(byte[] termHash, HandleSet urlselection) throws IOException {
ReferenceContainer<ReferenceType> c0 = this.ram.get(termHash, null);
public ReferenceContainer<ReferenceType> get(final byte[] termHash, final HandleSet urlselection) throws IOException {
final ReferenceContainer<ReferenceType> c0 = this.ram.get(termHash, null);
ReferenceContainer<ReferenceType> c1 = null;
try {
c1 = this.array.get(termHash);
} catch (RowSpaceExceededException e2) {
} catch (final RowSpaceExceededException e2) {
Log.logException(e2);
}
ReferenceContainer<ReferenceType> result = null;
if (c0 != null && c1 != null) {
try {
result = c1.merge(c0);
} catch (RowSpaceExceededException e) {
} catch (final RowSpaceExceededException e) {
// try to free some ram
try {
result = c1.merge(c0);
} catch (RowSpaceExceededException e1) {
} catch (final RowSpaceExceededException e1) {
// go silently over the problem
result = (c1.size() > c0.size()) ? c1: c0;
}
@ -229,7 +226,7 @@ public final class IndexCell<ReferenceType extends Reference> extends AbstractBu
if (result == null) return null;
// remove the failed urls
synchronized (this.removeDelayedURLs) {
HandleSet s = this.removeDelayedURLs.get(termHash);
final HandleSet s = this.removeDelayedURLs.get(termHash);
if (s != null) result.removeEntries(s);
}
return result;
@ -238,39 +235,39 @@ public final class IndexCell<ReferenceType extends Reference> extends AbstractBu
/**
* deleting a container affects the containers in RAM and all the BLOB files
* the deleted containers are merged and returned as result of the method
* @throws IOException
* @throws IOException
*/
public ReferenceContainer<ReferenceType> delete(byte[] termHash) throws IOException {
public ReferenceContainer<ReferenceType> delete(final byte[] termHash) throws IOException {
removeDelayed();
ReferenceContainer<ReferenceType> c1 = null;
try {
c1 = this.array.get(termHash);
} catch (RowSpaceExceededException e2) {
} catch (final RowSpaceExceededException e2) {
Log.logException(e2);
}
if (c1 != null) {
this.array.delete(termHash);
}
ReferenceContainer<ReferenceType> c0 = this.ram.delete(termHash);
final ReferenceContainer<ReferenceType> c0 = this.ram.delete(termHash);
cleanCache();
if (c1 == null) return c0;
if (c0 == null) return c1;
try {
return c1.merge(c0);
} catch (RowSpaceExceededException e) {
} catch (final RowSpaceExceededException e) {
// try to free some ram
try {
return c1.merge(c0);
} catch (RowSpaceExceededException e1) {
} catch (final RowSpaceExceededException e1) {
// go silently over the problem
return (c1.size() > c0.size()) ? c1: c0;
}
}
}
public void removeDelayed(byte[] termHash, HandleSet urlHashes) {
public void removeDelayed(final byte[] termHash, final HandleSet urlHashes) {
HandleSet r;
synchronized (removeDelayedURLs) {
synchronized (this.removeDelayedURLs) {
r = this.removeDelayedURLs.get(termHash);
}
if (r == null) {
@ -278,18 +275,18 @@ public final class IndexCell<ReferenceType extends Reference> extends AbstractBu
}
try {
r.putAll(urlHashes);
} catch (RowSpaceExceededException e) {
try {remove(termHash, urlHashes);} catch (IOException e1) {}
} catch (final RowSpaceExceededException e) {
try {remove(termHash, urlHashes);} catch (final IOException e1) {}
return;
}
synchronized (removeDelayedURLs) {
synchronized (this.removeDelayedURLs) {
this.removeDelayedURLs.put(termHash, r);
}
}
public void removeDelayed(byte[] termHash, byte[] urlHashBytes) {
public void removeDelayed(final byte[] termHash, final byte[] urlHashBytes) {
HandleSet r;
synchronized (removeDelayedURLs) {
synchronized (this.removeDelayedURLs) {
r = this.removeDelayedURLs.get(termHash);
}
if (r == null) {
@ -297,45 +294,45 @@ public final class IndexCell<ReferenceType extends Reference> extends AbstractBu
}
try {
r.put(urlHashBytes);
} catch (RowSpaceExceededException e) {
try {remove(termHash, urlHashBytes);} catch (IOException e1) {}
} catch (final RowSpaceExceededException e) {
try {remove(termHash, urlHashBytes);} catch (final IOException e1) {}
return;
}
synchronized (removeDelayedURLs) {
synchronized (this.removeDelayedURLs) {
this.removeDelayedURLs.put(termHash, r);
}
}
public void removeDelayed() throws IOException {
HandleSet words = new HandleSet(URIMetadataRow.rowdef.primaryKeyLength, URIMetadataRow.rowdef.objectOrder, 0); // a set of url hashes where a worker thread tried to work on, but failed.
synchronized (removeDelayedURLs) {
for (byte[] b: removeDelayedURLs.keySet()) try {words.put(b);} catch (RowSpaceExceededException e) {}
final HandleSet words = new HandleSet(URIMetadataRow.rowdef.primaryKeyLength, URIMetadataRow.rowdef.objectOrder, 0); // a set of url hashes where a worker thread tried to work on, but failed.
synchronized (this.removeDelayedURLs) {
for (final byte[] b: this.removeDelayedURLs.keySet()) try {words.put(b);} catch (final RowSpaceExceededException e) {}
}
synchronized (removeDelayedURLs) {
for (byte[] b: words) {
HandleSet urls = removeDelayedURLs.remove(b);
synchronized (this.removeDelayedURLs) {
for (final byte[] b: words) {
final HandleSet urls = this.removeDelayedURLs.remove(b);
if (urls != null) remove(b, urls);
}
}
this.countCache.clear();
}
/**
* remove url references from a selected word hash. this deletes also in the BLOB
* files, which means that there exists new gap entries after the deletion
* The gaps are never merged in place, but can be eliminated when BLOBs are merged into
* new BLOBs. This returns the sum of all url references that have been removed
* @throws IOException
* @throws IOException
*/
public int remove(byte[] termHash, HandleSet urlHashes) throws IOException {
public int remove(final byte[] termHash, final HandleSet urlHashes) throws IOException {
this.countCache.remove(termHash);
int removed = this.ram.remove(termHash, urlHashes);
final int removed = this.ram.remove(termHash, urlHashes);
int reduced;
//final long am = this.array.mem();
try {
reduced = this.array.reduce(termHash, new RemoveReducer<ReferenceType>(urlHashes));
} catch (RowSpaceExceededException e) {
} catch (final RowSpaceExceededException e) {
reduced = 0;
Log.logWarning("IndexCell", "not possible to remove urlHashes from a RWI because of too low memory. Remove was not applied. Please increase RAM assignment");
}
@ -343,14 +340,14 @@ public final class IndexCell<ReferenceType extends Reference> extends AbstractBu
return removed + (reduced / this.array.rowdef().objectsize);
}
public boolean remove(byte[] termHash, byte[] urlHashBytes) throws IOException {
public boolean remove(final byte[] termHash, final byte[] urlHashBytes) throws IOException {
this.countCache.remove(termHash);
boolean removed = this.ram.remove(termHash, urlHashBytes);
final boolean removed = this.ram.remove(termHash, urlHashBytes);
int reduced;
//final long am = this.array.mem();
try {
reduced = this.array.reduce(termHash, new RemoveReducer<ReferenceType>(urlHashBytes));
} catch (RowSpaceExceededException e) {
} catch (final RowSpaceExceededException e) {
reduced = 0;
Log.logWarning("IndexCell", "not possible to remove urlHashes from a RWI because of too low memory. Remove was not applied. Please increase RAM assignment");
}
@ -359,37 +356,37 @@ public final class IndexCell<ReferenceType extends Reference> extends AbstractBu
}
private static class RemoveReducer<ReferenceType extends Reference> implements ReferenceContainerArray.ContainerReducer<ReferenceType> {
HandleSet urlHashes;
public RemoveReducer(HandleSet urlHashes) {
public RemoveReducer(final HandleSet urlHashes) {
this.urlHashes = urlHashes;
}
public RemoveReducer(byte[] urlHashBytes) {
public RemoveReducer(final byte[] urlHashBytes) {
this.urlHashes = new HandleSet(URIMetadataRow.rowdef.primaryKeyLength, URIMetadataRow.rowdef.objectOrder, 0);
try {
this.urlHashes.put(urlHashBytes);
} catch (RowSpaceExceededException e) {
} catch (final RowSpaceExceededException e) {
Log.logException(e);
}
}
public ReferenceContainer<ReferenceType> reduce(ReferenceContainer<ReferenceType> container) {
public ReferenceContainer<ReferenceType> reduce(final ReferenceContainer<ReferenceType> container) {
container.sort();
container.removeEntries(urlHashes);
container.removeEntries(this.urlHashes);
return container;
}
}
public Iterator<ReferenceContainer<ReferenceType>> iterator() {
return references(null, false);
}
public CloneableIterator<ReferenceContainer<ReferenceType>> references(byte[] starttermHash, boolean rot) {
final Order<ReferenceContainer<ReferenceType>> containerOrder = new ReferenceContainerOrder<ReferenceType>(factory, this.ram.rowdef().getOrdering().clone());
containerOrder.rotate(new ReferenceContainer<ReferenceType>(factory, starttermHash));
}
public CloneableIterator<ReferenceContainer<ReferenceType>> references(final byte[] starttermHash, final boolean rot) {
final Order<ReferenceContainer<ReferenceType>> containerOrder = new ReferenceContainerOrder<ReferenceType>(this.factory, this.ram.rowdef().getOrdering().clone());
containerOrder.rotate(new ReferenceContainer<ReferenceType>(this.factory, starttermHash));
return new MergeIterator<ReferenceContainer<ReferenceType>>(
this.ram.references(starttermHash, rot),
new MergeIterator<ReferenceContainer<ReferenceType>>(
@ -403,9 +400,9 @@ public final class IndexCell<ReferenceType extends Reference> extends AbstractBu
true);
}
public CloneableIterator<ReferenceContainer<ReferenceType>> references(byte[] startTermHash, boolean rot, boolean ram) {
final Order<ReferenceContainer<ReferenceType>> containerOrder = new ReferenceContainerOrder<ReferenceType>(factory, this.ram.rowdef().getOrdering().clone());
containerOrder.rotate(new ReferenceContainer<ReferenceType>(factory, startTermHash));
public CloneableIterator<ReferenceContainer<ReferenceType>> references(final byte[] startTermHash, final boolean rot, final boolean ram) {
final Order<ReferenceContainer<ReferenceType>> containerOrder = new ReferenceContainerOrder<ReferenceType>(this.factory, this.ram.rowdef().getOrdering().clone());
containerOrder.rotate(new ReferenceContainer<ReferenceType>(this.factory, startTermHash));
if (ram) {
return this.ram.references(startTermHash, rot);
}
@ -419,7 +416,7 @@ public final class IndexCell<ReferenceType extends Reference> extends AbstractBu
/**
* clear the RAM and BLOB part, deletes everything in the cell
* @throws IOException
* @throws IOException
*/
public synchronized void clear() throws IOException {
this.countCache.clear();
@ -427,7 +424,7 @@ public final class IndexCell<ReferenceType extends Reference> extends AbstractBu
this.ram.clear();
this.array.clear();
}
/**
* when a cell is closed, the current RAM is dumped to a file which will be opened as
* BLOB file the next time a cell is opened. A name for the dump is automatically generated
@ -435,8 +432,8 @@ public final class IndexCell<ReferenceType extends Reference> extends AbstractBu
*/
public synchronized void close() {
this.countCache.clear();
try {removeDelayed();} catch (IOException e) {}
if (!this.ram.isEmpty()) this.ram.dump(this.array.newContainerBLOBFile(), (int) Math.min(MemoryControl.available() / 3, writeBufferSize), true);
try {removeDelayed();} catch (final IOException e) {}
if (!this.ram.isEmpty()) this.ram.dump(this.array.newContainerBLOBFile(), (int) Math.min(MemoryControl.available() / 3, this.writeBufferSize), true);
// close all
this.ram.close();
this.array.close();
@ -447,17 +444,18 @@ public final class IndexCell<ReferenceType extends Reference> extends AbstractBu
}
public int[] sizes() {
int[] as = this.array.sizes();
int[] asr = new int[as.length + 1];
final int[] as = this.array.sizes();
final int[] asr = new int[as.length + 1];
System.arraycopy(as, 0, asr, 0, as.length);
asr[as.length] = this.ram.size();
return asr;
}
public int sizesMax() {
int m = 0;
int[] s = sizes();
for (int i = 0; i < s.length; i++) if (s[i] > m) m = s[i];
final int[] s = sizes();
for (final int element : s)
if (element > m) m = element;
return m;
}
@ -468,82 +466,71 @@ public final class IndexCell<ReferenceType extends Reference> extends AbstractBu
public ByteOrder termKeyOrdering() {
return this.array.ordering();
}
/*
* cache control methods
*/
private void cleanCache() {
// dump the cache if necessary
long t = System.currentTimeMillis();
if (this.dumperSemaphore.availablePermits() > 0 &&
(this.ram.size() >= this.maxRamEntries ||
(this.ram.size() > 3000 && !MemoryControl.request(80L * 1024L * 1024L, false)) ||
(this.ram.size() > 3000 && this.lastDump + dumpCycle < t))) {
int termSize = this.ram.termKeyLength();
ByteOrder termOrder = this.ram.termKeyOrdering();
try {
this.dumperSemaphore.acquire(); // only one may pass
final long t = System.currentTimeMillis();
if ((this.ram.size() >= this.maxRamEntries ||
(this.ram.size() > 3000 && !MemoryControl.request(80L * 1024L * 1024L, false)) ||
(this.ram.size() > 0 && this.lastDump + dumpCycle < t))) {
synchronized (this.merger) {
if (this.ram.size() >= this.maxRamEntries ||
(this.ram.size() > 3000 && !MemoryControl.request(80L * 1024L * 1024L, false)) ||
(this.ram.size() > 0 && this.lastDump + dumpCycle < t)) try {
this.lastDump = System.currentTimeMillis();
// removed delayed
try {removeDelayed();} catch (IOException e) {}
try {removeDelayed();} catch (final IOException e) {}
// dump the ram
File dumpFile = this.array.newContainerBLOBFile();
final File dumpFile = this.array.newContainerBLOBFile();
// a critical point: when the ram is handed to the dump job,
// don't write into it any more. Use a fresh one instead
ReferenceContainerCache<ReferenceType> ramdump;
final ByteOrder termOrder = this.ram.termKeyOrdering();
final int termSize = this.ram.termKeyLength();
synchronized (this) {
ramdump = this.ram;
// get a fresh ram cache
this.ram = new ReferenceContainerCache<ReferenceType>(factory, termOrder, termSize);
this.ram = new ReferenceContainerCache<ReferenceType>(this.factory, termOrder, termSize);
}
// dump the buffer
merger.dump(ramdump, dumpFile, array);
this.merger.dump(ramdump, dumpFile, this.array);
this.lastDump = System.currentTimeMillis();
} catch (Exception e) {
// catch all exceptions to prevent that no semaphore is released
} catch (final Exception e) {
// catch all exceptions
Log.logException(e);
}
this.dumperSemaphore.release();
} catch (InterruptedException e) {
Log.logException(e);
}
}
// clean-up the cache
if (this.cleanerSemaphore.availablePermits() > 0 &&
(this.array.entries() > 50 ||
if ((this.array.entries() > 50 ||
this.lastCleanup + cleanupCycle < t)) {
try {
this.cleanerSemaphore.acquire();
synchronized (this.array) {
if (this.array.entries() > 50 || (this.lastCleanup + cleanupCycle < System.currentTimeMillis())) try {
this.lastCleanup = System.currentTimeMillis(); // set time to prevent that this is called to soo again
this.lastCleanup = System.currentTimeMillis(); // set time to prevent that this is called to soon again
this.array.shrink(this.targetFileSize, this.maxFileSize);
this.lastCleanup = System.currentTimeMillis(); // set again to mark end of procedure
} catch (Exception e) {
// catch all exceptions to prevent that no semaphore is released
} catch (final Exception e) {
// catch all exceptions
Log.logException(e);
} finally {
this.cleanerSemaphore.release();
}
} catch (InterruptedException e) {
Log.logException(e);
}
}
}
public File newContainerBLOBFile() {
// for migration of cache files
return this.array.newContainerBLOBFile();
}
public void mountBLOBFile(File blobFile) throws IOException {
public void mountBLOBFile(final File blobFile) throws IOException {
// for migration of cache files
this.array.mountBLOBFile(blobFile);
}
@ -568,7 +555,7 @@ public final class IndexCell<ReferenceType extends Reference> extends AbstractBu
return 10000 * this.ram.size(); // guessed; we don't know that exactly because there is no statistics here (expensive, not necessary)
}
public void setBufferMaxWordCount(int maxWords) {
public void setBufferMaxWordCount(final int maxWords) {
this.maxRamEntries = maxWords;
this.cleanCache();
}

Loading…
Cancel
Save