From ba0a293f5cf98b970481fc25a0a131b39f01cb6c Mon Sep 17 00:00:00 2001 From: luc Date: Wed, 16 Dec 2015 02:26:40 +0100 Subject: [PATCH] Corrected another case of org.apache.lucene.store.AlreadyClosedException" occuring when SearchEvent.cleanup() was called while committing local solr index. --- source/net/yacy/peers/Protocol.java | 154 +++++++++++++++++++++--- source/net/yacy/peers/RemoteSearch.java | 6 + 2 files changed, 144 insertions(+), 16 deletions(-) diff --git a/source/net/yacy/peers/Protocol.java b/source/net/yacy/peers/Protocol.java index f51a933cb..72a707f56 100644 --- a/source/net/yacy/peers/Protocol.java +++ b/source/net/yacy/peers/Protocol.java @@ -60,6 +60,7 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.TreeMap; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import org.apache.http.entity.mime.content.ContentBody; @@ -487,7 +488,7 @@ public final class Protocol { final int partitions, final Seed target, final SecondarySearchSuperviser secondarySearchSuperviser, - final Blacklist blacklist) { + final Blacklist blacklist) throws InterruptedException { // send a search request to peer with remote Hash // INPUT: @@ -586,7 +587,7 @@ public final class Protocol { final int maxDistance, final int partitions, final Seed target, - final Blacklist blacklist) { + final Blacklist blacklist) throws InterruptedException { final long timestamp = System.currentTimeMillis(); event.addExpectedRemoteReferences(count); @@ -639,7 +640,7 @@ public final class Protocol { final Seed target, final Blacklist blacklist, final SearchResult result - ) throws SpaceExceededException { + ) throws SpaceExceededException, InterruptedException { // create containers final int words = wordhashes.length() / Word.commonHashLength; @@ -745,14 +746,24 @@ public final class Protocol { // insert one container into the search result buffer // one is enough, only the references are used, not the word if (event.addResultsToLocalIndex) { - for (URIMetadataNode entry : storeDocs) { - try { - event.query.getSegment().setFirstSeenTime(entry.hash(), Math.min(entry.moddate().getTime(), System.currentTimeMillis())); - event.query.getSegment().fulltext().putMetadata(entry); // it will be checked inside the putMetadata that poor metadata does not overwrite rich metadata - } catch (final IOException e) { - ConcurrentLog.logException(e); - } - } + /* + * Current thread might be interrupted by SearchEvent.cleanup() + */ + if (Thread.interrupted()) { + throw new InterruptedException("solrQuery interrupted"); + } + WriteMetadataNodeToLocalIndexThread writerToLocalIndex = new WriteMetadataNodeToLocalIndexThread(event.query.getSegment(), storeDocs); + writerToLocalIndex.start(); + try { + writerToLocalIndex.join(); + } catch(InterruptedException e) { + /* + * Current thread interruption might happen while waiting + * for writeToLocalIndexThread. + */ + writerToLocalIndex.stopWriting(); + throw new InterruptedException("remoteProcess stopped!"); + } event.addRWIs(container.get(0), false, target.getName() + "/" + target.hash, result.totalCount, time); } else { // feed results as nodes (SolrQuery results) which carry metadata, @@ -783,6 +794,56 @@ public final class Protocol { } Network.log.info("remote search: peer " + target.getName() + " sent " + container.get(0).size() + "/" + result.totalCount + " references"); } + + /** + * This thread is used to write a collection of URIMetadataNode documents to a segment allowing to be safely stopped. + * Indeed, if one interrupt a thread while commiting to Solr index, the index is closed and will be no more writable + * (later calls would throw a org.apache.lucene.store.AlreadyClosedException) because Solr IndexWriter uses an InterruptibleChanel. + * This thread allow to safely stop writing operation using an AtomicBoolean. + * @author luc + * + */ + private static class WriteMetadataNodeToLocalIndexThread extends Thread { + + private AtomicBoolean stop = new AtomicBoolean(false); + + private Segment segment; + + private Collection storeDocs; + + /** + * Parameters must be not null. + * @param segment solr segment to write + * @param storeDocs solr documents collection to put to segment + */ + public WriteMetadataNodeToLocalIndexThread(Segment segment, Collection storeDocs) { + this.segment = segment; + this.storeDocs = storeDocs; + } + + /** + * Use this to stop writing operation. This thread will not stop immediately as Solr might be writing something. + */ + public void stopWriting() { + this.stop.set(true); + } + + @Override + public void run() { + for (URIMetadataNode entry : this.storeDocs) { + if(stop.get()) { + Network.log.info("Writing documents collection to Solr segment was stopped."); + return; + } + try { + segment.setFirstSeenTime(entry.hash(), Math.min(entry.moddate().getTime(), System.currentTimeMillis())); + segment.fulltext().putMetadata(entry); // it will be checked inside the putMetadata that poor metadata does not overwrite rich metadata + } catch (final IOException e) { + ConcurrentLog.logException(e); + } + } + } + } private static class SearchResult { public int availableCount; // number of returned LURL's for this search @@ -948,7 +1009,7 @@ public final class Protocol { final int count, final Seed target, final int partitions, - final Blacklist blacklist) { + final Blacklist blacklist) throws InterruptedException { //try {System.out.println("*** debug-query *** " + URLDecoder.decode(solrQuery.toString(), "UTF-8"));} catch (UnsupportedEncodingException e) {} @@ -1177,10 +1238,26 @@ public final class Protocol { Network.log.info("local search (solr): localpeer sent " + container.size() + "/" + numFound + " references"); } else { if (event.addResultsToLocalIndex) { - for (SolrInputDocument doc: docs) { - event.query.getSegment().putDocument(doc); - } - docs.clear(); docs = null; + /* + * Current thread might be interrupted by SearchEvent.cleanup() + */ + if (Thread.interrupted()) { + throw new InterruptedException("solrQuery interrupted"); + } + WriteToLocalIndexThread writeToLocalIndexThread = new WriteToLocalIndexThread(event.query.getSegment(), + docs); + writeToLocalIndexThread.start(); + try { + writeToLocalIndexThread.join(); + } catch (InterruptedException e) { + /* + * Current thread interruption might happen while waiting + * for writeToLocalIndexThread. + */ + writeToLocalIndexThread.stopWriting(); + throw new InterruptedException("solrQuery interrupted"); + } + docs.clear(); } event.addNodes(container, facets, snippets, false, target.getName() + "/" + target.hash, numFound); event.addFinalize(); @@ -1190,6 +1267,51 @@ public final class Protocol { return dls; } + /** + * This thread is used to write a collection of Solr documents to a segment allowing to be safely stopped. + * Indeed, if one interrupt a thread while commiting to Solr index, the index is closed and will be no more writable + * (later calls would throw a org.apache.lucene.store.AlreadyClosedException) because Solr IndexWriter uses an InterruptibleChanel. + * This thead allow to safely stop writing operation using an AtomicBoolean. + * @author luc + * + */ + private static class WriteToLocalIndexThread extends Thread { + + private AtomicBoolean stop = new AtomicBoolean(false); + + private Segment segment; + + private Collection docs; + + /** + * Parameters must be not null. + * @param segment solr segment to write + * @param docs solr documents collection to put to segment + */ + public WriteToLocalIndexThread(Segment segment, Collection docs) { + this.segment = segment; + this.docs = docs; + } + + /** + * Use this to stop writing operation. This thread will not stop immediately as Solr might be writing something. + */ + public void stopWriting() { + this.stop.set(true); + } + + @Override + public void run() { + for (SolrInputDocument doc: docs) { + if(stop.get()) { + Network.log.info("Writing documents collection to Solr segment was stopped."); + return; + } + segment.putDocument(doc); + } + } + } + /** * Only when maxSize is greater than zero, check that doc size is lower. To * process in a reasonable amount of time, document size is not evaluated diff --git a/source/net/yacy/peers/RemoteSearch.java b/source/net/yacy/peers/RemoteSearch.java index cccc08f61..64a98d61b 100644 --- a/source/net/yacy/peers/RemoteSearch.java +++ b/source/net/yacy/peers/RemoteSearch.java @@ -121,6 +121,8 @@ public class RemoteSearch extends Thread { } else { Network.log.info("REMOTE SEARCH - no answer from remote peer " + this.targetPeer.hash + ":" + this.targetPeer.getName()); } + } catch(InterruptedException e) { + Network.log.info("REMOTE SEARCH - interrupted search to remote peer " + this.targetPeer.hash + ":" + this.targetPeer.getName()); } catch (final Exception e) { ConcurrentLog.logException(e); } finally { @@ -309,6 +311,8 @@ public class RemoteSearch extends Thread { } else { Network.log.info("REMOTE SEARCH - no answer from remote peer " + targetPeer.hash + ":" + targetPeer.getName()); } + } catch (final InterruptedException e) { + Network.log.info("REMOTE SEARCH - interrupted search to remote peer " + targetPeer.hash + ":" + targetPeer.getName()); } catch (final Exception e) { ConcurrentLog.logException(e); } finally { @@ -359,6 +363,8 @@ public class RemoteSearch extends Thread { Network.log.info("REMOTE SEARCH - no answer from remote peer " + targetPeer.hash + ":" + targetPeer.getName()); } } + } catch (final InterruptedException e) { + Network.log.info("REMOTE SEARCH - interrupted search to remote peer " + targetPeer.hash + ":" + targetPeer.getName()); } catch (final Exception e) { ConcurrentLog.logException(e); } finally {