@@ -28,151 +28,73 @@
package de.anomic.crawler;
import java.io.File;
import java.io.IOException;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Date;
import java.util.Iterator;
import java.util.LinkedList;
import de.anomic.index.indexReferenceBlacklist;
import de.anomic.index.indexURLReference;
import de.anomic.kelondro.kelondroCache;
import de.anomic.kelondro.kelondroEcoTable;
import de.anomic.kelondro.kelondroException;
import de.anomic.kelondro.kelondroIndex;
import de.anomic.kelondro.kelondroRow;
import de.anomic.kelondro.kelondroRowSet;
import de.anomic.kelondro.kelondroTree;
import de.anomic.plasma.plasmaSwitchboard;
import de.anomic.plasma.plasmaWordIndex;
import de.anomic.server.serverDomains;
import de.anomic.server.logging.serverLog;
import de.anomic.yacy.yacyURL;
public final class CrawlStacker extends Thread {
    private static final int EcoFSBufferSize = 20;
    private static String stackfile = "urlNoticeStacker9.db";
    // keys for different database types
    public static final int QUEUE_DB_TYPE_RAM = 0;
    public static final int QUEUE_DB_TYPE_TREE = 1;
    public static final int QUEUE_DB_TYPE_ECO = 2;
public final class CrawlStacker {
    final serverLog log = new serverLog("STACKCRAWL");
    private final plasmaSwitchboard sb;
    private final LinkedList<String> urlEntryHashCache;
    private kelondroIndex urlEntryCache;
    private final File cacheStacksPath;
    private final int dbtype;
    private final boolean prequeue;
    private long dnsHit, dnsMiss;
    private int alternateCount;
    private final LinkedList<String> urlEntryHashCache; // the order in which this queue is processed; entries with known DNS entries go first
    private kelondroIndex urlEntryCache;                // the entries in the queue
    private long dnsHit, dnsMiss;
    private int alternateCount;
    private CrawlQueues nextQueue;
    private plasmaWordIndex wordIndex;
    private boolean acceptLocalURLs, acceptGlobalURLs;
    // objects for the prefetch task
    private final ArrayList<String> dnsfetchHosts = new ArrayList<String>();
    public CrawlStacker(final plasmaSwitchboard sb, final File dbPath, final int dbtype, final boolean prequeue) {
        this.sb = sb;
        this.prequeue = prequeue;
    // this is the process that checks urls for double-occurrences and for allowance/disallowance by robots.txt
    public CrawlStacker(CrawlQueues cq, plasmaWordIndex wordIndex, boolean acceptLocalURLs, boolean acceptGlobalURLs) {
        this.nextQueue = cq;
        this.wordIndex = wordIndex;
        this.dnsHit = 0;
        this.dnsMiss = 0;
        this.alternateCount = 0;
        this.acceptLocalURLs = acceptLocalURLs;
        this.acceptGlobalURLs = acceptGlobalURLs;
        // init the message list
        this.urlEntryHashCache = new LinkedList<String>();
        // create a stack for newly entered entries
        this.cacheStacksPath = dbPath;
        this.dbtype = dbtype;
        openDB();
        try {
            // loop through the list and fill the messageList with url hashes
            final Iterator<kelondroRow.Entry> rows = this.urlEntryCache.rows(true, null);
            kelondroRow.Entry entry;
            while (rows.hasNext()) {
                entry = rows.next();
                if (entry == null) {
                    System.out.println("ERROR! null element found");
                    continue;
                }
                this.urlEntryHashCache.add(entry.getColString(0, null));
            }
        } catch (final kelondroException e) {
            /* if we have an error, we start with a fresh database */
            CrawlStacker.this.log.logSevere("Unable to initialize crawl stacker queue, kelondroException:" + e.getMessage() + ". Resetting DB.\n", e);
            // deleting old db and creating a new db
            try { this.urlEntryCache.close(); } catch (final Exception ex) { }
            deleteDB();
            openDB();
        } catch (final IOException e) {
            /* if we have an error, we start with a fresh database */
            CrawlStacker.this.log.logSevere("Unable to initialize crawl stacker queue, IOException:" + e.getMessage() + ". Resetting DB.\n", e);
            // deleting old db and creating a new db
            try { this.urlEntryCache.close(); } catch (final Exception ex) { }
            deleteDB();
            openDB();
        }
        this.log.logInfo(size() + " entries in the stackCrawl queue.");
        this.start(); // start the prefetcher thread
        this.urlEntryCache = new kelondroRowSet(CrawlEntry.rowdef, 0);
        this.log.logInfo("STACKCRAWL thread initialized.");
    }
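    // A minimal usage sketch for the new constructor, assuming a CrawlQueues instance `queues`
    // and a plasmaWordIndex `index` are already available (both names are illustrative):
    //
    //     CrawlStacker stacker = new CrawlStacker(queues, index, true, true);
    //     while (stacker.size() > 0 && stacker.job()) { } // drain the queue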
    public void run() {
        String nextHost;
        try {
            while (!Thread.currentThread().isInterrupted()) { // action loop
                if (dnsfetchHosts.size() == 0) synchronized (this) { wait(); }
                synchronized (dnsfetchHosts) {
                    nextHost = dnsfetchHosts.remove(dnsfetchHosts.size() - 1);
                }
                try {
                    serverDomains.dnsResolve(nextHost);
                } catch (final Exception e) { }
            }
        } catch (final InterruptedException e) { }
    }
    public boolean prefetchHost(final String host) {
        // returns true when the host was known in the dns cache.
        // If not, the host is stacked on the fetch stack and false is returned
        try {
            serverDomains.dnsResolveFromCache(host);
            return true;
        } catch (final UnknownHostException e) {
            synchronized (this) {
                dnsfetchHosts.add(host);
                notifyAll();
            }
            return false;
        }
    }
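    // Sketch of the DNS prefetch handshake: prefetchHost() is the producer (it parks hosts that
    // miss the DNS cache on dnsfetchHosts and calls notifyAll()), run() is the consumer (it
    // wait()s for work, then resolves off the caller's thread). An illustrative call site:
    //
    //     if (!prefetchHost(url.getHost())) {
    //         // host not in the DNS cache yet; the prefetcher thread resolves it asynchronously
    //     }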
    public void terminateDNSPrefetcher() {
        synchronized (this) {
            interrupt();
    public int size() {
        synchronized (this.urlEntryHashCache) {
            return this.urlEntryHashCache.size();
        }
    }
    public void clear() throws IOException {
        this.urlEntryHashCache.clear();
        this.urlEntryCache.clear();
    }
    public void close() {
        if (this.dbtype == QUEUE_DB_TYPE_RAM) {
            this.log.logInfo("Shutdown. Flushing remaining " + size() + " crawl stacker job entries. please wait.");
            while (size() > 0) {
                if (!job()) break;
            }
        this.log.logInfo("Shutdown. Flushing remaining " + size() + " crawl stacker job entries. please wait.");
        while (size() > 0) {
            if (!job()) break;
        }
        terminateDNSPrefetcher();
        this.log.logInfo("Shutdown. Closing stackCrawl queue.");
@@ -182,26 +104,68 @@ public final class CrawlStacker extends Thread {
        // clearing the hash list
        this.urlEntryHashCache.clear();
    }
    private boolean prefetchHost(final String host) {
        // returns true when the host was known in the dns cache.
        // If not, the host is stacked on the fetch stack and false is returned
        try {
            serverDomains.dnsResolveFromCache(host);
            return true;
        } catch (final UnknownHostException e) {
            synchronized (this) {
                dnsfetchHosts.add(host);
                notifyAll();
            }
            return false;
        }
    }
    public boolean job() {
        CrawlEntry entry;
        // this is the method that is called by the busy thread from outside
        if (this.urlEntryHashCache.size() == 0) return false;
        // get the next entry from the queue
        String urlHash = null;
        kelondroRow.Entry ec = null;
        synchronized (this.urlEntryHashCache) {
            urlHash = this.urlEntryHashCache.removeFirst();
            if (urlHash == null) {
                urlEntryHashCache.clear();
                try {
                    urlEntryCache.clear();
                } catch (IOException e) {
                    e.printStackTrace();
                }
                return false;
            }
            try {
                ec = this.urlEntryCache.remove(urlHash.getBytes());
            } catch (IOException e) {
                e.printStackTrace();
                return false;
            }
        }
        if (urlHash == null || ec == null) return false;
        // make a crawl entry out of it
        CrawlEntry entry = null;
        try {
            entry = dequeueEntry();
        } catch (final IOException e) {
            e.printStackTrace();
            entry = new CrawlEntry(ec);
        } catch (IOException e1) {
            e1.printStackTrace();
            return false;
        }
        if (entry == null) return false;
        try {
            final String rejectReason = sb.crawlStacker.stackCrawl(entry);
            final String rejectReason = stackCrawl(entry);
            // if the url was rejected we store it into the error URL db
            if (rejectReason != null) {
                final ZURL.Entry ee = sb.crawlQueues.errorURL.newEntry(entry, sb.webIndex.seedDB.mySeed().hash, new Date(), 1, rejectReason);
                final ZURL.Entry ee = nextQueue.errorURL.newEntry(entry, wordIndex.seedDB.mySeed().hash, new Date(), 1, rejectReason);
                ee.store();
                sb.crawlQueues.errorURL.push(ee);
                nextQueue.errorURL.push(ee);
            }
        } catch (final Exception e) {
            CrawlStacker.this.log.logWarning("Error while processing stackCrawl entry.\n" + "Entry: " + entry.toString() + "Error: " + e.toString(), e);
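        // job() in a nutshell: pop the oldest url hash, remove its row from urlEntryCache,
        // rebuild a CrawlEntry from that row, and hand it to stackCrawl(); a non-null return
        // value is a rejection reason, and the rejected entry is recorded in nextQueue.errorURL
        // as shown in the try-block above.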
@@ -270,8 +234,6 @@ public final class CrawlStacker extends Thread {
        synchronized (this.urlEntryHashCache) {
            kelondroRow.Entry oldValue;
            boolean hostknown = true;
            if (prequeue) hostknown = prefetchHost(nexturl.getHost());
            try {
                oldValue = this.urlEntryCache.put(newEntryRow);
            } catch (final IOException e) {
@@ -279,7 +241,7 @@ public final class CrawlStacker extends Thread {
            }
            if (oldValue == null) {
                //System.out.println("*** debug crawlStacker dnsHit=" + this.dnsHit + ", dnsMiss=" + this.dnsMiss + ", alternateCount=" + this.alternateCount + ((this.dnsMiss > 0) ? (", Q=" + (this.dnsHit / this.dnsMiss)) : ""));
                if (hostknown) {
                if (prefetchHost(nexturl.getHost())) {
                    this.alternateCount++;
                    this.urlEntryHashCache.addFirst(newEntry.url().hash());
                    this.dnsHit++;
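                    // Ordering rationale: entries whose host already has a cached DNS entry go to
                    // the head of urlEntryHashCache (addFirst, counted as dnsHit); hosts that miss
                    // the cache presumably go to the tail in the elided else-branch (dnsMiss), so
                    // job() prefers entries that will not block on DNS resolution.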
@@ -297,79 +259,9 @@ public final class CrawlStacker extends Thread {
        }
    }
    private void deleteDB() {
        if (this.dbtype == QUEUE_DB_TYPE_RAM) {
            // do nothing..
            return;
        }
        if (this.dbtype == QUEUE_DB_TYPE_ECO) {
            new File(cacheStacksPath, stackfile).delete();
            //kelondroFlexWidthArray.delete(cacheStacksPath, stackfile);
        }
        if (this.dbtype == QUEUE_DB_TYPE_TREE) {
            final File cacheFile = new File(cacheStacksPath, stackfile);
            cacheFile.delete();
        }
    }
    private void openDB() {
        if (!(cacheStacksPath.exists())) cacheStacksPath.mkdir(); // make the path
        if (this.dbtype == QUEUE_DB_TYPE_RAM) {
            this.urlEntryCache = new kelondroRowSet(CrawlEntry.rowdef, 0);
        }
        if (this.dbtype == QUEUE_DB_TYPE_ECO) {
            cacheStacksPath.mkdirs();
            final File f = new File(cacheStacksPath, stackfile);
            try {
                this.urlEntryCache = new kelondroEcoTable(f, CrawlEntry.rowdef, kelondroEcoTable.tailCacheUsageAuto, EcoFSBufferSize, 0);
                //this.urlEntryCache = new kelondroCache(new kelondroFlexTable(cacheStacksPath, newCacheName, preloadTime, CrawlEntry.rowdef, 0, true));
            } catch (final Exception e) {
                e.printStackTrace();
                // kill DB and try again
                f.delete();
                //kelondroFlexTable.delete(cacheStacksPath, newCacheName);
                try {
                    this.urlEntryCache = new kelondroEcoTable(f, CrawlEntry.rowdef, kelondroEcoTable.tailCacheUsageAuto, EcoFSBufferSize, 0);
                    //this.urlEntryCache = new kelondroCache(new kelondroFlexTable(cacheStacksPath, newCacheName, preloadTime, CrawlEntry.rowdef, 0, true));
                } catch (final Exception ee) {
                    ee.printStackTrace();
                    System.exit(-1);
                }
            }
        }
        if (this.dbtype == QUEUE_DB_TYPE_TREE) {
            final File cacheFile = new File(cacheStacksPath, stackfile);
            cacheFile.getParentFile().mkdirs();
            this.urlEntryCache = new kelondroCache(kelondroTree.open(cacheFile, true, 0, CrawlEntry.rowdef));
        }
    }
    public int size() {
        synchronized (this.urlEntryHashCache) {
            return this.urlEntryHashCache.size();
        }
    }
    public int getDBType() {
        return this.dbtype;
    }
    public CrawlEntry dequeueEntry() throws IOException {
        if (this.urlEntryHashCache.size() == 0) return null;
        String urlHash = null;
        kelondroRow.Entry entry = null;
        synchronized (this.urlEntryHashCache) {
            urlHash = this.urlEntryHashCache.removeFirst();
            if (urlHash == null) throw new IOException("urlHash is null");
            entry = this.urlEntryCache.remove(urlHash.getBytes());
        }
        if ((urlHash == null) || (entry == null)) return null;
        return new CrawlEntry(entry);
    }
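    // Invariant behind dequeueEntry() above and the inlined dequeue in job(): urlEntryHashCache
    // holds the processing order while urlEntryCache holds the serialized rows keyed by url hash;
    // both must be updated under the same lock, which is why removal from the two structures
    // happens inside a single synchronized block.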
    public String stackCrawl(final CrawlEntry entry) {
    private String stackCrawl(final CrawlEntry entry) {
        // stacks a crawl item. The position can also be remote
        // returns null if successful, a reason string if not successful
        //this.log.logFinest("stackCrawl: nexturlString='" + nexturlString + "'");
@@ -379,7 +271,7 @@ public final class CrawlStacker extends Thread {
        // check if the protocol is supported
        final String urlProtocol = entry.url().getProtocol();
        if (!sb.crawlQueues.isSupportedProtocol(urlProtocol)) {
        if (!nextQueue.isSupportedProtocol(urlProtocol)) {
            reason = "unsupported protocol";
            this.log.logSevere("Unsupported protocol in URL '" + entry.url().toString() + "'. " +
                    "Stack processing time: " + (System.currentTimeMillis() - startTime) + "ms");
@@ -387,9 +279,9 @@ public final class CrawlStacker extends Thread {
        }
        // check if ip is local ip address
        final String urlRejectReason = sb.acceptURL(entry.url());
        final String urlRejectReason = urlInAcceptedDomain(entry.url());
        if (urlRejectReason != null) {
            reason = "denied_(" + urlRejectReason + ")_domain=" + sb.getConfig("network.unit.domain", "unknown");
            reason = "denied_(" + urlRejectReason + ")";
            if (this.log.isFine()) this.log.logFine(reason + "Stack processing time: " + (System.currentTimeMillis() - startTime) + "ms");
            return reason;
        }
@@ -402,7 +294,7 @@ public final class CrawlStacker extends Thread {
            return reason;
        }
        final CrawlProfile.entry profile = sb.webIndex.profilesActiveCrawls.getEntry(entry.profileHandle());
        final CrawlProfile.entry profile = wordIndex.profilesActiveCrawls.getEntry(entry.profileHandle());
        if (profile == null) {
            final String errorMsg = "LOST STACKER PROFILE HANDLE '" + entry.profileHandle() + "' for URL " + entry.url();
            log.logWarning(errorMsg);
@@ -443,7 +335,7 @@ public final class CrawlStacker extends Thread {
            return reason;
        }
        final yacyURL referrerURL = (entry.referrerhash() == null) ? null : sb.crawlQueues.getURL(entry.referrerhash());
        final yacyURL referrerURL = (entry.referrerhash() == null) ? null : nextQueue.getURL(entry.referrerhash());
        // add domain to profile domain list
        if ((profile.domFilterDepth() != Integer.MAX_VALUE) || (profile.domMaxPages() != Integer.MAX_VALUE)) {
@@ -467,8 +359,8 @@ public final class CrawlStacker extends Thread {
        }
        // check if the url is double registered
        final String dbocc = sb.crawlQueues.urlExists(entry.url().hash());
        final indexURLReference oldEntry = this.sb.webIndex.getURL(entry.url().hash(), null, 0);
        final String dbocc = nextQueue.urlExists(entry.url().hash());
        final indexURLReference oldEntry = wordIndex.getURL(entry.url().hash(), null, 0);
        final boolean recrawl = (oldEntry != null) && (profile.recrawlIfOlder() > oldEntry.loaddate().getTime());
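        // Recrawl semantics: recrawlIfOlder() yields a cut-off time (epoch milliseconds, the same
        // unit as Date.getTime()); a URL that is already known is stacked again only if it was
        // loaded before that cut-off, so a more recent cut-off forces more aggressive re-fetching.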
        // do double-check
        if ((dbocc != null) && (!recrawl)) {
@@ -489,16 +381,16 @@ public final class CrawlStacker extends Thread {
}
        // store information
        final boolean local = entry.initiator().equals(sb.webIndex.seedDB.mySeed().hash);
        final boolean proxy = (entry.initiator() == null || entry.initiator().equals("------------")) && profile.handle().equals(this.sb.webIndex.defaultProxyProfile.handle());
        final boolean remote = profile.handle().equals(this.sb.webIndex.defaultRemoteProfile.handle());
        final boolean local = entry.initiator().equals(wordIndex.seedDB.mySeed().hash);
        final boolean proxy = (entry.initiator() == null || entry.initiator().equals("------------")) && profile.handle().equals(wordIndex.defaultProxyProfile.handle());
        final boolean remote = profile.handle().equals(wordIndex.defaultRemoteProfile.handle());
        final boolean global =
            (profile.remoteIndexing()) /* granted */ &&
            (entry.depth() == profile.depth()) /* leaf node */ &&
            //(initiatorHash.equals(yacyCore.seedDB.mySeed.hash)) /* not proxy */ &&
            (
                (sb.webIndex.seedDB.mySeed().isSenior()) ||
                (sb.webIndex.seedDB.mySeed().isPrincipal())
                (wordIndex.seedDB.mySeed().isSenior()) ||
                (wordIndex.seedDB.mySeed().isPrincipal())
            ) /* qualified */;
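        // Classification overview: `local` marks entries initiated by this peer, `proxy` marks
        // entries without a real initiator that run under the default proxy profile, `remote`
        // marks entries under the default remote profile, and `global` additionally requires
        // granted remote indexing, a leaf-node depth, and a senior or principal peer. The flags
        // are not mutually exclusive, hence the conflict warnings below.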
        if (!local && !global && !remote && !proxy) {
@@ -508,23 +400,62 @@ public final class CrawlStacker extends Thread {
            // it may be possible that global == true and local == true, so do not check an error case against it
            if (proxy) this.log.logWarning("URL '" + entry.url().toString() + "' has conflicting initiator properties: global = true, proxy = true, initiator = " + entry.initiator() + ", profile.handle = " + profile.handle());
            if (remote) this.log.logWarning("URL '" + entry.url().toString() + "' has conflicting initiator properties: global = true, remote = true, initiator = " + entry.initiator() + ", profile.handle = " + profile.handle());
            sb.crawlQueues.noticeURL.push(NoticedURL.STACK_TYPE_LIMIT, entry);
            nextQueue.noticeURL.push(NoticedURL.STACK_TYPE_LIMIT, entry);
        }
        if (local) {
            if (proxy) this.log.logWarning("URL '" + entry.url().toString() + "' has conflicting initiator properties: local = true, proxy = true, initiator = " + entry.initiator() + ", profile.handle = " + profile.handle());
            if (remote) this.log.logWarning("URL '" + entry.url().toString() + "' has conflicting initiator properties: local = true, remote = true, initiator = " + entry.initiator() + ", profile.handle = " + profile.handle());
            sb.crawlQueues.noticeURL.push(NoticedURL.STACK_TYPE_CORE, entry);
            nextQueue.noticeURL.push(NoticedURL.STACK_TYPE_CORE, entry);
        }
        if (proxy) {
            if (remote) this.log.logWarning("URL '" + entry.url().toString() + "' has conflicting initiator properties: proxy = true, remote = true, initiator = " + entry.initiator() + ", profile.handle = " + profile.handle());
            sb.crawlQueues.noticeURL.push(NoticedURL.STACK_TYPE_CORE, entry);
            nextQueue.noticeURL.push(NoticedURL.STACK_TYPE_CORE, entry);
        }
        if (remote) {
            sb.crawlQueues.noticeURL.push(NoticedURL.STACK_TYPE_REMOTE, entry);
            nextQueue.noticeURL.push(NoticedURL.STACK_TYPE_REMOTE, entry);
        }
}
        return null;
}
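    // Caller-side sketch of the stackCrawl() contract (illustrative; mirrors the use in job()):
    //
    //     final String rejectReason = stackCrawl(entry);
    //     if (rejectReason == null) {
    //         // entry was pushed onto one of the noticeURL stacks (CORE, LIMIT or REMOTE)
    //     } else {
    //         // entry was rejected; callers record it, e.g. in nextQueue.errorURL
    //     }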
    /**
     * Test a url if it can be used for crawling/indexing
     * This mainly checks if the url is in the declared domain (local/global)
     * @param url
     * @return null if the url can be accepted, a string containing a rejection reason if the url cannot be accepted
     */
    public String urlInAcceptedDomain(final yacyURL url) {
        // returns null if the url can be accepted according to network.unit.domain
        if (url == null) return "url is null";
        final String host = url.getHost();
        if (host == null) return "url.host is null";
        if (this.acceptGlobalURLs && this.acceptLocalURLs) return null; // fast shortcut to avoid dnsResolve
        /*
        InetAddress hostAddress = serverDomains.dnsResolve(host);
        // if we don't know the host, we cannot load that resource anyway.
        // But in case we use a proxy, it is possible that we don't have a DNS service.
        final httpRemoteProxyConfig remoteProxyConfig = httpdProxyHandler.getRemoteProxyConfig();
        if (hostAddress == null) {
            if ((remoteProxyConfig != null) && (remoteProxyConfig.useProxy())) return null; else return "the dns of the host '" + host + "' cannot be resolved";
        }
        */
        // check if this is a local address and we are allowed to index local pages:
        //boolean local = hostAddress.isSiteLocalAddress() || hostAddress.isLoopbackAddress();
        final boolean local = url.isLocal();
        //assert local == yacyURL.isLocalDomain(url.hash()); // TODO: remove the dnsResolve above!
        if ((this.acceptGlobalURLs && !local) || (this.acceptLocalURLs && local)) return null;
        return (local) ?
            ("the host '" + host + "' is local, but local addresses are not accepted") :
            ("the host '" + host + "' is global, but global addresses are not accepted");
    }
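    // Usage sketch for urlInAcceptedDomain(), assuming some yacyURL `url` (illustrative):
    //
    //     final String rejectReason = urlInAcceptedDomain(url);
    //     if (rejectReason != null) {
    //         // e.g. "the host 'localhost' is local, but local addresses are not accepted"
    //         log.logFine("denied: " + rejectReason);
    //     }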
    public boolean acceptLocalURLs() {
        return this.acceptLocalURLs;
    }
    public boolean acceptGlobalURLs() {
        return this.acceptGlobalURLs;
    }
}
}