@ -193,6 +193,7 @@ import net.yacy.kelondro.util.OS;
import net.yacy.kelondro.util.SetTools ;
import net.yacy.kelondro.util.SetTools ;
import net.yacy.kelondro.workflow.BusyThread ;
import net.yacy.kelondro.workflow.BusyThread ;
import net.yacy.kelondro.workflow.InstantBusyThread ;
import net.yacy.kelondro.workflow.InstantBusyThread ;
import net.yacy.kelondro.workflow.OneTimeBusyThread ;
import net.yacy.kelondro.workflow.WorkflowProcessor ;
import net.yacy.kelondro.workflow.WorkflowProcessor ;
import net.yacy.kelondro.workflow.WorkflowThread ;
import net.yacy.kelondro.workflow.WorkflowThread ;
import net.yacy.peers.DHTSelection ;
import net.yacy.peers.DHTSelection ;
@ -337,7 +338,14 @@ public final class Switchboard extends serverSwitch {
// UPnP port mapping
// UPnP port mapping
if ( getConfigBool ( SwitchboardConstants . UPNP_ENABLED , false ) ) {
if ( getConfigBool ( SwitchboardConstants . UPNP_ENABLED , false ) ) {
InstantBusyThread . oneTimeJob ( UPnP . class , "addPortMappings" , 0 ) ;
new OneTimeBusyThread ( "UPnP.addPortMappings" ) {
@Override
public boolean jobImpl ( ) throws Exception {
UPnP . addPortMappings ( ) ;
return true ;
}
} . start ( ) ;
}
}
// init TrayIcon if possible
// init TrayIcon if possible
@ -611,7 +619,14 @@ public final class Switchboard extends serverSwitch {
// start yacy core
// start yacy core
this . log . config ( "Starting YaCy Protocol Core" ) ;
this . log . config ( "Starting YaCy Protocol Core" ) ;
this . yc = new Network ( this ) ;
this . yc = new Network ( this ) ;
InstantBusyThread . oneTimeJob ( this , "loadSeedLists" , 0 ) ;
new OneTimeBusyThread ( "Switchboard.loadSeedLists" ) {
@Override
public boolean jobImpl ( ) throws Exception {
loadSeedLists ( ) ;
return true ;
}
} . start ( ) ;
//final long startedSeedListAquisition = System.currentTimeMillis();
//final long startedSeedListAquisition = System.currentTimeMillis();
// init a DHT transmission dispatcher
// init a DHT transmission dispatcher
@ -941,7 +956,17 @@ public final class Switchboard extends serverSwitch {
// initializing the resourceObserver
// initializing the resourceObserver
this . observer = new ResourceObserver ( this ) ;
this . observer = new ResourceObserver ( this ) ;
InstantBusyThread . oneTimeJob ( this . observer , "resourceObserverJob" , 0 ) ;
final ResourceObserver resourceObserver = this . observer ;
new OneTimeBusyThread ( "ResourceObserver.resourceObserverJob" ) {
@Override
public boolean jobImpl ( ) throws Exception {
resourceObserver . resourceObserverJob ( ) ;
return true ;
}
} . start ( ) ;
// initializing the stackCrawlThread
// initializing the stackCrawlThread
this . crawlStacker =
this . crawlStacker =
@ -1032,101 +1057,142 @@ public final class Switchboard extends serverSwitch {
// deploy busy threads
// deploy busy threads
this . log . config ( "Starting Threads" ) ;
this . log . config ( "Starting Threads" ) ;
MemoryControl . gc ( 10000 , "plasmaSwitchboard, help for profiler" ) ; // help for profiler - thq
MemoryControl . gc ( 10000 , "plasmaSwitchboard, help for profiler" ) ; // help for profiler - thq
deployThread (
deployThread (
SwitchboardConstants . CLEANUP ,
SwitchboardConstants . CLEANUP ,
"Cleanup" ,
"Cleanup" ,
"cleaning process" ,
"cleaning process" ,
null ,
null ,
new InstantBusyThread (
new InstantBusyThread ( "Switchboard.cleanupJob" , 30000 , 10000 ) {
this ,
SwitchboardConstants . CLEANUP_METHOD_START ,
@Override
SwitchboardConstants . CLEANUP_METHOD_JOBCOUNT ,
public boolean jobImpl ( ) throws Exception {
SwitchboardConstants . CLEANUP_METHOD_FREEMEM ,
return cleanupJob ( ) ;
30000 ,
}
10000 ) ,
@Override
public int getJobCount ( ) {
return cleanupJobSize ( ) ;
}
@Override
public void freememImpl ( ) {
}
} ,
60000 ) ; // all 10 minutes, wait 1 minute until first run
60000 ) ; // all 10 minutes, wait 1 minute until first run
deployThread (
deployThread (
SwitchboardConstants . SCHEDULER ,
SwitchboardConstants . SCHEDULER ,
"Scheduler" ,
"Scheduler" ,
"starts scheduled processes from the API Processing table" ,
"starts scheduled processes from the API Processing table" ,
null ,
null ,
new InstantBusyThread (
new InstantBusyThread ( "Switchboard.schedulerJob" , 30000 , 10000 ) {
this ,
@Override
SwitchboardConstants . SCHEDULER_METHOD_START ,
public boolean jobImpl ( ) throws Exception {
SwitchboardConstants . SCHEDULER_METHOD_JOBCOUNT ,
return schedulerJob ( ) ;
SwitchboardConstants . SCHEDULER_METHOD_FREEMEM ,
}
30000 ,
10000 ) ,
@Override
public int getJobCount ( ) {
return schedulerJobSize ( ) ;
}
@Override
public void freememImpl ( ) {
}
} ,
60000 ) ; // all 10 minutes, wait 1 minute until first run
60000 ) ; // all 10 minutes, wait 1 minute until first run
deployThread (
deployThread (
SwitchboardConstants . SURROGATES ,
SwitchboardConstants . SURROGATES ,
"Surrogates" ,
"Surrogates" ,
"A thread that polls the SURROGATES path and puts all Documents in one surroagte file into the indexing queue." ,
"A thread that polls the SURROGATES path and puts all Documents in one surroagte file into the indexing queue." ,
null ,
null ,
new InstantBusyThread (
new InstantBusyThread ( "Switchboard.surrogateProcess" , 20000 , 0 ) {
this ,
@Override
SwitchboardConstants . SURROGATES_METHOD_START ,
public boolean jobImpl ( ) throws Exception {
SwitchboardConstants . SURROGATES_METHOD_JOBCOUNT ,
return surrogateProcess ( ) ;
SwitchboardConstants . SURROGATES_METHOD_FREEMEM ,
}
20000 ,
0 ) ,
@Override
public int getJobCount ( ) {
return surrogateQueueSize ( ) ;
}
@Override
public void freememImpl ( ) {
surrogateFreeMem ( ) ;
}
} ,
10000 ) ;
10000 ) ;
this . initRemoteCrawler ( this . getConfigBool ( SwitchboardConstants . CRAWLJOB_REMOTE , false ) ) ;
this . initRemoteCrawler ( this . getConfigBool ( SwitchboardConstants . CRAWLJOB_REMOTE , false ) ) ;
this . initAutocrawl ( this . getConfigBool ( SwitchboardConstants . AUTOCRAWL , false ) ) ;
this . initAutocrawl ( this . getConfigBool ( SwitchboardConstants . AUTOCRAWL , false ) ) ;
final CrawlQueues crawlQueue = this . crawlQueues ;
deployThread (
deployThread (
SwitchboardConstants . CRAWLJOB_LOCAL_CRAWL ,
SwitchboardConstants . CRAWLJOB_LOCAL_CRAWL ,
"Local Crawl" ,
"Local Crawl" ,
"thread that performes a single crawl step from the local crawl queue" ,
"thread that performes a single crawl step from the local crawl queue" ,
"/IndexCreateQueues_p.html?stack=LOCAL" ,
"/IndexCreateQueues_p.html?stack=LOCAL" ,
new InstantBusyThread (
new InstantBusyThread ( "CrawlQueues.coreCrawlJob" , 0 , 0 ) {
this . crawlQueues ,
@Override
SwitchboardConstants . CRAWLJOB_LOCAL_CRAWL_METHOD_START ,
public boolean jobImpl ( ) throws Exception {
SwitchboardConstants . CRAWLJOB_LOCAL_CRAWL_METHOD_JOBCOUNT ,
return crawlQueue . coreCrawlJob ( ) ;
SwitchboardConstants . CRAWLJOB_LOCAL_CRAWL_METHOD_FREEMEM ,
}
0 ,
0 ) ,
@Override
public int getJobCount ( ) {
return crawlQueue . coreCrawlJobSize ( ) ;
}
@Override
public void freememImpl ( ) {
crawlQueue . freemem ( ) ;
}
} ,
10000 ) ;
10000 ) ;
final Network net = this . yc ;
deployThread (
deployThread (
SwitchboardConstants . SEED_UPLOAD ,
SwitchboardConstants . SEED_UPLOAD ,
"Seed-List Upload" ,
"Seed-List Upload" ,
"task that a principal peer performes to generate and upload a seed-list to a ftp account" ,
"task that a principal peer performes to generate and upload a seed-list to a ftp account" ,
null ,
null ,
new InstantBusyThread (
new InstantBusyThread ( "Network.publishSeedList" , 600000 , 300000 ) {
this . yc ,
@Override
SwitchboardConstants . SEED_UPLOAD_METHOD_START ,
public boolean jobImpl ( ) throws Exception {
SwitchboardConstants . SEED_UPLOAD_METHOD_JOBCOUNT ,
net . publishSeedList ( ) ;
SwitchboardConstants . SEED_UPLOAD_METHOD_FREEMEM ,
return true ;
600000 ,
}
300000 ) ,
} ,
180000 ) ;
180000 ) ;
deployThread (
deployThread (
SwitchboardConstants . PEER_PING ,
SwitchboardConstants . PEER_PING ,
"YaCy Core" ,
"YaCy Core" ,
"this is the p2p-control and peer-ping task" ,
"this is the p2p-control and peer-ping task" ,
null ,
null ,
new InstantBusyThread (
new InstantBusyThread ( "Network.peerPing" , 30000 , 30000 ) {
this . yc ,
@Override
SwitchboardConstants . PEER_PING_METHOD_START ,
public boolean jobImpl ( ) throws Exception {
SwitchboardConstants . PEER_PING_METHOD_JOBCOUNT ,
net . peerPing ( ) ;
SwitchboardConstants . PEER_PING_METHOD_FREEMEM ,
return true ;
30000 ,
}
30000 ) ,
} ,
10000 ) ;
10000 ) ;
deployThread (
deployThread (
SwitchboardConstants . INDEX_DIST ,
SwitchboardConstants . INDEX_DIST ,
"DHT Distribution" ,
"DHT Distribution" ,
"selection, transfer and deletion of index entries that are not searched on your peer, but on others" ,
"selection, transfer and deletion of index entries that are not searched on your peer, but on others" ,
null ,
null ,
new InstantBusyThread (
new InstantBusyThread ( "Switchboard.dhtTransferJob" , 10000 , 1000 ) {
this ,
@Override
SwitchboardConstants . INDEX_DIST_METHOD_START ,
public boolean jobImpl ( ) throws Exception {
SwitchboardConstants . INDEX_DIST_METHOD_JOBCOUNT ,
return dhtTransferJob ( ) ;
SwitchboardConstants . INDEX_DIST_METHOD_FREEMEM ,
}
10000 ,
} ,
1000 ) ,
60000 ,
60000 ,
Long . parseLong ( getConfig ( SwitchboardConstants . INDEX_DIST_IDLESLEEP , "5000" ) ) ,
Long . parseLong ( getConfig ( SwitchboardConstants . INDEX_DIST_IDLESLEEP , "5000" ) ) ,
Long . parseLong ( getConfig ( SwitchboardConstants . INDEX_DIST_BUSYSLEEP , "0" ) ) ,
Long . parseLong ( getConfig ( SwitchboardConstants . INDEX_DIST_BUSYSLEEP , "0" ) ) ,
@ -1138,28 +1204,20 @@ public final class Switchboard extends serverSwitch {
"720_ccimport" ,
"720_ccimport" ,
"Content Control Import" ,
"Content Control Import" ,
"this is the content control import thread" ,
"this is the content control import thread" ,
null ,
null ,
new InstantBusyThread (
InstantBusyThread . createFromRunnable (
new SMWListSyncThread ( this , sb . getConfig ( "contentcontrol.bookmarklist" , "contentcontrol" ) , "Category:Content Source" , "/?Url/?Filter/?Category/?Modification date" , sb . getConfigBool (
new SMWListSyncThread ( this , sb . getConfig ( "contentcontrol.bookmarklist" , "contentcontrol" ) ,
"contentcontrol.smwimport.purgelistoninit" , false ) ) ,
"Category:Content Source" , "/?Url/?Filter/?Category/?Modification date" ,
"run" ,
sb . getConfigBool ( "contentcontrol.smwimport.purgelistoninit" , false ) ) ,
SwitchboardConstants . PEER_PING_METHOD_JOBCOUNT ,
3000 , 3000 ) ,
SwitchboardConstants . PEER_PING_METHOD_FREEMEM ,
3000 ,
3000 ) ,
2000 ) ;
2000 ) ;
deployThread (
deployThread (
"730_ccfilter" ,
"730_ccfilter" ,
"Content Control Filter" ,
"Content Control Filter" ,
"this is the content control filter update thread" ,
"this is the content control filter update thread" ,
null ,
null ,
new InstantBusyThread (
InstantBusyThread . createFromRunnable ( new ContentControlFilterUpdateThread ( this ) , 3000 , 3000 ) ,
new ContentControlFilterUpdateThread ( this ) ,
"run" ,
SwitchboardConstants . PEER_PING_METHOD_JOBCOUNT ,
SwitchboardConstants . PEER_PING_METHOD_FREEMEM ,
3000 ,
3000 ) ,
2000 ) ;
2000 ) ;
// set network-specific performance attributes
// set network-specific performance attributes
@ -1513,6 +1571,8 @@ public final class Switchboard extends serverSwitch {
this . peers . mySeed ( ) . setFlagAcceptRemoteCrawl ( activate ) ;
this . peers . mySeed ( ) . setFlagAcceptRemoteCrawl ( activate ) ;
if ( activate ) {
if ( activate ) {
this . crawlQueues . initRemoteCrawlQueues ( ) ;
this . crawlQueues . initRemoteCrawlQueues ( ) ;
final CrawlQueues queues = this . crawlQueues ;
BusyThread rct = getThread ( SwitchboardConstants . CRAWLJOB_REMOTE_TRIGGERED_CRAWL ) ;
BusyThread rct = getThread ( SwitchboardConstants . CRAWLJOB_REMOTE_TRIGGERED_CRAWL ) ;
if ( rct = = null ) {
if ( rct = = null ) {
@ -1521,13 +1581,19 @@ public final class Switchboard extends serverSwitch {
"Remote Crawl Job" ,
"Remote Crawl Job" ,
"thread that performes a single crawl/indexing step triggered by a remote peer" ,
"thread that performes a single crawl/indexing step triggered by a remote peer" ,
"/IndexCreateQueues_p.html?stack=REMOTE" ,
"/IndexCreateQueues_p.html?stack=REMOTE" ,
new InstantBusyThread (
new InstantBusyThread ( "CrawlQueues.remoteTriggeredCrawlJob" , 0 , 0 ) {
this . crawlQueues ,
SwitchboardConstants . CRAWLJOB_REMOTE_TRIGGERED_CRAWL_METHOD_START ,
@Override
SwitchboardConstants . CRAWLJOB_REMOTE_TRIGGERED_CRAWL_METHOD_JOBCOUNT ,
public boolean jobImpl ( ) throws Exception {
SwitchboardConstants . CRAWLJOB_REMOTE_TRIGGERED_CRAWL_METHOD_FREEMEM ,
return queues . remoteTriggeredCrawlJob ( ) ;
0 ,
}
0 ) ,
@Override
public int getJobCount ( ) {
return queues . remoteTriggeredCrawlJobSize ( ) ;
}
} ,
10000 ) ;
10000 ) ;
rct = getThread ( SwitchboardConstants . CRAWLJOB_REMOTE_TRIGGERED_CRAWL ) ;
rct = getThread ( SwitchboardConstants . CRAWLJOB_REMOTE_TRIGGERED_CRAWL ) ;
}
}
@ -1541,13 +1607,12 @@ public final class Switchboard extends serverSwitch {
"Remote Crawl URL Loader" ,
"Remote Crawl URL Loader" ,
"thread that loads remote crawl lists from other peers" ,
"thread that loads remote crawl lists from other peers" ,
null ,
null ,
new InstantBusyThread (
new InstantBusyThread ( "CrawlQueues.remoteCrawlLoaderJob" , 10000 , 10000 ) {
this . crawlQueues ,
@Override
SwitchboardConstants . CRAWLJOB_REMOTE_CRAWL_LOADER_METHOD_START ,
public boolean jobImpl ( ) throws Exception {
SwitchboardConstants . CRAWLJOB_REMOTE_CRAWL_LOADER_METHOD_JOBCOUNT ,
return queues . remoteCrawlLoaderJob ( ) ;
SwitchboardConstants . CRAWLJOB_REMOTE_CRAWL_LOADER_METHOD_FREEMEM ,
}
10000 ,
} ,
10000 ) ,
10000 ) ;
10000 ) ;
rcl = getThread ( SwitchboardConstants . CRAWLJOB_REMOTE_CRAWL_LOADER ) ;
rcl = getThread ( SwitchboardConstants . CRAWLJOB_REMOTE_CRAWL_LOADER ) ;
@ -1569,18 +1634,19 @@ public final class Switchboard extends serverSwitch {
if ( activate ) {
if ( activate ) {
BusyThread acr = getThread ( SwitchboardConstants . CRAWLJOB_AUTOCRAWL ) ;
BusyThread acr = getThread ( SwitchboardConstants . CRAWLJOB_AUTOCRAWL ) ;
if ( acr = = null ) {
if ( acr = = null ) {
final CrawlQueues queues = this . crawlQueues ;
deployThread (
deployThread (
SwitchboardConstants . CRAWLJOB_AUTOCRAWL ,
SwitchboardConstants . CRAWLJOB_AUTOCRAWL ,
"Autocrawl" ,
"Autocrawl" ,
"Thread that selects and automatically adds crawling jobs to the local queue" ,
"Thread that selects and automatically adds crawling jobs to the local queue" ,
null ,
null ,
new InstantBusyThread (
new InstantBusyThread ( "CrawlQueues.autocrawlJob" , 10000 , 10000 ) {
this . crawlQueues ,
@Override
SwitchboardConstants . CRAWLJOB_AUTOCRAWL_METHOD_START ,
public boolean jobImpl ( ) throws Exception {
SwitchboardConstants . CRAWLJOB_AUTOCRAWL_METHOD_JOBCOUNT ,
return queues . autocrawlJob ( ) ;
SwitchboardConstants . CRAWLJOB_AUTOCRAWL_METHOD_FREEMEM ,
}
10000 ,
} ,
10000 ) ,
10000 ) ;
10000 ) ;
acr = getThread ( SwitchboardConstants . CRAWLJOB_AUTOCRAWL ) ;
acr = getThread ( SwitchboardConstants . CRAWLJOB_AUTOCRAWL ) ;
@ -2832,6 +2898,11 @@ public final class Switchboard extends serverSwitch {
}
}
}
}
/ * *
* Parse a response to produce a new document to add to the index .
* < strong > Important : < / strong > this method is called using reflection as a Workflow process and must therefore remain public .
* @param in an indexing workflow entry containing a response to parse
* /
public IndexingQueueEntry parseDocument ( final IndexingQueueEntry in ) {
public IndexingQueueEntry parseDocument ( final IndexingQueueEntry in ) {
in . queueEntry . updateStatus ( Response . QUEUE_STATE_PARSING ) ;
in . queueEntry . updateStatus ( Response . QUEUE_STATE_PARSING ) ;
Document [ ] documents = null ;
Document [ ] documents = null ;
@ -3009,6 +3080,10 @@ public final class Switchboard extends serverSwitch {
return documents ;
return documents ;
}
}
/ * *
* < strong > Important : < / strong > this method is called using reflection as a Workflow process and must therefore remain public .
* @param in an indexing workflow entry containing a response and the related parsed document ( s )
* /
public IndexingQueueEntry condenseDocument ( final IndexingQueueEntry in ) {
public IndexingQueueEntry condenseDocument ( final IndexingQueueEntry in ) {
in . queueEntry . updateStatus ( Response . QUEUE_STATE_CONDENSING ) ;
in . queueEntry . updateStatus ( Response . QUEUE_STATE_CONDENSING ) ;
CrawlProfile profile = in . queueEntry . profile ( ) ;
CrawlProfile profile = in . queueEntry . profile ( ) ;
@ -3083,6 +3158,11 @@ public final class Switchboard extends serverSwitch {
return new IndexingQueueEntry ( in . queueEntry , in . documents , condenser ) ;
return new IndexingQueueEntry ( in . queueEntry , in . documents , condenser ) ;
}
}
/ * *
* Perform web structure analysis on parsed documents and update the web structure graph .
* < strong > Important : < / strong > this method is called using reflection as a Workflow process and must therefore remain public .
* @param in an indexing workflow entry containing parsed document ( s )
* /
public IndexingQueueEntry webStructureAnalysis ( final IndexingQueueEntry in ) {
public IndexingQueueEntry webStructureAnalysis ( final IndexingQueueEntry in ) {
in . queueEntry . updateStatus ( Response . QUEUE_STATE_STRUCTUREANALYSIS ) ;
in . queueEntry . updateStatus ( Response . QUEUE_STATE_STRUCTUREANALYSIS ) ;
for ( Document document : in . documents ) {
for ( Document document : in . documents ) {
@ -3096,6 +3176,11 @@ public final class Switchboard extends serverSwitch {
return in ;
return in ;
}
}
/ * *
* Store a new entry to the local index .
* < strong > Important : < / strong > this method is called using reflection as a Workflow process and must therefore remain public .
* @param in an indexing workflow entry containing parsed document ( s ) and a condenser instance
* /
public void storeDocumentIndex ( final IndexingQueueEntry in ) {
public void storeDocumentIndex ( final IndexingQueueEntry in ) {
in . queueEntry . updateStatus ( Response . QUEUE_STATE_INDEXSTORAGE ) ;
in . queueEntry . updateStatus ( Response . QUEUE_STATE_INDEXSTORAGE ) ;
// the condenser may be null in case that an indexing is not wanted (there may be a no-indexing flag in the file)
// the condenser may be null in case that an indexing is not wanted (there may be a no-indexing flag in the file)