|
|
@ -31,9 +31,6 @@ import java.io.FileInputStream;
|
|
|
|
import java.io.FileNotFoundException;
|
|
|
|
import java.io.FileNotFoundException;
|
|
|
|
import java.io.IOException;
|
|
|
|
import java.io.IOException;
|
|
|
|
import java.util.Iterator;
|
|
|
|
import java.util.Iterator;
|
|
|
|
import java.util.concurrent.ArrayBlockingQueue;
|
|
|
|
|
|
|
|
import java.util.concurrent.BlockingQueue;
|
|
|
|
|
|
|
|
import java.util.concurrent.Callable;
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
public class ChunkIterator implements Iterator<byte[]> {
|
|
|
|
public class ChunkIterator implements Iterator<byte[]> {
|
|
|
|
|
|
|
|
|
|
|
@ -96,203 +93,4 @@ public class ChunkIterator implements Iterator<byte[]> {
|
|
|
|
public void remove() {
|
|
|
|
public void remove() {
|
|
|
|
throw new UnsupportedOperationException();
|
|
|
|
throw new UnsupportedOperationException();
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
|
|
|
ExecutorService service = Executors.newFixedThreadPool(2);
|
|
|
|
|
|
|
|
filechunkProducer producer;
|
|
|
|
|
|
|
|
filechunkSlicer slicer;
|
|
|
|
|
|
|
|
Future<Integer> producerResult;
|
|
|
|
|
|
|
|
Future<Integer> slicerResult;
|
|
|
|
|
|
|
|
byte[] nextRecord;
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
public kelondroChunkIterator(final File file, final int recordsize, final int chunksize) throws FileNotFoundException {
|
|
|
|
|
|
|
|
assert (file.exists());
|
|
|
|
|
|
|
|
assert file.length() % recordsize == 0;
|
|
|
|
|
|
|
|
this.chunksize = chunksize;
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
service = Executors.newFixedThreadPool(2);
|
|
|
|
|
|
|
|
// buffer size and count calculation is critical, because wrong values
|
|
|
|
|
|
|
|
// will cause blocking of the concurrent consumer/producer threads
|
|
|
|
|
|
|
|
int filebuffersize = 1024 * 16;
|
|
|
|
|
|
|
|
int chunkbuffercountmin = filebuffersize / recordsize + 1; // minimum
|
|
|
|
|
|
|
|
int filebuffercount = 1024 * 1024 / filebuffersize; // max 1 MB
|
|
|
|
|
|
|
|
int chunkbuffercount = chunkbuffercountmin * filebuffercount + 1;
|
|
|
|
|
|
|
|
producer = new filechunkProducer(file, filebuffersize, filebuffercount);
|
|
|
|
|
|
|
|
slicer = new filechunkSlicer(producer, recordsize, chunksize, chunkbuffercount);
|
|
|
|
|
|
|
|
producerResult = service.submit(producer);
|
|
|
|
|
|
|
|
slicerResult = service.submit(slicer);
|
|
|
|
|
|
|
|
service.shutdown();
|
|
|
|
|
|
|
|
nextRecord = slicer.consume();
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
public boolean hasNext() {
|
|
|
|
|
|
|
|
return nextRecord != null;
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
public byte[] next() {
|
|
|
|
|
|
|
|
if (nextRecord == null) return null;
|
|
|
|
|
|
|
|
byte[] n = nextRecord;
|
|
|
|
|
|
|
|
nextRecord = slicer.consume();
|
|
|
|
|
|
|
|
return n;
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
public void remove() {
|
|
|
|
|
|
|
|
throw new UnsupportedOperationException();
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
*/
|
|
|
|
|
|
|
|
private static class filechunkSlicer implements Callable<Integer> {
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
private filechunkProducer producer;
|
|
|
|
|
|
|
|
private static byte[] poison = new byte[0];
|
|
|
|
|
|
|
|
private BlockingQueue<byte[]> slices;
|
|
|
|
|
|
|
|
private int slicesize, head;
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
public filechunkSlicer(filechunkProducer producer, final int slicesize, int head, int stacksize) throws FileNotFoundException {
|
|
|
|
|
|
|
|
assert producer != null;
|
|
|
|
|
|
|
|
this.producer = producer;
|
|
|
|
|
|
|
|
this.slices = new ArrayBlockingQueue<byte[]>(stacksize);
|
|
|
|
|
|
|
|
this.slicesize = slicesize;
|
|
|
|
|
|
|
|
this.head = head;
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
public byte[] consume() {
|
|
|
|
|
|
|
|
try {
|
|
|
|
|
|
|
|
byte[] b = slices.take();
|
|
|
|
|
|
|
|
if (b == poison) return null; else return b;
|
|
|
|
|
|
|
|
} catch (InterruptedException e) {
|
|
|
|
|
|
|
|
e.printStackTrace();
|
|
|
|
|
|
|
|
return null;
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
private void slice(byte[] from, int startfrom, byte[] to, int startto, int len) {
|
|
|
|
|
|
|
|
if (startto >= head) return;
|
|
|
|
|
|
|
|
if (startto + len > head) len = head - startto;
|
|
|
|
|
|
|
|
assert to.length == head;
|
|
|
|
|
|
|
|
System.arraycopy(from, startfrom, to, startto, len);
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
public Integer call() {
|
|
|
|
|
|
|
|
filechunk c;
|
|
|
|
|
|
|
|
int p;
|
|
|
|
|
|
|
|
try {
|
|
|
|
|
|
|
|
byte[] slice = new byte[head];
|
|
|
|
|
|
|
|
int slicec = 0;
|
|
|
|
|
|
|
|
consumer: while(true) {
|
|
|
|
|
|
|
|
c = producer.consume();
|
|
|
|
|
|
|
|
if (c == null) {
|
|
|
|
|
|
|
|
// finished. put poison into slices queue
|
|
|
|
|
|
|
|
slices.put(poison);
|
|
|
|
|
|
|
|
break consumer;
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
p = 0;
|
|
|
|
|
|
|
|
// copy as much as possible to the current slice
|
|
|
|
|
|
|
|
slicefiller: while (true) {
|
|
|
|
|
|
|
|
assert slicesize > slicec;
|
|
|
|
|
|
|
|
if (c.n - p >= slicesize - slicec) {
|
|
|
|
|
|
|
|
// a full slice can be produced
|
|
|
|
|
|
|
|
slice(c.b, p, slice, slicec, slicesize - slicec);
|
|
|
|
|
|
|
|
// the slice is now full
|
|
|
|
|
|
|
|
p += slicesize - slicec;
|
|
|
|
|
|
|
|
slices.put(slice);
|
|
|
|
|
|
|
|
slice = new byte[head];
|
|
|
|
|
|
|
|
slicec = 0;
|
|
|
|
|
|
|
|
continue slicefiller;
|
|
|
|
|
|
|
|
} else {
|
|
|
|
|
|
|
|
// fill only a part of the slice and wait for next chunk
|
|
|
|
|
|
|
|
slice(c.b, p, slice, slicec, c.n - p);
|
|
|
|
|
|
|
|
// the chunk is now fully read
|
|
|
|
|
|
|
|
producer.recycle(c);
|
|
|
|
|
|
|
|
slicec += c.n - p;
|
|
|
|
|
|
|
|
continue consumer;
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
} catch (InterruptedException e) {
|
|
|
|
|
|
|
|
e.printStackTrace();
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
return Integer.valueOf(0);
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
private static class filechunk {
|
|
|
|
|
|
|
|
public byte[] b;
|
|
|
|
|
|
|
|
public int n;
|
|
|
|
|
|
|
|
public filechunk(int len) {
|
|
|
|
|
|
|
|
b = new byte[len];
|
|
|
|
|
|
|
|
n = 0;
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
|
|
|
* the filechunkProducer reads an in put file and stores chunks of the results
|
|
|
|
|
|
|
|
* into a buffer. All elements stored in the buffer must be recycled.
|
|
|
|
|
|
|
|
* The class does not allocate more memory than a given chunk size multiplied with a
|
|
|
|
|
|
|
|
* number of chunks that shall be stored in a queue for processing.
|
|
|
|
|
|
|
|
*/
|
|
|
|
|
|
|
|
private static class filechunkProducer implements Callable<Integer> {
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
private BlockingQueue<filechunk> empty;
|
|
|
|
|
|
|
|
private BlockingQueue<filechunk> filed;
|
|
|
|
|
|
|
|
private static filechunk poison = new filechunk(0);
|
|
|
|
|
|
|
|
private FileInputStream fis;
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
public filechunkProducer(File in, int bufferSize, int bufferCount) throws FileNotFoundException {
|
|
|
|
|
|
|
|
empty = new ArrayBlockingQueue<filechunk>(bufferCount);
|
|
|
|
|
|
|
|
filed = new ArrayBlockingQueue<filechunk>(bufferCount);
|
|
|
|
|
|
|
|
fis = new FileInputStream(in);
|
|
|
|
|
|
|
|
// fill the empty queue
|
|
|
|
|
|
|
|
for (int i = 0; i < bufferCount; i++) empty.add(new filechunk(bufferSize));
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
public void recycle(filechunk c) {
|
|
|
|
|
|
|
|
try {
|
|
|
|
|
|
|
|
empty.put(c);
|
|
|
|
|
|
|
|
} catch (InterruptedException e) {
|
|
|
|
|
|
|
|
e.printStackTrace();
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
public filechunk consume() {
|
|
|
|
|
|
|
|
try {
|
|
|
|
|
|
|
|
filechunk f = filed.take(); // leer
|
|
|
|
|
|
|
|
if (f == poison) return null; else return f;
|
|
|
|
|
|
|
|
} catch (InterruptedException e) {
|
|
|
|
|
|
|
|
e.printStackTrace();
|
|
|
|
|
|
|
|
try {
|
|
|
|
|
|
|
|
this.fis.close();
|
|
|
|
|
|
|
|
} catch (IOException e1) {
|
|
|
|
|
|
|
|
e1.printStackTrace();
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
return null;
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
public Integer call() {
|
|
|
|
|
|
|
|
try {
|
|
|
|
|
|
|
|
filechunk c;
|
|
|
|
|
|
|
|
while (true) {
|
|
|
|
|
|
|
|
c = empty.take(); // leer
|
|
|
|
|
|
|
|
c.n = fis.read(c.b);
|
|
|
|
|
|
|
|
if (c.n <= 0) break;
|
|
|
|
|
|
|
|
filed.put(c);
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
// put poison into consumer queue so he can stop consuming
|
|
|
|
|
|
|
|
filed.put(poison);
|
|
|
|
|
|
|
|
} catch (InterruptedException e) {
|
|
|
|
|
|
|
|
e.printStackTrace();
|
|
|
|
|
|
|
|
throw new RuntimeException(e.getMessage());
|
|
|
|
|
|
|
|
} catch (IOException e) {
|
|
|
|
|
|
|
|
e.printStackTrace();
|
|
|
|
|
|
|
|
throw new RuntimeException(e.getMessage());
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
return Integer.valueOf(0);
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
}
|
|
|
|