From af9cd67334ce877c9ee39b5b5dfe27efd34eefc2 Mon Sep 17 00:00:00 2001 From: theli Date: Thu, 9 Jun 2005 10:51:56 +0000 Subject: [PATCH] *) migration to Java NIO - to avoid buzy waiting and - make socket blocking interruptable *) changing reference to logger *) introduce commandObjMethodCache to improve performance *) doing a stream shutdown before closing the connection to aviod problems when using persistent connections *) calling method UNKNOWN of the server-handler class when receiving an unknown command *) calling method EMPTY of the server-handler class when receiving an empty command git-svn-id: https://svn.berlios.de/svnroot/repos/yacy/trunk@254 6c8d7289-2bf4-0310-a012-ef5d649a1542 --- source/de/anomic/server/serverCore.java | 266 +++++++++++------------- 1 file changed, 127 insertions(+), 139 deletions(-) diff --git a/source/de/anomic/server/serverCore.java b/source/de/anomic/server/serverCore.java index ba875917d..1da5ea5af 100644 --- a/source/de/anomic/server/serverCore.java +++ b/source/de/anomic/server/serverCore.java @@ -50,12 +50,22 @@ import java.io.InputStream; import java.io.OutputStream; import java.io.PushbackInputStream; import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; import java.net.InetAddress; +import java.net.InetSocketAddress; import java.net.ServerSocket; import java.net.Socket; import java.net.URL; import java.net.UnknownHostException; +import java.nio.ByteBuffer; +import java.nio.channels.ClosedByInterruptException; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.nio.channels.ServerSocketChannel; +import java.nio.channels.SocketChannel; import java.security.KeyStore; +import java.util.HashMap; +import java.util.HashSet; import java.util.Hashtable; import javax.net.ServerSocketFactory; @@ -66,23 +76,27 @@ import javax.net.ssl.SSLServerSocketFactory; import org.apache.commons.pool.impl.GenericObjectPool; import org.apache.commons.pool.impl.GenericObjectPool.Config; -public final class serverCore extends serverAbstractThread implements serverThread { +import de.anomic.http.httpd; +import de.anomic.server.logging.serverLog; +public final class serverCore extends serverAbstractThread implements serverThread { + // generic input/output static methods public static final byte cr = 13; public static final byte lf = 10; public static final byte[] crlf = {cr, lf}; public static final String crlfString = new String(crlf); - + // static variables public static final Boolean TERMINATE_CONNECTION = Boolean.FALSE; public static final Boolean RESUME_CONNECTION = Boolean.TRUE; public static Hashtable bfHost = new Hashtable(); // for brute-force prevention - + // class variables private int port; // the listening port + private ServerSocketChannel channel; private ServerSocket socket; // listener - public int maxSessions = 0; // max. number of sessions; 0=unlimited + public int maxSessions = 0; // max. number of sessions; 0=unlimited serverLog log; // log object private int timeout; // connection time-out of the socket @@ -103,6 +117,7 @@ public final class serverCore extends serverAbstractThread implements serverThre final SessionPool theSessionPool; final ThreadGroup theSessionThreadGroup = new ThreadGroup("sessionThreadGroup"); private Config cralwerPoolConfig = null; + private Selector channelSel; private static ServerSocketFactory getServerSocketFactory(boolean dflt, File keyfile, String passphrase) { // see doc's at @@ -172,7 +187,14 @@ public final class serverCore extends serverAbstractThread implements serverThre try { - this.socket = new ServerSocket(port); + // Open a new server-socket channel + this.channel = ServerSocketChannel.open(); + + // Binds the ServerSocket to a specific address + this.socket = this.channel.socket(); + this.socket.bind(new InetSocketAddress(port)); + + // this.socket = new ServerSocket(port); } catch (java.net.BindException e) { System.out.println("FATAL ERROR: " + e.getMessage() + " - probably root access rights needed. check port number"); System.exit(0); } @@ -186,8 +208,6 @@ public final class serverCore extends serverAbstractThread implements serverThre this.timeout = timeout; this.termSleepingThreads = termSleepingThreads; this.log = new serverLog("SERVER", logl); -// activeThreads = new Hashtable(); -// sleepingThreads = new Hashtable(); } catch (java.lang.ClassNotFoundException e) { System.out.println("FATAL ERROR: " + e.getMessage() + " - Class Not Found"); System.exit(0); } @@ -307,46 +327,29 @@ public final class serverCore extends serverAbstractThread implements serverThre "* waiting for connections, " + this.theSessionPool.getNumActive() + " sessions running, " + this.theSessionPool.getNumIdle() + " sleeping"); - // list all connection (debug) - /* - if (activeThreads.size() > 0) { - Enumeration threadEnum = activeThreads.keys(); - Session se; - long time; - while (threadEnum.hasMoreElements()) { - se = (Session) threadEnum.nextElement(); - time = System.currentTimeMillis() - ((Long) activeThreads.get(se)).longValue(); - log.logDebug("* ACTIVE SESSION (" + ((se.isAlive()) ? "alive" : "dead") + ", " + time + "): " + se.request); - } - } - */ - // wait for new connection announceThreadBlockApply(); + Socket controlSocket = this.socket.accept(); + announceThreadBlockRelease(); String cIP = clientAddress(controlSocket); //System.out.println("server bfHosts=" + bfHost.toString()); if (bfHost.get(cIP) != null) { - log.logInfo("SLOWING DOWN ACCESS FOR BRUTE-FORCE PREVENTION FROM " + cIP); - // add a delay to make brute-force harder - try {Thread.currentThread().sleep(3000);} catch (InterruptedException e) {} - } + this.log.logInfo("SLOWING DOWN ACCESS FOR BRUTE-FORCE PREVENTION FROM " + cIP); + // add a delay to make brute-force harder + try {Thread.currentThread().sleep(3000);} catch (InterruptedException e) {} + } + if ((this.denyHost == null) || (this.denyHost.get(cIP) == null)) { controlSocket.setSoTimeout(this.timeout); Session connection = (Session) this.theSessionPool.borrowObject(); - connection.execute(controlSocket); + connection.execute(controlSocket,this.timeout); //log.logDebug("* NEW SESSION: " + connection.request + " from " + clientIP); } else { System.out.println("ACCESS FROM " + cIP + " DENIED"); } - // idle until number of maximal threads is (again) reached - //synchronized(this) { -// while ((maxSessions > 0) && (activeThreads.size() >= maxSessions)) try { -// log.logDebug("* Waiting for activeThreads=" + activeThreads.size() + " < maxSessions=" + maxSessions); -// Thread.currentThread().sleep(2000); -// idleThreadCheck(); -// } catch (InterruptedException e) {} + return true; } @@ -355,6 +358,10 @@ public final class serverCore extends serverAbstractThread implements serverThre // consuming the isInterrupted Flag. Otherwise we could not properly close the session pool Thread.interrupted(); + // closing the serverchannel and socket + this.socket.close(); + this.channel.close(); + // close the session pool this.theSessionPool.close(); } @@ -525,6 +532,8 @@ public final class serverCore extends serverAbstractThread implements serverThre private long start; // startup time private serverHandler commandObj; + private Hashtable commandObjMethodCache = new Hashtable(5); + private String request; // current command line private int commandCounter; // for logging: number of commands in this session private String identity; // a string that identifies the client (i.e. ftp: account name) @@ -533,8 +542,10 @@ public final class serverCore extends serverAbstractThread implements serverThre public InetAddress userAddress; // the address of the client public PushbackInputStream in; // on control input stream public OutputStream out; // on control output stream, autoflush + private int socketTimeout; private final serverByteBuffer readLineBuffer = new serverByteBuffer(256); + public Session(ThreadGroup theThreadGroup) { super(theThreadGroup,"Session"); @@ -544,11 +555,12 @@ public final class serverCore extends serverAbstractThread implements serverThre this.stopped = stopped; } - public void execute(Socket controlSocket) { - this.execute(controlSocket, null); + public void execute(Socket controlSocket, int socketTimeout) { + this.execute(controlSocket, socketTimeout, null); } - public synchronized void execute(Socket controlSocket, Object synObj) { + public synchronized void execute(Socket controlSocket, int socketTimeout, Object synObj) { + this.socketTimeout = socketTimeout; this.controlSocket = controlSocket; this.syncObject = synObj; this.done = false; @@ -588,7 +600,7 @@ public final class serverCore extends serverAbstractThread implements serverThre } public byte[] readLine() { - return receive(in, this.readLineBuffer, timeout, commandMaxLength, false); + return receive(in, this.readLineBuffer, commandMaxLength, false); } @@ -694,9 +706,8 @@ public final class serverCore extends serverAbstractThread implements serverThre if ((this.commandObj != null) && (this.commandObj.getClass().getName().equals(serverCore.this.handlerPrototype.getClass().getName()))) { this.commandObj.reset(); - } - else { - this.commandObj = (serverHandler) serverCore.this.handlerPrototype.clone(); + } else { + this.commandObj = (serverHandler) serverCore.this.handlerPrototype.clone(); } this.commandObj.initSession(this); @@ -706,11 +717,17 @@ public final class serverCore extends serverAbstractThread implements serverThre System.err.println("ERROR: (internal) " + e); } finally { try { - this.out.flush(); - // close everything - this.out.close(); - this.in.close(); - this.controlSocket.close(); + this.out.flush(); + + this.controlSocket.shutdownInput(); + this.controlSocket.shutdownOutput(); + + this.in.close(); + this.out.close(); + + // close everything + this.controlSocket.close(); this.controlSocket = null; + } catch (IOException e) { System.err.println("ERROR: (internal) " + e); } @@ -739,23 +756,42 @@ public final class serverCore extends serverAbstractThread implements serverThre String cmd; String tmp; Object[] stringParameter = new String[1]; - while ((this.in != null) && ((requestBytes = readLine()) != null)) { - commandCounter++; - request = new String(requestBytes); + while ((this.in != null) && ((requestBytes = readLine()) != null)) { + this.commandCounter++; + this.setName("Session_" + this.userAddress.getHostAddress() + ":" + this.controlSocket.getPort() + "#" + commandCounter); + + this.request = new String(requestBytes); //log.logDebug("* session " + handle + " received command '" + request + "'. time = " + (System.currentTimeMillis() - handle)); - log(false, request); + log(false, this.request); try { - pos = request.indexOf(' '); + // if we can not determine the proper command string we try to call function emptyRequest + // of the commandObject + if (this.request.trim().length() == 0) this.request = "EMPTY"; + + pos = this.request.indexOf(' '); if (pos < 0) { - cmd = request.trim().toUpperCase(); + cmd = this.request.trim().toUpperCase(); stringParameter[0] = ""; } else { - cmd = request.substring(0, pos).trim().toUpperCase(); - stringParameter[0] = request.substring(pos).trim(); + cmd = this.request.substring(0, pos).trim().toUpperCase(); + stringParameter[0] = this.request.substring(pos).trim(); } + // setting the socket timeout for reading of the request content + this.controlSocket.setSoTimeout(this.socketTimeout); + // exec command and return value - result = this.commandObj.getClass().getMethod(cmd, stringType).invoke(this.commandObj, stringParameter); + Object commandMethod = this.commandObjMethodCache.get(cmd); + if (commandMethod == null) { + try { + commandMethod = this.commandObj.getClass().getMethod(cmd, stringType); + this.commandObjMethodCache.put(cmd,commandMethod); + } catch (NoSuchMethodException noMethod) { + commandMethod = this.commandObj.getClass().getMethod("UNKNOWN", stringType); + stringParameter[0] = this.request.trim(); + } + } + result = ((Method)commandMethod).invoke(this.commandObj, stringParameter); //log.logDebug("* session " + handle + " completed command '" + request + "'. time = " + (System.currentTimeMillis() - handle)); this.out.flush(); if (result == null) { @@ -763,7 +799,9 @@ public final class serverCore extends serverAbstractThread implements serverThre log(2, true, "(NULL RETURNED/STREAM PASSED)"); */ } else if (result instanceof Boolean) { - if (((Boolean) result) == TERMINATE_CONNECTION) break; + if (((Boolean) result).equals(TERMINATE_CONNECTION)) break; + // deactivating timeout. this is needed because of persistent connections + this.controlSocket.setSoTimeout(0); } else if (result instanceof String) { if (((String) result).startsWith("!")) { result = ((String) result).substring(1); @@ -785,28 +823,28 @@ public final class serverCore extends serverAbstractThread implements serverThre System.out.println("ERROR A " + userAddress.getHostAddress()); // we extract a target exception and let the thread survive writeLine((String) commandObj.error(ite.getTargetException())); - } catch (NoSuchMethodException nsme) { - System.out.println("ERROR B " + userAddress.getHostAddress()); - if (isNotLocal(userAddress.getHostAddress().toString())) { - if (denyHost != null) - denyHost.put((""+userAddress.getHostAddress()), "deny"); // block client: hacker attempt - } - break; - // the client requested a command that does not exist - //Object[] errorParameter = { nsme }; - //writeLine((String) error.invoke(this.cmdObject, errorParameter)); - } catch (IllegalAccessException iae) { - System.out.println("ERROR C " + userAddress.getHostAddress()); - // wrong parameters: this an only be an internal problem - writeLine((String) commandObj.error(iae)); - } catch (java.lang.ClassCastException e) { - System.out.println("ERROR D " + userAddress.getHostAddress()); - // ?? - writeLine((String) commandObj.error(e)); - } catch (Exception e) { - System.out.println("ERROR E " + userAddress.getHostAddress()); - // whatever happens: the thread has to survive! - writeLine("UNKNOWN REASON:" + (String) commandObj.error(e)); + } catch (NoSuchMethodException nsme) { + System.out.println("ERROR B " + userAddress.getHostAddress()); + if (isNotLocal(userAddress.getHostAddress().toString())) { + if (denyHost != null) + denyHost.put((""+userAddress.getHostAddress()), "deny"); // block client: hacker attempt + } + break; + // the client requested a command that does not exist + //Object[] errorParameter = { nsme }; + //writeLine((String) error.invoke(this.cmdObject, errorParameter)); + } catch (IllegalAccessException iae) { + System.out.println("ERROR C " + userAddress.getHostAddress()); + // wrong parameters: this an only be an internal problem + writeLine((String) commandObj.error(iae)); + } catch (java.lang.ClassCastException e) { + System.out.println("ERROR D " + userAddress.getHostAddress()); + // ?? + writeLine((String) commandObj.error(e)); + } catch (Exception e) { + System.out.println("ERROR E " + userAddress.getHostAddress()); + // whatever happens: the thread has to survive! + writeLine("UNKNOWN REASON:" + (String) commandObj.error(e)); } } // end of while } catch (java.lang.ClassNotFoundException e) { @@ -819,82 +857,32 @@ public final class serverCore extends serverAbstractThread implements serverThre } - public static byte[] receive(PushbackInputStream pbis, serverByteBuffer readLineBuffer, long timeout, int maxSize, boolean logerr) { - - // this is essentially a readln on a PushbackInputStream - int bufferSize = 0; - bufferSize = 10; + public static byte[] receive(PushbackInputStream pbis, serverByteBuffer readLineBuffer, int maxSize, boolean logerr) { - // reuse an existing linebuffer or create a new one ... - if (readLineBuffer == null) { - readLineBuffer = new serverByteBuffer(256); - } else { - readLineBuffer.reset(); - } + // reuse an existing linebuffer + readLineBuffer.reset(); - - // TODO: we should remove this statements because calling the available function is very time consuming - // we better should use nio sockets instead because they are interruptable ... - try { - long t = timeout; - while (((bufferSize = pbis.available()) == 0) && (t > 0)) try { - Thread.currentThread().sleep(100); - t -= 100; - } catch (InterruptedException e) {} - if (t <= 0) { - if (logerr) serverLog.logError("SERVER", "receive interrupted - timeout"); - return null; - } - if (bufferSize == 0) { - if (logerr) serverLog.logError("SERVER", "receive interrupted - buffer empty"); - return null; - } - } catch (IOException e) { - if (logerr) serverLog.logError("SERVER", "receive interrupted - exception 1 = " + e.getMessage()); - return null; - } - - // byte[] buffer = new byte[bufferSize]; - // byte[] bufferBkp; - bufferSize = 0; - int b = 0; - + int bufferSize = 0, b = 0; try { while ((b = pbis.read()) > 31) { -// // we have a valid byte in b, add it to the buffer -// if (buffer.length == bufferSize) { -// // the buffer is full, double its size -// bufferBkp = buffer; -// buffer = new byte[bufferSize * 2]; -// java.lang.System.arraycopy(bufferBkp, 0, buffer, 0, bufferSize); -// bufferBkp = null; -// } -// //if (bufferSize > 10000) {System.out.println("***ERRORDEBUG***:" + new String(buffer));} // debug -// buffer[bufferSize++] = (byte) b; // error hier: ArrayIndexOutOfBoundsException: -2007395416 oder 0 - readLineBuffer.write(b); if (bufferSize++ > maxSize) break; } - // we have caught a possible line end + // we have catched a possible line end if (b == cr) { // maybe a lf follows, read it: if ((b = pbis.read()) != lf) if (b >= 0) pbis.unread(b); // we push back the byte } - // finally shrink buffer -// bufferBkp = buffer; -// buffer = new byte[bufferSize]; -// java.lang.System.arraycopy(bufferBkp, 0, buffer, 0, bufferSize); -// bufferBkp = null; - - // return only the byte[] - // return buffer; return readLineBuffer.toByteArray(); - } catch (IOException e) { - if (logerr) serverLog.logError("SERVER", "receive interrupted - exception 2 = " + e.getMessage()); - return null; - } + } catch (ClosedByInterruptException e) { + if (logerr) serverLog.logError("SERVER", "receive interrupted - timeout"); + return null; + } catch (IOException e) { + if (logerr) serverLog.logError("SERVER", "receive interrupted - exception 2 = " + e.getMessage()); + return null; + } } public static void send(OutputStream os, String buf) throws IOException {