package de.anomic.crawler.retrieval;

import java.io.IOException;
import java.util.Arrays;
import java.util.Date;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import de.anomic.crawler.CrawlProfile;
import de.anomic.http.client.Cache;
import de.anomic.http.metadata.HeaderFramework;
import de.anomic.http.metadata.RequestHeader;
import de.anomic.http.metadata.ResponseHeader;
import de.anomic.search.Switchboard;
import de.anomic.server.serverCore;
import de.anomic.yacy.yacyURL;
import de.anomic.yacy.logging.Log;

public final class LoaderDispatcher {

    // minimum delay between two accesses to the same host, in milliseconds
    // (the constant is not shown in this excerpt; 250 ms is an assumed value)
    private static final long minDelay = 250;

    private static final ConcurrentHashMap<String, Long> accessTime = new ConcurrentHashMap<String, Long>(); // to protect targets from DDoS

    private final Switchboard sb;
    private final Log log;
    private final HashSet<String> supportedProtocols;
    private final HTTPLoader httpLoader;
    private final FTPLoader ftpLoader;

    public LoaderDispatcher(final Switchboard sb) {
        this.sb = sb;
        this.supportedProtocols = new HashSet<String>(Arrays.asList(new String[]{"http", "https", "ftp"}));

        // initiate loader objects
        this.log = new Log("LOADER");
        this.httpLoader = new HTTPLoader(sb, this.log);
        this.ftpLoader = new FTPLoader(sb, this.log);
    }

    /**
     * @return a clone of the set of supported protocols, so that callers cannot
     *         modify the dispatcher's internal state
     */
    @SuppressWarnings("unchecked")
    public HashSet<String> getSupportedProtocols() {
        return (HashSet<String>) this.supportedProtocols.clone();
    }

    /**
     * Load a resource for snippet generation. A synthetic Request is built with the
     * appropriate default snippet crawl profile and passed on to load(Request).
     *
     * @param url     the URL to load
     * @param forText true to use a text snippet profile, false for a media snippet profile
     * @param global  true to use the global profile, false to use the local one
     */
    public Response load(
            final yacyURL url,
            final boolean forText,
            final boolean global) throws IOException {
        final Request centry = new Request(
                sb.peers.mySeed().hash,
                url,
                "",
                "",
                new Date(),
                new Date(),
                (forText) ?
                    ((global) ?
                        sb.crawler.defaultTextSnippetGlobalProfile.handle() :
                        sb.crawler.defaultTextSnippetLocalProfile.handle())
                    :
                    ((global) ?
                        sb.crawler.defaultMediaSnippetGlobalProfile.handle() :
                        sb.crawler.defaultMediaSnippetLocalProfile.handle()), // crawl profile
                0,
                0,
                0);
        return load(centry);
    }
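
    /*
     * Usage sketch (illustrative only; the caller-side names sb and url are
     * hypothetical): fetch a page for a text snippet with the local profile.
     *
     *   final LoaderDispatcher loader = new LoaderDispatcher(sb);
     *   final Response response = loader.load(url, true, false);
     *   if (response != null) {
     *       final byte[] content = response.getContent();
     *       // ... hand the content to a parser
     *   }
     */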

    public Response load(final Request request) throws IOException {

        // get the protocol of the next URL
        final String protocol = request.url().getProtocol();
        final String host = request.url().getHost();

        // check if this loads a page from localhost, which must be prevented to protect the server
        // against attacks on the administration interface when localhost access is granted
        if (serverCore.isLocalhost(host) && sb.getConfigBool("adminAccountForLocalhost", false)) throw new IOException("access to localhost not granted for url " + request.url());
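
        // the crawl profile selects one of four cache strategies (constants in CrawlProfile):
        // CACHE_STRATEGY_NOCACHE   - do not read from the cache
        // CACHE_STRATEGY_IFFRESH   - use a cache hit only if it passes the proxy freshness test
        // CACHE_STRATEGY_IFEXIST   - use any existing cache hit, regardless of freshness
        // CACHE_STRATEGY_CACHEONLY - answer from the cache only, never load from the internet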

        // check if we have the page in the cache
        final CrawlProfile.entry crawlProfile = sb.crawler.profilesActiveCrawls.getEntry(request.profileHandle());
        int cacheStrategy = CrawlProfile.CACHE_STRATEGY_NOCACHE;
        if (crawlProfile != null && (cacheStrategy = crawlProfile.cacheStrategy()) != CrawlProfile.CACHE_STRATEGY_NOCACHE) {
            // we have passed a first test that caching is allowed;
            // now see if there is a cache entry
            final ResponseHeader cachedResponse = (request.url().isLocal()) ? null : Cache.getResponseHeader(request.url());
            final byte[] content = (cachedResponse == null) ? null : Cache.getContent(request.url());
            if (cachedResponse != null && content != null) {
                // yes, we have the content

                // create request header values and a response object, because we need them
                // in case that we return the cached content in the next step
                final RequestHeader requestHeader = new RequestHeader();
                requestHeader.put(HeaderFramework.USER_AGENT, HTTPLoader.crawlerUserAgent);
                yacyURL refererURL = null;
                if (request.referrerhash() != null) refererURL = sb.getURL(request.referrerhash());
                if (refererURL != null) requestHeader.put(RequestHeader.REFERER, refererURL.toNormalform(true, true));
                final Response response = new Response(
                        request,
                        requestHeader,
                        cachedResponse,
                        "200",
                        crawlProfile,
                        content);
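
                // note: the status line is set to "200" on the assumption that cache
                // entries are only written after a successful fetch (see the store
                // step at the end of this method)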

                // check which caching strategy shall be used
                if (cacheStrategy == CrawlProfile.CACHE_STRATEGY_IFEXIST || cacheStrategy == CrawlProfile.CACHE_STRATEGY_CACHEONLY) {
                    // just take the cache entry, regardless of the freshness of the content
                    log.logInfo("cache hit/useall for: " + request.url().toNormalform(true, false));
                    return response;
                }

                // now the cacheStrategy must be CACHE_STRATEGY_IFFRESH, which means we must do a proxy freshness test
                assert cacheStrategy == CrawlProfile.CACHE_STRATEGY_IFFRESH : "cacheStrategy = " + cacheStrategy;
                if (response.isFreshForProxy()) {
                    log.logInfo("cache hit/fresh for: " + request.url().toNormalform(true, false));
                    return response;
                } else {
                    log.logInfo("cache hit/stale for: " + request.url().toNormalform(true, false));
                }
            }
        }

        // check the case where we want results from the cache exclusively and never from the internet (offline mode)
        if (cacheStrategy == CrawlProfile.CACHE_STRATEGY_CACHEONLY) {
            // we had our chance to get the content from the cache; we don't have it
            return null;
        }

        // the cache holds nothing, so try to load the content from the internet.
        // check the access time first: this is a double-check (we possibly checked already in the balancer)
        // to make sure that we don't DoS the target host by mistake
        if (!request.url().isLocal()) {
            final Long lastAccess = accessTime.get(host);
            long wait = 0;
            if (lastAccess != null) wait = Math.max(0, minDelay + lastAccess.longValue() - System.currentTimeMillis());
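            // example: with minDelay = 250 and a host last accessed 100 ms ago,
            // wait = max(0, 250 - 100) = 150, so the thread pauses for 150 ms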
            if (wait > 0) {
                // force a sleep to keep the minimum delay towards this host
                final long untilTime = System.currentTimeMillis() + wait;
                try { Thread.sleep(untilTime - System.currentTimeMillis()); } catch (final InterruptedException ee) {}
            }
        }

        // now it is certain that we will access the target; remember the access time
        accessTime.put(host, System.currentTimeMillis());

        // load the resource from the internet
        Response response = null;
        if (protocol.equals("http") || protocol.equals("https")) response = httpLoader.load(request);
        if (protocol.equals("ftp")) response = ftpLoader.load(request);
        if (response != null) {
            // we got something; now check if we want to store that to the cache
            final String storeError = response.shallStoreCache();
            if (storeError == null) {
                Cache.store(request.url(), response.getResponseHeader(), response.getContent());
            } else {
                if (Cache.log.isFine()) Cache.log.logFine("no storage of url " + request.url() + ": " + storeError);
            }
            return response;
        }

        throw new IOException("Unsupported protocol '" + protocol + "' in url " + request.url());
}
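
    /**
     * Remove stale entries from the access time table: an entry is dropped if its
     * last access lies more than minDelay in the past. The timeout parameter is an
     * absolute deadline (epoch milliseconds) after which the cleanup stops.
     */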
    public synchronized void cleanupAccessTimeTable(final long timeout) {
        final Iterator<Map.Entry<String, Long>> i = accessTime.entrySet().iterator();
        Map.Entry<String, Long> e;
        while (i.hasNext()) {
            e = i.next();
            if (System.currentTimeMillis() > timeout) break;
            if (System.currentTimeMillis() - e.getValue().longValue() > minDelay) i.remove();
}
}
}