*) Bugfix for Thread.getID() usagage + PeerPing-Shutdown Deadlock

See:
   - http://www.yacy-forum.de/viewtopic.php?p=4937
   - http://www.yacy-forum.de/viewtopic.php?p=4939

git-svn-id: https://svn.berlios.de/svnroot/repos/yacy/trunk@390 6c8d7289-2bf4-0310-a012-ef5d649a1542
pull/1/head
theli 20 years ago
parent 13eeaa08f3
commit 0b95c9c434

@ -114,7 +114,7 @@ public final class httpc {
// class variables // class variables
private Socket socket = null; // client socket for commands private Socket socket = null; // client socket for commands
private Long socketOwnerID = null; private Thread socketOwner = null;
private String host = null; private String host = null;
private long timeout; private long timeout;
private long handle; private long handle;
@ -326,7 +326,7 @@ public final class httpc {
: new Socket(hostip, port); : new Socket(hostip, port);
// registering the socket // registering the socket
this.socketOwnerID = this.registerOpenSocket(socket); this.socketOwner = this.registerOpenSocket(socket);
// setting socket timeout and keep alive behaviour // setting socket timeout and keep alive behaviour
socket.setSoTimeout(timeout); // waiting time for write socket.setSoTimeout(timeout); // waiting time for write
@ -339,10 +339,10 @@ public final class httpc {
// if we reached this point, we should have a connection // if we reached this point, we should have a connection
} catch (UnknownHostException e) { } catch (UnknownHostException e) {
if (this.socket != null) { if (this.socket != null) {
this.unregisterOpenSocket(this.socket,this.socketOwnerID); this.unregisterOpenSocket(this.socket,this.socketOwner);
} }
this.socket = null; this.socket = null;
this.socketOwnerID = null; this.socketOwner = null;
throw new IOException("unknown host: " + server); throw new IOException("unknown host: " + server);
} }
} }
@ -358,9 +358,9 @@ public final class httpc {
} }
if (this.socket != null) { if (this.socket != null) {
try {this.socket.close();} catch (Exception e) {} try {this.socket.close();} catch (Exception e) {}
this.unregisterOpenSocket(this.socket,this.socketOwnerID); this.unregisterOpenSocket(this.socket,this.socketOwner);
this.socket = null; this.socket = null;
this.socketOwnerID = null; this.socketOwner = null;
} }
this.host = null; this.host = null;
@ -1110,20 +1110,20 @@ do upload
* @param openedSocket the socket that should be registered * @param openedSocket the socket that should be registered
* @return the id of the current thread * @return the id of the current thread
*/ */
private Long registerOpenSocket(Socket openedSocket) { private Thread registerOpenSocket(Socket openedSocket) {
Long currentThreadId = new Long(Thread.currentThread().getId()); Thread currentThread = Thread.currentThread();
synchronized (openSocketLookupTable) { synchronized (openSocketLookupTable) {
ArrayList openSockets = null; ArrayList openSockets = null;
if (openSocketLookupTable.containsKey(currentThreadId)) { if (openSocketLookupTable.containsKey(currentThread)) {
openSockets = (ArrayList) openSocketLookupTable.get(currentThreadId); openSockets = (ArrayList) openSocketLookupTable.get(currentThread);
} else { } else {
openSockets = new ArrayList(1); openSockets = new ArrayList(1);
openSocketLookupTable.put(currentThreadId,openSockets); openSocketLookupTable.put(currentThread,openSockets);
} }
synchronized (openSockets) { synchronized (openSockets) {
openSockets.add(openedSocket); openSockets.add(openedSocket);
} }
return currentThreadId; return currentThread;
} }
} }
@ -1132,26 +1132,24 @@ do upload
* with the given thread id * with the given thread id
* @param threadId * @param threadId
*/ */
public static int closeOpenSockets(Long threadId) { public static int closeOpenSockets(Thread thread) {
// getting all still opened sockets // getting all still opened sockets
ArrayList openSockets = httpc.getRegisteredOpenSockets(threadId); ArrayList openSockets = (ArrayList) httpc.getRegisteredOpenSockets(thread).clone();
int closedSocketCount = 0; int closedSocketCount = 0;
synchronized (openSockets) { // looping through the list of sockets and close each one
// looping through the list of sockets and close each one for (int socketCount = 0; socketCount < openSockets.size(); socketCount++) {
for (int socketCount = 0; socketCount < openSockets.size(); socketCount++) { Socket openSocket = (Socket) openSockets.get(0);
Socket openSocket = (Socket) openSockets.get(0); try {
try { // closing the socket
// closing the socket if (!openSocket.isClosed()) {
if (!openSocket.isClosed()) { openSocket.close();
openSocket.close(); closedSocketCount++;
closedSocketCount++; }
} // unregistering the socket
// unregistering the socket httpc.unregisterOpenSocket(openSocket,thread);
httpc.unregisterOpenSocket(openSocket,threadId); } catch (Exception ex) {}
} catch (Exception ex) {}
}
} }
return closedSocketCount; return closedSocketCount;
@ -1164,15 +1162,15 @@ do upload
* @param closedSocket the socket that should be unregistered * @param closedSocket the socket that should be unregistered
* @param threadId the id of the owner thread * @param threadId the id of the owner thread
*/ */
public static void unregisterOpenSocket(Socket closedSocket, Long threadId) { public static void unregisterOpenSocket(Socket closedSocket, Thread thread) {
synchronized (openSocketLookupTable) { synchronized (openSocketLookupTable) {
ArrayList openSockets = null; ArrayList openSockets = null;
if (openSocketLookupTable.containsKey(threadId)) { if (openSocketLookupTable.containsKey(thread)) {
openSockets = (ArrayList) openSocketLookupTable.get(threadId); openSockets = (ArrayList) openSocketLookupTable.get(thread);
synchronized (openSockets) { synchronized (openSockets) {
openSockets.remove(closedSocket); openSockets.remove(closedSocket);
if (openSockets.size() == 0) { if (openSockets.size() == 0) {
openSocketLookupTable.remove(threadId); openSocketLookupTable.remove(thread);
} }
} }
} }
@ -1185,8 +1183,8 @@ do upload
* @return the list of open sockets * @return the list of open sockets
*/ */
public static ArrayList getRegisteredOpenSockets() { public static ArrayList getRegisteredOpenSockets() {
Long currentThreadId = new Long(Thread.currentThread().getId()); Thread currentThread = Thread.currentThread();
return getRegisteredOpenSockets(currentThreadId); return getRegisteredOpenSockets(currentThread);
} }
/** /**
@ -1195,11 +1193,11 @@ do upload
* @param threadId the thread id of the owner thread * @param threadId the thread id of the owner thread
* @return the list of open sockets * @return the list of open sockets
*/ */
public static ArrayList getRegisteredOpenSockets(Long threadId) { public static ArrayList getRegisteredOpenSockets(Thread thread) {
synchronized (openSocketLookupTable) { synchronized (openSocketLookupTable) {
ArrayList openSockets = null; ArrayList openSockets = null;
if (openSocketLookupTable.containsKey(threadId)) { if (openSocketLookupTable.containsKey(thread)) {
openSockets = (ArrayList) openSocketLookupTable.get(threadId); openSockets = (ArrayList) openSocketLookupTable.get(thread);
} else { } else {
openSockets = new ArrayList(0); openSockets = new ArrayList(0);
} }

@ -524,14 +524,14 @@ public final class serverCore extends serverAbstractThread implements serverThre
// if there are some sessions that are blocking in IO, we simply close the socket // if there are some sessions that are blocking in IO, we simply close the socket
for ( int currentThreadIdx = 0; currentThreadIdx < threadCount; currentThreadIdx++ ) { for ( int currentThreadIdx = 0; currentThreadIdx < threadCount; currentThreadIdx++ ) {
serverCore.this.log.logInfo("Trying to shutdown session thread '" + threadList[currentThreadIdx].getName() + "' [ID=" + threadList[currentThreadIdx].getId() + "]."); serverCore.this.log.logInfo("Trying to shutdown session thread '" + threadList[currentThreadIdx].getName() + "'.");
((Session)threadList[currentThreadIdx]).close(); ((Session)threadList[currentThreadIdx]).close();
} }
// we need to use a timeout here because of missing interruptable session threads ... // we need to use a timeout here because of missing interruptable session threads ...
for ( int currentThreadIdx = 0; currentThreadIdx < threadCount; currentThreadIdx++ ) { for ( int currentThreadIdx = 0; currentThreadIdx < threadCount; currentThreadIdx++ ) {
if (threadList[currentThreadIdx].isAlive()) { if (threadList[currentThreadIdx].isAlive()) {
serverCore.this.log.logDebug("Waiting for session thread '" + threadList[currentThreadIdx].getName() + "' [ID=" + threadList[currentThreadIdx].getId() + "] to finish shutdown."); serverCore.this.log.logDebug("Waiting for session thread '" + threadList[currentThreadIdx].getName() + "' to finish shutdown.");
try { try {
threadList[currentThreadIdx].join(500); threadList[currentThreadIdx].join(500);
} catch (Exception ex) {} } catch (Exception ex) {}
@ -663,7 +663,7 @@ public final class serverCore extends serverAbstractThread implements serverThre
if (this.isAlive()) { if (this.isAlive()) {
try { try {
// trying to close all still open httpc-Sockets first // trying to close all still open httpc-Sockets first
int closedSockets = httpc.closeOpenSockets(new Long(this.getId())); int closedSockets = httpc.closeOpenSockets(this);
if (closedSockets > 0) { if (closedSockets > 0) {
serverCore.this.log.logInfo(closedSockets + " http-client sockets of thread '" + this.getName() + "' closed."); serverCore.this.log.logInfo(closedSockets + " http-client sockets of thread '" + this.getName() + "' closed.");
} }

@ -468,37 +468,36 @@ public class yacyCore {
return 0; return 0;
} catch (InterruptedException e) { } catch (InterruptedException e) {
try { try {
log.logInfo("publish: Interruption detected while publishing my seed."); log.logInfo("publish: Interruption detected while publishing my seed.");
// consuming the theads interrupted signal // consuming the theads interrupted signal
Thread.interrupted(); Thread.interrupted();
// interrupt all already started publishThreads // interrupt all already started publishThreads
log.logInfo("publish: Signaling shutdown to all remaining publishing threads ..."); log.logInfo("publish: Signaling shutdown to " + yacyCore.publishThreadGroup.activeCount() + " remaining publishing threads ...");
yacyCore.publishThreadGroup.interrupt(); yacyCore.publishThreadGroup.interrupt();
// waiting some time for the publishThreads to finish execution // waiting some time for the publishThreads to finish execution
Thread.sleep(500); try { Thread.sleep(500); } catch (Exception ex) {}
int threadCount = yacyCore.publishThreadGroup.activeCount(); int threadCount = yacyCore.publishThreadGroup.activeCount();
Thread[] threadList = new Thread[threadCount]; Thread[] threadList = new Thread[threadCount];
threadCount = yacyCore.publishThreadGroup.enumerate(threadList); threadCount = yacyCore.publishThreadGroup.enumerate(threadList);
// we need to use a timeout here because of missing interruptable session threads ... // we need to use a timeout here because of missing interruptable session threads ...
for ( int currentThreadIdx = 0; currentThreadIdx < threadCount; currentThreadIdx++ ) { for ( int currentThreadIdx = 0; currentThreadIdx < threadCount; currentThreadIdx++ ) {
Thread currentThread = threadList[currentThreadIdx]; Thread currentThread = threadList[currentThreadIdx];
Long currentThreadID = new Long(currentThread.getId());
if (currentThread.isAlive()) { if (currentThread.isAlive()) {
log.logInfo("publish: Closing socket of publishing thread '" + threadList[currentThreadIdx].getName() + "'."); log.logInfo("publish: Closing socket of publishing thread '" + threadList[currentThreadIdx].getName() + "'.");
httpc.closeOpenSockets(currentThreadID); httpc.closeOpenSockets(currentThread);
log.logInfo("publish: Waiting for remaining publishing thread '" + threadList[currentThreadIdx].getName() + "' to finish shutdown"); log.logInfo("publish: Waiting for remaining publishing thread '" + threadList[currentThreadIdx].getName() + "' to finish shutdown");
try { threadList[currentThreadIdx].join(500); }catch (Exception ex) {} try { threadList[currentThreadIdx].join(500); }catch (Exception ex) {}
} }
} }
} }
catch (InterruptedException ee) { catch (Exception ee) {
log.logWarning("publish: Interruption while trying to shutdown all remaining publishing threads."); log.logWarning("publish: Interruption while trying to shutdown all remaining publishing threads.");
} }

Loading…
Cancel
Save