From 267290a8216a5caa2c3258c6cdb1d2510446eb8a Mon Sep 17 00:00:00 2001 From: orbiter Date: Sun, 26 Jun 2011 21:45:04 +0000 Subject: [PATCH] removed the semaphores from the cache dump process because I believe some of the semaphores may be lost somewhere which then causes that the cache is never flushed and then the peer dies from a OOM. The re-introduced synchronization may not be the best solution but should ensure that the caches are flushed. git-svn-id: https://svn.berlios.de/svnroot/repos/yacy/trunk@7802 6c8d7289-2bf4-0310-a012-ef5d649a1542 --- source/net/yacy/kelondro/rwi/IndexCell.java | 257 ++++++++++---------- 1 file changed, 122 insertions(+), 135 deletions(-) diff --git a/source/net/yacy/kelondro/rwi/IndexCell.java b/source/net/yacy/kelondro/rwi/IndexCell.java index 03c47ae93..863c3456b 100644 --- a/source/net/yacy/kelondro/rwi/IndexCell.java +++ b/source/net/yacy/kelondro/rwi/IndexCell.java @@ -9,7 +9,7 @@ // $LastChangedBy$ // // LICENSE -// +// // This program is free software; you can redistribute it and/or modify // it under the terms of the GNU General Public License as published by // the Free Software Foundation; either version 2 of the License, or @@ -31,7 +31,6 @@ import java.io.IOException; import java.util.Iterator; import java.util.Map; import java.util.TreeMap; -import java.util.concurrent.Semaphore; import net.yacy.cora.storage.ComparableARC; import net.yacy.kelondro.data.meta.URIMetadataRow; @@ -61,7 +60,7 @@ public final class IndexCell extends AbstractBu private static final long cleanupCycle = 60000; private static final long dumpCycle = 600000; - + // class variables private final ReferenceContainerArray array; private ReferenceContainerCache ram; @@ -71,11 +70,9 @@ public final class IndexCell extends AbstractBu private long lastCleanup, lastDump; private final long targetFileSize, maxFileSize; private final int writeBufferSize; - private Semaphore dumperSemaphore = new Semaphore(1); - private Semaphore cleanerSemaphore = new Semaphore(1); private final Map removeDelayedURLs; // mapping from word hashes to a list of url hashes - - + + public IndexCell( final File cellPath, final String prefix, @@ -85,11 +82,11 @@ public final class IndexCell extends AbstractBu final int maxRamEntries, final long targetFileSize, final long maxFileSize, - IODispatcher merger, - int writeBufferSize + final IODispatcher merger, + final int writeBufferSize ) throws IOException { super(factory); - + this.array = new ReferenceContainerArray(cellPath, prefix, factory, termOrder, termSize, merger); this.ram = new ReferenceContainerCache(factory, termOrder, termSize); this.countCache = new ComparableARC(1000, termOrder); @@ -103,7 +100,7 @@ public final class IndexCell extends AbstractBu this.removeDelayedURLs = new TreeMap(URIMetadataRow.rowdef.objectOrder); } - + /* * methods to implement Index */ @@ -115,37 +112,37 @@ public final class IndexCell extends AbstractBu public int termKeyLength() { return this.ram.termKeyLength(); } - + /** * add entries to the cell: this adds the new entries always to the RAM part, never to BLOBs - * @throws IOException + * @throws IOException * @throws RowSpaceExceededException */ - public void add(ReferenceContainer newEntries) throws IOException, RowSpaceExceededException { + public void add(final ReferenceContainer newEntries) throws IOException, RowSpaceExceededException { try { this.ram.add(newEntries); - long t = System.currentTimeMillis(); + final long t = System.currentTimeMillis(); if (this.ram.size() % 1000 == 0 || this.lastCleanup + cleanupCycle < t || this.lastDump + dumpCycle < t) { EventTracker.update(EventTracker.EClass.WORDCACHE, Long.valueOf(this.ram.size()), true); cleanCache(); } - } catch (RowSpaceExceededException e) { + } catch (final RowSpaceExceededException e) { EventTracker.update(EventTracker.EClass.WORDCACHE, Long.valueOf(this.ram.size()), true); cleanCache(); this.ram.add(newEntries); } - + } - public void add(byte[] termHash, ReferenceType entry) throws IOException, RowSpaceExceededException { + public void add(final byte[] termHash, final ReferenceType entry) throws IOException, RowSpaceExceededException { try { this.ram.add(termHash, entry); - long t = System.currentTimeMillis(); + final long t = System.currentTimeMillis(); if (this.ram.size() % 1000 == 0 || this.lastCleanup + cleanupCycle < t || this.lastDump + dumpCycle < t) { EventTracker.update(EventTracker.EClass.WORDCACHE, Long.valueOf(this.ram.size()), true); cleanCache(); } - } catch (RowSpaceExceededException e) { + } catch (final RowSpaceExceededException e) { EventTracker.update(EventTracker.EClass.WORDCACHE, Long.valueOf(this.ram.size()), true); cleanCache(); this.ram.add(termHash, entry); @@ -155,7 +152,7 @@ public final class IndexCell extends AbstractBu /** * checks if there is any container for this termHash, either in RAM or any BLOB */ - public boolean has(byte[] termHash) { + public boolean has(final byte[] termHash) { if (this.ram.has(termHash)) return true; return this.array.has(termHash); } @@ -164,26 +161,26 @@ public final class IndexCell extends AbstractBu * count number of references for a given term * this method may cause strong IO load if called too frequently. */ - public int count(byte[] termHash) { - Integer cachedCount = this.countCache.get(termHash); + public int count(final byte[] termHash) { + final Integer cachedCount = this.countCache.get(termHash); if (cachedCount != null) return cachedCount.intValue(); - + int countFile = 0; // read fresh values from file try { countFile = this.array.count(termHash); - } catch (Exception e) { + } catch (final Exception e) { Log.logException(e); } assert countFile >= 0; - + // count from container in ram - ReferenceContainer countRam = this.ram.get(termHash, null); + final ReferenceContainer countRam = this.ram.get(termHash, null); assert countRam == null || countRam.size() >= 0; int c = countRam == null ? countFile : countFile + countRam.size(); // exclude entries from delayed remove synchronized (this.removeDelayedURLs) { - HandleSet s = this.removeDelayedURLs.get(termHash); + final HandleSet s = this.removeDelayedURLs.get(termHash); if (s != null) c -= s.size(); if (c < 0) c = 0; } @@ -192,7 +189,7 @@ public final class IndexCell extends AbstractBu this.countCache.insert(termHash, c); return c; } - + /** * all containers in the BLOBs and the RAM are merged and returned. * Please be aware that the returned values may be top-level cloned ReferenceContainers or direct links to containers @@ -200,23 +197,23 @@ public final class IndexCell extends AbstractBu * @throws IOException * @return a container with merged ReferenceContainer from RAM and the file array or null if there is no data to be returned */ - public ReferenceContainer get(byte[] termHash, HandleSet urlselection) throws IOException { - ReferenceContainer c0 = this.ram.get(termHash, null); + public ReferenceContainer get(final byte[] termHash, final HandleSet urlselection) throws IOException { + final ReferenceContainer c0 = this.ram.get(termHash, null); ReferenceContainer c1 = null; try { c1 = this.array.get(termHash); - } catch (RowSpaceExceededException e2) { + } catch (final RowSpaceExceededException e2) { Log.logException(e2); } ReferenceContainer result = null; if (c0 != null && c1 != null) { try { result = c1.merge(c0); - } catch (RowSpaceExceededException e) { + } catch (final RowSpaceExceededException e) { // try to free some ram try { result = c1.merge(c0); - } catch (RowSpaceExceededException e1) { + } catch (final RowSpaceExceededException e1) { // go silently over the problem result = (c1.size() > c0.size()) ? c1: c0; } @@ -229,7 +226,7 @@ public final class IndexCell extends AbstractBu if (result == null) return null; // remove the failed urls synchronized (this.removeDelayedURLs) { - HandleSet s = this.removeDelayedURLs.get(termHash); + final HandleSet s = this.removeDelayedURLs.get(termHash); if (s != null) result.removeEntries(s); } return result; @@ -238,39 +235,39 @@ public final class IndexCell extends AbstractBu /** * deleting a container affects the containers in RAM and all the BLOB files * the deleted containers are merged and returned as result of the method - * @throws IOException + * @throws IOException */ - public ReferenceContainer delete(byte[] termHash) throws IOException { + public ReferenceContainer delete(final byte[] termHash) throws IOException { removeDelayed(); ReferenceContainer c1 = null; try { c1 = this.array.get(termHash); - } catch (RowSpaceExceededException e2) { + } catch (final RowSpaceExceededException e2) { Log.logException(e2); } if (c1 != null) { this.array.delete(termHash); } - ReferenceContainer c0 = this.ram.delete(termHash); + final ReferenceContainer c0 = this.ram.delete(termHash); cleanCache(); if (c1 == null) return c0; if (c0 == null) return c1; try { return c1.merge(c0); - } catch (RowSpaceExceededException e) { + } catch (final RowSpaceExceededException e) { // try to free some ram try { return c1.merge(c0); - } catch (RowSpaceExceededException e1) { + } catch (final RowSpaceExceededException e1) { // go silently over the problem return (c1.size() > c0.size()) ? c1: c0; } } } - - public void removeDelayed(byte[] termHash, HandleSet urlHashes) { + + public void removeDelayed(final byte[] termHash, final HandleSet urlHashes) { HandleSet r; - synchronized (removeDelayedURLs) { + synchronized (this.removeDelayedURLs) { r = this.removeDelayedURLs.get(termHash); } if (r == null) { @@ -278,18 +275,18 @@ public final class IndexCell extends AbstractBu } try { r.putAll(urlHashes); - } catch (RowSpaceExceededException e) { - try {remove(termHash, urlHashes);} catch (IOException e1) {} + } catch (final RowSpaceExceededException e) { + try {remove(termHash, urlHashes);} catch (final IOException e1) {} return; } - synchronized (removeDelayedURLs) { + synchronized (this.removeDelayedURLs) { this.removeDelayedURLs.put(termHash, r); } } - - public void removeDelayed(byte[] termHash, byte[] urlHashBytes) { + + public void removeDelayed(final byte[] termHash, final byte[] urlHashBytes) { HandleSet r; - synchronized (removeDelayedURLs) { + synchronized (this.removeDelayedURLs) { r = this.removeDelayedURLs.get(termHash); } if (r == null) { @@ -297,45 +294,45 @@ public final class IndexCell extends AbstractBu } try { r.put(urlHashBytes); - } catch (RowSpaceExceededException e) { - try {remove(termHash, urlHashBytes);} catch (IOException e1) {} + } catch (final RowSpaceExceededException e) { + try {remove(termHash, urlHashBytes);} catch (final IOException e1) {} return; } - synchronized (removeDelayedURLs) { + synchronized (this.removeDelayedURLs) { this.removeDelayedURLs.put(termHash, r); } } - + public void removeDelayed() throws IOException { - HandleSet words = new HandleSet(URIMetadataRow.rowdef.primaryKeyLength, URIMetadataRow.rowdef.objectOrder, 0); // a set of url hashes where a worker thread tried to work on, but failed. - synchronized (removeDelayedURLs) { - for (byte[] b: removeDelayedURLs.keySet()) try {words.put(b);} catch (RowSpaceExceededException e) {} + final HandleSet words = new HandleSet(URIMetadataRow.rowdef.primaryKeyLength, URIMetadataRow.rowdef.objectOrder, 0); // a set of url hashes where a worker thread tried to work on, but failed. + synchronized (this.removeDelayedURLs) { + for (final byte[] b: this.removeDelayedURLs.keySet()) try {words.put(b);} catch (final RowSpaceExceededException e) {} } - synchronized (removeDelayedURLs) { - for (byte[] b: words) { - HandleSet urls = removeDelayedURLs.remove(b); + synchronized (this.removeDelayedURLs) { + for (final byte[] b: words) { + final HandleSet urls = this.removeDelayedURLs.remove(b); if (urls != null) remove(b, urls); } } this.countCache.clear(); } - + /** * remove url references from a selected word hash. this deletes also in the BLOB * files, which means that there exists new gap entries after the deletion * The gaps are never merged in place, but can be eliminated when BLOBs are merged into * new BLOBs. This returns the sum of all url references that have been removed - * @throws IOException + * @throws IOException */ - public int remove(byte[] termHash, HandleSet urlHashes) throws IOException { + public int remove(final byte[] termHash, final HandleSet urlHashes) throws IOException { this.countCache.remove(termHash); - int removed = this.ram.remove(termHash, urlHashes); + final int removed = this.ram.remove(termHash, urlHashes); int reduced; //final long am = this.array.mem(); try { reduced = this.array.reduce(termHash, new RemoveReducer(urlHashes)); - } catch (RowSpaceExceededException e) { + } catch (final RowSpaceExceededException e) { reduced = 0; Log.logWarning("IndexCell", "not possible to remove urlHashes from a RWI because of too low memory. Remove was not applied. Please increase RAM assignment"); } @@ -343,14 +340,14 @@ public final class IndexCell extends AbstractBu return removed + (reduced / this.array.rowdef().objectsize); } - public boolean remove(byte[] termHash, byte[] urlHashBytes) throws IOException { + public boolean remove(final byte[] termHash, final byte[] urlHashBytes) throws IOException { this.countCache.remove(termHash); - boolean removed = this.ram.remove(termHash, urlHashBytes); + final boolean removed = this.ram.remove(termHash, urlHashBytes); int reduced; //final long am = this.array.mem(); try { reduced = this.array.reduce(termHash, new RemoveReducer(urlHashBytes)); - } catch (RowSpaceExceededException e) { + } catch (final RowSpaceExceededException e) { reduced = 0; Log.logWarning("IndexCell", "not possible to remove urlHashes from a RWI because of too low memory. Remove was not applied. Please increase RAM assignment"); } @@ -359,37 +356,37 @@ public final class IndexCell extends AbstractBu } private static class RemoveReducer implements ReferenceContainerArray.ContainerReducer { - + HandleSet urlHashes; - - public RemoveReducer(HandleSet urlHashes) { + + public RemoveReducer(final HandleSet urlHashes) { this.urlHashes = urlHashes; } - - public RemoveReducer(byte[] urlHashBytes) { + + public RemoveReducer(final byte[] urlHashBytes) { this.urlHashes = new HandleSet(URIMetadataRow.rowdef.primaryKeyLength, URIMetadataRow.rowdef.objectOrder, 0); try { this.urlHashes.put(urlHashBytes); - } catch (RowSpaceExceededException e) { + } catch (final RowSpaceExceededException e) { Log.logException(e); } } - - public ReferenceContainer reduce(ReferenceContainer container) { + + public ReferenceContainer reduce(final ReferenceContainer container) { container.sort(); - container.removeEntries(urlHashes); + container.removeEntries(this.urlHashes); return container; } - + } - + public Iterator> iterator() { return references(null, false); - } - - public CloneableIterator> references(byte[] starttermHash, boolean rot) { - final Order> containerOrder = new ReferenceContainerOrder(factory, this.ram.rowdef().getOrdering().clone()); - containerOrder.rotate(new ReferenceContainer(factory, starttermHash)); + } + + public CloneableIterator> references(final byte[] starttermHash, final boolean rot) { + final Order> containerOrder = new ReferenceContainerOrder(this.factory, this.ram.rowdef().getOrdering().clone()); + containerOrder.rotate(new ReferenceContainer(this.factory, starttermHash)); return new MergeIterator>( this.ram.references(starttermHash, rot), new MergeIterator>( @@ -403,9 +400,9 @@ public final class IndexCell extends AbstractBu true); } - public CloneableIterator> references(byte[] startTermHash, boolean rot, boolean ram) { - final Order> containerOrder = new ReferenceContainerOrder(factory, this.ram.rowdef().getOrdering().clone()); - containerOrder.rotate(new ReferenceContainer(factory, startTermHash)); + public CloneableIterator> references(final byte[] startTermHash, final boolean rot, final boolean ram) { + final Order> containerOrder = new ReferenceContainerOrder(this.factory, this.ram.rowdef().getOrdering().clone()); + containerOrder.rotate(new ReferenceContainer(this.factory, startTermHash)); if (ram) { return this.ram.references(startTermHash, rot); } @@ -419,7 +416,7 @@ public final class IndexCell extends AbstractBu /** * clear the RAM and BLOB part, deletes everything in the cell - * @throws IOException + * @throws IOException */ public synchronized void clear() throws IOException { this.countCache.clear(); @@ -427,7 +424,7 @@ public final class IndexCell extends AbstractBu this.ram.clear(); this.array.clear(); } - + /** * when a cell is closed, the current RAM is dumped to a file which will be opened as * BLOB file the next time a cell is opened. A name for the dump is automatically generated @@ -435,8 +432,8 @@ public final class IndexCell extends AbstractBu */ public synchronized void close() { this.countCache.clear(); - try {removeDelayed();} catch (IOException e) {} - if (!this.ram.isEmpty()) this.ram.dump(this.array.newContainerBLOBFile(), (int) Math.min(MemoryControl.available() / 3, writeBufferSize), true); + try {removeDelayed();} catch (final IOException e) {} + if (!this.ram.isEmpty()) this.ram.dump(this.array.newContainerBLOBFile(), (int) Math.min(MemoryControl.available() / 3, this.writeBufferSize), true); // close all this.ram.close(); this.array.close(); @@ -447,17 +444,18 @@ public final class IndexCell extends AbstractBu } public int[] sizes() { - int[] as = this.array.sizes(); - int[] asr = new int[as.length + 1]; + final int[] as = this.array.sizes(); + final int[] asr = new int[as.length + 1]; System.arraycopy(as, 0, asr, 0, as.length); asr[as.length] = this.ram.size(); return asr; } - + public int sizesMax() { int m = 0; - int[] s = sizes(); - for (int i = 0; i < s.length; i++) if (s[i] > m) m = s[i]; + final int[] s = sizes(); + for (final int element : s) + if (element > m) m = element; return m; } @@ -468,82 +466,71 @@ public final class IndexCell extends AbstractBu public ByteOrder termKeyOrdering() { return this.array.ordering(); } - - + + /* * cache control methods */ - + private void cleanCache() { - + // dump the cache if necessary - long t = System.currentTimeMillis(); - if (this.dumperSemaphore.availablePermits() > 0 && - (this.ram.size() >= this.maxRamEntries || - (this.ram.size() > 3000 && !MemoryControl.request(80L * 1024L * 1024L, false)) || - (this.ram.size() > 3000 && this.lastDump + dumpCycle < t))) { - int termSize = this.ram.termKeyLength(); - ByteOrder termOrder = this.ram.termKeyOrdering(); - try { - this.dumperSemaphore.acquire(); // only one may pass + final long t = System.currentTimeMillis(); + if ((this.ram.size() >= this.maxRamEntries || + (this.ram.size() > 3000 && !MemoryControl.request(80L * 1024L * 1024L, false)) || + (this.ram.size() > 0 && this.lastDump + dumpCycle < t))) { + synchronized (this.merger) { if (this.ram.size() >= this.maxRamEntries || (this.ram.size() > 3000 && !MemoryControl.request(80L * 1024L * 1024L, false)) || (this.ram.size() > 0 && this.lastDump + dumpCycle < t)) try { this.lastDump = System.currentTimeMillis(); // removed delayed - try {removeDelayed();} catch (IOException e) {} + try {removeDelayed();} catch (final IOException e) {} // dump the ram - File dumpFile = this.array.newContainerBLOBFile(); + final File dumpFile = this.array.newContainerBLOBFile(); // a critical point: when the ram is handed to the dump job, // don't write into it any more. Use a fresh one instead ReferenceContainerCache ramdump; + final ByteOrder termOrder = this.ram.termKeyOrdering(); + final int termSize = this.ram.termKeyLength(); synchronized (this) { ramdump = this.ram; // get a fresh ram cache - this.ram = new ReferenceContainerCache(factory, termOrder, termSize); + this.ram = new ReferenceContainerCache(this.factory, termOrder, termSize); } // dump the buffer - merger.dump(ramdump, dumpFile, array); + this.merger.dump(ramdump, dumpFile, this.array); this.lastDump = System.currentTimeMillis(); - } catch (Exception e) { - // catch all exceptions to prevent that no semaphore is released + } catch (final Exception e) { + // catch all exceptions Log.logException(e); } - this.dumperSemaphore.release(); - } catch (InterruptedException e) { - Log.logException(e); } } - + // clean-up the cache - if (this.cleanerSemaphore.availablePermits() > 0 && - (this.array.entries() > 50 || + if ((this.array.entries() > 50 || this.lastCleanup + cleanupCycle < t)) { - try { - this.cleanerSemaphore.acquire(); + synchronized (this.array) { if (this.array.entries() > 50 || (this.lastCleanup + cleanupCycle < System.currentTimeMillis())) try { - this.lastCleanup = System.currentTimeMillis(); // set time to prevent that this is called to soo again + this.lastCleanup = System.currentTimeMillis(); // set time to prevent that this is called to soon again this.array.shrink(this.targetFileSize, this.maxFileSize); this.lastCleanup = System.currentTimeMillis(); // set again to mark end of procedure - } catch (Exception e) { - // catch all exceptions to prevent that no semaphore is released + } catch (final Exception e) { + // catch all exceptions Log.logException(e); - } finally { - this.cleanerSemaphore.release(); } - } catch (InterruptedException e) { - Log.logException(e); } } } - - + + public File newContainerBLOBFile() { // for migration of cache files return this.array.newContainerBLOBFile(); } - - public void mountBLOBFile(File blobFile) throws IOException { + + public void mountBLOBFile(final File blobFile) throws IOException { // for migration of cache files this.array.mountBLOBFile(blobFile); } @@ -568,7 +555,7 @@ public final class IndexCell extends AbstractBu return 10000 * this.ram.size(); // guessed; we don't know that exactly because there is no statistics here (expensive, not necessary) } - public void setBufferMaxWordCount(int maxWords) { + public void setBufferMaxWordCount(final int maxWords) { this.maxRamEntries = maxWords; this.cleanCache(); }