Ensure the ChunkIterator stream is closed in every possible use case.

Also log any eventual close failures instead of ignoring them
silently.
This should help prevent holding too many unreleased system file
handles, as in the case reported by eros on the YaCy forum
(http://forum.yacy-websuche.de/viewtopic.php?f=23&t=5988&sid=b00e7486c1bf7e48a0d63eb328ccca02
)
pull/122/head
luccioman 8 years ago
parent 29e52bda39
commit c53c58fa85

@ -48,7 +48,7 @@ public class ChunkIterator extends LookAheadIterator<byte[]> implements Iterator
* ATTENTION: if the iterator is not read to the end or interrupted, close() must be called to release the InputStream
* @param file: the file
* @param recordsize: the size of the elements in the file
* @param chunksize: the size of the chunks that are returned by next(). remaining bytes until the lenght of recordsize are skipped
* @param chunksize: the size of the chunks that are returned by next(). remaining bytes until the length of recordsize are skipped
* @throws FileNotFoundException
*/
@ -66,9 +66,10 @@ public class ChunkIterator extends LookAheadIterator<byte[]> implements Iterator
}
/**
* Special close methode to release the used InputStream
* Special close method to release the used InputStream
* stream is automatically closed on last next(),
* close() needs only be called if iterator not read to the end ( hasNext() or next() has not returned null)
* close() needs only to be called if iterator did not read to the end ( hasNext() or next() has not returned null)
* @throws IOException when the stream could not be closed
*/
public void close() throws IOException {
this.stream.close();
@ -78,29 +79,43 @@ public class ChunkIterator extends LookAheadIterator<byte[]> implements Iterator
/**
 * Reads and returns the next chunk of {@code chunksize} bytes from the
 * stream, skipping the remaining bytes of the record up to
 * {@code recordsize}.
 *
 * Whenever no chunk can be returned (normal end of stream, skip failure,
 * or I/O error), the underlying input stream is closed in the finally
 * block so that no system file handle is leaked; close failures are
 * logged instead of being silently swallowed.
 *
 * @return the next chunk, or null on normal termination or on error
 */
public byte[] next0() {
    final byte[] chunk = new byte[chunksize];
    int r, s;
    byte[] result = null;
    try {
        // read the chunk
        this.stream.readFully(chunk);
        // skip the remaining bytes of the record beyond the chunk size
        r = chunksize;
        boolean skipError = false;
        while (r < recordsize) {
            s = (int) this.stream.skip(recordsize - r);
            assert s > 0;
            if (s <= 0) {
                // skip() made no progress: abort instead of looping forever
                skipError = true;
                break;
            }
            r += s;
        }
        if (!skipError) {
            result = chunk;
        }
    } catch (final EOFException e) {
        /* No real exception, this is the normal termination:
         * result stays null, so the finally block closes the stream.
         * (No explicit close here — the finally block already handles it,
         * avoiding a redundant double close.) */
        result = null;
    } catch (final IOException e) {
        /* Return normally but trace the exception in log */
        ConcurrentLog.logException(e);
        result = null;
    } finally {
        if (result == null) {
            /* when result is null (normal termination or not),
             * we must ensure the underlying input stream is closed
             * as we must not keep system file handles open */
            try {
                this.stream.close();
            } catch (final IOException ex) {
                ConcurrentLog.logException(ex);
            }
        }
    }
    return result;
}
}

@ -158,42 +158,77 @@ public class Table implements Index, Iterable<Row.Entry> {
int i = 0;
byte[] key;
if (this.table == null) {
final Iterator<byte[]> ki = new ChunkIterator(tablefile, rowdef.objectsize, rowdef.primaryKeyLength);
while (ki.hasNext()) {
key = ki.next();
// write the key into the index table
assert key != null;
if (key == null) {i++; continue;}
if (rowdef.objectOrder.wellformed(key)) {
this.index.putUnique(key, i++);
} else {
errors.putUnique(key, i++);
}
final ChunkIterator ki = new ChunkIterator(tablefile, rowdef.objectsize, rowdef.primaryKeyLength);
try {
while (ki.hasNext()) {
key = ki.next();
// write the key into the index table
assert key != null;
if (key == null) {i++; continue;}
if (rowdef.objectOrder.wellformed(key)) {
this.index.putUnique(key, i++);
} else {
errors.putUnique(key, i++);
}
}
} finally {
/* If any error occurred while looping over the iterator, we
* must ensure the underlying stream is closed before
* transmitting the exception to the upper layer
*/
if(ki.hasNext()) {
try {
ki.close();
} catch(IOException ioe) {
/* Do not block if closing is not possible but anyway keep a trace in log */
log.warn("Could not close input stream on the file " + tablefile);
}
}
}
} else {
byte[] record;
key = new byte[rowdef.primaryKeyLength];
final ChunkIterator ri = new ChunkIterator(tablefile, rowdef.objectsize, rowdef.objectsize);
while (ri.hasNext()) {
record = ri.next();
assert record != null;
if (record == null) {i++; continue;}
System.arraycopy(record, 0, key, 0, rowdef.primaryKeyLength);
// write the key into the index table
if (rowdef.objectOrder.wellformed(key)) {
this.index.putUnique(key, i++);
// write the tail into the table
try {
this.table.addUnique(this.taildef.newEntry(record, rowdef.primaryKeyLength, true));
} catch (final SpaceExceededException e) {
this.table = null;
ri.close(); // close inputstream of chunkiterator
break;
}
} else {
errors.putUnique(key, i++);
}
try {
while (ri.hasNext()) {
record = ri.next();
assert record != null;
if (record == null) {i++; continue;}
System.arraycopy(record, 0, key, 0, rowdef.primaryKeyLength);
// write the key into the index table
if (rowdef.objectOrder.wellformed(key)) {
this.index.putUnique(key, i++);
// write the tail into the table
try {
this.table.addUnique(this.taildef.newEntry(record, rowdef.primaryKeyLength, true));
} catch (final SpaceExceededException e) {
this.table = null;
try {
ri.close(); // close inputstream of chunkiterator
} finally {
/* Do not block if closing is not possible but anyway keep a trace in log */
log.warn("Could not close input stream on the file " + tablefile);
}
break;
}
} else {
errors.putUnique(key, i++);
}
}
} finally {
/* If any error occurred while looping over the iterator, we
* must ensure the underlying stream is closed before
* transmitting the exception to the upper layer
*/
if(ri.hasNext()) {
try {
ri.close();
} catch(IOException ioe) {
/* Do not block if closing is not possible but anyway keep a trace in log */
log.warn("Could not close input stream on the file " + tablefile);
}
}
}
Runtime.getRuntime().gc();
if (abandonTable()) {

Loading…
Cancel
Save