Store node/solr search threads to be able to send them an interrupt

signal in case that a cleanup process wants to remove the search
process. Also added a new cleanup process which can reduce the number of
stored searches to a specific number which can be higher or lower
according to the remaining RAM. The cleanup process is called every time
a search is started.
pull/1/head
Michael Peter Christen 12 years ago
parent 2a8b99ea82
commit 409d6edf53

@ -139,7 +139,6 @@ public class RemoteSearch extends Thread {
final int burstMultiwordPercent) {
// check own peer status
//if (wordIndex.seedDB.mySeed() == null || wordIndex.seedDB.mySeed().getPublicAddress() == null) { return null; }
// prepare seed targets and threads
final List<Seed> dhtPeers =
(clusterselection == null) ?
@ -175,7 +174,8 @@ public class RemoteSearch extends Thread {
if (!Switchboard.getSwitchboard().getConfigBool(SwitchboardConstants.DEBUG_SEARCH_REMOTE_SOLR_OFF, false)) {
final SolrQuery solrQuery = event.query.solrQuery();
for (Seed s: nodePeers) {
solrRemoteSearch(event, solrQuery, start, count, s, blacklist);
Thread t = solrRemoteSearch(event, solrQuery, start, count, s, blacklist);
event.nodeSearchThreads.add(t);
}
}

@ -117,6 +117,7 @@ public final class SearchEvent {
final WorkTables workTables;
public final SecondarySearchSuperviser secondarySearchSuperviser;
public final List<RemoteSearch> primarySearchThreadsL;
public final List<Thread> nodeSearchThreads;
public Thread[] secondarySearchThreads;
public final SortedMap<byte[], String> preselectedPeerHashes;
private final Thread localSearchThread;
@ -187,8 +188,24 @@ public final class SearchEvent {
final int burstRobinsonPercent,
final int burstMultiwordPercent,
final boolean deleteIfSnippetFail) {
long ab = MemoryControl.available();
if (ab < 1024 * 1024 * 200) {
int eb = SearchEventCache.size();
SearchEventCache.cleanupEvents(false);
int en = SearchEventCache.size();
if (en < eb) {
log.logInfo("Cleaned up search event cache (1) " + eb + "->" + en + ", " + (ab - MemoryControl.available()) / 1024 / 1024 + " MB freed");
}
}
ab = MemoryControl.available();
int eb = SearchEventCache.size();
SearchEventCache.cleanupEvents(Math.max(1, (int) (MemoryControl.available() / (1024 * 1024 * 120))));
int en = SearchEventCache.size();
if (en < eb) {
log.logInfo("Cleaned up search event cache (2) " + eb + "->" + en + ", " + (ab - MemoryControl.available()) / 1024 / 1024 + " MB freed");
}
if (MemoryControl.available() < 1024 * 1024 * 100) SearchEventCache.cleanupEvents(false);
this.eventTime = System.currentTimeMillis(); // for lifetime check
this.peers = peers;
this.workTables = workTables;
@ -277,8 +294,10 @@ public final class SearchEvent {
final long timer = System.currentTimeMillis();
if (this.query.getQueryGoal().getIncludeHashes().isEmpty()) {
this.primarySearchThreadsL = null;
this.nodeSearchThreads = null;
} else {
this.primarySearchThreadsL = new ArrayList<RemoteSearch>();
this.nodeSearchThreads = new ArrayList<Thread>();
// start this concurrently because the remote search needs an enumeration
// of the remote peers which may block in some cases when i.e. DHT is active
// at the same time.
@ -312,6 +331,7 @@ public final class SearchEvent {
}
} else {
this.primarySearchThreadsL = null;
this.nodeSearchThreads = null;
this.pollImmediately = !query.getSegment().connectedRWI() || !Switchboard.getSwitchboard().getConfigBool(SwitchboardConstants.INDEX_RECEIVE_ALLOW, false);
if ( generateAbstracts ) {
// we need the results now
@ -584,25 +604,27 @@ public final class SearchEvent {
protected void cleanup() {
// stop all threads
if ( this.primarySearchThreadsL != null ) {
for ( final RemoteSearch search : this.primarySearchThreadsL ) {
if ( search != null ) {
synchronized ( search ) {
if ( search.isAlive() ) {
search.interrupt();
}
}
if (this.localsolrsearch != null) {
if (localsolrsearch.isAlive()) synchronized (this.localsolrsearch) {this.localsolrsearch.interrupt();}
}
if (this.nodeSearchThreads != null) {
for (final Thread search : this.nodeSearchThreads) {
if (search != null) {
synchronized (search) {if (search.isAlive()) {search.interrupt();}}
}
}
}
if ( this.secondarySearchThreads != null ) {
for ( final Thread search : this.secondarySearchThreads ) {
if ( search != null ) {
synchronized ( search ) {
if ( search.isAlive() ) {
search.interrupt();
}
}
if (this.primarySearchThreadsL != null) {
for (final RemoteSearch search : this.primarySearchThreadsL) {
if (search != null) {
synchronized (search) {if (search.isAlive()) {search.interrupt();}}
}
}
}
if (this.secondarySearchThreads != null) {
for (final Thread search : this.secondarySearchThreads ) {
if (search != null) {
synchronized (search) {if (search.isAlive()) {search.interrupt();}}
}
}
}

@ -27,9 +27,9 @@
package net.yacy.search.query;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.SortedMap;
import java.util.concurrent.ConcurrentHashMap;
import net.yacy.data.WorkTables;
import net.yacy.kelondro.logging.Log;
@ -41,7 +41,7 @@ import net.yacy.search.SwitchboardConstants;
public class SearchEventCache {
private volatile static Map<String, SearchEvent> lastEvents = new ConcurrentHashMap<String, SearchEvent>(); // a cache for objects from this class: re-use old search requests
private volatile static LinkedHashMap<String, SearchEvent> lastEvents = new LinkedHashMap<String, SearchEvent>(); // a cache for objects from this class: re-use old search requests
private static final long eventLifetimeBigMem = 600000; // the time an event will stay in the cache when available memory is high, 10 Minutes
private static final long eventLifetimeMediumMem = 60000; // the time an event will stay in the cache when available memory is medium, 1 Minute
private static final long eventLifetimeShortMem = 10000; // the time an event will stay in the cache when memory is low, 10 seconds
@ -57,13 +57,17 @@ public class SearchEventCache {
protected static void put(final String eventID, final SearchEvent event) {
if (MemoryControl.shortStatus()) cleanupEvents(false);
lastEventID = eventID;
final SearchEvent oldEvent = lastEvents.put(eventID, event);
if (oldEvent == null) cacheInsert++;
synchronized (lastEvents) {
final SearchEvent oldEvent = lastEvents.put(eventID, event);
if (oldEvent == null) cacheInsert++;
}
}
public static boolean delete(final String urlhash) {
for (final SearchEvent event: lastEvents.values()) {
if (event.delete(urlhash)) return true;
synchronized (lastEvents) {
for (final SearchEvent event: lastEvents.values()) {
if (event.delete(urlhash)) return true;
}
}
return false;
}
@ -75,16 +79,40 @@ public class SearchEventCache {
final long memx = MemoryControl.available();
final long acceptTime = memx > memlimitHigh ? eventLifetimeBigMem : memx > memlimitMedium ? eventLifetimeMediumMem : eventLifetimeShortMem;
Map.Entry<String, SearchEvent> eventEntry;
final Iterator<Map.Entry<String, SearchEvent>> i = lastEvents.entrySet().iterator();
SearchEvent event;
while (i.hasNext()) {
eventEntry = i.next();
event = eventEntry.getValue();
if (event == null) continue;
if (all || event.getEventTime() + acceptTime < System.currentTimeMillis()) {
synchronized (lastEvents) {
final Iterator<Map.Entry<String, SearchEvent>> i = lastEvents.entrySet().iterator();
SearchEvent event;
while (i.hasNext()) {
eventEntry = i.next();
event = eventEntry.getValue();
if (event == null) continue;
if (all || event.getEventTime() + acceptTime < System.currentTimeMillis()) {
event.cleanup();
i.remove();
cacheDelete++;
}
}
}
}
// Shrink the event cache down to at most {@code maxsize} entries, evicting the
// oldest entries first (lastEvents is a LinkedHashMap, so iteration order is
// insertion order). Under memory pressure it delegates to a full flush instead.
// NOTE(review): eviction is purely age-based (insertion order), not LRU —
// confirm that is the intended policy.
public static void cleanupEvents(int maxsize) {
// remove old events in the event cache
// Short memory status: drop everything rather than trimming to maxsize.
if (MemoryControl.shortStatus()) {cleanupEvents(true); return;}
Map.Entry<String, SearchEvent> eventEntry;
// Guard the shared map: iteration + removal must not race with put()/delete().
synchronized (lastEvents) {
final Iterator<Map.Entry<String, SearchEvent>> i = lastEvents.entrySet().iterator(); // iterates in order of entry
// dc = number of entries that must be deleted to get down to maxsize.
int dc = lastEvents.size() - maxsize;
if (dc <= 0) return; // already within the limit; nothing to evict
SearchEvent event;
while (i.hasNext()) {
eventEntry = i.next();
event = eventEntry.getValue();
// Null values are skipped but NOT removed, and dc is not decremented for
// them — presumably nulls never occur here; verify against put() callers.
if (event == null) continue;
// Release the event's resources (interrupts its search threads) before
// dropping it from the cache.
event.cleanup();
i.remove(); // safe removal during iteration via the iterator itself
cacheDelete++;
dc--;
if (dc <= 0) break; // evicted enough entries; keep the newest maxsize
}
}
}
@ -122,7 +150,9 @@ public class SearchEventCache {
// if a local crawl is ongoing, don't use the result from the cache to use possibly more results that come from the current crawl
// to prevent that this happens during a person switches between the different result pages, a re-search happens no more than
// once a minute
lastEvents.remove(id);
synchronized (lastEvents) {
lastEvents.remove(id);
}
cacheDelete++;
event = null;
} else {
@ -136,8 +166,10 @@ public class SearchEventCache {
}
if (event == null) {
// check if there are too many other searches alive now
Log.logInfo("SearchEventCache", "getEvent: " + lastEvents.size() + " in cache");
synchronized (lastEvents) {
Log.logInfo("SearchEventCache", "getEvent: " + lastEvents.size() + " in cache");
}
// start a new event
Switchboard sb = Switchboard.getSwitchboard();
final boolean delete = sb == null || Switchboard.getSwitchboard().getConfigBool(SwitchboardConstants.SEARCH_VERIFY_DELETE, true);

Loading…
Cancel
Save