Corrected another case of

org.apache.lucene.store.AlreadyClosedException" occuring when
SearchEvent.cleanup() was called while committing local solr index.
pull/34/head
luc 9 years ago
parent befb2415f8
commit ba0a293f5c

@ -60,6 +60,7 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.TreeMap; import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import org.apache.http.entity.mime.content.ContentBody; import org.apache.http.entity.mime.content.ContentBody;
@ -487,7 +488,7 @@ public final class Protocol {
final int partitions, final int partitions,
final Seed target, final Seed target,
final SecondarySearchSuperviser secondarySearchSuperviser, final SecondarySearchSuperviser secondarySearchSuperviser,
final Blacklist blacklist) { final Blacklist blacklist) throws InterruptedException {
// send a search request to peer with remote Hash // send a search request to peer with remote Hash
// INPUT: // INPUT:
@ -586,7 +587,7 @@ public final class Protocol {
final int maxDistance, final int maxDistance,
final int partitions, final int partitions,
final Seed target, final Seed target,
final Blacklist blacklist) { final Blacklist blacklist) throws InterruptedException {
final long timestamp = System.currentTimeMillis(); final long timestamp = System.currentTimeMillis();
event.addExpectedRemoteReferences(count); event.addExpectedRemoteReferences(count);
@ -639,7 +640,7 @@ public final class Protocol {
final Seed target, final Seed target,
final Blacklist blacklist, final Blacklist blacklist,
final SearchResult result final SearchResult result
) throws SpaceExceededException { ) throws SpaceExceededException, InterruptedException {
// create containers // create containers
final int words = wordhashes.length() / Word.commonHashLength; final int words = wordhashes.length() / Word.commonHashLength;
@ -745,14 +746,24 @@ public final class Protocol {
// insert one container into the search result buffer // insert one container into the search result buffer
// one is enough, only the references are used, not the word // one is enough, only the references are used, not the word
if (event.addResultsToLocalIndex) { if (event.addResultsToLocalIndex) {
for (URIMetadataNode entry : storeDocs) { /*
try { * Current thread might be interrupted by SearchEvent.cleanup()
event.query.getSegment().setFirstSeenTime(entry.hash(), Math.min(entry.moddate().getTime(), System.currentTimeMillis())); */
event.query.getSegment().fulltext().putMetadata(entry); // it will be checked inside the putMetadata that poor metadata does not overwrite rich metadata if (Thread.interrupted()) {
} catch (final IOException e) { throw new InterruptedException("solrQuery interrupted");
ConcurrentLog.logException(e); }
} WriteMetadataNodeToLocalIndexThread writerToLocalIndex = new WriteMetadataNodeToLocalIndexThread(event.query.getSegment(), storeDocs);
} writerToLocalIndex.start();
try {
writerToLocalIndex.join();
} catch(InterruptedException e) {
/*
* Current thread interruption might happen while waiting
* for writeToLocalIndexThread.
*/
writerToLocalIndex.stopWriting();
throw new InterruptedException("remoteProcess stopped!");
}
event.addRWIs(container.get(0), false, target.getName() + "/" + target.hash, result.totalCount, time); event.addRWIs(container.get(0), false, target.getName() + "/" + target.hash, result.totalCount, time);
} else { } else {
// feed results as nodes (SolrQuery results) which carry metadata, // feed results as nodes (SolrQuery results) which carry metadata,
@ -783,6 +794,56 @@ public final class Protocol {
} }
Network.log.info("remote search: peer " + target.getName() + " sent " + container.get(0).size() + "/" + result.totalCount + " references"); Network.log.info("remote search: peer " + target.getName() + " sent " + container.get(0).size() + "/" + result.totalCount + " references");
} }
/**
* This thread is used to write a collection of URIMetadataNode documents to a segment allowing to be safely stopped.
* Indeed, if one interrupt a thread while commiting to Solr index, the index is closed and will be no more writable
* (later calls would throw a org.apache.lucene.store.AlreadyClosedException) because Solr IndexWriter uses an InterruptibleChanel.
* This thread allow to safely stop writing operation using an AtomicBoolean.
* @author luc
*
*/
private static class WriteMetadataNodeToLocalIndexThread extends Thread {
private AtomicBoolean stop = new AtomicBoolean(false);
private Segment segment;
private Collection<URIMetadataNode> storeDocs;
/**
* Parameters must be not null.
* @param segment solr segment to write
* @param storeDocs solr documents collection to put to segment
*/
public WriteMetadataNodeToLocalIndexThread(Segment segment, Collection<URIMetadataNode> storeDocs) {
this.segment = segment;
this.storeDocs = storeDocs;
}
/**
* Use this to stop writing operation. This thread will not stop immediately as Solr might be writing something.
*/
public void stopWriting() {
this.stop.set(true);
}
@Override
public void run() {
for (URIMetadataNode entry : this.storeDocs) {
if(stop.get()) {
Network.log.info("Writing documents collection to Solr segment was stopped.");
return;
}
try {
segment.setFirstSeenTime(entry.hash(), Math.min(entry.moddate().getTime(), System.currentTimeMillis()));
segment.fulltext().putMetadata(entry); // it will be checked inside the putMetadata that poor metadata does not overwrite rich metadata
} catch (final IOException e) {
ConcurrentLog.logException(e);
}
}
}
}
private static class SearchResult { private static class SearchResult {
public int availableCount; // number of returned LURL's for this search public int availableCount; // number of returned LURL's for this search
@ -948,7 +1009,7 @@ public final class Protocol {
final int count, final int count,
final Seed target, final Seed target,
final int partitions, final int partitions,
final Blacklist blacklist) { final Blacklist blacklist) throws InterruptedException {
//try {System.out.println("*** debug-query *** " + URLDecoder.decode(solrQuery.toString(), "UTF-8"));} catch (UnsupportedEncodingException e) {} //try {System.out.println("*** debug-query *** " + URLDecoder.decode(solrQuery.toString(), "UTF-8"));} catch (UnsupportedEncodingException e) {}
@ -1177,10 +1238,26 @@ public final class Protocol {
Network.log.info("local search (solr): localpeer sent " + container.size() + "/" + numFound + " references"); Network.log.info("local search (solr): localpeer sent " + container.size() + "/" + numFound + " references");
} else { } else {
if (event.addResultsToLocalIndex) { if (event.addResultsToLocalIndex) {
for (SolrInputDocument doc: docs) { /*
event.query.getSegment().putDocument(doc); * Current thread might be interrupted by SearchEvent.cleanup()
} */
docs.clear(); docs = null; if (Thread.interrupted()) {
throw new InterruptedException("solrQuery interrupted");
}
WriteToLocalIndexThread writeToLocalIndexThread = new WriteToLocalIndexThread(event.query.getSegment(),
docs);
writeToLocalIndexThread.start();
try {
writeToLocalIndexThread.join();
} catch (InterruptedException e) {
/*
* Current thread interruption might happen while waiting
* for writeToLocalIndexThread.
*/
writeToLocalIndexThread.stopWriting();
throw new InterruptedException("solrQuery interrupted");
}
docs.clear();
} }
event.addNodes(container, facets, snippets, false, target.getName() + "/" + target.hash, numFound); event.addNodes(container, facets, snippets, false, target.getName() + "/" + target.hash, numFound);
event.addFinalize(); event.addFinalize();
@ -1190,6 +1267,51 @@ public final class Protocol {
return dls; return dls;
} }
/**
* This thread is used to write a collection of Solr documents to a segment allowing to be safely stopped.
* Indeed, if one interrupt a thread while commiting to Solr index, the index is closed and will be no more writable
* (later calls would throw a org.apache.lucene.store.AlreadyClosedException) because Solr IndexWriter uses an InterruptibleChanel.
* This thead allow to safely stop writing operation using an AtomicBoolean.
* @author luc
*
*/
private static class WriteToLocalIndexThread extends Thread {
private AtomicBoolean stop = new AtomicBoolean(false);
private Segment segment;
private Collection<SolrInputDocument> docs;
/**
* Parameters must be not null.
* @param segment solr segment to write
* @param docs solr documents collection to put to segment
*/
public WriteToLocalIndexThread(Segment segment, Collection<SolrInputDocument> docs) {
this.segment = segment;
this.docs = docs;
}
/**
* Use this to stop writing operation. This thread will not stop immediately as Solr might be writing something.
*/
public void stopWriting() {
this.stop.set(true);
}
@Override
public void run() {
for (SolrInputDocument doc: docs) {
if(stop.get()) {
Network.log.info("Writing documents collection to Solr segment was stopped.");
return;
}
segment.putDocument(doc);
}
}
}
/** /**
* Only when maxSize is greater than zero, check that doc size is lower. To * Only when maxSize is greater than zero, check that doc size is lower. To
* process in a reasonable amount of time, document size is not evaluated * process in a reasonable amount of time, document size is not evaluated

@ -121,6 +121,8 @@ public class RemoteSearch extends Thread {
} else { } else {
Network.log.info("REMOTE SEARCH - no answer from remote peer " + this.targetPeer.hash + ":" + this.targetPeer.getName()); Network.log.info("REMOTE SEARCH - no answer from remote peer " + this.targetPeer.hash + ":" + this.targetPeer.getName());
} }
} catch(InterruptedException e) {
Network.log.info("REMOTE SEARCH - interrupted search to remote peer " + this.targetPeer.hash + ":" + this.targetPeer.getName());
} catch (final Exception e) { } catch (final Exception e) {
ConcurrentLog.logException(e); ConcurrentLog.logException(e);
} finally { } finally {
@ -309,6 +311,8 @@ public class RemoteSearch extends Thread {
} else { } else {
Network.log.info("REMOTE SEARCH - no answer from remote peer " + targetPeer.hash + ":" + targetPeer.getName()); Network.log.info("REMOTE SEARCH - no answer from remote peer " + targetPeer.hash + ":" + targetPeer.getName());
} }
} catch (final InterruptedException e) {
Network.log.info("REMOTE SEARCH - interrupted search to remote peer " + targetPeer.hash + ":" + targetPeer.getName());
} catch (final Exception e) { } catch (final Exception e) {
ConcurrentLog.logException(e); ConcurrentLog.logException(e);
} finally { } finally {
@ -359,6 +363,8 @@ public class RemoteSearch extends Thread {
Network.log.info("REMOTE SEARCH - no answer from remote peer " + targetPeer.hash + ":" + targetPeer.getName()); Network.log.info("REMOTE SEARCH - no answer from remote peer " + targetPeer.hash + ":" + targetPeer.getName());
} }
} }
} catch (final InterruptedException e) {
Network.log.info("REMOTE SEARCH - interrupted search to remote peer " + targetPeer.hash + ":" + targetPeer.getName());
} catch (final Exception e) { } catch (final Exception e) {
ConcurrentLog.logException(e); ConcurrentLog.logException(e);
} finally { } finally {

Loading…
Cancel
Save