@@ -785,20 +785,20 @@ public final class plasmaSwitchboard extends serverAbstractSwitch<IndexingStack.
}
public void switchNetwork(final String networkDefinition) {
log.logInfo("SWITCH NETWORK: switching to '" + networkDefinition + "'");
// pause crawls
final boolean lcp = crawlJobIsPaused(plasmaSwitchboardConstants.CRAWLJOB_LOCAL_CRAWL);
if (!lcp) pauseCrawlJob(plasmaSwitchboardConstants.CRAWLJOB_LOCAL_CRAWL);
final boolean rcp = crawlJobIsPaused(plasmaSwitchboardConstants.CRAWLJOB_REMOTE_TRIGGERED_CRAWL);
if (!rcp) pauseCrawlJob(plasmaSwitchboardConstants.CRAWLJOB_REMOTE_TRIGGERED_CRAWL);
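// editor's annotation: lcp/rcp capture whether the jobs were already paused, so each crawl job is only paused here if it was still running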
// trigger online caution
proxyLastAccess = System.currentTimeMillis() + 10000; // at least 10 seconds online caution to prevent unnecessary action on database meanwhile
proxyLastAccess = System.currentTimeMillis() + 3000; // at least 3 seconds online caution to prevent unnecessary action on database meanwhile
log.logInfo("SWITCH NETWORK: SHUT DOWN OF OLD INDEX DATABASE...");
// clean search events which have cached relations to the old index
QueryEvent.cleanupEvents(true);
// switch the networks
synchronized (this) {
// shut down
synchronized (this.indexSegment) {
this.indexSegment.close();
@@ -809,6 +809,8 @@ public final class plasmaSwitchboard extends serverAbstractSwitch<IndexingStack.
this.robots.close();
this.crawlQueues.close();
log.logInfo("SWITCH NETWORK: START UP OF NEW INDEX DATABASE...");
// start up
setConfig("network.unit.definition", networkDefinition);
overwriteNetworkDefinition();
@@ -846,14 +848,6 @@ public final class plasmaSwitchboard extends serverAbstractSwitch<IndexingStack.
log,
this.queuesRoot);
// we need a new stacker, because this uses network-specific attributes to sort out urls (local, global)
this.crawlStacker = new CrawlStacker(
this.crawlQueues,
this.crawler,
this.indexSegment,
this.peers,
"local.any".indexOf(getConfig("network.unit.domain", "global")) >= 0,
"global.any".indexOf(getConfig("network.unit.domain", "global")) >= 0);
// create new web structure
this.webStructure = new plasmaWebStructure(log, rankingPath, "LOCAL/010_cr/", getConfig("CRDist0Path", plasmaRankingDistribution.CR_OWN), new File(queuesRoot, "webStructure.map"));
@@ -873,11 +867,21 @@ public final class plasmaSwitchboard extends serverAbstractSwitch<IndexingStack.
this.getConfigLong("minimumLocalDelta", this.crawlQueues.noticeURL.getMinimumLocalDelta()),
this.getConfigLong("minimumGlobalDelta", this.crawlQueues.noticeURL.getMinimumGlobalDelta()));
// we need a new stacker, because this uses network-specific attributes to sort out urls (local, global)
this.crawlStacker = new CrawlStacker(
this.crawlQueues,
this.crawler,
this.indexSegment,
this.peers,
"local.any".indexOf(getConfig("network.unit.domain", "global")) >= 0,
"global.any".indexOf(getConfig("network.unit.domain", "global")) >= 0);
}
// start up crawl jobs
continueCrawlJob(plasmaSwitchboardConstants.CRAWLJOB_LOCAL_CRAWL);
continueCrawlJob(plasmaSwitchboardConstants.CRAWLJOB_REMOTE_TRIGGERED_CRAWL);
this.log.logInfo("switched network to " + networkDefinition);
log.logInfo("SWITCH NETWORK: FINISHED START UP, new network is now '" + networkDefinition + "'.");
// check status of account configuration: when local url crawling is allowed, it is not allowed
// that an automatic authorization of localhost is done, because in this case crawls from local
// addresses are blocked to prevent attack scenarios where remote pages contain links to localhost
@@ -1032,11 +1036,17 @@ public final class plasmaSwitchboard extends serverAbstractSwitch<IndexingStack.
new RankingProfile("", crypt.simpleDecode(sb.getConfig("rankingProfile", ""), null));
}
public boolean onlineCaution() {
return
(System.currentTimeMillis() - this.proxyLastAccess < Integer.parseInt(getConfig(plasmaSwitchboardConstants.PROXY_ONLINE_CAUTION_DELAY, "30000"))) ||
(System.currentTimeMillis() - this.localSearchLastAccess < Integer.parseInt(getConfig(plasmaSwitchboardConstants.LOCALSEACH_ONLINE_CAUTION_DELAY, "30000"))) ||
(System.currentTimeMillis() - this.remoteSearchLastAccess < Integer.parseInt(getConfig(plasmaSwitchboardConstants.REMOTESEARCH_ONLINE_CAUTION_DELAY, "30000")));
/**
 * checks if the proxy, the local search or the remote search was accessed some time before.
 * If no limit is exceeded, null is returned. If a limit is exceeded,
 * then the name of the service that caused the caution is returned.
 * @return the name of the service that caused the caution, or null if no limit is exceeded
 */
public String onlineCaution() {
if (System.currentTimeMillis() - this.proxyLastAccess < Integer.parseInt(getConfig(plasmaSwitchboardConstants.PROXY_ONLINE_CAUTION_DELAY, "30000"))) return "proxy";
if (System.currentTimeMillis() - this.localSearchLastAccess < Integer.parseInt(getConfig(plasmaSwitchboardConstants.LOCALSEACH_ONLINE_CAUTION_DELAY, "30000"))) return "localsearch";
if (System.currentTimeMillis() - this.remoteSearchLastAccess < Integer.parseInt(getConfig(plasmaSwitchboardConstants.REMOTESEARCH_ONLINE_CAUTION_DELAY, "30000"))) return "remotesearch";
return null;
}
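// Editor's note, an illustrative sketch (not part of this patch) of how a caller migrates from the
// old boolean check to the new String-returning variant; 'sb' stands for a plasmaSwitchboard
// reference and 'log' for its logger, both assumed from context:
final String cautionCause = sb.onlineCaution();
if (cautionCause != null) {
    log.logFine("job deferred, online caution for " + cautionCause);
    return false;
}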
private static String ppRamString(long bytes) {
@@ -1113,7 +1123,7 @@ public final class plasmaSwitchboard extends serverAbstractSwitch<IndexingStack.
doIndexing = false;
}
synchronized (crawler.queuePreStack) {
synchronized (crawler.indexingStack) {
/* =========================================================================
 * STORING DATA
 *
@@ -1149,7 +1159,7 @@ public final class plasmaSwitchboard extends serverAbstractSwitch<IndexingStack.
if (doIndexing && supportError == null) {
// enqueue for further crawling
enQueue(this.crawler.queuePreStack.newEntry(
enQueue(this.crawler.indexingStack.newEntry(
entry.url(),
(entry.referrerURL() == null) ? null : entry.referrerURL().hash(),
entry.ifModifiedSince(),
@@ -1179,13 +1189,13 @@ public final class plasmaSwitchboard extends serverAbstractSwitch<IndexingStack.
terminateAllThreads(true);
log.logConfig("SWITCHBOARD SHUTDOWN STEP 2: sending termination signal to threaded indexing");
// closing all still running db importer jobs
dhtDispatcher.close();
indexingDocumentProcessor.announceShutdown();
indexingDocumentProcessor.awaitShutdown(12000);
crawlStacker.announceClose();
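// editor's annotation: shutdown is announced to the remaining indexing pipeline stages first and
// only awaited further below, so the condensement, analysis and storage processors can wind down in parallel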
indexingCondensementProcessor.announceShutdown();
indexingAnalysisProcessor.announceShutdown();
indexingStorageProcessor.announceShutdown();
dhtDispatcher.close();
indexingCondensementProcessor.awaitShutdown(12000);
indexingAnalysisProcessor.awaitShutdown(12000);
indexingStorageProcessor.awaitShutdown(12000);
@@ -1213,13 +1223,13 @@ public final class plasmaSwitchboard extends serverAbstractSwitch<IndexingStack.
}
public int queueSize() {
return crawler.queuePreStack.size();
return crawler.indexingStack.size();
}
public void enQueue(final IndexingStack.QueueEntry job) {
assert job != null;
try {
crawler.queuePreStack.push(job);
crawler.indexingStack.push(job);
} catch (final IOException e) {
log.logSevere("IOError in plasmaSwitchboard.enQueue: " + e.getMessage(), e);
}
@@ -1234,24 +1244,24 @@ public final class plasmaSwitchboard extends serverAbstractSwitch<IndexingStack.
public IndexingStack.QueueEntry deQueue() {
// getting the next entry from the indexing queue
IndexingStack.QueueEntry nextentry = null;
synchronized (crawler.queuePreStack) {
synchronized (crawler.indexingStack) {
// do one processing step
if (this.log.isFine()) log.logFine("DEQUEUE: sbQueueSize=" + crawler.queuePreStack.size() +
if (this.log.isFine()) log.logFine("DEQUEUE: sbQueueSize=" + crawler.indexingStack.size() +
", coreStackSize=" + crawlQueues.noticeURL.stackSize(NoticedURL.STACK_TYPE_CORE) +
", limitStackSize=" + crawlQueues.noticeURL.stackSize(NoticedURL.STACK_TYPE_LIMIT) +
", overhangStackSize=" + crawlQueues.noticeURL.stackSize(NoticedURL.STACK_TYPE_OVERHANG) +
", remoteStackSize=" + crawlQueues.noticeURL.stackSize(NoticedURL.STACK_TYPE_REMOTE));
try {
final int sizeBefore = crawler.queuePreStack.size();
final int sizeBefore = crawler.indexingStack.size();
if (sizeBefore == 0) return null;
nextentry = crawler.queuePreStack.pop();
nextentry = crawler.indexingStack.pop();
if (nextentry == null) {
log.logWarning("deQueue: null entry on queue stack.");
if (crawler.queuePreStack.size() == sizeBefore) {
if (crawler.indexingStack.size() == sizeBefore) {
// this is a severe problem: because this time a null is returned, it means that this status will last forever
// to re-enable use of the sbQueue, it must be emptied completely
log.logSevere("deQueue: does not shrink after pop() == null. Emergency reset of sbQueue");
crawler.queuePreStack.clear();
crawler.indexingStack.clear();
}
return null;
}
@@ -1285,7 +1295,7 @@ public final class plasmaSwitchboard extends serverAbstractSwitch<IndexingStack.
// create a queue entry
Document document = surrogate.document();
queueentry = this.crawler.queuePreStack.newEntry(surrogate.url(), null, null, false, null, 0, this.crawler.defaultSurrogateProfile.handle(), null);
queueentry = this.crawler.indexingStack.newEntry(surrogate.url(), null, null, false, null, 0, this.crawler.defaultSurrogateProfile.handle(), null);
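// editor's annotation: matched against the newEntry signature quoted in the comment below, the
// surrogate entry is created with no referrer, no if-modified-since date, no cookie, no initiator,
// depth 0, the handle of the defaultSurrogateProfile, and no anchor name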
/*
 * public QueueEntry newEntry(final yacyURL url, final String referrer, final Date ifModifiedSince, final boolean requestWithCookie,
 final String initiator, final int depth, final String profilehandle, final String anchorName)
@@ -1311,13 +1321,12 @@ public final class plasmaSwitchboard extends serverAbstractSwitch<IndexingStack.
public boolean deQueueProcess() {
try {
// work off fresh entries from the proxy or from the crawler
if (onlineCaution()) {
if (this.log.isFine()) log.logFine("deQueue: online caution, omitting resource stack processing");
String cautionCause = onlineCaution();
if (cautionCause != null) {
if (this.log.isFine()) log.logFine("deQueue: online caution for " + cautionCause + ", omitting resource stack processing");
return false;
}
boolean doneSomething = false;
// check for interruption
checkInterruption();
@@ -1334,9 +1343,9 @@ public final class plasmaSwitchboard extends serverAbstractSwitch<IndexingStack.
}
// getting the next entry from the indexing queue
if (crawler.queuePreStack.size() == 0) {
//log.logFine("deQueue: nothing to do, queue is emtpy");
return doneSomething; // nothing to do
if (crawler.indexingStack.size() == 0) {
if (log.isFinest()) log.logFinest("deQueue: nothing to do, queue is emtpy");
return false; // nothing to do
}
// if we were interrupted we should return now
@@ -1347,10 +1356,14 @@ public final class plasmaSwitchboard extends serverAbstractSwitch<IndexingStack.
// get next queue entry and start a queue processing
final IndexingStack.QueueEntry queueEntry = deQueue();
if (queueEntry == null) return true;
if (queueEntry == null) {
if (this.log.isFine()) log.logFine("deQueue: queue entry is null");
return false;
}
if (queueEntry.profile() == null) {
queueEntry.close();
return true;
if (this.log.isFine()) log.logFine("deQueue: profile is null");
return false;
}
@@ -1366,19 +1379,18 @@ public final class plasmaSwitchboard extends serverAbstractSwitch<IndexingStack.
if (noIndexReason != null) {
// this document should not be indexed. log cause and close queue
final yacyURL referrerURL = queueEntry.referrerURL(this.indexSegment.urlMetadata());
log.logFine("Not indexed any word in URL " + queueEntry.url() + "; cause: " + noIndexReason);
if (log.isFine()) log.logFine("deQueue: not indexed any word in URL " + queueEntry.url() + "; cause: " + noIndexReason);
addURLtoErrorDB(queueEntry.url(), (referrerURL == null) ? "" : referrerURL.hash(), queueEntry.initiator(), queueEntry.anchorName(), noIndexReason);
// finish this entry
return true;
}
// put document into the concurrent processing queue
crawler.queuePreStack.enQueueToActive(queueEntry);
// check for interruption
checkInterruption();
// enqueue to indexing queue
// put document into the concurrent processing queue
if (log.isFinest()) log.logFinest("deQueue: passing entry to indexing queue");
this.crawler.indexingStack.store(queueEntry);
this.indexingDocumentProcessor.enQueue(new indexingQueueEntry(queueEntry, null, null));
return true;
} catch (final InterruptedException e) {
@@ -1879,7 +1891,7 @@ public final class plasmaSwitchboard extends serverAbstractSwitch<IndexingStack.
public String toString() {
// it is possible to use this method in the cgi pages.
// actually it is used there for testing purpose
return "PROPS: " + super.toString() + "; QUEUE: " + crawler.queuePreStack.toString();
return "PROPS: " + super.toString() + "; QUEUE: " + crawler.indexingStack.toString();
}
// method for index deletion
@@ -2027,8 +2039,9 @@ public final class plasmaSwitchboard extends serverAbstractSwitch<IndexingStack.
}
public String dhtShallTransfer() {
if (onlineCaution()) {
return "online caution, dht transmission";
String cautionCause = onlineCaution();
if (cautionCause != null) {
return "online caution for " + cautionCause + ", dht transmission";
}
if (this.peers == null) {
return "no DHT distribution: seedDB == null";
@@ -2055,10 +2068,10 @@ public final class plasmaSwitchboard extends serverAbstractSwitch<IndexingStack.
return "no DHT distribution: not enough words - wordIndex.size() = " + indexSegment.termIndex().sizesMax();
}
if ((getConfig(plasmaSwitchboardConstants.INDEX_DIST_ALLOW_WHILE_CRAWLING, "false").equalsIgnoreCase("false")) && (crawlQueues.noticeURL.notEmptyLocal())) {
return "no DHT distribution: crawl in progress: noticeURL.stackSize() = " + crawlQueues.noticeURL.size() + ", sbQueue.size() = " + crawler.queuePreStack.size();
return "no DHT distribution: crawl in progress: noticeURL.stackSize() = " + crawlQueues.noticeURL.size() + ", sbQueue.size() = " + crawler.indexingStack.size();
}
if ((getConfig(plasmaSwitchboardConstants.INDEX_DIST_ALLOW_WHILE_INDEXING, "false").equalsIgnoreCase("false")) && (crawler.queuePreStack.size() > 1)) {
return "no DHT distribution: indexing in progress: noticeURL.stackSize() = " + crawlQueues.noticeURL.size() + ", sbQueue.size() = " + crawler.queuePreStack.size();
if ((getConfig(plasmaSwitchboardConstants.INDEX_DIST_ALLOW_WHILE_INDEXING, "false").equalsIgnoreCase("false")) && (crawler.indexingStack.size() > 1)) {
return "no DHT distribution: indexing in progress: noticeURL.stackSize() = " + crawlQueues.noticeURL.size() + ", sbQueue.size() = " + crawler.indexingStack.size();
}
return null; // this means; yes, please do dht transfer
}
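// Editor's note, an illustrative sketch (not part of this patch): dhtShallTransfer() follows the
// same convention as the new onlineCaution(), null means "go ahead" and any other value is the
// human-readable reason to skip; 'sb' stands for a plasmaSwitchboard reference assumed from context:
final String noTransferReason = sb.dhtShallTransfer();
if (noTransferReason != null) {
    log.logFine(noTransferReason);
    return false; // skip the DHT transfer for now
}
// ... otherwise start the DHT transfer ...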
@@ -2275,7 +2288,6 @@ public final class plasmaSwitchboard extends serverAbstractSwitch<IndexingStack.
public boolean waitForShutdown() throws InterruptedException {
this.shutdownSync.P();
dhtDispatcher.close();
return this.terminate;
}