speed hacks in BLOB ArrayStack:

- more concurrency if possible
- less threads if no concurrency necessary

git-svn-id: https://svn.berlios.de/svnroot/repos/yacy/trunk@7527 6c8d7289-2bf4-0310-a012-ef5d649a1542
pull/1/head
orbiter 14 years ago
parent a92d80a545
commit 48a61c39a3

@ -523,59 +523,65 @@ public class ArrayStack implements BLOB {
* @return * @return
* @throws IOException * @throws IOException
*/ */
public boolean containsKey(byte[] key) { public synchronized boolean containsKey(byte[] key) {
blobItem bi = keeperOf(key); blobItem bi = keeperOf(key);
return bi != null; return bi != null;
//for (blobItem bi: blobs) if (bi.blob.has(key)) return true; //for (blobItem bi: blobs) if (bi.blob.has(key)) return true;
//return false; //return false;
} }
public blobItem keeperOf(final byte[] key) { /**
// because the index is stored only in one table, * find the blobItem that holds the key
// and the index is completely in RAM, a concurrency will create * if no blobItem is found, then return null
// not concurrent File accesses * @param key
//long start = System.currentTimeMillis(); * @return the blobItem that holds the key or null if no blobItem is found
*/
private blobItem keeperOf(final byte[] key) {
if (blobs.size() == 0) return null;
if (blobs.size() == 1) {
blobItem bi = blobs.get(0);
if (bi.blob.containsKey(key)) return bi;
return null;
}
// start a concurrent query to database tables // start a concurrent query to database tables
final CompletionService<blobItem> cs = new ExecutorCompletionService<blobItem>(executor); final CompletionService<blobItem> cs = new ExecutorCompletionService<blobItem>(executor);
int accepted = 0; int accepted = 0;
synchronized (this) { for (final blobItem bi : blobs) {
for (final blobItem bi : blobs) {
try {
cs.submit(new Callable<blobItem>() {
public blobItem call() {
if (bi.blob.containsKey(key)) return bi;
return null;
}
});
accepted++;
} catch (final RejectedExecutionException e) {
// the executor is either shutting down or the blocking queue is full
// execute the search direct here without concurrency
if (bi.blob.containsKey(key)) return bi;
}
}
// read the result
try { try {
for (int i = 0; i < accepted; i++) { cs.submit(new Callable<blobItem>() {
final Future<blobItem> f = cs.take(); public blobItem call() {
//hash(System.out.println("**********accepted = " + accepted + ", i =" + i); if (bi.blob.containsKey(key)) return bi;
if (f == null) continue; return null;
final blobItem index = f.get();
if (index != null) {
//System.out.println("*DEBUG SplitTable success.time = " + (System.currentTimeMillis() - start) + " ms");
return index;
} }
});
accepted++;
} catch (final RejectedExecutionException e) {
// the executor is either shutting down or the blocking queue is full
// execute the search direct here without concurrency
if (bi.blob.containsKey(key)) return bi;
}
}
// read the result
try {
for (int i = 0; i < accepted; i++) {
final Future<blobItem> f = cs.take();
//hash(System.out.println("**********accepted = " + accepted + ", i =" + i);
if (f == null) continue;
final blobItem index = f.get();
if (index != null) {
//System.out.println("*DEBUG SplitTable success.time = " + (System.currentTimeMillis() - start) + " ms");
return index;
} }
//System.out.println("*DEBUG SplitTable fail.time = " + (System.currentTimeMillis() - start) + " ms");
return null;
} catch (final InterruptedException e) {
Thread.currentThread().interrupt();
} catch (final ExecutionException e) {
Log.logSevere("ArrayStack", "", e);
throw new RuntimeException(e.getCause());
} }
//System.out.println("*DEBUG SplitTable fail.time = " + (System.currentTimeMillis() - start) + " ms");
return null;
} catch (final InterruptedException e) {
Thread.currentThread().interrupt();
} catch (final ExecutionException e) {
Log.logSevere("ArrayStack", "", e);
throw new RuntimeException(e.getCause());
} }
//System.out.println("*DEBUG SplitTable fail.time = " + (System.currentTimeMillis() - start) + " ms"); //System.out.println("*DEBUG SplitTable fail.time = " + (System.currentTimeMillis() - start) + " ms");
return null; return null;
@ -588,16 +594,23 @@ public class ArrayStack implements BLOB {
* @throws IOException * @throws IOException
*/ */
public synchronized byte[] get(byte[] key) throws IOException, RowSpaceExceededException { public synchronized byte[] get(byte[] key) throws IOException, RowSpaceExceededException {
//blobItem bi = keeperOf(key); if (blobs.size() == 0) return null;
//return (bi == null) ? null : bi.blob.get(key); if (blobs.size() == 1) {
blobItem bi = blobs.get(0);
return bi.blob.get(key);
}
blobItem bi = keeperOf(key);
return (bi == null) ? null : bi.blob.get(key);
/*
byte[] b; byte[] b;
for (blobItem bi: blobs) { for (blobItem bi: blobs) {
b = bi.blob.get(key); b = bi.blob.get(key);
if (b != null) return b; if (b != null) return b;
} }
return null; return null;
*/
} }
public byte[] get(Object key) { public byte[] get(Object key) {
@ -773,13 +786,32 @@ public class ArrayStack implements BLOB {
} }
/** /**
* remove a BLOB * delete a BLOB
* @param key the primary key * @param key the primary key
* @throws IOException * @throws IOException
*/ */
public synchronized void delete(byte[] key) throws IOException { public synchronized void delete(final byte[] key) throws IOException {
long m = this.mem(); long m = this.mem();
for (blobItem bi: blobs) bi.blob.delete(key); if (blobs.size() == 0) {
// do nothing
} else if (blobs.size() == 1) {
blobItem bi = blobs.get(0);
bi.blob.delete(key);
} else {
Thread[] t = new Thread[blobs.size()];
int i = 0;
for (blobItem bi: blobs) {
final blobItem bi0 = bi;
t[i] = new Thread() {
public void run() {
try { bi0.blob.delete(key); } catch (IOException e) {}
}
};
t[i].start();
i++;
}
for (Thread s: t) try {s.join();} catch (InterruptedException e) {}
}
assert this.mem() <= m : "m = " + m + ", mem() = " + mem(); assert this.mem() <= m : "m = " + m + ", mem() = " + mem();
} }

Loading…
Cancel
Save