From 1f1f398bfa5d99e6680515d295908ccbe69b2fd2 Mon Sep 17 00:00:00 2001 From: orbiter Date: Thu, 8 Feb 2007 23:21:46 +0000 Subject: [PATCH] enhanced speed of RAM cache flush by factor 20 (twenty times faster) - the speed was doubled by avoiding read access during the dump - the speed was dramatically increased at least by factor 10 by using a temporary ram-file where the structures are flushed to before it is dumped then as a whole byte-chunk to the file system. The speed enhancements also affects some other parts of the database. git-svn-id: https://svn.berlios.de/svnroot/repos/yacy/trunk@3353 6c8d7289-2bf4-0310-a012-ef5d649a1542 --- build.properties | 2 +- source/de/anomic/index/indexRAMRI.java | 136 ++++++----- .../anomic/kelondro/kelondroBufferedRA.java | 218 +++--------------- .../kelondro/kelondroFixedWidthArray.java | 25 +- .../de/anomic/kelondro/kelondroRecords.java | 37 ++- source/de/anomic/server/serverByteBuffer.java | 22 ++ 6 files changed, 192 insertions(+), 248 deletions(-) diff --git a/build.properties b/build.properties index e6b51439d..a05c9e08c 100644 --- a/build.properties +++ b/build.properties @@ -3,7 +3,7 @@ javacSource=1.4 javacTarget=1.4 # Release Configuration -releaseVersion=0.503 +releaseVersion=0.504 releaseFile=yacy_dev_v${releaseVersion}_${DSTAMP}_${releaseNr}.tar.gz #releaseFile=yacy_v${releaseVersion}_${DSTAMP}_${releaseNr}.tar.gz releaseDir=yacy_dev_v${releaseVersion}_${DSTAMP}_${releaseNr} diff --git a/source/de/anomic/index/indexRAMRI.java b/source/de/anomic/index/indexRAMRI.java index 8a48b9504..661d2eabb 100644 --- a/source/de/anomic/index/indexRAMRI.java +++ b/source/de/anomic/index/indexRAMRI.java @@ -36,12 +36,16 @@ import java.util.SortedMap; import java.util.TreeMap; import de.anomic.kelondro.kelondroBase64Order; +import de.anomic.kelondro.kelondroBufferedRA; import de.anomic.kelondro.kelondroException; import de.anomic.kelondro.kelondroFixedWidthArray; import de.anomic.kelondro.kelondroMScoreCluster; import de.anomic.kelondro.kelondroNaturalOrder; import de.anomic.kelondro.kelondroRow; +import de.anomic.server.serverByteBuffer; +import de.anomic.server.serverFileUtils; import de.anomic.server.logging.serverLog; +import de.anomic.server.serverMemory; import de.anomic.yacy.yacySeedDB; public final class indexRAMRI implements indexRI { @@ -106,64 +110,92 @@ public final class indexRAMRI implements indexRI { File indexDumpFile = new File(databaseRoot, indexArrayFileName); if (indexDumpFile.exists()) indexDumpFile.delete(); kelondroFixedWidthArray dumpArray = null; + kelondroBufferedRA writeBuffer = null; + if (serverMemory.available() > 50 * bufferStructureBasis.objectsize() * cache.size()) { + writeBuffer = new kelondroBufferedRA(); + dumpArray = new kelondroFixedWidthArray(writeBuffer, bufferStructureBasis, 0); + log.logInfo("started dump of ram cache: " + cache.size() + " words; memory-enhanced write"); + } else { dumpArray = new kelondroFixedWidthArray(indexDumpFile, bufferStructureBasis, 0); - long startTime = System.currentTimeMillis(); - long messageTime = System.currentTimeMillis() + 5000; - long wordsPerSecond = 0, wordcount = 0, urlcount = 0; - Map.Entry entry; - String wordHash; - indexContainer container; - long updateTime; - indexRWIEntry iEntry; - kelondroRow.Entry row = dumpArray.row().newEntry(); - byte[] occ, time; - - // write wCache - synchronized (cache) { - Iterator i = cache.entrySet().iterator(); - while (i.hasNext()) { - // get entries - entry = (Map.Entry) i.next(); - wordHash = (String) entry.getKey(); - updateTime = getUpdateTime(wordHash); - container = (indexContainer) entry.getValue(); - - // put entries on stack - if (container != null) { - Iterator ci = container.entries(); - occ = kelondroNaturalOrder.encodeLong(container.size(), 4); - time = kelondroNaturalOrder.encodeLong(updateTime, 8); - while (ci.hasNext()) { - iEntry = (indexRWIEntry) ci.next(); - row.setCol(0, wordHash.getBytes()); - row.setCol(1, occ); - row.setCol(2, time); - row.setCol(3, iEntry.toKelondroEntry().bytes()); - dumpArray.overwrite((int) urlcount++, row); - } - } - wordcount++; - i.remove(); // free some mem - - // write a log - if (System.currentTimeMillis() > messageTime) { - // System.gc(); // for better statistic - wordsPerSecond = wordcount * 1000 / (1 + System.currentTimeMillis() - startTime); - log.logInfo("dumping status: " + wordcount + " words done, " + (cache.size() / (wordsPerSecond + 1)) + " seconds remaining, free mem = " + (Runtime.getRuntime().freeMemory() / 1024 / 1024) + "MB"); - messageTime = System.currentTimeMillis() + 5000; + log.logInfo("started dump of ram cache: " + cache.size() + " words; low-memory write"); + } + long startTime = System.currentTimeMillis(); + long messageTime = System.currentTimeMillis() + 5000; + long wordsPerSecond = 0, wordcount = 0, urlcount = 0; + Map.Entry entry; + String wordHash; + indexContainer container; + long updateTime; + indexRWIEntry iEntry; + kelondroRow.Entry row = dumpArray.row().newEntry(); + byte[] occ, time; + + // write wCache + synchronized (cache) { + Iterator i = cache.entrySet().iterator(); + while (i.hasNext()) { + // get entries + entry = (Map.Entry) i.next(); + wordHash = (String) entry.getKey(); + updateTime = getUpdateTime(wordHash); + container = (indexContainer) entry.getValue(); + + // put entries on stack + if (container != null) { + Iterator ci = container.entries(); + occ = kelondroNaturalOrder.encodeLong(container.size(), 4); + time = kelondroNaturalOrder.encodeLong(updateTime, 8); + while (ci.hasNext()) { + iEntry = (indexRWIEntry) ci.next(); + row.setCol(0, wordHash.getBytes()); + row.setCol(1, occ); + row.setCol(2, time); + row.setCol(3, iEntry.toKelondroEntry().bytes()); + dumpArray.overwrite((int) urlcount++, row); } } + wordcount++; + i.remove(); // free some mem + + // write a log + if (System.currentTimeMillis() > messageTime) { + // System.gc(); // for better statistic + wordsPerSecond = wordcount * 1000 + / (1 + System.currentTimeMillis() - startTime); + log.logInfo("dump status: " + wordcount + + " words done, " + + (cache.size() / (wordsPerSecond + 1)) + + " seconds remaining, free mem = " + + (Runtime.getRuntime().freeMemory() / 1024 / 1024) + + "MB"); + messageTime = System.currentTimeMillis() + 5000; + } } - dumpArray.close(); - dumpArray = null; - log.logConfig("dumped " + urlcount + " word/URL relations in " + ((System.currentTimeMillis() - startTime) / 1000) + " seconds"); + } + if (writeBuffer != null) { + serverByteBuffer bb = writeBuffer.getBuffer(); + //System.out.println("*** byteBuffer size = " + bb.length()); + serverFileUtils.write(bb.getBytes(), indexDumpFile); + writeBuffer.close(); + } + dumpArray.close(); + dumpArray = null; + log.logInfo("finished dump of ram cache: " + urlcount + " word/URL relations in " + ((System.currentTimeMillis() - startTime) / 1000) + " seconds"); } private long restore() throws IOException { File indexDumpFile = new File(databaseRoot, indexArrayFileName); if (!(indexDumpFile.exists())) return 0; - kelondroFixedWidthArray dumpArray = new kelondroFixedWidthArray(indexDumpFile, bufferStructureBasis, 0); - log.logConfig("restore array dump of index cache '" + indexArrayFileName + "', " + dumpArray.size() + " word/URL relations"); + kelondroFixedWidthArray dumpArray; + kelondroBufferedRA readBuffer = null; + if (false /*serverMemory.available() > indexDumpFile.length() * 2*/) { + readBuffer = new kelondroBufferedRA(new serverByteBuffer(serverFileUtils.read(indexDumpFile))); + dumpArray = new kelondroFixedWidthArray(readBuffer, bufferStructureBasis, 0); + log.logInfo("started restore of ram cache '" + indexArrayFileName + "', " + dumpArray.size() + " word/URL relations; memory-enhanced read"); + } else { + dumpArray = new kelondroFixedWidthArray(indexDumpFile, bufferStructureBasis, 0); + log.logInfo("started restore of ram cache '" + indexArrayFileName + "', " + dumpArray.size() + " word/URL relations; low-memory read"); + } long startTime = System.currentTimeMillis(); long messageTime = System.currentTimeMillis() + 5000; long urlCount = 0, urlsPerSecond = 0; @@ -196,12 +228,12 @@ public final class indexRAMRI implements indexRI { } } } - + if (readBuffer != null) readBuffer.close(); dumpArray.close(); - log.logConfig("restored " + cache.size() + " words in " + ((System.currentTimeMillis() - startTime) / 1000) + " seconds"); + log.logConfig("finished restor " + cache.size() + " words in " + ((System.currentTimeMillis() - startTime) / 1000) + " seconds"); } catch (kelondroException e) { // restore failed - log.logSevere("restore of indexCache array dump failed: " + e.getMessage(), e); + log.logSevere("failed restore of indexCache array dump: " + e.getMessage(), e); } finally { if (dumpArray != null) try {dumpArray.close();}catch(Exception e){} } diff --git a/source/de/anomic/kelondro/kelondroBufferedRA.java b/source/de/anomic/kelondro/kelondroBufferedRA.java index 261a180d1..94feec9d3 100644 --- a/source/de/anomic/kelondro/kelondroBufferedRA.java +++ b/source/de/anomic/kelondro/kelondroBufferedRA.java @@ -42,212 +42,62 @@ package de.anomic.kelondro; import java.io.IOException; +import de.anomic.server.serverByteBuffer; public class kelondroBufferedRA extends kelondroAbstractRA implements kelondroRA { - // FIXME: a lot of synchronization of ra is needed here + private serverByteBuffer sbb; + private long pos; - protected kelondroRA ra; - protected byte[] buffer; - protected int bufferPage; - protected boolean bufferWritten; - private long seekpos; - - private int bufferSizeExp; - private int bufferSize; - private int bufferOffsetFilter; - private int bufferStart; - - public kelondroBufferedRA(kelondroRA ra, int minBufferSize, int bufferStartMin) { - // calculate buffer organization parameters - this.bufferSizeExp = 0; - minBufferSize--; - while (minBufferSize > 0) {minBufferSize = minBufferSize >> 1; this.bufferSizeExp++;} - this.bufferSize = 1 << this.bufferSizeExp; - this.bufferOffsetFilter = this.bufferSize - 1; - this.bufferStart = 0; - while (this.bufferStart < bufferStartMin) this.bufferStart += this.bufferSize; - - // init buffer - this.ra = ra; - this.name = ra.name(); - this.buffer = new byte[bufferSize]; - this.seekpos = 0; - this.bufferPage = -1; - this.bufferWritten = true; - } - - public long length() throws IOException { - return ra.length(); - } - - public long available() throws IOException { - synchronized (ra) { - ra.seek(seekpos); - return ra.available(); - } + public kelondroBufferedRA() { + sbb = new serverByteBuffer(); + pos = 0; } - private void readBuffer(int newPageNr) throws IOException { - if (newPageNr == bufferPage) return; - bufferPage = newPageNr; - ra.seek(bufferPage << bufferSizeExp); - ra.readFully(buffer, 0, bufferSize); - bufferWritten = true; - } - - /* - private void writeBuffer() throws IOException { - if ((bufferWritten) || (bufferPage < 0)) return; - ra.seek(bufferPage << bufferSizeExp); - ra.write(buffer, 0, bufferSize); - bufferWritten = true; + public kelondroBufferedRA(serverByteBuffer bb) { + sbb = bb; + pos = 0; } - */ - private void updateToBuffer(int newPageNr) throws IOException { - if (newPageNr != bufferPage) { - //writeBuffer(); - readBuffer(newPageNr); - } + public serverByteBuffer getBuffer() { + return this.sbb; } - // pseudo-native method read - public int read() throws IOException { - if (seekpos < bufferStart) { - // do not use buffer - ra.seek(seekpos); - int r = ra.read(); - seekpos++; - return r; - } - - int bn = (int) seekpos >> bufferSizeExp; // buffer page number - int offset = (int) seekpos & bufferOffsetFilter; // buffer page offset - seekpos++; - updateToBuffer(bn); - return 0xFF & buffer[offset]; + public long available() throws IOException { + return Long.MAX_VALUE - sbb.length(); } - // pseudo-native method write - public void write(int b) throws IOException { - if (seekpos < bufferStart) { - // do not use buffer - ra.seek(seekpos); - ra.write(b); - seekpos++; - } else { - // write to ra direkt - ra.seek(seekpos); - ra.write(b); - - // and write also to buffer - int bn = (int) seekpos >> bufferSizeExp; // buffer page number - int offset = (int) seekpos & bufferOffsetFilter; // buffer page offset - updateToBuffer(bn); - buffer[offset] = (byte) b; - bufferWritten = false; - - // update seek pos - seekpos++; - } + public void close() throws IOException { + sbb = null; } - public int read(byte[] b, int off, int len) throws IOException { + public long length() throws IOException { + return sbb.length(); + } - // check buffer size - if (seekpos < bufferStart) { - // do not use buffer - ra.seek(seekpos); - int r = ra.read(b, off, len); - seekpos += len; - return r; - } - - // check simple case - int bn1 = (int) seekpos >> bufferSizeExp; // buffer page number, first position - int bn2 = (int) (seekpos + len - 1) >> bufferSizeExp; // buffer page number, last position - int offset = (int) seekpos & bufferOffsetFilter; // buffer page offset - updateToBuffer(bn1); - if (bn1 == bn2) { - // simple case - System.arraycopy(buffer, offset, b, off, len); - seekpos += len; - return len; - } - - // do recursively - int thislen = bufferSize - offset; - System.arraycopy(buffer, offset, b, off, thislen); - seekpos += thislen; - return thislen + read(b, off + thislen, len - thislen); + public int read() throws IOException { + return (int) 0xff & sbb.byteAt((int) pos++); } - public void write(byte[] b, int off, int len) throws IOException { - if (seekpos < bufferStart) { - // do not use buffer - ra.seek(seekpos); - ra.write(b, off, len); - seekpos += len; - } else { - int bn1 = (int) seekpos >> bufferSizeExp; // buffer page number, first position - int bn2 = (int) (seekpos + len - 1) >> bufferSizeExp; // buffer page number, last position - int offset = (int) seekpos & bufferOffsetFilter; // buffer page offset - updateToBuffer(bn1); - if (bn1 == bn2) { - // simple case - ra.seek(seekpos); - ra.write(b, off, len); - - System.arraycopy(b, off, buffer, offset, len); - bufferWritten = false; - seekpos += len; - } else { - // do recursively - int thislen = bufferSize - offset; - - ra.seek(seekpos); - ra.write(b, off, thislen); - - System.arraycopy(b, off, buffer, offset, thislen); - bufferWritten = false; - seekpos += thislen; - write(b, off + thislen, len - thislen); - } - } + public int read(byte[] b, int off, int len) throws IOException { + byte[] g = sbb.getBytes((int) pos, (int) pos + len); + pos += g.length; + System.arraycopy(g, 0, b, off, g.length); + return g.length; } public void seek(long pos) throws IOException { - seekpos = pos; + this.pos = pos; } - public void close() throws IOException { - // write unwritten buffer - if (buffer == null) return; - //writeBuffer(); - ra.close(); - buffer = null; - } - - /* - public void finalize() { - try { - close(); - } catch (IOException e) {} + public void write(int b) throws IOException { + this.sbb.overwrite((int) pos, b); + pos++; } - */ - - public static void main(String[] args) { - try { - kelondroRA file = new kelondroBufferedRA(new kelondroFileRA("testx"), 64, 30); - file.seek(1024 - 2); - byte[] b = new byte[]{65, 66, 77, 88}; - file.write(b); - file.seek(1024 * 2 - 30); - for (int i = 65; i < 150; i++) file.write(i); - file.close(); - } catch (IOException e) { - e.printStackTrace(); - } + + public void write(byte[] b, int off, int len) throws IOException { + this.sbb.overwrite((int) pos, b, off, len); + pos += len; } + } diff --git a/source/de/anomic/kelondro/kelondroFixedWidthArray.java b/source/de/anomic/kelondro/kelondroFixedWidthArray.java index d5320d53a..2af2ecd9a 100644 --- a/source/de/anomic/kelondro/kelondroFixedWidthArray.java +++ b/source/de/anomic/kelondro/kelondroFixedWidthArray.java @@ -69,6 +69,18 @@ public class kelondroFixedWidthArray extends kelondroRecords implements kelondro } } + public kelondroFixedWidthArray(kelondroRA ra, kelondroRow rowdef, int intprops) throws IOException { + // this creates a new array + super(ra, 0, 0, thisOHBytes, thisOHHandles, rowdef, intprops, rowdef.columns() /* txtProps */, 80 /* txtPropWidth */, false); + for (int i = 0; i < intprops; i++) { + setHandle(i, new Handle(0)); + } + // store column description + for (int i = 0; i < rowdef.columns(); i++) { + try {super.setText(i, rowdef.column(i).toString().getBytes());} catch (IOException e) {} + } + } + public static kelondroFixedWidthArray open(File file, kelondroRow rowdef, int intprops) { try { return new kelondroFixedWidthArray(file, rowdef, intprops); @@ -107,16 +119,10 @@ public class kelondroFixedWidthArray extends kelondroRecords implements kelondro public synchronized void overwrite(int index, kelondroRow.Entry rowentry) throws IOException { // this writes a row without reading the row from the file system first - // make room for element - Node n; - while (super.USAGE.allCount() <= index) { - n = newNode(); - n.commit(CP_NONE); - } - // create a node at position index with rowentry - n = newNode(new Handle(index), (rowentry == null) ? null : rowentry.bytes(), 0); - n.commit(CP_NONE); + Handle h = new Handle(index); + h.adoptAllCount(); // adopt counting + newNode(h, (rowentry == null) ? null : rowentry.bytes(), 0).commit(CP_NONE); } public synchronized kelondroRow.Entry get(int index) throws IOException { @@ -186,7 +192,6 @@ public class kelondroFixedWidthArray extends kelondroRecords implements kelondro k = new kelondroFixedWidthArray(f, rowdef, 6); System.out.println(k.get(2).toString()); System.out.println(k.get(3).toString()); - System.out.println(k.get(4).toString()); k.close(); System.out.println("zweiter Test"); diff --git a/source/de/anomic/kelondro/kelondroRecords.java b/source/de/anomic/kelondro/kelondroRecords.java index f7b3ba4d6..275918034 100644 --- a/source/de/anomic/kelondro/kelondroRecords.java +++ b/source/de/anomic/kelondro/kelondroRecords.java @@ -258,6 +258,8 @@ public class kelondroRecords { short ohbytec, short ohhandlec, kelondroRow rowdef, int FHandles, int txtProps, int txtPropWidth, boolean exitOnFail) { + // this always creates a new file + this.fileExisted = false; this.filename = null; try { initNewFile(ra, ohbytec, ohhandlec, rowdef, FHandles, txtProps, txtPropWidth, buffersize / 10); @@ -265,6 +267,7 @@ public class kelondroRecords { logFailure("cannot create / " + e.getMessage()); if (exitOnFail) System.exit(-1); } + assignRowdef(rowdef); writeOrderType(); initCache(buffersize / 10 * 9, preloadTime); } @@ -388,6 +391,7 @@ public class kelondroRecords { } public kelondroRecords(kelondroRA ra, long buffersize, long preloadTime) throws IOException{ + this.fileExisted = false; this.filename = null; initExistingFile(ra, buffersize / 10); readOrderType(); @@ -656,7 +660,7 @@ public class kelondroRecords { protected Node(Handle handle, byte[] bulkchunk, int offset, boolean setChanged) { // this initializer is used to create nodes from bulk-read byte arrays this.handle = handle; - assert (bulkchunk.length >= offset + headchunksize) : "bulkchunk.length = " + bulkchunk.length + ", offset = " + offset + ", headchunksize = " + headchunksize; + assert ((bulkchunk == null) || (bulkchunk.length >= offset + headchunksize)) : "bulkchunk.length = " + bulkchunk.length + ", offset = " + offset + ", headchunksize = " + headchunksize; // create empty chunks this.headChunk = new byte[headchunksize]; @@ -1400,6 +1404,7 @@ public class kelondroRecords { index = USAGE.allCount(); USAGE.USEDC++; entryFile.writeInt(POS_USEDC, USAGE.USEDC); + entryFile.commit(); } else { // re-use record from free-list USAGE.USEDC++; @@ -1439,6 +1444,36 @@ public class kelondroRecords { this.index = i; } + protected void adoptAllCount() throws IOException { + // in case that the handle index was created outside this class, + // this method ensures that the USAGE counters are consistent with the + // new handle index + if (USAGE.allCount() <= this.index) synchronized (USAGE) { + // records that are in between are marked as deleted + boolean wf = false; + while (USAGE.allCount() < this.index) { + Handle h = new Handle(USAGE.allCount()); + USAGE.FREEC++; + entryFile.writeInt(seekpos(h), USAGE.FREEH.index); + USAGE.FREEH = h; + wf = true; + } + assert (USAGE.allCount() >= this.index); + + // adopt USAGE.USEDC + if (USAGE.allCount() == this.index) { + USAGE.USEDC++; + wf = true; + } + + // commit changes + if (wf) { + USAGE.write(); + entryFile.commit(); + } + } + } + public boolean isNUL() { return index == NUL; } diff --git a/source/de/anomic/server/serverByteBuffer.java b/source/de/anomic/server/serverByteBuffer.java index 9030ee036..aee8715b6 100644 --- a/source/de/anomic/server/serverByteBuffer.java +++ b/source/de/anomic/server/serverByteBuffer.java @@ -165,6 +165,28 @@ public final class serverByteBuffer extends OutputStream { length += le; } + // overwrite does not increase the 'length' write position pointer! + + public void overwrite(int pos, int b) { + overwrite(pos, (byte) (b & 0xff)); + } + + public void overwrite(int pos, byte b) { + if (offset + pos + 1 > buffer.length) grow(); + buffer[offset + pos] = b; + if (pos >= length) length = pos + 1; + } + + public void overwrite(int pos, byte[] bb) { + overwrite(pos, bb, 0, bb.length); + } + + public void overwrite(int pos, byte[] bb, int of, int le) { + while (offset + pos + le > buffer.length) grow(); + System.arraycopy(bb, of, buffer, offset + pos, le); + if (pos + le > length) length = pos + le; + } + public serverByteBuffer append(byte b) { write(b); return this;