introduced a default 10 second time-out in rwi normalization time

uring search process to prevent endless deadlocks after a very long
running search
pull/1/head
Michael Peter Christen 13 years ago
parent 8d997d55b6
commit 7c1feefb28

@ -419,7 +419,7 @@ public class WordReferenceVars extends AbstractReference implements WordReferenc
* @return a blocking queue filled with WordReferenceVars that is still filled when the object is returned
*/
public static BlockingQueue<WordReferenceVars> transform(final ReferenceContainer<WordReference> container) {
public static BlockingQueue<WordReferenceVars> transform(final ReferenceContainer<WordReference> container, final long maxtime) {
final LinkedBlockingQueue<WordReferenceVars> vars = new LinkedBlockingQueue<WordReferenceVars>();
if (container.size() <= 100) {
// transform without concurrency to omit thread creation overhead
@ -431,7 +431,7 @@ public class WordReferenceVars extends AbstractReference implements WordReferenc
} catch (final InterruptedException e) {}
return vars;
}
final Thread distributor = new TransformDistributor(container, vars);
final Thread distributor = new TransformDistributor(container, vars, maxtime);
distributor.start();
// return the resulting queue while the processing queues are still working
@ -442,32 +442,37 @@ public class WordReferenceVars extends AbstractReference implements WordReferenc
ReferenceContainer<WordReference> container;
BlockingQueue<WordReferenceVars> out;
long maxtime;
public TransformDistributor(final ReferenceContainer<WordReference> container, final BlockingQueue<WordReferenceVars> out) {
public TransformDistributor(final ReferenceContainer<WordReference> container, final BlockingQueue<WordReferenceVars> out, final long maxtime) {
this.container = container;
this.out = out;
this.maxtime = maxtime;
}
@Override
public void run() {
// start the transformation threads
final int cores0 = Math.min(cores, this.container.size() / 100) + 1;
final Semaphore termination = new Semaphore(cores0);
final TransformWorker[] worker = new TransformWorker[cores0];
for (int i = 0; i < cores0; i++) {
worker[i] = new TransformWorker(this.out, termination);
worker[i] = new TransformWorker(this.out, this.maxtime);
worker[i].start();
}
long timeout = System.currentTimeMillis() + this.maxtime;
// fill the queue
int p = this.container.size();
while (p > 0) {
p--;
worker[p % cores0].add(this.container.get(p, false));
if (p % 100 == 0 && System.currentTimeMillis() > timeout) break;
}
// insert poison to stop the queues
for (int i = 0; i < cores0; i++) worker[i].add(WordReferenceRow.poisonRowEntry);
for (int i = 0; i < cores0; i++) {
worker[i].add(WordReferenceRow.poisonRowEntry);
}
}
}
@ -475,12 +480,12 @@ public class WordReferenceVars extends AbstractReference implements WordReferenc
BlockingQueue<Row.Entry> in;
BlockingQueue<WordReferenceVars> out;
Semaphore termination;
long maxtime;
public TransformWorker(final BlockingQueue<WordReferenceVars> out, final Semaphore termination) {
public TransformWorker(final BlockingQueue<WordReferenceVars> out, final long maxtime) {
this.in = new LinkedBlockingQueue<Row.Entry>();
this.out = out;
this.termination = termination;
this.maxtime = maxtime;
}
public void add(final Row.Entry entry) {
@ -493,14 +498,12 @@ public class WordReferenceVars extends AbstractReference implements WordReferenc
@Override
public void run() {
Row.Entry entry;
long timeout = System.currentTimeMillis() + this.maxtime;
try {
while ((entry = this.in.take()) != WordReferenceRow.poisonRowEntry) this.out.put(new WordReferenceVars(new WordReferenceRow(entry)));
} catch (final InterruptedException e) {}
// insert poison to signal the termination to next queue
try {
this.termination.acquire();
if (this.termination.availablePermits() == 0) this.out.put(WordReferenceVars.poison);
while ((entry = this.in.take()) != WordReferenceRow.poisonRowEntry) {
this.out.put(new WordReferenceVars(new WordReferenceRow(entry)));
if (System.currentTimeMillis() > timeout) break;
}
} catch (final InterruptedException e) {}
}
}

@ -786,7 +786,7 @@ public final class Protocol
// store remote result to local result container
// insert one container into the search result buffer
// one is enough, only the references are used, not the word
containerCache.add(container[0], false, target.getName() + "/" + target.hash, result.joincount, true);
containerCache.add(container[0], false, target.getName() + "/" + target.hash, result.joincount, true, 5000);
containerCache.addExpectedRemoteReferences(-count);
// insert the containers to the index

@ -264,7 +264,7 @@ public class Segment {
try {
container = ReferenceContainer.emptyContainer(Segment.wordReferenceFactory, wordhash, 1);
container.add(ientry);
rankingProcess.add(container, true, sourceName, -1, !i.hasNext());
rankingProcess.add(container, true, sourceName, -1, !i.hasNext(), 5000);
} catch (final RowSpaceExceededException e) {
continue;
}

@ -83,7 +83,9 @@ public final class RWIProcess extends Thread
//private final int[] domZones;
private SortedMap<byte[], ReferenceContainer<WordReference>> localSearchInclusion;
private int remote_resourceSize, remote_indexCount, remote_peerCount;
private int remote_resourceSize;
private int remote_indexCount;
private int remote_peerCount;
private int local_indexCount;
private final AtomicInteger maxExpectedRemoteReferences, expectedRemoteReferences,
receivedRemoteReferences;
@ -204,7 +206,7 @@ public final class RWIProcess extends Thread
System.currentTimeMillis() - timer),
false);
if ( !index.isEmpty() ) {
add(index, true, "local index: " + this.query.getSegment().getLocation(), -1, true);
add(index, true, "local index: " + this.query.getSegment().getLocation(), -1, true, 10000);
}
} catch ( final Exception e ) {
Log.logException(e);
@ -218,7 +220,8 @@ public final class RWIProcess extends Thread
final boolean local,
final String resourceName,
final int fullResource,
final boolean finalizeAddAtEnd) {
final boolean finalizeAddAtEnd,
final long maxtime) {
// we collect the urlhashes and construct a list with urlEntry objects
// attention: if minEntries is too high, this method will not terminate within the maxTime
//Log.logInfo("RWIProcess", "added a container, size = " + index.size());
@ -239,7 +242,7 @@ public final class RWIProcess extends Thread
long timer = System.currentTimeMillis();
// normalize entries
final BlockingQueue<WordReferenceVars> decodedEntries = this.order.normalizeWith(index);
final BlockingQueue<WordReferenceVars> decodedEntries = this.order.normalizeWith(index, maxtime);
int is = index.size();
EventTracker.update(EventTracker.EClass.SEARCH, new ProfilingGraph.EventSearch(
this.query.id(true),

@ -65,11 +65,11 @@ public class ReferenceOrder {
this.language = language;
}
public BlockingQueue<WordReferenceVars> normalizeWith(final ReferenceContainer<WordReference> container) {
public BlockingQueue<WordReferenceVars> normalizeWith(final ReferenceContainer<WordReference> container, long maxtime) {
final LinkedBlockingQueue<WordReferenceVars> out = new LinkedBlockingQueue<WordReferenceVars>();
int threads = cores;
if (container.size() < 100) threads = 2;
final Thread distributor = new NormalizeDistributor(container, out, threads);
final Thread distributor = new NormalizeDistributor(container, out, threads, maxtime);
distributor.start();
try {
distributor.join(10); // let the distributor work for at least 10 milliseconds
@ -85,17 +85,19 @@ public class ReferenceOrder {
ReferenceContainer<WordReference> container;
LinkedBlockingQueue<WordReferenceVars> out;
private final int threads;
private final long maxtime;
public NormalizeDistributor(final ReferenceContainer<WordReference> container, final LinkedBlockingQueue<WordReferenceVars> out, final int threads) {
public NormalizeDistributor(final ReferenceContainer<WordReference> container, final LinkedBlockingQueue<WordReferenceVars> out, final int threads, final long maxtime) {
this.container = container;
this.out = out;
this.threads = threads;
this.maxtime = maxtime;
}
@Override
public void run() {
// transform the reference container into a stream of parsed entries
final BlockingQueue<WordReferenceVars> vars = WordReferenceVars.transform(this.container);
final BlockingQueue<WordReferenceVars> vars = WordReferenceVars.transform(this.container, this.maxtime);
// start the transformation threads
final Semaphore termination = new Semaphore(this.threads);
@ -148,6 +150,7 @@ public class ReferenceOrder {
}
}
@Override
public void run() {
try {
WordReferenceVars iEntry;

Loading…
Cancel
Save