@ -24,8 +24,6 @@
package de.anomic.kelondro ;
package de.anomic.kelondro ;
import java.io.BufferedInputStream ;
import java.io.DataInputStream ;
import java.io.File ;
import java.io.File ;
import java.io.FileInputStream ;
import java.io.FileInputStream ;
import java.io.FileNotFoundException ;
import java.io.FileNotFoundException ;
@ -34,6 +32,9 @@ import java.util.Iterator;
import java.util.concurrent.ArrayBlockingQueue ;
import java.util.concurrent.ArrayBlockingQueue ;
import java.util.concurrent.BlockingQueue ;
import java.util.concurrent.BlockingQueue ;
import java.util.concurrent.Callable ;
import java.util.concurrent.Callable ;
import java.util.concurrent.ExecutorService ;
import java.util.concurrent.Executors ;
import java.util.concurrent.Future ;
public class kelondroChunkIterator implements Iterator < byte [ ] > {
public class kelondroChunkIterator implements Iterator < byte [ ] > {
@ -48,7 +49,7 @@ public class kelondroChunkIterator implements Iterator<byte[]> {
* @param chunksize : the size of the chunks that are returned by next ( ) . remaining bytes until the lenght of recordsize are skipped
* @param chunksize : the size of the chunks that are returned by next ( ) . remaining bytes until the lenght of recordsize are skipped
* @throws FileNotFoundException
* @throws FileNotFoundException
* /
* /
/ *
private final DataInputStream stream ;
private final DataInputStream stream ;
private byte [ ] nextBytes ;
private byte [ ] nextBytes ;
@ -98,7 +99,7 @@ public class kelondroChunkIterator implements Iterator<byte[]> {
}
}
/*
*/
ExecutorService service = Executors . newFixedThreadPool ( 2 ) ;
ExecutorService service = Executors . newFixedThreadPool ( 2 ) ;
filechunkProducer producer ;
filechunkProducer producer ;
filechunkSlicer slicer ;
filechunkSlicer slicer ;
@ -133,9 +134,7 @@ public class kelondroChunkIterator implements Iterator<byte[]> {
public byte [ ] next ( ) {
public byte [ ] next ( ) {
if ( nextRecord = = null ) return null ;
if ( nextRecord = = null ) return null ;
byte [ ] n = new byte [ chunksize ] ;
byte [ ] n = nextRecord ;
System . arraycopy ( nextRecord , 0 , n , 0 , chunksize ) ;
slicer . recycle ( nextRecord ) ;
nextRecord = slicer . consume ( ) ;
nextRecord = slicer . consume ( ) ;
return n ;
return n ;
}
}
@ -143,13 +142,11 @@ public class kelondroChunkIterator implements Iterator<byte[]> {
public void remove ( ) {
public void remove ( ) {
throw new UnsupportedOperationException ( ) ;
throw new UnsupportedOperationException ( ) ;
}
}
* /
private static class filechunkSlicer implements Callable < Integer > {
private static class filechunkSlicer implements Callable < Integer > {
private filechunkProducer producer ;
private filechunkProducer producer ;
private static byte [ ] poison = new byte [ 0 ] ;
private static byte [ ] poison = new byte [ 0 ] ;
private BlockingQueue < byte [ ] > empty ;
private BlockingQueue < byte [ ] > slices ;
private BlockingQueue < byte [ ] > slices ;
private int slicesize , head ;
private int slicesize , head ;
@ -157,21 +154,10 @@ public class kelondroChunkIterator implements Iterator<byte[]> {
assert producer ! = null ;
assert producer ! = null ;
this . producer = producer ;
this . producer = producer ;
this . slices = new ArrayBlockingQueue < byte [ ] > ( stacksize ) ;
this . slices = new ArrayBlockingQueue < byte [ ] > ( stacksize ) ;
this . empty = new ArrayBlockingQueue < byte [ ] > ( stacksize ) ;
this . slicesize = slicesize ;
this . slicesize = slicesize ;
this . head = head ;
this . head = head ;
// fill the empty queue
for ( int i = 0 ; i < stacksize ; i + + ) empty . add ( new byte [ head ] ) ;
}
}
public void recycle ( byte [ ] c ) {
try {
empty . put ( c ) ;
} catch ( InterruptedException e ) {
e . printStackTrace ( ) ;
}
}
public byte [ ] consume ( ) {
public byte [ ] consume ( ) {
try {
try {
byte [ ] b = slices . take ( ) ; // leer
byte [ ] b = slices . take ( ) ; // leer
@ -193,7 +179,7 @@ public class kelondroChunkIterator implements Iterator<byte[]> {
filechunk c ;
filechunk c ;
int p ;
int p ;
try {
try {
byte [ ] slice = empty . take ( ) ;
byte [ ] slice = new byte [ head ] ;
int slicec = 0 ;
int slicec = 0 ;
consumer : while ( true ) {
consumer : while ( true ) {
c = producer . consume ( ) ;
c = producer . consume ( ) ;
@ -212,7 +198,7 @@ public class kelondroChunkIterator implements Iterator<byte[]> {
// the slice is now full
// the slice is now full
p + = slicesize - slicec ;
p + = slicesize - slicec ;
slices . put ( slice ) ;
slices . put ( slice ) ;
slice = empty . take ( ) ;
slice = new byte [ head ] ;
slicec = 0 ;
slicec = 0 ;
continue slicefiller ;
continue slicefiller ;
} else {
} else {