some fixes to socket connection time-out

git-svn-id: https://svn.berlios.de/svnroot/repos/yacy/trunk@4111 6c8d7289-2bf4-0310-a012-ef5d649a1542
pull/1/head
orbiter 18 years ago
parent 3c74014004
commit 2f1ff048ba

@ -49,7 +49,6 @@
import java.io.UnsupportedEncodingException;
import java.net.InetAddress;
import java.net.URLEncoder;
import java.util.Iterator;
import java.util.Properties;
import org.apache.commons.pool.impl.GenericObjectPool;
@ -172,7 +171,6 @@ public final class Connections_p {
if (cmdObj instanceof httpd) {
prot = isSSL ? "https":"http";
// getting the http command object
httpd currentHttpd = (httpd)cmdObj;
@ -240,18 +238,19 @@ public final class Connections_p {
prop.put("numActiveRunning",Integer.toString(numActiveRunning));
prop.put("numActivePending",Integer.toString(numActivePending));
// client sessions
Iterator i = httpc.activeConnections.iterator();
httpc[] a = httpc.allConnections();
int c = 0;
while (i.hasNext()) {
httpc clientConnection = (httpc) i.next();
prop.put("clientList_" + c + "_clientProtocol", (clientConnection.ssl) ? "HTTPS" : "HTTP");
prop.put("clientList_" + c + "_clientLifetime", System.currentTimeMillis() - clientConnection.initTime);
prop.put("clientList_" + c + "_clientTargetHost", clientConnection.adressed_host + ":" + clientConnection.adressed_port);
prop.put("clientList_" + c + "_clientCommand", (clientConnection.command == null) ? "-" : clientConnection.command);
prop.put("clientList_" + c + "_clientID", clientConnection.hashCode());
c++;
for (int i = 0; i < a.length; i++) {
httpc clientConnection = (httpc) a[i];
if (clientConnection != null) {
prop.put("clientList_" + c + "_clientProtocol", (clientConnection.ssl) ? "HTTPS" : "HTTP");
prop.put("clientList_" + c + "_clientLifetime", System.currentTimeMillis() - clientConnection.initTime);
prop.put("clientList_" + c + "_clientTargetHost", clientConnection.adressed_host + ":" + clientConnection.adressed_port);
prop.put("clientList_" + c + "_clientCommand", (clientConnection.command == null) ? "-" : clientConnection.command);
prop.put("clientList_" + c + "_clientID", clientConnection.hashCode());
c++;
}
}
prop.put("clientList", c);
prop.put("clientActive", c);

@ -108,9 +108,8 @@ public final class httpc {
private static final TimeZone GMTTimeZone = TimeZone.getTimeZone("GMT");
private static final SimpleDateFormat HTTPGMTFormatter = new SimpleDateFormat("EEE, dd MMM yyyy HH:mm:ss zzz", Locale.US);
private static final HashMap reverseMappingCache = new HashMap();
private static final HashMap openSocketLookupTable = new HashMap();
public static final HashSet activeConnections = new HashSet(); // all connections are stored here and deleted when they are finished
public static int objCounter = 0; // will be increased with each object and is use to return a hash code
private static final HashSet activeConnections = new HashSet(); // all connections are stored here and deleted when they are finished
private static int objCounter = 0; // will be increased with each object and is use to return a hash code
// defined during set-up of switchboard
public static boolean yacyDebugMode = false;
@ -179,7 +178,6 @@ public final class httpc {
// class variables
private Socket socket = null; // client socket for commands
private Thread socketOwner = null;
public String adressed_host = null;
public int adressed_port = 80;
private String target_virtual_host = null;
@ -233,7 +231,7 @@ public final class httpc {
//System.out.println("*** DEBUG init httpc: " + activeConnections.size() + " connections online");
this.ssl = ssl;
this.initTime = System.currentTimeMillis();
this.initTime = Long.MAX_VALUE;
this.command = null;
this.timeout = timeout;
@ -358,7 +356,6 @@ public final class httpc {
) throws IOException {
//serverLog.logDebug("HTTPC", handle + " initialized");
this.remoteProxyUse = false;
//this.timeout = timeout;
try {
if (port == -1) {
@ -386,11 +383,9 @@ public final class httpc {
}
// trying to establish a connection to the address
this.initTime = System.currentTimeMillis();
this.socket.connect(address, timeout);
// registering the socket
this.socketOwner = this.registerOpenSocket(this.socket);
// setting socket timeout and keep alive behaviour
this.socket.setSoTimeout(timeout); // waiting time for read
//socket.setSoLinger(true, timeout);
@ -412,10 +407,10 @@ public final class httpc {
// if we reached this point, we should have a connection
} catch (UnknownHostException e) {
if (this.socket != null) {
httpc.unregisterOpenSocket(this.socket,this.socketOwner);
// no need to track this, the socket cannot be established
synchronized (activeConnections) {activeConnections.remove(this);}
}
this.socket = null;
this.socketOwner = null;
throw new IOException("unknown host: " + server);
}
}
@ -429,31 +424,49 @@ public final class httpc {
}
public static int checkIdleConnections() {
// try to find and close all connections that did not find a target server and are idle waiting for a server socket
Iterator i = httpc.activeConnections.iterator();
int c = 0;
while (i.hasNext()) {
httpc clientConnection = (httpc) i.next();
if ((clientConnection.command == null) && (clientConnection.initTime + clientConnection.timeout > System.currentTimeMillis())) {
// the time-out limit is reached. close the connection
clientConnection.close();
c++;
}
}
return c;
// try to find and close all connections that did not find a target server and are idle waiting for a server socket
httpc[] a = allConnections(); // put set into array to avoid ConcurrentModificationExceptions
int c = 0;
for (int i = 0; i < a.length; i++) {
httpc clientConnection = a[i];
if ((clientConnection != null) &&
(clientConnection.initTime != Long.MAX_VALUE) &&
(clientConnection.initTime + Math.max(60000, clientConnection.timeout) < System.currentTimeMillis())) {
// the time-out limit is reached. close the connection
clientConnection.close();
c++;
}
}
return c;
}
public static int closeAllConnections() {
Iterator i = httpc.activeConnections.iterator();
int c = 0;
while (i.hasNext()) {
httpc clientConnection = (httpc) i.next();
clientConnection.close();
c++;
httpc[] a = allConnections(); // put set into array to avoid ConcurrentModificationExceptions
int c = 0;
for (int i = 0; i < a.length; i++) {
httpc clientConnection = a[i];
if (clientConnection != null) {
clientConnection.close();
c++;
}
}
return c;
}
public static httpc[] allConnections() {
httpc[] a = null;
synchronized (activeConnections) {
a = new httpc[activeConnections.size()];
Iterator i = httpc.activeConnections.iterator();
int c = 0;
while (i.hasNext()) {
a[c++] = (httpc) i.next();
}
}
return a;
}
public void finalize() {
this.close();
}
@ -474,10 +487,8 @@ public final class httpc {
if (this.socket != null) {
try {this.socket.close();} catch (Exception e) {}
httpc.unregisterOpenSocket(this.socket, this.socketOwner);
this.socket = null;
}
this.socketOwner = null;
if (this.clientInputByteCount != null) {
this.clientInputByteCount.finish();
@ -1134,169 +1145,6 @@ public final class httpc {
while (i.hasNext()) System.out.println((String) i.next());
}
/**
* To register an open socket.
* This adds the socket to the list of open sockets where the current thread
* is the owner.
* @param openedSocket the socket that should be registered
* @return the id of the current thread
*/
private Thread registerOpenSocket(Socket openedSocket) {
Thread currentThread = Thread.currentThread();
synchronized (openSocketLookupTable) {
ArrayList openSockets = null;
if (openSocketLookupTable.containsKey(currentThread)) {
openSockets = (ArrayList) openSocketLookupTable.get(currentThread);
} else {
openSockets = new ArrayList(1);
openSocketLookupTable.put(currentThread,openSockets);
}
synchronized (openSockets) {
openSockets.add(openedSocket);
}
return currentThread;
}
}
/**
* Closing all sockets that were opened in the context of the thread
* with the given thread id
* @param threadId
*/
public static int closeOpenSockets(Thread thread) {
// getting all still opened sockets
ArrayList openSockets = (ArrayList) httpc.getRegisteredOpenSockets(thread).clone();
int closedSocketCount = 0;
// looping through the list of sockets and close each one
for (int socketCount = 0; socketCount < openSockets.size(); socketCount++) {
Socket openSocket = (Socket) openSockets.get(0);
try {
// closing the socket
if (!openSocket.isClosed()) {
openSocket.close();
closedSocketCount++;
}
// unregistering the socket
httpc.unregisterOpenSocket(openSocket,thread);
} catch (Exception ex) {}
}
return closedSocketCount;
}
/**
* Unregistering the socket.
* The socket will be removed from the list of sockets where the thread with the
* given thread id is the owner.
* @param closedSocket the socket that should be unregistered
* @param threadId the id of the owner thread
*/
public static void unregisterOpenSocket(Socket closedSocket, Thread thread) {
synchronized (openSocketLookupTable) {
ArrayList openSockets = null;
if (openSocketLookupTable.containsKey(thread)) {
openSockets = (ArrayList) openSocketLookupTable.get(thread);
synchronized (openSockets) {
openSockets.remove(closedSocket);
if (openSockets.size() == 0) {
openSocketLookupTable.remove(thread);
}
}
}
}
}
/**
* Getting a list of open sockets where the current thread is
* the owner
* @return the list of open sockets
*/
public static ArrayList getRegisteredOpenSockets() {
Thread currentThread = Thread.currentThread();
return getRegisteredOpenSockets(currentThread);
}
/**
* Getting a list of open sockets where the thread with the given
* thread id is the owner
* @param threadId the thread id of the owner thread
* @return the list of open sockets
*/
public static ArrayList getRegisteredOpenSockets(Thread thread) {
synchronized (openSocketLookupTable) {
ArrayList openSockets = null;
if (openSocketLookupTable.containsKey(thread)) {
openSockets = (ArrayList) openSocketLookupTable.get(thread);
} else {
openSockets = new ArrayList(0);
}
return openSockets;
}
}
// /**
// * This method outputs the input stream to either an output socket or an
// * file or both. If the length of the input stream is given in the
// * header, exactly that lenght is written. Otherwise the stream is
// * written, till it is closed. If this instance is zipped, stream the
// * input stream through gzip to unzip it on the fly.
// *
// * @param procOS OutputStream where the stream is to be written. If null
// * no write happens.
// * @param bufferOS OutputStream where the stream is to be written too.
// * If null no write happens.
// * @param clientInput InputStream where the content is to be read from.
// * @throws IOException
// */
// private static void writeContentX(InputStream clientInput, boolean usegzip, long length, OutputStream procOS, OutputStream bufferOS) throws IOException {
// // we write length bytes, but if length == -1 (or < 0) then we
// // write until the input stream closes
// // procOS == null -> no write to procOS
// // file == null -> no write to file
// // If the Content-Encoding is gzip, we gunzip on-the-fly
// // and change the Content-Encoding and Content-Length attributes in the header
// byte[] buffer = new byte[2048];
// int l;
// long len = 0;
//
// // using the proper intput stream
// InputStream dis = (usegzip) ? (InputStream) new GZIPInputStream(clientInput) : (InputStream) clientInput;
//
// // we have three methods of reading: length-based, length-based gzip and connection-close-based
// try {
// if (length > 0) {
// // we read exactly 'length' bytes
// while ((len < length) && ((l = dis.read(buffer)) >= 0)) {
// if (procOS != null) procOS.write(buffer, 0, l);
// if (bufferOS != null) bufferOS.write(buffer, 0, l);
// len += l;
// }
// } else {
// // no content-length was given, thus we read until the connection closes
// while ((l = dis.read(buffer, 0, buffer.length)) >= 0) {
// if (procOS != null) procOS.write(buffer, 0, l);
// if (bufferOS != null) bufferOS.write(buffer, 0, l);
// }
// }
// } catch (java.net.SocketException e) {
// throw new IOException("Socket exception: " + e.getMessage());
// } catch (java.net.SocketTimeoutException e) {
// throw new IOException("Socket time-out: " + e.getMessage());
// } finally {
// // close the streams
// if (procOS != null) {
// if (procOS instanceof httpChunkedOutputStream)
// ((httpChunkedOutputStream)procOS).finish();
// procOS.flush();
// }
// if (bufferOS != null) bufferOS.flush();
// buffer = null;
// }
// }
/**
* Inner Class to get the response of an http-request and parse it.
*/

@ -439,7 +439,7 @@ public final class CrawlWorker extends AbstractCrawlWorker {
this.log.logSevere("CRAWLER No trusted certificate found for URL '" + this.url.toString() + "'. ");
failreason = plasmaCrawlEURL.DENIED_SSL_UNTRUSTED_CERT;
} else {
this.log.logSevere("CRAWLER Unexpected Error with URL '" + this.url.toString() + "': " + e.toString(),e);
this.log.logSevere("CRAWLER Unexpected Error with URL '" + this.url.toString() + "': " + e.toString(), e);
failreason = plasmaCrawlEURL.DENIED_CONNECTION_ERROR;
}
@ -467,11 +467,7 @@ public final class CrawlWorker extends AbstractCrawlWorker {
public void close() {
if (this.isAlive()) {
try {
// trying to close all still open httpc-Sockets first
int closedSockets = httpc.closeOpenSockets(this);
if (closedSockets > 0) {
this.log.logInfo(closedSockets + " HTTP-client sockets of thread '" + this.getName() + "' closed.");
}
// TODO: this object should care of all open clien connections within this class and close them here
} catch (Exception e) {/* ignore this. shutdown in progress */}
}
}

@ -57,7 +57,6 @@ import java.util.LinkedList;
import org.apache.commons.pool.impl.GenericObjectPool;
import de.anomic.data.robotsParser;
import de.anomic.http.httpc;
import de.anomic.index.indexURLEntry;
import de.anomic.kelondro.kelondroCache;
import de.anomic.kelondro.kelondroException;
@ -783,11 +782,7 @@ public final class plasmaCrawlStacker {
public void close() {
if (this.isAlive()) {
try {
// trying to close all still open httpc-Sockets first
int closedSockets = httpc.closeOpenSockets(this);
if (closedSockets > 0) {
log.logInfo(closedSockets + " HTTP-client sockets of thread '" + this.getName() + "' closed.");
}
// TODO: this object should care of all open clien connections within this class and close them here
} catch (Exception e) {}
}
}

@ -76,7 +76,6 @@ import javax.net.ssl.SSLSocketFactory;
import org.apache.commons.pool.impl.GenericObjectPool;
import org.apache.commons.pool.impl.GenericObjectPool.Config;
import de.anomic.http.httpc;
import de.anomic.icap.icapd;
import de.anomic.server.logging.serverLog;
import de.anomic.server.portForwarding.serverPortForwarding;
@ -796,12 +795,6 @@ public final class serverCore extends serverAbstractThread implements serverThre
public void close() {
if (this.isAlive()) {
try {
// trying to close all still open httpc-Sockets first
int closedSockets = httpc.closeOpenSockets(this);
if (closedSockets > 0) {
serverCore.this.log.logInfo(closedSockets + " HTTP-client sockets of thread '" + this.getName() + "' closed.");
}
// closing the socket to the client
if ((this.controlSocket != null)&&(this.controlSocket.isConnected())) {
this.controlSocket.close();

@ -69,7 +69,6 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import de.anomic.http.httpc;
import de.anomic.plasma.plasmaSwitchboard;
import de.anomic.server.serverCore;
import de.anomic.server.serverSemaphore;
@ -609,8 +608,7 @@ public class yacyCore {
Thread currentThread = threadList[currentThreadIdx];
if (currentThread.isAlive()) {
log.logFine("publish: Closing socket of publishing thread '" + currentThread.getName() + "' [" + currentThreadIdx + "].");
httpc.closeOpenSockets(currentThread);
// TODO: this object should care of all open clien connections within this class and close them here
}
}

Loading…
Cancel
Save