orbiter 16 years ago
parent 0f3246e90a
commit 16efcd0366

@ -148,7 +148,7 @@ public class HeapReader {
Log.logInfo("HeapReader", "generating index for " + heapFile.toString() + ", " + (file.length() / 1024 / 1024) + " MB. Please wait."); Log.logInfo("HeapReader", "generating index for " + heapFile.toString() + ", " + (file.length() / 1024 / 1024) + " MB. Please wait.");
this.free = new Gap(); this.free = new Gap();
HandleMap.initDataConsumer indexready = HandleMap.asynchronusInitializer(keylength, this.ordering, 8, 0, Math.max(10, (int) (Runtime.getRuntime().freeMemory() / (10 * 1024 * 1024))), 100000); HandleMap.initDataConsumer indexready = HandleMap.asynchronusInitializer(keylength, this.ordering, 8, 0, Math.max(10, (int) (Runtime.getRuntime().freeMemory() / (10 * 1024 * 1024))));
byte[] key = new byte[keylength]; byte[] key = new byte[keylength];
int reclen; int reclen;
long seek = 0; long seek = 0;

@ -43,6 +43,7 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.zip.GZIPInputStream; import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream; import java.util.zip.GZIPOutputStream;
@ -288,8 +289,8 @@ public class HandleMap implements Iterable<Row.Entry> {
* @param bufferSize * @param bufferSize
* @return * @return
*/ */
public static initDataConsumer asynchronusInitializer(final int keylength, final ByteOrder objectOrder, int idxbytes, final int space, final int expectedspace, int bufferSize) { public static initDataConsumer asynchronusInitializer(final int keylength, final ByteOrder objectOrder, int idxbytes, final int space, final int expectedspace) {
initDataConsumer initializer = new initDataConsumer(new HandleMap(keylength, objectOrder, idxbytes, space, expectedspace), bufferSize); initDataConsumer initializer = new initDataConsumer(new HandleMap(keylength, objectOrder, idxbytes, space, expectedspace));
ExecutorService service = Executors.newSingleThreadExecutor(); ExecutorService service = Executors.newSingleThreadExecutor();
initializer.setResult(service.submit(initializer)); initializer.setResult(service.submit(initializer));
service.shutdown(); service.shutdown();
@ -314,9 +315,9 @@ public class HandleMap implements Iterable<Row.Entry> {
private Future<HandleMap> result; private Future<HandleMap> result;
private boolean sortAtEnd; private boolean sortAtEnd;
public initDataConsumer(HandleMap map, int bufferCount) { public initDataConsumer(HandleMap map) {
this.map = map; this.map = map;
cache = new ArrayBlockingQueue<entry>(bufferCount); cache = new LinkedBlockingQueue<entry>();
sortAtEnd = false; sortAtEnd = false;
} }
@ -338,7 +339,7 @@ public class HandleMap implements Iterable<Row.Entry> {
} }
/** /**
* to signal the initialization thread that no more entries will be sublitted with consumer() * to signal the initialization thread that no more entries will be submitted with consumer()
* this method must be called. The process will not terminate if this is not called before. * this method must be called. The process will not terminate if this is not called before.
*/ */
public void finish(boolean sortAtEnd) { public void finish(boolean sortAtEnd) {

Loading…
Cancel
Save