|
|
@ -36,10 +36,8 @@ import java.util.Iterator;
|
|
|
|
import java.util.List;
|
|
|
|
import java.util.List;
|
|
|
|
import java.util.Map;
|
|
|
|
import java.util.Map;
|
|
|
|
import java.util.Random;
|
|
|
|
import java.util.Random;
|
|
|
|
import java.util.concurrent.BlockingQueue;
|
|
|
|
|
|
|
|
import java.util.concurrent.ExecutorService;
|
|
|
|
import java.util.concurrent.ExecutorService;
|
|
|
|
import java.util.concurrent.LinkedBlockingQueue;
|
|
|
|
import java.util.concurrent.LinkedBlockingQueue;
|
|
|
|
import java.util.concurrent.Semaphore;
|
|
|
|
|
|
|
|
import java.util.concurrent.ThreadPoolExecutor;
|
|
|
|
import java.util.concurrent.ThreadPoolExecutor;
|
|
|
|
import java.util.concurrent.TimeUnit;
|
|
|
|
import java.util.concurrent.TimeUnit;
|
|
|
|
|
|
|
|
|
|
|
@ -84,8 +82,8 @@ public class SplitTable implements ObjectIndex, Iterable<Row.Entry> {
|
|
|
|
private long fileSizeLimit;
|
|
|
|
private long fileSizeLimit;
|
|
|
|
private boolean useTailCache;
|
|
|
|
private boolean useTailCache;
|
|
|
|
private boolean exceed134217727;
|
|
|
|
private boolean exceed134217727;
|
|
|
|
private BlockingQueue<DiscoverOrder> orderQueue;
|
|
|
|
//private BlockingQueue<DiscoverOrder> orderQueue;
|
|
|
|
private Discovery[] discoveryThreads;
|
|
|
|
//private Discovery[] discoveryThreads;
|
|
|
|
|
|
|
|
|
|
|
|
public SplitTable(
|
|
|
|
public SplitTable(
|
|
|
|
final File path,
|
|
|
|
final File path,
|
|
|
@ -112,12 +110,14 @@ public class SplitTable implements ObjectIndex, Iterable<Row.Entry> {
|
|
|
|
this.useTailCache = useTailCache;
|
|
|
|
this.useTailCache = useTailCache;
|
|
|
|
this.exceed134217727 = exceed134217727;
|
|
|
|
this.exceed134217727 = exceed134217727;
|
|
|
|
this.entryOrder = new Row.EntryComparator(rowdef.objectOrder);
|
|
|
|
this.entryOrder = new Row.EntryComparator(rowdef.objectOrder);
|
|
|
|
|
|
|
|
/*
|
|
|
|
this.orderQueue = new LinkedBlockingQueue<DiscoverOrder>();
|
|
|
|
this.orderQueue = new LinkedBlockingQueue<DiscoverOrder>();
|
|
|
|
this.discoveryThreads = new Discovery[Runtime.getRuntime().availableProcessors() + 1];
|
|
|
|
this.discoveryThreads = new Discovery[Runtime.getRuntime().availableProcessors() + 1];
|
|
|
|
for (int i = 0; i < this.discoveryThreads.length; i++) {
|
|
|
|
for (int i = 0; i < this.discoveryThreads.length; i++) {
|
|
|
|
this.discoveryThreads[i] = new Discovery(this.orderQueue);
|
|
|
|
this.discoveryThreads[i] = new Discovery(this.orderQueue);
|
|
|
|
this.discoveryThreads[i].start();
|
|
|
|
this.discoveryThreads[i].start();
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
*/
|
|
|
|
init();
|
|
|
|
init();
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
@ -330,6 +330,7 @@ public class SplitTable implements ObjectIndex, Iterable<Row.Entry> {
|
|
|
|
* challenge class for concurrent keeperOf implementation
|
|
|
|
* challenge class for concurrent keeperOf implementation
|
|
|
|
*
|
|
|
|
*
|
|
|
|
*/
|
|
|
|
*/
|
|
|
|
|
|
|
|
/*
|
|
|
|
private static final class Challenge {
|
|
|
|
private static final class Challenge {
|
|
|
|
// the Challenge is a discover order entry
|
|
|
|
// the Challenge is a discover order entry
|
|
|
|
private final byte[] key;
|
|
|
|
private final byte[] key;
|
|
|
@ -363,12 +364,14 @@ public class SplitTable implements ObjectIndex, Iterable<Row.Entry> {
|
|
|
|
return this.discovery;
|
|
|
|
return this.discovery;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
*/
|
|
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
/**
|
|
|
|
* A DiscoverOrder is a class to order a check for a specific table
|
|
|
|
* A DiscoverOrder is a class to order a check for a specific table
|
|
|
|
* for the occurrences of a given key
|
|
|
|
* for the occurrences of a given key
|
|
|
|
*
|
|
|
|
*
|
|
|
|
*/
|
|
|
|
*/
|
|
|
|
|
|
|
|
/*
|
|
|
|
private static final class DiscoverOrder {
|
|
|
|
private static final class DiscoverOrder {
|
|
|
|
public Challenge challenge;
|
|
|
|
public Challenge challenge;
|
|
|
|
public ObjectIndex objectIndex;
|
|
|
|
public ObjectIndex objectIndex;
|
|
|
@ -394,12 +397,14 @@ public class SplitTable implements ObjectIndex, Iterable<Row.Entry> {
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
private static final DiscoverOrder poisonDiscoverOrder = new DiscoverOrder();
|
|
|
|
private static final DiscoverOrder poisonDiscoverOrder = new DiscoverOrder();
|
|
|
|
|
|
|
|
*/
|
|
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
/**
|
|
|
|
* the Discovery class is used to start some concurrent threads that check the database
|
|
|
|
* the Discovery class is used to start some concurrent threads that check the database
|
|
|
|
* table files for occurrences of key after a keeperOf was submitted
|
|
|
|
* table files for occurrences of key after a keeperOf was submitted
|
|
|
|
*
|
|
|
|
*
|
|
|
|
*/
|
|
|
|
*/
|
|
|
|
|
|
|
|
/*
|
|
|
|
private static final class Discovery extends Thread {
|
|
|
|
private static final class Discovery extends Thread {
|
|
|
|
// the class discovers keeper locations in the splitted table
|
|
|
|
// the class discovers keeper locations in the splitted table
|
|
|
|
BlockingQueue<DiscoverOrder> orderQueue;
|
|
|
|
BlockingQueue<DiscoverOrder> orderQueue;
|
|
|
@ -431,16 +436,18 @@ public class SplitTable implements ObjectIndex, Iterable<Row.Entry> {
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return true;
|
|
|
|
return true;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
*/
|
|
|
|
|
|
|
|
|
|
|
|
private ObjectIndex keeperOf(final byte[] key) {
|
|
|
|
private ObjectIndex keeperOf(final byte[] key) {
|
|
|
|
if (!discoveriesAlive()) {
|
|
|
|
//if (!discoveriesAlive()) {
|
|
|
|
synchronized (tables) {
|
|
|
|
//synchronized (tables) {
|
|
|
|
for (ObjectIndex oi: tables.values()) {
|
|
|
|
for (ObjectIndex oi: tables.values()) {
|
|
|
|
if (oi.has(key)) return oi;
|
|
|
|
if (oi.has(key)) return oi;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return null;
|
|
|
|
return null;
|
|
|
|
}
|
|
|
|
//}
|
|
|
|
}
|
|
|
|
//}
|
|
|
|
|
|
|
|
/*
|
|
|
|
Challenge challenge = null;
|
|
|
|
Challenge challenge = null;
|
|
|
|
synchronized (tables) {
|
|
|
|
synchronized (tables) {
|
|
|
|
int tableCount = this.tables.size();
|
|
|
|
int tableCount = this.tables.size();
|
|
|
@ -465,6 +472,7 @@ public class SplitTable implements ObjectIndex, Iterable<Row.Entry> {
|
|
|
|
ObjectIndex result = challenge.discover(1000);
|
|
|
|
ObjectIndex result = challenge.discover(1000);
|
|
|
|
//System.out.println("result of discovery: file = " + ((result == null) ? "null" : result.filename()));
|
|
|
|
//System.out.println("result of discovery: file = " + ((result == null) ? "null" : result.filename()));
|
|
|
|
return result;
|
|
|
|
return result;
|
|
|
|
|
|
|
|
*/
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
/*
|
|
|
@ -615,6 +623,7 @@ public class SplitTable implements ObjectIndex, Iterable<Row.Entry> {
|
|
|
|
|
|
|
|
|
|
|
|
public synchronized void close() {
|
|
|
|
public synchronized void close() {
|
|
|
|
// stop discover threads
|
|
|
|
// stop discover threads
|
|
|
|
|
|
|
|
/*
|
|
|
|
if (this.orderQueue != null) for (int i = 0; i < this.discoveryThreads.length; i++) {
|
|
|
|
if (this.orderQueue != null) for (int i = 0; i < this.discoveryThreads.length; i++) {
|
|
|
|
try {
|
|
|
|
try {
|
|
|
|
this.orderQueue.put(poisonDiscoverOrder);
|
|
|
|
this.orderQueue.put(poisonDiscoverOrder);
|
|
|
@ -622,7 +631,7 @@ public class SplitTable implements ObjectIndex, Iterable<Row.Entry> {
|
|
|
|
Log.logSevere("SplitTable", "", e);
|
|
|
|
Log.logSevere("SplitTable", "", e);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
*/
|
|
|
|
if (tables == null) return;
|
|
|
|
if (tables == null) return;
|
|
|
|
this.executor.shutdown();
|
|
|
|
this.executor.shutdown();
|
|
|
|
try {
|
|
|
|
try {
|
|
|
|