@ -114,7 +114,6 @@ import java.util.Enumeration;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
@ -130,7 +129,6 @@ import de.anomic.kelondro.kelondroTables;
import de.anomic.server.serverAbstractSwitch;
import de.anomic.server.serverCodings;
import de.anomic.server.serverCore;
import de.anomic.server.serverDate;
import de.anomic.server.serverInstantThread;
import de.anomic.server.serverObjects;
import de.anomic.server.serverSemaphore;
@ -149,6 +147,7 @@ public final class plasmaSwitchboard extends serverAbstractSwitch implements ser
// load slots
public static int crawlSlots = 10;
public static int indexingSlots = 100;
// coloured list management
public static TreeSet blueList = null;
@ -164,7 +163,7 @@ public final class plasmaSwitchboard extends serverAbstractSwitch implements ser
public plasmaHTCache cacheManager;
public plasmaSnippetCache snippetCache;
public plasmaCrawlLoader cacheLoader;
public LinkedList queueStack = new LinkedList();
public plasmaSwitchboardQueue sbQueue;
public messageBoard messageDB;
public wikiBoard wikiDB;
public String remoteProxyHost;
@ -256,11 +255,16 @@ public final class plasmaSwitchboard extends serverAbstractSwitch implements ser
// start a cache manager
log.logSystem("Starting HT Cache Manager");
this.cacheManager = new plasmaHTCache(this, ramHTTP);
File htCachePath = new File(getRootPath(), getConfig("proxyCache","HTCACHE"));
long maxCacheSize = 1024 * 1024 * Long.parseLong(getConfig("proxyCacheSize", "2")); // this is megabyte
this.cacheManager = new plasmaHTCache(htCachePath, maxCacheSize, ramHTTP);
// make parser
log.logSystem("Starting Parser");
this.parser = new plasmaParser();
this.parser = new plasmaParser();
// initialize switchboard queue
sbQueue = new plasmaSwitchboardQueue(this.cacheManager, urlPool.loadedURL, new File(plasmaPath, "switchboardQueue0.stack"), 10, profiles);
// define an extension-blacklist
log.logSystem("Parser: Initializing Extension Mappings for Media/Parser");
@ -347,7 +351,7 @@ public final class plasmaSwitchboard extends serverAbstractSwitch implements ser
new serverInstantThread(this, "deQueue", "queueSize"), 10000 + (i * 1000));
deployThread("70_cachemanager", "Proxy Cache Enqueue", "job takes new proxy files from RAM stack, stores them, and hands over to the Indexing Stack",
new serverInstantThread(cacheManager, "job", "size"), 10000);
new serverInstantThread(this, "htEntryStoreJob", "htEntrySize"), 10000);
deployThread("62_remotetriggeredcrawl", "Remote Crawl Job", "thread that performes a single crawl/indexing step triggered by a remote peer",
new serverInstantThread(this, "remoteTriggeredCrawlJob", "remoteTriggeredCrawlJobSize"), 30000);
deployThread("61_globalcrawltrigger", "Global Crawl Trigger", "thread that triggeres remote peers for crawling",
@ -423,7 +427,7 @@ public final class plasmaSwitchboard extends serverAbstractSwitch implements ser
} catch (IOException e) {}
private void cleanProfiles() {
if ((queueStack.size() > 0) || (cacheLoader.size() > 0) || (urlPool.noticeURL.stackSize() > 0)) return;
if ((sbQueue.size() > 0) || (cacheLoader.size() > 0) || (urlPool.noticeURL.stackSize() > 0)) return;
Iterator i = profiles.profiles(true);
plasmaCrawlProfile.entry entry;
try {
@ -440,6 +444,100 @@ public final class plasmaSwitchboard extends serverAbstractSwitch implements ser
return cacheManager;
synchronized public void htEntryStoreEnqueued(plasmaHTCache.Entry entry) throws IOException {
if (cacheManager.full())
/**
 * Works off one cache entry: stores its response header, writes the cached
 * content to file where applicable, hands the resource over to the
 * switchboard queue via enQueue(), and logs the entry's cache status.
 *
 * @param entry the cache entry to process; may be null
 * @return false if entry is null, true otherwise
 * @throws IOException declared by this method; presumably raised by the
 *         cache manager's store/write calls - TODO confirm
 */
synchronized public boolean htEntryStoreProcess(plasmaHTCache.Entry entry) throws IOException {
// nothing to do for a missing entry
if (entry == null) return false;
// store response header
// (only for the FILL and the two STALE_RELOAD states)
if ((entry.status == plasmaHTCache.CACHE_FILL) ||
(entry.status == plasmaHTCache.CACHE_STALE_RELOAD_GOOD) ||
(entry.status == plasmaHTCache.CACHE_STALE_RELOAD_BAD)) {
cacheManager.storeHeader(entry.nomalizedURLHash, entry.responseHeader);
// work off unwritten files and undone parsing
// storeError receives the result of entry.shallStoreCache(); a null result
// lets the store branch below run
String storeError = null;
if (((entry.status == plasmaHTCache.CACHE_FILL) || (entry.status == plasmaHTCache.CACHE_STALE_RELOAD_GOOD)) &&
((storeError = entry.shallStoreCache()) == null)) {
// write file if not written yet
if (entry.cacheArray != null) {
cacheManager.writeFile(entry.url, entry.cacheArray);
log.logInfo("WRITE FILE (" + entry.cacheArray.length + " bytes) " + entry.cacheFile);
// enqueue for further crawling
// anchor count, image count and headline are taken from the scraper when one
// is attached; otherwise 0, 0 and "" are used
enQueue(sbQueue.newEntry(entry.url, plasmaURL.urlHash(entry.referrerURL()),
entry.requestHeader.ifModifiedSince(), entry.requestHeader.containsKey(httpHeader.COOKIE),
entry.initiator(), entry.depth, entry.profile.handle(),
(entry.scraper == null) ? 0 : entry.scraper.getAnchors().size(),
(entry.scraper == null) ? 0 : entry.scraper.getImages().size(),
(entry.scraper == null) ? "" : entry.scraper.getHeadline()
} else if (entry.status == plasmaHTCache.CACHE_PASSING) {
// even if the file should not be stored in the cache, it can be used to be indexed
if (storeError != null) log.logDebug("NOT STORED " + entry.cacheFile + ":" + storeError);
// enqueue for further crawling
// (same argument list as the enqueue call in the store branch above)
enQueue(sbQueue.newEntry(entry.url, plasmaURL.urlHash(entry.referrerURL()),
entry.requestHeader.ifModifiedSince(), entry.requestHeader.containsKey(httpHeader.COOKIE),
entry.initiator(), entry.depth, entry.profile.handle(),
(entry.scraper == null) ? 0 : entry.scraper.getAnchors().size(),
(entry.scraper == null) ? 0 : entry.scraper.getImages().size(),
(entry.scraper == null) ? "" : entry.scraper.getHeadline()
// write log
// one info line per cache status
switch (entry.status) {
case plasmaHTCache.CACHE_UNFILLED:
log.logInfo("CACHE UNFILLED: " + entry.cacheFile); break;
case plasmaHTCache.CACHE_FILL:
log.logInfo("CACHE FILL: " + entry.cacheFile +
((entry.cacheArray == null) ? "" : " (cacheArray is filled)") +
((entry.scraper == null) ? "" : " (scraper is filled)"));
// NOTE(review): no break is visible after the CACHE_FILL statement - confirm
// it does not fall through into the CACHE_HIT case
case plasmaHTCache.CACHE_HIT:
log.logInfo("CACHE HIT: " + entry.cacheFile); break;
// NOTE(review): no case labels are visible for the three STALE messages below -
// confirm each one is attached to its own cache status
log.logInfo("CACHE STALE, NO RELOAD: " + entry.cacheFile); break;
log.logInfo("CACHE STALE, NECESSARY RELOAD: " + entry.cacheFile); break;
log.logInfo("CACHE STALE, SUPERFLUOUS RELOAD: " + entry.cacheFile); break;
case plasmaHTCache.CACHE_PASSING:
log.logInfo("PASSING: " + entry.cacheFile); break;
// NOTE(review): presumably the default branch - its label is not visible, confirm
log.logInfo("CACHE STATE UNKNOWN: " + entry.cacheFile); break;
// a non-null entry is always reported as processed
return true;
/**
 * Performs one cache-store step: pops one entry from the cache manager and
 * passes it to htEntryStoreProcess().
 *
 * @return false if the cache manager reports empty or an IOException occurs,
 *         otherwise the result of htEntryStoreProcess()
 */
public boolean htEntryStoreJob() {
// nothing waiting in the cache manager
if (cacheManager.empty()) return false;
try {
return htEntryStoreProcess(cacheManager.pop());
} catch (IOException e) {
// the exception is not logged here; the step is just reported as not done
return false;
/**
 * Reports how many entries the cache manager currently holds.
 *
 * @return the value of cacheManager.size()
 */
public int htEntrySize() {
return cacheManager.size();
private static TreeSet loadList(File file) {
TreeSet list = new TreeSet(kelondroMSetTools.fastStringComparator);
if (!(file.exists())) return list;
@ -487,7 +585,7 @@ public final class plasmaSwitchboard extends serverAbstractSwitch implements ser
/**
 * Reports the number of entries waiting in the switchboard queue.
 *
 * @return the size of the queue
 */
public int queueSize() {
// NOTE(review): two consecutive return statements - the queueStack one looks
// like the superseded variant; confirm only the sbQueue one remains
return queueStack.size();
return sbQueue.size();
//return processStack.size() + cacheLoader.size() + noticeURL.stackSize();
@ -502,16 +600,24 @@ public final class plasmaSwitchboard extends serverAbstractSwitch implements ser
/**
 * Pushes a job onto the switchboard queue.
 *
 * @param job the job to enqueue; cast to plasmaSwitchboardQueue.Entry before
 *            being pushed
 */
public void enQueue(Object job) {
// NOTE(review): this cast targets plasmaHTCache.Entry and runs before the
// type check below; it looks like a leftover of the previous queue type - confirm
plasmaHTCache.Entry entry = (plasmaHTCache.Entry) job;
// a job of the wrong type is reported on stdout
if (!(job instanceof plasmaSwitchboardQueue.Entry)) {
System.out.println("internal error at plasmaSwitchboard.enQueue: wrong job type");
try {
sbQueue.push((plasmaSwitchboardQueue.Entry) job);
} catch (IOException e) {
// a failed push is logged, not rethrown
log.logError("IOError in plasmaSwitchboard.enQueue: " + e.getMessage());
public boolean deQueue() {
// work off fresh entries from the proxy or from the crawler
plasmaHTCache.Entry nextentry;
synchronized (queueStack) {
if (queueStack.size() == 0) {
plasmaSwitchboardQueue.Entry nextentry;
synchronized (sbQueue) {
if (sbQueue.size() == 0) {
//log.logDebug("DEQUEUE: queue is empty");
return false; // nothing to do
@ -521,12 +627,18 @@ public final class plasmaSwitchboard extends serverAbstractSwitch implements ser
// do one processing step
log.logDebug("DEQUEUE: cacheManager=" + ((cacheManager.idle()) ? "idle" : "busy") +
", queueStack=" + queueStack.size() +
", sbQueueSize=" + sbQueue.size() +
", coreStackSize=" + urlPool.noticeURL.stackSize(plasmaCrawlNURL.STACK_TYPE_CORE) +
", limitStackSize=" + urlPool.noticeURL.stackSize(plasmaCrawlNURL.STACK_TYPE_LIMIT) +
", overhangStackSize=" + urlPool.noticeURL.stackSize(plasmaCrawlNURL.STACK_TYPE_OVERHANG) +
", remoteStackSize=" + urlPool.noticeURL.stackSize(plasmaCrawlNURL.STACK_TYPE_REMOTE));
nextentry = (plasmaHTCache.Entry) queueStack.removeFirst();
try {
nextentry = sbQueue.pop();
} catch (IOException e) {
log.logError("IOError in plasmaSwitchboard.deQueue: " + e.getMessage());
return false;
return true;
@ -601,13 +713,13 @@ public final class plasmaSwitchboard extends serverAbstractSwitch implements ser
//log.logDebug("CoreCrawl: queue is empty");
return false;
if (queueStack.size() >= crawlSlots) {
log.logDebug("CoreCrawl: too many processes in queue, dismissed (" +
"queueStack=" + queueStack.size() + ")");
if (sbQueue.size() >= indexingSlots) {
log.logDebug("CoreCrawl: too many processes in indexing queue, dismissed (" +
"sbQueueSize=" + sbQueue.size() + ")");
return false;
if (cacheLoader.size() >= crawlSlots) {
log.logDebug("CoreCrawl: too many loader in queue, dismissed (" +
log.logDebug("CoreCrawl: too many processes in loader queue, dismissed (" +
"cacheLoader=" + cacheLoader.size() + ")");
return false;
@ -688,7 +800,7 @@ public final class plasmaSwitchboard extends serverAbstractSwitch implements ser
", permission=" + ((yacyCore.seedDB == null) ? "undefined" : (((yacyCore.seedDB.mySeed.isSenior()) || (yacyCore.seedDB.mySeed.isPrincipal())) ? "true" : "false")));
boolean tryRemote =
((urlPool.noticeURL.stackSize(plasmaCrawlNURL.STACK_TYPE_CORE) != 0) || (queueStack.size() != 0)) /* should do ourself */ &&
((urlPool.noticeURL.stackSize(plasmaCrawlNURL.STACK_TYPE_CORE) != 0) || (sbQueue.size() != 0)) /* should do ourself */ &&
(profile.remoteIndexing()) /* granted */ &&
(urlEntry.initiator() != null) && (!(urlEntry.initiator().equals(plasmaURL.dummyHash))) /* not proxy */ &&
((yacyCore.seedDB.mySeed.isSenior()) ||
@ -700,9 +812,9 @@ public final class plasmaSwitchboard extends serverAbstractSwitch implements ser
// alternatively do a local crawl
if (queueStack.size() >= crawlSlots) {
if (sbQueue.size() >= crawlSlots) {
log.logDebug("LimitCrawl: too many processes in queue, dismissed (" +
"queueStack=" + queueStack.size() + ")");
"sbQueueSize=" + sbQueue.size() + ")");
return false;
if (cacheLoader.size() >= crawlSlots) {
@ -776,8 +888,8 @@ public final class plasmaSwitchboard extends serverAbstractSwitch implements ser
processLocalCrawling(urlEntry, profile, stats);
return true;
private void processResourceStack(plasmaHTCache.Entry entry) {
private void processResourceStack(plasmaSwitchboardQueue.Entry entry) {
// work off one stack entry with a fresh resource (scraped web page)
try {
// we must distinguish the following cases: resource-load was initiated by
@ -802,39 +914,43 @@ public final class plasmaSwitchboard extends serverAbstractSwitch implements ser
processCase = 6;
log.logDebug("processResourceStack processCase=" + processCase + ", depth=" + entry.depth + ", maxDepth=" + entry.profile.generalDepth() + ", filter=" + entry.profile.generalFilter() + ", initiatorHash=" + initiatorHash + ", status=" + entry.status + ", source=" + ((entry.cacheArray == null) ? "scraper" : "byte[]") + ", url=" + entry.nomalizedURLString); // DEBUG
log.logDebug("processResourceStack processCase=" + processCase +
", depth=" + entry.depth() +
", maxDepth=" + ((entry.profile() == null) ? "null" : "" + entry.profile().generalDepth()) +
", filter=" + ((entry.profile() == null) ? "null" : "" + entry.profile().generalFilter()) +
", initiatorHash=" + initiatorHash +
", responseHeader=" + ((entry.responseHeader() == null) ? "null" : entry.responseHeader().toString()) +
", url=" + entry.url()); // DEBUG
// parse content
plasmaParserDocument document = null;
if (plasmaParser.supportedMimeTypesContains(entry.responseHeader.mime())) {
if (entry.scraper != null) {
log.logDebug("(Parser) '" + entry.nomalizedURLString + "' is pre-parsed by scraper");
document = parser.transformScraper(entry.url, entry.responseHeader.mime(), entry.scraper);
} else if (entry.cacheArray != null) {
log.logDebug("(Parser) '" + entry.nomalizedURLString + "' is not parsed yet, parsing now from cacheArray");
document = parser.parseSource(entry.url, entry.responseHeader.mime(), entry.cacheArray);
if ((plasmaParser.supportedFileExt(entry.url())) ||
((entry.responseHeader() != null) &&
(plasmaParser.supportedMimeTypesContains(entry.responseHeader().mime())))) {
if (entry.cacheFile().exists()) {
log.logDebug("(Parser) '" + entry.normalizedURLString() + "' is not parsed yet, parsing now from File");
document = parser.parseSource(entry.url(), (entry.responseHeader() == null) ? null : entry.responseHeader().mime(), entry.cacheFile());
} else {
if (entry.cacheFile.exists()) {
log.logDebug("(Parser) '" + entry.nomalizedURLString + "' is not parsed yet, parsing now from File");
document = parser.parseSource(entry.url, entry.responseHeader.mime(), entry.cacheFile);
} else {
log.logDebug("(Parser) '" + entry.nomalizedURLString + "' cannot be parsed, no resource available");
log.logDebug("(Parser) '" + entry.normalizedURLString() + "' cannot be parsed, no resource available");
if (document == null) {
log.logError("(Parser) '" + entry.nomalizedURLString + "' parse failure");
log.logError("(Parser) '" + entry.normalizedURLString() + "' parse failure");
} else {
log.logDebug("(Parser) '" + entry.nomalizedURLString + "'. Unsupported mimeType '" + entry.responseHeader.mime() + "'.");
log.logDebug("(Parser) '" + entry.normalizedURLString() + "'. Unsupported mimeType '" + ((entry.responseHeader() == null) ? null : entry.responseHeader().mime()) + "'.");
Date loadDate = entry.responseHeader().lastModified();
if (loadDate == null) loadDate = entry.responseHeader().date();
if (loadDate == null) loadDate = new Date();
// put anchors on crawl stack
if (((processCase == 4) || (processCase == 5)) &&
(entry.depth < entry.profile.generalDepth())) {
((entry.profile() == null) || (entry.depth() < entry.profile().generalDepth()))) {
Map hl = document.getHyperlinks();
Iterator i = hl.entrySet().iterator();
String nexturlstring;
@ -844,15 +960,16 @@ public final class plasmaSwitchboard extends serverAbstractSwitch implements ser
while (i.hasNext()) {
e = (Map.Entry) i.next();
nexturlstring = (String) e.getKey();
rejectReason = stackCrawl(nexturlstring, entry.nomalizedURLString, initiatorHash, (String) e.getValue(), entry.lastModified, entry.depth + 1, entry.profile);
rejectReason = stackCrawl(nexturlstring, entry.normalizedURLString(), initiatorHash, (String) e.getValue(), loadDate, entry.depth() + 1, entry.profile());
if (rejectReason == null) {
} else {
urlPool.errorURL.newEntry(new URL(nexturlstring), entry.nomalizedURLString, entry.initiator(), yacyCore.seedDB.mySeed.hash,
urlPool.errorURL.newEntry(new URL(nexturlstring), entry.normalizedURLString(), entry.initiator(), yacyCore.seedDB.mySeed.hash,
(String) e.getValue(), rejectReason, new bitfield(plasmaURL.urlFlagLength), false);
log.logInfo("CRAWL: ADDED " + c + " LINKS FROM " + entry.url.toString() +
log.logInfo("CRAWL: ADDED " + c + " LINKS FROM " + entry.url().toString() +
", NEW CRAWL STACK SIZE IS " + urlPool.noticeURL.stackSize(plasmaCrawlNURL.STACK_TYPE_CORE));
@ -870,51 +987,56 @@ public final class plasmaSwitchboard extends serverAbstractSwitch implements ser
if (noIndexReason == null) {
// strip out words
log.logDebug("(Profile) Condensing for '" + entry.nomalizedURLString + "'");
log.logDebug("(Profile) Condensing for '" + entry.normalizedURLString() + "'");
plasmaCondenser condenser = new plasmaCondenser(new ByteArrayInputStream(document.getText()));
//log.logInfo("INDEXING HEADLINE:" + descr);
try {
log.logDebug("(Profile) Create LURL-Entry for '" + entry.nomalizedURLString + "'");
log.logDebug("(Profile) Create LURL-Entry for '" + entry.normalizedURLString() + "', " +
"responseHeader=" + entry.responseHeader().toString());
Date lastModified = entry.responseHeader().lastModified();
if (lastModified == null) lastModified = entry.responseHeader().date();
if (lastModified == null) lastModified = new Date();
plasmaCrawlLURL.entry newEntry = urlPool.loadedURL.newEntry(
entry.url, descr, entry.lastModified, new Date(),
entry.url(), descr, lastModified, new Date(),
0, true,
Integer.parseInt(condenser.getAnalysis().getProperty("INFORMATION_VALUE","0"), 16),
entry.language, entry.doctype,
(int) Long.parseLong(condenser.getAnalysis().getProperty("NUMB_WORDS","0"), 16),
String urlHash = newEntry.hash();
log.logDebug("(Profile) Remove NURL for '" + entry.nomalizedURLString + "'");
log.logDebug("(Profile) Remove NURL for '" + entry.normalizedURLString() + "'");
urlPool.noticeURL.remove(urlHash); // worked-off
if (((processCase == 4) || (processCase == 5) || (processCase == 6)) &&
(entry.profile.localIndexing())) {
(entry.profile().localIndexing())) {
// remove stopwords
log.logDebug("(Profile) Exclude Stopwords for '" + entry.nomalizedURLString + "'");
log.logInfo("Excluded " + condenser.excludeWords(stopwords) + " words in URL " + entry.url);
log.logDebug("(Profile) Exclude Stopwords for '" + entry.normalizedURLString() + "'");
log.logInfo("Excluded " + condenser.excludeWords(stopwords) + " words in URL " + entry.url());
//System.out.println("DEBUG: words left to be indexed: " + condenser.getWords());
// do indexing
log.logDebug("(Profile) Create Index for '" + entry.nomalizedURLString + "'");
int words = searchManager.addPageIndex(entry.url, urlHash, entry.lastModified, condenser, entry.language, entry.doctype);
log.logInfo("Indexed " + words + " words in URL " + entry.url + " (" + descr + ")");
log.logDebug("(Profile) Create Index for '" + entry.normalizedURLString() + "'");
int words = searchManager.addPageIndex(entry.url(), urlHash, loadDate, condenser, plasmaWordIndexEntry.language(entry.url()), plasmaWordIndexEntry.docType(entry.responseHeader().mime()));
log.logInfo("Indexed " + words + " words in URL " + entry.url() + " (" + descr + ")");
// if this was performed for a remote crawl request, notify requester
if ((processCase == 6) && (initiator != null)) {
log.logInfo("Sending crawl receipt for '" + entry.nomalizedURLString + "' to " + initiator.getName());
log.logInfo("Sending crawl receipt for '" + entry.normalizedURLString() + "' to " + initiator.getName());
yacyClient.crawlReceipt(initiator, "crawl", "fill", "indexed", newEntry, "");
} else {
log.logDebug("Resource '" + entry.nomalizedURLString + "' not indexed (indexing is off)");
log.logDebug("Resource '" + entry.normalizedURLString() + "' not indexed (indexing is off)");
} catch (Exception ee) {
log.logError("Could not index URL " + entry.url + ": " + ee.getMessage());
log.logError("Could not index URL " + entry.url() + ": " + ee.getMessage());
if ((processCase == 6) && (initiator != null)) {
yacyClient.crawlReceipt(initiator, "crawl", "exception", ee.getMessage(), null, "");
@ -922,8 +1044,8 @@ public final class plasmaSwitchboard extends serverAbstractSwitch implements ser
} else {
log.logInfo("Not indexed any word in URL " + entry.url + "; cause: " + noIndexReason);
urlPool.errorURL.newEntry(entry.url, referrerHash,
log.logInfo("Not indexed any word in URL " + entry.url() + "; cause: " + noIndexReason);
urlPool.errorURL.newEntry(entry.url(), referrerHash,
((entry.proxy()) ? plasmaURL.dummyHash : entry.initiator()),
descr, noIndexReason, new bitfield(plasmaURL.urlFlagLength), true);
@ -1464,7 +1586,7 @@ public final class plasmaSwitchboard extends serverAbstractSwitch implements ser
/**
 * Builds a textual summary made of the superclass string ("PROPS") and the
 * queue's string representation ("QUEUE").
 *
 * @return the combined description string
 */
public String toString() {
// it is possible to use this method in the cgi pages.
// actually it is used there for testing purpose
// NOTE(review): two consecutive return statements - the queueStack one looks
// like the superseded variant; confirm only the sbQueue one remains
return "PROPS: " + super.toString() + "; QUEUE: " + queueStack.toString();
return "PROPS: " + super.toString() + "; QUEUE: " + sbQueue.toString();
// method for index deletion
@ -1536,7 +1658,7 @@ public final class plasmaSwitchboard extends serverAbstractSwitch implements ser
long starttime = System.currentTimeMillis();
try {
if (
(queueStack.size() == 0) &&
(sbQueue.size() == 0) &&
(cacheLoader.size() == 0) &&
(urlPool.noticeURL.stackSize() == 0) &&
(getConfig("allowDistributeIndex", "false").equals("true")) &&