From 4a2dac659ea7171e519ca06812238f0f887f1e0c Mon Sep 17 00:00:00 2001
From: orbiter
Date: Fri, 5 Dec 2008 13:55:48 +0000
Subject: [PATCH] more speed hacks:

- modified and activated write buffer
- increased cache flush factor
- fixed a problem with deadlocking of indexing process

git-svn-id: https://svn.berlios.de/svnroot/repos/yacy/trunk@5382 6c8d7289-2bf4-0310-a012-ef5d649a1542
---
 htroot/Threaddump_p.java                      |   2 +-
 source/de/anomic/index/indexWord.java         |   4 +-
 .../anomic/kelondro/kelondroAbstractRA.java   |   1 +
 .../kelondro/kelondroAbstractRecords.java     |   9 +-
 .../kelondro/kelondroBufferedIOChunks.java    | 136 +++++--------
 .../de/anomic/kelondro/kelondroCachedRA.java  | 191 +++++++++---------
 .../kelondro/kelondroCachedRecords.java       |  19 +-
 .../de/anomic/plasma/plasmaSwitchboard.java   |   6 +-
 source/de/anomic/plasma/plasmaWordIndex.java  |   2 +-
 9 files changed, 164 insertions(+), 206 deletions(-)

diff --git a/htroot/Threaddump_p.java b/htroot/Threaddump_p.java
index ff2ed8a3c..f99124d60 100644
--- a/htroot/Threaddump_p.java
+++ b/htroot/Threaddump_p.java
@@ -98,7 +98,7 @@ public class Threaddump_p {
 
         bufferappend(buffer, plain, "************* End Thread Dump " + dt + " *******************");
 
-        prop.putHTML("plain_content", buffer.toString());
+        prop.put("plain_content", buffer.toString());
         prop.put("plain", (plain) ? 1 : 0);
 
         return prop;    // return from serverObjects respond()
diff --git a/source/de/anomic/index/indexWord.java b/source/de/anomic/index/indexWord.java
index d6959b888..185e54d61 100644
--- a/source/de/anomic/index/indexWord.java
+++ b/source/de/anomic/index/indexWord.java
@@ -26,8 +26,8 @@
 
 package de.anomic.index;
 
-import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Hashtable;
 import java.util.Iterator;
 import java.util.Locale;
 import java.util.Set;
@@ -78,7 +78,7 @@ public class indexWord {
 
     // static methods
     // create a word hash
-    private static final HashMap hashCache = new HashMap();
+    private static final Hashtable hashCache = new Hashtable();
     public static final String word2hash(final String word) {
         String h = hashCache.get(word);
         if (h != null) return h;
diff --git a/source/de/anomic/kelondro/kelondroAbstractRA.java b/source/de/anomic/kelondro/kelondroAbstractRA.java
index 052d93ff6..c223e1460 100644
--- a/source/de/anomic/kelondro/kelondroAbstractRA.java
+++ b/source/de/anomic/kelondro/kelondroAbstractRA.java
@@ -49,6 +49,7 @@ abstract class kelondroAbstractRA implements kelondroRA {
     }
 
     // pseudo-native methods:
+    abstract public void readFully(byte[] b, int off, int len) throws IOException;
     abstract public long length() throws IOException;
     abstract public long available() throws IOException;
     abstract public void write(byte[] b, int off, int len) throws IOException;
diff --git a/source/de/anomic/kelondro/kelondroAbstractRecords.java b/source/de/anomic/kelondro/kelondroAbstractRecords.java
index ad1a1c7fa..cf3a2f102 100644
--- a/source/de/anomic/kelondro/kelondroAbstractRecords.java
+++ b/source/de/anomic/kelondro/kelondroAbstractRecords.java
@@ -337,7 +337,6 @@ public abstract class kelondroAbstractRecords implements kelondroRecords {
         }
     }
-
     public kelondroAbstractRecords(final File file, final boolean useNodeCache,
                              final short ohbytec, final short ohhandlec,
                              final kelondroRow rowdef,
                              final int FHandles, final int txtProps, final int txtPropWidth) throws IOException {
@@ -357,10 +356,11 @@ public abstract class kelondroAbstractRecords implements kelondroRecords {
         if (file.exists()) {
             // opens an existing tree
             this.filename = file.getCanonicalPath();
-            final kelondroRA raf = (useChannel) ? new kelondroChannelRA(new File(this.filename)) : new kelondroFileRA(new File(this.filename));
+            kelondroRA raf = (useChannel) ? new kelondroChannelRA(new File(this.filename)) : new kelondroFileRA(new File(this.filename));
             //kelondroRA raf = new kelondroBufferedRA(new kelondroFileRA(this.filename), 1024, 100);
             //kelondroRA raf = new kelondroCachedRA(new kelondroFileRA(this.filename), 5000000, 1000);
             //kelondroRA raf = new kelondroNIOFileRA(this.filename, (file.length() < 4000000), 10000);
+            //raf = new kelondroCachedRA(raf);
             initExistingFile(raf, useNodeCache);
         } else {
             this.filename = file.getCanonicalPath();
@@ -525,10 +525,11 @@ public abstract class kelondroAbstractRecords implements kelondroRecords {
         readOrderType();
     }
 
-    private void initExistingFile(final kelondroRA ra, final boolean useBuffer) throws IOException {
+    private void initExistingFile(final kelondroRA ra, boolean useBuffer) throws IOException {
         // read from Chunked IO
+        //useBuffer = false;
         if (useBuffer) {
-            this.entryFile = new kelondroBufferedIOChunks(ra, ra.name(), 0, 30000 + random.nextLong() % 30000);
+            this.entryFile = new kelondroBufferedIOChunks(ra, ra.name(), 1024*1024, 30000 + random.nextLong() % 30000);
         } else {
             this.entryFile = new kelondroRAIOChunks(ra, ra.name());
         }
diff --git a/source/de/anomic/kelondro/kelondroBufferedIOChunks.java b/source/de/anomic/kelondro/kelondroBufferedIOChunks.java
index 373fe8739..6deb1e5ee 100644
--- a/source/de/anomic/kelondro/kelondroBufferedIOChunks.java
+++ b/source/de/anomic/kelondro/kelondroBufferedIOChunks.java
@@ -26,29 +26,21 @@
 
 package de.anomic.kelondro;
 
 import java.io.IOException;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.TreeMap;
 
 public final class kelondroBufferedIOChunks extends kelondroAbstractIOChunks implements kelondroIOChunks {
 
     protected kelondroRA ra;
-    private final long bufferMaxSize;
-    private long bufferCurrSize;
+    private int bufferSize;
     private final long commitTimeout;
-    private final TreeMap buffer;
+    private final byte[] buffer;
     private long lastCommit = 0;
 
-    private static final int overhead = 40;
-
-
-    public kelondroBufferedIOChunks(final kelondroRA ra, final String name, final long buffer, final long commitTimeout) {
+    public kelondroBufferedIOChunks(final kelondroRA ra, final String name, final int buffersize, final long commitTimeout) {
         this.name = name;
         this.ra = ra;
-        this.bufferMaxSize = buffer;
-        this.bufferCurrSize = 0;
+        this.bufferSize = 0;
         this.commitTimeout = commitTimeout;
-        this.buffer = new TreeMap();
+        this.buffer = new byte[buffersize]; // this is a buffer at the end of the file
         this.lastCommit = System.currentTimeMillis();
     }
@@ -57,104 +49,68 @@
     }
 
     public long length() throws IOException {
-        return ra.length();
+        return ra.length() + this.bufferSize;
     }
 
     public synchronized void readFully(final long pos, final byte[] b, final int off, final int len) throws IOException {
         assert (b.length >= off + len): "read pos=" + pos + ", b.length=" + b.length + ", off=" + off + ", len=" + len;
 
         // check commit time
-        if ((bufferCurrSize > bufferMaxSize) ||
-            (this.lastCommit + this.commitTimeout > System.currentTimeMillis())) {
+        if (this.lastCommit + this.commitTimeout > System.currentTimeMillis()) {
             commit();
-            this.lastCommit = System.currentTimeMillis();
         }
 
         // do the read
-        synchronized (this.buffer) {
-            final byte[] bb = buffer.get(Long.valueOf(pos));
-            if (bb == null) {
-                // entry not known, read directly from IO
-                synchronized (this.ra) {
-                    this.ra.seek(pos + off);
-                    ra.readFully(b, off, len);
-                    return;
-                }
-            }
-            // use buffered entry
-            if (bb.length >= off + len) {
-                // the buffered entry is long enough
-                System.arraycopy(bb, off, b, off, len);
-                return;
-            }
-            // the entry is not long enough. transmit only a part
-            System.arraycopy(bb, off, b, off, bb.length - off);
-            return;
+        if (pos >= this.ra.length()) {
+            // read from the buffer
+            System.arraycopy(this.buffer, (int) (pos - this.ra.length()), b, off, len);
+        } else if (pos + len >= this.ra.length()) {
+            // the content is partly in the file and partly in the buffer
+            commit();
+            this.ra.seek(pos);
+            ra.readFully(b, off, len);
+        } else {
+            // read from the file
+            this.ra.seek(pos);
+            ra.readFully(b, off, len);
         }
     }
 
     public synchronized void write(final long pos, final byte[] b, final int off, final int len) throws IOException {
         assert (b.length >= off + len): "write pos=" + pos + ", b.length=" + b.length + ", b='" + new String(b) + "', off=" + off + ", len=" + len;
-
-        //if (len > 10) System.out.println("WRITE(" + name + ", " + pos + ", " + b.length + ", " + off + ", " + len + ")");
-
-        // do the write into buffer
-        final byte[] bb = kelondroObjectSpace.alloc(len);
-        System.arraycopy(b, off, bb, 0, len);
-        synchronized (buffer) {
-            buffer.put(Long.valueOf(pos + off), bb);
-            bufferCurrSize += overhead + len;
-        }
+        //assert pos <= this.ra.length(): "pos = " + pos + ", this.ra.length() = " + this.ra.length();
 
-        // check commit time
-        if ((bufferCurrSize > bufferMaxSize) ||
-            (this.lastCommit + this.commitTimeout > System.currentTimeMillis())) {
+        if (pos >= this.ra.length()) {
+            // the position is fully outside of the file
+            if (pos - this.ra.length() + len > this.buffer.length) {
+                // this does not fit into the buffer
+                commit();
+                this.ra.seek(pos);
+                this.ra.write(b, off, len);
+                return;
+            }
+            System.arraycopy(b, off, this.buffer, (int) (pos - this.ra.length()), len);
+            this.bufferSize = (int) Math.max(this.bufferSize, pos - this.ra.length() + len);
+            return;
+        } else if (pos + len >= this.ra.length()) {
+            // the content is partly in the file and partly in the buffer
             commit();
-            this.lastCommit = System.currentTimeMillis();
+            this.ra.seek(pos);
+            this.ra.write(b, off, len);
+            return;
+        } else {
+            // the position is fully inside the file
+            this.ra.seek(pos);
+            this.ra.write(b, off, len);
+            return;
         }
     }
 
     public synchronized void commit() throws IOException {
-        synchronized (buffer) {
-            if (buffer.size() == 0) return;
-            final Iterator<Map.Entry<Long, byte[]>> i = buffer.entrySet().iterator();
-            Map.Entry entry = i.next();
-            long lastPos = (entry.getKey()).longValue();
-            byte[] lastChunk = entry.getValue();
-            long nextPos;
-            byte[] nextChunk, tmpChunk;
-            synchronized (this.ra) {
-                while (i.hasNext()) {
-                    entry = i.next();
-                    nextPos = (entry.getKey()).longValue();
-                    nextChunk = entry.getValue();
-                    if (lastPos + lastChunk.length == nextPos) {
-                        // try to combine the new chunk with the previous chunk
-                        //System.out.println("combining chunks pos0=" + lastPos + ", chunk0.length=" + lastChunk.length + ", pos1=" + nextPos + ", chunk1.length=" + nextChunk.length);
-                        tmpChunk = kelondroObjectSpace.alloc(lastChunk.length + nextChunk.length);
-                        System.arraycopy(lastChunk, 0, tmpChunk, 0, lastChunk.length);
-                        System.arraycopy(nextChunk, 0, tmpChunk, lastChunk.length, nextChunk.length);
-                        kelondroObjectSpace.recycle(lastChunk);
-                        lastChunk = tmpChunk;
-                        tmpChunk = null;
-                        kelondroObjectSpace.recycle(nextChunk);
-                    } else {
-                        // write the last chunk and take nextChunk next time als lastChunk
-                        this.ra.seek(lastPos);
-                        this.ra.write(lastChunk);
-                        kelondroObjectSpace.recycle(lastChunk);
-                        lastPos = nextPos;
-                        lastChunk = nextChunk;
-                    }
-                }
-                // at the end write just the last chunk
-                this.ra.seek(lastPos);
-                this.ra.write(lastChunk);
-                kelondroObjectSpace.recycle(lastChunk);
-            }
-            buffer.clear();
-            bufferCurrSize = 0;
-        }
+        this.ra.seek(this.ra.length()); // move to end of file
+        this.ra.write(this.buffer, 0, this.bufferSize);
+        this.bufferSize = 0;
+        this.lastCommit = System.currentTimeMillis();
     }
 
     public synchronized void close() throws IOException {
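The rewritten kelondroBufferedIOChunks above replaces the old TreeMap of positioned write chunks with a single byte[] that only buffers data written at or beyond the current end of the backing file, so a commit becomes one sequential append instead of many scattered seeks. The standalone sketch below illustrates that tail-buffer idea under stated assumptions: it is not the YaCy class, the class and field names are invented, java.io.RandomAccessFile stands in for kelondroRA, and the timeout-driven commit is omitted.

```java
import java.io.IOException;
import java.io.RandomAccessFile;

// Sketch of a tail write buffer: only bytes at or beyond the current end of
// the file are kept in memory; everything else is written through directly.
// Flushing the buffer is a single sequential append at the end of the file.
final class TailBufferedFile {
    private final RandomAccessFile raf;
    private final byte[] tail;     // buffers the region starting at raf.length()
    private int tailSize = 0;      // number of valid bytes in the tail buffer

    TailBufferedFile(final RandomAccessFile raf, final int bufferSize) {
        this.raf = raf;
        this.tail = new byte[bufferSize];
    }

    synchronized long length() throws IOException {
        return raf.length() + tailSize;              // logical length includes the buffered tail
    }

    synchronized void write(final long pos, final byte[] b, final int off, final int len) throws IOException {
        final long fileEnd = raf.length();
        if (pos >= fileEnd && pos - fileEnd + len <= tail.length) {
            // fully behind the end of the file and it fits: keep it in memory
            System.arraycopy(b, off, tail, (int) (pos - fileEnd), len);
            tailSize = (int) Math.max(tailSize, pos - fileEnd + len);
            return;
        }
        // overlaps the file or does not fit: flush the tail, then write through
        flush();
        raf.seek(pos);
        raf.write(b, off, len);
    }

    synchronized void read(final long pos, final byte[] b, final int off, final int len) throws IOException {
        final long fileEnd = raf.length();
        if (pos >= fileEnd) {
            // served entirely from the in-memory tail
            System.arraycopy(tail, (int) (pos - fileEnd), b, off, len);
            return;
        }
        if (pos + len > fileEnd) flush();            // straddles the boundary: complete the file first
        raf.seek(pos);
        raf.readFully(b, off, len);
    }

    synchronized void flush() throws IOException {
        if (tailSize == 0) return;
        raf.seek(raf.length());                      // append the whole buffered tail in one go
        raf.write(tail, 0, tailSize);
        tailSize = 0;
    }
}
```

The trade-off this encodes is the same one visible in the patched readFully and write: appends are cheap and batched, while any access that touches the boundary between file and buffer degenerates to a flush followed by direct I/O.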
diff --git a/source/de/anomic/kelondro/kelondroCachedRA.java b/source/de/anomic/kelondro/kelondroCachedRA.java
index 283beb896..23f1a8eb1 100644
--- a/source/de/anomic/kelondro/kelondroCachedRA.java
+++ b/source/de/anomic/kelondro/kelondroCachedRA.java
@@ -2,10 +2,9 @@
 // -----------------------
 // part of The Kelondro Database
 // (C) by Michael Peter Christen; mc@yacy.net
-// first published on http://www.anomic.de
-// Frankfurt, Germany, 2004, 2005
-// last major change: 06.10.2004
-// this file was previously named kelondroBufferedRA
+// first published on http://yacy.net
+// Frankfurt, Germany, 2004-2008
+// last major change: 04.12.2008
 //
 // This program is free software; you can redistribute it and/or modify
 // it under the terms of the GNU General Public License as published by
@@ -26,153 +25,159 @@
 
 package de.anomic.kelondro;
 
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.Map;
+
+import de.anomic.server.serverMemory;
 
 public class kelondroCachedRA extends kelondroAbstractRA implements kelondroRA {
 
+    // a shared cache for all users of this class
+    private static final int elementsize = 8192;
+    private static final int remainingfree = 30 * 1024 * 1024;
+    private static HashMap cacheMemory = new HashMap();
+
+    // class variables
     protected kelondroRA ra;
-    protected kelondroMScoreCluster cacheScore;
-    protected HashMap cacheMemory;
-    private final int cacheMaxElements;
-    private final int cacheElementSize;
     private long seekpos;
+    private String id;
 
-    public kelondroCachedRA(final kelondroRA ra, final int cachesize, final int elementsize) {
-        this.ra = ra;
+    public kelondroCachedRA(final kelondroRA ra) {
+        this.ra = ra;
         this.name = ra.name();
         this.file = ra.file();
-        this.cacheMemory = new HashMap();
-        this.cacheScore = new kelondroMScoreCluster();
-        this.cacheElementSize = elementsize;
-        this.cacheMaxElements = cachesize / cacheElementSize;
+        this.id = file.toString();
        this.seekpos = 0;
     }
-
-    public long length() throws IOException {
-        return ra.available();
+
+    public synchronized long length() throws IOException {
+        return ra.length();
     }
 
-    public long available() throws IOException {
-        synchronized (ra) {
-            ra.seek(seekpos);
-            return ra.available();
-        }
+    public synchronized long available() throws IOException {
+        return ra.length() - seekpos;
     }
 
     private int cacheElementNumber(final long address) {
-        return (int) address / cacheElementSize;
+        return (int) address / elementsize;
     }
 
     private int cacheElementOffset(final long address) {
-        return (int) address % cacheElementSize;
+        return (int) address % elementsize;
     }
 
     private byte[] readCache(final int cacheNr) throws IOException {
-        final Integer cacheNrI = Integer.valueOf(cacheNr);
-        byte[] cache = cacheMemory.get(cacheNrI);
+        String key = this.id + cacheNr;
+        byte[] cache = cacheMemory.get(key);
         if (cache == null) {
-            if (cacheMemory.size() >= cacheMaxElements) {
+            if (serverMemory.available() < remainingfree) {
                 // delete elements in buffer if buffer too big
-                final Iterator it = cacheScore.scores(true);
-                final Integer element = it.next();
-                writeCache(cacheMemory.get(element), element.intValue());
-                cacheMemory.remove(element);
-                cacheScore.deleteScore(element);
+                synchronized(cacheMemory) {
+                    Iterator<Map.Entry<String, byte[]>> i = cacheMemory.entrySet().iterator();
+                    for (int j = 0; j < 10; j++) {
+                        if (!i.hasNext()) break;
+                        i.next();
+                        i.remove();
+                    }
+                }
             }
+            // check if we have enough space in the file to read a complete cache element
+            long seek = cacheNr * (long) elementsize;
+            if (ra.length() - seek < elementsize) return null;
             // add new element
-            cache = new byte[cacheElementSize];
-            //System.out.println("buffernr=" + bufferNr + ", elSize=" + bufferElementSize);
-            ra.seek(cacheNr * (long) cacheElementSize);
-            ra.readFully(cache, 0, cacheElementSize);
-            cacheMemory.put(cacheNrI, cache);
+            cache = new byte[elementsize];
+            ra.seek(seek);
+            ra.readFully(cache, 0, elementsize);
+            cacheMemory.put(key, cache);
         }
-        cacheScore.setScore(cacheNrI, (int) (0xFFFFFFFFL & System.currentTimeMillis()));
         return cache;
     }
 
-    private void writeCache(final byte[] cache, final int cacheNr) throws IOException {
-        if (cache == null) return;
-        final Integer cacheNrI = Integer.valueOf(cacheNr);
-        ra.seek(cacheNr * (long) cacheElementSize);
-        ra.write(cache, 0, cacheElementSize);
-        cacheScore.setScore(cacheNrI, (int) (0xFFFFFFFFL & System.currentTimeMillis()));
+    private boolean existCache(final int cacheNr) throws IOException {
+        return cacheMemory.containsKey(Integer.valueOf(cacheNr));
     }
 
-    // pseudo-native method read
-    public int read() throws IOException {
-        final int bn = cacheElementNumber(seekpos);
-        final int offset = cacheElementOffset(seekpos);
-        seekpos++;
-        return 0xFF & readCache(bn)[offset];
-    }
-
-    // pseudo-native method write
-    public void write(final int b) throws IOException {
-        final int bn = cacheElementNumber(seekpos);
-        final int offset = cacheElementOffset(seekpos);
-        final byte[] cache = readCache(bn);
-        seekpos++;
-        cache[offset] = (byte) b;
-        //writeBuffer(buffer, bn);
-    }
-
-    public void readFully(byte[] b, int off, int len) throws IOException {
+    public synchronized void readFully(byte[] b, int off, int len) throws IOException {
         final int bn1 = cacheElementNumber(seekpos);
         final int bn2 = cacheElementNumber(seekpos + len - 1);
         final int offset = cacheElementOffset(seekpos);
-        final byte[] buffer = readCache(bn1);
+        final byte[] cache = readCache(bn1);
         if (bn1 == bn2) {
             // simple case
-            //System.out.println("C1: bn1=" + bn1 + ", offset=" + offset + ", off=" + off + ", len=" + len);
-            System.arraycopy(buffer, offset, b, off, len);
+            if (cache == null) {
+                ra.seek(seekpos);
+                ra.readFully(b, off, len);
+                seekpos += len;
+                return;
+            }
+            //System.out.println("cache hit");
+            System.arraycopy(cache, offset, b, off, len);
             seekpos += len;
             return;
         }
+        assert cache != null;
         // do recursively
-        final int thislen = cacheElementSize - offset;
-        //System.out.println("C2: bn1=" + bn1 + ", bn2=" + bn2 +", offset=" + offset + ", off=" + off + ", len=" + len + ", thislen=" + thislen);
-        System.arraycopy(buffer, offset, b, off, thislen);
+        final int thislen = elementsize - offset;
+        System.arraycopy(cache, offset, b, off, thislen);
         seekpos += thislen;
         readFully(b, off + thislen, len - thislen);
-        return;
     }
 
-    public void write(final byte[] b, final int off, final int len) throws IOException {
+    public synchronized void write(final byte[] b, final int off, final int len) throws IOException {
         final int bn1 = cacheElementNumber(seekpos);
         final int bn2 = cacheElementNumber(seekpos + len - 1);
         final int offset = cacheElementOffset(seekpos);
-        final byte[] cache = readCache(bn1);
         if (bn1 == bn2) {
-            // simple case
-            System.arraycopy(b, off, cache, offset, len);
+            if (existCache(bn1)) {
+                // write to cache and file; here: write only to cache
+                final byte[] cache = readCache(bn1);
+                assert cache != null;
+                System.arraycopy(b, off, cache, offset, len);
+            } else {
+                // in case that the cache could be filled completely
+                // create a new entry here and store it also to the cache
+                if (offset == 0 && len >= elementsize) {
+                    final byte[] cache = new byte[elementsize];
+                    System.arraycopy(b, off, cache, 0, elementsize);
+                    cacheMemory.put(this.id + bn1, cache);
+                }
+            }
+            // write to file
+            ra.seek(seekpos);
+            ra.write(b, off, len);
             seekpos += len;
-            //writeBuffer(buffer, bn1);
-        } else {
-            // do recursively
-            final int thislen = cacheElementSize - offset;
+            return;
+        }
+
+        // do recursively
+        final int thislen = elementsize - offset;
+        if (existCache(bn1)) {
+            // write to cache and file; here: write only to cache
+            final byte[] cache = readCache(bn1);
+            assert cache != null;
             System.arraycopy(b, off, cache, offset, thislen);
-            seekpos += thislen;
-            //writeBuffer(buffer, bn1);
-            write(b, off + thislen, len - thislen);
+        } else {
+            // in case that the cache could be filled completely
+            // create a new entry here and store it also to the cache
+            if (offset == 0 && len >= elementsize) {
+                final byte[] cache = new byte[elementsize];
+                System.arraycopy(b, off, cache, 0, elementsize);
+                cacheMemory.put(this.id + bn1, cache);
+            }
        }
+        // write to file
+        ra.seek(seekpos);
+        ra.write(b, off, thislen);
+        seekpos += thislen;
+        write(b, off + thislen, len - thislen);
     }
 
-    public void seek(final long pos) throws IOException {
+    public synchronized void seek(final long pos) throws IOException {
         seekpos = pos;
     }
 
-    public void close() throws IOException {
-        // write all unwritten buffers
-        if (cacheMemory == null) return;
-        final Iterator it = cacheScore.scores(true);
-        while (it.hasNext()) {
-            final Integer element = it.next();
-            writeCache(cacheMemory.get(element), element.intValue());
-            cacheMemory.remove(element);
-        }
+    public synchronized void close() throws IOException {
         ra.close();
-        cacheScore = null;
-        cacheMemory = null;
     }
 
     protected void finalize() {
diff --git a/source/de/anomic/kelondro/kelondroCachedRecords.java b/source/de/anomic/kelondro/kelondroCachedRecords.java
index 0f3557de8..96a37632d 100644
--- a/source/de/anomic/kelondro/kelondroCachedRecords.java
+++ b/source/de/anomic/kelondro/kelondroCachedRecords.java
@@ -46,18 +46,13 @@ public class kelondroCachedRecords extends kelondroAbstractRecords implements kelondroRecords {
     private static long memStartShrink = 6000000; // a limit for the node cache to start with shrinking if less than this memory amount is available
 
     // caching buffer
-    kelondroIntBytesMap cacheHeaders; // the cache; holds overhead values and key element
-    int readHit;
-
-    int readMiss;
-
-    int writeUnique;
-
-    int writeDouble;
-
-    int cacheDelete;
-
-    int cacheFlush;
+    private kelondroIntBytesMap cacheHeaders; // the cache; holds overhead values and key element
+    private int readHit;
+    private int readMiss;
+    private int writeUnique;
+    private int writeDouble;
+    private int cacheDelete;
+    private int cacheFlush;
 
     public kelondroCachedRecords(
diff --git a/source/de/anomic/plasma/plasmaSwitchboard.java b/source/de/anomic/plasma/plasmaSwitchboard.java
index 2f3aa720b..04fe2da69 100644
--- a/source/de/anomic/plasma/plasmaSwitchboard.java
+++ b/source/de/anomic/plasma/plasmaSwitchboard.java
@@ -563,10 +563,10 @@ public final class plasmaSwitchboard extends serverAbstractSwitch
-        indexingStorageProcessor      = new serverProcessor(this, "storeDocumentIndex", 1, null);
+        indexingStorageProcessor      = new serverProcessor(this, "storeDocumentIndex", serverProcessor.useCPU, null, 1);
         indexingAnalysisProcessor     = new serverProcessor(this, "webStructureAnalysis", serverProcessor.useCPU + 1, indexingStorageProcessor);
-        indexingCondensementProcessor = new serverProcessor(this, "condenseDocument", serverProcessor.useCPU + 1, indexingAnalysisProcessor);
-        indexingDocumentProcessor     = new serverProcessor(this, "parseDocument", serverProcessor.useCPU + 1, indexingCondensementProcessor);
+        indexingCondensementProcessor = new serverProcessor(this, "condenseDocument", serverProcessor.useCPU + 2, indexingAnalysisProcessor);
+        indexingDocumentProcessor     = new serverProcessor(this, "parseDocument", serverProcessor.useCPU + 3, indexingCondensementProcessor);
 
         // deploy busy threads
         log.logConfig("Starting Threads");
diff --git a/source/de/anomic/plasma/plasmaWordIndex.java b/source/de/anomic/plasma/plasmaWordIndex.java
index a713a6dc3..ffedf68e0 100644
--- a/source/de/anomic/plasma/plasmaWordIndex.java
+++ b/source/de/anomic/plasma/plasmaWordIndex.java
@@ -78,7 +78,7 @@ public final class plasmaWordIndex implements indexRI {
     // environment constants
     public static final long wCacheMaxAge = 1000 * 60 * 30; // milliseconds; 30 minutes
     public static final int  wCacheMaxChunk = 800;          // maximum number of references for each urlhash
-    public static final int  lowcachedivisor = 1800;
+    public static final int  lowcachedivisor = 900;
     public static final int  maxCollectionPartition = 7;    // should be 7
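The kelondroCachedRA rewrite earlier in this patch turns a per-instance cache with score-based eviction into one JVM-wide map of fixed-size blocks, keyed by file id plus block number and pruned in bulk when free memory runs low. The sketch below shows a minimal version of that shared read path; it is an illustration only. The class name and constants are invented, Runtime.getRuntime().freeMemory() stands in for YaCy's serverMemory.available(), and a LinkedHashMap in access order is used so the bulk eviction drops the least recently used blocks, whereas the patched class iterates a plain HashMap and removes whatever entries its iterator yields first.

```java
import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

// Sketch of a shared block cache: fixed-size blocks of any file, keyed by
// "<file id>/<block number>" and shared by every reader in the JVM.
final class SharedBlockCache {
    private static final int BLOCK = 8192;                  // bytes per cached block
    private static final long MIN_FREE = 30L * 1024 * 1024; // keep at least this much heap free
    private static final Map<String, byte[]> cache =
            new LinkedHashMap<String, byte[]>(1024, 0.75f, true); // access order: oldest entries first

    static synchronized byte[] readBlock(final String fileId, final RandomAccessFile raf,
                                         final long blockNr) throws IOException {
        final String key = fileId + "/" + blockNr;
        byte[] block = cache.get(key);
        if (block != null) return block;                    // cache hit

        // under memory pressure, drop a handful of the least recently used blocks
        if (Runtime.getRuntime().freeMemory() < MIN_FREE) {
            final Iterator<Map.Entry<String, byte[]>> i = cache.entrySet().iterator();
            for (int j = 0; j < 10 && i.hasNext(); j++) {
                i.next();
                i.remove();
            }
        }

        final long seek = blockNr * (long) BLOCK;
        if (raf.length() - seek < BLOCK) return null;       // incomplete block: caller must read the file directly

        block = new byte[BLOCK];
        raf.seek(seek);
        raf.readFully(block);
        cache.put(key, block);
        return block;
    }
}
```

A caller that gets null here falls back to an uncached read, exactly as the patched readFully does when readCache returns null, so the cache never has to deal with partial blocks at the end of a file.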