flush only when > 3000 RWIs present + code cleanup

git-svn-id: https://svn.berlios.de/svnroot/repos/yacy/trunk@6817 6c8d7289-2bf4-0310-a012-ef5d649a1542
pull/1/head
orbiter 15 years ago
parent 3a50b5aa04
commit ed07046870

@ -30,7 +30,6 @@ import java.util.ArrayList;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Random; import java.util.Random;
import java.util.Set;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
@ -52,7 +51,7 @@ public class RowCollection implements Iterable<Row.Entry>, Cloneable {
public static final long growfactorLarge100 = 140L; public static final long growfactorLarge100 = 140L;
public static final long growfactorSmall100 = 120L; public static final long growfactorSmall100 = 120L;
private static final int isortlimit = 20; private static final int isortlimit = 20;
private static int availableCPU = Runtime.getRuntime().availableProcessors(); private static final int availableCPU = Runtime.getRuntime().availableProcessors();
private static final int exp_chunkcount = 0; private static final int exp_chunkcount = 0;
private static final int exp_last_read = 1; private static final int exp_last_read = 1;
@ -62,7 +61,7 @@ public class RowCollection implements Iterable<Row.Entry>, Cloneable {
private static final int exp_collection = 5; private static final int exp_collection = 5;
public static final ExecutorService sortingthreadexecutor = (availableCPU > 1) ? Executors.newCachedThreadPool(new NamePrefixThreadFactory("sorting")) : null; public static final ExecutorService sortingthreadexecutor = (availableCPU > 1) ? Executors.newCachedThreadPool(new NamePrefixThreadFactory("sorting")) : null;
public static final ExecutorService partitionthreadexecutor = (availableCPU > 1) ? Executors.newCachedThreadPool(new NamePrefixThreadFactory("partition")) : null; private static final ExecutorService partitionthreadexecutor = (availableCPU > 1) ? Executors.newCachedThreadPool(new NamePrefixThreadFactory("partition")) : null;
public final Row rowdef; public final Row rowdef;
protected byte[] chunkcache; protected byte[] chunkcache;
@ -70,7 +69,7 @@ public class RowCollection implements Iterable<Row.Entry>, Cloneable {
protected int sortBound; protected int sortBound;
protected long lastTimeWrote; protected long lastTimeWrote;
public RowCollection(final RowCollection rc) { protected RowCollection(final RowCollection rc) {
this.rowdef = rc.rowdef; this.rowdef = rc.rowdef;
this.chunkcache = rc.chunkcache; this.chunkcache = rc.chunkcache;
this.chunkcount = rc.chunkcount; this.chunkcount = rc.chunkcount;
@ -78,7 +77,7 @@ public class RowCollection implements Iterable<Row.Entry>, Cloneable {
this.lastTimeWrote = rc.lastTimeWrote; this.lastTimeWrote = rc.lastTimeWrote;
} }
public RowCollection(final Row rowdef) { protected RowCollection(final Row rowdef) {
this.rowdef = rowdef; this.rowdef = rowdef;
this.sortBound = 0; this.sortBound = 0;
this.lastTimeWrote = System.currentTimeMillis(); this.lastTimeWrote = System.currentTimeMillis();
@ -91,7 +90,7 @@ public class RowCollection implements Iterable<Row.Entry>, Cloneable {
ensureSize(objectCount); ensureSize(objectCount);
} }
public RowCollection(final Row rowdef, final int objectCount, final byte[] cache, final int sortBound) { protected RowCollection(final Row rowdef, final int objectCount, final byte[] cache, final int sortBound) {
this.rowdef = rowdef; this.rowdef = rowdef;
this.chunkcache = cache; this.chunkcache = cache;
this.chunkcount = objectCount; this.chunkcount = objectCount;
@ -99,7 +98,7 @@ public class RowCollection implements Iterable<Row.Entry>, Cloneable {
this.lastTimeWrote = System.currentTimeMillis(); this.lastTimeWrote = System.currentTimeMillis();
} }
public RowCollection(final Row rowdef, final Row.Entry exportedCollectionRowEnvironment) { protected RowCollection(final Row rowdef, final Row.Entry exportedCollectionRowEnvironment) {
final int chunkcachelength = exportedCollectionRowEnvironment.cellwidth(1) - exportOverheadSize; final int chunkcachelength = exportedCollectionRowEnvironment.cellwidth(1) - exportOverheadSize;
final Row.Entry exportedCollection = exportRow(chunkcachelength).newEntry(exportedCollectionRowEnvironment, 1); final Row.Entry exportedCollection = exportRow(chunkcachelength).newEntry(exportedCollectionRowEnvironment, 1);
@ -157,7 +156,7 @@ public class RowCollection implements Iterable<Row.Entry>, Cloneable {
private static final long day = 1000 * 60 * 60 * 24; private static final long day = 1000 * 60 * 60 * 24;
public static int daysSince2000(final long time) { private static int daysSince2000(final long time) {
return (int) (time / day) - 10957; return (int) (time / day) - 10957;
} }
@ -193,7 +192,7 @@ public class RowCollection implements Iterable<Row.Entry>, Cloneable {
); );
} }
public static final int exportOverheadSize = 14; protected static final int exportOverheadSize = 14;
public synchronized byte[] exportCollection() { public synchronized byte[] exportCollection() {
// returns null if the collection is empty // returns null if the collection is empty
@ -220,7 +219,7 @@ public class RowCollection implements Iterable<Row.Entry>, Cloneable {
return this.rowdef; return this.rowdef;
} }
protected final long neededSpaceForEnsuredSize(final int elements, final boolean forcegc) { private final long neededSpaceForEnsuredSize(final int elements, final boolean forcegc) {
assert elements > 0 : "elements = " + elements; assert elements > 0 : "elements = " + elements;
final long needed = elements * rowdef.objectsize; final long needed = elements * rowdef.objectsize;
if (chunkcache.length >= needed) return 0; if (chunkcache.length >= needed) return 0;
@ -234,7 +233,7 @@ public class RowCollection implements Iterable<Row.Entry>, Cloneable {
return needed; return needed;
} }
protected final void ensureSize(final int elements) throws RowSpaceExceededException { private final void ensureSize(final int elements) throws RowSpaceExceededException {
if (elements == 0) return; if (elements == 0) return;
final long allocram = neededSpaceForEnsuredSize(elements, true); final long allocram = neededSpaceForEnsuredSize(elements, true);
if (allocram == 0) return; if (allocram == 0) return;
@ -264,11 +263,11 @@ public class RowCollection implements Iterable<Row.Entry>, Cloneable {
* than is necessary to store the data. This method computes the extra memory that is needed to perform this task. * than is necessary to store the data. This method computes the extra memory that is needed to perform this task.
* @return * @return
*/ */
public final long memoryNeededForGrow() { protected final long memoryNeededForGrow() {
return neededSpaceForEnsuredSize(chunkcount + 1, false); return neededSpaceForEnsuredSize(chunkcount + 1, false);
} }
public synchronized void trim(final boolean plusGrowFactor) { protected synchronized void trim(final boolean plusGrowFactor) {
if (chunkcache.length == 0) return; if (chunkcache.length == 0) return;
long needed = chunkcount * rowdef.objectsize; long needed = chunkcount * rowdef.objectsize;
if (plusGrowFactor) needed = neededSpaceForEnsuredSize(chunkcount, false); if (plusGrowFactor) needed = neededSpaceForEnsuredSize(chunkcount, false);
@ -289,7 +288,7 @@ public class RowCollection implements Iterable<Row.Entry>, Cloneable {
return lastTimeWrote; return lastTimeWrote;
} }
public synchronized final byte[] getKey(final int index) { protected synchronized final byte[] getKey(final int index) {
assert (index >= 0) : "get: access with index " + index + " is below zero"; assert (index >= 0) : "get: access with index " + index + " is below zero";
assert (index < chunkcount) : "get: access with index " + index + " is above chunkcount " + chunkcount + "; sortBound = " + sortBound; assert (index < chunkcount) : "get: access with index " + index + " is above chunkcount " + chunkcount + "; sortBound = " + sortBound;
assert (index * rowdef.objectsize < chunkcache.length); assert (index * rowdef.objectsize < chunkcache.length);
@ -456,7 +455,7 @@ public class RowCollection implements Iterable<Row.Entry>, Cloneable {
return r; return r;
} }
public synchronized byte[] smallestKey() { protected synchronized byte[] smallestKey() {
if (chunkcount == 0) return null; if (chunkcount == 0) return null;
this.sort(); this.sort();
final Row.Entry r = get(0, false); final Row.Entry r = get(0, false);
@ -464,7 +463,7 @@ public class RowCollection implements Iterable<Row.Entry>, Cloneable {
return b; return b;
} }
public synchronized byte[] largestKey() { protected synchronized byte[] largestKey() {
if (chunkcount == 0) return null; if (chunkcount == 0) return null;
this.sort(); this.sort();
final Row.Entry r = get(chunkcount - 1, false); final Row.Entry r = get(chunkcount - 1, false);
@ -503,12 +502,12 @@ public class RowCollection implements Iterable<Row.Entry>, Cloneable {
* collection during removes. * collection during removes.
* *
*/ */
public class keyIterator implements Iterator<byte[]> { private class keyIterator implements Iterator<byte[]> {
private int p; private int p;
private final boolean keepOrderWhenRemoving; private final boolean keepOrderWhenRemoving;
public keyIterator(final boolean keepOrderWhenRemoving) { private keyIterator(final boolean keepOrderWhenRemoving) {
this.p = 0; this.p = 0;
this.keepOrderWhenRemoving = keepOrderWhenRemoving; this.keepOrderWhenRemoving = keepOrderWhenRemoving;
} }
@ -540,7 +539,7 @@ public class RowCollection implements Iterable<Row.Entry>, Cloneable {
* It supports remove() and keeps the order of the underlying * It supports remove() and keeps the order of the underlying
* collection during removes. * collection during removes.
*/ */
public class rowIterator implements Iterator<Row.Entry> { private class rowIterator implements Iterator<Row.Entry> {
private int p; private int p;
@ -563,7 +562,7 @@ public class RowCollection implements Iterable<Row.Entry>, Cloneable {
} }
public synchronized final void sort() { protected synchronized final void sort() {
assert (this.rowdef.objectOrder != null); assert (this.rowdef.objectOrder != null);
if (this.sortBound == this.chunkcount) return; // this is already sorted if (this.sortBound == this.chunkcount) return; // this is already sorted
if (this.chunkcount < isortlimit) { if (this.chunkcount < isortlimit) {
@ -643,8 +642,8 @@ public class RowCollection implements Iterable<Row.Entry>, Cloneable {
//assert this.isSorted(); //assert this.isSorted();
} }
public static class qsortthread implements Callable<Object> { private static class qsortthread implements Callable<Object> {
RowCollection rc; private RowCollection rc;
int L, R, S; int L, R, S;
public qsortthread(final RowCollection rc, final int L, final int R, final int S) { public qsortthread(final RowCollection rc, final int L, final int R, final int S) {
@ -831,7 +830,7 @@ public class RowCollection implements Iterable<Row.Entry>, Cloneable {
if (i == p) return j; else if (j == p) return i; else return p; if (i == p) return j; else if (j == p) return i; else return p;
} }
public synchronized void uniq() { protected synchronized void uniq() {
assert (this.rowdef.objectOrder != null); assert (this.rowdef.objectOrder != null);
// removes double-occurrences of chunks // removes double-occurrences of chunks
// this works only if the collection was ordered with sort before // this works only if the collection was ordered with sort before
@ -1029,11 +1028,11 @@ public class RowCollection implements Iterable<Row.Entry>, Cloneable {
} }
final long t2 = System.nanoTime(); final long t2 = System.nanoTime();
System.out.println("copy c -> d: " + (t2 - t1) + " nanoseconds, " + d(testsize, (t2 - t1)) + " entries/nanoseconds"); System.out.println("copy c -> d: " + (t2 - t1) + " nanoseconds, " + d(testsize, (t2 - t1)) + " entries/nanoseconds");
availableCPU = 1; //availableCPU = 1;
c.sort(); c.sort();
final long t3 = System.nanoTime(); final long t3 = System.nanoTime();
System.out.println("sort c (1) : " + (t3 - t2) + " nanoseconds, " + d(testsize, (t3 - t2)) + " entries/nanoseconds"); System.out.println("sort c (1) : " + (t3 - t2) + " nanoseconds, " + d(testsize, (t3 - t2)) + " entries/nanoseconds");
availableCPU = 2; //availableCPU = 2;
d.sort(); d.sort();
final long t4 = System.nanoTime(); final long t4 = System.nanoTime();
System.out.println("sort d (2) : " + (t4 - t3) + " nanoseconds, " + d(testsize, (t4 - t3)) + " entries/nanoseconds"); System.out.println("sort d (2) : " + (t4 - t3) + " nanoseconds, " + d(testsize, (t4 - t3)) + " entries/nanoseconds");

@ -55,7 +55,7 @@ public class IODispatcher extends Thread {
private ArrayBlockingQueue<DumpJob<? extends Reference>> dumpQueue; private ArrayBlockingQueue<DumpJob<? extends Reference>> dumpQueue;
//private ReferenceFactory<ReferenceType> factory; //private ReferenceFactory<ReferenceType> factory;
private boolean terminate; private boolean terminate;
protected int writeBufferSize; private int writeBufferSize;
public IODispatcher(int dumpQueueLength, int mergeQueueLength, int writeBufferSize) { public IODispatcher(int dumpQueueLength, int mergeQueueLength, int writeBufferSize) {
this.termination = new Semaphore(0); this.termination = new Semaphore(0);
@ -80,7 +80,7 @@ public class IODispatcher extends Thread {
} }
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public synchronized void dump(ReferenceContainerCache<? extends Reference> cache, File file, ReferenceContainerArray<? extends Reference> array) { protected synchronized void dump(ReferenceContainerCache<? extends Reference> cache, File file, ReferenceContainerArray<? extends Reference> array) {
if (dumpQueue == null || controlQueue == null || !this.isAlive()) { if (dumpQueue == null || controlQueue == null || !this.isAlive()) {
Log.logWarning("IODispatcher", "emergency dump of file " + file.getName()); Log.logWarning("IODispatcher", "emergency dump of file " + file.getName());
if (!cache.isEmpty()) cache.dump(file, (int) Math.min(MemoryControl.available() / 3, writeBufferSize)); if (!cache.isEmpty()) cache.dump(file, (int) Math.min(MemoryControl.available() / 3, writeBufferSize));
@ -103,11 +103,11 @@ public class IODispatcher extends Thread {
} }
} }
public synchronized int queueLength() { protected synchronized int queueLength() {
return (controlQueue == null || !this.isAlive()) ? 0 : controlQueue.availablePermits(); return (controlQueue == null || !this.isAlive()) ? 0 : controlQueue.availablePermits();
} }
public synchronized void merge(File f1, File f2, ReferenceFactory<? extends Reference> factory, ArrayStack array, Row payloadrow, File newFile) { protected synchronized void merge(File f1, File f2, ReferenceFactory<? extends Reference> factory, ArrayStack array, Row payloadrow, File newFile) {
if (mergeQueue == null || controlQueue == null || !this.isAlive()) { if (mergeQueue == null || controlQueue == null || !this.isAlive()) {
if (f2 == null) { if (f2 == null) {
Log.logWarning("IODispatcher", "emergency rewrite of file " + f1.getName() + " to " + newFile.getName()); Log.logWarning("IODispatcher", "emergency rewrite of file " + f1.getName() + " to " + newFile.getName());
@ -213,16 +213,16 @@ public class IODispatcher extends Thread {
} }
} }
public class DumpJob <ReferenceType extends Reference> { private class DumpJob <ReferenceType extends Reference> {
ReferenceContainerCache<ReferenceType> cache; private ReferenceContainerCache<ReferenceType> cache;
File file; private File file;
ReferenceContainerArray<ReferenceType> array; private ReferenceContainerArray<ReferenceType> array;
public DumpJob(ReferenceContainerCache<ReferenceType> cache, File file, ReferenceContainerArray<ReferenceType> array) { private DumpJob(ReferenceContainerCache<ReferenceType> cache, File file, ReferenceContainerArray<ReferenceType> array) {
this.cache = cache; this.cache = cache;
this.file = file; this.file = file;
this.array = array; this.array = array;
} }
public void dump() { private void dump() {
try { try {
if (!cache.isEmpty()) cache.dump(file, (int) Math.min(MemoryControl.available() / 3, writeBufferSize)); if (!cache.isEmpty()) cache.dump(file, (int) Math.min(MemoryControl.available() / 3, writeBufferSize));
array.mountBLOBFile(file); array.mountBLOBFile(file);
@ -232,14 +232,14 @@ public class IODispatcher extends Thread {
} }
} }
public class MergeJob { private class MergeJob {
File f1, f2, newFile; private File f1, f2, newFile;
ArrayStack array; private ArrayStack array;
Row payloadrow; private Row payloadrow;
ReferenceFactory<? extends Reference> factory; private ReferenceFactory<? extends Reference> factory;
public MergeJob( private MergeJob(
File f1, File f1,
File f2, File f2,
ReferenceFactory<? extends Reference> factory, ReferenceFactory<? extends Reference> factory,
@ -254,7 +254,7 @@ public class IODispatcher extends Thread {
this.payloadrow = payloadrow; this.payloadrow = payloadrow;
} }
public File merge() { private File merge() {
if (!f1.exists()) { if (!f1.exists()) {
Log.logWarning("IODispatcher", "merge of file (1) " + f1.getName() + " failed: file does not exists"); Log.logWarning("IODispatcher", "merge of file (1) " + f1.getName() + " failed: file does not exists");
return null; return null;

@ -414,7 +414,7 @@ public final class IndexCell<ReferenceType extends Reference> extends AbstractBu
if (this.dumperSemaphore.availablePermits() > 0 && if (this.dumperSemaphore.availablePermits() > 0 &&
(this.ram.size() >= this.maxRamEntries || (this.ram.size() >= this.maxRamEntries ||
(this.ram.size() > 3000 && !MemoryControl.request(80L * 1024L * 1024L, false)) || (this.ram.size() > 3000 && !MemoryControl.request(80L * 1024L * 1024L, false)) ||
(this.ram.size() > 0 && this.lastDump + dumpCycle < t))) { (this.ram.size() > 3000 && this.lastDump + dumpCycle < t))) {
try { try {
this.dumperSemaphore.acquire(); // only one may pass this.dumperSemaphore.acquire(); // only one may pass
if (this.ram.size() >= this.maxRamEntries || if (this.ram.size() >= this.maxRamEntries ||

Loading…
Cancel
Save