enhanced speed of RAM cache flush by a factor of 20 (twenty times faster)

- the speed was doubled by avoiding read access during the dump
- the speed was dramatically increased, at least by a factor of 10,
   by using a temporary RAM file to which the structures are flushed
   before being dumped as a whole byte chunk to the file system.
The speed enhancements also affect some other parts of the database.

git-svn-id: https://svn.berlios.de/svnroot/repos/yacy/trunk@3353 6c8d7289-2bf4-0310-a012-ef5d649a1542
pull/1/head
orbiter 18 years ago
parent 30d79d69a6
commit 1f1f398bfa

@ -3,7 +3,7 @@ javacSource=1.4
javacTarget=1.4
# Release Configuration
releaseVersion=0.503
releaseVersion=0.504
releaseFile=yacy_dev_v${releaseVersion}_${DSTAMP}_${releaseNr}.tar.gz
#releaseFile=yacy_v${releaseVersion}_${DSTAMP}_${releaseNr}.tar.gz
releaseDir=yacy_dev_v${releaseVersion}_${DSTAMP}_${releaseNr}

@ -36,12 +36,16 @@ import java.util.SortedMap;
import java.util.TreeMap;
import de.anomic.kelondro.kelondroBase64Order;
import de.anomic.kelondro.kelondroBufferedRA;
import de.anomic.kelondro.kelondroException;
import de.anomic.kelondro.kelondroFixedWidthArray;
import de.anomic.kelondro.kelondroMScoreCluster;
import de.anomic.kelondro.kelondroNaturalOrder;
import de.anomic.kelondro.kelondroRow;
import de.anomic.server.serverByteBuffer;
import de.anomic.server.serverFileUtils;
import de.anomic.server.logging.serverLog;
import de.anomic.server.serverMemory;
import de.anomic.yacy.yacySeedDB;
public final class indexRAMRI implements indexRI {
@ -106,64 +110,92 @@ public final class indexRAMRI implements indexRI {
File indexDumpFile = new File(databaseRoot, indexArrayFileName);
if (indexDumpFile.exists()) indexDumpFile.delete();
kelondroFixedWidthArray dumpArray = null;
kelondroBufferedRA writeBuffer = null;
if (serverMemory.available() > 50 * bufferStructureBasis.objectsize() * cache.size()) {
writeBuffer = new kelondroBufferedRA();
dumpArray = new kelondroFixedWidthArray(writeBuffer, bufferStructureBasis, 0);
log.logInfo("started dump of ram cache: " + cache.size() + " words; memory-enhanced write");
} else {
dumpArray = new kelondroFixedWidthArray(indexDumpFile, bufferStructureBasis, 0);
long startTime = System.currentTimeMillis();
long messageTime = System.currentTimeMillis() + 5000;
long wordsPerSecond = 0, wordcount = 0, urlcount = 0;
Map.Entry entry;
String wordHash;
indexContainer container;
long updateTime;
indexRWIEntry iEntry;
kelondroRow.Entry row = dumpArray.row().newEntry();
byte[] occ, time;
// write wCache
synchronized (cache) {
Iterator i = cache.entrySet().iterator();
while (i.hasNext()) {
// get entries
entry = (Map.Entry) i.next();
wordHash = (String) entry.getKey();
updateTime = getUpdateTime(wordHash);
container = (indexContainer) entry.getValue();
// put entries on stack
if (container != null) {
Iterator ci = container.entries();
occ = kelondroNaturalOrder.encodeLong(container.size(), 4);
time = kelondroNaturalOrder.encodeLong(updateTime, 8);
while (ci.hasNext()) {
iEntry = (indexRWIEntry) ci.next();
row.setCol(0, wordHash.getBytes());
row.setCol(1, occ);
row.setCol(2, time);
row.setCol(3, iEntry.toKelondroEntry().bytes());
dumpArray.overwrite((int) urlcount++, row);
}
}
wordcount++;
i.remove(); // free some mem
// write a log
if (System.currentTimeMillis() > messageTime) {
// System.gc(); // for better statistic
wordsPerSecond = wordcount * 1000 / (1 + System.currentTimeMillis() - startTime);
log.logInfo("dumping status: " + wordcount + " words done, " + (cache.size() / (wordsPerSecond + 1)) + " seconds remaining, free mem = " + (Runtime.getRuntime().freeMemory() / 1024 / 1024) + "MB");
messageTime = System.currentTimeMillis() + 5000;
log.logInfo("started dump of ram cache: " + cache.size() + " words; low-memory write");
}
long startTime = System.currentTimeMillis();
long messageTime = System.currentTimeMillis() + 5000;
long wordsPerSecond = 0, wordcount = 0, urlcount = 0;
Map.Entry entry;
String wordHash;
indexContainer container;
long updateTime;
indexRWIEntry iEntry;
kelondroRow.Entry row = dumpArray.row().newEntry();
byte[] occ, time;
// write wCache
synchronized (cache) {
Iterator i = cache.entrySet().iterator();
while (i.hasNext()) {
// get entries
entry = (Map.Entry) i.next();
wordHash = (String) entry.getKey();
updateTime = getUpdateTime(wordHash);
container = (indexContainer) entry.getValue();
// put entries on stack
if (container != null) {
Iterator ci = container.entries();
occ = kelondroNaturalOrder.encodeLong(container.size(), 4);
time = kelondroNaturalOrder.encodeLong(updateTime, 8);
while (ci.hasNext()) {
iEntry = (indexRWIEntry) ci.next();
row.setCol(0, wordHash.getBytes());
row.setCol(1, occ);
row.setCol(2, time);
row.setCol(3, iEntry.toKelondroEntry().bytes());
dumpArray.overwrite((int) urlcount++, row);
}
}
wordcount++;
i.remove(); // free some mem
// write a log
if (System.currentTimeMillis() > messageTime) {
// System.gc(); // for better statistic
wordsPerSecond = wordcount * 1000
/ (1 + System.currentTimeMillis() - startTime);
log.logInfo("dump status: " + wordcount
+ " words done, "
+ (cache.size() / (wordsPerSecond + 1))
+ " seconds remaining, free mem = "
+ (Runtime.getRuntime().freeMemory() / 1024 / 1024)
+ "MB");
messageTime = System.currentTimeMillis() + 5000;
}
}
dumpArray.close();
dumpArray = null;
log.logConfig("dumped " + urlcount + " word/URL relations in " + ((System.currentTimeMillis() - startTime) / 1000) + " seconds");
}
if (writeBuffer != null) {
serverByteBuffer bb = writeBuffer.getBuffer();
//System.out.println("*** byteBuffer size = " + bb.length());
serverFileUtils.write(bb.getBytes(), indexDumpFile);
writeBuffer.close();
}
dumpArray.close();
dumpArray = null;
log.logInfo("finished dump of ram cache: " + urlcount + " word/URL relations in " + ((System.currentTimeMillis() - startTime) / 1000) + " seconds");
}
private long restore() throws IOException {
File indexDumpFile = new File(databaseRoot, indexArrayFileName);
if (!(indexDumpFile.exists())) return 0;
kelondroFixedWidthArray dumpArray = new kelondroFixedWidthArray(indexDumpFile, bufferStructureBasis, 0);
log.logConfig("restore array dump of index cache '" + indexArrayFileName + "', " + dumpArray.size() + " word/URL relations");
kelondroFixedWidthArray dumpArray;
kelondroBufferedRA readBuffer = null;
if (false /*serverMemory.available() > indexDumpFile.length() * 2*/) {
readBuffer = new kelondroBufferedRA(new serverByteBuffer(serverFileUtils.read(indexDumpFile)));
dumpArray = new kelondroFixedWidthArray(readBuffer, bufferStructureBasis, 0);
log.logInfo("started restore of ram cache '" + indexArrayFileName + "', " + dumpArray.size() + " word/URL relations; memory-enhanced read");
} else {
dumpArray = new kelondroFixedWidthArray(indexDumpFile, bufferStructureBasis, 0);
log.logInfo("started restore of ram cache '" + indexArrayFileName + "', " + dumpArray.size() + " word/URL relations; low-memory read");
}
long startTime = System.currentTimeMillis();
long messageTime = System.currentTimeMillis() + 5000;
long urlCount = 0, urlsPerSecond = 0;
@ -196,12 +228,12 @@ public final class indexRAMRI implements indexRI {
}
}
}
if (readBuffer != null) readBuffer.close();
dumpArray.close();
log.logConfig("restored " + cache.size() + " words in " + ((System.currentTimeMillis() - startTime) / 1000) + " seconds");
log.logConfig("finished restor " + cache.size() + " words in " + ((System.currentTimeMillis() - startTime) / 1000) + " seconds");
} catch (kelondroException e) {
// restore failed
log.logSevere("restore of indexCache array dump failed: " + e.getMessage(), e);
log.logSevere("failed restore of indexCache array dump: " + e.getMessage(), e);
} finally {
if (dumpArray != null) try {dumpArray.close();}catch(Exception e){}
}

@ -42,212 +42,62 @@
package de.anomic.kelondro;
import java.io.IOException;
import de.anomic.server.serverByteBuffer;
public class kelondroBufferedRA extends kelondroAbstractRA implements kelondroRA {
// FIXME: a lot of synchronization of ra is needed here
private serverByteBuffer sbb;
private long pos;
protected kelondroRA ra;
protected byte[] buffer;
protected int bufferPage;
protected boolean bufferWritten;
private long seekpos;
private int bufferSizeExp;
private int bufferSize;
private int bufferOffsetFilter;
private int bufferStart;
public kelondroBufferedRA(kelondroRA ra, int minBufferSize, int bufferStartMin) {
// calculate buffer organization parameters
this.bufferSizeExp = 0;
minBufferSize--;
while (minBufferSize > 0) {minBufferSize = minBufferSize >> 1; this.bufferSizeExp++;}
this.bufferSize = 1 << this.bufferSizeExp;
this.bufferOffsetFilter = this.bufferSize - 1;
this.bufferStart = 0;
while (this.bufferStart < bufferStartMin) this.bufferStart += this.bufferSize;
// init buffer
this.ra = ra;
this.name = ra.name();
this.buffer = new byte[bufferSize];
this.seekpos = 0;
this.bufferPage = -1;
this.bufferWritten = true;
}
public long length() throws IOException {
return ra.length();
}
public long available() throws IOException {
synchronized (ra) {
ra.seek(seekpos);
return ra.available();
}
/**
 * Creates an empty in-memory pseudo-file.
 * The backing buffer starts empty and the read/write cursor at position 0.
 */
public kelondroBufferedRA() {
    this.sbb = new serverByteBuffer();
    this.pos = 0;
}
private void readBuffer(int newPageNr) throws IOException {
if (newPageNr == bufferPage) return;
bufferPage = newPageNr;
ra.seek(bufferPage << bufferSizeExp);
ra.readFully(buffer, 0, bufferSize);
bufferWritten = true;
}
/*
private void writeBuffer() throws IOException {
if ((bufferWritten) || (bufferPage < 0)) return;
ra.seek(bufferPage << bufferSizeExp);
ra.write(buffer, 0, bufferSize);
bufferWritten = true;
/**
 * Wraps an existing byte buffer (e.g. the image of a dump file) as a
 * pseudo-file, with the read/write cursor rewound to position 0.
 * @param bb the buffer to adopt; it is used directly, not copied
 */
public kelondroBufferedRA(serverByteBuffer bb) {
    this.sbb = bb;
    this.pos = 0;
}
*/
private void updateToBuffer(int newPageNr) throws IOException {
if (newPageNr != bufferPage) {
//writeBuffer();
readBuffer(newPageNr);
}
/**
 * Exposes the backing buffer so callers can dump the accumulated bytes
 * (e.g. write them to disk in one chunk).
 * @return the internal serverByteBuffer; not a copy
 */
public serverByteBuffer getBuffer() {
    return this.sbb;
}
// pseudo-native method read
public int read() throws IOException {
if (seekpos < bufferStart) {
// do not use buffer
ra.seek(seekpos);
int r = ra.read();
seekpos++;
return r;
}
int bn = (int) seekpos >> bufferSizeExp; // buffer page number
int offset = (int) seekpos & bufferOffsetFilter; // buffer page offset
seekpos++;
updateToBuffer(bn);
return 0xFF & buffer[offset];
// NOTE(review): reports an effectively unlimited byte count rather than the
// usual "bytes remaining after the cursor" (length - pos). This looks
// intentional for a growable in-memory pseudo-file, but confirm that no
// caller relies on available() as a remaining-data count -- TODO verify.
public long available() throws IOException {
return Long.MAX_VALUE - sbb.length();
}
// pseudo-native method write
public void write(int b) throws IOException {
if (seekpos < bufferStart) {
// do not use buffer
ra.seek(seekpos);
ra.write(b);
seekpos++;
} else {
// write to ra direkt
ra.seek(seekpos);
ra.write(b);
// and write also to buffer
int bn = (int) seekpos >> bufferSizeExp; // buffer page number
int offset = (int) seekpos & bufferOffsetFilter; // buffer page offset
updateToBuffer(bn);
buffer[offset] = (byte) b;
bufferWritten = false;
// update seek pos
seekpos++;
}
// Releases the backing buffer. There is no null guard elsewhere in this
// class (e.g. read() dereferences sbb directly), so any read/write after
// close() fails with a NullPointerException.
public void close() throws IOException {
sbb = null;
}
public int read(byte[] b, int off, int len) throws IOException {
/**
 * Reports the size of the pseudo-file: exactly the number of bytes
 * collected in the backing buffer.
 */
public long length() throws IOException {
    return this.sbb.length();
}
// check buffer size
if (seekpos < bufferStart) {
// do not use buffer
ra.seek(seekpos);
int r = ra.read(b, off, len);
seekpos += len;
return r;
}
// check simple case
int bn1 = (int) seekpos >> bufferSizeExp; // buffer page number, first position
int bn2 = (int) (seekpos + len - 1) >> bufferSizeExp; // buffer page number, last position
int offset = (int) seekpos & bufferOffsetFilter; // buffer page offset
updateToBuffer(bn1);
if (bn1 == bn2) {
// simple case
System.arraycopy(buffer, offset, b, off, len);
seekpos += len;
return len;
}
// do recursively
int thislen = bufferSize - offset;
System.arraycopy(buffer, offset, b, off, thislen);
seekpos += thislen;
return thislen + read(b, off + thislen, len - thislen);
/**
 * Reads the single byte at the current cursor position and advances the
 * cursor by one.
 * @return the byte as an unsigned value in the range 0..255
 */
public int read() throws IOException {
    final byte raw = sbb.byteAt((int) pos);
    pos++;
    return raw & 0xff;
}
public void write(byte[] b, int off, int len) throws IOException {
if (seekpos < bufferStart) {
// do not use buffer
ra.seek(seekpos);
ra.write(b, off, len);
seekpos += len;
} else {
int bn1 = (int) seekpos >> bufferSizeExp; // buffer page number, first position
int bn2 = (int) (seekpos + len - 1) >> bufferSizeExp; // buffer page number, last position
int offset = (int) seekpos & bufferOffsetFilter; // buffer page offset
updateToBuffer(bn1);
if (bn1 == bn2) {
// simple case
ra.seek(seekpos);
ra.write(b, off, len);
System.arraycopy(b, off, buffer, offset, len);
bufferWritten = false;
seekpos += len;
} else {
// do recursively
int thislen = bufferSize - offset;
ra.seek(seekpos);
ra.write(b, off, thislen);
System.arraycopy(b, off, buffer, offset, thislen);
bufferWritten = false;
seekpos += thislen;
write(b, off + thislen, len - thislen);
}
}
// Bulk read: copies up to len bytes from the cursor position into b[off..]
// and advances the cursor by the number of bytes actually obtained.
// Returns that count, which may be less than len -- presumably getBytes
// clamps at the end of the buffered data; TODO confirm the getBytes contract.
public int read(byte[] b, int off, int len) throws IOException {
byte[] g = sbb.getBytes((int) pos, (int) pos + len);
pos += g.length;
System.arraycopy(g, 0, b, off, g.length);
return g.length;
}
public void seek(long pos) throws IOException {
seekpos = pos;
this.pos = pos;
}
public void close() throws IOException {
// write unwritten buffer
if (buffer == null) return;
//writeBuffer();
ra.close();
buffer = null;
}
/*
public void finalize() {
try {
close();
} catch (IOException e) {}
/**
 * Writes the low byte of b at the current cursor position, then advances
 * the cursor by one. Positions past the current end grow the buffer.
 */
public void write(int b) throws IOException {
    this.sbb.overwrite((int) this.pos, b);
    this.pos++;
}
*/
public static void main(String[] args) {
try {
kelondroRA file = new kelondroBufferedRA(new kelondroFileRA("testx"), 64, 30);
file.seek(1024 - 2);
byte[] b = new byte[]{65, 66, 77, 88};
file.write(b);
file.seek(1024 * 2 - 30);
for (int i = 65; i < 150; i++) file.write(i);
file.close();
} catch (IOException e) {
e.printStackTrace();
}
/**
 * Bulk write: copies len bytes from b[off..] into the buffer at the current
 * cursor position, then advances the cursor past the written region.
 */
public void write(byte[] b, int off, int len) throws IOException {
    sbb.overwrite((int) pos, b, off, len);
    pos = pos + len;
}
}

@ -69,6 +69,18 @@ public class kelondroFixedWidthArray extends kelondroRecords implements kelondro
}
}
/**
 * Creates a new, empty fixed-width array on an abstract random-access
 * target instead of a file -- this enables the memory-enhanced dump path,
 * where an in-memory kelondroBufferedRA is filled first and flushed to
 * disk as one byte chunk afterwards.
 *
 * @param ra       random-access target to build the array structure on
 * @param rowdef   row layout (column widths) of the array entries
 * @param intprops number of integer property handles to reserve
 * @throws IOException if initializing the structure on ra fails
 */
public kelondroFixedWidthArray(kelondroRA ra, kelondroRow rowdef, int intprops) throws IOException {
// this creates a new array
super(ra, 0, 0, thisOHBytes, thisOHHandles, rowdef, intprops, rowdef.columns() /* txtProps */, 80 /* txtPropWidth */, false);
// initialize every reserved integer property handle to zero
for (int i = 0; i < intprops; i++) {
setHandle(i, new Handle(0));
}
// store column description
// NOTE(review): a failure to write the column label text is swallowed --
// presumably the labels are best-effort metadata only; confirm that readers
// tolerate missing column descriptions.
for (int i = 0; i < rowdef.columns(); i++) {
try {super.setText(i, rowdef.column(i).toString().getBytes());} catch (IOException e) {}
}
}
public static kelondroFixedWidthArray open(File file, kelondroRow rowdef, int intprops) {
try {
return new kelondroFixedWidthArray(file, rowdef, intprops);
@ -107,16 +119,10 @@ public class kelondroFixedWidthArray extends kelondroRecords implements kelondro
public synchronized void overwrite(int index, kelondroRow.Entry rowentry) throws IOException {
// this writes a row without reading the row from the file system first
// make room for element
Node n;
while (super.USAGE.allCount() <= index) {
n = newNode();
n.commit(CP_NONE);
}
// create a node at position index with rowentry
n = newNode(new Handle(index), (rowentry == null) ? null : rowentry.bytes(), 0);
n.commit(CP_NONE);
Handle h = new Handle(index);
h.adoptAllCount(); // adopt counting
newNode(h, (rowentry == null) ? null : rowentry.bytes(), 0).commit(CP_NONE);
}
public synchronized kelondroRow.Entry get(int index) throws IOException {
@ -186,7 +192,6 @@ public class kelondroFixedWidthArray extends kelondroRecords implements kelondro
k = new kelondroFixedWidthArray(f, rowdef, 6);
System.out.println(k.get(2).toString());
System.out.println(k.get(3).toString());
System.out.println(k.get(4).toString());
k.close();
System.out.println("zweiter Test");

@ -258,6 +258,8 @@ public class kelondroRecords {
short ohbytec, short ohhandlec,
kelondroRow rowdef, int FHandles, int txtProps, int txtPropWidth,
boolean exitOnFail) {
// this always creates a new file
this.fileExisted = false;
this.filename = null;
try {
initNewFile(ra, ohbytec, ohhandlec, rowdef, FHandles, txtProps, txtPropWidth, buffersize / 10);
@ -265,6 +267,7 @@ public class kelondroRecords {
logFailure("cannot create / " + e.getMessage());
if (exitOnFail) System.exit(-1);
}
assignRowdef(rowdef);
writeOrderType();
initCache(buffersize / 10 * 9, preloadTime);
}
@ -388,6 +391,7 @@ public class kelondroRecords {
}
public kelondroRecords(kelondroRA ra, long buffersize, long preloadTime) throws IOException{
this.fileExisted = false;
this.filename = null;
initExistingFile(ra, buffersize / 10);
readOrderType();
@ -656,7 +660,7 @@ public class kelondroRecords {
protected Node(Handle handle, byte[] bulkchunk, int offset, boolean setChanged) {
// this initializer is used to create nodes from bulk-read byte arrays
this.handle = handle;
assert (bulkchunk.length >= offset + headchunksize) : "bulkchunk.length = " + bulkchunk.length + ", offset = " + offset + ", headchunksize = " + headchunksize;
assert ((bulkchunk == null) || (bulkchunk.length >= offset + headchunksize)) : "bulkchunk.length = " + bulkchunk.length + ", offset = " + offset + ", headchunksize = " + headchunksize;
// create empty chunks
this.headChunk = new byte[headchunksize];
@ -1400,6 +1404,7 @@ public class kelondroRecords {
index = USAGE.allCount();
USAGE.USEDC++;
entryFile.writeInt(POS_USEDC, USAGE.USEDC);
entryFile.commit();
} else {
// re-use record from free-list
USAGE.USEDC++;
@ -1439,6 +1444,36 @@ public class kelondroRecords {
this.index = i;
}
/**
 * Makes the USAGE counters consistent with a handle index that was chosen
 * outside this class (e.g. by overwrite(), which addresses records by
 * position). Any record slots between the current record count and this
 * index are inserted into the free-record list, and if this index is
 * exactly the next slot, the used-record counter is advanced.
 *
 * @throws IOException if updating the counters in the entry file fails
 */
protected void adoptAllCount() throws IOException {
// in case that the handle index was created outside this class,
// this method ensures that the USAGE counters are consistent with the
// new handle index
if (USAGE.allCount() <= this.index) synchronized (USAGE) {
// records that are in between are marked as deleted
boolean wf = false;
while (USAGE.allCount() < this.index) {
// push each gap slot onto the free-list: link it to the previous
// free-list head, then make it the new head
Handle h = new Handle(USAGE.allCount());
USAGE.FREEC++;
entryFile.writeInt(seekpos(h), USAGE.FREEH.index);
USAGE.FREEH = h;
wf = true;
}
assert (USAGE.allCount() >= this.index);
// adopt USAGE.USEDC
if (USAGE.allCount() == this.index) {
USAGE.USEDC++;
wf = true;
}
// commit changes
// only touch the file when a counter actually changed
if (wf) {
USAGE.write();
entryFile.commit();
}
}
}
// Returns true if this handle carries the reserved NUL marker value,
// i.e. it does not reference any record.
public boolean isNUL() {
return index == NUL;
}

@ -165,6 +165,28 @@ public final class serverByteBuffer extends OutputStream {
length += le;
}
// overwrite does not increase the 'length' write position pointer!
/**
 * Writes the low byte of b at absolute position pos without moving the
 * append cursor; delegates to the byte variant.
 */
public void overwrite(int pos, int b) {
    final byte lowByte = (byte) (b & 0xff);
    overwrite(pos, lowByte);
}
/**
 * Writes a single byte at absolute position pos without moving the append
 * cursor; 'length' is extended only when writing at or past the current end.
 *
 * Fix: grow in a loop rather than once -- a single grow() step is not
 * guaranteed to make the buffer large enough for a position far past the
 * current end (the array-variant overwrite already uses this while-loop
 * idiom). Assumes grow() always enlarges the buffer -- TODO confirm it
 * makes progress on every call.
 */
public void overwrite(int pos, byte b) {
    while (offset + pos + 1 > buffer.length) grow();
    buffer[offset + pos] = b;
    if (pos >= length) length = pos + 1;
}
// Convenience overload: overwrites the whole array bb at absolute
// position pos without moving the append cursor.
public void overwrite(int pos, byte[] bb) {
overwrite(pos, bb, 0, bb.length);
}
// Copies le bytes from bb[of..] into the buffer at absolute position pos.
// Unlike write(), this does not advance the append cursor; it extends
// 'length' only when the copied region reaches past the current end.
public void overwrite(int pos, byte[] bb, int of, int le) {
// grow until the whole target region fits
// (assumes grow() enlarges the buffer on every call -- TODO confirm)
while (offset + pos + le > buffer.length) grow();
System.arraycopy(bb, of, buffer, offset + pos, le);
if (pos + le > length) length = pos + le;
}
public serverByteBuffer append(byte b) {
write(b);
return this;

Loading…
Cancel
Save