diff --git a/htroot/autoconfig.java b/htroot/autoconfig.java index cd9ce521b..a5e98b7f6 100644 --- a/htroot/autoconfig.java +++ b/htroot/autoconfig.java @@ -62,7 +62,7 @@ public class autoconfig { serverObjects prop = new serverObjects(); - prop.put("host", serverCore.publicIP().getHostAddress()); + prop.put("host", serverCore.publicLocalIP().getHostAddress()); prop.put("port", env.getConfig("port", "8080")); // return rewrite properties diff --git a/htroot/yacy/hello.java b/htroot/yacy/hello.java index 5241593b1..e33d95b57 100644 --- a/htroot/yacy/hello.java +++ b/htroot/yacy/hello.java @@ -83,7 +83,7 @@ public class hello { float clientversion = remoteSeed.getVersion(); int urls = -1; - if ((!(clientip.equals(reportedip))) && (clientversion >= (float)0.383)) { + if ((reportedip.length() > 0) && (!(clientip.equals(reportedip))) && (clientversion >= (float)0.383)) { // try first the reportedip, since this may be a connect from a port-forwarding host prop.put("yourip", reportedip); remoteSeed.put("IP", reportedip); @@ -117,7 +117,7 @@ public class hello { remoteSeed.put("LastSeen", yacyCore.universalDateShortString()); yacyCore.peerActions.juniorConnects++; // update statistics remoteSeed.put("PeerType", "junior"); - yacyCore.log.logInfo("hello: responded remote junior peer '" + remoteSeed.getName() + "' from " + remoteSeed.getAddress()); + yacyCore.log.logInfo("hello: responded remote junior peer '" + remoteSeed.getName() + "' from " + reportedip); // no connection here, instead store junior in connection cache if ((remoteSeed.hash != null) && (remoteSeed.isProper())) yacyCore.peerActions.peerPing(remoteSeed); } diff --git a/source/de/anomic/http/httpc.java b/source/de/anomic/http/httpc.java index ddc50544a..c410083f9 100644 --- a/source/de/anomic/http/httpc.java +++ b/source/de/anomic/http/httpc.java @@ -223,6 +223,11 @@ public final class httpc { } } + public boolean isClosed() { + if (this.socket == null) return true; + else return (!this.socket.isConnected()) || (this.socket.isClosed()); + } + public static String dnsResolve(String host) { // looks for the ip of host and returns ip number as string String ip = (String) nameCacheHit.get(host); diff --git a/source/de/anomic/http/httpdProxyHandler.java b/source/de/anomic/http/httpdProxyHandler.java index c96686f41..35747d402 100644 --- a/source/de/anomic/http/httpdProxyHandler.java +++ b/source/de/anomic/http/httpdProxyHandler.java @@ -67,10 +67,14 @@ import java.io.InputStream; import java.io.InputStreamReader; import java.io.OutputStream; import java.io.PushbackInputStream; +import java.net.BindException; +import java.net.ConnectException; import java.net.MalformedURLException; +import java.net.NoRouteToHostException; import java.net.Socket; import java.net.SocketException; import java.net.URL; +import java.net.UnknownHostException; import java.util.Date; import java.util.HashSet; import java.util.Properties; @@ -454,8 +458,20 @@ public final class httpdProxyHandler extends httpdAbstractHandler implements htt } } catch (Exception e) { - String errorMsg = "Unexpected Error. " + e.getClass().getName() + ": " + e.getMessage(); - this.theLogger.logError(errorMsg,e); + try { + String exTxt = e.getMessage(); + if ((exTxt!=null)&&(exTxt.startsWith("Socket closed"))) { + this.forceConnectionClose(); + } else if (!conProp.containsKey(httpd.CONNECTION_PROP_PROXY_RESPOND_HEADER)) { + String errorMsg = "Unexpected Error. " + e.getClass().getName() + ": " + e.getMessage(); + httpd.sendRespondError(conProp,respond,4,501,null,errorMsg,e); + this.theLogger.logError(errorMsg); + } else { + this.forceConnectionClose(); + } + } catch (Exception ee) { + this.forceConnectionClose(); + } } finally { try { respond.flush(); } catch (Exception e) {} if (respond instanceof httpdByteCountOutputStream) ((httpdByteCountOutputStream)respond).finish(); @@ -595,141 +611,136 @@ public final class httpdProxyHandler extends httpdAbstractHandler implements htt // remove hop by hop headers this.removeHopByHopHeaders(res.responseHeader); - // request has been placed and result has been returned. work off response - try { - String[] resStatus = res.status.split(" "); - httpd.sendRespondHeader( - conProp, - respond, - httpVer, - Integer.parseInt((resStatus.length > 0) ? resStatus[0]:"503"), - (resStatus.length > 1) ? resStatus[1] : null, - res.responseHeader); - - String storeError; - if ((storeError = cacheEntry.shallStoreCache()) == null) { - // we write a new cache entry - if ((contentLength > 0) && (contentLength < 1048576)) // if the length is known and < 1 MB - { - // ok, we don't write actually into a file, only to RAM, and schedule writing the file. - byte[] cacheArray = res.writeContent(hfos); - this.theLogger.logDebug("writeContent of " + url + " produced cacheArray = " + ((cacheArray == null) ? "null" : ("size=" + cacheArray.length))); - - if (hfos instanceof htmlFilterOutputStream) ((htmlFilterOutputStream) hfos).finalize(); - - if (sizeBeforeDelete == -1) { - // totally fresh file - cacheEntry.status = plasmaHTCache.CACHE_FILL; // it's an insert - cacheManager.stackProcess(cacheEntry, cacheArray); - conProp.setProperty(httpd.CONNECTION_PROP_PROXY_RESPOND_CODE,"TCP_MISS"); - } else if (sizeBeforeDelete == cacheArray.length) { - // before we came here we deleted a cache entry - cacheArray = null; - cacheEntry.status = plasmaHTCache.CACHE_STALE_RELOAD_BAD; - cacheManager.stackProcess(cacheEntry); // unnecessary update - conProp.setProperty(httpd.CONNECTION_PROP_PROXY_RESPOND_CODE,"TCP_REF_FAIL_HIT"); - } else { - // before we came here we deleted a cache entry - cacheEntry.status = plasmaHTCache.CACHE_STALE_RELOAD_GOOD; - cacheManager.stackProcess(cacheEntry, cacheArray); // necessary update, write response header to cache - conProp.setProperty(httpd.CONNECTION_PROP_PROXY_RESPOND_CODE,"TCP_REFRESH_MISS"); - } + // request has been placed and result has been returned. work off response + String[] resStatus = res.status.split(" "); + + // sending the respond header back to the client + httpd.sendRespondHeader( + conProp, + respond, + httpVer, + Integer.parseInt((resStatus.length > 0) ? resStatus[0]:"503"), + (resStatus.length > 1) ? resStatus[1] : null, + res.responseHeader); + + String storeError; + if ((storeError = cacheEntry.shallStoreCache()) == null) { + // we write a new cache entry + if ((contentLength > 0) && (contentLength < 1048576)) // if the length is known and < 1 MB + { + // ok, we don't write actually into a file, only to RAM, and schedule writing the file. + byte[] cacheArray = res.writeContent(hfos); + this.theLogger.logDebug("writeContent of " + url + " produced cacheArray = " + ((cacheArray == null) ? "null" : ("size=" + cacheArray.length))); + + if (hfos instanceof htmlFilterOutputStream) ((htmlFilterOutputStream) hfos).finalize(); + + if (sizeBeforeDelete == -1) { + // totally fresh file + cacheEntry.status = plasmaHTCache.CACHE_FILL; // it's an insert + cacheManager.stackProcess(cacheEntry, cacheArray); + conProp.setProperty(httpd.CONNECTION_PROP_PROXY_RESPOND_CODE,"TCP_MISS"); + } else if (sizeBeforeDelete == cacheArray.length) { + // before we came here we deleted a cache entry + cacheArray = null; + cacheEntry.status = plasmaHTCache.CACHE_STALE_RELOAD_BAD; + cacheManager.stackProcess(cacheEntry); // unnecessary update + conProp.setProperty(httpd.CONNECTION_PROP_PROXY_RESPOND_CODE,"TCP_REF_FAIL_HIT"); } else { - // the file is too big to cache it in the ram, or the size is unknown - // write to file right here. - cacheFile.getParentFile().mkdirs(); - res.writeContent(hfos, cacheFile); - if (hfos instanceof htmlFilterOutputStream) ((htmlFilterOutputStream) hfos).finalize(); - this.theLogger.logDebug("for write-file of " + url + ": contentLength = " + contentLength + ", sizeBeforeDelete = " + sizeBeforeDelete); - if (sizeBeforeDelete == -1) { - // totally fresh file - cacheEntry.status = plasmaHTCache.CACHE_FILL; // it's an insert - cacheManager.stackProcess(cacheEntry); - } else if (sizeBeforeDelete == cacheFile.length()) { - // before we came here we deleted a cache entry - cacheEntry.status = plasmaHTCache.CACHE_STALE_RELOAD_BAD; - cacheManager.stackProcess(cacheEntry); // unnecessary update - } else { - // before we came here we deleted a cache entry - cacheEntry.status = plasmaHTCache.CACHE_STALE_RELOAD_GOOD; - cacheManager.stackProcess(cacheEntry); // necessary update, write response header to cache - } - // beware! all these writings will not fill the cacheEntry.cacheArray - // that means they are not available for the indexer (except they are scraped before) - } + // before we came here we deleted a cache entry + cacheEntry.status = plasmaHTCache.CACHE_STALE_RELOAD_GOOD; + cacheManager.stackProcess(cacheEntry, cacheArray); // necessary update, write response header to cache + conProp.setProperty(httpd.CONNECTION_PROP_PROXY_RESPOND_CODE,"TCP_REFRESH_MISS"); + } } else { - // no caching - this.theLogger.logDebug(cacheFile.toString() + " not cached: " + storeError); - res.writeContent(hfos, null); + // the file is too big to cache it in the ram, or the size is unknown + // write to file right here. + cacheFile.getParentFile().mkdirs(); + res.writeContent(hfos, cacheFile); if (hfos instanceof htmlFilterOutputStream) ((htmlFilterOutputStream) hfos).finalize(); + this.theLogger.logDebug("for write-file of " + url + ": contentLength = " + contentLength + ", sizeBeforeDelete = " + sizeBeforeDelete); if (sizeBeforeDelete == -1) { - // no old file and no load. just data passing - cacheEntry.status = plasmaHTCache.CACHE_PASSING; + // totally fresh file + cacheEntry.status = plasmaHTCache.CACHE_FILL; // it's an insert cacheManager.stackProcess(cacheEntry); + } else if (sizeBeforeDelete == cacheFile.length()) { + // before we came here we deleted a cache entry + cacheEntry.status = plasmaHTCache.CACHE_STALE_RELOAD_BAD; + cacheManager.stackProcess(cacheEntry); // unnecessary update } else { // before we came here we deleted a cache entry - cacheEntry.status = plasmaHTCache.CACHE_STALE_NO_RELOAD; - cacheManager.stackProcess(cacheEntry); + cacheEntry.status = plasmaHTCache.CACHE_STALE_RELOAD_GOOD; + cacheManager.stackProcess(cacheEntry); // necessary update, write response header to cache } + // beware! all these writings will not fill the cacheEntry.cacheArray + // that means they are not available for the indexer (except they are scraped before) } - - if (gzippedOut != null) { - gzippedOut.finish(); - } - if (chunkedOut != null) { - chunkedOut.finish(); - chunkedOut.flush(); - } - - } catch (SocketException e) { - // this may happen if the client suddenly closes its connection - // maybe the user has stopped loading - // in that case, we are not responsible and just forget it - // but we clean the cache also, since it may be only partial - // and most possible corrupted - if (cacheFile.exists()) cacheFile.delete(); - if (!conProp.containsKey(httpd.CONNECTION_PROP_PROXY_RESPOND_HEADER)) { - httpd.sendRespondError(conProp,respond,4,404,null,"client unexpectedly closed connection",e); - } else { - conProp.put(httpd.CONNECTION_PROP_PERSISTENT,"close"); - } - } catch (IOException e) { - // can have various reasons - if (cacheFile.exists()) cacheFile.delete(); - if (e.getMessage().indexOf("Corrupt GZIP trailer") >= 0) { - // just do nothing, we leave it this way - this.theLogger.logDebug("ignoring bad gzip trail for URL " + url + " (" + e.getMessage() + ")",e); - conProp.put(httpd.CONNECTION_PROP_PERSISTENT,"close"); + } else { + // no caching + this.theLogger.logDebug(cacheFile.toString() + " not cached: " + storeError); + res.writeContent(hfos, null); + if (hfos instanceof htmlFilterOutputStream) ((htmlFilterOutputStream) hfos).finalize(); + if (sizeBeforeDelete == -1) { + // no old file and no load. just data passing + cacheEntry.status = plasmaHTCache.CACHE_PASSING; + cacheManager.stackProcess(cacheEntry); } else { - if (!conProp.containsKey(httpd.CONNECTION_PROP_PROXY_RESPOND_HEADER)) { - httpd.sendRespondError(conProp,respond,4,404,null,"client unexpectedly closed connection",e); - this.theLogger.logDebug("IOError for URL " + url + " (" + e.getMessage() + ") - responded 404",e); - } else { - conProp.put(httpd.CONNECTION_PROP_PERSISTENT,"close"); - } + // before we came here we deleted a cache entry + cacheEntry.status = plasmaHTCache.CACHE_STALE_NO_RELOAD; + cacheManager.stackProcess(cacheEntry); } - + } + + if (gzippedOut != null) { + gzippedOut.finish(); + } + if (chunkedOut != null) { + chunkedOut.finish(); + chunkedOut.flush(); } } catch (Exception e) { // this may happen if the targeted host does not exist or anything with the // remote server was wrong. // in any case, sending a 404 is appropriate try { - if ((e.toString().indexOf("unknown host")) > 0) { - if (!conProp.containsKey(httpd.CONNECTION_PROP_PROXY_RESPOND_HEADER)) { - httpd.sendRespondError(conProp,respond,4,404,null,"unknown host",e); - } else { - conProp.put(httpd.CONNECTION_PROP_PERSISTENT,"close"); - } + + // deleting cached content + if (cacheFile.exists()) cacheFile.delete(); + + // doing some errorhandling ... + int httpStatusCode = 404; + String httpStatusText = null; + String errorMessage = null; + Exception errorExc = e; + + if (e instanceof ConnectException) { + httpStatusCode = 403; httpStatusText = "Connection refused"; + errorMessage = "Connection refused by destination host"; + } else if (e instanceof BindException) { + errorMessage = "Unable to establish a connection to the destination host"; + } else if (e instanceof NoRouteToHostException) { + errorMessage = "No route to destination host"; + } else if (e instanceof UnknownHostException) { + errorMessage = "IP address of the destination host could not be determined"; } else { - if (!conProp.containsKey(httpd.CONNECTION_PROP_PROXY_RESPOND_HEADER)) { - httpd.sendRespondError(conProp,respond,4,404,null,"Not Found",e); + if (e.getMessage().indexOf("Corrupt GZIP trailer") >= 0) { + // just do nothing, we leave it this way + this.theLogger.logDebug("ignoring bad gzip trail for URL " + url + " (" + e.getMessage() + ")",e); + this.forceConnectionClose(); + } else if ((remote != null)&&(remote.isClosed())) { + errorMessage = "destination host unexpectedly closed connection"; } else { - conProp.put(httpd.CONNECTION_PROP_PERSISTENT,"close"); + errorMessage = "Unexpected Error. " + e.getClass().getName() + ": " + e.getMessage(); } } + + // sending back an error message to the client + if (!conProp.containsKey(httpd.CONNECTION_PROP_PROXY_RESPOND_HEADER)) { + httpd.sendRespondError(conProp,respond,4,httpStatusCode,httpStatusText,errorMessage,errorExc); + } else { + this.forceConnectionClose(); + } } catch (Exception ee) { - conProp.put(httpd.CONNECTION_PROP_PERSISTENT,"close"); + this.forceConnectionClose(); } } finally { if (remote != null) httpc.returnInstance(remote); @@ -850,6 +861,12 @@ public final class httpdProxyHandler extends httpdAbstractHandler implements htt headers.remove(httpHeader.X_CACHE_LOOKUP); } + private void forceConnectionClose() { + if (this.currentConProp != null) { + this.currentConProp.setProperty(httpd.CONNECTION_PROP_PERSISTENT,"close"); + } + } + public void doHead(Properties conProp, httpHeader requestHeader, OutputStream respond) throws IOException { this.currentConProp = conProp; @@ -898,25 +915,32 @@ public final class httpdProxyHandler extends httpdAbstractHandler implements htt httpc.response res = null; try { - // open the connection - if (yAddress == null) { - remote = newhttpc(host, port, timeout); - } else { - remote = newhttpc(yAddress, timeout); // with [AS] patch - } + // open the connection: second is needed for [AS] patch + remote = (yAddress == null) ? newhttpc(host, port, timeout): newhttpc(yAddress, timeout); + + // sending the http-HEAD request to the server res = remote.HEAD(remotePath, requestHeader); + + // removing hop by hop headers + this.removeHopByHopHeaders(res.responseHeader); + + // sending the server respond back to the client httpd.sendRespondHeader(conProp,respond,httpVer,Integer.parseInt(res.status.split(" ")[0]),res.responseHeader); - //respondHeader(respond, res.status, res.responseHeader); } catch (Exception e) { try { - if (!conProp.containsKey(httpd.CONNECTION_PROP_PROXY_RESPOND_HEADER)) { - httpd.sendRespondError(conProp,respond,4,404,null,"Not Found",e); + String exTxt = e.getMessage(); + if ((exTxt!=null)&&(exTxt.startsWith("Socket closed"))) { + this.forceConnectionClose(); + } else if (!conProp.containsKey(httpd.CONNECTION_PROP_PROXY_RESPOND_HEADER)) { + String errorMsg = "Unexpected Error. " + e.getClass().getName() + ": " + e.getMessage(); + httpd.sendRespondError(conProp,respond,4,503,null,errorMsg,e); + this.theLogger.logError(errorMsg); } else { - conProp.setProperty(httpd.CONNECTION_PROP_PERSISTENT,"close"); + this.forceConnectionClose(); } } catch (Exception ee) { - conProp.setProperty(httpd.CONNECTION_PROP_PERSISTENT,"close"); - } + this.forceConnectionClose(); + } } finally { if (remote != null) httpc.returnInstance(remote); } @@ -988,24 +1012,25 @@ public final class httpdProxyHandler extends httpdAbstractHandler implements htt if (chunked != null) chunked.finish(); remote.close(); - } catch (Exception e) { - try { - if (!conProp.containsKey(httpd.CONNECTION_PROP_PROXY_RESPOND_HEADER)) { - httpd.sendRespondError(conProp,respond,4,404,null,"Not Found",e); - } else { - conProp.setProperty(httpd.CONNECTION_PROP_PERSISTENT,"close"); - } - } catch (Exception ee) { - conProp.setProperty(httpd.CONNECTION_PROP_PERSISTENT,"close"); - } } finally { if (remote != null) httpc.returnInstance(remote); } respond.flush(); } catch (Exception e) { - String errorMsg = "Unexpected Error. " + e.getClass().getName() + ": " + e.getMessage(); - System.err.println("PROXY: " + errorMsg); - this.theLogger.logError(errorMsg); + try { + String exTxt = e.getMessage(); + if ((exTxt!=null)&&(exTxt.startsWith("Socket closed"))) { + this.forceConnectionClose(); + } else if (!conProp.containsKey(httpd.CONNECTION_PROP_PROXY_RESPOND_HEADER)) { + String errorMsg = "Unexpected Error. " + e.getClass().getName() + ": " + e.getMessage(); + httpd.sendRespondError(conProp,respond,4,503,null,errorMsg,e); + this.theLogger.logError(errorMsg); + } else { + this.forceConnectionClose(); + } + } catch (Exception ee) { + this.forceConnectionClose(); + } } finally { respond.flush(); if (respond instanceof httpdByteCountOutputStream) ((httpdByteCountOutputStream)respond).finish(); diff --git a/source/de/anomic/server/serverCore.java b/source/de/anomic/server/serverCore.java index 1069393d8..f86b256a4 100644 --- a/source/de/anomic/server/serverCore.java +++ b/source/de/anomic/server/serverCore.java @@ -78,6 +78,7 @@ import org.apache.commons.pool.impl.GenericObjectPool.Config; import de.anomic.http.httpd; import de.anomic.server.logging.serverLog; +import de.anomic.yacy.yacyCore; public final class serverCore extends serverAbstractThread implements serverThread { @@ -232,7 +233,10 @@ public final class serverCore extends serverAbstractThread implements serverThre localPort.intValue()); serverCore.portForwarding.connect(); + serverCore.portForwardingEnabled = true; + yacyCore.seedDB.mySeed.put("IP",publicIP().getHostAddress()); + yacyCore.seedDB.mySeed.put("Port",Integer.toString(serverCore.portForwarding.getPort())); this.log.logInfo("Remote port forwarding connection established: " + portFwHost+":"+portFwPort+" -> "+localHost+":"+localPort); } catch (Exception e) { @@ -402,31 +406,39 @@ public final class serverCore extends serverAbstractThread implements serverThre connection.execute(controlSocket,this.timeout); //log.logDebug("* NEW SESSION: " + connection.request + " from " + clientIP); } else { - System.out.println("ACCESS FROM " + cIP + " DENIED"); + this.log.logWarning("ACCESS FROM " + cIP + " DENIED"); } return true; } public void close() { - try { - // consuming the isInterrupted Flag. Otherwise we could not properly close the session pool - Thread.interrupted(); - - // closing the port forwarding channel - if ((portForwardingEnabled) && (portForwarding != null) ) { + // consuming the isInterrupted Flag. Otherwise we could not properly close the session pool + Thread.interrupted(); + + // closing the port forwarding channel + if ((portForwardingEnabled) && (portForwarding != null) ) { + try { portForwarding.disconnect(); + } catch (Exception e) { + this.log.logWarning("Unable to shutdown the port forwarding channel."); } - - // close the session pool - this.theSessionPool.close(); - - // closing the serverchannel and socket - this.socket.close(); } - catch (Exception e) { - this.log.logSystem("Unable to close session pool: " + e.getMessage()); + + // close the session pool + try { + this.theSessionPool.close(); + } catch (Exception e) { + this.log.logWarning("Unable to close the session pool."); + } + + // closing the serverchannel and socket + try { + this.socket.close(); + } catch (Exception e) { + this.log.logWarning("Unable to close the server socket."); } + this.log.logSystem("* terminated"); } @@ -504,7 +516,7 @@ public final class serverCore extends serverAbstractThread implements serverThre if (currentSession.isAlive()) { if ((currentSession.controlSocket != null)&&(currentSession.controlSocket.isConnected())) { currentSession.controlSocket.close(); - serverCore.this.log.logInfo("Closing socket of thread " + currentSession.getName()); + serverCore.this.log.logInfo("Closing socket of thread '" + currentSession.getName() + "'"); } } } diff --git a/source/de/anomic/yacy/yacyClient.java b/source/de/anomic/yacy/yacyClient.java index 29e5654ff..703a2d22c 100644 --- a/source/de/anomic/yacy/yacyClient.java +++ b/source/de/anomic/yacy/yacyClient.java @@ -122,11 +122,12 @@ public class yacyClient { Date remoteTime = yacyCore.parseUniversalDate((String) result.get("mytime")); // read remote time // check consistency with expectation + yacySeed otherPeer = null; float otherPeerVersion = 0; if ((otherHash != null ) && (otherHash.length() > 0)) { - yacySeed otherPeer = yacySeed.genRemoteSeed((String) result.get("seed0"), key, remoteTime); + otherPeer = yacySeed.genRemoteSeed((String) result.get("seed0"), key, remoteTime); if ((otherPeer == null) || (!(otherPeer.hash.equals(otherHash)))) { - yacyCore.log.logDebug("yacyClient.publishMySeed consistency error: other peer wrong"); + yacyCore.log.logDebug("yacyClient.publishMySeed consistency error: other peer '" + ((otherPeer==null)?"unknown":otherPeer.getName()) + "' wrong"); return -1; // no success } otherPeerVersion = otherPeer.getVersion(); @@ -138,6 +139,8 @@ public class yacyClient { // we overwrite our own IP number only, if we do not portForwarding if (!serverCore.portForwardingEnabled) { yacyCore.seedDB.mySeed.put("IP", (String) result.get("yourip")); + } else { + yacyCore.seedDB.mySeed.put("IP", serverCore.publicIP().getHostAddress()); } /* If we have port forwarding enabled but the other peer uses a too old yacy version @@ -151,6 +154,10 @@ public class yacyClient { String mytype = (String) result.get("yourtype"); if (mytype == null) mytype = "junior"; if ((yacyCore.seedDB.mySeed.get("PeerType", "junior").equals("principal")) && (mytype.equals("senior"))) mytype = "principal"; + + if (mytype.equalsIgnoreCase("junior")) { + yacyCore.log.logInfo("yacyClient.publishMySeed: Peer '" + ((otherPeer==null)?"unknown":otherPeer.getName()) + "' reported us as junior."); + } yacyCore.seedDB.mySeed.put("PeerType", mytype); } diff --git a/source/de/anomic/yacy/yacyCore.java b/source/de/anomic/yacy/yacyCore.java index 62f389fef..b039ee925 100644 --- a/source/de/anomic/yacy/yacyCore.java +++ b/source/de/anomic/yacy/yacyCore.java @@ -62,9 +62,13 @@ import java.net.MalformedURLException; import java.net.URI; import java.net.URL; import java.text.SimpleDateFormat; +import java.util.Collection; +import java.util.Collections; import java.util.Date; import java.util.GregorianCalendar; import java.util.Hashtable; +import java.util.LinkedList; +import java.util.List; import java.util.TimeZone; import java.util.Vector; @@ -72,11 +76,14 @@ import de.anomic.http.httpc; import de.anomic.net.natLib; import de.anomic.plasma.plasmaSwitchboard; import de.anomic.server.logging.serverLog; +import de.anomic.server.serverCore; +import de.anomic.server.serverSemaphore; import de.anomic.server.serverSwitch; public class yacyCore { // statics + public static ThreadGroup publishThreadGroup = new ThreadGroup("publishThreadGroup"); public static long startupTime = System.currentTimeMillis(); public static yacySeedDB seedDB = null; public static final Hashtable seedUploadMethods = new Hashtable(); @@ -289,118 +296,191 @@ public class yacyCore { public int added; public yacySeed seed; public Exception error; + private final serverSemaphore sync; + private final List syncList; - public publishThread(yacySeed seed) { - super("PublishSeed_" + seed.getName()); + public publishThread(ThreadGroup tg, yacySeed seed, serverSemaphore sync, List syncList) throws InterruptedException { + super(tg, "PublishSeed_" + seed.getName()); + + this.sync = sync; + this.sync.P(); + this.syncList = syncList; + this.seed = seed; this.added = 0; this.error = null; } public void run() { - try { - added = yacyClient.publishMySeed(seed.getAddress(), seed.hash); - if (added < 0) { + try { + this.added = yacyClient.publishMySeed(seed.getAddress(), seed.hash); + if (this.added < 0) { // no or wrong response, delete that address - log.logInfo("publish: disconnected " + seed.get("PeerType", "senior") + " peer '" + seed.getName() + "' from " + seed.getAddress()); - peerActions.peerDeparture(seed); + log.logInfo("publish: disconnected " + this.seed.get("PeerType", "senior") + " peer '" + this.seed.getName() + "' from " + this.seed.getAddress()); + peerActions.peerDeparture(this.seed); } else { // success! we have published our peer to a senior peer // update latest news from the other peer - log.logInfo("publish: handshaked " + seed.get("PeerType", "senior") + " peer '" + seed.getName() + "' at " + seed.getAddress()); + log.logInfo("publish: handshaked " + this.seed.get("PeerType", "senior") + " peer '" + this.seed.getName() + "' at " + this.seed.getAddress()); } } catch (Exception e) { - error = e; + this.error = e; + } finally { + this.syncList.add(this); + this.sync.V(); } } } private int publishMySeed(boolean force) { - // call this after the httpd was started up - - // we need to find out our own ip - // This is not always easy, since the application may - // live behind a firewall or nat. - // the normal way to do this is either measure the value that java gives us, - // but this is not correct if the peer lives behind a NAT/Router or has several - // addresses and not the right one can be found out. - // We have several alternatives: - // 1. ask another peer. This should be normal and the default method. - // but if no other peer lives, or we don't know them, we cannot do that - // 2. ask own NAT. This is only an option if the NAT is a DI604, because this is the - // only supported for address retrieval - // 3. ask ip respond services in the internet. There are several, and they are all - // probed until we get a valid response. - - // init yacyHello-process - String address; - int added; - yacySeed[] seeds; - int attempts = seedDB.sizeConnected(); if (attempts > 10) attempts = 10; - if (seedDB.mySeed.get("PeerType", "virgin").equals("virgin")) { - seeds = seedDB.seedsByAge(true, attempts); // best for fast connection - } else { - seeds = seedDB.seedsByAge(false, attempts); // best for seed list maintenance/cleaning - } - if (seeds == null) return 0; - Vector v = new Vector(); // memory for threads - publishThread t; - for (int i = 0; i < seeds.length; i++) { - if (seeds[i] == null) continue; - log.logDebug("HELLO #" + i + " to peer " + seeds[i].get("Name", "")); // debug - address = seeds[i].getAddress(); - if ((address == null) || (!(seeds[i].isProper()))) { - // we don't like that address, delete it - peerActions.peerDeparture(seeds[i]); - } else { - // ask senior peer - t = new publishThread(seeds[i]); - v.add(t); - t.start(); - } - - // wait - try { - if (i == 0) Thread.currentThread().sleep(2000); // after the first time wait some seconds - Thread.currentThread().sleep(1000 + 500 * v.size()); // wait a while - } catch (InterruptedException e) {} + try { + // call this after the httpd was started up + + // we need to find out our own ip + // This is not always easy, since the application may + // live behind a firewall or nat. + // the normal way to do this is either measure the value that java gives us, + // but this is not correct if the peer lives behind a NAT/Router or has several + // addresses and not the right one can be found out. + // We have several alternatives: + // 1. ask another peer. This should be normal and the default method. + // but if no other peer lives, or we don't know them, we cannot do that + // 2. ask own NAT. This is only an option if the NAT is a DI604, because this is the + // only supported for address retrieval + // 3. ask ip respond services in the internet. There are several, and they are all + // probed until we get a valid response. + + // init yacyHello-process + yacySeed[] seeds; - // check all threads - for (int j = 0; j < v.size(); j++) { - t = (publishThread) v.elementAt(j); - added = t.added; - if (!(t.isAlive())) { - //log.logDebug("PEER " + seeds[j].get("Name", "") + " request terminated"); // debug - if (added >= 0) { - // success! we have published our peer to a senior peer - // update latest news from the other peer - //log.logInfo("publish: handshaked " + t.seed.get("PeerType", "senior") + " peer '" + t.seed.getName() + "' at " + t.seed.getAddress()); - peerActions.saveMySeed(); - return added; + int attempts = seedDB.sizeConnected(); + if (attempts > 10) attempts = 10; + + // getting a list of peers to contact + if (seedDB.mySeed.get("PeerType", "virgin").equals("virgin")) { + seeds = seedDB.seedsByAge(true, attempts); // best for fast connection + } else { + seeds = seedDB.seedsByAge(false, attempts); // best for seed list maintenance/cleaning + } + if (seeds == null) return 0; + + // holding a reference to all started threads + int contactedSeedCount = 0; + List syncList = Collections.synchronizedList(new LinkedList()); // memory for threads + serverSemaphore sync = new serverSemaphore(attempts); + + // going through the peer list and starting a new publisher thread for each peer + for (int i = 0; i < seeds.length; i++) { + if (seeds[i] == null) continue; + + String address = seeds[i].getAddress(); + log.logDebug("HELLO #" + i + " to peer '" + seeds[i].get("Name", "") + "' at " + address); // debug + if ((address == null) || (!(seeds[i].isProper()))) { + // we don't like that address, delete it + peerActions.peerDeparture(seeds[i]); + sync.V(); + } else { + // starting a new publisher thread + contactedSeedCount++; + (new publishThread(yacyCore.publishThreadGroup,seeds[i],sync,syncList)).start(); + } + } + + // receiving the result of all started publisher threads + int newSeeds = -1; + for (int j=0; j < contactedSeedCount; j++) { + + // waiting for the next thread to finish + sync.P(); + + // if this is true something is wrong ... + if (syncList.isEmpty()) return 0; + + // getting a reference to the finished thread + publishThread t = (publishThread) syncList.remove(0); + + // getting the amount of new reported seeds + if (t.added >= 0) { + if (newSeeds==-1) newSeeds = t.added; + else newSeeds += t.added; + } + } + + if (newSeeds >= 0) { + // success! we have published our peer to a senior peer + // update latest news from the other peer + //log.logInfo("publish: handshaked " + t.seed.get("PeerType", "senior") + " peer '" + t.seed.getName() + "' at " + t.seed.getAddress()); + peerActions.saveMySeed(); + return newSeeds; + } + +// // wait +// try { +// if (i == 0) Thread.currentThread().sleep(2000); // after the first time wait some seconds +// Thread.currentThread().sleep(1000 + 500 * v.size()); // wait a while +// } catch (InterruptedException e) {} +// +// // check all threads +// for (int j = 0; j < v.size(); j++) { +// t = (publishThread) v.elementAt(j); +// added = t.added; +// if (!(t.isAlive())) { +// //log.logDebug("PEER " + seeds[j].get("Name", "") + " request terminated"); // debug +// if (added >= 0) { +// // success! we have published our peer to a senior peer +// // update latest news from the other peer +// //log.logInfo("publish: handshaked " + t.seed.get("PeerType", "senior") + " peer '" + t.seed.getName() + "' at " + t.seed.getAddress()); +// peerActions.saveMySeed(); +// return added; +// } +// } +// } + + // if we have an address, we do nothing + if ((seedDB.mySeed.isProper()) && (!(force))) return 0; + + // still no success: ask own NAT or internet responder + boolean DI604use = switchboard.getConfig("DI604use", "false").equals("true"); + String DI604pw = switchboard.getConfig("DI604pw", ""); + String ip = switchboard.getConfig("staticIP", ""); + if(ip.equals("")){ + ip = natLib.retrieveIP(DI604use, DI604pw, (switchboard.getConfig("yacyDebugMode", "false")=="false" ? false : true)); + } + //System.out.println("DEBUG: new IP=" + ip); + seedDB.mySeed.put("IP", ip); + if (seedDB.mySeed.get("PeerType", "junior").equals("junior")) // ??????????????? + seedDB.mySeed.put("PeerType", "senior"); // to start bootstraping, we need to be recognised as "senior" peer + log.logInfo("publish: no recipient found, asked NAT or responder; our address is " + + ((seedDB.mySeed.getAddress() == null) ? "unknown" : seedDB.mySeed.getAddress())); + peerActions.saveMySeed(); + return 0; + } catch (InterruptedException e) { + log.logInfo("publish: Interruption detected while publishing my seed."); + + // interrupt all already started publishThreads + log.logInfo("publish: Trying to shutdown all remaining publishing threads ..."); + yacyCore.publishThreadGroup.interrupt(); + + // waiting some time for the publishThreads to finish handshake + int threadCount = yacyCore.publishThreadGroup.activeCount(); + Thread[] threadList = new Thread[threadCount]; + threadCount = yacyCore.publishThreadGroup.enumerate(threadList); + try { + // we need to use a timeout here because of missing interruptable session threads ... + for ( int currentThreadIdx = 0; currentThreadIdx < threadCount; currentThreadIdx++ ) { + if (threadList[currentThreadIdx].isAlive()) { + log.logInfo("publish: Waiting for remaining publishing thread '" + threadList[currentThreadIdx].getName() + "' to finish shutdown"); + threadList[currentThreadIdx].join(500); } } } - } - - // if we have an address, we do nothing - if ((seedDB.mySeed.isProper()) && (!(force))) return 0; - - // still no success: ask own NAT or internet responder - boolean DI604use = switchboard.getConfig("DI604use", "false").equals("true"); - String DI604pw = switchboard.getConfig("DI604pw", ""); - String ip = switchboard.getConfig("staticIP", ""); - if(ip.equals("")){ - ip = natLib.retrieveIP(DI604use, DI604pw, (switchboard.getConfig("yacyDebugMode", "false")=="false" ? false : true)); - } - //System.out.println("DEBUG: new IP=" + ip); - seedDB.mySeed.put("IP", ip); - if (seedDB.mySeed.get("PeerType", "junior").equals("junior")) // ??????????????? - seedDB.mySeed.put("PeerType", "senior"); // to start bootstraping, we need to be recognised as "senior" peer - log.logInfo("publish: no recipient found, asked NAT or responder; our address is " + - ((seedDB.mySeed.getAddress() == null) ? "unknown" : seedDB.mySeed.getAddress())); - peerActions.saveMySeed(); - return 0; + catch (InterruptedException ee) { + log.logWarning("Interruption while trying to shutdown all remaining publishing threads."); + } + + return 0; + } } public static Hashtable getSeedUploadMethods() { @@ -459,7 +539,8 @@ public class yacyCore { String[] neededLibx = ((yacySeedUploader)theUploader).getLibxDependences(); if (neededLibx != null) { for (int libxId=0; libxId < neededLibx.length; libxId++) { - if (javaClassPath.indexOf(neededLibx[libxId]) == -1) continue; + if (javaClassPath.indexOf(neededLibx[libxId]) == -1) + throw new Exception("Missing dependency"); } } availableUploaders.put(className.substring("yacySeedUpload".length()),fullClassName); @@ -519,7 +600,8 @@ public class yacyCore { sb.setConfig("seedUploadMethod",seedUploadMethod); } else if ( (seedUploadMethod.equalsIgnoreCase("File")) || - (sb.getConfig("seedFilePath", "").length() > 0) + ((seedUploadMethod.equals("")) && + (sb.getConfig("seedFilePath", "").length() > 0)) ) { seedUploadMethod = "File"; sb.setConfig("seedUploadMethod",seedUploadMethod); diff --git a/source/de/anomic/yacy/yacySeedDB.java b/source/de/anomic/yacy/yacySeedDB.java index 2ef97e1ee..16360c72b 100644 --- a/source/de/anomic/yacy/yacySeedDB.java +++ b/source/de/anomic/yacy/yacySeedDB.java @@ -92,12 +92,12 @@ public class yacySeedDB { public yacySeedDB(plasmaSwitchboard sb, - File seedActiveDBFile, - File seedPassiveDBFile, - File seedPotentialDBFile, - int bufferkb) throws IOException { - - this.seedActiveDBFile = seedActiveDBFile; + File seedActiveDBFile, + File seedPassiveDBFile, + File seedPotentialDBFile, + int bufferkb) throws IOException { + + this.seedActiveDBFile = seedActiveDBFile; this.seedPassiveDBFile = seedPassiveDBFile; this.seedPotentialDBFile = seedPotentialDBFile; this.mySeed = null; // my own seed @@ -110,33 +110,34 @@ public class yacySeedDB { // create or init own seed myOwnSeedFile = new File(sb.getRootPath(), sb.getConfig("yacyOwnSeedFile", "mySeed.txt")); - if (myOwnSeedFile.exists() && (myOwnSeedFile.length() > 0)) { - // load existing identity - mySeed = yacySeed.load(myOwnSeedFile); - } else { - // create new identity - mySeed = yacySeed.genLocalSeed(sb); - // save of for later use - mySeed.save(myOwnSeedFile); // in a file - //writeMap(mySeed.hash, mySeed.dna, "new"); // in a database - } + if (myOwnSeedFile.exists() && (myOwnSeedFile.length() > 0)) { + // load existing identity + mySeed = yacySeed.load(myOwnSeedFile); + } else { + // create new identity + mySeed = yacySeed.genLocalSeed(sb); + // save of for later use + mySeed.save(myOwnSeedFile); // in a file + //writeMap(mySeed.hash, mySeed.dna, "new"); // in a database + } + + if (sb.getConfig("portForwardingEnabled","false").equalsIgnoreCase("true")) { + mySeed.put("Port", sb.getConfig("portForwardingPort","8080")); + mySeed.put("IP", sb.getConfig("portForwardingHost","localhost")); + } else { + mySeed.put("IP", ""); // we delete the old information to see what we have now + mySeed.put("Port", sb.getConfig("port", "8080")); // set my seed's correct port number + } + mySeed.put("PeerType", "virgin"); // markup startup condition - mySeed.put("IP", ""); // we delete the old information to see what we have now - if ((serverCore.portForwardingEnabled) && (serverCore.portForwarding != null)) { - mySeed.put("Port", Integer.toString(serverCore.portForwarding.getPort())); - } else { - mySeed.put("Port", sb.getConfig("port", "8080")); // set my seed's correct port number - } - mySeed.put("PeerType", "virgin"); // markup startup condition - // start our virtual DNS service for yacy peers with empty cache nameLookupCache = new Hashtable(); // check if we are in the seedCaches: this can happen if someone else published our seed removeMySeed(); - - // set up seed queue (for probing candidates) - seedQueue = null; + + // set up seed queue (for probing candidates) + seedQueue = null; } public synchronized void removeMySeed() {