- added writing of index dumps and merges under temporary file names, with a rename to the final file name once the dump/merge is done; interrupted merges can now be cleaned up.

- added clean-up of unfinished merges and unused idx/gap files
- enhanced the method used to select files for merging

git-svn-id: https://svn.berlios.de/svnroot/repos/yacy/trunk@5764 6c8d7289-2bf4-0310-a012-ef5d649a1542
pull/1/head
orbiter 16 years ago
parent 3621aa96ab
commit 0139988c04

@ -582,7 +582,7 @@ public class Balancer {
// in best case, this should never happen if the balancer works propertly // in best case, this should never happen if the balancer works propertly
// this is only to protection against the worst case, where the crawler could // this is only to protection against the worst case, where the crawler could
// behave in a DoS-manner // behave in a DoS-manner
Log.logInfo("BALANCER", "forcing crawl-delay of " + sleeptime + " milliseconds for " + crawlEntry.url().getHost() + ((sleeptime > Math.max(minimumLocalDelta, minimumGlobalDelta)) ? " (caused by robots.txt)" : "")); Log.logInfo("BALANCER", "forcing crawl-delay of " + sleeptime + " milliseconds for " + crawlEntry.url().getHost() + ((sleeptime > Math.max(minimumLocalDelta, minimumGlobalDelta)) ? " (forced latency)" : ""));
if (System.currentTimeMillis() - this.lastPrepare > 10000) { if (System.currentTimeMillis() - this.lastPrepare > 10000) {
long t = System.currentTimeMillis(); long t = System.currentTimeMillis();
prepare(400); prepare(400);

@ -159,6 +159,7 @@ public class CrawlEntry extends serverProcessorJob {
} }
public void setStatus(final String s, int code) { public void setStatus(final String s, int code) {
//System.out.println("***DEBUG*** crawler status " + s + ", " + code + " for " + this.url.toNormalform(true, false));
this.statusMessage = s; this.statusMessage = s;
this.status = code; this.status = code;
} }

@ -31,6 +31,7 @@ import java.io.IOException;
import java.text.ParseException; import java.text.ParseException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Date; import java.util.Date;
import java.util.HashSet;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.TreeMap; import java.util.TreeMap;
@ -97,12 +98,31 @@ public class BLOBArray implements BLOB {
// register all blob files inside this directory // register all blob files inside this directory
String[] files = heapLocation.list(); String[] files = heapLocation.list();
HashSet<String> fh = new HashSet<String>();
for (int i = 0; i < files.length; i++) fh.add(files[i]);
// delete unused temporary files
boolean deletions = false;
for (int i = 0; i < files.length; i++) {
if (files[i].endsWith(".tmp")) {
FileUtils.deletedelete(new File(heapLocation, files[i]));
deletions = true;
}
if (files[i].endsWith(".idx") || files[i].endsWith(".gap")) {
String s = files[i].substring(0, files[i].length() - 17);
if (!fh.contains(s)) {
FileUtils.deletedelete(new File(heapLocation, files[i]));
deletions = true;
}
}
}
if (deletions) files = heapLocation.list(); // make a fresh list
// find maximum time: the file with this time will be given a write buffer
Date d; Date d;
TreeMap<Long, blobItem> sortedItems = new TreeMap<Long, blobItem>(); TreeMap<Long, blobItem> sortedItems = new TreeMap<Long, blobItem>();
BLOB oneBlob; BLOB oneBlob;
File f; File f;
long time, maxtime = 0; long time, maxtime = 0;
// first find maximum time: the file with this time will be given a write buffer
for (int i = 0; i < files.length; i++) { for (int i = 0; i < files.length; i++) {
if (files[i].length() >= 19 && files[i].endsWith(".blob")) { if (files[i].length() >= 19 && files[i].endsWith(".blob")) {
try { try {

@ -112,10 +112,11 @@ public class BLOBHeapModifier extends HeapReader implements BLOB {
// to speed up the next start // to speed up the next start
try { try {
long start = System.currentTimeMillis(); long start = System.currentTimeMillis();
free.dump(HeapWriter.fingerprintGapFile(this.heapFile)); String fingerprint = HeapWriter.fingerprintFileHash(this.heapFile);
free.dump(HeapWriter.fingerprintGapFile(this.heapFile, fingerprint));
free.clear(); free.clear();
free = null; free = null;
index.dump(HeapWriter.fingerprintIndexFile(this.heapFile)); index.dump(HeapWriter.fingerprintIndexFile(this.heapFile, fingerprint));
Log.logInfo("kelondroBLOBHeap", "wrote a dump for the " + this.index.size() + " index entries of " + heapFile.getName()+ " in " + (System.currentTimeMillis() - start) + " milliseconds."); Log.logInfo("kelondroBLOBHeap", "wrote a dump for the " + this.index.size() + " index entries of " + heapFile.getName()+ " in " + (System.currentTimeMillis() - start) + " milliseconds.");
index.close(); index.close();
index = null; index = null;

@ -84,8 +84,9 @@ public class Gap extends TreeMap<Long, Integer> {
* @throws IOException * @throws IOException
*/ */
public int dump(File file) throws IOException { public int dump(File file) throws IOException {
File tmp = new File(file.getParentFile(), file.getName() + ".tmp");
Iterator<Map.Entry<Long, Integer>> i = this.entrySet().iterator(); Iterator<Map.Entry<Long, Integer>> i = this.entrySet().iterator();
DataOutputStream os = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(file), 4 * 1024 * 1024)); DataOutputStream os = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(tmp), 4 * 1024 * 1024));
int c = 0; int c = 0;
Map.Entry<Long, Integer> e; Map.Entry<Long, Integer> e;
while (i.hasNext()) { while (i.hasNext()) {
@ -96,6 +97,10 @@ public class Gap extends TreeMap<Long, Integer> {
} }
os.flush(); os.flush();
os.close(); os.close();
tmp.renameTo(file);
assert file.exists() : file.toString();
assert !tmp.exists() : tmp.toString();
return c; return c;
} }

@ -64,7 +64,7 @@ public class HeapReader {
this.file = new CachedRandomAccess(heapFile); this.file = new CachedRandomAccess(heapFile);
// read or initialize the index // read or initialize the index
if (initIndexReadDump(heapFile)) { if (initIndexReadDump()) {
// verify that everything worked just fine // verify that everything worked just fine
// pick some elements of the index // pick some elements of the index
Iterator<byte[]> i = this.index.keys(true, null); Iterator<byte[]> i = this.index.keys(true, null);
@ -94,13 +94,14 @@ public class HeapReader {
} }
} }
private boolean initIndexReadDump(File f) { private boolean initIndexReadDump() {
// look for an index dump and read it if it exist // look for an index dump and read it if it exist
// if this is successfull, return true; otherwise false // if this is successfull, return true; otherwise false
File fif = HeapWriter.fingerprintIndexFile(f); String fingerprint = HeapWriter.fingerprintFileHash(this.heapFile);
File fgf = HeapWriter.fingerprintGapFile(f); File fif = HeapWriter.fingerprintIndexFile(this.heapFile, fingerprint);
File fgf = HeapWriter.fingerprintGapFile(this.heapFile, fingerprint);
if (!fif.exists() || !fgf.exists()) { if (!fif.exists() || !fgf.exists()) {
HeapWriter.deleteAllFingerprints(f); HeapWriter.deleteAllFingerprints(this.heapFile);
return false; return false;
} }

@ -38,11 +38,12 @@ import de.anomic.kelondro.util.Log;
public final class HeapWriter { public final class HeapWriter {
private int keylength; // the length of the primary key private int keylength; // the length of the primary key
private LongHandleIndex index; // key/seek relation for used records private LongHandleIndex index; // key/seek relation for used records
private final File heapFile; // the file of the heap private final File heapFileTMP; // the temporary file of the heap during writing
private DataOutputStream os; // the output stream where the BLOB is written private final File heapFileREADY; // the final file of the heap when the file is closed
private long seek; // the current write position private DataOutputStream os; // the output stream where the BLOB is written
private long seek; // the current write position
//private HashSet<String> doublecheck;// only for testing //private HashSet<String> doublecheck;// only for testing
/* /*
@ -65,16 +66,18 @@ public final class HeapWriter {
/** /**
* create a heap file: a arbitrary number of BLOBs, indexed by an access key * create a heap file: a arbitrary number of BLOBs, indexed by an access key
* The heap file will be indexed upon initialization. * The heap file will be indexed upon initialization.
* @param heapFile * @param temporaryHeapFile
* @param readyHeapFile
* @param keylength * @param keylength
* @param ordering * @param ordering
* @throws IOException * @throws IOException
*/ */
public HeapWriter(final File heapFile, final int keylength, final ByteOrder ordering) throws IOException { public HeapWriter(final File temporaryHeapFile, final File readyHeapFile, final int keylength, final ByteOrder ordering) throws IOException {
this.heapFile = heapFile; this.heapFileTMP = temporaryHeapFile;
this.heapFileREADY = readyHeapFile;
this.keylength = keylength; this.keylength = keylength;
this.index = new LongHandleIndex(keylength, ordering, 10, 100000); this.index = new LongHandleIndex(keylength, ordering, 10, 100000);
this.os = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(heapFile), 8 * 1024 * 1024)); this.os = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(temporaryHeapFile), 8 * 1024 * 1024));
//this.doublecheck = new HashSet<String>(); //this.doublecheck = new HashSet<String>();
this.seek = 0; this.seek = 0;
} }
@ -91,7 +94,7 @@ public final class HeapWriter {
assert blob.length > 0; assert blob.length > 0;
assert key.length == this.keylength; assert key.length == this.keylength;
assert index.row().primaryKeyLength == key.length : index.row().primaryKeyLength + "!=" + key.length; assert index.row().primaryKeyLength == key.length : index.row().primaryKeyLength + "!=" + key.length;
assert index.get(key) < 0 : "index.get(key) = " + index.get(key) + ", index.size() = " + index.size() + ", file.length() = " + this.heapFile.length() + ", key = " + new String(key); // must not occur before assert index.get(key) < 0 : "index.get(key) = " + index.get(key) + ", index.size() = " + index.size() + ", file.length() = " + this.heapFileTMP.length() + ", key = " + new String(key); // must not occur before
if ((blob == null) || (blob.length == 0)) return; if ((blob == null) || (blob.length == 0)) return;
int chunkl = key.length + blob.length; int chunkl = key.length + blob.length;
os.writeInt(chunkl); os.writeInt(chunkl);
@ -102,18 +105,19 @@ public final class HeapWriter {
this.seek += chunkl + 4; this.seek += chunkl + 4;
} }
protected static File fingerprintIndexFile(File f) { protected static File fingerprintIndexFile(File f, String fingerprint) {
assert f != null; assert f != null;
return new File(f.getParentFile(), f.getName() + "." + fingerprintFileHash(f) + ".idx"); return new File(f.getParentFile(), f.getName() + "." + fingerprint + ".idx");
} }
protected static File fingerprintGapFile(File f) { protected static File fingerprintGapFile(File f, String fingerprint) {
assert f != null; assert f != null;
return new File(f.getParentFile(), f.getName() + "." + fingerprintFileHash(f) + ".gap"); return new File(f.getParentFile(), f.getName() + "." + fingerprint + ".gap");
} }
protected static String fingerprintFileHash(File f) { protected static String fingerprintFileHash(File f) {
assert f != null; assert f != null;
assert f.exists() : "file = " + f.toString();
String fp = Digest.fastFingerprintB64(f, false); String fp = Digest.fastFingerprintB64(f, false);
assert fp != null : "file = " + f.toString(); assert fp != null : "file = " + f.toString();
return fp.substring(0, 12); return fp.substring(0, 12);
@ -133,6 +137,7 @@ public final class HeapWriter {
* @throws * @throws
*/ */
public synchronized void close(boolean writeIDX) { public synchronized void close(boolean writeIDX) {
// close the file
try { try {
os.flush(); os.flush();
os.close(); os.close();
@ -141,14 +146,21 @@ public final class HeapWriter {
} }
os = null; os = null;
// rename the file into final name
this.heapFileTMP.renameTo(this.heapFileREADY);
assert this.heapFileREADY.exists() : this.heapFileREADY.toString();
assert !this.heapFileTMP.exists() : this.heapFileTMP.toString();
// generate index and gap files
if (writeIDX && index.size() > 3) { if (writeIDX && index.size() > 3) {
// now we can create a dump of the index and the gap information // now we can create a dump of the index and the gap information
// to speed up the next start // to speed up the next start
try { try {
long start = System.currentTimeMillis(); long start = System.currentTimeMillis();
new Gap().dump(fingerprintGapFile(this.heapFile)); String fingerprint = HeapWriter.fingerprintFileHash(this.heapFileREADY);
index.dump(fingerprintIndexFile(this.heapFile)); new Gap().dump(fingerprintGapFile(this.heapFileREADY, fingerprint));
Log.logInfo("kelondroBLOBHeapWriter", "wrote a dump for the " + this.index.size() + " index entries of " + heapFile.getName()+ " in " + (System.currentTimeMillis() - start) + " milliseconds."); index.dump(fingerprintIndexFile(this.heapFileREADY, fingerprint));
Log.logInfo("kelondroBLOBHeapWriter", "wrote a dump for the " + this.index.size() + " index entries of " + heapFileREADY.getName()+ " in " + (System.currentTimeMillis() - start) + " milliseconds.");
index.close(); index.close();
index = null; index = null;
} catch (IOException e) { } catch (IOException e) {
@ -160,5 +172,5 @@ public final class HeapWriter {
index = null; index = null;
} }
} }
} }

@ -98,8 +98,9 @@ public class LongHandleIndex {
// we must use an iterator from the combined index, because we need the entries sorted // we must use an iterator from the combined index, because we need the entries sorted
// otherwise we could just write the byte[] from the in kelondroRowSet which would make // otherwise we could just write the byte[] from the in kelondroRowSet which would make
// everything much faster, but this is not an option here. // everything much faster, but this is not an option here.
File tmp = new File(file.getParentFile(), file.getName() + ".tmp");
Iterator<Row.Entry> i = this.index.rows(true, null); Iterator<Row.Entry> i = this.index.rows(true, null);
OutputStream os = new BufferedOutputStream(new FileOutputStream(file), 4 * 1024 * 1024); OutputStream os = new BufferedOutputStream(new FileOutputStream(tmp), 4 * 1024 * 1024);
int c = 0; int c = 0;
while (i.hasNext()) { while (i.hasNext()) {
os.write(i.next().bytes()); os.write(i.next().bytes());
@ -107,6 +108,9 @@ public class LongHandleIndex {
} }
os.flush(); os.flush();
os.close(); os.close();
tmp.renameTo(file);
assert file.exists() : file.toString();
assert !tmp.exists() : tmp.toString();
return c; return c;
} }

@ -239,7 +239,8 @@ public class IODispatcher extends Thread {
} }
assert i1.hasNext(); assert i1.hasNext();
assert i2.hasNext(); assert i2.hasNext();
HeapWriter writer = new HeapWriter(newFile, array.keylength(), array.ordering()); File tmpFile = new File(newFile.getParentFile(), newFile.getName() + ".tmp");
HeapWriter writer = new HeapWriter(tmpFile, newFile, array.keylength(), array.ordering());
merge(i1, i2, array.ordering(), writer); merge(i1, i2, array.ordering(), writer);
writer.close(true); writer.close(true);
// we don't need the old files any more // we don't need the old files any more

@ -80,7 +80,7 @@ public final class IndexCell extends AbstractBufferedIndex implements BufferedIn
this.lastCleanup = System.currentTimeMillis(); this.lastCleanup = System.currentTimeMillis();
this.targetFileSize = targetFileSize; this.targetFileSize = targetFileSize;
this.maxFileSize = maxFileSize; this.maxFileSize = maxFileSize;
cacheCleanup(); cleanCache();
} }
@ -96,15 +96,13 @@ public final class IndexCell extends AbstractBufferedIndex implements BufferedIn
public synchronized void add(ReferenceContainer newEntries) throws IOException { public synchronized void add(ReferenceContainer newEntries) throws IOException {
this.ram.add(newEntries); this.ram.add(newEntries);
serverProfiling.update("wordcache", Long.valueOf(this.ram.size()), true); serverProfiling.update("wordcache", Long.valueOf(this.ram.size()), true);
cacheDumpIfNecessary(); cleanCache();
cacheCleanup();
} }
public synchronized void add(String hash, ReferenceRow entry) throws IOException { public synchronized void add(String hash, ReferenceRow entry) throws IOException {
this.ram.add(hash, entry); this.ram.add(hash, entry);
serverProfiling.update("wordcache", Long.valueOf(this.ram.size()), true); serverProfiling.update("wordcache", Long.valueOf(this.ram.size()), true);
cacheDumpIfNecessary(); cleanCache();
cacheCleanup();
} }
/** /**
@ -159,7 +157,7 @@ public final class IndexCell extends AbstractBufferedIndex implements BufferedIn
return c0; return c0;
} }
this.array.delete(wordHash); this.array.delete(wordHash);
cacheCleanup(); cleanCache();
if (c0 == null) return c1; if (c0 == null) return c1;
return c1.merge(c0); return c1.merge(c0);
} }
@ -274,16 +272,22 @@ public final class IndexCell extends AbstractBufferedIndex implements BufferedIn
* cache control methods * cache control methods
*/ */
private synchronized void cacheDumpIfNecessary() { private synchronized void cleanCache() {
if (this.ram.size() > this.maxRamEntries || MemoryControl.available() < 20 * 1024 * 1024) { // dump the cache if necessary
if (this.ram.size() > this.maxRamEntries || (this.ram.size() > 3000 && MemoryControl.available() < 50 * 1024 * 1024)) {
try { try {
cacheDump(); cacheDump();
} catch (IOException e) { } catch (IOException e) {
e.printStackTrace(); e.printStackTrace();
} }
} }
// clean-up the cache
if (this.lastCleanup + cleanupCycle > System.currentTimeMillis()) return;
this.array.shrink(this.targetFileSize, this.maxFileSize);
this.lastCleanup = System.currentTimeMillis();
} }
private synchronized void cacheDump() throws IOException { private synchronized void cacheDump() throws IOException {
// dump the ram // dump the ram
File dumpFile = this.array.newContainerBLOBFile(); File dumpFile = this.array.newContainerBLOBFile();
@ -295,12 +299,6 @@ public final class IndexCell extends AbstractBufferedIndex implements BufferedIn
this.ram.initWriteMode(); this.ram.initWriteMode();
} }
private synchronized void cacheCleanup() throws IOException {
if (this.lastCleanup + cleanupCycle > System.currentTimeMillis()) return;
this.array.shrink(this.targetFileSize, this.maxFileSize);
this.lastCleanup = System.currentTimeMillis();
}
public File newContainerBLOBFile() { public File newContainerBLOBFile() {
// for migration of cache files // for migration of cache files
return this.array.newContainerBLOBFile(); return this.array.newContainerBLOBFile();
@ -315,12 +313,10 @@ public final class IndexCell extends AbstractBufferedIndex implements BufferedIn
// do nothing // do nothing
} }
public int getBackendSize() { public int getBackendSize() {
return this.array.size(); return this.array.size();
} }
public long getBufferMaxAge() { public long getBufferMaxAge() {
return System.currentTimeMillis(); return System.currentTimeMillis();
} }

@ -245,32 +245,38 @@ public final class ReferenceContainerArray {
return this.array.entries(); return this.array.entries();
} }
public synchronized boolean shrink(long targetFileSize, long maxFileSize) throws IOException { public synchronized boolean shrink(long targetFileSize, long maxFileSize) {
if (this.array.entries() < 2) return false; if (this.array.entries() < 2) return false;
if (this.merger.queueLength() > 0) return false; boolean donesomething = false;
File[] ff = this.array.unmountBestMatch(2.0, targetFileSize); // first try to merge small files that match
if (ff != null) { while (this.merger.queueLength() < 3) {
Log.logInfo("RICELL-shrink", "doing unmountBestMatch(2.0, " + targetFileSize + ")"); File[] ff = this.array.unmountBestMatch(2.0, targetFileSize);
if (ff == null) break;
Log.logInfo("RICELL-shrink", "unmountBestMatch(2.0, " + targetFileSize + ")");
merger.merge(ff[0], ff[1], this.array, this.payloadrow, newContainerBLOBFile()); merger.merge(ff[0], ff[1], this.array, this.payloadrow, newContainerBLOBFile());
return true; donesomething = true;
} }
ff = this.array.unmountSmallest(targetFileSize); // then try to merge simply any small file
if (ff != null) { while (this.merger.queueLength() < 2) {
Log.logInfo("RICELL-shrink", "doing unmountSmallest(" + targetFileSize + ")"); File[] ff = this.array.unmountSmallest(targetFileSize);
if (ff == null) break;
Log.logInfo("RICELL-shrink", "unmountSmallest(" + targetFileSize + ")");
merger.merge(ff[0], ff[1], this.array, this.payloadrow, newContainerBLOBFile()); merger.merge(ff[0], ff[1], this.array, this.payloadrow, newContainerBLOBFile());
return true; donesomething = true;
} }
ff = this.array.unmountBestMatch(2.0, maxFileSize); // if there is no small file, then merge matching files up to limit
if (ff != null) { while (this.merger.queueLength() < 1) {
Log.logInfo("RICELL-shrink", "doing unmountBestMatch(2.0, " + maxFileSize + ")"); File[] ff = this.array.unmountBestMatch(2.0, maxFileSize);
if (ff == null) break;
Log.logInfo("RICELL-shrink", "unmountBestMatch(2.0, " + maxFileSize + ")");
merger.merge(ff[0], ff[1], this.array, this.payloadrow, newContainerBLOBFile()); merger.merge(ff[0], ff[1], this.array, this.payloadrow, newContainerBLOBFile());
return true; donesomething = true;
} }
return false; return donesomething;
} }

@ -114,7 +114,8 @@ public final class ReferenceContainerCache extends AbstractIndex implements Inde
assert this.cache != null; assert this.cache != null;
Log.logInfo("indexContainerRAMHeap", "creating rwi heap dump '" + heapFile.getName() + "', " + cache.size() + " rwi's"); Log.logInfo("indexContainerRAMHeap", "creating rwi heap dump '" + heapFile.getName() + "', " + cache.size() + " rwi's");
if (heapFile.exists()) FileUtils.deletedelete(heapFile); if (heapFile.exists()) FileUtils.deletedelete(heapFile);
HeapWriter dump = new HeapWriter(heapFile, payloadrow.primaryKeyLength, Base64Order.enhancedCoder); File tmpFile = new File(heapFile.getParentFile(), heapFile.getName() + ".tmp");
HeapWriter dump = new HeapWriter(tmpFile, heapFile, payloadrow.primaryKeyLength, Base64Order.enhancedCoder);
final long startTime = System.currentTimeMillis(); final long startTime = System.currentTimeMillis();
long wordcount = 0, urlcount = 0; long wordcount = 0, urlcount = 0;
String wordHash = null, lwh; String wordHash = null, lwh;

Loading…
Cancel
Save