diff --git a/source/de/anomic/plasma/plasmaCrawlLoader.java b/source/de/anomic/plasma/plasmaCrawlLoader.java index 676c0f7fc..4c77c1f62 100644 --- a/source/de/anomic/plasma/plasmaCrawlLoader.java +++ b/source/de/anomic/plasma/plasmaCrawlLoader.java @@ -48,9 +48,11 @@ import java.net.URL; import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; + +import org.apache.commons.pool.impl.GenericObjectPool; + import de.anomic.server.serverSemaphore; import de.anomic.server.logging.serverLog; -import org.apache.commons.pool.impl.GenericObjectPool; public final class plasmaCrawlLoader extends Thread { @@ -291,8 +293,33 @@ final class CrawlerPool extends GenericObjectPool { return super.borrowObject(); } - public void returnObject(Object obj) throws Exception { - super.returnObject(obj); + public void returnObject(Object obj) { + if (obj == null) return; + if (obj instanceof plasmaCrawlWorker) { + try { + ((plasmaCrawlWorker)obj).setName(plasmaCrawlWorker.threadBaseName + "_inPool"); + super.returnObject(obj); + } catch (Exception e) { + ((plasmaCrawlWorker)obj).setStopped(true); + serverLog.logSevere("CRAWLER-POOL","Unable to return crawler thread to pool.",e); + } + } else { + serverLog.logSevere("CRAWLER-POOL","Object of wront type '" + obj.getClass().getName() + + "' returned to pool."); + } + } + + public void invalidateObject(Object obj) { + if (obj == null) return; + if (this.isClosed) return; + if (obj instanceof plasmaCrawlWorker) { + try { + ((plasmaCrawlWorker)obj).setStopped(true); + super.invalidateObject(obj); + } catch (Exception e) { + serverLog.logSevere("CRAWLER-POOL","Unable to invalidate crawling thread.",e); + } + } } public synchronized void close() throws Exception { @@ -395,6 +422,7 @@ final class CrawlerFactory implements org.apache.commons.pool.PoolableObjectFact * @see org.apache.commons.pool.PoolableObjectFactory#destroyObject(java.lang.Object) */ public void destroyObject(Object obj) { + if (obj == null) return; if (obj instanceof plasmaCrawlWorker) { plasmaCrawlWorker theWorker = (plasmaCrawlWorker) obj; theWorker.setStopped(true); @@ -405,8 +433,8 @@ final class CrawlerFactory implements org.apache.commons.pool.PoolableObjectFact * @see org.apache.commons.pool.PoolableObjectFactory#validateObject(java.lang.Object) */ public boolean validateObject(Object obj) { - if (obj instanceof plasmaCrawlWorker) - { + if (obj == null) return false; + if (obj instanceof plasmaCrawlWorker) { plasmaCrawlWorker theWorker = (plasmaCrawlWorker) obj; if (!theWorker.isAlive() || theWorker.isInterrupted()) return false; if (theWorker.isRunning()) return true; diff --git a/source/de/anomic/plasma/plasmaCrawlStacker.java b/source/de/anomic/plasma/plasmaCrawlStacker.java index 9bdef67d7..1ab775bc3 100644 --- a/source/de/anomic/plasma/plasmaCrawlStacker.java +++ b/source/de/anomic/plasma/plasmaCrawlStacker.java @@ -414,7 +414,7 @@ public final class plasmaCrawlStacker { this.handle = Integer.parseInt(new String(entryBytes[11], "UTF-8")); } catch (Exception e) { e.printStackTrace(); - throw new IllegalStateException(); + throw new IllegalStateException(e); } } @@ -596,13 +596,15 @@ public final class plasmaCrawlStacker { String urlHash = null; byte[][] entryBytes = null; stackCrawlMessage newMessage = null; - synchronized(this.urlEntryHashCache) { - urlHash = (String) this.urlEntryHashCache.removeFirst(); - entryBytes = this.urlEntryCache.remove(urlHash.getBytes()); + try { + synchronized(this.urlEntryHashCache) { + urlHash = (String) this.urlEntryHashCache.removeFirst(); + entryBytes = this.urlEntryCache.remove(urlHash.getBytes()); + } + } finally { + this.writeSync.V(); } - this.writeSync.V(); - newMessage = new stackCrawlMessage(urlHash,entryBytes); return newMessage; } @@ -693,11 +695,36 @@ public final class plasmaCrawlStacker { public Object borrowObject() throws Exception { return super.borrowObject(); } - - public void returnObject(Object obj) throws Exception { - super.returnObject(obj); + + public void returnObject(Object obj) { + if (obj == null) return; + if (obj instanceof Worker) { + try { + ((Worker)obj).setName("stackCrawlThread_inPool"); + super.returnObject(obj); + } catch (Exception e) { + ((Worker)obj).setStopped(true); + serverLog.logSevere("STACKCRAWL-POOL","Unable to return stackcrawl thread to pool.",e); + } + } else { + serverLog.logSevere("STACKCRAWL-POOL","Object of wront type '" + obj.getClass().getName() + + "' returned to pool."); + } } + public void invalidateObject(Object obj) { + if (obj == null) return; + if (this.isClosed) return; + if (obj instanceof Worker) { + try { + ((Worker)obj).setStopped(true); + super.invalidateObject(obj); + } catch (Exception e) { + serverLog.logSevere("STACKCRAWL-POOL","Unable to invalidate stackcrawl thread.",e); + } + } + } + public synchronized void close() throws Exception { /* @@ -807,38 +834,30 @@ public final class plasmaCrawlStacker { public void run() { this.running = true; - // The thread keeps running. - while (!this.stopped && !Thread.interrupted()) { - if (this.done) { - // We are waiting for a task now. - synchronized (this) { - try { - this.wait(); //Wait until we get a request to process. - } catch (InterruptedException e) { - this.stopped = true; - // log.error("", e); - } - } - } else { - //There is a task....let us execute it. - try { - execute(); - } catch (Exception e) { - // log.error("", e); - } finally { - reset(); + try { + // The thread keeps running. + while (!this.stopped && !this.isInterrupted() && !plasmaCrawlStacker.this.theWorkerPool.isClosed) { + if (this.done) { + // return thread back into pool + plasmaCrawlStacker.this.theWorkerPool.returnObject(this); - if (!this.stopped && !this.isInterrupted() && !plasmaCrawlStacker.this.theWorkerPool.isClosed) { - try { - this.setName("stackCrawlThread_inPool"); - plasmaCrawlStacker.this.theWorkerPool.returnObject(this); - } catch (Exception e1) { - // e1.printStackTrace(); - this.stopped = true; - } + // We are waiting for a new task now. + synchronized (this) { this.wait(); } + } else { + try { + // executing the new task + execute(); + } finally { + // reset thread + reset(); } } } + } catch (InterruptedException ex) { + serverLog.logInfo("STACKCRAWL-POOL","Interruption of thread '" + this.getName() + "' detected."); + } finally { + if (plasmaCrawlStacker.this.theWorkerPool != null) + plasmaCrawlStacker.this.theWorkerPool.invalidateObject(this); } } diff --git a/source/de/anomic/plasma/plasmaCrawlWorker.java b/source/de/anomic/plasma/plasmaCrawlWorker.java index e2c5786c1..b04c2f9b1 100644 --- a/source/de/anomic/plasma/plasmaCrawlWorker.java +++ b/source/de/anomic/plasma/plasmaCrawlWorker.java @@ -65,7 +65,7 @@ import de.anomic.yacy.yacyCore; public final class plasmaCrawlWorker extends Thread { private static final int DEFAULT_CRAWLING_RETRY_COUNT = 5; - private static final String threadBaseName = "CrawlerWorker"; + static final String threadBaseName = "CrawlerWorker"; private final CrawlerPool myPool; private final plasmaSwitchboard sb; @@ -165,40 +165,29 @@ public final class plasmaCrawlWorker extends Thread { public void run() { this.running = true; - // The thread keeps running. - while (!this.stopped && !Thread.interrupted()) { - if (this.done) { - // We are waiting for a task now. - synchronized (this) { + try { + // The thread keeps running. + while (!this.stopped && !this.isInterrupted() && !this.myPool.isClosed) { + if (this.done) { + // return thread back into pool + this.myPool.returnObject(this); + + // We are waiting for a new task now. + synchronized (this) { this.wait(); } + } else { try { - this.wait(); //Wait until we get a request to process. - } - catch (InterruptedException e) { - this.stopped = true; - // log.error("", e); - } - } - } else { - //There is a task....let us execute it. - try { - execute(); - } catch (Exception e) { - // log.error("", e); - } - finally { - reset(); - - if (!this.stopped && !this.isInterrupted()) { - try { - this.myPool.returnObject(this); - this.setName(plasmaCrawlWorker.threadBaseName + "_inPool"); - } - catch (Exception e1) { - log.logSevere("pool error", e1); - } + // executing the new task + execute(); + } finally { + reset(); } } } + } catch (InterruptedException ex) { + serverLog.logInfo("CRAWLER-POOL","Interruption of thread '" + this.getName() + "' detected."); + } finally { + if (this.myPool != null) + this.myPool.invalidateObject(this); } } diff --git a/source/de/anomic/server/serverCore.java b/source/de/anomic/server/serverCore.java index bc210ed3b..f509fd65c 100644 --- a/source/de/anomic/server/serverCore.java +++ b/source/de/anomic/server/serverCore.java @@ -65,26 +65,16 @@ import java.nio.channels.ClosedByInterruptException; import java.util.Enumeration; import java.util.Hashtable; -/* -import java.io.File; -import java.io.FileInputStream; -import java.security.KeyStore; -import javax.net.ServerSocketFactory; -import javax.net.ssl.KeyManagerFactory; -import javax.net.ssl.SSLContext; -import javax.net.ssl.SSLServerSocketFactory; -*/ - import org.apache.commons.pool.impl.GenericObjectPool; import org.apache.commons.pool.impl.GenericObjectPool.Config; import de.anomic.http.httpc; import de.anomic.icap.icapd; +import de.anomic.plasma.plasmaSwitchboard; import de.anomic.server.logging.serverLog; import de.anomic.urlRedirector.urlRedirectord; import de.anomic.yacy.yacyCore; import de.anomic.yacy.yacySeed; -import de.anomic.plasma.plasmaSwitchboard; public final class serverCore extends serverAbstractThread implements serverThread { @@ -604,12 +594,32 @@ public final class serverCore extends serverAbstractThread implements serverThre /** * @see org.apache.commons.pool.impl.GenericObjectPool#returnObject(java.lang.Object) */ - public void returnObject(Object obj) throws Exception { + public void returnObject(Object obj) { + if (obj == null) return; if (obj instanceof Session) { - super.returnObject(obj); + try { + ((Session)obj).setName("Session_inPool"); + super.returnObject(obj); + } catch (Exception e) { + ((Session)obj).setStopped(true); + serverLog.logSevere("SESSION-POOL","Unable to return session thread to pool.",e); + } } else { serverLog.logSevere("SESSION-POOL","Object of wront type '" + obj.getClass().getName() + - "'returned to pool."); + "' returned to pool."); + } + } + + public void invalidateObject(Object obj) { + if (obj == null) return; + if (this.isClosed) return; + if (obj instanceof Session) { + try { + ((Session)obj).setStopped(true); + super.invalidateObject(obj); + } catch (Exception e) { + serverLog.logSevere("SESSION-POOL","Unable to invalidate session thread.",e); + } } } @@ -914,51 +924,35 @@ public final class serverCore extends serverAbstractThread implements serverThre public void run() { this.running = true; - // The thread keeps running. - while (!this.stopped && !this.isInterrupted()) { - if (this.done) { - // We are waiting for a task now. - synchronized (this) { - try { - this.wait(); //Wait until we get a request to process. - } catch (InterruptedException e) { - this.stopped = true; - // log.error("", e); - } - } - } else { - //There is a task....let us execute it. - try { - execute(); - if (this.syncObject != null) { - synchronized (this.syncObject) { - //Notify the completion. - this.syncObject.notifyAll(); - } - } - } catch (Exception e) { - // log.error("", e); - } finally { - reset(); + try { + // The thread keeps running. + while (!this.stopped && !this.isInterrupted() && !serverCore.this.theSessionPool.isClosed) { + if (this.done) { + // return thread back into pool + serverCore.this.theSessionPool.returnObject(this); - if (!this.stopped && !this.isInterrupted() && !serverCore.this.theSessionPool.isClosed) { - try { - this.setName("Session_inPool"); - serverCore.this.theSessionPool.returnObject(this); - } catch (Exception e1) { - // e1.printStackTrace(); - this.stopped = true; - } - } else if (!serverCore.this.theSessionPool.isClosed) { - try { - serverCore.this.theSessionPool.invalidateObject(this); - } catch (Exception e) { - // TODO Auto-generated catch block - e.printStackTrace(); + // We are waiting for a new task now. + synchronized (this) {this.wait();} + } else { + try { + // executing the new task + execute(); + } finally { + // Notify the completion. + if (this.syncObject != null) { + synchronized (this.syncObject) { this.syncObject.notifyAll(); } } + + // reset thread + reset(); } } } + } catch (InterruptedException ex) { + serverLog.logInfo("SESSION-POOL","Interruption of thread '" + this.getName() + "' detected."); + } finally { + if (serverCore.this.theSessionPool != null) + serverCore.this.theSessionPool.invalidateObject(this); } }