From 0139988c044f72c24e797cbc7cae419013b02b39 Mon Sep 17 00:00:00 2001 From: orbiter Date: Wed, 1 Apr 2009 12:39:11 +0000 Subject: [PATCH] - added writing of temporary file names and renaming to final file name when index dump/merge are done. Interrupted merges can be cleaned up. - added clean-up of unfinished merges and unused idx/gap files - enhanced merge file selection method git-svn-id: https://svn.berlios.de/svnroot/repos/yacy/trunk@5764 6c8d7289-2bf4-0310-a012-ef5d649a1542 --- source/de/anomic/crawler/Balancer.java | 2 +- source/de/anomic/crawler/CrawlEntry.java | 1 + source/de/anomic/kelondro/blob/BLOBArray.java | 22 ++++++++- .../kelondro/blob/BLOBHeapModifier.java | 5 +- source/de/anomic/kelondro/blob/Gap.java | 7 ++- .../de/anomic/kelondro/blob/HeapReader.java | 11 +++-- .../de/anomic/kelondro/blob/HeapWriter.java | 48 ++++++++++++------- .../kelondro/index/LongHandleIndex.java | 6 ++- .../de/anomic/kelondro/text/IODispatcher.java | 3 +- source/de/anomic/kelondro/text/IndexCell.java | 30 +++++------- .../text/ReferenceContainerArray.java | 36 ++++++++------ .../text/ReferenceContainerCache.java | 3 +- 12 files changed, 111 insertions(+), 63 deletions(-) diff --git a/source/de/anomic/crawler/Balancer.java b/source/de/anomic/crawler/Balancer.java index 9da525303..e40beb9b7 100644 --- a/source/de/anomic/crawler/Balancer.java +++ b/source/de/anomic/crawler/Balancer.java @@ -582,7 +582,7 @@ public class Balancer { // in best case, this should never happen if the balancer works propertly // this is only to protection against the worst case, where the crawler could // behave in a DoS-manner - Log.logInfo("BALANCER", "forcing crawl-delay of " + sleeptime + " milliseconds for " + crawlEntry.url().getHost() + ((sleeptime > Math.max(minimumLocalDelta, minimumGlobalDelta)) ? " (caused by robots.txt)" : "")); + Log.logInfo("BALANCER", "forcing crawl-delay of " + sleeptime + " milliseconds for " + crawlEntry.url().getHost() + ((sleeptime > Math.max(minimumLocalDelta, minimumGlobalDelta)) ? " (forced latency)" : "")); if (System.currentTimeMillis() - this.lastPrepare > 10000) { long t = System.currentTimeMillis(); prepare(400); diff --git a/source/de/anomic/crawler/CrawlEntry.java b/source/de/anomic/crawler/CrawlEntry.java index beebc8b9c..5ff1d91b3 100755 --- a/source/de/anomic/crawler/CrawlEntry.java +++ b/source/de/anomic/crawler/CrawlEntry.java @@ -159,6 +159,7 @@ public class CrawlEntry extends serverProcessorJob { } public void setStatus(final String s, int code) { + //System.out.println("***DEBUG*** crawler status " + s + ", " + code + " for " + this.url.toNormalform(true, false)); this.statusMessage = s; this.status = code; } diff --git a/source/de/anomic/kelondro/blob/BLOBArray.java b/source/de/anomic/kelondro/blob/BLOBArray.java index c4251f17b..21a3c543f 100755 --- a/source/de/anomic/kelondro/blob/BLOBArray.java +++ b/source/de/anomic/kelondro/blob/BLOBArray.java @@ -31,6 +31,7 @@ import java.io.IOException; import java.text.ParseException; import java.util.ArrayList; import java.util.Date; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.TreeMap; @@ -97,12 +98,31 @@ public class BLOBArray implements BLOB { // register all blob files inside this directory String[] files = heapLocation.list(); + HashSet fh = new HashSet(); + for (int i = 0; i < files.length; i++) fh.add(files[i]); + // delete unused temporary files + boolean deletions = false; + for (int i = 0; i < files.length; i++) { + if (files[i].endsWith(".tmp")) { + FileUtils.deletedelete(new File(heapLocation, files[i])); + deletions = true; + } + if (files[i].endsWith(".idx") || files[i].endsWith(".gap")) { + String s = files[i].substring(0, files[i].length() - 17); + if (!fh.contains(s)) { + FileUtils.deletedelete(new File(heapLocation, files[i])); + deletions = true; + } + } + } + if (deletions) files = heapLocation.list(); // make a fresh list + + // find maximum time: the file with this time will be given a write buffer Date d; TreeMap sortedItems = new TreeMap(); BLOB oneBlob; File f; long time, maxtime = 0; - // first find maximum time: the file with this time will be given a write buffer for (int i = 0; i < files.length; i++) { if (files[i].length() >= 19 && files[i].endsWith(".blob")) { try { diff --git a/source/de/anomic/kelondro/blob/BLOBHeapModifier.java b/source/de/anomic/kelondro/blob/BLOBHeapModifier.java index 56ad8c047..b9dcb2c3f 100644 --- a/source/de/anomic/kelondro/blob/BLOBHeapModifier.java +++ b/source/de/anomic/kelondro/blob/BLOBHeapModifier.java @@ -112,10 +112,11 @@ public class BLOBHeapModifier extends HeapReader implements BLOB { // to speed up the next start try { long start = System.currentTimeMillis(); - free.dump(HeapWriter.fingerprintGapFile(this.heapFile)); + String fingerprint = HeapWriter.fingerprintFileHash(this.heapFile); + free.dump(HeapWriter.fingerprintGapFile(this.heapFile, fingerprint)); free.clear(); free = null; - index.dump(HeapWriter.fingerprintIndexFile(this.heapFile)); + index.dump(HeapWriter.fingerprintIndexFile(this.heapFile, fingerprint)); Log.logInfo("kelondroBLOBHeap", "wrote a dump for the " + this.index.size() + " index entries of " + heapFile.getName()+ " in " + (System.currentTimeMillis() - start) + " milliseconds."); index.close(); index = null; diff --git a/source/de/anomic/kelondro/blob/Gap.java b/source/de/anomic/kelondro/blob/Gap.java index 8f7746216..1c75bdd04 100644 --- a/source/de/anomic/kelondro/blob/Gap.java +++ b/source/de/anomic/kelondro/blob/Gap.java @@ -84,8 +84,9 @@ public class Gap extends TreeMap { * @throws IOException */ public int dump(File file) throws IOException { + File tmp = new File(file.getParentFile(), file.getName() + ".tmp"); Iterator> i = this.entrySet().iterator(); - DataOutputStream os = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(file), 4 * 1024 * 1024)); + DataOutputStream os = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(tmp), 4 * 1024 * 1024)); int c = 0; Map.Entry e; while (i.hasNext()) { @@ -96,6 +97,10 @@ public class Gap extends TreeMap { } os.flush(); os.close(); + tmp.renameTo(file); + assert file.exists() : file.toString(); + assert !tmp.exists() : tmp.toString(); + return c; } diff --git a/source/de/anomic/kelondro/blob/HeapReader.java b/source/de/anomic/kelondro/blob/HeapReader.java index 982fa3d86..7459d4796 100644 --- a/source/de/anomic/kelondro/blob/HeapReader.java +++ b/source/de/anomic/kelondro/blob/HeapReader.java @@ -64,7 +64,7 @@ public class HeapReader { this.file = new CachedRandomAccess(heapFile); // read or initialize the index - if (initIndexReadDump(heapFile)) { + if (initIndexReadDump()) { // verify that everything worked just fine // pick some elements of the index Iterator i = this.index.keys(true, null); @@ -94,13 +94,14 @@ public class HeapReader { } } - private boolean initIndexReadDump(File f) { + private boolean initIndexReadDump() { // look for an index dump and read it if it exist // if this is successfull, return true; otherwise false - File fif = HeapWriter.fingerprintIndexFile(f); - File fgf = HeapWriter.fingerprintGapFile(f); + String fingerprint = HeapWriter.fingerprintFileHash(this.heapFile); + File fif = HeapWriter.fingerprintIndexFile(this.heapFile, fingerprint); + File fgf = HeapWriter.fingerprintGapFile(this.heapFile, fingerprint); if (!fif.exists() || !fgf.exists()) { - HeapWriter.deleteAllFingerprints(f); + HeapWriter.deleteAllFingerprints(this.heapFile); return false; } diff --git a/source/de/anomic/kelondro/blob/HeapWriter.java b/source/de/anomic/kelondro/blob/HeapWriter.java index 44e6eaac4..97ff1f0ad 100644 --- a/source/de/anomic/kelondro/blob/HeapWriter.java +++ b/source/de/anomic/kelondro/blob/HeapWriter.java @@ -38,11 +38,12 @@ import de.anomic.kelondro.util.Log; public final class HeapWriter { - private int keylength; // the length of the primary key - private LongHandleIndex index; // key/seek relation for used records - private final File heapFile; // the file of the heap - private DataOutputStream os; // the output stream where the BLOB is written - private long seek; // the current write position + private int keylength; // the length of the primary key + private LongHandleIndex index; // key/seek relation for used records + private final File heapFileTMP; // the temporary file of the heap during writing + private final File heapFileREADY; // the final file of the heap when the file is closed + private DataOutputStream os; // the output stream where the BLOB is written + private long seek; // the current write position //private HashSet doublecheck;// only for testing /* @@ -65,16 +66,18 @@ public final class HeapWriter { /** * create a heap file: a arbitrary number of BLOBs, indexed by an access key * The heap file will be indexed upon initialization. - * @param heapFile + * @param temporaryHeapFile + * @param readyHeapFile * @param keylength * @param ordering * @throws IOException */ - public HeapWriter(final File heapFile, final int keylength, final ByteOrder ordering) throws IOException { - this.heapFile = heapFile; + public HeapWriter(final File temporaryHeapFile, final File readyHeapFile, final int keylength, final ByteOrder ordering) throws IOException { + this.heapFileTMP = temporaryHeapFile; + this.heapFileREADY = readyHeapFile; this.keylength = keylength; this.index = new LongHandleIndex(keylength, ordering, 10, 100000); - this.os = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(heapFile), 8 * 1024 * 1024)); + this.os = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(temporaryHeapFile), 8 * 1024 * 1024)); //this.doublecheck = new HashSet(); this.seek = 0; } @@ -91,7 +94,7 @@ public final class HeapWriter { assert blob.length > 0; assert key.length == this.keylength; assert index.row().primaryKeyLength == key.length : index.row().primaryKeyLength + "!=" + key.length; - assert index.get(key) < 0 : "index.get(key) = " + index.get(key) + ", index.size() = " + index.size() + ", file.length() = " + this.heapFile.length() + ", key = " + new String(key); // must not occur before + assert index.get(key) < 0 : "index.get(key) = " + index.get(key) + ", index.size() = " + index.size() + ", file.length() = " + this.heapFileTMP.length() + ", key = " + new String(key); // must not occur before if ((blob == null) || (blob.length == 0)) return; int chunkl = key.length + blob.length; os.writeInt(chunkl); @@ -102,18 +105,19 @@ public final class HeapWriter { this.seek += chunkl + 4; } - protected static File fingerprintIndexFile(File f) { + protected static File fingerprintIndexFile(File f, String fingerprint) { assert f != null; - return new File(f.getParentFile(), f.getName() + "." + fingerprintFileHash(f) + ".idx"); + return new File(f.getParentFile(), f.getName() + "." + fingerprint + ".idx"); } - protected static File fingerprintGapFile(File f) { + protected static File fingerprintGapFile(File f, String fingerprint) { assert f != null; - return new File(f.getParentFile(), f.getName() + "." + fingerprintFileHash(f) + ".gap"); + return new File(f.getParentFile(), f.getName() + "." + fingerprint + ".gap"); } protected static String fingerprintFileHash(File f) { assert f != null; + assert f.exists() : "file = " + f.toString(); String fp = Digest.fastFingerprintB64(f, false); assert fp != null : "file = " + f.toString(); return fp.substring(0, 12); @@ -133,6 +137,7 @@ public final class HeapWriter { * @throws */ public synchronized void close(boolean writeIDX) { + // close the file try { os.flush(); os.close(); @@ -141,14 +146,21 @@ public final class HeapWriter { } os = null; + // rename the file into final name + this.heapFileTMP.renameTo(this.heapFileREADY); + assert this.heapFileREADY.exists() : this.heapFileREADY.toString(); + assert !this.heapFileTMP.exists() : this.heapFileTMP.toString(); + + // generate index and gap files if (writeIDX && index.size() > 3) { // now we can create a dump of the index and the gap information // to speed up the next start try { long start = System.currentTimeMillis(); - new Gap().dump(fingerprintGapFile(this.heapFile)); - index.dump(fingerprintIndexFile(this.heapFile)); - Log.logInfo("kelondroBLOBHeapWriter", "wrote a dump for the " + this.index.size() + " index entries of " + heapFile.getName()+ " in " + (System.currentTimeMillis() - start) + " milliseconds."); + String fingerprint = HeapWriter.fingerprintFileHash(this.heapFileREADY); + new Gap().dump(fingerprintGapFile(this.heapFileREADY, fingerprint)); + index.dump(fingerprintIndexFile(this.heapFileREADY, fingerprint)); + Log.logInfo("kelondroBLOBHeapWriter", "wrote a dump for the " + this.index.size() + " index entries of " + heapFileREADY.getName()+ " in " + (System.currentTimeMillis() - start) + " milliseconds."); index.close(); index = null; } catch (IOException e) { @@ -160,5 +172,5 @@ public final class HeapWriter { index = null; } } - + } diff --git a/source/de/anomic/kelondro/index/LongHandleIndex.java b/source/de/anomic/kelondro/index/LongHandleIndex.java index 14563afed..25ba2e83f 100644 --- a/source/de/anomic/kelondro/index/LongHandleIndex.java +++ b/source/de/anomic/kelondro/index/LongHandleIndex.java @@ -98,8 +98,9 @@ public class LongHandleIndex { // we must use an iterator from the combined index, because we need the entries sorted // otherwise we could just write the byte[] from the in kelondroRowSet which would make // everything much faster, but this is not an option here. + File tmp = new File(file.getParentFile(), file.getName() + ".tmp"); Iterator i = this.index.rows(true, null); - OutputStream os = new BufferedOutputStream(new FileOutputStream(file), 4 * 1024 * 1024); + OutputStream os = new BufferedOutputStream(new FileOutputStream(tmp), 4 * 1024 * 1024); int c = 0; while (i.hasNext()) { os.write(i.next().bytes()); @@ -107,6 +108,9 @@ public class LongHandleIndex { } os.flush(); os.close(); + tmp.renameTo(file); + assert file.exists() : file.toString(); + assert !tmp.exists() : tmp.toString(); return c; } diff --git a/source/de/anomic/kelondro/text/IODispatcher.java b/source/de/anomic/kelondro/text/IODispatcher.java index 0d7e4f9b5..0d0b6a1ed 100644 --- a/source/de/anomic/kelondro/text/IODispatcher.java +++ b/source/de/anomic/kelondro/text/IODispatcher.java @@ -239,7 +239,8 @@ public class IODispatcher extends Thread { } assert i1.hasNext(); assert i2.hasNext(); - HeapWriter writer = new HeapWriter(newFile, array.keylength(), array.ordering()); + File tmpFile = new File(newFile.getParentFile(), newFile.getName() + ".tmp"); + HeapWriter writer = new HeapWriter(tmpFile, newFile, array.keylength(), array.ordering()); merge(i1, i2, array.ordering(), writer); writer.close(true); // we don't need the old files any more diff --git a/source/de/anomic/kelondro/text/IndexCell.java b/source/de/anomic/kelondro/text/IndexCell.java index de7a74b35..640a1d4c8 100644 --- a/source/de/anomic/kelondro/text/IndexCell.java +++ b/source/de/anomic/kelondro/text/IndexCell.java @@ -80,7 +80,7 @@ public final class IndexCell extends AbstractBufferedIndex implements BufferedIn this.lastCleanup = System.currentTimeMillis(); this.targetFileSize = targetFileSize; this.maxFileSize = maxFileSize; - cacheCleanup(); + cleanCache(); } @@ -96,15 +96,13 @@ public final class IndexCell extends AbstractBufferedIndex implements BufferedIn public synchronized void add(ReferenceContainer newEntries) throws IOException { this.ram.add(newEntries); serverProfiling.update("wordcache", Long.valueOf(this.ram.size()), true); - cacheDumpIfNecessary(); - cacheCleanup(); + cleanCache(); } public synchronized void add(String hash, ReferenceRow entry) throws IOException { this.ram.add(hash, entry); serverProfiling.update("wordcache", Long.valueOf(this.ram.size()), true); - cacheDumpIfNecessary(); - cacheCleanup(); + cleanCache(); } /** @@ -159,7 +157,7 @@ public final class IndexCell extends AbstractBufferedIndex implements BufferedIn return c0; } this.array.delete(wordHash); - cacheCleanup(); + cleanCache(); if (c0 == null) return c1; return c1.merge(c0); } @@ -274,16 +272,22 @@ public final class IndexCell extends AbstractBufferedIndex implements BufferedIn * cache control methods */ - private synchronized void cacheDumpIfNecessary() { - if (this.ram.size() > this.maxRamEntries || MemoryControl.available() < 20 * 1024 * 1024) { + private synchronized void cleanCache() { + // dump the cache if necessary + if (this.ram.size() > this.maxRamEntries || (this.ram.size() > 3000 && MemoryControl.available() < 50 * 1024 * 1024)) { try { cacheDump(); } catch (IOException e) { e.printStackTrace(); } } + + // clean-up the cache + if (this.lastCleanup + cleanupCycle > System.currentTimeMillis()) return; + this.array.shrink(this.targetFileSize, this.maxFileSize); + this.lastCleanup = System.currentTimeMillis(); } - + private synchronized void cacheDump() throws IOException { // dump the ram File dumpFile = this.array.newContainerBLOBFile(); @@ -295,12 +299,6 @@ public final class IndexCell extends AbstractBufferedIndex implements BufferedIn this.ram.initWriteMode(); } - private synchronized void cacheCleanup() throws IOException { - if (this.lastCleanup + cleanupCycle > System.currentTimeMillis()) return; - this.array.shrink(this.targetFileSize, this.maxFileSize); - this.lastCleanup = System.currentTimeMillis(); - } - public File newContainerBLOBFile() { // for migration of cache files return this.array.newContainerBLOBFile(); @@ -315,12 +313,10 @@ public final class IndexCell extends AbstractBufferedIndex implements BufferedIn // do nothing } - public int getBackendSize() { return this.array.size(); } - public long getBufferMaxAge() { return System.currentTimeMillis(); } diff --git a/source/de/anomic/kelondro/text/ReferenceContainerArray.java b/source/de/anomic/kelondro/text/ReferenceContainerArray.java index a039e7890..61ec6ed32 100644 --- a/source/de/anomic/kelondro/text/ReferenceContainerArray.java +++ b/source/de/anomic/kelondro/text/ReferenceContainerArray.java @@ -245,32 +245,38 @@ public final class ReferenceContainerArray { return this.array.entries(); } - public synchronized boolean shrink(long targetFileSize, long maxFileSize) throws IOException { + public synchronized boolean shrink(long targetFileSize, long maxFileSize) { if (this.array.entries() < 2) return false; - if (this.merger.queueLength() > 0) return false; + boolean donesomething = false; - File[] ff = this.array.unmountBestMatch(2.0, targetFileSize); - if (ff != null) { - Log.logInfo("RICELL-shrink", "doing unmountBestMatch(2.0, " + targetFileSize + ")"); + // first try to merge small files that match + while (this.merger.queueLength() < 3) { + File[] ff = this.array.unmountBestMatch(2.0, targetFileSize); + if (ff == null) break; + Log.logInfo("RICELL-shrink", "unmountBestMatch(2.0, " + targetFileSize + ")"); merger.merge(ff[0], ff[1], this.array, this.payloadrow, newContainerBLOBFile()); - return true; + donesomething = true; } - ff = this.array.unmountSmallest(targetFileSize); - if (ff != null) { - Log.logInfo("RICELL-shrink", "doing unmountSmallest(" + targetFileSize + ")"); + // then try to merge simply any small file + while (this.merger.queueLength() < 2) { + File[] ff = this.array.unmountSmallest(targetFileSize); + if (ff == null) break; + Log.logInfo("RICELL-shrink", "unmountSmallest(" + targetFileSize + ")"); merger.merge(ff[0], ff[1], this.array, this.payloadrow, newContainerBLOBFile()); - return true; + donesomething = true; } - ff = this.array.unmountBestMatch(2.0, maxFileSize); - if (ff != null) { - Log.logInfo("RICELL-shrink", "doing unmountBestMatch(2.0, " + maxFileSize + ")"); + // if there is no small file, then merge matching files up to limit + while (this.merger.queueLength() < 1) { + File[] ff = this.array.unmountBestMatch(2.0, maxFileSize); + if (ff == null) break; + Log.logInfo("RICELL-shrink", "unmountBestMatch(2.0, " + maxFileSize + ")"); merger.merge(ff[0], ff[1], this.array, this.payloadrow, newContainerBLOBFile()); - return true; + donesomething = true; } - return false; + return donesomething; } diff --git a/source/de/anomic/kelondro/text/ReferenceContainerCache.java b/source/de/anomic/kelondro/text/ReferenceContainerCache.java index 6701375d1..78de26ce7 100644 --- a/source/de/anomic/kelondro/text/ReferenceContainerCache.java +++ b/source/de/anomic/kelondro/text/ReferenceContainerCache.java @@ -114,7 +114,8 @@ public final class ReferenceContainerCache extends AbstractIndex implements Inde assert this.cache != null; Log.logInfo("indexContainerRAMHeap", "creating rwi heap dump '" + heapFile.getName() + "', " + cache.size() + " rwi's"); if (heapFile.exists()) FileUtils.deletedelete(heapFile); - HeapWriter dump = new HeapWriter(heapFile, payloadrow.primaryKeyLength, Base64Order.enhancedCoder); + File tmpFile = new File(heapFile.getParentFile(), heapFile.getName() + ".tmp"); + HeapWriter dump = new HeapWriter(tmpFile, heapFile, payloadrow.primaryKeyLength, Base64Order.enhancedCoder); final long startTime = System.currentTimeMillis(); long wordcount = 0, urlcount = 0; String wordHash = null, lwh;