// IndexCell.java // (C) 2009 by Michael Peter Christen; mc@yacy.net, Frankfurt a. M., Germany // first published 1.3.2009 on http://yacy.net // // This is a part of YaCy, a peer-to-peer based web search engine // // $LastChangedDate$ // $LastChangedRevision$ // $LastChangedBy$ // // LICENSE // // This program is free software; you can redistribute it and/or modify // it under the terms of the GNU General Public License as published by // the Free Software Foundation; either version 2 of the License, or // (at your option) any later version. // // This program is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU General Public License for more details. // // You should have received a copy of the GNU General Public License // along with this program; if not, write to the Free Software // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA package net.yacy.kelondro.rwi; import java.io.File; import java.io.IOException; import java.util.Iterator; import java.util.Map; import java.util.TreeMap; import net.yacy.cora.order.ByteOrder; import net.yacy.cora.order.CloneableIterator; import net.yacy.cora.order.Order; import net.yacy.cora.sorting.Rating; import net.yacy.cora.storage.ComparableARC; import net.yacy.kelondro.data.meta.URIMetadataRow; import net.yacy.kelondro.index.HandleSet; import net.yacy.kelondro.index.RowSpaceExceededException; import net.yacy.kelondro.logging.Log; import net.yacy.kelondro.order.MergeIterator; import net.yacy.kelondro.util.EventTracker; import net.yacy.kelondro.util.MemoryControl; /* * an index cell is a part of the horizontal index in the new segment-oriented index * data structure of YaCy. If there is no filter in front of a cell, it might also be * the organization for a complete segment index. Each cell consists of a number of BLOB files, that * must be merged to represent a single index. In fact these index files are only merged on demand * if there are too many of them. An index merge can be done with a stream read and stream write operation. * in normal operation, there are only a number of read-only BLOB files and a single RAM cache that is * kept in the RAM as long as a given limit of entries is reached. Then the cache is flushed and becomes * another BLOB file in the index array. */ public final class IndexCell extends AbstractBufferedIndex implements BufferedIndex, Iterable> { private static final long cleanupCycle = 60000; private static final long dumpCycle = 600000; // class variables private final ReferenceContainerArray array; private ReferenceContainerCache ram; private final ComparableARC countCache; private int maxRamEntries; private final IODispatcher merger; private long lastCleanup; private long lastDump; private final long targetFileSize, maxFileSize; private final int writeBufferSize; private final Map removeDelayedURLs; // mapping from word hashes to a list of url hashes private boolean cleanupShallRun; private final Thread cleanupThread; public IndexCell( final File cellPath, final String prefix, final ReferenceFactory factory, final ByteOrder termOrder, final int termSize, final int maxRamEntries, final long targetFileSize, final long maxFileSize, final int writeBufferSize ) throws IOException { super(factory); this.merger = new IODispatcher(1, 1, writeBufferSize); this.array = new ReferenceContainerArray(cellPath, prefix, factory, termOrder, termSize); this.ram = new ReferenceContainerCache(factory, termOrder, termSize); this.countCache = new ComparableARC(1000, termOrder); this.maxRamEntries = maxRamEntries; this.merger.start(); this.lastCleanup = System.currentTimeMillis(); this.lastDump = System.currentTimeMillis(); this.targetFileSize = targetFileSize; this.maxFileSize = maxFileSize; this.writeBufferSize = writeBufferSize; this.removeDelayedURLs = new TreeMap(URIMetadataRow.rowdef.objectOrder); this.cleanupShallRun = true; this.cleanupThread = new CleanupThread(); this.cleanupThread.start(); } private class CleanupThread extends Thread { @Override public void run() { while (IndexCell.this.cleanupShallRun) { try { cleanCache(); } catch (final Throwable e) { Log.logException(e); } try { Thread.sleep(3000); } catch (final InterruptedException e) {} } } private void cleanCache() { // dump the cache if necessary final long t = System.currentTimeMillis(); if ((IndexCell.this.ram.size() >= IndexCell.this.maxRamEntries || (IndexCell.this.ram.size() > 3000 && !MemoryControl.request(80L * 1024L * 1024L, false)) || (IndexCell.this.ram.size() > 0 && IndexCell.this.lastDump + dumpCycle < t))) { synchronized (IndexCell.this.merger) { if (IndexCell.this.ram.size() >= IndexCell.this.maxRamEntries || (IndexCell.this.ram.size() > 3000 && !MemoryControl.request(80L * 1024L * 1024L, false)) || (IndexCell.this.ram.size() > 0 && IndexCell.this.lastDump + dumpCycle < t)) try { IndexCell.this.lastDump = System.currentTimeMillis(); // removed delayed try {removeDelayed();} catch (final IOException e) {} // dump the ram final File dumpFile = IndexCell.this.array.newContainerBLOBFile(); // a critical point: when the ram is handed to the dump job, // don't write into it any more. Use a fresh one instead ReferenceContainerCache ramdump; final ByteOrder termOrder = IndexCell.this.ram.termKeyOrdering(); final int termSize = IndexCell.this.ram.termKeyLength(); synchronized (this) { ramdump = IndexCell.this.ram; // get a fresh ram cache IndexCell.this.ram = new ReferenceContainerCache(IndexCell.this.factory, termOrder, termSize); } // dump the buffer IndexCell.this.merger.dump(ramdump, dumpFile, IndexCell.this.array); IndexCell.this.lastDump = System.currentTimeMillis(); } catch (final Throwable e) { // catch all exceptions Log.logException(e); } } } // clean-up the cache if ((IndexCell.this.array.entries() > 50 || IndexCell.this.lastCleanup + cleanupCycle < t)) { synchronized (IndexCell.this.array) { if (IndexCell.this.array.entries() > 50 || (IndexCell.this.lastCleanup + cleanupCycle < System.currentTimeMillis())) try { IndexCell.this.lastCleanup = System.currentTimeMillis(); // set time to prevent that this is called to soon again IndexCell.this.shrink(IndexCell.this.targetFileSize, IndexCell.this.maxFileSize); IndexCell.this.lastCleanup = System.currentTimeMillis(); // set again to mark end of procedure } catch (final Throwable e) { // catch all exceptions Log.logException(e); } } } } } public boolean shrink(final long targetFileSize, final long maxFileSize) { if (this.array.entries() < 2) return false; boolean donesomething = false; // first try to merge small files that match int term = 10; while (term-- > 0 && (this.merger.queueLength() < 3 || this.array.entries() >= 50)) { if (!this.array.shrinkBestSmallFiles(this.merger, targetFileSize)) break; else donesomething = true; } // then try to merge simply any small file term = 10; while (term-- > 0 && (this.merger.queueLength() < 2)) { if (!this.array.shrinkAnySmallFiles(this.merger, targetFileSize)) break; else donesomething = true; } // if there is no small file, then merge matching files up to limit term = 10; while (term-- > 0 && (this.merger.queueLength() < 1)) { if (!this.array.shrinkUpToMaxSizeFiles(this.merger, maxFileSize)) break; else donesomething = true; } // rewrite old files (hack from sixcooler, see http://forum.yacy-websuche.de/viewtopic.php?p=15004#p15004) term = 10; while (term-- > 0 && (this.merger.queueLength() < 1)) { if (!this.array.shrinkOldFiles(this.merger, targetFileSize)) break; else donesomething = true; } return donesomething; } /* * methods to implement Index */ /** * every index entry is made for a term which has a fixed size * @return the size of the term */ @Override public int termKeyLength() { return this.ram.termKeyLength(); } /** * add entries to the cell: this adds the new entries always to the RAM part, never to BLOBs * @throws IOException * @throws RowSpaceExceededException */ @Override public void add(final ReferenceContainer newEntries) throws IOException, RowSpaceExceededException { try { this.ram.add(newEntries); final long t = System.currentTimeMillis(); if (this.ram.size() % 1000 == 0 || this.lastCleanup + cleanupCycle < t || this.lastDump + dumpCycle < t) { EventTracker.update(EventTracker.EClass.WORDCACHE, Long.valueOf(this.ram.size()), true); } } catch (final RowSpaceExceededException e) { EventTracker.update(EventTracker.EClass.WORDCACHE, Long.valueOf(this.ram.size()), true); this.ram.add(newEntries); } } @Override public void add(final byte[] termHash, final ReferenceType entry) throws IOException, RowSpaceExceededException { try { this.ram.add(termHash, entry); final long t = System.currentTimeMillis(); if (this.ram.size() % 1000 == 0 || this.lastCleanup + cleanupCycle < t || this.lastDump + dumpCycle < t) { EventTracker.update(EventTracker.EClass.WORDCACHE, Long.valueOf(this.ram.size()), true); } } catch (final RowSpaceExceededException e) { EventTracker.update(EventTracker.EClass.WORDCACHE, Long.valueOf(this.ram.size()), true); this.ram.add(termHash, entry); } } /** * checks if there is any container for this termHash, either in RAM or any BLOB */ @Override public boolean has(final byte[] termHash) { if (this.ram.has(termHash)) return true; return this.array.has(termHash); } /** * count number of references for a given term * this method may cause strong IO load if called too frequently. */ @Override public int count(final byte[] termHash) { final Integer cachedCount = this.countCache.get(termHash); if (cachedCount != null) return cachedCount.intValue(); int countFile = 0; // read fresh values from file try { countFile = this.array.count(termHash); } catch (final Throwable e) { Log.logException(e); } assert countFile >= 0; // count from container in ram final ReferenceContainer countRam = this.ram.get(termHash, null); assert countRam == null || countRam.size() >= 0; int c = countRam == null ? countFile : countFile + countRam.size(); // exclude entries from delayed remove synchronized (this.removeDelayedURLs) { final HandleSet s = this.removeDelayedURLs.get(termHash); if (s != null) c -= s.size(); if (c < 0) c = 0; } // put count result into cache if (MemoryControl.shortStatus()) this.countCache.clear(); this.countCache.insert(termHash, c); return c; } /** * all containers in the BLOBs and the RAM are merged and returned. * Please be aware that the returned values may be top-level cloned ReferenceContainers or direct links to containers * If the containers are modified after they are returned, they MAY alter the stored index. * @throws IOException * @return a container with merged ReferenceContainer from RAM and the file array or null if there is no data to be returned */ @Override public ReferenceContainer get(final byte[] termHash, final HandleSet urlselection) throws IOException { final ReferenceContainer c0 = this.ram.get(termHash, null); ReferenceContainer c1 = null; try { c1 = this.array.get(termHash); } catch (final RowSpaceExceededException e2) { Log.logException(e2); } ReferenceContainer result = null; if (c0 != null && c1 != null) { try { result = c1.merge(c0); } catch (final RowSpaceExceededException e) { // try to free some ram try { result = c1.merge(c0); } catch (final RowSpaceExceededException e1) { // go silently over the problem result = (c1.size() > c0.size()) ? c1: c0; } } } else if (c0 != null) { result = c0; } else if (c1 != null) { result = c1; } if (result == null) return null; // remove the failed urls synchronized (this.removeDelayedURLs) { final HandleSet s = this.removeDelayedURLs.get(termHash); if (s != null) result.removeEntries(s); } return result; } /** * deleting a container affects the containers in RAM and all the BLOB files * the deleted containers are merged and returned as result of the method * @throws IOException */ @Override public ReferenceContainer delete(final byte[] termHash) throws IOException { removeDelayed(); ReferenceContainer c1 = null; try { c1 = this.array.get(termHash); } catch (final RowSpaceExceededException e2) { Log.logException(e2); } if (c1 != null) { this.array.delete(termHash); } final ReferenceContainer c0 = this.ram.delete(termHash); if (c1 == null) return c0; if (c0 == null) return c1; try { return c1.merge(c0); } catch (final RowSpaceExceededException e) { // try to free some ram try { return c1.merge(c0); } catch (final RowSpaceExceededException e1) { // go silently over the problem return (c1.size() > c0.size()) ? c1: c0; } } } @Override public void removeDelayed(final byte[] termHash, final HandleSet urlHashes) { HandleSet r; synchronized (this.removeDelayedURLs) { r = this.removeDelayedURLs.get(termHash); } if (r == null) { r = new HandleSet(URIMetadataRow.rowdef.primaryKeyLength, URIMetadataRow.rowdef.objectOrder, 0); } try { r.putAll(urlHashes); } catch (final RowSpaceExceededException e) { try {remove(termHash, urlHashes);} catch (final IOException e1) {} return; } synchronized (this.removeDelayedURLs) { this.removeDelayedURLs.put(termHash, r); } } @Override public void removeDelayed(final byte[] termHash, final byte[] urlHashBytes) { HandleSet r; synchronized (this.removeDelayedURLs) { r = this.removeDelayedURLs.get(termHash); } if (r == null) { r = new HandleSet(URIMetadataRow.rowdef.primaryKeyLength, URIMetadataRow.rowdef.objectOrder, 0); } try { r.put(urlHashBytes); } catch (final RowSpaceExceededException e) { try {remove(termHash, urlHashBytes);} catch (final IOException e1) {} return; } synchronized (this.removeDelayedURLs) { this.removeDelayedURLs.put(termHash, r); } } @Override public void removeDelayed() throws IOException { final HandleSet words = new HandleSet(URIMetadataRow.rowdef.primaryKeyLength, URIMetadataRow.rowdef.objectOrder, 0); // a set of url hashes where a worker thread tried to work on, but failed. synchronized (this.removeDelayedURLs) { for (final byte[] b: this.removeDelayedURLs.keySet()) try {words.put(b);} catch (final RowSpaceExceededException e) {} } synchronized (this.removeDelayedURLs) { for (final byte[] b: words) { final HandleSet urls = this.removeDelayedURLs.remove(b); if (urls != null) remove(b, urls); } } this.countCache.clear(); } /** * remove url references from a selected word hash. this deletes also in the BLOB * files, which means that there exists new gap entries after the deletion * The gaps are never merged in place, but can be eliminated when BLOBs are merged into * new BLOBs. This returns the sum of all url references that have been removed * @throws IOException */ @Override public int remove(final byte[] termHash, final HandleSet urlHashes) throws IOException { this.countCache.remove(termHash); final int removed = this.ram.remove(termHash, urlHashes); int reduced; //final long am = this.array.mem(); try { reduced = this.array.reduce(termHash, new RemoveReducer(urlHashes)); } catch (final RowSpaceExceededException e) { reduced = 0; Log.logWarning("IndexCell", "not possible to remove urlHashes from a RWI because of too low memory. Remove was not applied. Please increase RAM assignment"); } //assert this.array.mem() <= am : "am = " + am + ", array.mem() = " + this.array.mem(); return removed + (reduced / this.array.rowdef().objectsize); } @Override public boolean remove(final byte[] termHash, final byte[] urlHashBytes) throws IOException { this.countCache.remove(termHash); final boolean removed = this.ram.remove(termHash, urlHashBytes); int reduced; //final long am = this.array.mem(); try { reduced = this.array.reduce(termHash, new RemoveReducer(urlHashBytes)); } catch (final RowSpaceExceededException e) { reduced = 0; Log.logWarning("IndexCell", "not possible to remove urlHashes from a RWI because of too low memory. Remove was not applied. Please increase RAM assignment"); } //assert this.array.mem() <= am : "am = " + am + ", array.mem() = " + this.array.mem(); return removed || (reduced > 0); } private static class RemoveReducer implements ReferenceContainerArray.ContainerReducer { HandleSet urlHashes; public RemoveReducer(final HandleSet urlHashes) { this.urlHashes = urlHashes; } public RemoveReducer(final byte[] urlHashBytes) { this.urlHashes = new HandleSet(URIMetadataRow.rowdef.primaryKeyLength, URIMetadataRow.rowdef.objectOrder, 0); try { this.urlHashes.put(urlHashBytes); } catch (final RowSpaceExceededException e) { Log.logException(e); } } @Override public ReferenceContainer reduce(final ReferenceContainer container) { container.sort(); container.removeEntries(this.urlHashes); return container; } } @Override public Iterator> iterator() { return referenceContainerIterator(null, false, false); } @Override public CloneableIterator> referenceCountIterator(final byte[] starttermHash, final boolean rot, final boolean excludePrivate) { return this.array.referenceCountIterator(starttermHash, false, excludePrivate); } @Override public CloneableIterator> referenceContainerIterator(final byte[] startTermHash, final boolean rot, final boolean excludePrivate) { return referenceContainerIterator(startTermHash, rot, excludePrivate); } @Override public CloneableIterator> referenceContainerIterator(final byte[] startTermHash, final boolean rot, final boolean excludePrivate, final boolean ram) { final Order> containerOrder = new ReferenceContainerOrder(this.factory, this.ram.rowdef().getOrdering().clone()); containerOrder.rotate(new ReferenceContainer(this.factory, startTermHash)); if (ram) { return this.ram.referenceContainerIterator(startTermHash, rot, excludePrivate); } return new MergeIterator>( this.ram.referenceContainerIterator(startTermHash, rot, excludePrivate), new MergeIterator>( this.ram.referenceContainerIterator(startTermHash, false, excludePrivate), this.array.referenceContainerIterator(startTermHash, false, excludePrivate), containerOrder, ReferenceContainer.containerMergeMethod, true), containerOrder, ReferenceContainer.containerMergeMethod, true); } /** * clear the RAM and BLOB part, deletes everything in the cell * @throws IOException */ @Override public synchronized void clear() throws IOException { this.countCache.clear(); this.removeDelayedURLs.clear(); this.ram.clear(); this.array.clear(); } /** * when a cell is closed, the current RAM is dumped to a file which will be opened as * BLOB file the next time a cell is opened. A name for the dump is automatically generated * and is composed of the current date and the cell salt */ @Override public synchronized void close() { this.countCache.clear(); try {removeDelayed();} catch (final IOException e) {} if (!this.ram.isEmpty()) this.ram.dump(this.array.newContainerBLOBFile(), (int) Math.min(MemoryControl.available() / 3, this.writeBufferSize), true); // close all this.cleanupShallRun = false; if (this.cleanupThread != null) try { this.cleanupThread.join(); } catch (final InterruptedException e) {} this.merger.terminate(); this.ram.close(); this.array.close(); } @Override public int size() { throw new UnsupportedOperationException("an accumulated size of index entries would not reflect the real number of words, which cannot be computed easily"); } public int[] sizes() { final int[] as = this.array.sizes(); final int[] asr = new int[as.length + 1]; System.arraycopy(as, 0, asr, 0, as.length); asr[as.length] = this.ram.size(); return asr; } public int sizesMax() { int m = 0; final int[] s = sizes(); for (final int element : s) if (element > m) m = element; return m; } @Override public int minMem() { return 10 * 1024 * 1024; } @Override public ByteOrder termKeyOrdering() { return this.array.ordering(); } public File newContainerBLOBFile() { // for migration of cache files return this.array.newContainerBLOBFile(); } public void mountBLOBFile(final File blobFile) throws IOException { // for migration of cache files this.array.mountBLOBFile(blobFile); } @Override public long getBufferMaxAge() { return System.currentTimeMillis(); } @Override public int getBufferMaxReferences() { return this.ram.maxReferences(); } @Override public long getBufferMinAge() { return System.currentTimeMillis(); } @Override public int getBufferSize() { return this.ram.size(); } @Override public long getBufferSizeBytes() { return 10000 * this.ram.size(); // guessed; we don't know that exactly because there is no statistics here (expensive, not necessary) } @Override public void setBufferMaxWordCount(final int maxWords) { this.maxRamEntries = maxWords; } }