*) migration to Java NIO

- to avoid buzy waiting and
   - make socket blocking interruptable
*) changing reference to logger
*) introduce commandObjMethodCache to improve performance
*) doing a stream shutdown before closing the connection
   to aviod problems when using persistent connections
*) calling method UNKNOWN of the server-handler class when 
   receiving an unknown command
*) calling method EMPTY of the server-handler class when
   receiving an empty command

git-svn-id: https://svn.berlios.de/svnroot/repos/yacy/trunk@254 6c8d7289-2bf4-0310-a012-ef5d649a1542
pull/1/head
theli 20 years ago
parent 1eff96f471
commit af9cd67334

@ -50,12 +50,22 @@ import java.io.InputStream;
import java.io.OutputStream;
import java.io.PushbackInputStream;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.URL;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedByInterruptException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.security.KeyStore;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Hashtable;
import javax.net.ServerSocketFactory;
@ -66,23 +76,27 @@ import javax.net.ssl.SSLServerSocketFactory;
import org.apache.commons.pool.impl.GenericObjectPool;
import org.apache.commons.pool.impl.GenericObjectPool.Config;
public final class serverCore extends serverAbstractThread implements serverThread {
import de.anomic.http.httpd;
import de.anomic.server.logging.serverLog;
public final class serverCore extends serverAbstractThread implements serverThread {
// generic input/output static methods
public static final byte cr = 13;
public static final byte lf = 10;
public static final byte[] crlf = {cr, lf};
public static final String crlfString = new String(crlf);
// static variables
public static final Boolean TERMINATE_CONNECTION = Boolean.FALSE;
public static final Boolean RESUME_CONNECTION = Boolean.TRUE;
public static Hashtable bfHost = new Hashtable(); // for brute-force prevention
// class variables
private int port; // the listening port
private ServerSocketChannel channel;
private ServerSocket socket; // listener
public int maxSessions = 0; // max. number of sessions; 0=unlimited
public int maxSessions = 0; // max. number of sessions; 0=unlimited
serverLog log; // log object
private int timeout; // connection time-out of the socket
@ -103,6 +117,7 @@ public final class serverCore extends serverAbstractThread implements serverThre
final SessionPool theSessionPool;
final ThreadGroup theSessionThreadGroup = new ThreadGroup("sessionThreadGroup");
private Config cralwerPoolConfig = null;
private Selector channelSel;
private static ServerSocketFactory getServerSocketFactory(boolean dflt, File keyfile, String passphrase) {
// see doc's at
@ -172,7 +187,14 @@ public final class serverCore extends serverAbstractThread implements serverThre
try {
this.socket = new ServerSocket(port);
// Open a new server-socket channel
this.channel = ServerSocketChannel.open();
// Binds the ServerSocket to a specific address
this.socket = this.channel.socket();
this.socket.bind(new InetSocketAddress(port));
// this.socket = new ServerSocket(port);
} catch (java.net.BindException e) {
System.out.println("FATAL ERROR: " + e.getMessage() + " - probably root access rights needed. check port number"); System.exit(0);
}
@ -186,8 +208,6 @@ public final class serverCore extends serverAbstractThread implements serverThre
this.timeout = timeout;
this.termSleepingThreads = termSleepingThreads;
this.log = new serverLog("SERVER", logl);
// activeThreads = new Hashtable();
// sleepingThreads = new Hashtable();
} catch (java.lang.ClassNotFoundException e) {
System.out.println("FATAL ERROR: " + e.getMessage() + " - Class Not Found"); System.exit(0);
}
@ -307,46 +327,29 @@ public final class serverCore extends serverAbstractThread implements serverThre
"* waiting for connections, " + this.theSessionPool.getNumActive() + " sessions running, " +
this.theSessionPool.getNumIdle() + " sleeping");
// list all connection (debug)
/*
if (activeThreads.size() > 0) {
Enumeration threadEnum = activeThreads.keys();
Session se;
long time;
while (threadEnum.hasMoreElements()) {
se = (Session) threadEnum.nextElement();
time = System.currentTimeMillis() - ((Long) activeThreads.get(se)).longValue();
log.logDebug("* ACTIVE SESSION (" + ((se.isAlive()) ? "alive" : "dead") + ", " + time + "): " + se.request);
}
}
*/
// wait for new connection
announceThreadBlockApply();
Socket controlSocket = this.socket.accept();
announceThreadBlockRelease();
String cIP = clientAddress(controlSocket);
//System.out.println("server bfHosts=" + bfHost.toString());
if (bfHost.get(cIP) != null) {
log.logInfo("SLOWING DOWN ACCESS FOR BRUTE-FORCE PREVENTION FROM " + cIP);
// add a delay to make brute-force harder
try {Thread.currentThread().sleep(3000);} catch (InterruptedException e) {}
}
this.log.logInfo("SLOWING DOWN ACCESS FOR BRUTE-FORCE PREVENTION FROM " + cIP);
// add a delay to make brute-force harder
try {Thread.currentThread().sleep(3000);} catch (InterruptedException e) {}
}
if ((this.denyHost == null) || (this.denyHost.get(cIP) == null)) {
controlSocket.setSoTimeout(this.timeout);
Session connection = (Session) this.theSessionPool.borrowObject();
connection.execute(controlSocket);
connection.execute(controlSocket,this.timeout);
//log.logDebug("* NEW SESSION: " + connection.request + " from " + clientIP);
} else {
System.out.println("ACCESS FROM " + cIP + " DENIED");
}
// idle until number of maximal threads is (again) reached
//synchronized(this) {
// while ((maxSessions > 0) && (activeThreads.size() >= maxSessions)) try {
// log.logDebug("* Waiting for activeThreads=" + activeThreads.size() + " < maxSessions=" + maxSessions);
// Thread.currentThread().sleep(2000);
// idleThreadCheck();
// } catch (InterruptedException e) {}
return true;
}
@ -355,6 +358,10 @@ public final class serverCore extends serverAbstractThread implements serverThre
// consuming the isInterrupted Flag. Otherwise we could not properly close the session pool
Thread.interrupted();
// closing the serverchannel and socket
this.socket.close();
this.channel.close();
// close the session pool
this.theSessionPool.close();
}
@ -525,6 +532,8 @@ public final class serverCore extends serverAbstractThread implements serverThre
private long start; // startup time
private serverHandler commandObj;
private Hashtable commandObjMethodCache = new Hashtable(5);
private String request; // current command line
private int commandCounter; // for logging: number of commands in this session
private String identity; // a string that identifies the client (i.e. ftp: account name)
@ -533,8 +542,10 @@ public final class serverCore extends serverAbstractThread implements serverThre
public InetAddress userAddress; // the address of the client
public PushbackInputStream in; // on control input stream
public OutputStream out; // on control output stream, autoflush
private int socketTimeout;
private final serverByteBuffer readLineBuffer = new serverByteBuffer(256);
public Session(ThreadGroup theThreadGroup) {
super(theThreadGroup,"Session");
@ -544,11 +555,12 @@ public final class serverCore extends serverAbstractThread implements serverThre
this.stopped = stopped;
}
public void execute(Socket controlSocket) {
this.execute(controlSocket, null);
public void execute(Socket controlSocket, int socketTimeout) {
this.execute(controlSocket, socketTimeout, null);
}
public synchronized void execute(Socket controlSocket, Object synObj) {
public synchronized void execute(Socket controlSocket, int socketTimeout, Object synObj) {
this.socketTimeout = socketTimeout;
this.controlSocket = controlSocket;
this.syncObject = synObj;
this.done = false;
@ -588,7 +600,7 @@ public final class serverCore extends serverAbstractThread implements serverThre
}
public byte[] readLine() {
return receive(in, this.readLineBuffer, timeout, commandMaxLength, false);
return receive(in, this.readLineBuffer, commandMaxLength, false);
}
@ -694,9 +706,8 @@ public final class serverCore extends serverAbstractThread implements serverThre
if ((this.commandObj != null) &&
(this.commandObj.getClass().getName().equals(serverCore.this.handlerPrototype.getClass().getName()))) {
this.commandObj.reset();
}
else {
this.commandObj = (serverHandler) serverCore.this.handlerPrototype.clone();
} else {
this.commandObj = (serverHandler) serverCore.this.handlerPrototype.clone();
}
this.commandObj.initSession(this);
@ -706,11 +717,17 @@ public final class serverCore extends serverAbstractThread implements serverThre
System.err.println("ERROR: (internal) " + e);
} finally {
try {
this.out.flush();
// close everything
this.out.close();
this.in.close();
this.controlSocket.close();
this.out.flush();
this.controlSocket.shutdownInput();
this.controlSocket.shutdownOutput();
this.in.close();
this.out.close();
// close everything
this.controlSocket.close(); this.controlSocket = null;
} catch (IOException e) {
System.err.println("ERROR: (internal) " + e);
}
@ -739,23 +756,42 @@ public final class serverCore extends serverAbstractThread implements serverThre
String cmd;
String tmp;
Object[] stringParameter = new String[1];
while ((this.in != null) && ((requestBytes = readLine()) != null)) {
commandCounter++;
request = new String(requestBytes);
while ((this.in != null) && ((requestBytes = readLine()) != null)) {
this.commandCounter++;
this.setName("Session_" + this.userAddress.getHostAddress() + ":" + this.controlSocket.getPort() + "#" + commandCounter);
this.request = new String(requestBytes);
//log.logDebug("* session " + handle + " received command '" + request + "'. time = " + (System.currentTimeMillis() - handle));
log(false, request);
log(false, this.request);
try {
pos = request.indexOf(' ');
// if we can not determine the proper command string we try to call function emptyRequest
// of the commandObject
if (this.request.trim().length() == 0) this.request = "EMPTY";
pos = this.request.indexOf(' ');
if (pos < 0) {
cmd = request.trim().toUpperCase();
cmd = this.request.trim().toUpperCase();
stringParameter[0] = "";
} else {
cmd = request.substring(0, pos).trim().toUpperCase();
stringParameter[0] = request.substring(pos).trim();
cmd = this.request.substring(0, pos).trim().toUpperCase();
stringParameter[0] = this.request.substring(pos).trim();
}
// setting the socket timeout for reading of the request content
this.controlSocket.setSoTimeout(this.socketTimeout);
// exec command and return value
result = this.commandObj.getClass().getMethod(cmd, stringType).invoke(this.commandObj, stringParameter);
Object commandMethod = this.commandObjMethodCache.get(cmd);
if (commandMethod == null) {
try {
commandMethod = this.commandObj.getClass().getMethod(cmd, stringType);
this.commandObjMethodCache.put(cmd,commandMethod);
} catch (NoSuchMethodException noMethod) {
commandMethod = this.commandObj.getClass().getMethod("UNKNOWN", stringType);
stringParameter[0] = this.request.trim();
}
}
result = ((Method)commandMethod).invoke(this.commandObj, stringParameter);
//log.logDebug("* session " + handle + " completed command '" + request + "'. time = " + (System.currentTimeMillis() - handle));
this.out.flush();
if (result == null) {
@ -763,7 +799,9 @@ public final class serverCore extends serverAbstractThread implements serverThre
log(2, true, "(NULL RETURNED/STREAM PASSED)");
*/
} else if (result instanceof Boolean) {
if (((Boolean) result) == TERMINATE_CONNECTION) break;
if (((Boolean) result).equals(TERMINATE_CONNECTION)) break;
// deactivating timeout. this is needed because of persistent connections
this.controlSocket.setSoTimeout(0);
} else if (result instanceof String) {
if (((String) result).startsWith("!")) {
result = ((String) result).substring(1);
@ -785,28 +823,28 @@ public final class serverCore extends serverAbstractThread implements serverThre
System.out.println("ERROR A " + userAddress.getHostAddress());
// we extract a target exception and let the thread survive
writeLine((String) commandObj.error(ite.getTargetException()));
} catch (NoSuchMethodException nsme) {
System.out.println("ERROR B " + userAddress.getHostAddress());
if (isNotLocal(userAddress.getHostAddress().toString())) {
if (denyHost != null)
denyHost.put((""+userAddress.getHostAddress()), "deny"); // block client: hacker attempt
}
break;
// the client requested a command that does not exist
//Object[] errorParameter = { nsme };
//writeLine((String) error.invoke(this.cmdObject, errorParameter));
} catch (IllegalAccessException iae) {
System.out.println("ERROR C " + userAddress.getHostAddress());
// wrong parameters: this an only be an internal problem
writeLine((String) commandObj.error(iae));
} catch (java.lang.ClassCastException e) {
System.out.println("ERROR D " + userAddress.getHostAddress());
// ??
writeLine((String) commandObj.error(e));
} catch (Exception e) {
System.out.println("ERROR E " + userAddress.getHostAddress());
// whatever happens: the thread has to survive!
writeLine("UNKNOWN REASON:" + (String) commandObj.error(e));
} catch (NoSuchMethodException nsme) {
System.out.println("ERROR B " + userAddress.getHostAddress());
if (isNotLocal(userAddress.getHostAddress().toString())) {
if (denyHost != null)
denyHost.put((""+userAddress.getHostAddress()), "deny"); // block client: hacker attempt
}
break;
// the client requested a command that does not exist
//Object[] errorParameter = { nsme };
//writeLine((String) error.invoke(this.cmdObject, errorParameter));
} catch (IllegalAccessException iae) {
System.out.println("ERROR C " + userAddress.getHostAddress());
// wrong parameters: this an only be an internal problem
writeLine((String) commandObj.error(iae));
} catch (java.lang.ClassCastException e) {
System.out.println("ERROR D " + userAddress.getHostAddress());
// ??
writeLine((String) commandObj.error(e));
} catch (Exception e) {
System.out.println("ERROR E " + userAddress.getHostAddress());
// whatever happens: the thread has to survive!
writeLine("UNKNOWN REASON:" + (String) commandObj.error(e));
}
} // end of while
} catch (java.lang.ClassNotFoundException e) {
@ -819,82 +857,32 @@ public final class serverCore extends serverAbstractThread implements serverThre
}
public static byte[] receive(PushbackInputStream pbis, serverByteBuffer readLineBuffer, long timeout, int maxSize, boolean logerr) {
// this is essentially a readln on a PushbackInputStream
int bufferSize = 0;
bufferSize = 10;
public static byte[] receive(PushbackInputStream pbis, serverByteBuffer readLineBuffer, int maxSize, boolean logerr) {
// reuse an existing linebuffer or create a new one ...
if (readLineBuffer == null) {
readLineBuffer = new serverByteBuffer(256);
} else {
readLineBuffer.reset();
}
// reuse an existing linebuffer
readLineBuffer.reset();
// TODO: we should remove this statements because calling the available function is very time consuming
// we better should use nio sockets instead because they are interruptable ...
try {
long t = timeout;
while (((bufferSize = pbis.available()) == 0) && (t > 0)) try {
Thread.currentThread().sleep(100);
t -= 100;
} catch (InterruptedException e) {}
if (t <= 0) {
if (logerr) serverLog.logError("SERVER", "receive interrupted - timeout");
return null;
}
if (bufferSize == 0) {
if (logerr) serverLog.logError("SERVER", "receive interrupted - buffer empty");
return null;
}
} catch (IOException e) {
if (logerr) serverLog.logError("SERVER", "receive interrupted - exception 1 = " + e.getMessage());
return null;
}
// byte[] buffer = new byte[bufferSize];
// byte[] bufferBkp;
bufferSize = 0;
int b = 0;
int bufferSize = 0, b = 0;
try {
while ((b = pbis.read()) > 31) {
// // we have a valid byte in b, add it to the buffer
// if (buffer.length == bufferSize) {
// // the buffer is full, double its size
// bufferBkp = buffer;
// buffer = new byte[bufferSize * 2];
// java.lang.System.arraycopy(bufferBkp, 0, buffer, 0, bufferSize);
// bufferBkp = null;
// }
// //if (bufferSize > 10000) {System.out.println("***ERRORDEBUG***:" + new String(buffer));} // debug
// buffer[bufferSize++] = (byte) b; // error hier: ArrayIndexOutOfBoundsException: -2007395416 oder 0
readLineBuffer.write(b);
if (bufferSize++ > maxSize) break;
}
// we have caught a possible line end
// we have catched a possible line end
if (b == cr) {
// maybe a lf follows, read it:
if ((b = pbis.read()) != lf) if (b >= 0) pbis.unread(b); // we push back the byte
}
// finally shrink buffer
// bufferBkp = buffer;
// buffer = new byte[bufferSize];
// java.lang.System.arraycopy(bufferBkp, 0, buffer, 0, bufferSize);
// bufferBkp = null;
// return only the byte[]
// return buffer;
return readLineBuffer.toByteArray();
} catch (IOException e) {
if (logerr) serverLog.logError("SERVER", "receive interrupted - exception 2 = " + e.getMessage());
return null;
}
} catch (ClosedByInterruptException e) {
if (logerr) serverLog.logError("SERVER", "receive interrupted - timeout");
return null;
} catch (IOException e) {
if (logerr) serverLog.logError("SERVER", "receive interrupted - exception 2 = " + e.getMessage());
return null;
}
}
public static void send(OutputStream os, String buf) throws IOException {

Loading…
Cancel
Save