@ -41,19 +41,30 @@
package de.anomic.server ;
// standard server
import java.io.* ;
import java.net.* ;
import java.lang.* ;
import java.util.* ;
import java.lang.reflect.* ;
import java.io.ByteArrayOutputStream ;
import java.io.File ;
import java.io.FileInputStream ;
import java.io.IOException ;
import java.io.InputStream ;
import java.io.OutputStream ;
import java.io.PushbackInputStream ;
import java.lang.reflect.InvocationTargetException ;
import java.net.InetAddress ;
import java.net.ServerSocket ;
import java.net.Socket ;
import java.net.URL ;
import java.net.UnknownHostException ;
import java.util.Hashtable ;
// needed for ssl
import javax.net.* ;
import javax.net.ssl.* ;
import java.security.KeyStore ;
import javax.security.cert.X509Certificate ;
public class serverCore extends serverAbstractThread implements serverThread {
import org.apache.commons.pool.impl.GenericObjectPool ;
public final class serverCore extends serverAbstractThread implements serverThread {
// generic input/output static methods
public static final byte cr = 13 ;
@ -69,22 +80,30 @@ public class serverCore extends serverAbstractThread implements serverThread {
private int port ; // the listening port
private ServerSocket socket ; // listener
private int maxSessions = 0 ; // max. number of sessions; 0=unlimited
private serverLog log ; // log object
serverLog log ; // log object
//private serverSwitch switchboard; // external values
private int timeout ; // connection time-out of the socket
private Hashtable activeThreads ; // contains the active threads
private Hashtable sleepingThreads ; // contains the threads that are alive since the sleepthreashold
// private Hashtable activeThreads; // contains the active threads
// private Hashtable sleepingThreads; // contains the threads that are alive since the sleepthreashold
private boolean termSleepingThreads ; // if true then threads over sleepthreashold are killed
private int thresholdActive = 5000 ; // after that time a thread should have got a command line
private int thresholdSleep = 30000 ; // after that time a thread is considered as beeing sleeping (30 seconds)
private int thresholdDead = 3600000 ; // after that time a thread is considered as beeing dead-locked (1 hour)
private serverHandler handlerPrototype ; // the command class (a serverHandler)
serverHandler handlerPrototype ; // the command class (a serverHandler)
private Class [ ] initHandlerClasses ; // the init's methods arguments
private Class [ ] initSessionClasses ; // the init's methods arguments
private serverSwitch switchboard ; // the command class switchboard
private Hashtable denyHost ;
private int commandMaxLength ;
/ * *
* The session - object pool
* /
final SessionPool theSessionPool ;
final ThreadGroup theSessionThreadGroup = new ThreadGroup ( "sessionThreadGroup" ) ;
private static ServerSocketFactory getServerSocketFactory ( boolean dflt , File keyfile , String passphrase ) {
// see doc's at
// http://java.sun.com/developer/technicalArticles/Security/secureinternet/
@ -154,15 +173,38 @@ public class serverCore extends serverAbstractThread implements serverThread {
this . initHandlerClasses = new Class [ ] { Class . forName ( "de.anomic.server.serverSwitch" ) } ;
this . initSessionClasses = new Class [ ] { Class . forName ( "de.anomic.server.serverCore$Session" ) } ;
this . maxSessions = maxSessions ;
this . socket . setSoTimeout ( 0 ) ; // unlimited
this . timeout = timeout ;
this . termSleepingThreads = termSleepingThreads ;
this . log = new serverLog ( "SERVER" , logl ) ;
activeThreads = new Hashtable ( ) ;
sleepingThreads = new Hashtable ( ) ;
// activeThreads = new Hashtable();
// sleepingThreads = new Hashtable();
} catch ( java . lang . ClassNotFoundException e ) {
System . out . println ( "FATAL ERROR: " + e . getMessage ( ) + " - Class Not Found" ) ; System . exit ( 0 ) ;
}
// implementation of session thread pool
GenericObjectPool . Config config = new GenericObjectPool . Config ( ) ;
// The maximum number of active connections that can be allocated from pool at the same time,
// 0 for no limit
config . maxActive = this . maxSessions ;
// The maximum number of idle connections connections in the pool
// 0 = no limit.
config . maxIdle = this . maxSessions / 2 ;
config . minIdle = this . maxSessions / 4 ;
// block undefinitely
config . maxWait = timeout ;
// Action to take in case of an exhausted DBCP statement pool
// 0 = fail, 1 = block, 2= grow
config . whenExhaustedAction = GenericObjectPool . WHEN_EXHAUSTED_BLOCK ;
config . minEvictableIdleTimeMillis = this . thresholdSleep ;
config . testOnReturn = true ;
this . theSessionPool = new SessionPool ( new SessionFactory ( this . theSessionThreadGroup ) , config ) ;
}
public static boolean isNotLocal ( URL url ) {
@ -234,12 +276,12 @@ public class serverCore extends serverAbstractThread implements serverThread {
// class body
public boolean job ( ) throws Exception {
// prepare for new connection
idleThreadCheck ( ) ;
switchboard . handleBusyState ( activeThreads . size ( ) ) ;
// idleThreadCheck();
this . switchboard . handleBusyState ( this . theSessionPool . getNumActive ( ) /*activeThreads.size() */ ) ;
log . logDebug (
"* waiting for connections, " + activeThreads . siz e( ) + " sessions running, " +
sleepingThreads . siz e( ) + " sleeping" ) ;
"* waiting for connections, " + this . theSessionPool . getNumActiv e( ) + " sessions running, " +
this . theSessionPool . getNumIdl e( ) + " sleeping" ) ;
// list all connection (debug)
/ *
@ -257,17 +299,17 @@ public class serverCore extends serverAbstractThread implements serverThread {
// wait for new connection
announceThreadBlockApply ( ) ;
Socket controlSocket = socket. accept ( ) ;
Socket controlSocket = this . socket. accept ( ) ;
announceThreadBlockRelease ( ) ;
if ( ( denyHost = = null ) | | ( denyHost. get ( ( "" + controlSocket . getInetAddress ( ) . getHostAddress ( ) ) ) = = null ) ) {
if ( ( this . denyHost = = null ) | | ( this . denyHost. get ( ( "" + controlSocket . getInetAddress ( ) . getHostAddress ( ) ) ) = = null ) ) {
//log.logDebug("* catched request from " + controlSocket.getInetAddress().getHostAddress());
controlSocket . setSoTimeout ( timeout ) ;
controlSocket . setSoTimeout ( this . timeout ) ;
Session connection = ( Session ) this . theSessionPool . borrowObject ( ) ;
connection . execute ( controlSocket ) ;
Session connection = new Session ( controlSocket ) ;
// start the thread
connection . start ( ) ;
//try {Thread.currentThread().sleep(1000);} catch (InterruptedException e) {} // wait for debug
activeThreads . put ( connection , new Long ( System . currentTimeMillis ( ) ) ) ;
// activeThreads.put(connection, new Long(System.currentTimeMillis()));
//log.logDebug("* NEW SESSION: " + connection.request);
} else {
@ -275,111 +317,267 @@ public class serverCore extends serverAbstractThread implements serverThread {
}
// idle until number of maximal threads is (again) reached
//synchronized(this) {
while ( ( maxSessions > 0 ) & & ( activeThreads . size ( ) > = maxSessions ) ) try {
log . logDebug ( "* Waiting for activeThreads=" + activeThreads . size ( ) + " < maxSessions=" + maxSessions ) ;
Thread . currentThread ( ) . sleep ( 2000 ) ;
idleThreadCheck ( ) ;
} catch ( InterruptedException e ) { }
// while ((maxSessions > 0) && (activeThreads.size() >= maxSessions)) try {
// log.logDebug("* Waiting for activeThreads=" + activeThreads.size() + " < maxSessions=" + maxSessions);
// Thread.currentThread().sleep(2000);
// idleThreadCheck();
// } catch (InterruptedException e) {}
return true ;
}
public void close ( ) {
try {
// consuming the isInterrupted Flag. Otherwise we could not properly colse the session pool
Thread . interrupted ( ) ;
// close the session pool
this . theSessionPool . close ( ) ;
}
catch ( Exception e ) {
this . log . logSystem ( "Unable to close session pool: " + e . getMessage ( ) ) ;
}
log . logSystem ( "* terminated" ) ;
}
public int getJobCount ( ) {
return activeThreads . size ( ) ;
return this . theSessionPool . getNumActiv e( ) ;
}
// idle sensor: the thread is idle if there are no sessions running
public boolean idle ( ) {
idleThreadCheck ( ) ;
return ( activeThreads . siz e( ) = = 0 ) ;
// idleThreadCheck();
return ( this . theSessionPool . getNumActiv e( ) = = 0 ) ;
}
public void idleThreadCheck ( ) {
// a 'garbage collector' for session threads
Enumeration threadEnum ;
Session session ;
// public void idleThreadCheck() {
// // a 'garbage collector' for session threads
// Enumeration threadEnum;
// Session session;
//
// // look for sleeping threads
// threadEnum = activeThreads.keys();
// long time;
// while (threadEnum.hasMoreElements()) {
// session = (Session) (threadEnum.nextElement());
// //if (session.request == null) session.interrupt();
// if (session.isAlive()) {
// // check if socket still exists
// time = System.currentTimeMillis() - ((Long) activeThreads.get(session)).longValue();
// if (/*(session.controlSocket.isClosed()) || */
// (!(session.controlSocket.isBound())) ||
// (!(session.controlSocket.isConnected())) ||
// ((session.request == null) && (time > 1000))) {
// // kick it
// try {
// session.out.close();
// session.in.close();
// session.controlSocket.close();
// } catch (IOException e) {}
// session.interrupt(); // hopefully this wakes him up.
// activeThreads.remove(session);
// String reason = "";
// if (session.controlSocket.isClosed()) reason = "control socked closed";
// if (!(session.controlSocket.isBound())) reason = "control socked unbound";
// if (!(session.controlSocket.isConnected())) reason = "control socked not connected";
// if (session.request == null) reason = "no request placed";
// log.logDebug("* canceled disconnected connection (" + reason + ") '" + session.request + "'");
// } else if (time > thresholdSleep) {
// // move thread from the active threads to the sleeping
// sleepingThreads.put(session, activeThreads.remove(session));
// log.logDebug("* sleeping connection '" + session.request + "'");
// } else if ((time > thresholdActive) && (session.request == null)) {
// // thread is not in use (or too late). kickk it.
// try {
// session.out.close();
// session.in.close();
// session.controlSocket.close();
// } catch (IOException e) {}
// session.interrupt(); // hopefully this wakes him up.
// activeThreads.remove(session);
// log.logDebug("* canceled inactivated connection");
// }
// } else {
// // the thread is dead, remove it
// log.logDebug("* normal close of connection to '" + session.request + "', time=" + session.getTime());
// activeThreads.remove(session);
// }
// }
//
// // look for dead threads
// threadEnum = sleepingThreads.keys();
// while (threadEnum.hasMoreElements()) {
// session = (Session) (threadEnum.nextElement());
// if (session.isAlive()) {
// // check the age of the thread
// if (System.currentTimeMillis() - ((Long) sleepingThreads.get(session)).longValue() > thresholdDead) {
// // kill the thread
// if (termSleepingThreads) {
// try {
// session.out.close();
// session.in.close();
// session.controlSocket.close();
// } catch (IOException e) {}
// session.interrupt(); // hopefully this wakes him up.
// }
// sleepingThreads.remove(session);
// log.logDebug("* out-timed connection '" + session.request + "'");
// }
// } else {
// // the thread is dead, remove it
// sleepingThreads.remove(session);
// log.logDebug("* dead connection '" + session.request + "'");
// }
// }
//
// }
// look for sleeping threads
threadEnum = activeThreads . keys ( ) ;
long time ;
while ( threadEnum . hasMoreElements ( ) ) {
session = ( Session ) ( threadEnum . nextElement ( ) ) ;
//if (session.request == null) session.interrupt();
if ( session . isAlive ( ) ) {
// check if socket still exists
time = System . currentTimeMillis ( ) - ( ( Long ) activeThreads . get ( session ) ) . longValue ( ) ;
if ( /*(session.controlSocket.isClosed()) || */
( ! ( session . controlSocket . isBound ( ) ) ) | |
( ! ( session . controlSocket . isConnected ( ) ) ) | |
( ( session . request = = null ) & & ( time > 1000 ) ) ) {
// kick it
try {
session . out . close ( ) ;
session . in . close ( ) ;
session . controlSocket . close ( ) ;
} catch ( IOException e ) { }
session . interrupt ( ) ; // hopefully this wakes him up.
activeThreads . remove ( session ) ;
String reason = "" ;
if ( session . controlSocket . isClosed ( ) ) reason = "control socked closed" ;
if ( ! ( session . controlSocket . isBound ( ) ) ) reason = "control socked unbound" ;
if ( ! ( session . controlSocket . isConnected ( ) ) ) reason = "control socked not connected" ;
if ( session . request = = null ) reason = "no request placed" ;
log . logDebug ( "* canceled disconnected connection (" + reason + ") '" + session . request + "'" ) ;
} else if ( time > thresholdSleep ) {
// move thread from the active threads to the sleeping
sleepingThreads . put ( session , activeThreads . remove ( session ) ) ;
log . logDebug ( "* sleeping connection '" + session . request + "'" ) ;
} else if ( ( time > thresholdActive ) & & ( session . request = = null ) ) {
// thread is not in use (or too late). kickk it.
try {
session . out . close ( ) ;
session . in . close ( ) ;
session . controlSocket . close ( ) ;
} catch ( IOException e ) { }
session . interrupt ( ) ; // hopefully this wakes him up.
activeThreads . remove ( session ) ;
log . logDebug ( "* canceled inactivated connection" ) ;
public final class SessionPool extends GenericObjectPool
{
public boolean isClosed = false ;
/ * *
* First constructor .
* @param objFactory
* /
public SessionPool ( SessionFactory objFactory ) {
super ( objFactory ) ;
this . setMaxIdle ( 75 ) ; // Maximum idle threads.
this . setMaxActive ( 150 ) ; // Maximum active threads.
this . setMinEvictableIdleTimeMillis ( 30000 ) ; //Evictor runs every 30 secs.
//this.setMaxWait(1000); // Wait 1 second till a thread is available
}
} else {
// the thread is dead, remove it
log . logDebug ( "* normal close of connection to '" + session . request + "', time=" + session . getTime ( ) ) ;
activeThreads . remove ( session ) ;
public SessionPool ( SessionFactory objFactory ,
GenericObjectPool . Config config ) {
super ( objFactory , config ) ;
}
/ * *
* @see org . apache . commons . pool . impl . GenericObjectPool # borrowObject ( )
* /
public Object borrowObject ( ) throws Exception {
return super . borrowObject ( ) ;
}
// look for dead threads
threadEnum = sleepingThreads . keys ( ) ;
while ( threadEnum . hasMoreElements ( ) ) {
session = ( Session ) ( threadEnum . nextElement ( ) ) ;
if ( session . isAlive ( ) ) {
// check the age of the thread
if ( System . currentTimeMillis ( ) - ( ( Long ) sleepingThreads . get ( session ) ) . longValue ( ) > thresholdDead ) {
// kill the thread
if ( termSleepingThreads ) {
/ * *
* @see org . apache . commons . pool . impl . GenericObjectPool # returnObject ( java . lang . Object )
* /
public void returnObject ( Object obj ) throws Exception {
super . returnObject ( obj ) ;
}
public synchronized void close ( ) throws Exception {
/ *
* shutdown all still running session threads . . .
* /
// interrupting all still running or pooled threads ...
serverCore . this . theSessionThreadGroup . interrupt ( ) ;
/* waiting for all threads to finish */
int threadCount = serverCore . this . theSessionThreadGroup . activeCount ( ) ;
Thread [ ] threadList = new Thread [ threadCount ] ;
threadCount = serverCore . this . theSessionThreadGroup . enumerate ( threadList ) ;
try {
session . out . close ( ) ;
session . in . close ( ) ;
session . controlSocket . close ( ) ;
} catch ( IOException e ) { }
session . interrupt ( ) ; // hopefully this wakes him up.
for ( int currentThreadIdx = 0 ; currentThreadIdx < threadCount ; currentThreadIdx + + ) {
// we need to use a timeout here because of missing interruptable session threads ...
threadList [ currentThreadIdx ] . join ( 500 ) ;
}
sleepingThreads . remove ( session ) ;
log . logDebug ( "* out-timed connection '" + session . request + "'" ) ;
}
} else {
// the thread is dead, remove it
sleepingThreads . remove ( session ) ;
log . logDebug ( "* dead connection '" + session . request + "'" ) ;
catch ( InterruptedException e ) {
serverCore . this . log . logWarning ( "Interruption while trying to shutdown all session threads." ) ;
}
finally {
this . isClosed = true ;
}
super . close ( ) ;
}
}
public final class SessionFactory implements org . apache . commons . pool . PoolableObjectFactory {
final ThreadGroup sessionThreadGroup ;
public SessionFactory ( ThreadGroup theSessionThreadGroup ) {
super ( ) ;
if ( theSessionThreadGroup = = null )
throw new IllegalArgumentException ( "The threadgroup object must not be null." ) ;
this . sessionThreadGroup = theSessionThreadGroup ;
}
/ * *
* @see org . apache . commons . pool . PoolableObjectFactory # makeObject ( )
* /
public Object makeObject ( ) {
return new Session ( this . sessionThreadGroup ) ;
}
/ * *
* @see org . apache . commons . pool . PoolableObjectFactory # destroyObject ( java . lang . Object )
* /
public void destroyObject ( Object obj ) {
if ( obj instanceof Session ) {
Session theSession = ( Session ) obj ;
theSession . setStopped ( true ) ;
}
}
public class Session extends Thread {
/ * *
* @see org . apache . commons . pool . PoolableObjectFactory # validateObject ( java . lang . Object )
* /
public boolean validateObject ( Object obj ) {
if ( obj instanceof Session )
{
Session theSession = ( Session ) obj ;
if ( ! theSession . isAlive ( ) | | theSession . isInterrupted ( ) ) return false ;
if ( theSession . isRunning ( ) ) return true ;
return false ;
}
return true ;
}
/ * *
* @param obj
*
* /
public void activateObject ( Object obj ) {
//log.debug(" activateObject...");
}
/ * *
* @param obj
*
* /
public void passivateObject ( Object obj ) {
//log.debug(" passivateObject..." + obj);
if ( obj instanceof Session ) {
Session theSession = ( Session ) obj ;
// Clean up the result of the execution
theSession . setResult ( null ) ;
}
}
}
public final class Session extends Thread {
// used as replacement for activeThreads, sleepingThreads
// static ThreadGroup sessionThreadGroup = new ThreadGroup("sessionThreadGroup");
// synchronization object needed for the threadpool implementation
private Object syncObject ;
private Object processingResult = null ;
private boolean running = false ;
private boolean stopped = false ;
private boolean done = false ;
private long start ; // startup time
private serverHandler commandObj ;
@ -392,29 +590,31 @@ public class serverCore extends serverAbstractThread implements serverThread {
public PushbackInputStream in ; // on control input stream
public OutputStream out ; // on control output stream, autoflush
public Session ( Socket controlSocket ) throws IOException {
//this.promiscuous = false;
this . start = System . currentTimeMillis ( ) ;
//log.logDebug("* session " + handle + " allocated");
this . identity = "-" ;
this . userAddress = controlSocket . getInetAddress ( ) ;
String ipname = userAddress . getHostAddress ( ) ;
// check if we want to allow this socket to connect us
private final ByteArrayOutputStream readLineBuffer = new ByteArrayOutputStream ( 256 ) ;
public Session ( ThreadGroup theThreadGroup ) {
super ( theThreadGroup , "Session" ) ;
}
public void setStopped ( boolean stopped ) {
this . stopped = stopped ;
}
public void execute ( Socket controlSocket ) {
this . execute ( controlSocket , null ) ;
}
public synchronized void execute ( Socket controlSocket , Object synObj ) {
this . controlSocket = controlSocket ;
this . in = new PushbackInputStream ( controlSocket . getInputStream ( ) ) ;
this . out = controlSocket . getOutputStream ( ) ;
commandCounter = 0 ;
// initiate the command class
// we pass the input and output stream to the commands,
// so that they can take over communication, if needed
try {
// use the handler prototype to create a new command object class
commandObj = ( serverHandler ) handlerPrototype . clone ( ) ;
commandObj . initSession ( this ) ;
} catch ( Exception e ) {
e . printStackTrace ( ) ;
this . syncObject = synObj ;
this . done = false ;
if ( ! this . running ) {
// this.setDaemon(true);
this . start ( ) ;
} else {
this . notifyAll ( ) ;
}
//log.logDebug("* session " + handle + " initialized. time = " + (System.currentTimeMillis() - handle));
}
public long getTime ( ) {
@ -432,39 +632,144 @@ public class serverCore extends serverAbstractThread implements serverThread {
* /
public void log ( boolean outgoing , String request ) {
log. logInfo ( userAddress . getHostAddress ( ) + "/" + this . identity + " " +
"[" + activeThreads . size ( ) + ", " + commandCounter +
serverCore. this . log. logInfo ( userAddress . getHostAddress ( ) + "/" + this . identity + " " +
"[" + serverCore . this . theSessionPool . getNumActive ( ) + ", " + this . commandCounter +
( ( outgoing ) ? "] > " : "] < " ) +
request ) ;
}
public void writeLine ( String messg ) throws IOException {
send ( out, messg ) ;
send ( this . out, messg ) ;
log ( true , messg ) ;
}
public byte [ ] readLine ( ) {
return receive ( in , timeout , commandMaxLength , false ) ;
return receive ( in , this . readLineBuffer , timeout , commandMaxLength , false ) ;
}
/ * *
* @return
* /
public boolean isRunning ( ) {
return this . running ;
}
/ * *
* @param object
* /
public void setResult ( Object object ) {
this . processingResult = object ;
}
/ * *
*
* /
public void reset ( ) {
this . done = true ;
this . syncObject = null ;
this . readLineBuffer . reset ( ) ;
}
/ * *
*
*
* @see java . lang . Thread # run ( )
* /
public void run ( ) {
this . running = true ;
// The thread keeps running.
while ( ! this . stopped & & ! Thread . interrupted ( ) ) {
if ( this . done ) {
// We are waiting for a task now.
synchronized ( this ) {
try {
this . wait ( ) ; //Wait until we get a request to process.
}
catch ( InterruptedException e ) {
this . stopped = true ;
// log.error("", e);
}
}
}
else
{
//There is a task....let us execute it.
try {
execute ( ) ;
if ( this . syncObject ! = null ) {
synchronized ( this . syncObject ) {
//Notify the completion.
this . syncObject . notifyAll ( ) ;
}
}
} catch ( Exception e ) {
// log.error("", e);
}
finally {
reset ( ) ;
public final void run ( ) {
//log.logDebug("* session " + handle + " started. time = " + (System.currentTimeMillis() - handle));
if ( ! this . stopped & & ! this . isInterrupted ( ) ) {
try {
serverCore . this . theSessionPool . returnObject ( this ) ;
}
catch ( Exception e1 ) {
e1 . printStackTrace ( ) ;
}
}
}
}
}
}
private void execute ( ) {
try {
// setting the session startup time
this . start = System . currentTimeMillis ( ) ;
// settin the session identity
this . identity = "-" ;
// getting some client information
this . userAddress = this . controlSocket . getInetAddress ( ) ;
// TODO: check if we want to allow this socket to connect us
// getting input and output stream for communication with client
this . in = new PushbackInputStream ( this . controlSocket . getInputStream ( ) ) ;
this . out = this . controlSocket . getOutputStream ( ) ;
// initiate the command class
this . commandCounter = 0 ;
if ( ( this . commandObj ! = null ) & &
( this . commandObj . getClass ( ) . getName ( ) . equals ( serverCore . this . handlerPrototype . getClass ( ) . getName ( ) ) ) ) {
this . commandObj . reset ( ) ;
}
else {
this . commandObj = ( serverHandler ) serverCore . this . handlerPrototype . clone ( ) ;
}
this . commandObj . initSession ( this ) ;
listen ( ) ;
} catch ( Exception e ) {
System . err . println ( "ERROR: (internal) " + e ) ;
} finally {
try {
out . flush ( ) ;
this . out. flush ( ) ;
// close everything
out . close ( ) ;
in . close ( ) ;
controlSocket . close ( ) ;
this . out. close ( ) ;
this . in. close ( ) ;
this . controlSocket. close ( ) ;
} catch ( IOException e ) {
System . err . println ( "ERROR: (internal) " + e ) ;
}
synchronized ( this ) { this . notify ( ) ; }
}
//log.logDebug("* session " + handle + " completed. time = " + (System.currentTimeMillis() - handle));
announceMoreExecTime ( System . currentTimeMillis ( ) - start ) ;
announceMoreExecTime ( System . currentTimeMillis ( ) - this . start) ;
}
private void listen ( ) {
@ -486,7 +791,7 @@ public class serverCore extends serverAbstractThread implements serverThread {
String cmd ;
String tmp ;
Object [ ] stringParameter = new String [ 1 ] ;
while ( ( in ! = null ) & & ( ( requestBytes = readLine ( ) ) ! = null ) ) {
while ( ( this . in ! = null ) & & ( ( requestBytes = readLine ( ) ) ! = null ) ) {
commandCounter + + ;
request = new String ( requestBytes ) ;
//log.logDebug("* session " + handle + " received command '" + request + "'. time = " + (System.currentTimeMillis() - handle));
@ -502,7 +807,7 @@ public class serverCore extends serverAbstractThread implements serverThread {
}
// exec command and return value
result = commandObj. getClass ( ) . getMethod ( cmd , stringType ) . invoke ( commandObj, stringParameter ) ;
result = this . commandObj. getClass ( ) . getMethod ( cmd , stringType ) . invoke ( this . commandObj, stringParameter ) ;
//log.logDebug("* session " + handle + " completed command '" + request + "'. time = " + (System.currentTimeMillis() - handle));
this . out . flush ( ) ;
if ( result = = null ) {
@ -555,7 +860,7 @@ public class serverCore extends serverAbstractThread implements serverThread {
// whatever happens: the thread has to survive!
writeLine ( "UNKNOWN REASON:" + ( String ) commandObj . error ( e ) ) ;
}
}
} // end of while
} catch ( java . lang . ClassNotFoundException e ) {
System . out . println ( "Internal Error: wrapper class not found: " + e . getMessage ( ) ) ;
System . exit ( 0 ) ;
@ -566,11 +871,22 @@ public class serverCore extends serverAbstractThread implements serverThread {
}
public static byte [ ] receive ( PushbackInputStream pbis , long timeout , int maxSize , boolean logerr ) {
public static byte [ ] receive ( PushbackInputStream pbis , ByteArrayOutputStream readLineBuffer , long timeout , int maxSize , boolean logerr ) {
// this is essentially a readln on a PushbackInputStream
int bufferSize = 0 ;
bufferSize = 10 ;
// reuse an existing linebuffer or create a new one ...
if ( readLineBuffer = = null ) {
readLineBuffer = new ByteArrayOutputStream ( 256 ) ;
} else {
readLineBuffer . reset ( ) ;
}
// TODO: we should remove this statements because calling the available function is very time consuming
// we better should use nio sockets instead because they are interruptable ...
try {
long t = timeout ;
while ( ( ( bufferSize = pbis . available ( ) ) = = 0 ) & & ( t > 0 ) ) try {
@ -590,25 +906,28 @@ public class serverCore extends serverAbstractThread implements serverThread {
return null ;
}
byte [ ] buffer = new byte [ bufferSize ] ;
byte [ ] bufferBkp ;
// byte[] buffer = new byte[bufferSize];
// byte[] bufferBkp;
bufferSize = 0 ;
int b = 0 ;
try {
while ( ( b = pbis . read ( ) ) > 31 ) {
// we have a valid byte in b, add it to the buffer
if ( buffer . length = = bufferSize ) {
// the buffer is full, double its size
bufferBkp = buffer ;
buffer = new byte [ bufferSize * 2 ] ;
java . lang . System . arraycopy ( bufferBkp , 0 , buffer , 0 , bufferSize ) ;
bufferBkp = null ;
}
//if (bufferSize > 10000) {System.out.println("***ERRORDEBUG***:" + new String(buffer));} // debug
buffer [ bufferSize + + ] = ( byte ) b ; // error hier: ArrayIndexOutOfBoundsException: -2007395416 oder 0
if ( bufferSize > maxSize ) break ;
// // we have a valid byte in b, add it to the buffer
// if (buffer.length == bufferSize) {
// // the buffer is full, double its size
// bufferBkp = buffer;
// buffer = new byte[bufferSize * 2];
// java.lang.System.arraycopy(bufferBkp, 0, buffer, 0, bufferSize);
// bufferBkp = null;
// }
// //if (bufferSize > 10000) {System.out.println("***ERRORDEBUG***:" + new String(buffer));} // debug
// buffer[bufferSize++] = (byte) b; // error hier: ArrayIndexOutOfBoundsException: -2007395416 oder 0
readLineBuffer . write ( b ) ;
if ( bufferSize + + > maxSize ) break ;
}
// we have catched a possible line end
if ( b = = cr ) {
// maybe a lf follows, read it:
@ -616,13 +935,14 @@ public class serverCore extends serverAbstractThread implements serverThread {
}
// finally shrink buffer
bufferBkp = buffer ;
buffer = new byte [ bufferSize ] ;
java . lang . System . arraycopy ( bufferBkp , 0 , buffer , 0 , bufferSize ) ;
bufferBkp = null ;
// bufferBkp = buffer;
// buffer = new byte[bufferSize];
// java.lang.System.arraycopy(bufferBkp, 0, buffer, 0, bufferSize);
// bufferBkp = null;
// return only the byte[]
return buffer ;
// return buffer;
return readLineBuffer . toByteArray ( ) ;
} catch ( IOException e ) {
if ( logerr ) serverLog . logError ( "SERVER" , "receive interrupted - exception 2 = " + e . getMessage ( ) ) ;
return null ;
@ -651,4 +971,9 @@ public class serverCore extends serverAbstractThread implements serverThread {
if ( bufferSize > 80 ) return "<LONG STREAM>" ; else return new String ( buffer ) ;
}
protected void finalize ( ) throws Throwable {
if ( ! this . theSessionPool . isClosed ) this . theSessionPool . close ( ) ;
super . finalize ( ) ;
}
}