- commented out experimental distributed ranking loading

- fewer threads for blocking threads
- disable all threads for DHT transmission for networks with zero peers

git-svn-id: https://svn.berlios.de/svnroot/repos/yacy/trunk@7737 6c8d7289-2bf4-0310-a012-ef5d649a1542
pull/1/head
orbiter 14 years ago
parent 98c4d25185
commit bd55dcee50

@ -240,7 +240,7 @@ public final class CrawlStacker {
}.start(); }.start();
} }
public void enqueueEntries(byte[] initiator, String profileHandle, Map<MultiProtocolURI, Properties> hyperlinks, boolean replace) { private void enqueueEntries(byte[] initiator, String profileHandle, Map<MultiProtocolURI, Properties> hyperlinks, boolean replace) {
for (Map.Entry<MultiProtocolURI, Properties> e: hyperlinks.entrySet()) { for (Map.Entry<MultiProtocolURI, Properties> e: hyperlinks.entrySet()) {
if (e.getKey() == null) continue; if (e.getKey() == null) continue;

@ -110,7 +110,6 @@ import net.yacy.kelondro.logging.Log;
import net.yacy.kelondro.order.Base64Order; import net.yacy.kelondro.order.Base64Order;
import net.yacy.kelondro.order.Digest; import net.yacy.kelondro.order.Digest;
import net.yacy.kelondro.order.NaturalOrder; import net.yacy.kelondro.order.NaturalOrder;
import net.yacy.kelondro.rwi.ReferenceContainerCache;
import net.yacy.kelondro.util.EventTracker; import net.yacy.kelondro.util.EventTracker;
import net.yacy.kelondro.util.FileUtils; import net.yacy.kelondro.util.FileUtils;
import net.yacy.kelondro.util.MemoryControl; import net.yacy.kelondro.util.MemoryControl;
@ -170,7 +169,6 @@ import de.anomic.yacy.yacyRelease;
import de.anomic.yacy.dht.Dispatcher; import de.anomic.yacy.dht.Dispatcher;
import de.anomic.yacy.dht.PeerSelection; import de.anomic.yacy.dht.PeerSelection;
import de.anomic.yacy.graphics.WebStructureGraph; import de.anomic.yacy.graphics.WebStructureGraph;
import de.anomic.yacy.graphics.WebStructureGraph.HostReference;
public final class Switchboard extends serverSwitch { public final class Switchboard extends serverSwitch {
@ -228,7 +226,7 @@ public final class Switchboard extends serverSwitch {
public URLLicense licensedURLs; public URLLicense licensedURLs;
public List<Pattern> networkWhitelist, networkBlacklist; public List<Pattern> networkWhitelist, networkBlacklist;
public FilterEngine domainList; public FilterEngine domainList;
public Dispatcher dhtDispatcher; private Dispatcher dhtDispatcher;
public LinkedBlockingQueue<String> trail; public LinkedBlockingQueue<String> trail;
public yacySeedDB peers; public yacySeedDB peers;
public WorkTables tables; public WorkTables tables;
@ -386,7 +384,7 @@ public final class Switchboard extends serverSwitch {
//final long startedSeedListAquisition = System.currentTimeMillis(); //final long startedSeedListAquisition = System.currentTimeMillis();
// init a DHT transmission dispatcher // init a DHT transmission dispatcher
this.dhtDispatcher = new Dispatcher( this.dhtDispatcher = (peers.sizeConnected() == 0) ? null : new Dispatcher(
indexSegments.segment(Segments.Process.LOCALCRAWLING), indexSegments.segment(Segments.Process.LOCALCRAWLING),
peers, peers,
true, true,
@ -451,8 +449,9 @@ public final class Switchboard extends serverSwitch {
BlockRank.loadBlockRankTable(rankingPath, 8); BlockRank.loadBlockRankTable(rankingPath, 8);
// load distributed ranking // load distributed ranking
final File hostIndexFile = new File(queuesRoot, "hostIndex.blob");
// very large memory configurations allow to re-compute a ranking table // very large memory configurations allow to re-compute a ranking table
/*
final File hostIndexFile = new File(queuesRoot, "hostIndex.blob");
if (MemoryControl.available() > 1024 * 1024 * 1024) new Thread() { if (MemoryControl.available() > 1024 * 1024 * 1024) new Thread() {
public void run() { public void run() {
ReferenceContainerCache<HostReference> hostIndex; // this will get large, more than 0.5 million entries by now ReferenceContainerCache<HostReference> hostIndex; // this will get large, more than 0.5 million entries by now
@ -470,6 +469,7 @@ public final class Switchboard extends serverSwitch {
//BlockRank.storeBlockRankTable(rankingPath); //BlockRank.storeBlockRankTable(rankingPath);
} }
}.start(); }.start();
*/
// load the robots.txt db // load the robots.txt db
this.log.logConfig("Initializing robots.txt DB"); this.log.logConfig("Initializing robots.txt DB");
@ -639,27 +639,26 @@ public final class Switchboard extends serverSwitch {
this.clusterhashes = this.peers.clusterHashes(getConfig("cluster.peers.yacydomain", "")); this.clusterhashes = this.peers.clusterHashes(getConfig("cluster.peers.yacydomain", ""));
// deploy blocking threads // deploy blocking threads
int indexerThreads = Math.max(1, WorkflowProcessor.useCPU / 2);
this.indexingStorageProcessor = new WorkflowProcessor<indexingQueueEntry>( this.indexingStorageProcessor = new WorkflowProcessor<indexingQueueEntry>(
"storeDocumentIndex", "storeDocumentIndex",
"This is the sequencing step of the indexing queue. Files are written as streams, too much councurrency would destroy IO performance. In this process the words are written to the RWI cache, which flushes if it is full.", "This is the sequencing step of the indexing queue. Files are written as streams, too much councurrency would destroy IO performance. In this process the words are written to the RWI cache, which flushes if it is full.",
new String[]{"RWI/Cache/Collections"}, new String[]{"RWI/Cache/Collections"},
this, "storeDocumentIndex", 2 * WorkflowProcessor.useCPU, null, indexerThreads); this, "storeDocumentIndex", 2 * WorkflowProcessor.availableCPU, null, 1 /*Math.max(1, WorkflowProcessor.availableCPU / 2)*/);
this.indexingAnalysisProcessor = new WorkflowProcessor<indexingQueueEntry>( this.indexingAnalysisProcessor = new WorkflowProcessor<indexingQueueEntry>(
"webStructureAnalysis", "webStructureAnalysis",
"This just stores the link structure of the document into a web structure database.", "This just stores the link structure of the document into a web structure database.",
new String[]{"storeDocumentIndex"}, new String[]{"storeDocumentIndex"},
this, "webStructureAnalysis", 2 * WorkflowProcessor.useCPU, indexingStorageProcessor, WorkflowProcessor.useCPU + 1); this, "webStructureAnalysis", 2 * WorkflowProcessor.availableCPU, indexingStorageProcessor, WorkflowProcessor.availableCPU);
this.indexingCondensementProcessor = new WorkflowProcessor<indexingQueueEntry>( this.indexingCondensementProcessor = new WorkflowProcessor<indexingQueueEntry>(
"condenseDocument", "condenseDocument",
"This does a structural analysis of plain texts: markup of headlines, slicing into phrases (i.e. sentences), markup with position, counting of words, calculation of term frequency.", "This does a structural analysis of plain texts: markup of headlines, slicing into phrases (i.e. sentences), markup with position, counting of words, calculation of term frequency.",
new String[]{"webStructureAnalysis"}, new String[]{"webStructureAnalysis"},
this, "condenseDocument", 4 * WorkflowProcessor.useCPU, indexingAnalysisProcessor, WorkflowProcessor.useCPU + 1); this, "condenseDocument", 4 * WorkflowProcessor.availableCPU, indexingAnalysisProcessor, WorkflowProcessor.availableCPU);
this.indexingDocumentProcessor = new WorkflowProcessor<indexingQueueEntry>( this.indexingDocumentProcessor = new WorkflowProcessor<indexingQueueEntry>(
"parseDocument", "parseDocument",
"This does the parsing of the newly loaded documents from the web. The result is not only a plain text document, but also a list of URLs that are embedded into the document. The urls are handed over to the CrawlStacker. This process has two child process queues!", "This does the parsing of the newly loaded documents from the web. The result is not only a plain text document, but also a list of URLs that are embedded into the document. The urls are handed over to the CrawlStacker. This process has two child process queues!",
new String[]{"condenseDocument", "CrawlStacker"}, new String[]{"condenseDocument", "CrawlStacker"},
this, "parseDocument", 4 * WorkflowProcessor.useCPU, indexingCondensementProcessor, WorkflowProcessor.useCPU + 1); this, "parseDocument", 4 * WorkflowProcessor.availableCPU, indexingCondensementProcessor, WorkflowProcessor.availableCPU);
// deploy busy threads // deploy busy threads
log.logConfig("Starting Threads"); log.logConfig("Starting Threads");
@ -894,7 +893,7 @@ public final class Switchboard extends serverSwitch {
// shut down // shut down
this.crawler.close(); this.crawler.close();
this.dhtDispatcher.close(); if (this.dhtDispatcher != null) this.dhtDispatcher.close();
synchronized (this.indexSegments) { synchronized (this.indexSegments) {
this.indexSegments.close(); this.indexSegments.close();
} }
@ -952,7 +951,7 @@ public final class Switchboard extends serverSwitch {
this.queuesRoot); this.queuesRoot);
// init a DHT transmission dispatcher // init a DHT transmission dispatcher
dhtDispatcher = new Dispatcher( dhtDispatcher = (peers.sizeConnected() == 0) ? null : new Dispatcher(
indexSegments.segment(Segments.Process.LOCALCRAWLING), indexSegments.segment(Segments.Process.LOCALCRAWLING),
peers, peers,
true, true,
@ -1245,7 +1244,7 @@ public final class Switchboard extends serverSwitch {
indexingCondensementProcessor.announceShutdown(); indexingCondensementProcessor.announceShutdown();
indexingAnalysisProcessor.announceShutdown(); indexingAnalysisProcessor.announceShutdown();
indexingStorageProcessor.announceShutdown(); indexingStorageProcessor.announceShutdown();
dhtDispatcher.close(); if (dhtDispatcher != null) dhtDispatcher.close();
indexingCondensementProcessor.awaitShutdown(12000); indexingCondensementProcessor.awaitShutdown(12000);
indexingAnalysisProcessor.awaitShutdown(12000); indexingAnalysisProcessor.awaitShutdown(12000);
indexingStorageProcessor.awaitShutdown(12000); indexingStorageProcessor.awaitShutdown(12000);
@ -2379,6 +2378,7 @@ public final class Switchboard extends serverSwitch {
} }
public boolean dhtTransferJob(final String segment) { public boolean dhtTransferJob(final String segment) {
if (dhtDispatcher == null) return false;
final String rejectReason = dhtShallTransfer(segment); final String rejectReason = dhtShallTransfer(segment);
if (rejectReason != null) { if (rejectReason != null) {
if (this.log.isFine()) { if (this.log.isFine()) {

@ -170,9 +170,9 @@ public final class serverCore extends AbstractBusyThread implements BusyThread {
public static String clientAddress(final Socket s) { public static String clientAddress(final Socket s) {
final InetAddress uAddr = s.getInetAddress(); final InetAddress uAddr = s.getInetAddress();
if (uAddr.isAnyLocalAddress()) return "localhost"; if (uAddr.isAnyLocalAddress()) return "127.0.0.1";
String cIP = uAddr.getHostAddress(); String cIP = uAddr.getHostAddress();
if (Domains.isLocal(cIP)) cIP = "localhost"; if (Domains.isLocal(cIP)) cIP = "127.0.0.1";
return cIP; return cIP;
} }

@ -117,12 +117,12 @@ public class Dispatcher {
gzipBody, gzipBody,
timeout); timeout);
int concurrentSender = Math.min(25, Math.max(10, WorkflowProcessor.useCPU * 2 + 1)); int concurrentSender = Math.min(32, Math.max(10, WorkflowProcessor.availableCPU));
indexingTransmissionProcessor = new WorkflowProcessor<Transmission.Chunk>( indexingTransmissionProcessor = new WorkflowProcessor<Transmission.Chunk>(
"storeDocumentIndex", "transferDocumentIndex",
"This is the RWI transmission process", "This is the RWI transmission process",
new String[]{"RWI/Cache/Collections"}, new String[]{"RWI/Cache/Collections"},
this, "storeDocumentIndex", concurrentSender * 2, null, concurrentSender); this, "transferDocumentIndex", concurrentSender * 2, null, concurrentSender);
} }
public int cloudSize() { public int cloudSize() {
@ -391,7 +391,7 @@ public class Dispatcher {
return true; return true;
} }
public Transmission.Chunk storeDocumentIndex(Transmission.Chunk chunk) { public Transmission.Chunk transferDocumentIndex(Transmission.Chunk chunk) {
// do the transmission // do the transmission
boolean success = chunk.transmit(); boolean success = chunk.transmit();

@ -217,15 +217,16 @@ public class WebStructureGraph {
for (final Map.Entry<String, Integer> entry : map.entrySet()) { for (final Map.Entry<String, Integer> entry : map.entrySet()) {
s.append(entry.getKey()); s.append(entry.getKey());
h = Integer.toHexString(entry.getValue().intValue()); h = Integer.toHexString(entry.getValue().intValue());
if (h.length() == 0) { int hl = h.length();
if (hl == 0) {
s.append("0000"); s.append("0000");
} else if (h.length() == 1) { } else if (hl == 1) {
s.append("000").append(h); s.append("000").append(h);
} else if (h.length() == 2) { } else if (hl == 2) {
s.append("00").append(h); s.append("00").append(h);
} else if (h.length() == 3) { } else if (hl == 3) {
s.append('0').append(h); s.append('0').append(h);
} else if (h.length() == 4) { } else if (hl == 4) {
s.append(h); s.append(h);
} else { } else {
s.append("FFFF"); s.append("FFFF");

@ -557,6 +557,7 @@ public class Domains {
// do the dns lookup on the dns server // do the dns lookup on the dns server
//if (!matchesList(host, nameCacheNoCachingPatterns)) System.out.println("DNSLOOKUP " + host); //if (!matchesList(host, nameCacheNoCachingPatterns)) System.out.println("DNSLOOKUP " + host);
try { try {
//System.out.println("DNSLOOKUP-*LOOKUP* " + host);
ip = InetAddress.getByName(host); //TimeoutRequest.getByName(host, 1000); // this makes the DNS request to backbone ip = InetAddress.getByName(host); //TimeoutRequest.getByName(host, 1000); // this makes the DNS request to backbone
} catch (final UnknownHostException e) { } catch (final UnknownHostException e) {
// add new entries // add new entries
@ -824,7 +825,10 @@ public class Domains {
// check if there are other local IP addresses that are not in // check if there are other local IP addresses that are not in
// the standard IP range // the standard IP range
if (localHostNames.contains(host)) return true; if (localHostNames.contains(host)) return true;
if (globalHosts != null && globalHosts.contains(host)) return false; if (globalHosts != null && globalHosts.contains(host)) {
//System.out.println("ISLOCAL-GLOBALHOSTS-HIT " + host);
return false;
}
// check dns lookup: may be a local address even if the domain name looks global // check dns lookup: may be a local address even if the domain name looks global
if (!recursive) return false; if (!recursive) return false;

@ -41,7 +41,6 @@ import net.yacy.kelondro.util.NamePrefixThreadFactory;
public class WorkflowProcessor<J extends WorkflowJob> { public class WorkflowProcessor<J extends WorkflowJob> {
public static final int availableCPU = Runtime.getRuntime().availableProcessors(); public static final int availableCPU = Runtime.getRuntime().availableProcessors();
public static int useCPU = availableCPU;
private static final ArrayList<WorkflowProcessor<?>> processMonitor = new ArrayList<WorkflowProcessor<?>>(); private static final ArrayList<WorkflowProcessor<?>> processMonitor = new ArrayList<WorkflowProcessor<?>>();
private ExecutorService executor; private ExecutorService executor;

Loading…
Cancel
Save