From a995b953678955f75da6f6abdf5d7423175ffc2c Mon Sep 17 00:00:00 2001 From: orbiter Date: Wed, 30 Sep 2009 13:18:02 +0000 Subject: [PATCH] tried a fix for the httpd access bug (too many unclosed sessions) git-svn-id: https://svn.berlios.de/svnroot/repos/yacy/trunk@6362 6c8d7289-2bf4-0310-a012-ef5d649a1542 --- htroot/Connections_p.java | 68 +++++-------------- source/de/anomic/http/server/HTTPDemon.java | 2 +- .../anomic/kelondro/text/AbstractIndex.java | 10 +-- .../de/anomic/kelondro/text/TermSearch.java | 8 +-- .../anomic/server/serverAbstractSwitch.java | 57 ++++++++++++++++ source/de/anomic/server/serverCore.java | 63 +++++++++-------- 6 files changed, 118 insertions(+), 90 deletions(-) diff --git a/htroot/Connections_p.java b/htroot/Connections_p.java index 06568f90e..8cd043d48 100644 --- a/htroot/Connections_p.java +++ b/htroot/Connections_p.java @@ -57,7 +57,7 @@ public final class Connections_p { // get the virtualHost string - final String virtualHost = sb.getConfig("fileHost","localhost"); + final String virtualHost = sb.getConfig("fileHost", "localhost"); // server sessions @@ -65,9 +65,9 @@ public final class Connections_p { final serverThread httpd = sb.getThread("10_httpd"); /* waiting for all threads to finish */ - int threadCount = serverCore.sessionThreadGroup.activeCount(); + int count = serverCore.sessionThreadGroup.activeCount(); final Thread[] threadList = new Thread[((serverCore) httpd).getJobCount()]; - threadCount = serverCore.sessionThreadGroup.enumerate(threadList); + count = serverCore.sessionThreadGroup.enumerate(threadList); // determines if name lookup should be done or not boolean doNameLookup = false; @@ -76,39 +76,8 @@ public final class Connections_p { doNameLookup = true; } if (post.containsKey("closeServerSession")) { - final String sessionName = post.get("closeServerSession",null); - if (sessionName != null) { - for ( int currentThreadIdx = 0; currentThreadIdx < threadCount; currentThreadIdx++ ) { - final Thread currentThread = threadList[currentThreadIdx]; - if ( - (currentThread != null) && - (currentThread instanceof serverCore.Session) && - (currentThread.isAlive()) && - (currentThread.getName().equals(sessionName)) - ){ - // trying to gracefull stop session - ((Session)currentThread).setStopped(true); - try { Thread.sleep(100); } catch (final InterruptedException ex) {} - - // trying to interrupt session - if (currentThread.isAlive()) { - currentThread.interrupt(); - try { Thread.sleep(100); } catch (final InterruptedException ex) {} - } - - // trying to close socket - if (currentThread.isAlive()) { - ((Session)currentThread).close(); - } - - // waiting for session to finish - if (currentThread.isAlive()) { - try { currentThread.join(500); } catch (final InterruptedException ex) {} - } - break; - } - } - } + final String sessionName = post.get("closeServerSession", null); + sb.closeSessions("10_httpd", sessionName); prop.put("LOCATION",""); return prop; } @@ -116,31 +85,31 @@ public final class Connections_p { int idx = 0, numActiveRunning = 0, numActivePending = 0; boolean dark = true; - for ( int currentThreadIdx = 0; currentThreadIdx < threadCount; currentThreadIdx++ ) { - final Thread currentThread = threadList[currentThreadIdx]; - if ((currentThread != null) && (currentThread instanceof serverCore.Session) && (currentThread.isAlive())) { + for ( int currentThreadIdx = 0; currentThreadIdx < count; currentThreadIdx++ ) { + final Thread t = threadList[currentThreadIdx]; + if ((t != null) && (t instanceof serverCore.Session) && (t.isAlive())) { // getting the session object - final Session currentSession = ((Session) currentThread); + final Session s = ((Session) t); // getting the session runtime - final long sessionTime = currentSession.getTime(); + final long sessionTime = s.getTime(); // getting the request command line boolean blockingRequest = false; - String commandLine = currentSession.getCommandLine(); + String commandLine = s.getCommandLine(); if (commandLine == null) blockingRequest = true; - final int commandCount = currentSession.getCommandCount(); + final int commandCount = s.getCommandCount(); // getting the source ip address and port - final InetAddress userAddress = currentSession.getUserAddress(); - final int userPort = currentSession.getUserPort(); + final InetAddress userAddress = s.getUserAddress(); + final int userPort = s.getUserPort(); if (userAddress == null) continue; - final boolean isSSL = currentSession.isSSL(); + final boolean isSSL = s.isSSL(); String dest = null; String prot = null; - final serverHandler cmdObj = currentSession.getCommandObj(); + final serverHandler cmdObj = s.getCommandObj(); if (cmdObj instanceof HTTPDemon) { prot = isSSL ? "https":"http"; @@ -172,12 +141,12 @@ public final class Connections_p { prop.put("list_" + idx + "_dark", dark ? "1" : "0"); dark=!dark; try { - prop.put("list_" + idx + "_serverSessionID",URLEncoder.encode(currentSession.getName(),"UTF8")); + prop.put("list_" + idx + "_serverSessionID",URLEncoder.encode(s.getName(),"UTF8")); } catch (final UnsupportedEncodingException e) { // TODO Auto-generated catch block e.printStackTrace(); } - prop.putHTML("list_" + idx + "_sessionName", currentSession.getName()); + prop.putHTML("list_" + idx + "_sessionName", s.getName()); prop.put("list_" + idx + "_proto", prot); if (sessionTime > 1000*60) { prop.put("list_" + idx + "_ms", "0"); @@ -207,7 +176,6 @@ public final class Connections_p { prop.putNum("numActiveRunning", numActiveRunning); prop.putNum("numActivePending", numActivePending); - // client sessions final Set allConnections = ConnectionInfo.getAllConnections(); // TODO sorting diff --git a/source/de/anomic/http/server/HTTPDemon.java b/source/de/anomic/http/server/HTTPDemon.java index 4d75e63c8..c596f5207 100644 --- a/source/de/anomic/http/server/HTTPDemon.java +++ b/source/de/anomic/http/server/HTTPDemon.java @@ -209,7 +209,7 @@ public final class HTTPDemon implements serverHandler, Cloneable { // check if we want to allow this socket to connect us if (!(this.allowProxy || this.allowServer || this.allowYaCyHop)) { - final String errorMsg = "CONNECTION FROM " + this.userAddress.getHostName() + " [" + this.clientIP + "] FORBIDDEN"; + final String errorMsg = "CONNECTION FROM " + this.clientIP + " FORBIDDEN"; log.logWarning(errorMsg); throw new IOException(errorMsg); } diff --git a/source/de/anomic/kelondro/text/AbstractIndex.java b/source/de/anomic/kelondro/text/AbstractIndex.java index a0a0840cc..083e750cd 100644 --- a/source/de/anomic/kelondro/text/AbstractIndex.java +++ b/source/de/anomic/kelondro/text/AbstractIndex.java @@ -82,7 +82,8 @@ public abstract class AbstractIndex implements // methods to search in the index /** - * collect containers for given word hashes. This collection stops if a single container does not contain any references. + * collect containers for given word hashes. + * This collection stops if a single container does not contain any references. * In that case only a empty result is returned. * @param wordHashes * @param urlselection @@ -124,8 +125,8 @@ public abstract class AbstractIndex implements /** * collect containers for given word hashes and join them as they are retrieved. - * This collection stops if a single container does not contain any references or the current result - * of the container join results in an empty container. + * This collection stops if a single container does not contain any references + * or the current result of the container join results in an empty container. * In any fail case only a empty result container is returned. * @param wordHashes * @param urlselection @@ -133,7 +134,8 @@ public abstract class AbstractIndex implements * @return ReferenceContainer the join result */ public ReferenceContainer searchJoin(final TreeSet wordHashes, final Set urlselection, int maxDistance) { - // first check if there is any entry that has no match; this uses only operations in ram + // first check if there is any entry that has no match; + // this uses only operations in ram for (byte[] wordHash: wordHashes) { if (!this.has(wordHash)) return ReferenceContainer.emptyContainer(factory, null, 0); } diff --git a/source/de/anomic/kelondro/text/TermSearch.java b/source/de/anomic/kelondro/text/TermSearch.java index 749564281..75cd013f8 100644 --- a/source/de/anomic/kelondro/text/TermSearch.java +++ b/source/de/anomic/kelondro/text/TermSearch.java @@ -34,7 +34,7 @@ import java.util.TreeSet; public class TermSearch { private ReferenceContainer joinResult; - HashMap> inclusionContainers, exclusionContainers; + HashMap> inclusionContainers; public TermSearch( Index base, @@ -53,7 +53,7 @@ public class TermSearch { (inclusionContainers.size() < queryHashes.size())) inclusionContainers = new HashMap>(0); // prevent that only a subset is returned - this.exclusionContainers = + HashMap> exclusionContainers = (inclusionContainers.size() == 0) ? new HashMap>(0) : base.searchConjunction(excludeHashes, urlselection); @@ -74,8 +74,4 @@ public class TermSearch { return this.inclusionContainers; } - public HashMap> exclusion() { - return this.exclusionContainers; - } - } diff --git a/source/de/anomic/server/serverAbstractSwitch.java b/source/de/anomic/server/serverAbstractSwitch.java index dbd52ab3d..71170f41b 100644 --- a/source/de/anomic/server/serverAbstractSwitch.java +++ b/source/de/anomic/server/serverAbstractSwitch.java @@ -24,6 +24,7 @@ package de.anomic.server; import java.io.File; import java.io.IOException; import java.net.InetAddress; +import java.util.ArrayList; import java.util.HashMap; import java.util.Iterator; import java.util.Map; @@ -31,6 +32,7 @@ import java.util.SortedMap; import java.util.TreeMap; import de.anomic.kelondro.util.FileUtils; +import de.anomic.server.serverCore.Session; import de.anomic.yacy.logging.Log; public abstract class serverAbstractSwitch implements serverSwitch { @@ -409,6 +411,61 @@ public abstract class serverAbstractSwitch implements serverSwitch { } } + public String[] sessionsOlderThan(String threadName, long timeout) { + ArrayList list = new ArrayList(); + final serverThread st = getThread(threadName); + final Thread[] threadList = new Thread[((serverCore) st).getJobCount()]; + serverCore.sessionThreadGroup.enumerate(threadList); + + for (Thread t: threadList) { + if (t == null) continue; + if (!(t instanceof serverCore.Session)) continue; + if (!t.isAlive()) continue; + if (t == null) continue; + final Session s = (Session) t; + if (s.getTime() > timeout) { + list.add(s.getName()); + } + } + return (String[]) list.toArray(); + } + + public void closeSessions(String threadName, String sessionName) { + if (sessionName == null) return; + final serverThread st = getThread(threadName); + final Thread[] threadList = new Thread[((serverCore) st).getJobCount()]; + serverCore.sessionThreadGroup.enumerate(threadList); + + for (Thread t: threadList) { + if ( + (t != null) && + (t instanceof serverCore.Session) && + (t.isAlive()) && + (t.getName().equals(sessionName)) + ) { + // try to stop session + ((Session)t).setStopped(true); + try { Thread.sleep(100); } catch (final InterruptedException ex) {} + + // try to interrupt session + if (t.isAlive()) { + t.interrupt(); + try { Thread.sleep(100); } catch (final InterruptedException ex) {} + } + + // try to close socket + if (t.isAlive()) { + ((Session)t).close(); + } + + // wait for session to finish + if (t.isAlive()) { + try { t.join(500); } catch (final InterruptedException ex) {} + } + } + } + } + public Iterator /*of serverThread-Names (String)*/ threadNames() { return workerThreads.keySet().iterator(); } diff --git a/source/de/anomic/server/serverCore.java b/source/de/anomic/server/serverCore.java index ed06214b3..154f4c204 100644 --- a/source/de/anomic/server/serverCore.java +++ b/source/de/anomic/server/serverCore.java @@ -138,30 +138,33 @@ public final class serverCore extends serverAbstractBusyThread implements server public final void terminateOldSessions(long minage) { if (System.currentTimeMillis() - lastAutoTermination < 30000) return; this.lastAutoTermination = System.currentTimeMillis(); + //if (serverCore.sessionThreadGroup.activeCount() < maxBusySessions - 10) return; // don't panic + final Thread[] threadList = new Thread[this.getJobCount()]; + serverCore.sessionThreadGroup.enumerate(threadList); - int threadCount = serverCore.sessionThreadGroup.activeCount(); - if (threadCount < maxBusySessions - 10) return; // don't panic - final Thread[] threadList = new Thread[this.getJobCount()]; - threadCount = serverCore.sessionThreadGroup.enumerate(threadList); - for (int threadIdx = 0; threadIdx < threadCount; threadIdx++ ) { - final Thread t = threadList[threadIdx]; - if (t != null && t instanceof Session && t.isAlive() && ((Session) t).getTime() > minage) { - this.log.logInfo("check for " + t.getName() + ": " + ((Session) t).getTime() + " ms alive, stopping thread"); - - // trying to stop session - ((Session) t).setStopped(true); - try { Thread.sleep(100); } catch (final InterruptedException ex) {} - - // trying to interrupt session - if (t.isAlive()) { - t.interrupt(); - try {Thread.sleep(10);} catch (final InterruptedException ex) {} - } - - // trying to close socket - if (t.isAlive()) { - ((Session) t).close(); - } + for (Thread t: threadList) { + if (t == null) continue; + if (!(t instanceof Session)) continue; + if (!t.isAlive()) continue; + Session s = (Session) t; + if (s.getTime() < minage) continue; + + // stop thread + this.log.logInfo("check for " + t.getName() + ": " + ((Session) t).getTime() + " ms alive, stopping thread"); + + // trying to stop session + s.setStopped(true); + try { Thread.sleep(100); } catch (final InterruptedException ex) {} + + // trying to interrupt session + if (t.isAlive()) { + t.interrupt(); + try {Thread.sleep(10);} catch (final InterruptedException ex) {} + } + + // trying to close socket + if (t.isAlive()) { + s.close(); } } } @@ -330,12 +333,14 @@ public final class serverCore extends serverAbstractBusyThread implements server terminateOldSessions(30000); } + /* if (this.busySessions.size() >= this.maxBusySessions) { // immediately close connection if too much sessions are still running this.log.logWarning("* connections (" + this.busySessions.size() + ") exceeding limit (" + this.maxBusySessions + "), closing new incoming connection from "+ controlSocket.getRemoteSocketAddress()); controlSocket.close(); return false; } + */ final String cIP = clientAddress(controlSocket); //System.out.println("server bfHosts=" + bfHost.toString()); @@ -642,13 +647,13 @@ public final class serverCore extends serverAbstractBusyThread implements server } } catch (final IOException e) { e.printStackTrace(); + } finally { + if (busySessions != null) { + busySessions.remove(this); + if (serverCore.this.log.isFinest()) serverCore.this.log.logFinest("* removed session "+ this.controlSocket.getRemoteSocketAddress() + " " + this.request); + } + this.controlSocket = null; } - if (busySessions != null) - { - busySessions.remove(this); - if(serverCore.this.log.isFinest()) serverCore.this.log.logFinest("* removed session "+ this.controlSocket.getRemoteSocketAddress() + " " + this.request); - } - this.controlSocket = null; } }