diff --git a/htroot/Status.java b/htroot/Status.java index bc5ae863c..4ff69da5c 100644 --- a/htroot/Status.java +++ b/htroot/Status.java @@ -58,6 +58,7 @@ import de.anomic.server.serverDate; import de.anomic.server.serverDomains; import de.anomic.server.serverMemory; import de.anomic.server.serverObjects; +import de.anomic.server.serverProcessor; import de.anomic.server.serverSwitch; import de.anomic.tools.yFormatter; import de.anomic.yacy.yacyCore; @@ -293,7 +294,7 @@ public class Status { prop.put("freeMemory", serverMemory.bytesToString(rt.freeMemory())); prop.put("totalMemory", serverMemory.bytesToString(rt.totalMemory())); prop.put("maxMemory", serverMemory.bytesToString(rt.maxMemory())); - prop.put("processors", rt.availableProcessors()); + prop.put("processors", serverProcessor.availableCPU); // proxy traffic //prop.put("trafficIn",bytesToString(httpdByteCountInputStream.getGlobalCount())); diff --git a/htroot/xml/status_p.java b/htroot/xml/status_p.java index 9fee2d0e5..fdeba4260 100644 --- a/htroot/xml/status_p.java +++ b/htroot/xml/status_p.java @@ -44,6 +44,7 @@ import de.anomic.http.httpdByteCountInputStream; import de.anomic.http.httpdByteCountOutputStream; import de.anomic.plasma.plasmaSwitchboard; import de.anomic.server.serverObjects; +import de.anomic.server.serverProcessor; import de.anomic.server.serverSwitch; import de.anomic.yacy.yacyCore; @@ -75,7 +76,7 @@ public class status_p { prop.putNum("freeMemory", rt.freeMemory()); prop.putNum("totalMemory", rt.totalMemory()); prop.putNum("maxMemory", rt.maxMemory()); - prop.putNum("processors", rt.availableProcessors()); + prop.putNum("processors", serverProcessor.availableCPU); // proxy traffic prop.put("trafficIn", httpdByteCountInputStream.getGlobalCount()); diff --git a/source/de/anomic/index/indexRWIEntryOrder.java b/source/de/anomic/index/indexRWIEntryOrder.java index 15b4b0679..5d3f60fd8 100644 --- a/source/de/anomic/index/indexRWIEntryOrder.java +++ b/source/de/anomic/index/indexRWIEntryOrder.java @@ -36,6 +36,7 @@ import de.anomic.kelondro.kelondroMScoreCluster; import de.anomic.plasma.plasmaCondenser; import de.anomic.plasma.plasmaSearchRankingProcess; import de.anomic.plasma.plasmaSearchRankingProfile; +import de.anomic.server.serverProcessor; import de.anomic.yacy.yacyURL; public class indexRWIEntryOrder { @@ -44,8 +45,6 @@ public class indexRWIEntryOrder { private kelondroMScoreCluster doms; // collected for "authority" heuristic private int maxdomcount; - private static final int processors = Runtime.getRuntime().availableProcessors(); // for multiprocessor support, used during normalization - public indexRWIEntryOrder(plasmaSearchRankingProfile profile) { this.min = null; this.max = null; @@ -60,7 +59,7 @@ public class indexRWIEntryOrder { ArrayList result = null; //long s0 = System.currentTimeMillis(); - if ((processors > 1) && (container.size() > 600)) { + if ((serverProcessor.useCPU > 1) && (container.size() > 600)) { // run minmax with two threads int middle = container.size() / 2; minmaxfinder mmf0 = new minmaxfinder(container, 0, middle); diff --git a/source/de/anomic/kelondro/kelondroRowCollection.java b/source/de/anomic/kelondro/kelondroRowCollection.java index 41bb5b936..8e31dc391 100644 --- a/source/de/anomic/kelondro/kelondroRowCollection.java +++ b/source/de/anomic/kelondro/kelondroRowCollection.java @@ -31,9 +31,11 @@ import java.util.Iterator; import java.util.List; import java.util.Random; import java.util.Set; +import java.util.concurrent.SynchronousQueue; import de.anomic.server.serverFileUtils; import de.anomic.server.serverMemory; +import de.anomic.server.serverProcessor; import de.anomic.server.logging.serverLog; import de.anomic.yacy.yacySeedDB; @@ -41,7 +43,17 @@ public class kelondroRowCollection { public static final double growfactor = 1.4; private static final int isortlimit = 20; + private static final Integer dummy = new Integer(0); + public static final qsortthread sortingthread; + static { + if (serverProcessor.useCPU > 1) { + sortingthread = new qsortthread(); + sortingthread.start(); + } else { + sortingthread = null; + } + } protected byte[] chunkcache; protected int chunkcount; @@ -57,8 +69,6 @@ public class kelondroRowCollection { private static final int exp_order_bound = 5; private static final int exp_collection = 6; - private static int processors = Runtime.getRuntime().availableProcessors(); - public kelondroRowCollection(kelondroRowCollection rc) { this.rowdef = rc.rowdef; this.chunkcache = rc.chunkcache; @@ -465,12 +475,11 @@ public class kelondroRowCollection { } byte[] swapspace = new byte[this.rowdef.objectsize]; int p = partition(0, this.chunkcount, this.sortBound, swapspace); - if ((processors > 1) && (this.chunkcount >= 10000)) { - // sort this using multi-threading; use one second thread - qsortthread qs = new qsortthread(0, p, 0); - qs.start(); + if ((sortingthread != null) && (p > 50) && (sortingthread.isAlive())) { + // sort this using multi-threading + sortingthread.process(this, 0, p, 0); qsort(p, this.chunkcount, 0, swapspace); - try {qs.join();} catch (InterruptedException e) {e.printStackTrace();} + sortingthread.waitFinish(); } else { qsort(0, p, 0, swapspace); qsort(p, this.chunkcount, 0, swapspace); @@ -479,18 +488,56 @@ public class kelondroRowCollection { //assert this.isSorted(); } - private class qsortthread extends Thread { - private int sl, sr, sb; - public qsortthread(int L, int R, int S) { - this.sl = L; - this.sr = R; - this.sb = S; + public static class qsortthread extends Thread { + private boolean terminate; + private SynchronousQueue startObject; + private SynchronousQueue finishObject; + public qsortthread() { + this.terminate = false; + this.startObject = new SynchronousQueue(); + this.finishObject = new SynchronousQueue(); + this.setName("kelondroRowCollection SORT THREAD"); + } + public void process(kelondroRowCollection rc, int L, int R, int S) { + assert rc != null; + synchronized (startObject) { + try {this.startObject.put(new qsortobject(rc, L, R, S));} catch (InterruptedException e) {} + } + } + public void waitFinish() { + try {this.finishObject.take();} catch (InterruptedException e) {} + } + public void terminate() { + this.terminate = true; + this.interrupt(); } public void run() { - qsort(sl, sr, sb, new byte[rowdef.objectsize]); + qsortobject so = null; + while (!terminate) { + try {so = this.startObject.take();} catch (InterruptedException e) { + break; + } + assert so != null; + so.rc.qsort(so.sl, so.sr, so.sb, new byte[so.rc.rowdef.objectsize]); + try {this.finishObject.put(dummy);} catch (InterruptedException e1) { + break; + } + so = null; + } } } + private static class qsortobject { + protected kelondroRowCollection rc; + protected int sl, sr, sb; + public qsortobject(kelondroRowCollection rc, int L, int R, int S) { + this.rc = rc; + this.sl = L; + this.sr = R; + this.sb = S; + } + } + private final void qsort(int L, int R, int S, byte[] swapspace) { if (R - L < isortlimit) { isort(L, R, swapspace); @@ -790,11 +837,11 @@ public class kelondroRowCollection { } long t2 = System.currentTimeMillis(); System.out.println("copy c -> d: " + (t2 - t1) + " milliseconds, " + d(testsize, (t2 - t1)) + " entries/millisecond"); - processors = 1; + serverProcessor.useCPU = 1; c.sort(); long t3 = System.currentTimeMillis(); System.out.println("sort c (1) : " + (t3 - t2) + " milliseconds, " + d(testsize, (t3 - t2)) + " entries/millisecond"); - processors = 2; + serverProcessor.useCPU = 2; d.sort(); long t4 = System.currentTimeMillis(); System.out.println("sort d (2) : " + (t4 - t3) + " milliseconds, " + d(testsize, (t4 - t3)) + " entries/millisecond"); diff --git a/source/de/anomic/kelondro/kelondroSortStack.java b/source/de/anomic/kelondro/kelondroSortStack.java index a66127023..f3da6d989 100644 --- a/source/de/anomic/kelondro/kelondroSortStack.java +++ b/source/de/anomic/kelondro/kelondroSortStack.java @@ -53,7 +53,7 @@ public class kelondroSortStack { return this.onstack.size(); } - public synchronized void push(stackElement se) { + public void push(stackElement se) { push(se.element, se.weight); } diff --git a/source/de/anomic/plasma/plasmaSearchAPI.java b/source/de/anomic/plasma/plasmaSearchAPI.java index 5e39fece5..660abd1de 100644 --- a/source/de/anomic/plasma/plasmaSearchAPI.java +++ b/source/de/anomic/plasma/plasmaSearchAPI.java @@ -90,7 +90,7 @@ public class plasmaSearchAPI { public static plasmaSearchRankingProcess genSearchresult(serverObjects prop, plasmaSwitchboard sb, String keyhash, kelondroBitfield filter, int sortorder) { plasmaSearchQuery query = new plasmaSearchQuery(keyhash, -1, sb.getRanking(), filter); - plasmaSearchRankingProcess ranked = new plasmaSearchRankingProcess(sb.wordIndex, query, sortorder, Integer.MAX_VALUE); + plasmaSearchRankingProcess ranked = new plasmaSearchRankingProcess(sb.wordIndex, query, sortorder, Integer.MAX_VALUE, 1); ranked.execQuery(); if (ranked.filteredCount() == 0) { diff --git a/source/de/anomic/plasma/plasmaSearchEvent.java b/source/de/anomic/plasma/plasmaSearchEvent.java index 5a472cb56..daeca35a4 100644 --- a/source/de/anomic/plasma/plasmaSearchEvent.java +++ b/source/de/anomic/plasma/plasmaSearchEvent.java @@ -123,7 +123,7 @@ public final class plasmaSearchEvent { if ((query.domType == plasmaSearchQuery.SEARCHDOM_GLOBALDHT) || (query.domType == plasmaSearchQuery.SEARCHDOM_CLUSTERALL)) { // do a global search - this.rankedCache = new plasmaSearchRankingProcess(wordIndex, query, 2, max_results_preparation); + this.rankedCache = new plasmaSearchRankingProcess(wordIndex, query, 2, max_results_preparation, 16); int fetchpeers = 30; @@ -156,7 +156,7 @@ public final class plasmaSearchEvent { serverLog.logFine("SEARCH_EVENT", "SEARCH TIME AFTER GLOBAL-TRIGGER TO " + primarySearchThreads.length + " PEERS: " + ((System.currentTimeMillis() - start) / 1000) + " seconds"); } else { // do a local search - this.rankedCache = new plasmaSearchRankingProcess(wordIndex, query, 2, max_results_preparation); + this.rankedCache = new plasmaSearchRankingProcess(wordIndex, query, 2, max_results_preparation, 2); this.rankedCache.execQuery(); //plasmaWordIndex.Finding finding = wordIndex.retrieveURLs(query, false, 2, ranking, process); diff --git a/source/de/anomic/plasma/plasmaSearchRankingProcess.java b/source/de/anomic/plasma/plasmaSearchRankingProcess.java index a687d893a..a30b7ba51 100644 --- a/source/de/anomic/plasma/plasmaSearchRankingProcess.java +++ b/source/de/anomic/plasma/plasmaSearchRankingProcess.java @@ -34,6 +34,7 @@ import java.util.Iterator; import java.util.Map; import java.util.Set; import java.util.TreeSet; +import java.util.concurrent.ConcurrentHashMap; import de.anomic.htmlFilter.htmlFilterContentScraper; import de.anomic.index.indexContainer; @@ -62,14 +63,14 @@ public final class plasmaSearchRankingProcess { private int maxentries; private int remote_peerCount, remote_indexCount, remote_resourceSize, local_resourceSize; private indexRWIEntryOrder order; - private HashMap urlhashes; // map for double-check; String/Long relation, addresses ranking number (backreference for deletion) + private ConcurrentHashMap urlhashes; // map for double-check; String/Long relation, addresses ranking number (backreference for deletion) private kelondroMScoreCluster ref; // reference score computation for the commonSense heuristic private int[] flagcount; // flag counter private TreeSet misses; // contains url-hashes that could not been found in the LURL-DB private plasmaWordIndex wordIndex; - private Map[] localSearchContainerMaps; + private HashMap[] localSearchContainerMaps; - public plasmaSearchRankingProcess(plasmaWordIndex wordIndex, plasmaSearchQuery query, int sortorder, int maxentries) { + public plasmaSearchRankingProcess(plasmaWordIndex wordIndex, plasmaSearchQuery query, int sortorder, int maxentries, int concurrency) { // we collect the urlhashes and construct a list with urlEntry objects // attention: if minEntries is too high, this method will not terminate within the maxTime // sortorder: 0 = hash, 1 = url, 2 = ranking @@ -84,7 +85,7 @@ public final class plasmaSearchRankingProcess { this.remote_indexCount = 0; this.remote_resourceSize = 0; this.local_resourceSize = 0; - this.urlhashes = new HashMap(); + this.urlhashes = new ConcurrentHashMap(0, 0.75f, concurrency); this.ref = new kelondroMScoreCluster(); this.misses = new TreeSet(); this.wordIndex = wordIndex; @@ -262,7 +263,7 @@ public final class plasmaSearchRankingProcess { return false; } - public synchronized Map[] searchContainerMaps() { + public Map[] searchContainerMaps() { // direct access to the result maps is needed for abstract generation // this is only available if execQuery() was called before return localSearchContainerMaps; diff --git a/source/de/anomic/plasma/plasmaWordIndex.java b/source/de/anomic/plasma/plasmaWordIndex.java index e165fd57f..bbe618e1c 100644 --- a/source/de/anomic/plasma/plasmaWordIndex.java +++ b/source/de/anomic/plasma/plasmaWordIndex.java @@ -385,11 +385,11 @@ public final class plasmaWordIndex implements indexRI { return container; } - public Map getContainers(Set wordHashes, Set urlselection, boolean deleteIfEmpty, boolean interruptIfEmpty) { + public HashMap getContainers(Set wordHashes, Set urlselection, boolean deleteIfEmpty, boolean interruptIfEmpty) { // return map of wordhash:indexContainer // retrieve entities that belong to the hashes - HashMap containers = new HashMap(); + HashMap containers = new HashMap(wordHashes.size()); String singleHash; indexContainer singleContainer; Iterator i = wordHashes.iterator(); @@ -402,7 +402,7 @@ public final class plasmaWordIndex implements indexRI { singleContainer = getContainer(singleHash, urlselection); // check result - if (((singleContainer == null) || (singleContainer.size() == 0)) && (interruptIfEmpty)) return new HashMap(); + if (((singleContainer == null) || (singleContainer.size() == 0)) && (interruptIfEmpty)) return new HashMap(0); containers.put(singleHash, singleContainer); } @@ -410,22 +410,22 @@ public final class plasmaWordIndex implements indexRI { } @SuppressWarnings("unchecked") - public Map[] localSearchContainers(plasmaSearchQuery query, Set urlselection) { + public HashMap[] localSearchContainers(plasmaSearchQuery query, Set urlselection) { // search for the set of hashes and return a map of of wordhash:indexContainer containing the seach result // retrieve entities that belong to the hashes - Map inclusionContainers = (query.queryHashes.size() == 0) ? new HashMap() : getContainers( + HashMap inclusionContainers = (query.queryHashes.size() == 0) ? new HashMap(0) : getContainers( query.queryHashes, urlselection, true, true); - if ((inclusionContainers.size() != 0) && (inclusionContainers.size() < query.queryHashes.size())) inclusionContainers = new HashMap(); // prevent that only a subset is returned - Map exclusionContainers = (inclusionContainers.size() == 0) ? new HashMap() : getContainers( + if ((inclusionContainers.size() != 0) && (inclusionContainers.size() < query.queryHashes.size())) inclusionContainers = new HashMap(0); // prevent that only a subset is returned + HashMap exclusionContainers = (inclusionContainers.size() == 0) ? new HashMap(0) : getContainers( query.excludeHashes, urlselection, true, true); - return new Map[]{inclusionContainers, exclusionContainers}; + return new HashMap[]{inclusionContainers, exclusionContainers}; } public int size() { diff --git a/source/de/anomic/server/serverDomains.java b/source/de/anomic/server/serverDomains.java index 27730f750..e2a8a7409 100644 --- a/source/de/anomic/server/serverDomains.java +++ b/source/de/anomic/server/serverDomains.java @@ -28,13 +28,13 @@ package de.anomic.server; import java.net.InetAddress; import java.net.UnknownHostException; import java.util.Collections; -import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import de.anomic.kelondro.kelondroMScoreCluster; import de.anomic.plasma.plasmaSwitchboard; @@ -42,7 +42,7 @@ import de.anomic.plasma.plasmaSwitchboard; public class serverDomains { // a dns cache - private static final Map nameCacheHit = Collections.synchronizedMap(new HashMap()); // a not-synchronized map resulted in deadlocks + private static final Map nameCacheHit = new ConcurrentHashMap(); // a not-synchronized map resulted in deadlocks private static final Set nameCacheMiss = Collections.synchronizedSet(new HashSet()); private static final kelondroMScoreCluster nameCacheHitAges = new kelondroMScoreCluster(); private static final kelondroMScoreCluster nameCacheMissAges = new kelondroMScoreCluster(); diff --git a/source/de/anomic/server/serverProcessor.java b/source/de/anomic/server/serverProcessor.java new file mode 100644 index 000000000..06992be4d --- /dev/null +++ b/source/de/anomic/server/serverProcessor.java @@ -0,0 +1,33 @@ +// serverProcessor.java +// (C) 2008 by Michael Peter Christen; mc@yacy.net, Frankfurt a. M., Germany +// first published 27.02.2008 on http://yacy.net +// +// $LastChangedDate: 2006-04-02 22:40:07 +0200 (So, 02 Apr 2006) $ +// $LastChangedRevision: 1986 $ +// $LastChangedBy: orbiter $ +// +// LICENSE +// +// This program is free software; you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation; either version 2 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program; if not, write to the Free Software +// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + +package de.anomic.server; + + +public class serverProcessor { + + public static final int availableCPU = Runtime.getRuntime().availableProcessors(); + public static int useCPU = availableCPU; + +} diff --git a/source/de/anomic/server/serverProfiling.java b/source/de/anomic/server/serverProfiling.java index e14fa0ca2..04bb4948e 100644 --- a/source/de/anomic/server/serverProfiling.java +++ b/source/de/anomic/server/serverProfiling.java @@ -26,11 +26,10 @@ package de.anomic.server; -import java.util.Collections; -import java.util.HashMap; import java.util.Iterator; import java.util.Map; import java.util.TreeMap; +import java.util.concurrent.ConcurrentHashMap; public class serverProfiling extends Thread { @@ -41,8 +40,8 @@ public class serverProfiling extends Thread { static { // initialize profiling - historyMaps = Collections.synchronizedMap(new HashMap>()); - eventCounter = Collections.synchronizedMap(new HashMap()); + historyMaps = new ConcurrentHashMap>(); + eventCounter = new ConcurrentHashMap(); lastCompleteCleanup = System.currentTimeMillis(); systemProfiler = null; } diff --git a/source/yacy.java b/source/yacy.java index 83eb2f41b..0f33e7993 100644 --- a/source/yacy.java +++ b/source/yacy.java @@ -75,6 +75,7 @@ import de.anomic.kelondro.kelondroBase64Order; import de.anomic.kelondro.kelondroDyn; import de.anomic.kelondro.kelondroMScoreCluster; import de.anomic.kelondro.kelondroMapObjects; +import de.anomic.kelondro.kelondroRowCollection; import de.anomic.plasma.plasmaCondenser; import de.anomic.plasma.plasmaCrawlLURL; import de.anomic.plasma.plasmaSwitchboard; @@ -408,6 +409,7 @@ public final class yacy { serverLog.logSevere("MAIN CONTROL LOOP", "PANIC: " + e.getMessage(),e); } // shut down + if (kelondroRowCollection.sortingthread != null) kelondroRowCollection.sortingthread.terminate(); serverLog.logConfig("SHUTDOWN", "caught termination signal"); server.terminate(false); server.interrupt();