*) more failsafe threadpools

git-svn-id: https://svn.berlios.de/svnroot/repos/yacy/trunk@1446 6c8d7289-2bf4-0310-a012-ef5d649a1542
pull/1/head
theli 19 years ago
parent 3feeba3d7b
commit f5abfe8d57

@ -48,9 +48,11 @@ import java.net.URL;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import org.apache.commons.pool.impl.GenericObjectPool;
import de.anomic.server.serverSemaphore;
import de.anomic.server.logging.serverLog;
import org.apache.commons.pool.impl.GenericObjectPool;
public final class plasmaCrawlLoader extends Thread {
@ -291,8 +293,33 @@ final class CrawlerPool extends GenericObjectPool {
return super.borrowObject();
}
public void returnObject(Object obj) throws Exception {
super.returnObject(obj);
public void returnObject(Object obj) {
if (obj == null) return;
if (obj instanceof plasmaCrawlWorker) {
try {
((plasmaCrawlWorker)obj).setName(plasmaCrawlWorker.threadBaseName + "_inPool");
super.returnObject(obj);
} catch (Exception e) {
((plasmaCrawlWorker)obj).setStopped(true);
serverLog.logSevere("CRAWLER-POOL","Unable to return crawler thread to pool.",e);
}
} else {
serverLog.logSevere("CRAWLER-POOL","Object of wront type '" + obj.getClass().getName() +
"' returned to pool.");
}
}
public void invalidateObject(Object obj) {
if (obj == null) return;
if (this.isClosed) return;
if (obj instanceof plasmaCrawlWorker) {
try {
((plasmaCrawlWorker)obj).setStopped(true);
super.invalidateObject(obj);
} catch (Exception e) {
serverLog.logSevere("CRAWLER-POOL","Unable to invalidate crawling thread.",e);
}
}
}
public synchronized void close() throws Exception {
@ -395,6 +422,7 @@ final class CrawlerFactory implements org.apache.commons.pool.PoolableObjectFact
* @see org.apache.commons.pool.PoolableObjectFactory#destroyObject(java.lang.Object)
*/
public void destroyObject(Object obj) {
if (obj == null) return;
if (obj instanceof plasmaCrawlWorker) {
plasmaCrawlWorker theWorker = (plasmaCrawlWorker) obj;
theWorker.setStopped(true);
@ -405,8 +433,8 @@ final class CrawlerFactory implements org.apache.commons.pool.PoolableObjectFact
* @see org.apache.commons.pool.PoolableObjectFactory#validateObject(java.lang.Object)
*/
public boolean validateObject(Object obj) {
if (obj instanceof plasmaCrawlWorker)
{
if (obj == null) return false;
if (obj instanceof plasmaCrawlWorker) {
plasmaCrawlWorker theWorker = (plasmaCrawlWorker) obj;
if (!theWorker.isAlive() || theWorker.isInterrupted()) return false;
if (theWorker.isRunning()) return true;

@ -414,7 +414,7 @@ public final class plasmaCrawlStacker {
this.handle = Integer.parseInt(new String(entryBytes[11], "UTF-8"));
} catch (Exception e) {
e.printStackTrace();
throw new IllegalStateException();
throw new IllegalStateException(e);
}
}
@ -596,13 +596,15 @@ public final class plasmaCrawlStacker {
String urlHash = null;
byte[][] entryBytes = null;
stackCrawlMessage newMessage = null;
synchronized(this.urlEntryHashCache) {
urlHash = (String) this.urlEntryHashCache.removeFirst();
entryBytes = this.urlEntryCache.remove(urlHash.getBytes());
try {
synchronized(this.urlEntryHashCache) {
urlHash = (String) this.urlEntryHashCache.removeFirst();
entryBytes = this.urlEntryCache.remove(urlHash.getBytes());
}
} finally {
this.writeSync.V();
}
this.writeSync.V();
newMessage = new stackCrawlMessage(urlHash,entryBytes);
return newMessage;
}
@ -693,11 +695,36 @@ public final class plasmaCrawlStacker {
public Object borrowObject() throws Exception {
return super.borrowObject();
}
public void returnObject(Object obj) throws Exception {
super.returnObject(obj);
public void returnObject(Object obj) {
if (obj == null) return;
if (obj instanceof Worker) {
try {
((Worker)obj).setName("stackCrawlThread_inPool");
super.returnObject(obj);
} catch (Exception e) {
((Worker)obj).setStopped(true);
serverLog.logSevere("STACKCRAWL-POOL","Unable to return stackcrawl thread to pool.",e);
}
} else {
serverLog.logSevere("STACKCRAWL-POOL","Object of wront type '" + obj.getClass().getName() +
"' returned to pool.");
}
}
public void invalidateObject(Object obj) {
if (obj == null) return;
if (this.isClosed) return;
if (obj instanceof Worker) {
try {
((Worker)obj).setStopped(true);
super.invalidateObject(obj);
} catch (Exception e) {
serverLog.logSevere("STACKCRAWL-POOL","Unable to invalidate stackcrawl thread.",e);
}
}
}
public synchronized void close() throws Exception {
/*
@ -807,38 +834,30 @@ public final class plasmaCrawlStacker {
public void run() {
this.running = true;
// The thread keeps running.
while (!this.stopped && !Thread.interrupted()) {
if (this.done) {
// We are waiting for a task now.
synchronized (this) {
try {
this.wait(); //Wait until we get a request to process.
} catch (InterruptedException e) {
this.stopped = true;
// log.error("", e);
}
}
} else {
//There is a task....let us execute it.
try {
execute();
} catch (Exception e) {
// log.error("", e);
} finally {
reset();
try {
// The thread keeps running.
while (!this.stopped && !this.isInterrupted() && !plasmaCrawlStacker.this.theWorkerPool.isClosed) {
if (this.done) {
// return thread back into pool
plasmaCrawlStacker.this.theWorkerPool.returnObject(this);
if (!this.stopped && !this.isInterrupted() && !plasmaCrawlStacker.this.theWorkerPool.isClosed) {
try {
this.setName("stackCrawlThread_inPool");
plasmaCrawlStacker.this.theWorkerPool.returnObject(this);
} catch (Exception e1) {
// e1.printStackTrace();
this.stopped = true;
}
// We are waiting for a new task now.
synchronized (this) { this.wait(); }
} else {
try {
// executing the new task
execute();
} finally {
// reset thread
reset();
}
}
}
} catch (InterruptedException ex) {
serverLog.logInfo("STACKCRAWL-POOL","Interruption of thread '" + this.getName() + "' detected.");
} finally {
if (plasmaCrawlStacker.this.theWorkerPool != null)
plasmaCrawlStacker.this.theWorkerPool.invalidateObject(this);
}
}

@ -65,7 +65,7 @@ import de.anomic.yacy.yacyCore;
public final class plasmaCrawlWorker extends Thread {
private static final int DEFAULT_CRAWLING_RETRY_COUNT = 5;
private static final String threadBaseName = "CrawlerWorker";
static final String threadBaseName = "CrawlerWorker";
private final CrawlerPool myPool;
private final plasmaSwitchboard sb;
@ -165,40 +165,29 @@ public final class plasmaCrawlWorker extends Thread {
public void run() {
this.running = true;
// The thread keeps running.
while (!this.stopped && !Thread.interrupted()) {
if (this.done) {
// We are waiting for a task now.
synchronized (this) {
try {
// The thread keeps running.
while (!this.stopped && !this.isInterrupted() && !this.myPool.isClosed) {
if (this.done) {
// return thread back into pool
this.myPool.returnObject(this);
// We are waiting for a new task now.
synchronized (this) { this.wait(); }
} else {
try {
this.wait(); //Wait until we get a request to process.
}
catch (InterruptedException e) {
this.stopped = true;
// log.error("", e);
}
}
} else {
//There is a task....let us execute it.
try {
execute();
} catch (Exception e) {
// log.error("", e);
}
finally {
reset();
if (!this.stopped && !this.isInterrupted()) {
try {
this.myPool.returnObject(this);
this.setName(plasmaCrawlWorker.threadBaseName + "_inPool");
}
catch (Exception e1) {
log.logSevere("pool error", e1);
}
// executing the new task
execute();
} finally {
reset();
}
}
}
} catch (InterruptedException ex) {
serverLog.logInfo("CRAWLER-POOL","Interruption of thread '" + this.getName() + "' detected.");
} finally {
if (this.myPool != null)
this.myPool.invalidateObject(this);
}
}

@ -65,26 +65,16 @@ import java.nio.channels.ClosedByInterruptException;
import java.util.Enumeration;
import java.util.Hashtable;
/*
import java.io.File;
import java.io.FileInputStream;
import java.security.KeyStore;
import javax.net.ServerSocketFactory;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLServerSocketFactory;
*/
import org.apache.commons.pool.impl.GenericObjectPool;
import org.apache.commons.pool.impl.GenericObjectPool.Config;
import de.anomic.http.httpc;
import de.anomic.icap.icapd;
import de.anomic.plasma.plasmaSwitchboard;
import de.anomic.server.logging.serverLog;
import de.anomic.urlRedirector.urlRedirectord;
import de.anomic.yacy.yacyCore;
import de.anomic.yacy.yacySeed;
import de.anomic.plasma.plasmaSwitchboard;
public final class serverCore extends serverAbstractThread implements serverThread {
@ -604,12 +594,32 @@ public final class serverCore extends serverAbstractThread implements serverThre
/**
* @see org.apache.commons.pool.impl.GenericObjectPool#returnObject(java.lang.Object)
*/
public void returnObject(Object obj) throws Exception {
public void returnObject(Object obj) {
if (obj == null) return;
if (obj instanceof Session) {
super.returnObject(obj);
try {
((Session)obj).setName("Session_inPool");
super.returnObject(obj);
} catch (Exception e) {
((Session)obj).setStopped(true);
serverLog.logSevere("SESSION-POOL","Unable to return session thread to pool.",e);
}
} else {
serverLog.logSevere("SESSION-POOL","Object of wront type '" + obj.getClass().getName() +
"'returned to pool.");
"' returned to pool.");
}
}
public void invalidateObject(Object obj) {
if (obj == null) return;
if (this.isClosed) return;
if (obj instanceof Session) {
try {
((Session)obj).setStopped(true);
super.invalidateObject(obj);
} catch (Exception e) {
serverLog.logSevere("SESSION-POOL","Unable to invalidate session thread.",e);
}
}
}
@ -914,51 +924,35 @@ public final class serverCore extends serverAbstractThread implements serverThre
public void run() {
this.running = true;
// The thread keeps running.
while (!this.stopped && !this.isInterrupted()) {
if (this.done) {
// We are waiting for a task now.
synchronized (this) {
try {
this.wait(); //Wait until we get a request to process.
} catch (InterruptedException e) {
this.stopped = true;
// log.error("", e);
}
}
} else {
//There is a task....let us execute it.
try {
execute();
if (this.syncObject != null) {
synchronized (this.syncObject) {
//Notify the completion.
this.syncObject.notifyAll();
}
}
} catch (Exception e) {
// log.error("", e);
} finally {
reset();
try {
// The thread keeps running.
while (!this.stopped && !this.isInterrupted() && !serverCore.this.theSessionPool.isClosed) {
if (this.done) {
// return thread back into pool
serverCore.this.theSessionPool.returnObject(this);
if (!this.stopped && !this.isInterrupted() && !serverCore.this.theSessionPool.isClosed) {
try {
this.setName("Session_inPool");
serverCore.this.theSessionPool.returnObject(this);
} catch (Exception e1) {
// e1.printStackTrace();
this.stopped = true;
}
} else if (!serverCore.this.theSessionPool.isClosed) {
try {
serverCore.this.theSessionPool.invalidateObject(this);
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
// We are waiting for a new task now.
synchronized (this) {this.wait();}
} else {
try {
// executing the new task
execute();
} finally {
// Notify the completion.
if (this.syncObject != null) {
synchronized (this.syncObject) { this.syncObject.notifyAll(); }
}
// reset thread
reset();
}
}
}
} catch (InterruptedException ex) {
serverLog.logInfo("SESSION-POOL","Interruption of thread '" + this.getName() + "' detected.");
} finally {
if (serverCore.this.theSessionPool != null)
serverCore.this.theSessionPool.invalidateObject(this);
}
}

Loading…
Cancel
Save