diff --git a/source/net/yacy/repository/LoaderDispatcher.java b/source/net/yacy/repository/LoaderDispatcher.java index d1a280d4d..ad2d9c89d 100644 --- a/source/net/yacy/repository/LoaderDispatcher.java +++ b/source/net/yacy/repository/LoaderDispatcher.java @@ -33,10 +33,13 @@ import java.io.Writer; import java.net.MalformedURLException; import java.util.Arrays; import java.util.Date; +import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; import net.yacy.cora.document.MultiProtocolURI; import net.yacy.cora.protocol.HeaderFramework; @@ -73,6 +76,7 @@ public final class LoaderDispatcher { private final FTPLoader ftpLoader; private final SMBLoader smbLoader; private final FileLoader fileLoader; + private final HashMap loaderSteering; // a map that delivers a 'finish' semaphore for urls private final Log log; public LoaderDispatcher(final Switchboard sb) { @@ -81,10 +85,11 @@ public final class LoaderDispatcher { // initiate loader objects this.log = new Log("LOADER"); - httpLoader = new HTTPLoader(sb, log); - ftpLoader = new FTPLoader(sb, log); - smbLoader = new SMBLoader(sb, log); - fileLoader = new FileLoader(sb, log); + this.httpLoader = new HTTPLoader(sb, log); + this.ftpLoader = new FTPLoader(sb, log); + this.smbLoader = new SMBLoader(sb, log); + this.fileLoader = new FileLoader(sb, log); + this.loaderSteering = new HashMap(); } public boolean isSupportedProtocol(final String protocol) { @@ -141,14 +146,38 @@ public final class LoaderDispatcher { tmp.renameTo(targetFile); } + public Response load(final Request request, CrawlProfile.CacheStrategy cacheStrategy, long maxFileSize) throws IOException { + String url = request.url().toNormalform(true, false); + Semaphore check = this.loaderSteering.get(url); + if (check != null) { + // a loading process may be going on for that url + try { check.tryAcquire(5, TimeUnit.SECONDS);} catch (InterruptedException e) {} + // now the process may have terminated and we run a normal loading + // which may be successful faster because of a cache hit + } + + try { + this.loaderSteering.put(url, new Semaphore(0)); + Response response = loadInternal(request, cacheStrategy, maxFileSize); + check = this.loaderSteering.remove(url); + if (check != null) check.release(1000); + return response; + } catch (Exception e) { + // release the semaphore anyway + check = this.loaderSteering.remove(url); + if (check != null) check.release(1000); + throw new IOException(e); + } + } + /** * load a resource from the web, from ftp, from smb or a file * @param request the request essentials - * @param cacheStratgy strategy according to CACHE_STRATEGY_NOCACHE,CACHE_STRATEGY_IFFRESH,CACHE_STRATEGY_IFEXIST,CACHE_STRATEGY_CACHEONLY + * @param cacheStratgy strategy according to NOCACHE, IFFRESH, IFEXIST, CACHEONLY * @return the loaded entity in a Response object * @throws IOException */ - public Response load(final Request request, CrawlProfile.CacheStrategy cacheStrategy, long maxFileSize) throws IOException { + public Response loadInternal(final Request request, CrawlProfile.CacheStrategy cacheStrategy, long maxFileSize) throws IOException { // get the protocol of the next URL final DigestURI url = request.url(); final String protocol = url.getProtocol();