From 2f1ff048ba829768c83dfe1aaf329e75d1de549b Mon Sep 17 00:00:00 2001 From: orbiter Date: Tue, 25 Sep 2007 23:45:05 +0000 Subject: [PATCH] some fixes to socket connection time-out git-svn-id: https://svn.berlios.de/svnroot/repos/yacy/trunk@4111 6c8d7289-2bf4-0310-a012-ef5d649a1542 --- htroot/Connections_p.java | 23 +- source/de/anomic/http/httpc.java | 244 ++++-------------- .../plasma/crawler/http/CrawlWorker.java | 8 +- .../de/anomic/plasma/plasmaCrawlStacker.java | 7 +- source/de/anomic/server/serverCore.java | 7 - source/de/anomic/yacy/yacyCore.java | 4 +- 6 files changed, 61 insertions(+), 232 deletions(-) diff --git a/htroot/Connections_p.java b/htroot/Connections_p.java index ba80e85a7..34d6cc1ad 100644 --- a/htroot/Connections_p.java +++ b/htroot/Connections_p.java @@ -49,7 +49,6 @@ import java.io.UnsupportedEncodingException; import java.net.InetAddress; import java.net.URLEncoder; -import java.util.Iterator; import java.util.Properties; import org.apache.commons.pool.impl.GenericObjectPool; @@ -172,7 +171,6 @@ public final class Connections_p { if (cmdObj instanceof httpd) { prot = isSSL ? "https":"http"; - // getting the http command object httpd currentHttpd = (httpd)cmdObj; @@ -240,18 +238,19 @@ public final class Connections_p { prop.put("numActiveRunning",Integer.toString(numActiveRunning)); prop.put("numActivePending",Integer.toString(numActivePending)); - // client sessions - Iterator i = httpc.activeConnections.iterator(); + httpc[] a = httpc.allConnections(); int c = 0; - while (i.hasNext()) { - httpc clientConnection = (httpc) i.next(); - prop.put("clientList_" + c + "_clientProtocol", (clientConnection.ssl) ? "HTTPS" : "HTTP"); - prop.put("clientList_" + c + "_clientLifetime", System.currentTimeMillis() - clientConnection.initTime); - prop.put("clientList_" + c + "_clientTargetHost", clientConnection.adressed_host + ":" + clientConnection.adressed_port); - prop.put("clientList_" + c + "_clientCommand", (clientConnection.command == null) ? "-" : clientConnection.command); - prop.put("clientList_" + c + "_clientID", clientConnection.hashCode()); - c++; + for (int i = 0; i < a.length; i++) { + httpc clientConnection = (httpc) a[i]; + if (clientConnection != null) { + prop.put("clientList_" + c + "_clientProtocol", (clientConnection.ssl) ? "HTTPS" : "HTTP"); + prop.put("clientList_" + c + "_clientLifetime", System.currentTimeMillis() - clientConnection.initTime); + prop.put("clientList_" + c + "_clientTargetHost", clientConnection.adressed_host + ":" + clientConnection.adressed_port); + prop.put("clientList_" + c + "_clientCommand", (clientConnection.command == null) ? "-" : clientConnection.command); + prop.put("clientList_" + c + "_clientID", clientConnection.hashCode()); + c++; + } } prop.put("clientList", c); prop.put("clientActive", c); diff --git a/source/de/anomic/http/httpc.java b/source/de/anomic/http/httpc.java index 946f3f9de..46a555447 100644 --- a/source/de/anomic/http/httpc.java +++ b/source/de/anomic/http/httpc.java @@ -108,9 +108,8 @@ public final class httpc { private static final TimeZone GMTTimeZone = TimeZone.getTimeZone("GMT"); private static final SimpleDateFormat HTTPGMTFormatter = new SimpleDateFormat("EEE, dd MMM yyyy HH:mm:ss zzz", Locale.US); private static final HashMap reverseMappingCache = new HashMap(); - private static final HashMap openSocketLookupTable = new HashMap(); - public static final HashSet activeConnections = new HashSet(); // all connections are stored here and deleted when they are finished - public static int objCounter = 0; // will be increased with each object and is use to return a hash code + private static final HashSet activeConnections = new HashSet(); // all connections are stored here and deleted when they are finished + private static int objCounter = 0; // will be increased with each object and is use to return a hash code // defined during set-up of switchboard public static boolean yacyDebugMode = false; @@ -179,7 +178,6 @@ public final class httpc { // class variables private Socket socket = null; // client socket for commands - private Thread socketOwner = null; public String adressed_host = null; public int adressed_port = 80; private String target_virtual_host = null; @@ -233,20 +231,20 @@ public final class httpc { //System.out.println("*** DEBUG init httpc: " + activeConnections.size() + " connections online"); this.ssl = ssl; - this.initTime = System.currentTimeMillis(); + this.initTime = Long.MAX_VALUE; this.command = null; this.timeout = timeout; if ((theRemoteProxyConfig == null) || (!theRemoteProxyConfig.useProxy())) { initN( - server, + server, vhost, - port, - timeout, + port, + timeout, ssl, incomingByteCountAccounting, - outgoingByteCountAccounting + outgoingByteCountAccounting ); return; } @@ -358,7 +356,6 @@ public final class httpc { ) throws IOException { //serverLog.logDebug("HTTPC", handle + " initialized"); this.remoteProxyUse = false; - //this.timeout = timeout; try { if (port == -1) { @@ -386,10 +383,8 @@ public final class httpc { } // trying to establish a connection to the address + this.initTime = System.currentTimeMillis(); this.socket.connect(address, timeout); - - // registering the socket - this.socketOwner = this.registerOpenSocket(this.socket); // setting socket timeout and keep alive behaviour this.socket.setSoTimeout(timeout); // waiting time for read @@ -412,10 +407,10 @@ public final class httpc { // if we reached this point, we should have a connection } catch (UnknownHostException e) { if (this.socket != null) { - httpc.unregisterOpenSocket(this.socket,this.socketOwner); + // no need to track this, the socket cannot be established + synchronized (activeConnections) {activeConnections.remove(this);} } this.socket = null; - this.socketOwner = null; throw new IOException("unknown host: " + server); } } @@ -429,31 +424,49 @@ public final class httpc { } public static int checkIdleConnections() { - // try to find and close all connections that did not find a target server and are idle waiting for a server socket - Iterator i = httpc.activeConnections.iterator(); - int c = 0; - while (i.hasNext()) { - httpc clientConnection = (httpc) i.next(); - if ((clientConnection.command == null) && (clientConnection.initTime + clientConnection.timeout > System.currentTimeMillis())) { - // the time-out limit is reached. close the connection - clientConnection.close(); - c++; - } - } - return c; + // try to find and close all connections that did not find a target server and are idle waiting for a server socket + + httpc[] a = allConnections(); // put set into array to avoid ConcurrentModificationExceptions + int c = 0; + for (int i = 0; i < a.length; i++) { + httpc clientConnection = a[i]; + if ((clientConnection != null) && + (clientConnection.initTime != Long.MAX_VALUE) && + (clientConnection.initTime + Math.max(60000, clientConnection.timeout) < System.currentTimeMillis())) { + // the time-out limit is reached. close the connection + clientConnection.close(); + c++; + } + } + return c; } public static int closeAllConnections() { - Iterator i = httpc.activeConnections.iterator(); - int c = 0; - while (i.hasNext()) { - httpc clientConnection = (httpc) i.next(); - clientConnection.close(); - c++; + httpc[] a = allConnections(); // put set into array to avoid ConcurrentModificationExceptions + int c = 0; + for (int i = 0; i < a.length; i++) { + httpc clientConnection = a[i]; + if (clientConnection != null) { + clientConnection.close(); + c++; + } } return c; } + public static httpc[] allConnections() { + httpc[] a = null; + synchronized (activeConnections) { + a = new httpc[activeConnections.size()]; + Iterator i = httpc.activeConnections.iterator(); + int c = 0; + while (i.hasNext()) { + a[c++] = (httpc) i.next(); + } + } + return a; + } + public void finalize() { this.close(); } @@ -474,10 +487,8 @@ public final class httpc { if (this.socket != null) { try {this.socket.close();} catch (Exception e) {} - httpc.unregisterOpenSocket(this.socket, this.socketOwner); this.socket = null; } - this.socketOwner = null; if (this.clientInputByteCount != null) { this.clientInputByteCount.finish(); @@ -1134,169 +1145,6 @@ public final class httpc { while (i.hasNext()) System.out.println((String) i.next()); } - /** - * To register an open socket. - * This adds the socket to the list of open sockets where the current thread - * is the owner. - * @param openedSocket the socket that should be registered - * @return the id of the current thread - */ - private Thread registerOpenSocket(Socket openedSocket) { - Thread currentThread = Thread.currentThread(); - synchronized (openSocketLookupTable) { - ArrayList openSockets = null; - if (openSocketLookupTable.containsKey(currentThread)) { - openSockets = (ArrayList) openSocketLookupTable.get(currentThread); - } else { - openSockets = new ArrayList(1); - openSocketLookupTable.put(currentThread,openSockets); - } - synchronized (openSockets) { - openSockets.add(openedSocket); - } - return currentThread; - } - } - - /** - * Closing all sockets that were opened in the context of the thread - * with the given thread id - * @param threadId - */ - public static int closeOpenSockets(Thread thread) { - - // getting all still opened sockets - ArrayList openSockets = (ArrayList) httpc.getRegisteredOpenSockets(thread).clone(); - int closedSocketCount = 0; - - // looping through the list of sockets and close each one - for (int socketCount = 0; socketCount < openSockets.size(); socketCount++) { - Socket openSocket = (Socket) openSockets.get(0); - try { - // closing the socket - if (!openSocket.isClosed()) { - openSocket.close(); - closedSocketCount++; - } - // unregistering the socket - httpc.unregisterOpenSocket(openSocket,thread); - } catch (Exception ex) {} - } - - return closedSocketCount; - } - - /** - * Unregistering the socket. - * The socket will be removed from the list of sockets where the thread with the - * given thread id is the owner. - * @param closedSocket the socket that should be unregistered - * @param threadId the id of the owner thread - */ - public static void unregisterOpenSocket(Socket closedSocket, Thread thread) { - synchronized (openSocketLookupTable) { - ArrayList openSockets = null; - if (openSocketLookupTable.containsKey(thread)) { - openSockets = (ArrayList) openSocketLookupTable.get(thread); - synchronized (openSockets) { - openSockets.remove(closedSocket); - if (openSockets.size() == 0) { - openSocketLookupTable.remove(thread); - } - } - } - } - } - - /** - * Getting a list of open sockets where the current thread is - * the owner - * @return the list of open sockets - */ - public static ArrayList getRegisteredOpenSockets() { - Thread currentThread = Thread.currentThread(); - return getRegisteredOpenSockets(currentThread); - } - - /** - * Getting a list of open sockets where the thread with the given - * thread id is the owner - * @param threadId the thread id of the owner thread - * @return the list of open sockets - */ - public static ArrayList getRegisteredOpenSockets(Thread thread) { - synchronized (openSocketLookupTable) { - ArrayList openSockets = null; - if (openSocketLookupTable.containsKey(thread)) { - openSockets = (ArrayList) openSocketLookupTable.get(thread); - } else { - openSockets = new ArrayList(0); - } - return openSockets; - } - } - -// /** -// * This method outputs the input stream to either an output socket or an -// * file or both. If the length of the input stream is given in the -// * header, exactly that lenght is written. Otherwise the stream is -// * written, till it is closed. If this instance is zipped, stream the -// * input stream through gzip to unzip it on the fly. -// * -// * @param procOS OutputStream where the stream is to be written. If null -// * no write happens. -// * @param bufferOS OutputStream where the stream is to be written too. -// * If null no write happens. -// * @param clientInput InputStream where the content is to be read from. -// * @throws IOException -// */ -// private static void writeContentX(InputStream clientInput, boolean usegzip, long length, OutputStream procOS, OutputStream bufferOS) throws IOException { -// // we write length bytes, but if length == -1 (or < 0) then we -// // write until the input stream closes -// // procOS == null -> no write to procOS -// // file == null -> no write to file -// // If the Content-Encoding is gzip, we gunzip on-the-fly -// // and change the Content-Encoding and Content-Length attributes in the header -// byte[] buffer = new byte[2048]; -// int l; -// long len = 0; -// -// // using the proper intput stream -// InputStream dis = (usegzip) ? (InputStream) new GZIPInputStream(clientInput) : (InputStream) clientInput; -// -// // we have three methods of reading: length-based, length-based gzip and connection-close-based -// try { -// if (length > 0) { -// // we read exactly 'length' bytes -// while ((len < length) && ((l = dis.read(buffer)) >= 0)) { -// if (procOS != null) procOS.write(buffer, 0, l); -// if (bufferOS != null) bufferOS.write(buffer, 0, l); -// len += l; -// } -// } else { -// // no content-length was given, thus we read until the connection closes -// while ((l = dis.read(buffer, 0, buffer.length)) >= 0) { -// if (procOS != null) procOS.write(buffer, 0, l); -// if (bufferOS != null) bufferOS.write(buffer, 0, l); -// } -// } -// } catch (java.net.SocketException e) { -// throw new IOException("Socket exception: " + e.getMessage()); -// } catch (java.net.SocketTimeoutException e) { -// throw new IOException("Socket time-out: " + e.getMessage()); -// } finally { -// // close the streams -// if (procOS != null) { -// if (procOS instanceof httpChunkedOutputStream) -// ((httpChunkedOutputStream)procOS).finish(); -// procOS.flush(); -// } -// if (bufferOS != null) bufferOS.flush(); -// buffer = null; -// } -// } - - /** * Inner Class to get the response of an http-request and parse it. */ diff --git a/source/de/anomic/plasma/crawler/http/CrawlWorker.java b/source/de/anomic/plasma/crawler/http/CrawlWorker.java index 831835323..3df38b207 100644 --- a/source/de/anomic/plasma/crawler/http/CrawlWorker.java +++ b/source/de/anomic/plasma/crawler/http/CrawlWorker.java @@ -439,7 +439,7 @@ public final class CrawlWorker extends AbstractCrawlWorker { this.log.logSevere("CRAWLER No trusted certificate found for URL '" + this.url.toString() + "'. "); failreason = plasmaCrawlEURL.DENIED_SSL_UNTRUSTED_CERT; } else { - this.log.logSevere("CRAWLER Unexpected Error with URL '" + this.url.toString() + "': " + e.toString(),e); + this.log.logSevere("CRAWLER Unexpected Error with URL '" + this.url.toString() + "': " + e.toString(), e); failreason = plasmaCrawlEURL.DENIED_CONNECTION_ERROR; } @@ -467,11 +467,7 @@ public final class CrawlWorker extends AbstractCrawlWorker { public void close() { if (this.isAlive()) { try { - // trying to close all still open httpc-Sockets first - int closedSockets = httpc.closeOpenSockets(this); - if (closedSockets > 0) { - this.log.logInfo(closedSockets + " HTTP-client sockets of thread '" + this.getName() + "' closed."); - } + // TODO: this object should care of all open clien connections within this class and close them here } catch (Exception e) {/* ignore this. shutdown in progress */} } } diff --git a/source/de/anomic/plasma/plasmaCrawlStacker.java b/source/de/anomic/plasma/plasmaCrawlStacker.java index 5d2fcfc79..3cb9517df 100644 --- a/source/de/anomic/plasma/plasmaCrawlStacker.java +++ b/source/de/anomic/plasma/plasmaCrawlStacker.java @@ -57,7 +57,6 @@ import java.util.LinkedList; import org.apache.commons.pool.impl.GenericObjectPool; import de.anomic.data.robotsParser; -import de.anomic.http.httpc; import de.anomic.index.indexURLEntry; import de.anomic.kelondro.kelondroCache; import de.anomic.kelondro.kelondroException; @@ -783,11 +782,7 @@ public final class plasmaCrawlStacker { public void close() { if (this.isAlive()) { try { - // trying to close all still open httpc-Sockets first - int closedSockets = httpc.closeOpenSockets(this); - if (closedSockets > 0) { - log.logInfo(closedSockets + " HTTP-client sockets of thread '" + this.getName() + "' closed."); - } + // TODO: this object should care of all open clien connections within this class and close them here } catch (Exception e) {} } } diff --git a/source/de/anomic/server/serverCore.java b/source/de/anomic/server/serverCore.java index effdf4886..bbb9fd929 100644 --- a/source/de/anomic/server/serverCore.java +++ b/source/de/anomic/server/serverCore.java @@ -76,7 +76,6 @@ import javax.net.ssl.SSLSocketFactory; import org.apache.commons.pool.impl.GenericObjectPool; import org.apache.commons.pool.impl.GenericObjectPool.Config; -import de.anomic.http.httpc; import de.anomic.icap.icapd; import de.anomic.server.logging.serverLog; import de.anomic.server.portForwarding.serverPortForwarding; @@ -796,12 +795,6 @@ public final class serverCore extends serverAbstractThread implements serverThre public void close() { if (this.isAlive()) { try { - // trying to close all still open httpc-Sockets first - int closedSockets = httpc.closeOpenSockets(this); - if (closedSockets > 0) { - serverCore.this.log.logInfo(closedSockets + " HTTP-client sockets of thread '" + this.getName() + "' closed."); - } - // closing the socket to the client if ((this.controlSocket != null)&&(this.controlSocket.isConnected())) { this.controlSocket.close(); diff --git a/source/de/anomic/yacy/yacyCore.java b/source/de/anomic/yacy/yacyCore.java index a47a00ffc..a7ee2c15d 100644 --- a/source/de/anomic/yacy/yacyCore.java +++ b/source/de/anomic/yacy/yacyCore.java @@ -69,7 +69,6 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; -import de.anomic.http.httpc; import de.anomic.plasma.plasmaSwitchboard; import de.anomic.server.serverCore; import de.anomic.server.serverSemaphore; @@ -609,8 +608,7 @@ public class yacyCore { Thread currentThread = threadList[currentThreadIdx]; if (currentThread.isAlive()) { - log.logFine("publish: Closing socket of publishing thread '" + currentThread.getName() + "' [" + currentThreadIdx + "]."); - httpc.closeOpenSockets(currentThread); + // TODO: this object should care of all open clien connections within this class and close them here } }