more multithreading support:

- replaced some synchronized collection wrappers with classes from java.util.concurrent
- used a java.util.concurrent.SynchronousQueue to implement a persistent sorting thread in
  the basic kelondroRowCollection class; part of the sort is handed to this second thread
  when more than one CPU core is available

git-svn-id: https://svn.berlios.de/svnroot/repos/yacy/trunk@4517 6c8d7289-2bf4-0310-a012-ef5d649a1542
pull/1/head
orbiter 17 years ago
parent 6779b455d7
commit 1dce2f1079

@ -58,6 +58,7 @@ import de.anomic.server.serverDate;
import de.anomic.server.serverDomains;
import de.anomic.server.serverMemory;
import de.anomic.server.serverObjects;
import de.anomic.server.serverProcessor;
import de.anomic.server.serverSwitch;
import de.anomic.tools.yFormatter;
import de.anomic.yacy.yacyCore;
@ -293,7 +294,7 @@ public class Status {
prop.put("freeMemory", serverMemory.bytesToString(rt.freeMemory()));
prop.put("totalMemory", serverMemory.bytesToString(rt.totalMemory()));
prop.put("maxMemory", serverMemory.bytesToString(rt.maxMemory()));
prop.put("processors", rt.availableProcessors());
prop.put("processors", serverProcessor.availableCPU);
// proxy traffic
//prop.put("trafficIn",bytesToString(httpdByteCountInputStream.getGlobalCount()));

@ -44,6 +44,7 @@ import de.anomic.http.httpdByteCountInputStream;
import de.anomic.http.httpdByteCountOutputStream;
import de.anomic.plasma.plasmaSwitchboard;
import de.anomic.server.serverObjects;
import de.anomic.server.serverProcessor;
import de.anomic.server.serverSwitch;
import de.anomic.yacy.yacyCore;
@ -75,7 +76,7 @@ public class status_p {
prop.putNum("freeMemory", rt.freeMemory());
prop.putNum("totalMemory", rt.totalMemory());
prop.putNum("maxMemory", rt.maxMemory());
prop.putNum("processors", rt.availableProcessors());
prop.putNum("processors", serverProcessor.availableCPU);
// proxy traffic
prop.put("trafficIn", httpdByteCountInputStream.getGlobalCount());

@ -36,6 +36,7 @@ import de.anomic.kelondro.kelondroMScoreCluster;
import de.anomic.plasma.plasmaCondenser;
import de.anomic.plasma.plasmaSearchRankingProcess;
import de.anomic.plasma.plasmaSearchRankingProfile;
import de.anomic.server.serverProcessor;
import de.anomic.yacy.yacyURL;
public class indexRWIEntryOrder {
@ -44,8 +45,6 @@ public class indexRWIEntryOrder {
private kelondroMScoreCluster<String> doms; // collected for "authority" heuristic
private int maxdomcount;
private static final int processors = Runtime.getRuntime().availableProcessors(); // for multiprocessor support, used during normalization
public indexRWIEntryOrder(plasmaSearchRankingProfile profile) {
this.min = null;
this.max = null;
@ -60,7 +59,7 @@ public class indexRWIEntryOrder {
ArrayList<indexRWIVarEntry> result = null;
//long s0 = System.currentTimeMillis();
if ((processors > 1) && (container.size() > 600)) {
if ((serverProcessor.useCPU > 1) && (container.size() > 600)) {
// run minmax with two threads
int middle = container.size() / 2;
minmaxfinder mmf0 = new minmaxfinder(container, 0, middle);

@ -31,9 +31,11 @@ import java.util.Iterator;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.SynchronousQueue;
import de.anomic.server.serverFileUtils;
import de.anomic.server.serverMemory;
import de.anomic.server.serverProcessor;
import de.anomic.server.logging.serverLog;
import de.anomic.yacy.yacySeedDB;
@ -41,7 +43,17 @@ public class kelondroRowCollection {
public static final double growfactor = 1.4;
private static final int isortlimit = 20;
private static final Integer dummy = new Integer(0);
/**
 * Shared helper thread for sorting. It is created and started exactly once,
 * while this class is being initialised, and only if serverProcessor.useCPU
 * is greater than one at that moment; otherwise the field stays null and
 * callers have to sort in their own thread.
 */
public static final qsortthread sortingthread;

static {
    // decide once at class-load time whether a second sorting thread exists at all
    sortingthread = (serverProcessor.useCPU > 1) ? new qsortthread() : null;
    if (sortingthread != null) {
        sortingthread.start();
    }
}
protected byte[] chunkcache;
protected int chunkcount;
@ -57,8 +69,6 @@ public class kelondroRowCollection {
private static final int exp_order_bound = 5;
private static final int exp_collection = 6;
private static int processors = Runtime.getRuntime().availableProcessors();
public kelondroRowCollection(kelondroRowCollection rc) {
this.rowdef = rc.rowdef;
this.chunkcache = rc.chunkcache;
@ -465,12 +475,11 @@ public class kelondroRowCollection {
}
byte[] swapspace = new byte[this.rowdef.objectsize];
int p = partition(0, this.chunkcount, this.sortBound, swapspace);
if ((processors > 1) && (this.chunkcount >= 10000)) {
// sort this using multi-threading; use one second thread
qsortthread qs = new qsortthread(0, p, 0);
qs.start();
if ((sortingthread != null) && (p > 50) && (sortingthread.isAlive())) {
// sort this using multi-threading
sortingthread.process(this, 0, p, 0);
qsort(p, this.chunkcount, 0, swapspace);
try {qs.join();} catch (InterruptedException e) {e.printStackTrace();}
sortingthread.waitFinish();
} else {
qsort(0, p, 0, swapspace);
qsort(p, this.chunkcount, 0, swapspace);
@ -479,18 +488,56 @@ public class kelondroRowCollection {
//assert this.isSorted();
}
private class qsortthread extends Thread {
private int sl, sr, sb;
public qsortthread(int L, int R, int S) {
this.sl = L;
this.sr = R;
this.sb = S;
/**
 * Persistent worker thread that sorts a range of a kelondroRowCollection on
 * behalf of another thread.
 *
 * Protocol: the caller hands over a job with {@link #process}, does its own
 * share of the work, and then calls {@link #waitFinish} to block until the
 * worker has finished the job. Both hand-overs go through SynchronousQueues,
 * so each side blocks until the other side has arrived.
 *
 * NOTE(review): if a caller never reaches waitFinish() after a successful
 * process(), the worker stays blocked while publishing its finish signal.
 */
public static class qsortthread extends Thread {

    // written by terminate() from a foreign thread and polled in run(): must be volatile
    private volatile boolean terminate;
    private final SynchronousQueue<qsortobject> startObject;
    private final SynchronousQueue<Integer> finishObject;

    /** Creates the worker (not yet started) and gives the thread a recognisable name. */
    public qsortthread() {
        this.terminate = false;
        this.startObject = new SynchronousQueue<qsortobject>();
        this.finishObject = new SynchronousQueue<Integer>();
        this.setName("kelondroRowCollection SORT THREAD");
    }

    /**
     * Hands a sort job to the worker; blocks until the worker has accepted it.
     *
     * If the calling thread is interrupted while waiting, no job is queued and
     * the interrupt status is restored, so that a following waitFinish() returns
     * immediately instead of waiting for a finish signal that will never come.
     *
     * @param rc the collection to sort; must not be null
     * @param L  first qsort argument (range bound)
     * @param R  second qsort argument (range bound)
     * @param S  third qsort argument, passed through unchanged
     */
    public void process(kelondroRowCollection rc, int L, int R, int S) {
        assert rc != null;
        try {
            // SynchronousQueue is thread-safe by itself, no additional lock is needed
            this.startObject.put(new qsortobject(rc, L, R, S));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Blocks until the worker signals that the job handed over by process() is done.
     * Returns early, with the interrupt status restored, if the calling thread
     * is interrupted.
     */
    public void waitFinish() {
        try {
            this.finishObject.take();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Asks the worker to stop: sets the stop flag and interrupts a blocking take/put. */
    public void terminate() {
        this.terminate = true;
        this.interrupt();
    }

    /**
     * Worker loop: take a job, sort the requested range with a private swap
     * buffer, publish the finish signal, repeat. The loop ends when terminate()
     * was called or the thread is interrupted while blocked on a queue.
     */
    @Override
    public void run() {
        while (!terminate) {
            qsortobject so;
            try {
                so = this.startObject.take();
            } catch (InterruptedException e) {
                break;
            }
            assert so != null;
            so.rc.qsort(so.sl, so.sr, so.sb, new byte[so.rc.rowdef.objectsize]);
            try {
                this.finishObject.put(dummy);
            } catch (InterruptedException e1) {
                break;
            }
        }
    }
}
/**
 * Immutable description of one sort job: the collection to work on and the
 * three integer arguments that are forwarded to its qsort method. Instances
 * are passed from the requesting thread to the sorting thread, so all fields
 * are final.
 */
private static class qsortobject {
    /** collection whose qsort method is invoked with the values below */
    protected final kelondroRowCollection rc;
    /** first, second and third qsort argument (L, R and S of the constructor) */
    protected final int sl, sr, sb;

    /**
     * @param rc collection to sort
     * @param L  stored as sl
     * @param R  stored as sr
     * @param S  stored as sb
     */
    public qsortobject(kelondroRowCollection rc, int L, int R, int S) {
        this.rc = rc;
        this.sl = L;
        this.sr = R;
        this.sb = S;
    }
}
private final void qsort(int L, int R, int S, byte[] swapspace) {
if (R - L < isortlimit) {
isort(L, R, swapspace);
@ -790,11 +837,11 @@ public class kelondroRowCollection {
}
long t2 = System.currentTimeMillis();
System.out.println("copy c -> d: " + (t2 - t1) + " milliseconds, " + d(testsize, (t2 - t1)) + " entries/millisecond");
processors = 1;
serverProcessor.useCPU = 1;
c.sort();
long t3 = System.currentTimeMillis();
System.out.println("sort c (1) : " + (t3 - t2) + " milliseconds, " + d(testsize, (t3 - t2)) + " entries/millisecond");
processors = 2;
serverProcessor.useCPU = 2;
d.sort();
long t4 = System.currentTimeMillis();
System.out.println("sort d (2) : " + (t4 - t3) + " milliseconds, " + d(testsize, (t4 - t3)) + " entries/millisecond");

@ -53,7 +53,7 @@ public class kelondroSortStack<E> {
return this.onstack.size();
}
public synchronized void push(stackElement se) {
public void push(stackElement se) {
push(se.element, se.weight);
}

@ -90,7 +90,7 @@ public class plasmaSearchAPI {
public static plasmaSearchRankingProcess genSearchresult(serverObjects prop, plasmaSwitchboard sb, String keyhash, kelondroBitfield filter, int sortorder) {
plasmaSearchQuery query = new plasmaSearchQuery(keyhash, -1, sb.getRanking(), filter);
plasmaSearchRankingProcess ranked = new plasmaSearchRankingProcess(sb.wordIndex, query, sortorder, Integer.MAX_VALUE);
plasmaSearchRankingProcess ranked = new plasmaSearchRankingProcess(sb.wordIndex, query, sortorder, Integer.MAX_VALUE, 1);
ranked.execQuery();
if (ranked.filteredCount() == 0) {

@ -123,7 +123,7 @@ public final class plasmaSearchEvent {
if ((query.domType == plasmaSearchQuery.SEARCHDOM_GLOBALDHT) ||
(query.domType == plasmaSearchQuery.SEARCHDOM_CLUSTERALL)) {
// do a global search
this.rankedCache = new plasmaSearchRankingProcess(wordIndex, query, 2, max_results_preparation);
this.rankedCache = new plasmaSearchRankingProcess(wordIndex, query, 2, max_results_preparation, 16);
int fetchpeers = 30;
@ -156,7 +156,7 @@ public final class plasmaSearchEvent {
serverLog.logFine("SEARCH_EVENT", "SEARCH TIME AFTER GLOBAL-TRIGGER TO " + primarySearchThreads.length + " PEERS: " + ((System.currentTimeMillis() - start) / 1000) + " seconds");
} else {
// do a local search
this.rankedCache = new plasmaSearchRankingProcess(wordIndex, query, 2, max_results_preparation);
this.rankedCache = new plasmaSearchRankingProcess(wordIndex, query, 2, max_results_preparation, 2);
this.rankedCache.execQuery();
//plasmaWordIndex.Finding finding = wordIndex.retrieveURLs(query, false, 2, ranking, process);

@ -34,6 +34,7 @@ import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import de.anomic.htmlFilter.htmlFilterContentScraper;
import de.anomic.index.indexContainer;
@ -62,14 +63,14 @@ public final class plasmaSearchRankingProcess {
private int maxentries;
private int remote_peerCount, remote_indexCount, remote_resourceSize, local_resourceSize;
private indexRWIEntryOrder order;
private HashMap<String, Integer> urlhashes; // map for double-check; String/Long relation, addresses ranking number (backreference for deletion)
private ConcurrentHashMap<String, Integer> urlhashes; // map for double-check; String/Long relation, addresses ranking number (backreference for deletion)
private kelondroMScoreCluster<String> ref; // reference score computation for the commonSense heuristic
private int[] flagcount; // flag counter
private TreeSet<String> misses; // contains url-hashes that could not been found in the LURL-DB
private plasmaWordIndex wordIndex;
private Map<String, indexContainer>[] localSearchContainerMaps;
private HashMap<String, indexContainer>[] localSearchContainerMaps;
public plasmaSearchRankingProcess(plasmaWordIndex wordIndex, plasmaSearchQuery query, int sortorder, int maxentries) {
public plasmaSearchRankingProcess(plasmaWordIndex wordIndex, plasmaSearchQuery query, int sortorder, int maxentries, int concurrency) {
// we collect the urlhashes and construct a list with urlEntry objects
// attention: if minEntries is too high, this method will not terminate within the maxTime
// sortorder: 0 = hash, 1 = url, 2 = ranking
@ -84,7 +85,7 @@ public final class plasmaSearchRankingProcess {
this.remote_indexCount = 0;
this.remote_resourceSize = 0;
this.local_resourceSize = 0;
this.urlhashes = new HashMap<String, Integer>();
this.urlhashes = new ConcurrentHashMap<String, Integer>(0, 0.75f, concurrency);
this.ref = new kelondroMScoreCluster<String>();
this.misses = new TreeSet<String>();
this.wordIndex = wordIndex;
@ -262,7 +263,7 @@ public final class plasmaSearchRankingProcess {
return false;
}
public synchronized Map<String, indexContainer>[] searchContainerMaps() {
public Map<String, indexContainer>[] searchContainerMaps() {
// direct access to the result maps is needed for abstract generation
// this is only available if execQuery() was called before
return localSearchContainerMaps;

@ -385,11 +385,11 @@ public final class plasmaWordIndex implements indexRI {
return container;
}
public Map<String, indexContainer> getContainers(Set<String> wordHashes, Set<String> urlselection, boolean deleteIfEmpty, boolean interruptIfEmpty) {
public HashMap<String, indexContainer> getContainers(Set<String> wordHashes, Set<String> urlselection, boolean deleteIfEmpty, boolean interruptIfEmpty) {
// return map of wordhash:indexContainer
// retrieve entities that belong to the hashes
HashMap<String, indexContainer> containers = new HashMap<String, indexContainer>();
HashMap<String, indexContainer> containers = new HashMap<String, indexContainer>(wordHashes.size());
String singleHash;
indexContainer singleContainer;
Iterator<String> i = wordHashes.iterator();
@ -402,7 +402,7 @@ public final class plasmaWordIndex implements indexRI {
singleContainer = getContainer(singleHash, urlselection);
// check result
if (((singleContainer == null) || (singleContainer.size() == 0)) && (interruptIfEmpty)) return new HashMap<String, indexContainer>();
if (((singleContainer == null) || (singleContainer.size() == 0)) && (interruptIfEmpty)) return new HashMap<String, indexContainer>(0);
containers.put(singleHash, singleContainer);
}
@ -410,22 +410,22 @@ public final class plasmaWordIndex implements indexRI {
}
@SuppressWarnings("unchecked")
public Map<String, indexContainer>[] localSearchContainers(plasmaSearchQuery query, Set<String> urlselection) {
public HashMap<String, indexContainer>[] localSearchContainers(plasmaSearchQuery query, Set<String> urlselection) {
// search for the set of hashes and return a map of of wordhash:indexContainer containing the seach result
// retrieve entities that belong to the hashes
Map<String, indexContainer> inclusionContainers = (query.queryHashes.size() == 0) ? new HashMap<String, indexContainer>() : getContainers(
HashMap<String, indexContainer> inclusionContainers = (query.queryHashes.size() == 0) ? new HashMap<String, indexContainer>(0) : getContainers(
query.queryHashes,
urlselection,
true,
true);
if ((inclusionContainers.size() != 0) && (inclusionContainers.size() < query.queryHashes.size())) inclusionContainers = new HashMap<String, indexContainer>(); // prevent that only a subset is returned
Map<String, indexContainer> exclusionContainers = (inclusionContainers.size() == 0) ? new HashMap<String, indexContainer>() : getContainers(
if ((inclusionContainers.size() != 0) && (inclusionContainers.size() < query.queryHashes.size())) inclusionContainers = new HashMap<String, indexContainer>(0); // prevent that only a subset is returned
HashMap<String, indexContainer> exclusionContainers = (inclusionContainers.size() == 0) ? new HashMap<String, indexContainer>(0) : getContainers(
query.excludeHashes,
urlselection,
true,
true);
return new Map[]{inclusionContainers, exclusionContainers};
return new HashMap[]{inclusionContainers, exclusionContainers};
}
public int size() {

@ -28,13 +28,13 @@ package de.anomic.server;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import de.anomic.kelondro.kelondroMScoreCluster;
import de.anomic.plasma.plasmaSwitchboard;
@ -42,7 +42,7 @@ import de.anomic.plasma.plasmaSwitchboard;
public class serverDomains {
// a dns cache
private static final Map<String, InetAddress> nameCacheHit = Collections.synchronizedMap(new HashMap<String, InetAddress>()); // a not-synchronized map resulted in deadlocks
private static final Map<String, InetAddress> nameCacheHit = new ConcurrentHashMap<String, InetAddress>(); // a not-synchronized map resulted in deadlocks
private static final Set<String> nameCacheMiss = Collections.synchronizedSet(new HashSet<String>());
private static final kelondroMScoreCluster<String> nameCacheHitAges = new kelondroMScoreCluster<String>();
private static final kelondroMScoreCluster<String> nameCacheMissAges = new kelondroMScoreCluster<String>();

@ -0,0 +1,33 @@
// serverProcessor.java
// (C) 2008 by Michael Peter Christen; mc@yacy.net, Frankfurt a. M., Germany
// first published 27.02.2008 on http://yacy.net
//
// $LastChangedDate: 2006-04-02 22:40:07 +0200 (So, 02 Apr 2006) $
// $LastChangedRevision: 1986 $
// $LastChangedBy: orbiter $
//
// LICENSE
//
// This program is free software; you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation; either version 2 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
package de.anomic.server;
/**
 * Central place for the number of processors the application works with.
 * This class only holds static values and is never instantiated.
 */
public final class serverProcessor {

    /** Processor count reported by the runtime at the time this class was initialised. */
    public static final int availableCPU = Runtime.getRuntime().availableProcessors();

    /**
     * Processor count that other code checks before it uses additional threads.
     * Starts out equal to {@link #availableCPU}; it is deliberately writable so
     * that callers can override the detected value.
     */
    public static int useCPU = availableCPU;

    // static holder only: prevent instantiation
    private serverProcessor() {
    }
}

@ -26,11 +26,10 @@
package de.anomic.server;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
public class serverProfiling extends Thread {
@ -41,8 +40,8 @@ public class serverProfiling extends Thread {
static {
// initialize profiling
historyMaps = Collections.synchronizedMap(new HashMap<String, TreeMap<Long, Event>>());
eventCounter = Collections.synchronizedMap(new HashMap<String, Integer>());
historyMaps = new ConcurrentHashMap<String, TreeMap<Long, Event>>();
eventCounter = new ConcurrentHashMap<String, Integer>();
lastCompleteCleanup = System.currentTimeMillis();
systemProfiler = null;
}

@ -75,6 +75,7 @@ import de.anomic.kelondro.kelondroBase64Order;
import de.anomic.kelondro.kelondroDyn;
import de.anomic.kelondro.kelondroMScoreCluster;
import de.anomic.kelondro.kelondroMapObjects;
import de.anomic.kelondro.kelondroRowCollection;
import de.anomic.plasma.plasmaCondenser;
import de.anomic.plasma.plasmaCrawlLURL;
import de.anomic.plasma.plasmaSwitchboard;
@ -408,6 +409,7 @@ public final class yacy {
serverLog.logSevere("MAIN CONTROL LOOP", "PANIC: " + e.getMessage(),e);
}
// shut down
if (kelondroRowCollection.sortingthread != null) kelondroRowCollection.sortingthread.terminate();
serverLog.logConfig("SHUTDOWN", "caught termination signal");
server.terminate(false);
server.interrupt();

Loading…
Cancel
Save