performance hacks

git-svn-id: https://svn.berlios.de/svnroot/repos/yacy/trunk@6807 6c8d7289-2bf4-0310-a012-ef5d649a1542
pull/1/head
orbiter 15 years ago
parent 2f181d0027
commit 55d8e686ea

@ -480,7 +480,7 @@ public class yacysearch {
EventTracker.update("SEARCH", new ProfilingGraph.searchEvent(theQuery.id(true), SearchEvent.INITIALIZATION, 0, 0), false, 30000, ProfilingGraph.maxTime); EventTracker.update("SEARCH", new ProfilingGraph.searchEvent(theQuery.id(true), SearchEvent.INITIALIZATION, 0, 0), false, 30000, ProfilingGraph.maxTime);
// tell all threads to do nothing for a specific time // tell all threads to do nothing for a specific time
sb.intermissionAllThreads(10000); sb.intermissionAllThreads(3000);
// filter out words that appear in bluelist // filter out words that appear in bluelist
theQuery.filterOut(Switchboard.blueList); theQuery.filterOut(Switchboard.blueList);
@ -496,7 +496,7 @@ public class yacysearch {
offset = 0; offset = 0;
} }
final SearchEvent theSearch = SearchEventCache.getEvent(theQuery, sb.peers, sb.crawlResults, (sb.isRobinsonMode()) ? sb.clusterhashes : null, false, sb.loader); final SearchEvent theSearch = SearchEventCache.getEvent(theQuery, sb.peers, sb.crawlResults, (sb.isRobinsonMode()) ? sb.clusterhashes : null, false, sb.loader);
try {Thread.sleep(100);} catch (InterruptedException e1) {} // wait a little time to get first results in the search try {Thread.sleep(global ? 100 : 10);} catch (InterruptedException e1) {} // wait a little time to get first results in the search
// generate result object // generate result object
//serverLog.logFine("LOCAL_SEARCH", "SEARCH TIME AFTER ORDERING OF SEARCH RESULTS: " + (System.currentTimeMillis() - timestamp) + " ms"); //serverLog.logFine("LOCAL_SEARCH", "SEARCH TIME AFTER ORDERING OF SEARCH RESULTS: " + (System.currentTimeMillis() - timestamp) + " ms");

@ -192,11 +192,11 @@ public class CrawlQueues {
public boolean coreCrawlJob() { public boolean coreCrawlJob() {
final boolean robinsonPrivateCase = ((sb.isRobinsonMode()) && final boolean robinsonPrivateCase = (sb.isRobinsonMode() &&
(!sb.getConfig(SwitchboardConstants.CLUSTER_MODE, "").equals(SwitchboardConstants.CLUSTER_MODE_PUBLIC_CLUSTER)) && !sb.getConfig(SwitchboardConstants.CLUSTER_MODE, "").equals(SwitchboardConstants.CLUSTER_MODE_PUBLIC_CLUSTER) &&
(!sb.getConfig(SwitchboardConstants.CLUSTER_MODE, "").equals(SwitchboardConstants.CLUSTER_MODE_PRIVATE_CLUSTER))); !sb.getConfig(SwitchboardConstants.CLUSTER_MODE, "").equals(SwitchboardConstants.CLUSTER_MODE_PRIVATE_CLUSTER));
if (((robinsonPrivateCase) || (coreCrawlJobSize() <= 20)) && (limitCrawlJobSize() > 0)) { if ((robinsonPrivateCase || coreCrawlJobSize() <= 20) && limitCrawlJobSize() > 0) {
// move some tasks to the core crawl job so we have something to do // move some tasks to the core crawl job so we have something to do
final int toshift = Math.min(10, limitCrawlJobSize()); // this cannot be a big number because the balancer makes a forced waiting if it cannot balance final int toshift = Math.min(10, limitCrawlJobSize()); // this cannot be a big number because the balancer makes a forced waiting if it cannot balance
for (int i = 0; i < toshift; i++) { for (int i = 0; i < toshift; i++) {
@ -209,12 +209,12 @@ public class CrawlQueues {
String queueCheck = crawlIsPossible(NoticedURL.STACK_TYPE_CORE); String queueCheck = crawlIsPossible(NoticedURL.STACK_TYPE_CORE);
if (queueCheck != null) { if (queueCheck != null) {
log.logInfo("omitting de-queue/local: " + queueCheck); if (log.isFine()) log.logFine("omitting de-queue/local: " + queueCheck);
return false; return false;
} }
if (isPaused(SwitchboardConstants.CRAWLJOB_LOCAL_CRAWL)) { if (isPaused(SwitchboardConstants.CRAWLJOB_LOCAL_CRAWL)) {
log.logInfo("omitting de-queue/local: paused"); if (log.isFine()) log.logFine("omitting de-queue/local: paused");
return false; return false;
} }

@ -301,7 +301,7 @@ public class ResultFetcher {
// finally wait until enough results are there produced from the // finally wait until enough results are there produced from the
// snippet fetch process // snippet fetch process
while ((anyWorkerAlive()) && (result.size() <= item)) { while ((anyWorkerAlive()) && (result.size() <= item)) {
try {Thread.sleep((item % query.itemsPerPage) * 50L);} catch (final InterruptedException e) {} try {Thread.sleep((item % query.itemsPerPage) * 10L);} catch (final InterruptedException e) {}
} }
// finally, if there is something, return the result // finally, if there is something, return the result

@ -563,22 +563,22 @@ public final class Switchboard extends serverSwitch {
"storeDocumentIndex", "storeDocumentIndex",
"This is the sequencing step of the indexing queue. Files are written as streams, too much councurrency would destroy IO performance. In this process the words are written to the RWI cache, which flushes if it is full.", "This is the sequencing step of the indexing queue. Files are written as streams, too much councurrency would destroy IO performance. In this process the words are written to the RWI cache, which flushes if it is full.",
new String[]{"RWI/Cache/Collections"}, new String[]{"RWI/Cache/Collections"},
this, "storeDocumentIndex", WorkflowProcessor.useCPU + 40, null, indexerThreads); this, "storeDocumentIndex", 2 * WorkflowProcessor.useCPU, null, indexerThreads);
this.indexingAnalysisProcessor = new WorkflowProcessor<indexingQueueEntry>( this.indexingAnalysisProcessor = new WorkflowProcessor<indexingQueueEntry>(
"webStructureAnalysis", "webStructureAnalysis",
"This just stores the link structure of the document into a web structure database.", "This just stores the link structure of the document into a web structure database.",
new String[]{"storeDocumentIndex"}, new String[]{"storeDocumentIndex"},
this, "webStructureAnalysis", WorkflowProcessor.useCPU + 20, indexingStorageProcessor, WorkflowProcessor.useCPU + 1); this, "webStructureAnalysis", 2 * WorkflowProcessor.useCPU, indexingStorageProcessor, WorkflowProcessor.useCPU + 1);
this.indexingCondensementProcessor = new WorkflowProcessor<indexingQueueEntry>( this.indexingCondensementProcessor = new WorkflowProcessor<indexingQueueEntry>(
"condenseDocument", "condenseDocument",
"This does a structural analysis of plain texts: markup of headlines, slicing into phrases (i.e. sentences), markup with position, counting of words, calculation of term frequency.", "This does a structural analysis of plain texts: markup of headlines, slicing into phrases (i.e. sentences), markup with position, counting of words, calculation of term frequency.",
new String[]{"webStructureAnalysis"}, new String[]{"webStructureAnalysis"},
this, "condenseDocument", WorkflowProcessor.useCPU + 10, indexingAnalysisProcessor, WorkflowProcessor.useCPU + 1); this, "condenseDocument", 4 * WorkflowProcessor.useCPU, indexingAnalysisProcessor, WorkflowProcessor.useCPU + 1);
this.indexingDocumentProcessor = new WorkflowProcessor<indexingQueueEntry>( this.indexingDocumentProcessor = new WorkflowProcessor<indexingQueueEntry>(
"parseDocument", "parseDocument",
"This does the parsing of the newly loaded documents from the web. The result is not only a plain text document, but also a list of URLs that are embedded into the document. The urls are handed over to the CrawlStacker. This process has two child process queues!", "This does the parsing of the newly loaded documents from the web. The result is not only a plain text document, but also a list of URLs that are embedded into the document. The urls are handed over to the CrawlStacker. This process has two child process queues!",
new String[]{"condenseDocument", "CrawlStacker"}, new String[]{"condenseDocument", "CrawlStacker"},
this, "parseDocument", 2 * WorkflowProcessor.useCPU + 1, indexingCondensementProcessor, 2 * WorkflowProcessor.useCPU + 1); this, "parseDocument", 4 * WorkflowProcessor.useCPU, indexingCondensementProcessor, WorkflowProcessor.useCPU + 1);
// deploy busy threads // deploy busy threads
log.logConfig("Starting Threads"); log.logConfig("Starting Threads");
@ -1925,8 +1925,8 @@ public final class Switchboard extends serverSwitch {
// 10 < wantedPPM < 1000: custom performance // 10 < wantedPPM < 1000: custom performance
// 1000 <= wantedPPM : maximum performance // 1000 <= wantedPPM : maximum performance
if (wPPM <= 10) wPPM = 10; if (wPPM <= 10) wPPM = 10;
if (wPPM >= 6000) wPPM = 6000; if (wPPM >= 30000) wPPM = 30000;
final int newBusySleep = 60000 / wPPM; // for wantedPPM = 10: 6000; for wantedPPM = 1000: 60 final int newBusySleep = 30000 / wPPM; // for wantedPPM = 10: 3000; for wantedPPM = 1000: 30
BusyThread thread; BusyThread thread;

@ -445,7 +445,7 @@ public final class serverCore extends AbstractBusyThread implements BusyThread {
for (Thread t: threadList) { for (Thread t: threadList) {
if (t == null) continue; if (t == null) continue;
if (!(t instanceof Session)) { if (!(t instanceof Session)) {
log.logSevere("serverCore.getJobList - thread is not Session: " + t.getClass().getName()); //log.logSevere("serverCore.getJobList - thread is not Session: " + t.getClass().getName());
continue; continue;
} }
l.add((Session) t); l.add((Session) t);

@ -32,7 +32,7 @@ import java.io.ByteArrayOutputStream;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.io.OutputStream; import java.io.OutputStream;
import java.util.LinkedHashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
@ -52,7 +52,7 @@ public class Compressor implements BLOB {
static byte[] plainMagic = {(byte) 'p', (byte) '|'}; // magic for plain content (no encoding) static byte[] plainMagic = {(byte) 'p', (byte) '|'}; // magic for plain content (no encoding)
private final BLOB backend; private final BLOB backend;
private LinkedHashMap<String, byte[]> buffer; // entries which are not yet compressed, format is RAW (without magic) private HashMap<String, byte[]> buffer; // entries which are not yet compressed, format is RAW (without magic)
private BlockingQueue<Entity> writeQueue; private BlockingQueue<Entity> writeQueue;
private long bufferlength; private long bufferlength;
private final long maxbufferlength; private final long maxbufferlength;
@ -131,22 +131,7 @@ public class Compressor implements BLOB {
} }
private void initBuffer() { private void initBuffer() {
this.buffer = new LinkedHashMap<String, byte[]>(100, 0.1f, false) { this.buffer = new HashMap<String, byte[]>();
private static final long serialVersionUID = 1L;
@Override
protected boolean removeEldestEntry(final Map.Entry<String, byte[]> eldest) {
if (size() > 100) {
try {
Compressor.this.writeQueue.put(new Entity(eldest.getKey(), eldest.getValue()));
} catch (InterruptedException e) {
Log.logException(e);
}
return true;
} else {
return false;
}
}
};
this.bufferlength = 0; this.bufferlength = 0;
} }
@ -300,16 +285,17 @@ public class Compressor implements BLOB {
return 0; return 0;
} }
public synchronized void put(byte[] key, byte[] b) throws IOException { public void put(byte[] key, byte[] b) throws IOException {
// first ensure that the files do not exist anywhere // first ensure that the files do not exist anywhere
remove(key); remove(key);
// check if the buffer is full or could be full after this write // check if the buffer is full or could be full after this write
if (this.bufferlength + b.length * 2 > this.maxbufferlength) { if (this.bufferlength + b.length * 2 > this.maxbufferlength) synchronized (this) {
// in case that we compress, just compress as much as is necessary to get enough room // in case that we compress, just compress as much as is necessary to get enough room
while (this.bufferlength + b.length * 2 > this.maxbufferlength && !this.buffer.isEmpty()) { while (this.bufferlength + b.length * 2 > this.maxbufferlength) {
try { try {
if (this.buffer.isEmpty()) break;
flushOne(); flushOne();
} catch (RowSpaceExceededException e) { } catch (RowSpaceExceededException e) {
Log.logException(e); Log.logException(e);
@ -323,9 +309,11 @@ public class Compressor implements BLOB {
// files are written uncompressed to the uncompressed-queue // files are written uncompressed to the uncompressed-queue
// they are either written uncompressed to the database // they are either written uncompressed to the database
// or compressed later // or compressed later
synchronized (this) {
this.buffer.put(new String(key), b); this.buffer.put(new String(key), b);
this.bufferlength += b.length; this.bufferlength += b.length;
} }
}
public synchronized void remove(byte[] key) throws IOException { public synchronized void remove(byte[] key) throws IOException {
this.backend.remove(key); this.backend.remove(key);

@ -28,13 +28,12 @@ package net.yacy.kelondro.util;
import java.util.Iterator; import java.util.Iterator;
import java.util.Map; import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentLinkedQueue;
public class EventTracker { public class EventTracker {
private final static Map<String, Queue<Event>> historyMaps = new ConcurrentHashMap<String, Queue<Event>>(); private final static Map<String, ConcurrentLinkedQueue<Event>> historyMaps = new ConcurrentHashMap<String, ConcurrentLinkedQueue<Event>>();
private final static Map<String, Long> eventAccess = new ConcurrentHashMap<String, Long>(); // value: last time when this was accessed private final static Map<String, Long> eventAccess = new ConcurrentHashMap<String, Long>(); // value: last time when this was accessed
public final static void update( public final static void update(
@ -58,7 +57,7 @@ public class EventTracker {
} }
// get event history container // get event history container
Queue<Event> history = historyMaps.get(eventName); ConcurrentLinkedQueue<Event> history = historyMaps.get(eventName);
// create history // create history
if (history == null) { if (history == null) {
@ -73,15 +72,14 @@ public class EventTracker {
} }
// update history // update history
synchronized (history) {
// update entry
history.offer(new Event(eventPayload)); history.offer(new Event(eventPayload));
// clean up too old entries // clean up too old entries
int tp = history.size() - maxQueueSize; int tp = history.size() - maxQueueSize;
while (tp-- > 0) history.poll(); while (tp-- > 0) history.poll();
if (history.size() % 10 == 0) { // reduce number of System.currentTimeMillis() calls if (history.size() % 10 == 0) { // reduce number of System.currentTimeMillis() calls
synchronized (history) {
if (history.size() % 10 == 0) { // check again
Event e; Event e;
final long now = System.currentTimeMillis(); final long now = System.currentTimeMillis();
while (history.size() > 0) { while (history.size() > 0) {
@ -92,9 +90,10 @@ public class EventTracker {
} }
} }
} }
}
public final static Iterator<Event> getHistory(final String eventName) { public final static Iterator<Event> getHistory(final String eventName) {
Queue<Event> list = historyMaps.get(eventName); ConcurrentLinkedQueue<Event> list = historyMaps.get(eventName);
if (list == null) return null; if (list == null) return null;
return list.iterator(); return list.iterator();
} }

@ -163,7 +163,7 @@ public abstract class AbstractBusyThread extends AbstractThread implements BusyT
if (isBusy) { if (isBusy) {
memstamp1 = MemoryControl.used(); memstamp1 = MemoryControl.used();
if (memstamp1 >= memstamp0) { if (memstamp1 >= memstamp0) {
// no GC in between. this is not shure but most probable // no GC in between. this is not sure but most probable
memuse += memstamp1 - memstamp0; memuse += memstamp1 - memstamp0;
} else { } else {
// GC was obviously in between. Add an average as simple heuristic // GC was obviously in between. Add an average as simple heuristic

@ -125,6 +125,9 @@ public final class InstantBusyThread extends AbstractBusyThread implements BusyT
Log.logSevere("BUSYTHREAD", "OutOfMemory Error in serverInstantThread.job, thread '" + this.getName() + "': " + e.getMessage()); Log.logSevere("BUSYTHREAD", "OutOfMemory Error in serverInstantThread.job, thread '" + this.getName() + "': " + e.getMessage());
Log.logException(e); Log.logException(e);
freemem(); freemem();
} catch (final Exception e) {
Log.logSevere("BUSYTHREAD", "Generic Exception, thread '" + this.getName() + "': " + e.getMessage());
Log.logException(e);
} }
instantThreadCounter--; instantThreadCounter--;
synchronized(jobs) {jobs.remove(this.handle);} synchronized(jobs) {jobs.remove(this.handle);}

Loading…
Cancel
Save