@@ -122,7 +122,6 @@ import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.logging.Level;
import de.anomic.data.blogBoard;
import de.anomic.data.bookmarksDB;
@@ -155,6 +154,7 @@ import de.anomic.server.serverObjects;
import de.anomic.server.serverSemaphore;
import de.anomic.server.serverSwitch;
import de.anomic.server.serverFileUtils;
import de.anomic.server.serverThread;
import de.anomic.server.logging.serverLog;
import de.anomic.tools.bitfield;
import de.anomic.tools.crypt;
@@ -176,6 +176,21 @@ public final class plasmaSwitchboard extends serverAbstractSwitch implements ser
private int dhtTransferIndexCount = 50;
// we must distinguish the following cases: resource-load was initiated by
// 1) global crawling: the index is extern, not here (not possible here)
// 2) result of search queries, some indexes are here (not possible here)
// 3) result of index transfer, some of them are here (not possible here)
// 4) proxy-load (initiator is "------------")
// 5) local prefetch/crawling (initiator is own seedHash)
// 6) local fetching for global crawling (other known or unknown initiator)
public static final int PROCESSCASE_0_UNKNOWN = 0;
public static final int PROCESSCASE_1_GLOBAL_CRAWLING = 1;
public static final int PROCESSCASE_2_SEARCH_QUERY_RESULT = 2;
public static final int PROCESSCASE_3_INDEX_TRANSFER_RESULT = 3;
public static final int PROCESSCASE_4_PROXY_LOAD = 4;
public static final int PROCESSCASE_5_LOCAL_CRAWLING = 5;
public static final int PROCESSCASE_6_GLOBAL_CRAWLING = 6;
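// NOTE: the constants above name the numeric process cases listed in the
// comment; processResourceStack() below compares against them instead of
// the bare literals 0, 4, 5 and 6.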
// coloured list management
public static TreeSet badwords = null;
public static TreeSet blueList = null;
@@ -958,92 +973,106 @@ public final class plasmaSwitchboard extends serverAbstractSwitch implements ser
}
public boolean deQueue() {
// work off fresh entries from the proxy or from the crawler
if (onlineCaution()) {
log.logFine("deQueue: online caution, omitting resource stack processing");
return false;
}
try {
// work off fresh entries from the proxy or from the crawler
if (onlineCaution()) {
log.logFine("deQueue: online caution, omitting resource stack processing");
return false;
}
// flush some entries from the RAM cache
// (new permanent cache flushing)
wordIndex.flushCacheSome(sbQueue.size() != 0);
boolean doneSomething = false;
// possibly delete entries from last chunk
if ((this.dhtTransferChunk != null) &&
(this.dhtTransferChunk.getStatus() == plasmaDHTChunk.chunkStatus_COMPLETE)) {
int deletedURLs = this.dhtTransferChunk.deleteTransferIndexes();
this.log.logFine("Deleted from " + this.dhtTransferChunk.containers().length + " transferred RWIs locally, removed " + deletedURLs + " URL references");
this.dhtTransferChunk = null;
}
// generate a dht chunk
if ((dhtShallTransfer() == null) &&
((this.dhtTransferChunk == null) ||
(this.dhtTransferChunk.getStatus() == plasmaDHTChunk.chunkStatus_UNDEFINED) ||
// (this.dhtTransferChunk.getStatus() == plasmaDHTChunk.chunkStatus_COMPLETE) ||
(this.dhtTransferChunk.getStatus() == plasmaDHTChunk.chunkStatus_FAILED))) {
// generate new chunk
int minChunkSize = (int) getConfigLong("indexDistribution.minChunkSize", 30);
dhtTransferChunk = new plasmaDHTChunk(this.log, this.wordIndex, this.urlPool.loadedURL, minChunkSize, dhtTransferIndexCount);
doneSomething = true;
}
// flush some entries from the RAM cache
// (new permanent cache flushing)
wordIndex.flushCacheSome(sbQueue.size() != 0);
synchronized (sbQueue) {
boolean doneSomething = false;
if (sbQueue.size() == 0) {
//log.logFine("deQueue: nothing to do, queue is empty");
return doneSomething; // nothing to do
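// once a transfer chunk has reached status COMPLETE it has been handed off
// to a remote peer, so its word references may be removed from the local
// index before a new chunk is prepared: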
// possibly delete entries from last chunk
if ((this.dhtTransferChunk != null) &&
(this.dhtTransferChunk.getStatus() == plasmaDHTChunk.chunkStatus_COMPLETE)) {
int deletedURLs = this.dhtTransferChunk.deleteTransferIndexes();
this.log.logFine("Deleted from " + this.dhtTransferChunk.containers().length + " transferred RWIs locally, removed " + deletedURLs + " URL references");
this.dhtTransferChunk = null;
}
/*
if (wordIndex.wordCacheRAMSize() + 1000 > (int) getConfigLong("wordCacheMaxLow", 8000)) {
log.logFine("deQueue: word index ram cache too full (" + ((int) getConfigLong("wordCacheMaxLow", 8000) - wordIndex.wordCacheRAMSize()) + " slots left); dismissed to omit ram flush lock");
return false;
}
*/
int stackCrawlQueueSize;
if ((stackCrawlQueueSize = sbStackCrawlThread.size()) >= stackCrawlSlots) {
log.logFine("deQueue: too many processes in stack crawl thread queue, dismissed to protect emergency case (" + "stackCrawlQueue=" + stackCrawlQueueSize + ")");
return doneSomething;
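// a new DHT chunk is only generated while no transfer is currently due
// (dhtShallTransfer() == null) and the previous chunk is gone, undefined
// or failed: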
// generate a dht chunk
if (
(dhtShallTransfer() == null) &&
(
(this.dhtTransferChunk == null) ||
(this.dhtTransferChunk.getStatus() == plasmaDHTChunk.chunkStatus_UNDEFINED) ||
// (this.dhtTransferChunk.getStatus() == plasmaDHTChunk.chunkStatus_COMPLETE) ||
(this.dhtTransferChunk.getStatus() == plasmaDHTChunk.chunkStatus_FAILED)
)
) {
// generate new chunk
int minChunkSize = (int) getConfigLong("indexDistribution.minChunkSize", 30);
dhtTransferChunk = new plasmaDHTChunk(this.log, this.wordIndex, this.urlPool.loadedURL, minChunkSize, dhtTransferIndexCount);
doneSomething = true;
}
plasmaSwitchboardQueue.Entry nextentry;
// if we were interrupted we should return now
if (Thread.currentThread().isInterrupted()) {
log.logFine("deQueue: thread was interrupted");
return false;
}
// do one processing step
log.logFine("DEQUEUE: sbQueueSize=" + sbQueue.size() +
", coreStackSize=" + urlPool.noticeURL.stackSize(plasmaCrawlNURL.STACK_TYPE_CORE) +
", limitStackSize=" + urlPool.noticeURL.stackSize(plasmaCrawlNURL.STACK_TYPE_LIMIT) +
", overhangStackSize=" + urlPool.noticeURL.stackSize(plasmaCrawlNURL.STACK_TYPE_OVERHANG) +
", remoteStackSize=" + urlPool.noticeURL.stackSize(plasmaCrawlNURL.STACK_TYPE_REMOTE));
try {
nextentry = sbQueue.pop();
if (nextentry == null) {
log.logFine("deQueue: null entry on queue stack");
// check for interruption
checkInterruption();
// getting the next entry from the indexing queue
synchronized (sbQueue) {
if (sbQueue.size() == 0) {
//log.logFine("deQueue: nothing to do, queue is empty");
return doneSomething; // nothing to do
}
/*
if (wordIndex.wordCacheRAMSize() + 1000 > (int) getConfigLong("wordCacheMaxLow", 8000)) {
log.logFine("deQueue: word index ram cache too full (" + ((int) getConfigLong("wordCacheMaxLow", 8000) - wordIndex.wordCacheRAMSize()) + " slots left); dismissed to omit ram flush lock");
return false;
}
} catch (IOException e) {
log.logSevere("IOError in plasmaSwitchboard.deQueue: " + e.getMessage(), e);
return doneSomething;
}
synchronized (this.indexingTasksInProcess) {
this.indexingTasksInProcess.put(nextentry.urlHash(), nextentry);
*/
int stackCrawlQueueSize;
if ((stackCrawlQueueSize = sbStackCrawlThread.size()) >= stackCrawlSlots) {
log.logFine("deQueue: too many processes in stack crawl thread queue, dismissed to protect emergency case (" + "stackCrawlQueue=" + stackCrawlQueueSize + ")");
return doneSomething;
}
plasmaSwitchboardQueue.Entry nextentry;
// if we were interrupted we should return now
if (Thread.currentThread().isInterrupted()) {
log.logFine("deQueue: thread was interrupted");
return false;
}
// do one processing step
log.logFine("DEQUEUE: sbQueueSize=" + sbQueue.size() +
", coreStackSize=" + urlPool.noticeURL.stackSize(plasmaCrawlNURL.STACK_TYPE_CORE) +
", limitStackSize=" + urlPool.noticeURL.stackSize(plasmaCrawlNURL.STACK_TYPE_LIMIT) +
", overhangStackSize=" + urlPool.noticeURL.stackSize(plasmaCrawlNURL.STACK_TYPE_OVERHANG) +
", remoteStackSize=" + urlPool.noticeURL.stackSize(plasmaCrawlNURL.STACK_TYPE_REMOTE));
try {
nextentry = sbQueue.pop();
if (nextentry == null) {
log.logFine("deQueue: null entry on queue stack");
return false;
}
} catch (IOException e) {
log.logSevere("IOError in plasmaSwitchboard.deQueue: " + e.getMessage(), e);
return doneSomething;
}
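// register the entry as 'in process'; processResourceStack() removes it
// from indexingTasksInProcess again in its finally block, even when
// indexing fails: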
synchronized (this.indexingTasksInProcess) {
this.indexingTasksInProcess.put(nextentry.urlHash(), nextentry);
}
// parse and index the resource
processResourceStack(nextentry);
}
processResourceStack(nextentry);
// ready & finished
return true;
} catch (InterruptedException e) {
log.logInfo("DEQUEUE: Shutdown detected.");
return false;
}
// ready & finished
return true;
}
public int cleanupJobSize() {
@@ -1350,7 +1379,44 @@ public final class plasmaSwitchboard extends serverAbstractSwitch implements ser
}
}
private void processResourceStack(plasmaSwitchboardQueue.Entry entry) {
private plasmaParserDocument parseResource(plasmaSwitchboardQueue.Entry entry, String initiatorHash) throws InterruptedException {
plasmaParserDocument document = null;
// the http header that belongs to this entry
httpHeader entryRespHeader = entry.responseHeader();
// the mimetype of this entry
String mimeType = (entryRespHeader == null) ? null : entryRespHeader.mime();
// the parser logger
serverLog parserLogger = parser.getLogger();
// if the document content is supported we can start to parse the content
if (plasmaParser.supportedContent(
entry.url(),
mimeType)
) {
if ((entry.cacheFile().exists()) && (entry.cacheFile().length() > 0)) {
parserLogger.logFine("'" + entry.normalizedURLString() + "' is not parsed yet, parsing now from File");
document = parser.parseSource(entry.url(), mimeType, entry.cacheFile());
} else {
parserLogger.logFine("'" + entry.normalizedURLString() + "' cannot be parsed, no resource available");
addURLtoErrorDB(entry.url(), entry.referrerHash(), initiatorHash, entry.anchorName(), plasmaCrawlEURL.DENIED_NOT_PARSEABLE_NO_CONTENT, new bitfield(indexURL.urlFlagLength));
}
if (document == null) {
parserLogger.logSevere("'" + entry.normalizedURLString() + "' parse failure");
addURLtoErrorDB(entry.url(), entry.referrerHash(), initiatorHash, entry.anchorName(), plasmaCrawlEURL.DENIED_PARSER_ERROR, new bitfield(indexURL.urlFlagLength));
}
} else {
parserLogger.logFine("'" + entry.normalizedURLString() + "'. Unsupported mimeType '" + ((mimeType == null) ? "null" : mimeType) + "'.");
addURLtoErrorDB(entry.url(), entry.referrerHash(), initiatorHash, entry.anchorName(), plasmaCrawlEURL.DENIED_WRONG_MIMETYPE_OR_EXT, new bitfield(indexURL.urlFlagLength));
}
checkInterruption();
return document;
}
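// processResourceStack now declares InterruptedException: checkInterruption()
// raises it when a shutdown is pending, and deQueue() above catches it to
// abort the processing step cleanly.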
private void processResourceStack(plasmaSwitchboardQueue.Entry entry) throws InterruptedException {
try {
// work off one stack entry with a fresh resource
long stackStartTime = 0, stackEndTime = 0,
@@ -1365,58 +1431,41 @@ public final class plasmaSwitchboard extends serverAbstractSwitch implements ser
// 4) proxy-load (initiator is "------------")
// 5) local prefetch/crawling (initiator is own seedHash)
// 6) local fetching for global crawling (other known or unknown initiator)
int processCase = 0;
yacySeed initiator = null;
String initiatorHash = (entry.proxy()) ? indexURL.dummyHash : entry.initiator();
if (initiatorHash.equals(indexURL.dummyHash)) {
int processCase = PROCESSCASE_0_UNKNOWN;
yacySeed initiatorPeer = null;
String initiatorPeerHash = (entry.proxy()) ? indexURL.dummyHash : entry.initiator();
if (initiatorPeerHash.equals(indexURL.dummyHash)) {
// proxy-load
processCase = 4;
} else if (initiatorHash.equals(yacyCore.seedDB.mySeed.hash)) {
processCase = PROCESSCASE_4_PROXY_LOAD;
} else if (initiatorPeerHash.equals(yacyCore.seedDB.mySeed.hash)) {
// normal crawling
processCase = 5;
processCase = PROCESSCASE_5_LOCAL_CRAWLING;
} else {
// this was done for remote peer (a global crawl)
initiator = yacyCore.seedDB.getConnected(initiatorHash);
processCase = 6;
initiatorPeer = yacyCore.seedDB.getConnected(initiatorPeerHash);
processCase = PROCESSCASE_6_GLOBAL_CRAWLING;
}
log.logFine("processResourceStack processCase=" + processCase +
", depth=" + entry.depth() +
", maxDepth=" + ((entry.profile() == null) ? "null" : Integer.toString(entry.profile().generalDepth())) +
", filter=" + ((entry.profile() == null) ? "null" : entry.profile().generalFilter()) +
", initiatorHash=" + initiatorHash +
", initiatorHash=" + initiatorPeerHash +
", responseHeader=" + ((entry.responseHeader() == null) ? "null" : entry.responseHeader().toString()) +
", url=" + entry.url()); // DEBUG
// parse content
parsingStartTime = System.currentTimeMillis();
/* =========================================================================
* PARSE CONTENT
* ========================================================================= */
plasmaParserDocument document = null;
httpHeader entryRespHeader = entry.responseHeader();
String mimeType = (entryRespHeader == null) ? null : entryRespHeader.mime();
if (plasmaParser.supportedContent(
entry.url(),
mimeType)
) {
if ((entry.cacheFile().exists()) && (entry.cacheFile().length() > 0)) {
log.logFine("(Parser) '" + entry.normalizedURLString() + "' is not parsed yet, parsing now from File");
document = parser.parseSource(entry.url(), mimeType, entry.cacheFile());
} else {
log.logFine("(Parser) '" + entry.normalizedURLString() + "' cannot be parsed, no resource available");
addURLtoErrorDB(entry.url(), entry.referrerHash(), initiatorHash, entry.anchorName(), plasmaCrawlEURL.DENIED_NOT_PARSEABLE_NO_CONTENT, new bitfield(indexURL.urlFlagLength));
return;
}
if (document == null) {
log.logSevere("(Parser) '" + entry.normalizedURLString() + "' parse failure");
addURLtoErrorDB(entry.url(), entry.referrerHash(), initiatorHash, entry.anchorName(), plasmaCrawlEURL.DENIED_PARSER_ERROR, new bitfield(indexURL.urlFlagLength));
return;
}
} else {
log.logFine("(Parser) '" + entry.normalizedURLString() + "'. Unsupported mimeType '" + ((mimeType == null) ? "null" : mimeType) + "'.");
addURLtoErrorDB(entry.url(), entry.referrerHash(), initiatorHash, entry.anchorName(), plasmaCrawlEURL.DENIED_WRONG_MIMETYPE_OR_EXT, new bitfield(indexURL.urlFlagLength));
return;
}
parsingEndTime = System.currentTimeMillis();
parsingStartTime = System.currentTimeMillis();
document = this.parseResource(entry, initiatorPeerHash);
if (document == null) return;
parsingEndTime = System.currentTimeMillis();
// getting the document date
Date docDate = null;
if (entry.responseHeader() != null) {
docDate = entry.responseHeader().lastModified();
@@ -1424,103 +1473,152 @@ public final class plasmaSwitchboard extends serverAbstractSwitch implements ser
}
if (docDate == null) docDate = new Date();
// put anchors on crawl stack
/* =========================================================================
* put anchors on crawl stack
* ========================================================================= */
stackStartTime = System.currentTimeMillis();
if (((processCase == 4) || (processCase == 5)) &&
((entry.profile() == null) || (entry.depth() < entry.profile().generalDepth()))) {
if (
((processCase == PROCESSCASE_4_PROXY_LOAD) || (processCase == PROCESSCASE_5_LOCAL_CRAWLING)) &&
((entry.profile() == null) || (entry.depth() < entry.profile().generalDepth()))
) {
Map hl = document.getHyperlinks();
Iterator i = hl.entrySet().iterator();
String nexturlstring;
//String rejectReason;
Map.Entry e;
String nextUrlString;
Map.Entry nextEntry;
while (i.hasNext()) {
e = (Map.Entry) i.next();
nexturlstring = (String) e.getKey();
try { nexturlstring = new URL(nexturlstring).toNormalform(); } catch (MalformedURLException e1) {}
// check for interruption
checkInterruption();
sbStackCrawlThread.enqueue(nexturlstring, entry.url().toString(), initiatorHash, (String) e.getValue(), docDate, entry.depth() + 1, entry.profile());
// rejectReason = stackCrawl(nexturlstring, entry.normalizedURLString(), initiatorHash, (String) e.getValue(), loadDate, entry.depth() + 1, entry.profile());
// if (rejectReason == null) { c++; } else {
// urlPool.errorURL.newEntry(new URL(nexturlstring), entry.normalizedURLString(), entry.initiator(), yacyCore.seedDB.mySeed.hash,
// (String) e.getValue(), rejectReason, new bitfield(indexURL.urlFlagLength), false);
// }
// fetching the next hyperlink
nextEntry = (Map.Entry) i.next();
nextUrlString = (String) nextEntry.getKey();
try {
nextUrlString = new URL(nextUrlString).toNormalform();
// enqueue the hyperlink into the pre-notice-url db
sbStackCrawlThread.enqueue(nextUrlString, entry.url().toString(), initiatorPeerHash, (String) nextEntry.getValue(), docDate, entry.depth() + 1, entry.profile());
} catch (MalformedURLException e1) {}
}
log.logInfo("CRAWL: ADDED " + hl.size() + " LINKS FROM " + entry.normalizedURLString() +
", NEW CRAWL STACK SIZE IS " + urlPool.noticeURL.stackSize(plasmaCrawlNURL.STACK_TYPE_CORE));
}
stackEndTime = System.currentTimeMillis();
// create index
String descr = document.getMainLongTitle();
String referrerHash;
/* =========================================================================
* CREATE INDEX
* ========================================================================= */
String docDescription = document.getMainLongTitle();
URL referrerURL = entry.referrerURL();
referrerHash = indexURL.urlHash(referrerURL);
if (referrerHash == null) referrerHash = indexURL.dummyHash;
String referrerUrlHash = indexURL.urlHash(referrerURL);
if (referrerUrlHash == null) referrerUrlHash = indexURL.dummyHash;
String noIndexReason = plasmaCrawlEURL.DENIED_UNSPECIFIED_INDEXING_ERROR;
if (processCase == 4) {
if (processCase == PROCESSCASE_4_PROXY_LOAD) {
// proxy-load
noIndexReason = entry.shallIndexCacheForProxy();
} else {
// normal crawling
noIndexReason = entry.shallIndexCacheForCrawler();
}
if (noIndexReason == null) {
// strip out words
indexingStartTime = System.currentTimeMillis();
checkInterruption();
log.logFine("Condensing for '" + entry.normalizedURLString() + "'");
plasmaCondenser condenser = new plasmaCondenser(new ByteArrayInputStream(document.getText()));
// generate citation reference
Integer[] ioLinks = generateCitationReference(entry.urlHash(), docDate, document, condenser);
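// ioLinks[0] and ioLinks[1] hold the outlinkSame and outlinkOthers
// counters that are passed on to addPageIndex() and the word index
// entries below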
//log.logInfo("INDEXING HEADLINE:" + descr);
try {
//log.logDebug("Create LURL-Entry for '" + entry.normalizedURLString() + "', " +
// "responseHeader=" + entry.responseHeader().toString());
try {
// check for interruption
checkInterruption ( ) ;
// create a new loaded URL db entry
plasmaCrawlLURL . Entry newEntry = urlPool . loadedURL . newEntry (
entry . url ( ) , descr , docDate , new Date ( ) ,
referrerHash ,
0 , true ,
condenser . RESULT_WORD_ENTROPHY ,
indexEntryAttribute . language ( entry . url ( ) ) ,
indexEntryAttribute . docType ( document . getMimeType ( ) ) ,
( int ) entry . size ( ) ,
condenser . RESULT_NUMB_WORDS
entry . url ( ) , // URL
docDescription , // document description
docDate , // modification date
new Date ( ) , // loaded date
referrerUrlHash , // referer hash
0 , // copy count
true , // local need
condenser . RESULT_WORD_ENTROPHY , // quality
indexEntryAttribute . language ( entry . url ( ) ) , // language
indexEntryAttribute . docType ( document . getMimeType ( ) ) , // doctype
( int ) entry . size ( ) , // size
condenser . RESULT_NUMB_WORDS // word count
) ;
/ * = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = =
* STORE URL TO LOADED - URL - DB
* = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = * /
newEntry . store ( ) ;
urlPool . loadedURL . stackEntry (
newEntry ,
initiatorHash ,
yacyCore . seedDB . mySeed . hash ,
processCase
) ;
String urlHash = newEntry . hash ( ) ;
newEntry , // loaded url db entry
initiatorPeerHash , // initiator peer hash
yacyCore . seedDB . mySeed . hash , // executor peer hash
processCase // process case
) ;
if ( ( ( processCase = = 4 ) | | ( processCase = = 5 ) | | ( processCase = = 6 ) ) & & ( entry . profile ( ) . localIndexing ( ) ) ) {
// remove stopwords
// check for interruption
checkInterruption ( ) ;
/ * = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = =
* STORE WORD INDEX
* = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = * /
if (
(
( processCase = = PROCESSCASE_4_PROXY_LOAD ) | |
( processCase = = PROCESSCASE_5_LOCAL_CRAWLING ) | |
( processCase = = PROCESSCASE_6_GLOBAL_CRAWLING )
) & &
( entry . profile ( ) . localIndexing ( ) )
) {
String urlHash = newEntry . hash ( ) ;
// remove stopwords
log . logInfo ( "Excluded " + condenser . excludeWords ( stopwords ) + " words in URL " + entry . url ( ) ) ;
indexingEndTime = System . currentTimeMillis ( ) ;
// do indexing
//log.logDebug("Create Index for '" + entry.normalizedURLString() + "'");
storageStartTime = System . currentTimeMillis ( ) ;
int words = 0 ;
String storagePeerHash ;
yacySeed seed ;
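// if a storage peer is configured via 'storagePeerHash' and currently
// connected, the page index is sent to that peer; otherwise it is
// stored in the local word index: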
if (((storagePeerHash = getConfig("storagePeerHash", null)) == null) ||
if (
((storagePeerHash = getConfig("storagePeerHash", null)) == null) ||
(storagePeerHash.trim().length() == 0) ||
((seed = yacyCore.seedDB.getConnected(storagePeerHash)) == null)) {
words = wordIndex.addPageIndex(entry.url(), urlHash, docDate, (int) entry.size(), document, condenser,
indexEntryAttribute.language(entry.url()), indexEntryAttribute.docType(document.getMimeType()),
ioLinks[0].intValue(), ioLinks[1].intValue());
((seed = yacyCore.seedDB.getConnected(storagePeerHash)) == null)
) {
/* =========================================================================
* STORE PAGE INDEX INTO WORD INDEX DB
* ========================================================================= */
words = wordIndex.addPageIndex(
entry.url(), // document url
urlHash, // document url hash
docDate, // document mod date
(int) entry.size(), // document size
document, // document content
condenser, // document condenser
indexEntryAttribute.language(entry.url()), // document language
indexEntryAttribute.docType(document.getMimeType()), // document type
ioLinks[0].intValue(), // outlinkSame
ioLinks[1].intValue() // outlinkOthers
);
} else {
/* =========================================================================
* SEND PAGE INDEX TO STORAGE PEER
* ========================================================================= */
HashMap urlCache = new HashMap(1);
urlCache.put(newEntry.hash(), newEntry);
ArrayList tmpContainers = new ArrayList(condenser.RESULT_SIMI_WORDS);
String language = indexEntryAttribute.language(entry.url());
String language = indexEntryAttribute.language(entry.url());
char doctype = indexEntryAttribute.docType(document.getMimeType());
int urlLength = newEntry.url().toString().length();
int urlComps = htmlFilterContentScraper.urlComps(newEntry.url().toString()).length;
@@ -1535,56 +1633,71 @@ public final class plasmaSwitchboard extends serverAbstractSwitch implements ser
wordStat = (plasmaCondenser.wordStatProp) wentry.getValue();
String wordHash = indexEntryAttribute.word2hash(word);
indexContainer wordIdxContainer = new indexRowSetContainer(wordHash);
indexEntry wordIdxEntry = new indexURLEntry(urlHash,
urlLength, urlComps,
wordStat.count,
document.longTitle.length(),
condenser.RESULT_SIMI_WORDS,
condenser.RESULT_SIMI_SENTENCES,
wordStat.posInText,
wordStat.posInPhrase,
wordStat.numOfPhrase,
0,
newEntry.size(),
docDate.getTime(),
System.currentTimeMillis(),
condenser.RESULT_WORD_ENTROPHY,
language,
doctype,
ioLinks[0].intValue(),
ioLinks[1].intValue(),
true);
indexEntry wordIdxEntry = new indexURLEntry(
urlHash,
urlLength, urlComps,
wordStat.count,
document.longTitle.length(),
condenser.RESULT_SIMI_WORDS,
condenser.RESULT_SIMI_SENTENCES,
wordStat.posInText,
wordStat.posInPhrase,
wordStat.numOfPhrase,
0,
newEntry.size(),
docDate.getTime(),
System.currentTimeMillis(),
condenser.RESULT_WORD_ENTROPHY,
language,
doctype,
ioLinks[0].intValue(),
ioLinks[1].intValue(),
true
);
wordIdxContainer.add(wordIdxEntry);
tmpContainers.add(wordIdxContainer);
// wordIndex.addEntries(plasmaWordIndexEntryContainer.instantContainer(wordHash, System.currentTimeMillis(), entry));
}
//System.out.println("DEBUG: plasmaSearch.addPageIndex: added " + condenser.getWords().size() + " words, flushed " + c + " entries");
words = condenser . RESULT_SIMI_WORDS ;
// transfering the index to the storage peer
indexContainer [ ] indexData = ( indexContainer [ ] ) tmpContainers . toArray ( new indexContainer [ tmpContainers . size ( ) ] ) ;
HashMap resultObj = yacyClient . transferIndex (
seed ,
( indexContainer [ ] ) tmpContainers . toArray ( new indexContainer [ tmpContainers . size ( ) ] ) ,
urlCache ,
true ,
120000 ) ;
seed , // target seed
indexData , // word index data
urlCache , // urls
true , // gzip body
120000 // transfer timeout
) ;
// check for interruption
checkInterruption ( ) ;
// if the transfer failed we try to store the index locally
String error = ( String ) resultObj . get ( "result" ) ;
if ( error ! = null ) {
words = wordIndex . addPageIndex ( entry . url ( ) , urlHash , docDate , ( int ) entry . size ( ) ,
document , condenser ,
indexEntryAttribute . language ( entry . url ( ) ) ,
indexEntryAttribute . docType ( document . getMimeType ( ) ) ,
ioLinks [ 0 ] . intValue ( ) , ioLinks [ 1 ] . intValue ( ) ) ;
words = wordIndex . addPageIndex (
entry . url ( ) ,
urlHash ,
docDate ,
( int ) entry . size ( ) ,
document ,
condenser ,
indexEntryAttribute . language ( entry . url ( ) ) ,
indexEntryAttribute . docType ( document . getMimeType ( ) ) ,
ioLinks [ 0 ] . intValue ( ) ,
ioLinks [ 1 ] . intValue ( )
) ;
}
tmpContainers = null ;
}
storageEndTime = System . currentTimeMillis ( ) ;
if ( log . isLoggable ( Level . INFO ) ) {
if ( log . is Info( ) ) {
log . logInfo ( "*Indexed " + words + " words in URL " + entry . url ( ) +
" [" + entry . urlHash ( ) + "]" +
"\n\tDescription: " + d escr +
"\n\tDescription: " + d ocD escription +
"\n\tMimeType: " + document . getMimeType ( ) + " | " +
"Size: " + document . text . length + " bytes | " +
"Anchors: " + ( ( document . anchors = = null ) ? 0 : document . anchors . size ( ) ) +
@ -1594,46 +1707,59 @@ public final class plasmaSwitchboard extends serverAbstractSwitch implements ser
"StorageTime: " + ( storageEndTime - storageStartTime ) + " ms" ) ;
}
// check for interruption
checkInterruption();
// if this was performed for a remote crawl request, notify requester
if ((processCase == 6) && (initiator != null)) {
log.logInfo("Sending crawl receipt for '" + entry.normalizedURLString() + "' to " + initiator.getName());
yacyClient.crawlReceipt(initiator, "crawl", "fill", "indexed", newEntry, "");
if ((processCase == PROCESSCASE_6_GLOBAL_CRAWLING) && (initiatorPeer != null)) {
log.logInfo("Sending crawl receipt for '" + entry.normalizedURLString() + "' to " + initiatorPeer.getName());
yacyClient.crawlReceipt(initiatorPeer, "crawl", "fill", "indexed", newEntry, "");
}
} else {
log.logFine("Not Indexed Resource '" + entry.normalizedURLString() + "': process case=" + processCase);
addURLtoErrorDB(entry.url(), referrerHash, initiatorHash, descr, plasmaCrawlEURL.DENIED_UNKNOWN_INDEXING_PROCESS_CASE, new bitfield(indexURL.urlFlagLength));
addURLtoErrorDB(entry.url(), referrerUrlHash, initiatorPeerHash, docDescription, plasmaCrawlEURL.DENIED_UNKNOWN_INDEXING_PROCESS_CASE, new bitfield(indexURL.urlFlagLength));
}
} catch (Exception ee) {
if (ee instanceof InterruptedException) throw (InterruptedException) ee;
// check for interruption
checkInterruption();
log.logSevere("Could not index URL " + entry.url() + ": " + ee.getMessage(), ee);
if ((processCase == 6) && (initiator != null)) {
yacyClient.crawlReceipt(initiator, "crawl", "exception", ee.getMessage(), null, "");
if ((processCase == PROCESSCASE_6_GLOBAL_CRAWLING) && (initiatorPeer != null)) {
yacyClient.crawlReceipt(initiatorPeer, "crawl", "exception", ee.getMessage(), null, "");
}
addURLtoErrorDB(entry.url(), referrerHash, initiatorHash, descr, plasmaCrawlEURL.DENIED_UNSPECIFIED_INDEXING_ERROR, new bitfield(indexURL.urlFlagLength));
addURLtoErrorDB(entry.url(), referrerUrlHash, initiatorPeerHash, docDescription, plasmaCrawlEURL.DENIED_UNSPECIFIED_INDEXING_ERROR, new bitfield(indexURL.urlFlagLength));
}
} else {
// check for interruption
checkInterruption();
log.logInfo("Not indexed any word in URL " + entry.url() + "; cause: " + noIndexReason);
addURLtoErrorDB(entry.url(), referrerHash, initiatorHash, descr, noIndexReason, new bitfield(indexURL.urlFlagLength));
if ((processCase == 6) && (initiator != null)) {
yacyClient.crawlReceipt(initiator, "crawl", "rejected", noIndexReason, null, "");
addURLtoErrorDB(entry.url(), referrerUrlHash, initiatorPeerHash, docDescription, noIndexReason, new bitfield(indexURL.urlFlagLength));
if ((processCase == PROCESSCASE_6_GLOBAL_CRAWLING) && (initiatorPeer != null)) {
yacyClient.crawlReceipt(initiatorPeer, "crawl", "rejected", noIndexReason, null, "");
}
}
document = null;
} finally {
checkInterruption();
// The following code must be in the finally block, otherwise it will not be executed
// on errors!
// removing current entry from in process list
synchronized (this.indexingTasksInProcess) {
this.indexingTasksInProcess.remove(entry.urlHash());
}
// removing current entry from notice URL queue
boolean removed = urlPool.noticeURL.remove(entry.urlHash()); // worked-off
if (!removed) {
log.logFinest("Unable to remove indexed URL " + entry.url() + " from Crawler Queue. This could be because of a URL redirect.");
}
// explicit delete/free resources
if ((entry != null) && (entry.profile() != null) && (!(entry.profile().storeHTCache()))) {
plasmaHTCache.filesInUse.remove(entry.cacheFile());
@@ -2242,10 +2368,13 @@ public final class plasmaSwitchboard extends serverAbstractSwitch implements ser
Iterator seedIter = seeds.iterator();
ArrayList transfer = new ArrayList(peerCount);
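// keep up to peerCount transfer threads running: finished threads are
// removed from 'transfer' below, and replacements are started as long as
// further target seeds are available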
while (hc1 < peerCount && (transfer.size() > 0 || seedIter.hasNext())) {
// starting up some transfer threads
int transferThreadCount = transfer.size();
for (int i = 0; i < peerCount - hc1 - transferThreadCount; i++) {
// check for interruption
checkInterruption();
if (seedIter.hasNext()) {
plasmaDHTTransfer t = new plasmaDHTTransfer(log, (yacySeed) seedIter.next(), dhtChunk, gzipBody, timeout, retries);
t.start();
@@ -2258,6 +2387,9 @@ public final class plasmaSwitchboard extends serverAbstractSwitch implements ser
// waiting for the transfer threads to finish
Iterator transferIter = transfer.iterator();
while (transferIter.hasNext()) {
// check for interruption
checkInterruption();
plasmaDHTTransfer t = (plasmaDHTTransfer) transferIter.next();
if (!t.isAlive()) {
// remove finished thread from the list
@@ -2311,6 +2443,12 @@ public final class plasmaSwitchboard extends serverAbstractSwitch implements ser
this.urlPool.errorURL.stackPushEntry(ee);
}
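// checkInterruption() is called at the top of long-running loop bodies; it
// turns a pending shutdown or thread interrupt into an InterruptedException
// so that worker code can unwind cooperatively: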
public void checkInterruption() throws InterruptedException {
Thread curThread = Thread.currentThread();
if ((curThread instanceof serverThread) && ((serverThread) curThread).shutdownInProgress()) throw new InterruptedException("Shutdown in progress ...");
else if (this.terminate || curThread.isInterrupted()) throw new InterruptedException("Shutdown in progress ...");
}
public void terminate(long delay) {
if (delay <= 0) throw new IllegalArgumentException("The shutdown delay must be greater than 0.");
(new delayedShutdown(this, delay)).start();