@ -60,6 +60,7 @@ import java.util.Iterator;
import java.util.List ;
import java.util.Map ;
import java.util.Properties ;
import java.util.Set ;
import java.util.SortedMap ;
import java.util.SortedSet ;
import java.util.TreeMap ;
@ -124,6 +125,7 @@ import net.yacy.crawler.retrieval.Response;
import net.yacy.crawler.robots.RobotsTxt ;
import net.yacy.data.BlogBoard ;
import net.yacy.data.BlogBoardComments ;
import net.yacy.data.BookmarkHelper ;
import net.yacy.data.BookmarksDB ;
import net.yacy.data.ListManager ;
import net.yacy.data.MessageBoard ;
@ -133,11 +135,13 @@ import net.yacy.data.WorkTables;
import net.yacy.data.wiki.WikiBoard ;
import net.yacy.data.wiki.WikiCode ;
import net.yacy.data.wiki.WikiParser ;
import net.yacy.data.ymark.YMarkTables ;
import net.yacy.document.Condenser ;
import net.yacy.document.Document ;
import net.yacy.document.LibraryProvider ;
import net.yacy.document.Parser ;
import net.yacy.document.TextParser ;
import net.yacy.document.Parser.Failure ;
import net.yacy.document.content.DCEntry ;
import net.yacy.document.content.SurrogateReader ;
import net.yacy.document.importer.OAIListFriendsLoader ;
@ -195,8 +199,7 @@ import net.yacy.utils.crypt;
import com.google.common.io.Files ;
public final class Switchboard extends serverSwitch
{
public final class Switchboard extends serverSwitch {
// load slots
public static int xstackCrawlSlots = 2000 ;
@ -269,18 +272,12 @@ public final class Switchboard extends serverSwitch
private final Semaphore shutdownSync = new Semaphore ( 0 ) ;
private boolean terminate = false ;
//private Object crawlingPausedSync = new Object();
//private boolean crawlingIsPaused = false;
private static Switchboard sb ;
public HashMap < String , Object [ ] > crawlJobsStatus = new HashMap < String , Object [ ] > ( ) ;
private static Switchboard sb = null ;
public Switchboard ( final File dataPath , final File appPath , final String initPath , final String configPath )
throws IOException {
public Switchboard ( final File dataPath , final File appPath , final String initPath , final String configPath ) throws IOException {
super ( dataPath , appPath , initPath , configPath ) ;
sb = this ;
// check if port is already occupied
final int port = getConfigInt ( "port" , 8090 ) ;
try {
@ -294,7 +291,6 @@ public final class Switchboard extends serverSwitch
}
MemoryTracker . startSystemProfiling ( ) ;
sb = this ;
// set loglevel and log
setLog ( new Log ( "SWITCHBOARD" ) ) ;
@ -374,9 +370,9 @@ public final class Switchboard extends serverSwitch
// start indexing management
this . log . logConfig ( "Starting Indexing Management" ) ;
final String networkName = getConfig ( SwitchboardConstants . NETWORK_NAME , "" ) ;
final long fileSizeMax = ( OS . isWindows ) ? sb . getConfigLong ( "filesize.max.win" , Integer . MAX_VALUE ) : sb . getConfigLong ( "filesize.max.other" , Integer . MAX_VALUE ) ;
final int redundancy = ( int ) sb . getConfigLong ( "network.unit.dhtredundancy.senior" , 1 ) ;
final int partitionExponent = ( int ) sb . getConfigLong ( "network.unit.dht.partitionExponent" , 0 ) ;
final long fileSizeMax = ( OS . isWindows ) ? this . getConfigLong ( "filesize.max.win" , Integer . MAX_VALUE ) : this . getConfigLong ( "filesize.max.other" , Integer . MAX_VALUE ) ;
final int redundancy = ( int ) this . getConfigLong ( "network.unit.dhtredundancy.senior" , 1 ) ;
final int partitionExponent = ( int ) this . getConfigLong ( "network.unit.dht.partitionExponent" , 0 ) ;
this . networkRoot = new File ( new File ( indexPath , networkName ) , "NETWORK" ) ;
this . queuesRoot = new File ( new File ( indexPath , networkName ) , "QUEUES" ) ;
this . networkRoot . mkdirs ( ) ;
@ -1022,7 +1018,7 @@ public final class Switchboard extends serverSwitch
"this is the content control import thread" ,
null ,
new InstantBusyThread (
new ContentControlImportThread ( sb ) ,
new ContentControlImportThread ( this ) ,
"run" ,
SwitchboardConstants . PEER_PING_METHOD_JOBCOUNT ,
SwitchboardConstants . PEER_PING_METHOD_FREEMEM ,
@ -1037,7 +1033,7 @@ public final class Switchboard extends serverSwitch
"this is the content control filter update thread" ,
null ,
new InstantBusyThread (
new ContentControlFilterUpdateThread ( sb ) ,
new ContentControlFilterUpdateThread ( this ) ,
"run" ,
SwitchboardConstants . PEER_PING_METHOD_JOBCOUNT ,
SwitchboardConstants . PEER_PING_METHOD_FREEMEM ,
@ -1063,7 +1059,6 @@ public final class Switchboard extends serverSwitch
this . trail = new LinkedBlockingQueue < String > ( ) ;
this . log . logConfig ( "Finished Switchboard Initialization" ) ;
sb = this ;
}
public int getIndexingProcessorsQueueSize ( ) {
@ -1235,10 +1230,9 @@ public final class Switchboard extends serverSwitch
final int wordCacheMaxCount =
( int ) getConfigLong ( SwitchboardConstants . WORDCACHE_MAX_COUNT , 20000 ) ;
final long fileSizeMax =
( OS . isWindows ) ? sb . getConfigLong ( "filesize.max.win" , Integer . MAX_VALUE ) : sb
. getConfigLong ( "filesize.max.other" , Integer . MAX_VALUE ) ;
final int redundancy = ( int ) sb . getConfigLong ( "network.unit.dhtredundancy.senior" , 1 ) ;
final int partitionExponent = ( int ) sb . getConfigLong ( "network.unit.dht.partitionExponent" , 0 ) ;
( OS . isWindows ) ? this . getConfigLong ( "filesize.max.win" , Integer . MAX_VALUE ) : this . getConfigLong ( "filesize.max.other" , Integer . MAX_VALUE ) ;
final int redundancy = ( int ) this . getConfigLong ( "network.unit.dhtredundancy.senior" , 1 ) ;
final int partitionExponent = ( int ) this . getConfigLong ( "network.unit.dht.partitionExponent" , 0 ) ;
final String networkName = getConfig ( SwitchboardConstants . NETWORK_NAME , "" ) ;
this . networkRoot = new File ( new File ( indexPrimaryPath , networkName ) , "NETWORK" ) ;
this . queuesRoot = new File ( new File ( indexPrimaryPath , networkName ) , "QUEUES" ) ;
@ -1543,7 +1537,7 @@ public final class Switchboard extends serverSwitch
public RankingProfile getRanking ( ) {
return ( getConfig ( "rankingProfile" , "" ) . isEmpty ( ) )
? new RankingProfile ( Classification . ContentDomain . TEXT )
: new RankingProfile ( "" , crypt . simpleDecode ( sb . getConfig ( "rankingProfile" , "" ) ) ) ;
: new RankingProfile ( "" , crypt . simpleDecode ( this . getConfig ( "rankingProfile" , "" ) ) ) ;
}
/ * *
@ -1970,7 +1964,7 @@ public final class Switchboard extends serverSwitch
// clear caches if necessary
if ( ! MemoryControl . request ( 8000000L , false ) ) {
sb . index . fulltext ( ) . clearCache ( ) ;
this . index . fulltext ( ) . clearCache ( ) ;
SearchEventCache . cleanupEvents ( false ) ;
this . trail . clear ( ) ;
}
@ -2246,7 +2240,7 @@ public final class Switchboard extends serverSwitch
this . clusterhashes = this . peers . clusterHashes ( getConfig ( "cluster.peers.yacydomain" , "" ) ) ;
// check if we are reachable and try to map port again if not (e.g. when router rebooted)
if ( getConfigBool ( SwitchboardConstants . UPNP_ENABLED , false ) & & sb . peers . mySeed ( ) . isJunior ( ) ) {
if ( getConfigBool ( SwitchboardConstants . UPNP_ENABLED , false ) & & this . peers . mySeed ( ) . isJunior ( ) ) {
UPnP . addPortMapping ( ) ;
}
@ -2698,6 +2692,122 @@ public final class Switchboard extends serverSwitch
}
}
public void stackURLs ( Set < DigestURI > rootURLs , final CrawlProfile profile , final Set < DigestURI > successurls , final Map < DigestURI , String > failurls ) {
List < Thread > stackthreads = new ArrayList < Thread > ( ) ; // do this concurrently
for ( DigestURI url : rootURLs ) {
final DigestURI turl = url ;
Thread t = new Thread ( ) {
public void run ( ) {
String failreason ;
if ( ( failreason = Switchboard . this . stackUrl ( profile , turl ) ) = = null ) successurls . add ( turl ) ; else failurls . put ( turl , failreason ) ;
}
} ;
t . start ( ) ;
stackthreads . add ( t ) ;
}
for ( Thread t : stackthreads ) try { t . join ( 5000 ) ; } catch ( InterruptedException e ) { }
}
/ * *
* stack the url to the crawler
* @param profile
* @param url
* @return null if this was ok . If this failed , return a string with a fail reason
* /
public String stackUrl ( CrawlProfile profile , DigestURI url ) {
byte [ ] handle = ASCII . getBytes ( profile . handle ( ) ) ;
// remove url from the index to be prepared for a re-crawl
final byte [ ] urlhash = url . hash ( ) ;
this . index . fulltext ( ) . remove ( urlhash ) ;
this . crawlQueues . noticeURL . removeByURLHash ( urlhash ) ;
this . crawlQueues . errorURL . remove ( urlhash ) ;
// special handling of ftp protocol
if ( url . isFTP ( ) ) {
try {
this . crawler . putActive ( handle , profile ) ;
this . pauseCrawlJob ( SwitchboardConstants . CRAWLJOB_LOCAL_CRAWL ) ;
this . crawlStacker . enqueueEntriesFTP ( this . peers . mySeed ( ) . hash . getBytes ( ) , profile . handle ( ) , url . getHost ( ) , url . getPort ( ) , false ) ;
return null ;
} catch ( final Exception e ) {
// mist
Log . logException ( e ) ;
return "problem crawling an ftp site: " + e . getMessage ( ) ;
}
}
// get a scraper to get the title
Document scraper ;
try {
scraper = this . loader . loadDocument ( url , CacheStrategy . IFFRESH , BlacklistType . CRAWLER , CrawlQueues . queuedMinLoadDelay ) ;
} catch ( IOException e ) {
Log . logException ( e ) ;
return "scraper cannot load URL: " + e . getMessage ( ) ;
}
final String title = scraper = = null ? url . toNormalform ( true ) : scraper . dc_title ( ) ;
final String description = scraper . dc_description ( ) ;
// add the url to the crawl stack
this . crawler . removePassive ( handle ) ; // if there is an old entry, delete it
this . crawler . putActive ( handle , profile ) ;
final String reasonString = this . crawlStacker . stackCrawl ( new Request (
this . peers . mySeed ( ) . hash . getBytes ( ) ,
url ,
null ,
"CRAWLING-ROOT" ,
new Date ( ) ,
profile . handle ( ) ,
0 ,
0 ,
0 ,
0
) ) ;
if ( reasonString ! = null ) return reasonString ;
// create a bookmark from crawl start url
//final Set<String> tags=ListManager.string2set(BookmarkHelper.cleanTagsString(post.get("bookmarkFolder","/crawlStart")));
final Set < String > tags = ListManager . string2set ( BookmarkHelper . cleanTagsString ( "/crawlStart" ) ) ;
tags . add ( "crawlStart" ) ;
final String [ ] keywords = scraper . dc_subject ( ) ;
if ( keywords ! = null ) {
for ( final String k : keywords ) {
final String kk = BookmarkHelper . cleanTagsString ( k ) ;
if ( kk . length ( ) > 0 ) tags . add ( kk ) ;
}
}
String tagStr = tags . toString ( ) ;
if ( tagStr . length ( ) > 2 & & tagStr . startsWith ( "[" ) & & tagStr . endsWith ( "]" ) ) tagStr = tagStr . substring ( 1 , tagStr . length ( ) - 2 ) ;
// we will create always a bookmark to use this to track crawled hosts
final BookmarksDB . Bookmark bookmark = this . bookmarksDB . createBookmark ( url . toNormalform ( true ) , "admin" ) ;
if ( bookmark ! = null ) {
bookmark . setProperty ( BookmarksDB . Bookmark . BOOKMARK_TITLE , title ) ;
bookmark . setProperty ( BookmarksDB . Bookmark . BOOKMARK_DESCRIPTION , description ) ;
bookmark . setOwner ( "admin" ) ;
bookmark . setPublic ( false ) ;
bookmark . setTags ( tags , true ) ;
this . bookmarksDB . saveBookmark ( bookmark ) ;
}
// do the same for ymarks
// TODO: could a non admin user add crawls?
try {
this . tables . bookmarks . createBookmark ( this . loader , url , YMarkTables . USER_ADMIN , true , "crawlStart" , "/Crawl Start" ) ;
} catch ( IOException e ) {
Log . logException ( e ) ;
} catch ( Failure e ) {
Log . logException ( e ) ;
}
// that was ok
return null ;
}
/ * *
* load the content of a URL , parse the content and add the content to the index This process is started
* concurrently . The method returns immediately after the call .
@ -2718,7 +2828,7 @@ public final class Switchboard extends serverSwitch
return ; // don't do double-work
}
final Request request = this . loader . request ( url , true , true ) ;
final CrawlProfile profile = sb . crawler . getActive ( ASCII . getBytes ( request . profileHandle ( ) ) ) ;
final CrawlProfile profile = this . crawler . getActive ( ASCII . getBytes ( request . profileHandle ( ) ) ) ;
final String acceptedError = this . crawlStacker . checkAcceptance ( url , profile , 0 ) ;
final String urls = url . toNormalform ( true ) ;
if ( acceptedError ! = null ) {
@ -2793,7 +2903,7 @@ public final class Switchboard extends serverSwitch
return ; // don't do double-work
}
final Request request = this . loader . request ( url , true , true ) ;
final CrawlProfile profile = sb . crawler . getActive ( ASCII . getBytes ( request . profileHandle ( ) ) ) ;
final CrawlProfile profile = this . crawler . getActive ( ASCII . getBytes ( request . profileHandle ( ) ) ) ;
final String acceptedError = this . crawlStacker . checkAcceptance ( url , profile , 0 ) ;
if ( acceptedError ! = null ) {
this . log . logInfo ( "addToCrawler: cannot load "
@ -2804,9 +2914,9 @@ public final class Switchboard extends serverSwitch
}
final String s ;
if ( asglobal ) {
s = sb . crawlQueues . noticeURL . push ( StackType . GLOBAL , request , this . robots ) ;
s = this . crawlQueues . noticeURL . push ( StackType . GLOBAL , request , this . robots ) ;
} else {
s = sb . crawlQueues . noticeURL . push ( StackType . LOCAL , request , this . robots ) ;
s = this . crawlQueues . noticeURL . push ( StackType . LOCAL , request , this . robots ) ;
}
if ( s ! = null ) {
@ -3179,7 +3289,7 @@ public final class Switchboard extends serverSwitch
if ( links ! = null ) {
if ( links . size ( ) < 1000 ) { // limit to 1000 to skip large index pages
final Iterator < MultiProtocolURI > i = links . keySet ( ) . iterator ( ) ;
final boolean globalcrawljob = sb . getConfigBool ( "heuristic.searchresults.crawlglobal" , false ) ;
final boolean globalcrawljob = Switchboard. this . getConfigBool ( "heuristic.searchresults.crawlglobal" , false ) ;
while ( i . hasNext ( ) ) {
url = DigestURI . toDigestURI ( i . next ( ) ) ;
boolean islocal = url . getHost ( ) . contentEquals ( startUrl . getHost ( ) ) ;
@ -3239,7 +3349,7 @@ public final class Switchboard extends serverSwitch
searchEvent . getRankingResult ( ) . oneFeederStarted ( ) ;
try {
final Response response =
sb. loader . load ( sb . loader . request ( url , true , false ) , CacheStrategy . NOCACHE , BlacklistType . SEARCH , TextSnippet . snippetMinLoadDelay ) ;
Switchboard. this . loader . load ( Switchboard . this . loader . request ( url , true , false ) , CacheStrategy . NOCACHE , BlacklistType . SEARCH , TextSnippet . snippetMinLoadDelay ) ;
final byte [ ] resource = ( response = = null ) ? null : response . getContent ( ) ;
//System.out.println("BLEKKO: " + UTF8.String(resource));
rss = resource = = null ? null : RSSReader . parse ( RSSFeed . DEFAULT_MAXSIZE , resource ) ;
@ -3337,7 +3447,7 @@ public final class Switchboard extends serverSwitch
if ( Thread . currentThread ( ) . isInterrupted ( ) ) {
break ;
}
seedListFileURL = sb . getConfig ( "network.unit.bootstrap.seedlist" + c , "" ) ;
seedListFileURL = this . getConfig ( "network.unit.bootstrap.seedlist" + c , "" ) ;
if ( seedListFileURL . isEmpty ( ) ) {
break ;
}