diff --git a/htroot/IndexCreate_p.java b/htroot/IndexCreate_p.java index 717a8dca7..c76f2c1ec 100644 --- a/htroot/IndexCreate_p.java +++ b/htroot/IndexCreate_p.java @@ -319,16 +319,36 @@ public class IndexCreate_p { } else { prop.put("loader-set", 1); prop.put("loader-set_num", loaderThreadsSize); - dark = true; - plasmaCrawlLoader.Exec[] loaderThreads = switchboard.cacheLoader.threadStatus(); - for (i = 0; i < loaderThreads.length; i++) { - initiator = yacyCore.seedDB.getConnected(loaderThreads[i].initiator); + dark = true; + //plasmaCrawlLoader.Exec[] loaderThreads = switchboard.cacheLoader.threadStatus(); +// for (i = 0; i < loaderThreads.length; i++) { +// initiator = yacyCore.seedDB.getConnected(loaderThreads[i].initiator); +// prop.put("loader-set_list_"+i+"_dark", ((dark) ? 1 : 0) ); +// prop.put("loader-set_list_"+i+"_initiator", ((initiator == null) ? "proxy" : initiator.getName()) ); +// prop.put("loader-set_list_"+i+"_depth", loaderThreads[i].depth ); +// prop.put("loader-set_list_"+i+"_url", loaderThreads[i].url ); // null pointer exception here !!! maybe url = null; check reason. +// dark = !dark; +// } +// prop.put("loader-set_list", i ); + + ThreadGroup loaderThreads = switchboard.cacheLoader.threadStatus(); + + int threadCount = loaderThreads.activeCount(); + Thread[] threadList = new Thread[threadCount*2]; + threadCount = loaderThreads.enumerate(threadList); + + for (i = 0; i < threadCount; i++) { + plasmaCrawlWorker theWorker = (plasmaCrawlWorker)threadList[i]; + plasmaCrawlLoaderMessage theMsg = theWorker.theMsg; + if (theMsg == null) continue; + + initiator = yacyCore.seedDB.getConnected(theMsg.initiator); prop.put("loader-set_list_"+i+"_dark", ((dark) ? 1 : 0) ); prop.put("loader-set_list_"+i+"_initiator", ((initiator == null) ? "proxy" : initiator.getName()) ); - prop.put("loader-set_list_"+i+"_depth", loaderThreads[i].depth ); - prop.put("loader-set_list_"+i+"_url", loaderThreads[i].url ); // null pointer exception here !!! maybe url = null; check reason. - dark = !dark; - } + prop.put("loader-set_list_"+i+"_depth", theMsg.depth ); + prop.put("loader-set_list_"+i+"_url", theMsg.url ); // null pointer exception here !!! maybe url = null; check reason. + dark = !dark; + } prop.put("loader-set_list", i ); } diff --git a/source/de/anomic/http/httpc.java b/source/de/anomic/http/httpc.java index cc3dea88d..56ceaf675 100644 --- a/source/de/anomic/http/httpc.java +++ b/source/de/anomic/http/httpc.java @@ -166,17 +166,20 @@ public final class httpc { public static httpc getInstance(String server, int port, int timeout, boolean ssl) throws IOException { + httpc newHttpc = null; try { // fetching a new httpc from the object pool - httpc newHttpc = (httpc) httpc.theHttpcPool.borrowObject(); - - // initialize it - newHttpc.init(server,port,timeout,ssl); - return newHttpc; + newHttpc = (httpc) httpc.theHttpcPool.borrowObject(); } catch (Exception e) { throw new IOException("Unable to initialize a new httpc. 
" + e.getMessage()); - } + } + + // initialize it + newHttpc.init(server,port,timeout,ssl); + return newHttpc; + + } public static void returnInstance(httpc theHttpc) { @@ -320,7 +323,7 @@ public final class httpc { userAgent = "yacy (www.yacy.net; v" + vDATE + "; " + systemOST + ")"; } - public class response { + public final class response { // Response-Header = Date | Pragma | Allow | Content-Encoding | Content-Length | Content-Type | // Expires | Last-Modified | HTTP-header /* @@ -438,7 +441,8 @@ public final class httpc { } public byte[] writeContent(OutputStream procOS) throws IOException { - serverByteBuffer sbb = new serverByteBuffer(); + int contentLength = (int) this.responseHeader.contentLength(); + serverByteBuffer sbb = new serverByteBuffer((contentLength==-1)?8192:contentLength); writeContentX(procOS, sbb, httpc.this.clientInput); return sbb.getBytes(); } diff --git a/source/de/anomic/http/httpd.java b/source/de/anomic/http/httpd.java index 1971ae600..59276e64f 100644 --- a/source/de/anomic/http/httpd.java +++ b/source/de/anomic/http/httpd.java @@ -647,7 +647,7 @@ public final class httpd implements serverHandler { // we replace all "+" by spaces // and resolve %-escapes with two-digit hex attributes int pos = 0; - StringBuffer result = new StringBuffer(); + StringBuffer result = new StringBuffer(s.length()); while (pos < s.length()) { if (s.charAt(pos) == '+') { result.append(" "); diff --git a/source/de/anomic/http/httpdProxyHandler.java b/source/de/anomic/http/httpdProxyHandler.java index 9c992d994..75dcd4d77 100644 --- a/source/de/anomic/http/httpdProxyHandler.java +++ b/source/de/anomic/http/httpdProxyHandler.java @@ -910,7 +910,7 @@ public final class httpdProxyHandler extends httpdAbstractHandler implements htt if (!(header.containsKey("date"))) header.put("Date", httpc.dateString(httpc.nowDate())); if (!(header.containsKey("content-type"))) header.put("Content-type", "text/html"); // fix this - StringBuffer headerStringBuffer = new StringBuffer(); + StringBuffer headerStringBuffer = new StringBuffer(200); // write status line headerStringBuffer.append("HTTP/1.1 ") diff --git a/source/de/anomic/plasma/plasmaCrawlLoader.java b/source/de/anomic/plasma/plasmaCrawlLoader.java index c1dcaf58b..67c1e5865 100644 --- a/source/de/anomic/plasma/plasmaCrawlLoader.java +++ b/source/de/anomic/plasma/plasmaCrawlLoader.java @@ -44,218 +44,445 @@ package de.anomic.plasma; import java.io.*; import java.util.*; import java.net.*; + +import org.apache.commons.pool.impl.GenericObjectPool; + import de.anomic.net.*; import de.anomic.http.*; import de.anomic.server.*; import de.anomic.tools.*; import de.anomic.htmlFilter.*; -public final class plasmaCrawlLoader { +public final class plasmaCrawlLoader extends Thread { - private plasmaHTCache cacheManager; - private int socketTimeout; - private int loadTimeout; - private boolean remoteProxyUse; - private String remoteProxyHost; - private int remoteProxyPort; - private int maxSlots; - private List slots; - private serverLog log; - private HashSet acceptMimeTypes; + private final plasmaHTCache cacheManager; + private final int socketTimeout; + private final int loadTimeout; + private final int maxSlots; + private final serverLog log; + + private final CrawlerMessageQueue theQueue; + private final CrawlerPool crawlwerPool; + private final ThreadGroup theThreadGroup = new ThreadGroup("CrawlerThreads"); + private boolean stopped = false; + + public plasmaCrawlLoader( + plasmaHTCache cacheManager, + serverLog log, + int socketTimeout, + int 
loadTimeout, + int mslots, + boolean proxyUse, + String proxyHost, + int proxyPort, + HashSet acceptMimeTypes) { + this.setName("plasmaCrawlLoader"); + + this.cacheManager = cacheManager; + this.log = log; + this.socketTimeout = socketTimeout; + this.loadTimeout = loadTimeout; + this.maxSlots = mslots; + + // configuring the crawler messagequeue + this.theQueue = new CrawlerMessageQueue(); + + // configuring the crawler thread pool + // implementation of session thread pool + GenericObjectPool.Config config = new GenericObjectPool.Config(); + + // The maximum number of active connections that can be allocated from pool at the same time, + // 0 for no limit + config.maxActive = this.maxSlots; + + // The maximum number of idle connections connections in the pool + // 0 = no limit. + config.maxIdle = this.maxSlots / 2; + config.minIdle = this.maxSlots / 4; + + // block undefinitely + config.maxWait = -1; + + // Action to take in case of an exhausted DBCP statement pool + // 0 = fail, 1 = block, 2= grow + config.whenExhaustedAction = GenericObjectPool.WHEN_EXHAUSTED_BLOCK; + config.minEvictableIdleTimeMillis = 30000; +// config.testOnReturn = true; + + CrawlerFactory theFactory = new CrawlerFactory( + this.theThreadGroup, + cacheManager, + socketTimeout, + proxyUse, + proxyHost, + proxyPort, + acceptMimeTypes, + log); + + this.crawlwerPool = new CrawlerPool(theFactory,config,this.theThreadGroup); + + // start the crawl loader + this.start(); + } - public plasmaCrawlLoader(plasmaHTCache cacheManager, serverLog log, int socketTimeout, int loadTimeout, int mslots, boolean proxyUse, String proxyHost, int proxyPort, - HashSet acceptMimeTypes) { - this.cacheManager = cacheManager; - this.log = log; - this.socketTimeout = socketTimeout; - this.loadTimeout = loadTimeout; - this.remoteProxyUse = proxyUse; - this.remoteProxyHost = proxyHost; - this.remoteProxyPort = proxyPort; - this.maxSlots = mslots; - this.slots = new LinkedList(); - this.acceptMimeTypes = acceptMimeTypes; + public ThreadGroup threadStatus() { + return this.theThreadGroup; + } + + public void run() { + + while (!this.stopped && !Thread.interrupted()) { + try { + // getting a new message from the crawler queue + plasmaCrawlLoaderMessage theMsg = this.theQueue.waitForMessage(); + + // getting a new crawler from the crawler pool + plasmaCrawlWorker theWorker = (plasmaCrawlWorker) this.crawlwerPool.borrowObject(); + theWorker.execute(theMsg); + + } catch (InterruptedException e) { + Thread.interrupted(); + this.stopped = true; + } + catch (Exception e) { + e.printStackTrace(); + } + } + + // consuming the is interrupted flag + this.isInterrupted(); + + // closing the pool + try { + this.crawlwerPool.close(); + } + catch (Exception e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + } + + public void loadParallel( + URL url, + String referer, + String initiator, + int depth, + plasmaCrawlProfile.entry profile) { - private void killTimeouts() { - Exec thread; - for (int i = slots.size() - 1; i >= 0; i--) { - // check if thread is alive - thread = (Exec) slots.get(i); - if (thread.isAlive()) { - // check the age of the thread - if (System.currentTimeMillis() - thread.startdate > loadTimeout) { - // we kill that thread - thread.interrupt(); // hopefully this wakes him up. 
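// --- Editor's note: illustrative sketch, not part of this patch. ---
// The CrawlerPool introduced above is driven by a commons-pool GenericObjectPool.Config
// (maxActive, maxIdle, maxWait = -1, WHEN_EXHAUSTED_BLOCK, minEvictableIdleTimeMillis).
// This standalone example shows, under the assumption of a trivial made-up factory and
// class name (PoolConfigSketch), how those fields and the borrow/return cycle behave.
import org.apache.commons.pool.BasePoolableObjectFactory;
import org.apache.commons.pool.impl.GenericObjectPool;

final class PoolConfigSketch {
    public static void main(String[] args) throws Exception {
        GenericObjectPool.Config config = new GenericObjectPool.Config();
        config.maxActive = 10;                  // at most 10 objects borrowed at the same time
        config.maxIdle = 5;                     // keep at most 5 idle objects in the pool
        config.minIdle = 2;
        config.maxWait = -1;                    // wait without timeout when the pool is exhausted ...
        config.whenExhaustedAction = GenericObjectPool.WHEN_EXHAUSTED_BLOCK; // ... block instead of fail/grow
        config.minEvictableIdleTimeMillis = 30000;

        // trivial factory; the patch itself uses CrawlerFactory, which creates plasmaCrawlWorker threads
        GenericObjectPool pool = new GenericObjectPool(new BasePoolableObjectFactory() {
            public Object makeObject() {
                return new StringBuffer();
            }
        }, config);

        Object obj = pool.borrowObject();       // blocks while maxActive objects are already out
        try {
            // ... use the pooled object ...
        } finally {
            pool.returnObject(obj);             // always hand the object back to the pool
        }
        pool.close();
    }
}
// --- end of editor's sketch ---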
- slots.remove(i); - log.logDebug("IGNORING SLEEPING DOWNLOAD SLOT " + thread.url.toString()); - } - } else { - // thread i is dead, remove it - slots.remove(i); + if (!this.crawlwerPool.isClosed) { + int crawlingPriority = 5; + + // creating a new crawler queue object + plasmaCrawlLoaderMessage theMsg = new plasmaCrawlLoaderMessage(url, referer,initiator,depth,profile, crawlingPriority); + + // adding the message to the queue + try { + this.theQueue.addMessage(theMsg); + } catch (InterruptedException e) { + // TODO Auto-generated catch block + e.printStackTrace(); } } } + + public int size() { + return crawlwerPool.getNumActive(); + } +} + + + +final class Semaphore { + private long currentValue = 0; + private long maximumValue = Long.MAX_VALUE; - public synchronized void loadParallel(URL url, String referer, String initiator, int depth, plasmaCrawlProfile.entry profile) { + protected Semaphore() { + this(0,Long.MAX_VALUE); + } + + public Semaphore(long initialValue) { + this(initialValue,Long.MAX_VALUE); + } - // wait until there is space in the download slots - Exec thread; - while (slots.size() >= maxSlots) { - killTimeouts(); + protected Semaphore(long initialValue, long maxValue) { + /* some errorhandling */ + if (maxValue < initialValue) { + throw new IllegalArgumentException("The semaphore maximum value must not be " + + "greater than the semaphore init value."); + } + + if (maxValue < 1) { + throw new IllegalArgumentException("The semaphore maximum value must be greater or equal 1."); + } + + if (initialValue < 0) { + throw new IllegalArgumentException("The semaphore initial value must be greater or equal 0."); + } + + + // setting the initial Sempahore Values + this.currentValue = initialValue; + this.maximumValue = maxValue; + } + + public synchronized void P() throws InterruptedException + { + this.currentValue-- ; + + if (this.currentValue < 0) { + try { + wait(); + } catch(InterruptedException e) { + this.currentValue++; + throw e; + } + } + } - // wait a while - try { - Thread.currentThread().sleep(1000); - } catch (InterruptedException e) { - break; - } - } + public synchronized void V() { + if (this.currentValue+1 == this.maximumValue) { + throw new IndexOutOfBoundsException("The maximum value of the semaphore was reached"); + } + + this.currentValue++; + + if (this.currentValue <= 0) { + notify(); + } + } +} - // we found space in the download slots - thread = new Exec(url, referer, initiator, depth, profile); - thread.start(); - slots.add(thread); +class CrawlerMessageQueue { + private final Semaphore readSync; + private final Semaphore writeSync; + private final ArrayList messageList; + + public CrawlerMessageQueue() { + this.readSync = new Semaphore (0); + this.writeSync = new Semaphore (1); + + this.messageList = new ArrayList(10); } + + /** + * + * @param newMessage + * @throws MessageQueueLockedException + * @throws InterruptedException + */ + public void addMessage(plasmaCrawlLoaderMessage newMessage) + throws InterruptedException, NullPointerException + { + if (newMessage == null) throw new NullPointerException(); + + this.writeSync.P(); + + boolean insertionDoneSuccessfully = false; + synchronized(this.messageList) { + insertionDoneSuccessfully = this.messageList.add(newMessage); + } + + if (insertionDoneSuccessfully) { + this.sortMessages(); + this.readSync.V(); + } + + this.writeSync.V(); + } + + public plasmaCrawlLoaderMessage waitForMessage() throws InterruptedException { + this.readSync.P(); + this.writeSync.P(); + + plasmaCrawlLoaderMessage newMessage = 
null; + synchronized(this.messageList) { + newMessage = (plasmaCrawlLoaderMessage) this.messageList.remove(0); + } - public int size() { - killTimeouts(); - return slots.size(); + this.writeSync.V(); + return newMessage; } - public Exec[] threadStatus() { - killTimeouts(); - Exec[] result = new Exec[slots.size()]; - for (int i = 0; i < slots.size(); i++) result[i] = (Exec) slots.get(i); - return result; + protected void sortMessages() { + Collections.sort(this.messageList, new Comparator() { + public int compare(Object o1, Object o2) + { + plasmaCrawlLoaderMessage message1 = (plasmaCrawlLoaderMessage) o1; + plasmaCrawlLoaderMessage message2 = (plasmaCrawlLoaderMessage) o2; + + int message1Priority = message1.crawlingPriority; + int message2Priority = message2.crawlingPriority; + + if (message1Priority > message2Priority){ + return -1; + } else if (message1Priority < message2Priority) { + return 1; + } else { + return 0; + } + } + }); } +} + + +final class CrawlerPool extends GenericObjectPool +{ + private final ThreadGroup theThreadGroup; + public boolean isClosed = false; - public final class Exec extends Thread { + + public CrawlerPool(CrawlerFactory objFactory, + GenericObjectPool.Config config, + ThreadGroup threadGroup) { + super(objFactory, config); + this.theThreadGroup = threadGroup; + objFactory.setPool(this); + } + + public Object borrowObject() throws Exception { + return super.borrowObject(); + } - public URL url; - public String referer; - public String initiator; - public int depth; - public long startdate; - public plasmaCrawlProfile.entry profile; - public String error; + public void returnObject(Object obj) throws Exception { + super.returnObject(obj); + } + + public synchronized void close() throws Exception { + /* + * shutdown all still running session threads ... + */ + // interrupting all still running or pooled threads ... + this.theThreadGroup.interrupt(); + + /* waiting for all threads to finish */ + int threadCount = this.theThreadGroup.activeCount(); + Thread[] threadList = new Thread[threadCount]; + threadCount = this.theThreadGroup.enumerate(threadList); - public Exec(URL url, String referer, String initiator, int depth, plasmaCrawlProfile.entry profile) { - this.url = url; // the url to crawl - this.referer = referer; // the url that contained this url as link - this.initiator = initiator; - this.depth = depth; // distance from start-url - this.startdate = System.currentTimeMillis(); - this.profile = profile; - this.error = null; + try { + for ( int currentThreadIdx = 0; currentThreadIdx < threadCount; currentThreadIdx++ ) { + ((plasmaCrawlWorker)threadList[currentThreadIdx]).setStopped(true); + } + + for ( int currentThreadIdx = 0; currentThreadIdx < threadCount; currentThreadIdx++ ) { + // we need to use a timeout here because of missing interruptable session threads ... 
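// --- Editor's note: illustrative sketch, not part of this patch. ---
// CrawlerMessageQueue above hand-rolls a counting semaphore: readSync counts queued
// messages (starts at 0), writeSync serializes structural changes (starts at 1), and the
// list is kept sorted so the highest crawlingPriority is taken first. The same handshake
// is sketched here with java.util.concurrent.Semaphore; the class name and the reduction
// of the message to a plain int priority are assumptions made for brevity.
import java.util.ArrayList;
import java.util.Collections;
import java.util.concurrent.Semaphore;

final class PriorityQueueSketch {
    private final Semaphore items = new Semaphore(0);      // ~ readSync: number of queued messages
    private final Semaphore writeLock = new Semaphore(1);  // ~ writeSync: one writer at a time
    private final ArrayList<Integer> messages = new ArrayList<Integer>();

    public void add(int priority) throws InterruptedException {
        writeLock.acquire();                                // ~ writeSync.P()
        try {
            messages.add(priority);
            Collections.sort(messages, Collections.<Integer>reverseOrder()); // highest priority first
        } finally {
            writeLock.release();                            // ~ writeSync.V()
        }
        items.release();                                    // ~ readSync.V(): one more message available
    }

    public int take() throws InterruptedException {
        items.acquire();                                    // ~ readSync.P(): blocks until a message exists
        writeLock.acquire();
        try {
            return messages.remove(0);                      // head of the list = highest priority
        } finally {
            writeLock.release();
        }
    }
}
// --- end of editor's sketch ---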
+ if (threadList[currentThreadIdx].isAlive()) threadList[currentThreadIdx].join(500); + } } + catch (InterruptedException e) { + System.err.println("Interruption while trying to shutdown all crawler threads."); + } - public void run() { - try { - load(url, referer, initiator, depth, profile); - } catch (IOException e) { - } - } + this.isClosed = true; + super.close(); + + } + +} - private httpc newhttpc(String server, int port, boolean ssl) throws IOException { - // a new httpc connection, combined with possible remote proxy - if (remoteProxyUse) - return httpc.getInstance(server, port, socketTimeout, ssl, remoteProxyHost, remoteProxyPort); - else return httpc.getInstance(server, port, socketTimeout, ssl); - } +final class CrawlerFactory implements org.apache.commons.pool.PoolableObjectFactory { - private void load(URL url, String referer, String initiator, int depth, plasmaCrawlProfile.entry profile) throws IOException { - if (url == null) return; - Date requestDate = new Date(); // remember the time... - String host = url.getHost(); - String path = url.getPath(); - int port = url.getPort(); - boolean ssl = url.getProtocol().equals("https"); - if (port < 0) port = (ssl) ? 443 : 80; - - // set referrer; in some case advertise a little bit: - referer = referer.trim(); - if (referer.length() == 0) referer = "http://www.yacy.net/yacy/"; - - // take a file from the net - httpc remote = null; - try { - // create a request header - httpHeader requestHeader = new httpHeader(); - requestHeader.put("User-Agent", httpdProxyHandler.userAgent); - requestHeader.put("Referer", referer); - requestHeader.put("Accept-Encoding", "gzip,deflate"); + private CrawlerPool thePool; + private final ThreadGroup theThreadGroup; + private final plasmaHTCache cacheManager; + private final int socketTimeout; + private final boolean remoteProxyUse; + private final String remoteProxyHost; + private final int remoteProxyPort; + private final HashSet acceptMimeTypes; + private final serverLog theLog; - //System.out.println("CRAWLER_REQUEST_HEADER=" + requestHeader.toString()); // DEBUG - - // open the connection - remote = newhttpc(host, port, ssl); - - // send request - httpc.response res = remote.GET(path, requestHeader); - - if (res.status.startsWith("200")) { - // the transfer is ok - long contentLength = res.responseHeader.contentLength(); - - // reserve cache entry - plasmaHTCache.Entry htCache = cacheManager.newEntry(requestDate, depth, url, requestHeader, res.status, res.responseHeader, initiator, profile); - - // request has been placed and result has been returned. 
work off response - File cacheFile = cacheManager.getCachePath(url); - try { - if (!(httpd.isTextMime(res.responseHeader.mime().toLowerCase(), acceptMimeTypes))) { - // if the response has not the right file type then reject file - remote.close(); - log.logInfo("REJECTED WRONG MIME TYPE " + res.responseHeader.mime() + " for url " + url.toString()); - htCache.status = plasmaHTCache.CACHE_UNFILLED; - } else if ((profile.storeHTCache()) && ((error = htCache.shallStoreCache()) == null)) { - // we write the new cache entry to file system directly - cacheFile.getParentFile().mkdirs(); - FileOutputStream fos = new FileOutputStream(cacheFile); - htCache.cacheArray = res.writeContent(fos); // writes in cacheArray and cache file - fos.close(); - htCache.status = plasmaHTCache.CACHE_FILL; - } else { - if (error != null) log.logDebug("CRAWLER NOT STORED RESOURCE " + url.toString() + ": " + error); - // anyway, the content still lives in the content scraper - htCache.cacheArray = res.writeContent(null); // writes only into cacheArray - htCache.status = plasmaHTCache.CACHE_PASSING; - } - // enQueue new entry with response header - if ((initiator == null) || (initiator.length() == 0)) { - // enqueued for proxy writings - cacheManager.stackProcess(htCache); - } else { - // direct processing for crawling - cacheManager.process(htCache); - } - } catch (SocketException e) { - // this may happen if the client suddenly closes its connection - // maybe the user has stopped loading - // in that case, we are not responsible and just forget it - // but we clean the cache also, since it may be only partial - // and most possible corrupted - if (cacheFile.exists()) cacheFile.delete(); - log.logError("CRAWLER LOADER ERROR1: with url=" + url.toString() + ": " + e.toString()); - } - } else { - // if the response has not the right response type then reject file - log.logInfo("REJECTED WRONG STATUS TYPE '" + res.status + "' for url " + url.toString()); - // not processed any further - } - remote.close(); - } catch (Exception e) { - // this may happen if the targeted host does not exist or anything with the - // remote server was wrong. 
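// --- Editor's note: illustrative sketch, not part of this patch. ---
// CrawlerFactory implements org.apache.commons.pool.PoolableObjectFactory so the pool can
// create, validate and tear down its worker threads. The minimal factory below shows the
// lifecycle hooks with a hypothetical WorkerSketch thread that exposes the same
// setStopped/isRunning style flags; all names here are invented for illustration.
import org.apache.commons.pool.PoolableObjectFactory;

final class WorkerSketch extends Thread {
    private volatile boolean stopped = false;
    private volatile boolean running = false;
    public void run() {
        running = true;
        while (!stopped) {
            try { Thread.sleep(100); /* wait for work ... */ }
            catch (InterruptedException e) { stopped = true; }
        }
    }
    public void setStopped(boolean s) { stopped = s; }
    public boolean isRunning() { return running; }
}

final class WorkerFactorySketch implements PoolableObjectFactory {
    // called whenever the pool needs a fresh object
    public Object makeObject() {
        return new WorkerSketch();
    }
    // called before an object is removed from the pool for good: ask the thread to stop
    public void destroyObject(Object obj) {
        if (obj instanceof WorkerSketch) ((WorkerSketch) obj).setStopped(true);
    }
    // a worker is only reusable while its thread is alive, not interrupted and still in its run loop
    public boolean validateObject(Object obj) {
        if (!(obj instanceof WorkerSketch)) return true;
        WorkerSketch w = (WorkerSketch) obj;
        return w.isAlive() && !w.isInterrupted() && w.isRunning();
    }
    public void activateObject(Object obj) { /* nothing to prepare on borrow */ }
    public void passivateObject(Object obj) { /* nothing to reset on return */ }
}
// --- end of editor's sketch ---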
- log.logError("CRAWLER LOADER ERROR2 with url=" + url.toString() + ": " + e.toString()); - e.printStackTrace(); - } finally { - if (remote != null) httpc.returnInstance(remote); - } + public CrawlerFactory( + ThreadGroup theThreadGroup, + plasmaHTCache cacheManager, + int socketTimeout, + boolean remoteProxyUse, + String remoteProxyHost, + int remoteProxyPort, + HashSet acceptMimeTypes, + serverLog theLog) { + + super(); + + if (theThreadGroup == null) + throw new IllegalArgumentException("The threadgroup object must not be null."); + + this.theThreadGroup = theThreadGroup; + this.cacheManager = cacheManager; + this.socketTimeout = socketTimeout; + this.remoteProxyUse = remoteProxyUse; + this.remoteProxyHost = remoteProxyHost; + this.remoteProxyPort = remoteProxyPort; + this.acceptMimeTypes = acceptMimeTypes; + this.theLog = theLog; + } + + public void setPool(CrawlerPool thePool) { + this.thePool = thePool; + } + + /** + * @see org.apache.commons.pool.PoolableObjectFactory#makeObject() + */ + public Object makeObject() { + return new plasmaCrawlWorker( + this.theThreadGroup, + this.thePool, + this.cacheManager, + this.socketTimeout, + this.remoteProxyUse, + this.remoteProxyHost, + this.remoteProxyPort, + this.acceptMimeTypes, + this.theLog); + } + + /** + * @see org.apache.commons.pool.PoolableObjectFactory#destroyObject(java.lang.Object) + */ + public void destroyObject(Object obj) { + if (obj instanceof plasmaCrawlWorker) { + plasmaCrawlWorker theWorker = (plasmaCrawlWorker) obj; + theWorker.setStopped(true); } - } - + /** + * @see org.apache.commons.pool.PoolableObjectFactory#validateObject(java.lang.Object) + */ + public boolean validateObject(Object obj) { + if (obj instanceof plasmaCrawlWorker) + { + plasmaCrawlWorker theWorker = (plasmaCrawlWorker) obj; + if (!theWorker.isAlive() || theWorker.isInterrupted()) return false; + if (theWorker.isRunning()) return true; + return false; + } + return true; + } + + /** + * @param obj + * + */ + public void activateObject(Object obj) { + //log.debug(" activateObject..."); + } + + /** + * @param obj + * + */ + public void passivateObject(Object obj) { + //log.debug(" passivateObject..." + obj); + if (obj instanceof plasmaCrawlWorker) { + plasmaCrawlWorker theWorker = (plasmaCrawlWorker) obj; + } + } } + + + + diff --git a/source/de/anomic/plasma/plasmaCrawlLoaderMessage.java b/source/de/anomic/plasma/plasmaCrawlLoaderMessage.java new file mode 100644 index 000000000..0f939453a --- /dev/null +++ b/source/de/anomic/plasma/plasmaCrawlLoaderMessage.java @@ -0,0 +1,70 @@ +//plasmaCrawlLoaderMessage.java +//------------------------ +//part of YaCy +//(C) by Michael Peter Christen; mc@anomic.de +//first published on http://www.anomic.de +//Frankfurt, Germany, 2004 +//last major change: 21.04.2005 by Martin Thelian +// +//This program is free software; you can redistribute it and/or modify +//it under the terms of the GNU General Public License as published by +//the Free Software Foundation; either version 2 of the License, or +//(at your option) any later version. +// +//This program is distributed in the hope that it will be useful, +//but WITHOUT ANY WARRANTY; without even the implied warranty of +//MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +//GNU General Public License for more details. 
+// +//You should have received a copy of the GNU General Public License +//along with this program; if not, write to the Free Software +//Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +// +//Using this software in any meaning (reading, learning, copying, compiling, +//running) means that you agree that the Author(s) is (are) not responsible +//for cost, loss of data or any harm that may be caused directly or indirectly +//by usage of this softare or this documentation. The usage of this software +//is on your own risk. The installation and usage (starting/running) of this +//software may allow other people or application to access your computer and +//any attached devices and is highly dependent on the configuration of the +//software which must be done by the user of the software; the author(s) is +//(are) also not responsible for proper configuration and usage of the +//software, even if provoked by documentation provided together with +//the software. +// +//Any changes to this file according to the GPL as documented in the file +//gpl.txt aside this file in the shipment you received can be done to the +//lines that follows this copyright notice here, but changes must not be +//done inside the copyright notive above. A re-distribution must contain +//the intact and unchanged copyright notice. +//Contributions and changes to the program code must be marked as such. + + +package de.anomic.plasma; + +import java.net.URL; + +public final class plasmaCrawlLoaderMessage { + public final int crawlingPriority; + public final URL url; + public final String referer; + public final String initiator; + public final int depth; + public final plasmaCrawlProfile.entry profile; + + // loadParallel(URL url, String referer, String initiator, int depth, plasmaCrawlProfile.entry profile) { + public plasmaCrawlLoaderMessage( + URL url, + String referer, + String initiator, + int depth, + plasmaCrawlProfile.entry profile, + int crawlingPriority) { + this.url = url; + this.referer = referer; + this.initiator = initiator; + this.depth = depth; + this.profile = profile; + this.crawlingPriority = crawlingPriority; + } +} \ No newline at end of file diff --git a/source/de/anomic/plasma/plasmaCrawlWorker.java b/source/de/anomic/plasma/plasmaCrawlWorker.java new file mode 100644 index 000000000..fbe38eef8 --- /dev/null +++ b/source/de/anomic/plasma/plasmaCrawlWorker.java @@ -0,0 +1,306 @@ +//plasmaCrawlWorker.java +//------------------------ +//part of YaCy +//(C) by Michael Peter Christen; mc@anomic.de +//first published on http://www.anomic.de +//Frankfurt, Germany, 2004 +//last major change: 21.04.2005 by Martin Thelian +// +//This program is free software; you can redistribute it and/or modify +//it under the terms of the GNU General Public License as published by +//the Free Software Foundation; either version 2 of the License, or +//(at your option) any later version. +// +//This program is distributed in the hope that it will be useful, +//but WITHOUT ANY WARRANTY; without even the implied warranty of +//MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +//GNU General Public License for more details. 
+// +//You should have received a copy of the GNU General Public License +//along with this program; if not, write to the Free Software +//Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +// +//Using this software in any meaning (reading, learning, copying, compiling, +//running) means that you agree that the Author(s) is (are) not responsible +//for cost, loss of data or any harm that may be caused directly or indirectly +//by usage of this softare or this documentation. The usage of this software +//is on your own risk. The installation and usage (starting/running) of this +//software may allow other people or application to access your computer and +//any attached devices and is highly dependent on the configuration of the +//software which must be done by the user of the software; the author(s) is +//(are) also not responsible for proper configuration and usage of the +//software, even if provoked by documentation provided together with +//the software. +// +//Any changes to this file according to the GPL as documented in the file +//gpl.txt aside this file in the shipment you received can be done to the +//lines that follows this copyright notice here, but changes must not be +//done inside the copyright notive above. A re-distribution must contain +//the intact and unchanged copyright notice. +//Contributions and changes to the program code must be marked as such. + +package de.anomic.plasma; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.net.SocketException; +import java.net.URL; +import java.util.Date; +import java.util.HashSet; + +import de.anomic.http.httpHeader; +import de.anomic.http.httpc; +import de.anomic.http.httpd; +import de.anomic.http.httpdProxyHandler; +import de.anomic.server.serverLog; + +public final class plasmaCrawlWorker extends Thread { + + private static final String threadBaseName = "CrawlerWorker"; + + private final CrawlerPool myPool; + private final plasmaHTCache cacheManager; + private final int socketTimeout; + private final boolean remoteProxyUse; + private final String remoteProxyHost; + private final int remoteProxyPort; + private final HashSet acceptMimeTypes; + private final serverLog log; + + public plasmaCrawlLoaderMessage theMsg; + private URL url; + private String referer; + private String initiator; + private int depth; + private long startdate; + private plasmaCrawlProfile.entry profile; + private String error; + + private boolean running = false; + private boolean stopped = false; + private boolean done = false; + + + public plasmaCrawlWorker( + ThreadGroup theTG, + CrawlerPool thePool, + plasmaHTCache cacheManager, + int socketTimeout, + boolean remoteProxyUse, + String remoteProxyHost, + int remoteProxyPort, + HashSet acceptMimeTypes, + serverLog log) { + super(theTG,threadBaseName + "_inPool"); + + this.myPool = thePool; + this.cacheManager = cacheManager; + this.socketTimeout = socketTimeout; + this.remoteProxyUse = remoteProxyUse; + this.remoteProxyHost = remoteProxyHost; + this.remoteProxyPort = remoteProxyPort; + this.acceptMimeTypes = acceptMimeTypes; + this.log = log; + } + + public synchronized void execute(plasmaCrawlLoaderMessage theMsg) { + this.theMsg = theMsg; + + this.url = theMsg.url; + this.referer = theMsg.referer; + this.initiator = theMsg.initiator; + this.depth = theMsg.depth; + this.profile = theMsg.profile; + + this.startdate = System.currentTimeMillis(); + this.error = null; + + + this.done = false; + if (!this.running) { + // 
this.setDaemon(true); + this.start(); + } else { + this.notifyAll(); + } + } + + public void reset() { + this.url = null; + this.referer = null; + this.initiator = null; + this.depth = 0; + this.startdate = 0; + this.profile = null; + this.error = null; + } + + public void run() { + this.running = true; + + // The thread keeps running. + while (!this.stopped && !Thread.interrupted()) { + if (this.done) { + // We are waiting for a task now. + synchronized (this) { + try { + this.wait(); //Wait until we get a request to process. + } + catch (InterruptedException e) { + this.stopped = true; + // log.error("", e); + } + } + } + else + { + //There is a task....let us execute it. + try { + execute(); + } catch (Exception e) { + // log.error("", e); + } + finally { + reset(); + + if (!this.stopped && !this.isInterrupted()) { + try { + this.myPool.returnObject(this); + this.setName(this.threadBaseName + "_inPool"); + } + catch (Exception e1) { + e1.printStackTrace(); + } + } + } + } + } + } + + public void execute() throws IOException { + try { + this.setName(this.threadBaseName + "_" + this.url); + load(this.url, this.referer, this.initiator, this.depth, this.profile); + } catch (IOException e) { + throw e; + } + finally { + this.done = true; + } + } + + private httpc newhttpc(String server, int port, boolean ssl) throws IOException { + // a new httpc connection, combined with possible remote proxy + if (remoteProxyUse) + return httpc.getInstance(server, port, socketTimeout, ssl, remoteProxyHost, remoteProxyPort); + else return httpc.getInstance(server, port, socketTimeout, ssl); + } + + private void load( + URL url, + String referer, + String initiator, + int depth, + plasmaCrawlProfile.entry profile + ) throws IOException { + if (url == null) return; + Date requestDate = new Date(); // remember the time... + String host = url.getHost(); + String path = url.getPath(); + int port = url.getPort(); + boolean ssl = url.getProtocol().equals("https"); + if (port < 0) port = (ssl) ? 443 : 80; + + // set referrer; in some case advertise a little bit: + referer = referer.trim(); + if (referer.length() == 0) referer = "http://www.yacy.net/yacy/"; + + // take a file from the net + httpc remote = null; + try { + // create a request header + httpHeader requestHeader = new httpHeader(); + requestHeader.put("User-Agent", httpdProxyHandler.userAgent); + requestHeader.put("Referer", referer); + requestHeader.put("Accept-Encoding", "gzip,deflate"); + + //System.out.println("CRAWLER_REQUEST_HEADER=" + requestHeader.toString()); // DEBUG + + // open the connection + remote = newhttpc(host, port, ssl); + + // send request + httpc.response res = remote.GET(path, requestHeader); + + if (res.status.startsWith("200")) { + // the transfer is ok + long contentLength = res.responseHeader.contentLength(); + + // reserve cache entry + plasmaHTCache.Entry htCache = cacheManager.newEntry(requestDate, depth, url, requestHeader, res.status, res.responseHeader, initiator, profile); + + // request has been placed and result has been returned. 
work off response + File cacheFile = cacheManager.getCachePath(url); + try { + if (!(httpd.isTextMime(res.responseHeader.mime().toLowerCase(), acceptMimeTypes))) { + // if the response has not the right file type then reject file + remote.close(); + log.logInfo("REJECTED WRONG MIME TYPE " + res.responseHeader.mime() + " for url " + url.toString()); + htCache.status = plasmaHTCache.CACHE_UNFILLED; + } else if ((profile.storeHTCache()) && ((error = htCache.shallStoreCache()) == null)) { + // we write the new cache entry to file system directly + cacheFile.getParentFile().mkdirs(); + FileOutputStream fos = new FileOutputStream(cacheFile); + htCache.cacheArray = res.writeContent(fos); // writes in cacheArray and cache file + fos.close(); + htCache.status = plasmaHTCache.CACHE_FILL; + } else { + if (error != null) log.logDebug("CRAWLER NOT STORED RESOURCE " + url.toString() + ": " + error); + // anyway, the content still lives in the content scraper + htCache.cacheArray = res.writeContent(null); // writes only into cacheArray + htCache.status = plasmaHTCache.CACHE_PASSING; + } + // enQueue new entry with response header + if ((initiator == null) || (initiator.length() == 0)) { + // enqueued for proxy writings + cacheManager.stackProcess(htCache); + } else { + // direct processing for crawling + cacheManager.process(htCache); + } + } catch (SocketException e) { + // this may happen if the client suddenly closes its connection + // maybe the user has stopped loading + // in that case, we are not responsible and just forget it + // but we clean the cache also, since it may be only partial + // and most possible corrupted + if (cacheFile.exists()) cacheFile.delete(); + log.logError("CRAWLER LOADER ERROR1: with url=" + url.toString() + ": " + e.toString()); + } + } else { + // if the response has not the right response type then reject file + log.logInfo("REJECTED WRONG STATUS TYPE '" + res.status + "' for url " + url.toString()); + // not processed any further + } + remote.close(); + } catch (Exception e) { + // this may happen if the targeted host does not exist or anything with the + // remote server was wrong. + log.logError("CRAWLER LOADER ERROR2 with url=" + url.toString() + ": " + e.toString()); + e.printStackTrace(); + } finally { + if (remote != null) httpc.returnInstance(remote); + } + } + + public void setStopped(boolean stopped) { + this.stopped = stopped; + } + + public boolean isRunning() { + return this.running; + } + +} + diff --git a/source/de/anomic/server/serverCore.java b/source/de/anomic/server/serverCore.java index bc02ade5b..199672be2 100644 --- a/source/de/anomic/server/serverCore.java +++ b/source/de/anomic/server/serverCore.java @@ -395,19 +395,21 @@ public final class serverCore extends serverAbstractThread implements serverThre threadCount = serverCore.this.theSessionThreadGroup.enumerate(threadList); try { + for ( int currentThreadIdx = 0; currentThreadIdx < threadCount; currentThreadIdx++ ) { + ((Session)threadList[currentThreadIdx]).setStopped(true); + } + for ( int currentThreadIdx = 0; currentThreadIdx < threadCount; currentThreadIdx++ ) { // we need to use a timeout here because of missing interruptable session threads ... 
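// --- Editor's note: illustrative sketch, not part of this patch. ---
// CrawlerPool.close() above and the serverCore change below follow the same shutdown
// sequence: interrupt the thread group, flag every thread as stopped, then join each
// still-alive thread with a timeout because the sessions are not reliably interruptible.
// A condensed version of that sequence, with a hypothetical StoppableThread base type:
abstract class StoppableThread extends Thread {
    public StoppableThread(ThreadGroup group, String name) { super(group, name); }
    public abstract void setStopped(boolean stopped);
}

final class ThreadGroupShutdownSketch {
    // Ask every thread in the group to stop, then wait a bounded time for each to die.
    public static void shutdown(ThreadGroup group) {
        group.interrupt();                                      // wake up anything blocked in wait()/sleep()

        Thread[] threads = new Thread[group.activeCount() * 2]; // *2 head room: activeCount() is only an estimate
        int count = group.enumerate(threads);

        for (int i = 0; i < count; i++) {
            if (threads[i] instanceof StoppableThread) ((StoppableThread) threads[i]).setStopped(true);
        }
        try {
            for (int i = 0; i < count; i++) {
                // bounded join: some threads block in non-interruptible I/O, so never wait forever
                if (threads[i].isAlive()) threads[i].join(500);
            }
        } catch (InterruptedException e) {
            System.err.println("Interrupted while shutting down thread group.");
        }
    }
}
// --- end of editor's sketch ---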
- threadList[currentThreadIdx].join(500); + if (threadList[currentThreadIdx].isAlive()) threadList[currentThreadIdx].join(500); } } catch (InterruptedException e) { serverCore.this.log.logWarning("Interruption while trying to shutdown all session threads."); } - finally { - this.isClosed = true; - } - super.close(); + this.isClosed = true; + super.close(); } } @@ -626,10 +628,12 @@ public final class serverCore extends serverAbstractThread implements serverThre if (!this.stopped && !this.isInterrupted()) { try { + this.setName("Session_inPool"); serverCore.this.theSessionPool.returnObject(this); } catch (Exception e1) { - e1.printStackTrace(); + // e1.printStackTrace(); + this.stopped = true; } } } @@ -637,7 +641,7 @@ public final class serverCore extends serverAbstractThread implements serverThre } } - private void execute() { + private void execute() throws InterruptedException { try { // setting the session startup time @@ -648,6 +652,7 @@ public final class serverCore extends serverAbstractThread implements serverThre // getting some client information this.userAddress = this.controlSocket.getInetAddress(); + this.setName("Session_" + this.userAddress.getHostAddress() + ":" + this.controlSocket.getPort()); // TODO: check if we want to allow this socket to connect us @@ -669,6 +674,7 @@ public final class serverCore extends serverAbstractThread implements serverThre listen(); } catch (Exception e) { + if (e instanceof InterruptedException) throw (InterruptedException) e; System.err.println("ERROR: (internal) " + e); } finally { try {