*) autoconfig.java: ip address was not reported correctly when port-forwardin is on

*) hello.java: reportedip my be empty at peer startup
*) httpc.java: adding method to determine if the connection was already closed or is broken
*) httpdProxyHandler.java: trying to do a better errorhandling
*) server/serverCore.java
- setting myseed ip-address and port correctly if port-forwarding is on
- doing a more failsafe close and adding some debugging output
*) yacyClient.java: adding some logging statements to allow a better detection of 
   "degraded to senior"-bug
*) yacyCore.java: restructuring publishMySeed
   (@Orbiter: pleas take a look)
- to avoid buzy waiting
- to allow a gracefull shutdown on server shutdown
- new seed count was not calculated correctly in the previous version
*) yacySeedDB.java: host ip and port was not initialized correctly if port-forwarding
   was activated

git-svn-id: https://svn.berlios.de/svnroot/repos/yacy/trunk@318 6c8d7289-2bf4-0310-a012-ef5d649a1542
pull/1/head
theli 20 years ago
parent 933256bbfc
commit d53b2393e5

@ -62,7 +62,7 @@ public class autoconfig {
serverObjects prop = new serverObjects();
prop.put("host", serverCore.publicIP().getHostAddress());
prop.put("host", serverCore.publicLocalIP().getHostAddress());
prop.put("port", env.getConfig("port", "8080"));
// return rewrite properties

@ -83,7 +83,7 @@ public class hello {
float clientversion = remoteSeed.getVersion();
int urls = -1;
if ((!(clientip.equals(reportedip))) && (clientversion >= (float)0.383)) {
if ((reportedip.length() > 0) && (!(clientip.equals(reportedip))) && (clientversion >= (float)0.383)) {
// try first the reportedip, since this may be a connect from a port-forwarding host
prop.put("yourip", reportedip);
remoteSeed.put("IP", reportedip);
@ -117,7 +117,7 @@ public class hello {
remoteSeed.put("LastSeen", yacyCore.universalDateShortString());
yacyCore.peerActions.juniorConnects++; // update statistics
remoteSeed.put("PeerType", "junior");
yacyCore.log.logInfo("hello: responded remote junior peer '" + remoteSeed.getName() + "' from " + remoteSeed.getAddress());
yacyCore.log.logInfo("hello: responded remote junior peer '" + remoteSeed.getName() + "' from " + reportedip);
// no connection here, instead store junior in connection cache
if ((remoteSeed.hash != null) && (remoteSeed.isProper())) yacyCore.peerActions.peerPing(remoteSeed);
}

@ -223,6 +223,11 @@ public final class httpc {
}
}
public boolean isClosed() {
if (this.socket == null) return true;
else return (!this.socket.isConnected()) || (this.socket.isClosed());
}
public static String dnsResolve(String host) {
// looks for the ip of host <host> and returns ip number as string
String ip = (String) nameCacheHit.get(host);

@ -67,10 +67,14 @@ import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.PushbackInputStream;
import java.net.BindException;
import java.net.ConnectException;
import java.net.MalformedURLException;
import java.net.NoRouteToHostException;
import java.net.Socket;
import java.net.SocketException;
import java.net.URL;
import java.net.UnknownHostException;
import java.util.Date;
import java.util.HashSet;
import java.util.Properties;
@ -454,8 +458,20 @@ public final class httpdProxyHandler extends httpdAbstractHandler implements htt
}
} catch (Exception e) {
String errorMsg = "Unexpected Error. " + e.getClass().getName() + ": " + e.getMessage();
this.theLogger.logError(errorMsg,e);
try {
String exTxt = e.getMessage();
if ((exTxt!=null)&&(exTxt.startsWith("Socket closed"))) {
this.forceConnectionClose();
} else if (!conProp.containsKey(httpd.CONNECTION_PROP_PROXY_RESPOND_HEADER)) {
String errorMsg = "Unexpected Error. " + e.getClass().getName() + ": " + e.getMessage();
httpd.sendRespondError(conProp,respond,4,501,null,errorMsg,e);
this.theLogger.logError(errorMsg);
} else {
this.forceConnectionClose();
}
} catch (Exception ee) {
this.forceConnectionClose();
}
} finally {
try { respond.flush(); } catch (Exception e) {}
if (respond instanceof httpdByteCountOutputStream) ((httpdByteCountOutputStream)respond).finish();
@ -595,141 +611,136 @@ public final class httpdProxyHandler extends httpdAbstractHandler implements htt
// remove hop by hop headers
this.removeHopByHopHeaders(res.responseHeader);
// request has been placed and result has been returned. work off response
try {
String[] resStatus = res.status.split(" ");
httpd.sendRespondHeader(
conProp,
respond,
httpVer,
Integer.parseInt((resStatus.length > 0) ? resStatus[0]:"503"),
(resStatus.length > 1) ? resStatus[1] : null,
res.responseHeader);
String storeError;
if ((storeError = cacheEntry.shallStoreCache()) == null) {
// we write a new cache entry
if ((contentLength > 0) && (contentLength < 1048576)) // if the length is known and < 1 MB
{
// ok, we don't write actually into a file, only to RAM, and schedule writing the file.
byte[] cacheArray = res.writeContent(hfos);
this.theLogger.logDebug("writeContent of " + url + " produced cacheArray = " + ((cacheArray == null) ? "null" : ("size=" + cacheArray.length)));
if (hfos instanceof htmlFilterOutputStream) ((htmlFilterOutputStream) hfos).finalize();
if (sizeBeforeDelete == -1) {
// totally fresh file
cacheEntry.status = plasmaHTCache.CACHE_FILL; // it's an insert
cacheManager.stackProcess(cacheEntry, cacheArray);
conProp.setProperty(httpd.CONNECTION_PROP_PROXY_RESPOND_CODE,"TCP_MISS");
} else if (sizeBeforeDelete == cacheArray.length) {
// before we came here we deleted a cache entry
cacheArray = null;
cacheEntry.status = plasmaHTCache.CACHE_STALE_RELOAD_BAD;
cacheManager.stackProcess(cacheEntry); // unnecessary update
conProp.setProperty(httpd.CONNECTION_PROP_PROXY_RESPOND_CODE,"TCP_REF_FAIL_HIT");
} else {
// before we came here we deleted a cache entry
cacheEntry.status = plasmaHTCache.CACHE_STALE_RELOAD_GOOD;
cacheManager.stackProcess(cacheEntry, cacheArray); // necessary update, write response header to cache
conProp.setProperty(httpd.CONNECTION_PROP_PROXY_RESPOND_CODE,"TCP_REFRESH_MISS");
}
// request has been placed and result has been returned. work off response
String[] resStatus = res.status.split(" ");
// sending the respond header back to the client
httpd.sendRespondHeader(
conProp,
respond,
httpVer,
Integer.parseInt((resStatus.length > 0) ? resStatus[0]:"503"),
(resStatus.length > 1) ? resStatus[1] : null,
res.responseHeader);
String storeError;
if ((storeError = cacheEntry.shallStoreCache()) == null) {
// we write a new cache entry
if ((contentLength > 0) && (contentLength < 1048576)) // if the length is known and < 1 MB
{
// ok, we don't write actually into a file, only to RAM, and schedule writing the file.
byte[] cacheArray = res.writeContent(hfos);
this.theLogger.logDebug("writeContent of " + url + " produced cacheArray = " + ((cacheArray == null) ? "null" : ("size=" + cacheArray.length)));
if (hfos instanceof htmlFilterOutputStream) ((htmlFilterOutputStream) hfos).finalize();
if (sizeBeforeDelete == -1) {
// totally fresh file
cacheEntry.status = plasmaHTCache.CACHE_FILL; // it's an insert
cacheManager.stackProcess(cacheEntry, cacheArray);
conProp.setProperty(httpd.CONNECTION_PROP_PROXY_RESPOND_CODE,"TCP_MISS");
} else if (sizeBeforeDelete == cacheArray.length) {
// before we came here we deleted a cache entry
cacheArray = null;
cacheEntry.status = plasmaHTCache.CACHE_STALE_RELOAD_BAD;
cacheManager.stackProcess(cacheEntry); // unnecessary update
conProp.setProperty(httpd.CONNECTION_PROP_PROXY_RESPOND_CODE,"TCP_REF_FAIL_HIT");
} else {
// the file is too big to cache it in the ram, or the size is unknown
// write to file right here.
cacheFile.getParentFile().mkdirs();
res.writeContent(hfos, cacheFile);
if (hfos instanceof htmlFilterOutputStream) ((htmlFilterOutputStream) hfos).finalize();
this.theLogger.logDebug("for write-file of " + url + ": contentLength = " + contentLength + ", sizeBeforeDelete = " + sizeBeforeDelete);
if (sizeBeforeDelete == -1) {
// totally fresh file
cacheEntry.status = plasmaHTCache.CACHE_FILL; // it's an insert
cacheManager.stackProcess(cacheEntry);
} else if (sizeBeforeDelete == cacheFile.length()) {
// before we came here we deleted a cache entry
cacheEntry.status = plasmaHTCache.CACHE_STALE_RELOAD_BAD;
cacheManager.stackProcess(cacheEntry); // unnecessary update
} else {
// before we came here we deleted a cache entry
cacheEntry.status = plasmaHTCache.CACHE_STALE_RELOAD_GOOD;
cacheManager.stackProcess(cacheEntry); // necessary update, write response header to cache
}
// beware! all these writings will not fill the cacheEntry.cacheArray
// that means they are not available for the indexer (except they are scraped before)
}
// before we came here we deleted a cache entry
cacheEntry.status = plasmaHTCache.CACHE_STALE_RELOAD_GOOD;
cacheManager.stackProcess(cacheEntry, cacheArray); // necessary update, write response header to cache
conProp.setProperty(httpd.CONNECTION_PROP_PROXY_RESPOND_CODE,"TCP_REFRESH_MISS");
}
} else {
// no caching
this.theLogger.logDebug(cacheFile.toString() + " not cached: " + storeError);
res.writeContent(hfos, null);
// the file is too big to cache it in the ram, or the size is unknown
// write to file right here.
cacheFile.getParentFile().mkdirs();
res.writeContent(hfos, cacheFile);
if (hfos instanceof htmlFilterOutputStream) ((htmlFilterOutputStream) hfos).finalize();
this.theLogger.logDebug("for write-file of " + url + ": contentLength = " + contentLength + ", sizeBeforeDelete = " + sizeBeforeDelete);
if (sizeBeforeDelete == -1) {
// no old file and no load. just data passing
cacheEntry.status = plasmaHTCache.CACHE_PASSING;
// totally fresh file
cacheEntry.status = plasmaHTCache.CACHE_FILL; // it's an insert
cacheManager.stackProcess(cacheEntry);
} else if (sizeBeforeDelete == cacheFile.length()) {
// before we came here we deleted a cache entry
cacheEntry.status = plasmaHTCache.CACHE_STALE_RELOAD_BAD;
cacheManager.stackProcess(cacheEntry); // unnecessary update
} else {
// before we came here we deleted a cache entry
cacheEntry.status = plasmaHTCache.CACHE_STALE_NO_RELOAD;
cacheManager.stackProcess(cacheEntry);
cacheEntry.status = plasmaHTCache.CACHE_STALE_RELOAD_GOOD;
cacheManager.stackProcess(cacheEntry); // necessary update, write response header to cache
}
// beware! all these writings will not fill the cacheEntry.cacheArray
// that means they are not available for the indexer (except they are scraped before)
}
if (gzippedOut != null) {
gzippedOut.finish();
}
if (chunkedOut != null) {
chunkedOut.finish();
chunkedOut.flush();
}
} catch (SocketException e) {
// this may happen if the client suddenly closes its connection
// maybe the user has stopped loading
// in that case, we are not responsible and just forget it
// but we clean the cache also, since it may be only partial
// and most possible corrupted
if (cacheFile.exists()) cacheFile.delete();
if (!conProp.containsKey(httpd.CONNECTION_PROP_PROXY_RESPOND_HEADER)) {
httpd.sendRespondError(conProp,respond,4,404,null,"client unexpectedly closed connection",e);
} else {
conProp.put(httpd.CONNECTION_PROP_PERSISTENT,"close");
}
} catch (IOException e) {
// can have various reasons
if (cacheFile.exists()) cacheFile.delete();
if (e.getMessage().indexOf("Corrupt GZIP trailer") >= 0) {
// just do nothing, we leave it this way
this.theLogger.logDebug("ignoring bad gzip trail for URL " + url + " (" + e.getMessage() + ")",e);
conProp.put(httpd.CONNECTION_PROP_PERSISTENT,"close");
} else {
// no caching
this.theLogger.logDebug(cacheFile.toString() + " not cached: " + storeError);
res.writeContent(hfos, null);
if (hfos instanceof htmlFilterOutputStream) ((htmlFilterOutputStream) hfos).finalize();
if (sizeBeforeDelete == -1) {
// no old file and no load. just data passing
cacheEntry.status = plasmaHTCache.CACHE_PASSING;
cacheManager.stackProcess(cacheEntry);
} else {
if (!conProp.containsKey(httpd.CONNECTION_PROP_PROXY_RESPOND_HEADER)) {
httpd.sendRespondError(conProp,respond,4,404,null,"client unexpectedly closed connection",e);
this.theLogger.logDebug("IOError for URL " + url + " (" + e.getMessage() + ") - responded 404",e);
} else {
conProp.put(httpd.CONNECTION_PROP_PERSISTENT,"close");
}
// before we came here we deleted a cache entry
cacheEntry.status = plasmaHTCache.CACHE_STALE_NO_RELOAD;
cacheManager.stackProcess(cacheEntry);
}
}
if (gzippedOut != null) {
gzippedOut.finish();
}
if (chunkedOut != null) {
chunkedOut.finish();
chunkedOut.flush();
}
} catch (Exception e) {
// this may happen if the targeted host does not exist or anything with the
// remote server was wrong.
// in any case, sending a 404 is appropriate
try {
if ((e.toString().indexOf("unknown host")) > 0) {
if (!conProp.containsKey(httpd.CONNECTION_PROP_PROXY_RESPOND_HEADER)) {
httpd.sendRespondError(conProp,respond,4,404,null,"unknown host",e);
} else {
conProp.put(httpd.CONNECTION_PROP_PERSISTENT,"close");
}
// deleting cached content
if (cacheFile.exists()) cacheFile.delete();
// doing some errorhandling ...
int httpStatusCode = 404;
String httpStatusText = null;
String errorMessage = null;
Exception errorExc = e;
if (e instanceof ConnectException) {
httpStatusCode = 403; httpStatusText = "Connection refused";
errorMessage = "Connection refused by destination host";
} else if (e instanceof BindException) {
errorMessage = "Unable to establish a connection to the destination host";
} else if (e instanceof NoRouteToHostException) {
errorMessage = "No route to destination host";
} else if (e instanceof UnknownHostException) {
errorMessage = "IP address of the destination host could not be determined";
} else {
if (!conProp.containsKey(httpd.CONNECTION_PROP_PROXY_RESPOND_HEADER)) {
httpd.sendRespondError(conProp,respond,4,404,null,"Not Found",e);
if (e.getMessage().indexOf("Corrupt GZIP trailer") >= 0) {
// just do nothing, we leave it this way
this.theLogger.logDebug("ignoring bad gzip trail for URL " + url + " (" + e.getMessage() + ")",e);
this.forceConnectionClose();
} else if ((remote != null)&&(remote.isClosed())) {
errorMessage = "destination host unexpectedly closed connection";
} else {
conProp.put(httpd.CONNECTION_PROP_PERSISTENT,"close");
errorMessage = "Unexpected Error. " + e.getClass().getName() + ": " + e.getMessage();
}
}
// sending back an error message to the client
if (!conProp.containsKey(httpd.CONNECTION_PROP_PROXY_RESPOND_HEADER)) {
httpd.sendRespondError(conProp,respond,4,httpStatusCode,httpStatusText,errorMessage,errorExc);
} else {
this.forceConnectionClose();
}
} catch (Exception ee) {
conProp.put(httpd.CONNECTION_PROP_PERSISTENT,"close");
this.forceConnectionClose();
}
} finally {
if (remote != null) httpc.returnInstance(remote);
@ -850,6 +861,12 @@ public final class httpdProxyHandler extends httpdAbstractHandler implements htt
headers.remove(httpHeader.X_CACHE_LOOKUP);
}
private void forceConnectionClose() {
if (this.currentConProp != null) {
this.currentConProp.setProperty(httpd.CONNECTION_PROP_PERSISTENT,"close");
}
}
public void doHead(Properties conProp, httpHeader requestHeader, OutputStream respond) throws IOException {
this.currentConProp = conProp;
@ -898,25 +915,32 @@ public final class httpdProxyHandler extends httpdAbstractHandler implements htt
httpc.response res = null;
try {
// open the connection
if (yAddress == null) {
remote = newhttpc(host, port, timeout);
} else {
remote = newhttpc(yAddress, timeout); // with [AS] patch
}
// open the connection: second is needed for [AS] patch
remote = (yAddress == null) ? newhttpc(host, port, timeout): newhttpc(yAddress, timeout);
// sending the http-HEAD request to the server
res = remote.HEAD(remotePath, requestHeader);
// removing hop by hop headers
this.removeHopByHopHeaders(res.responseHeader);
// sending the server respond back to the client
httpd.sendRespondHeader(conProp,respond,httpVer,Integer.parseInt(res.status.split(" ")[0]),res.responseHeader);
//respondHeader(respond, res.status, res.responseHeader);
} catch (Exception e) {
try {
if (!conProp.containsKey(httpd.CONNECTION_PROP_PROXY_RESPOND_HEADER)) {
httpd.sendRespondError(conProp,respond,4,404,null,"Not Found",e);
String exTxt = e.getMessage();
if ((exTxt!=null)&&(exTxt.startsWith("Socket closed"))) {
this.forceConnectionClose();
} else if (!conProp.containsKey(httpd.CONNECTION_PROP_PROXY_RESPOND_HEADER)) {
String errorMsg = "Unexpected Error. " + e.getClass().getName() + ": " + e.getMessage();
httpd.sendRespondError(conProp,respond,4,503,null,errorMsg,e);
this.theLogger.logError(errorMsg);
} else {
conProp.setProperty(httpd.CONNECTION_PROP_PERSISTENT,"close");
this.forceConnectionClose();
}
} catch (Exception ee) {
conProp.setProperty(httpd.CONNECTION_PROP_PERSISTENT,"close");
}
this.forceConnectionClose();
}
} finally {
if (remote != null) httpc.returnInstance(remote);
}
@ -988,24 +1012,25 @@ public final class httpdProxyHandler extends httpdAbstractHandler implements htt
if (chunked != null) chunked.finish();
remote.close();
} catch (Exception e) {
try {
if (!conProp.containsKey(httpd.CONNECTION_PROP_PROXY_RESPOND_HEADER)) {
httpd.sendRespondError(conProp,respond,4,404,null,"Not Found",e);
} else {
conProp.setProperty(httpd.CONNECTION_PROP_PERSISTENT,"close");
}
} catch (Exception ee) {
conProp.setProperty(httpd.CONNECTION_PROP_PERSISTENT,"close");
}
} finally {
if (remote != null) httpc.returnInstance(remote);
}
respond.flush();
} catch (Exception e) {
String errorMsg = "Unexpected Error. " + e.getClass().getName() + ": " + e.getMessage();
System.err.println("PROXY: " + errorMsg);
this.theLogger.logError(errorMsg);
try {
String exTxt = e.getMessage();
if ((exTxt!=null)&&(exTxt.startsWith("Socket closed"))) {
this.forceConnectionClose();
} else if (!conProp.containsKey(httpd.CONNECTION_PROP_PROXY_RESPOND_HEADER)) {
String errorMsg = "Unexpected Error. " + e.getClass().getName() + ": " + e.getMessage();
httpd.sendRespondError(conProp,respond,4,503,null,errorMsg,e);
this.theLogger.logError(errorMsg);
} else {
this.forceConnectionClose();
}
} catch (Exception ee) {
this.forceConnectionClose();
}
} finally {
respond.flush();
if (respond instanceof httpdByteCountOutputStream) ((httpdByteCountOutputStream)respond).finish();

@ -78,6 +78,7 @@ import org.apache.commons.pool.impl.GenericObjectPool.Config;
import de.anomic.http.httpd;
import de.anomic.server.logging.serverLog;
import de.anomic.yacy.yacyCore;
public final class serverCore extends serverAbstractThread implements serverThread {
@ -232,7 +233,10 @@ public final class serverCore extends serverAbstractThread implements serverThre
localPort.intValue());
serverCore.portForwarding.connect();
serverCore.portForwardingEnabled = true;
yacyCore.seedDB.mySeed.put("IP",publicIP().getHostAddress());
yacyCore.seedDB.mySeed.put("Port",Integer.toString(serverCore.portForwarding.getPort()));
this.log.logInfo("Remote port forwarding connection established: " + portFwHost+":"+portFwPort+" -> "+localHost+":"+localPort);
} catch (Exception e) {
@ -402,31 +406,39 @@ public final class serverCore extends serverAbstractThread implements serverThre
connection.execute(controlSocket,this.timeout);
//log.logDebug("* NEW SESSION: " + connection.request + " from " + clientIP);
} else {
System.out.println("ACCESS FROM " + cIP + " DENIED");
this.log.logWarning("ACCESS FROM " + cIP + " DENIED");
}
return true;
}
public void close() {
try {
// consuming the isInterrupted Flag. Otherwise we could not properly close the session pool
Thread.interrupted();
// closing the port forwarding channel
if ((portForwardingEnabled) && (portForwarding != null) ) {
// consuming the isInterrupted Flag. Otherwise we could not properly close the session pool
Thread.interrupted();
// closing the port forwarding channel
if ((portForwardingEnabled) && (portForwarding != null) ) {
try {
portForwarding.disconnect();
} catch (Exception e) {
this.log.logWarning("Unable to shutdown the port forwarding channel.");
}
// close the session pool
this.theSessionPool.close();
// closing the serverchannel and socket
this.socket.close();
}
catch (Exception e) {
this.log.logSystem("Unable to close session pool: " + e.getMessage());
// close the session pool
try {
this.theSessionPool.close();
} catch (Exception e) {
this.log.logWarning("Unable to close the session pool.");
}
// closing the serverchannel and socket
try {
this.socket.close();
} catch (Exception e) {
this.log.logWarning("Unable to close the server socket.");
}
this.log.logSystem("* terminated");
}
@ -504,7 +516,7 @@ public final class serverCore extends serverAbstractThread implements serverThre
if (currentSession.isAlive()) {
if ((currentSession.controlSocket != null)&&(currentSession.controlSocket.isConnected())) {
currentSession.controlSocket.close();
serverCore.this.log.logInfo("Closing socket of thread " + currentSession.getName());
serverCore.this.log.logInfo("Closing socket of thread '" + currentSession.getName() + "'");
}
}
}

@ -122,11 +122,12 @@ public class yacyClient {
Date remoteTime = yacyCore.parseUniversalDate((String) result.get("mytime")); // read remote time
// check consistency with expectation
yacySeed otherPeer = null;
float otherPeerVersion = 0;
if ((otherHash != null ) && (otherHash.length() > 0)) {
yacySeed otherPeer = yacySeed.genRemoteSeed((String) result.get("seed0"), key, remoteTime);
otherPeer = yacySeed.genRemoteSeed((String) result.get("seed0"), key, remoteTime);
if ((otherPeer == null) || (!(otherPeer.hash.equals(otherHash)))) {
yacyCore.log.logDebug("yacyClient.publishMySeed consistency error: other peer wrong");
yacyCore.log.logDebug("yacyClient.publishMySeed consistency error: other peer '" + ((otherPeer==null)?"unknown":otherPeer.getName()) + "' wrong");
return -1; // no success
}
otherPeerVersion = otherPeer.getVersion();
@ -138,6 +139,8 @@ public class yacyClient {
// we overwrite our own IP number only, if we do not portForwarding
if (!serverCore.portForwardingEnabled) {
yacyCore.seedDB.mySeed.put("IP", (String) result.get("yourip"));
} else {
yacyCore.seedDB.mySeed.put("IP", serverCore.publicIP().getHostAddress());
}
/* If we have port forwarding enabled but the other peer uses a too old yacy version
@ -151,6 +154,10 @@ public class yacyClient {
String mytype = (String) result.get("yourtype");
if (mytype == null) mytype = "junior";
if ((yacyCore.seedDB.mySeed.get("PeerType", "junior").equals("principal")) && (mytype.equals("senior"))) mytype = "principal";
if (mytype.equalsIgnoreCase("junior")) {
yacyCore.log.logInfo("yacyClient.publishMySeed: Peer '" + ((otherPeer==null)?"unknown":otherPeer.getName()) + "' reported us as junior.");
}
yacyCore.seedDB.mySeed.put("PeerType", mytype);
}

@ -62,9 +62,13 @@ import java.net.MalformedURLException;
import java.net.URI;
import java.net.URL;
import java.text.SimpleDateFormat;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.GregorianCalendar;
import java.util.Hashtable;
import java.util.LinkedList;
import java.util.List;
import java.util.TimeZone;
import java.util.Vector;
@ -72,11 +76,14 @@ import de.anomic.http.httpc;
import de.anomic.net.natLib;
import de.anomic.plasma.plasmaSwitchboard;
import de.anomic.server.logging.serverLog;
import de.anomic.server.serverCore;
import de.anomic.server.serverSemaphore;
import de.anomic.server.serverSwitch;
public class yacyCore {
// statics
public static ThreadGroup publishThreadGroup = new ThreadGroup("publishThreadGroup");
public static long startupTime = System.currentTimeMillis();
public static yacySeedDB seedDB = null;
public static final Hashtable seedUploadMethods = new Hashtable();
@ -289,118 +296,191 @@ public class yacyCore {
public int added;
public yacySeed seed;
public Exception error;
private final serverSemaphore sync;
private final List syncList;
public publishThread(yacySeed seed) {
super("PublishSeed_" + seed.getName());
public publishThread(ThreadGroup tg, yacySeed seed, serverSemaphore sync, List syncList) throws InterruptedException {
super(tg, "PublishSeed_" + seed.getName());
this.sync = sync;
this.sync.P();
this.syncList = syncList;
this.seed = seed;
this.added = 0;
this.error = null;
}
public void run() {
try {
added = yacyClient.publishMySeed(seed.getAddress(), seed.hash);
if (added < 0) {
try {
this.added = yacyClient.publishMySeed(seed.getAddress(), seed.hash);
if (this.added < 0) {
// no or wrong response, delete that address
log.logInfo("publish: disconnected " + seed.get("PeerType", "senior") + " peer '" + seed.getName() + "' from " + seed.getAddress());
peerActions.peerDeparture(seed);
log.logInfo("publish: disconnected " + this.seed.get("PeerType", "senior") + " peer '" + this.seed.getName() + "' from " + this.seed.getAddress());
peerActions.peerDeparture(this.seed);
} else {
// success! we have published our peer to a senior peer
// update latest news from the other peer
log.logInfo("publish: handshaked " + seed.get("PeerType", "senior") + " peer '" + seed.getName() + "' at " + seed.getAddress());
log.logInfo("publish: handshaked " + this.seed.get("PeerType", "senior") + " peer '" + this.seed.getName() + "' at " + this.seed.getAddress());
}
} catch (Exception e) {
error = e;
this.error = e;
} finally {
this.syncList.add(this);
this.sync.V();
}
}
}
private int publishMySeed(boolean force) {
// call this after the httpd was started up
// we need to find out our own ip
// This is not always easy, since the application may
// live behind a firewall or nat.
// the normal way to do this is either measure the value that java gives us,
// but this is not correct if the peer lives behind a NAT/Router or has several
// addresses and not the right one can be found out.
// We have several alternatives:
// 1. ask another peer. This should be normal and the default method.
// but if no other peer lives, or we don't know them, we cannot do that
// 2. ask own NAT. This is only an option if the NAT is a DI604, because this is the
// only supported for address retrieval
// 3. ask ip respond services in the internet. There are several, and they are all
// probed until we get a valid response.
// init yacyHello-process
String address;
int added;
yacySeed[] seeds;
int attempts = seedDB.sizeConnected(); if (attempts > 10) attempts = 10;
if (seedDB.mySeed.get("PeerType", "virgin").equals("virgin")) {
seeds = seedDB.seedsByAge(true, attempts); // best for fast connection
} else {
seeds = seedDB.seedsByAge(false, attempts); // best for seed list maintenance/cleaning
}
if (seeds == null) return 0;
Vector v = new Vector(); // memory for threads
publishThread t;
for (int i = 0; i < seeds.length; i++) {
if (seeds[i] == null) continue;
log.logDebug("HELLO #" + i + " to peer " + seeds[i].get("Name", "")); // debug
address = seeds[i].getAddress();
if ((address == null) || (!(seeds[i].isProper()))) {
// we don't like that address, delete it
peerActions.peerDeparture(seeds[i]);
} else {
// ask senior peer
t = new publishThread(seeds[i]);
v.add(t);
t.start();
}
// wait
try {
if (i == 0) Thread.currentThread().sleep(2000); // after the first time wait some seconds
Thread.currentThread().sleep(1000 + 500 * v.size()); // wait a while
} catch (InterruptedException e) {}
try {
// call this after the httpd was started up
// we need to find out our own ip
// This is not always easy, since the application may
// live behind a firewall or nat.
// the normal way to do this is either measure the value that java gives us,
// but this is not correct if the peer lives behind a NAT/Router or has several
// addresses and not the right one can be found out.
// We have several alternatives:
// 1. ask another peer. This should be normal and the default method.
// but if no other peer lives, or we don't know them, we cannot do that
// 2. ask own NAT. This is only an option if the NAT is a DI604, because this is the
// only supported for address retrieval
// 3. ask ip respond services in the internet. There are several, and they are all
// probed until we get a valid response.
// init yacyHello-process
yacySeed[] seeds;
// check all threads
for (int j = 0; j < v.size(); j++) {
t = (publishThread) v.elementAt(j);
added = t.added;
if (!(t.isAlive())) {
//log.logDebug("PEER " + seeds[j].get("Name", "") + " request terminated"); // debug
if (added >= 0) {
// success! we have published our peer to a senior peer
// update latest news from the other peer
//log.logInfo("publish: handshaked " + t.seed.get("PeerType", "senior") + " peer '" + t.seed.getName() + "' at " + t.seed.getAddress());
peerActions.saveMySeed();
return added;
int attempts = seedDB.sizeConnected();
if (attempts > 10) attempts = 10;
// getting a list of peers to contact
if (seedDB.mySeed.get("PeerType", "virgin").equals("virgin")) {
seeds = seedDB.seedsByAge(true, attempts); // best for fast connection
} else {
seeds = seedDB.seedsByAge(false, attempts); // best for seed list maintenance/cleaning
}
if (seeds == null) return 0;
// holding a reference to all started threads
int contactedSeedCount = 0;
List syncList = Collections.synchronizedList(new LinkedList()); // memory for threads
serverSemaphore sync = new serverSemaphore(attempts);
// going through the peer list and starting a new publisher thread for each peer
for (int i = 0; i < seeds.length; i++) {
if (seeds[i] == null) continue;
String address = seeds[i].getAddress();
log.logDebug("HELLO #" + i + " to peer '" + seeds[i].get("Name", "") + "' at " + address); // debug
if ((address == null) || (!(seeds[i].isProper()))) {
// we don't like that address, delete it
peerActions.peerDeparture(seeds[i]);
sync.V();
} else {
// starting a new publisher thread
contactedSeedCount++;
(new publishThread(yacyCore.publishThreadGroup,seeds[i],sync,syncList)).start();
}
}
// receiving the result of all started publisher threads
int newSeeds = -1;
for (int j=0; j < contactedSeedCount; j++) {
// waiting for the next thread to finish
sync.P();
// if this is true something is wrong ...
if (syncList.isEmpty()) return 0;
// getting a reference to the finished thread
publishThread t = (publishThread) syncList.remove(0);
// getting the amount of new reported seeds
if (t.added >= 0) {
if (newSeeds==-1) newSeeds = t.added;
else newSeeds += t.added;
}
}
if (newSeeds >= 0) {
// success! we have published our peer to a senior peer
// update latest news from the other peer
//log.logInfo("publish: handshaked " + t.seed.get("PeerType", "senior") + " peer '" + t.seed.getName() + "' at " + t.seed.getAddress());
peerActions.saveMySeed();
return newSeeds;
}
// // wait
// try {
// if (i == 0) Thread.currentThread().sleep(2000); // after the first time wait some seconds
// Thread.currentThread().sleep(1000 + 500 * v.size()); // wait a while
// } catch (InterruptedException e) {}
//
// // check all threads
// for (int j = 0; j < v.size(); j++) {
// t = (publishThread) v.elementAt(j);
// added = t.added;
// if (!(t.isAlive())) {
// //log.logDebug("PEER " + seeds[j].get("Name", "") + " request terminated"); // debug
// if (added >= 0) {
// // success! we have published our peer to a senior peer
// // update latest news from the other peer
// //log.logInfo("publish: handshaked " + t.seed.get("PeerType", "senior") + " peer '" + t.seed.getName() + "' at " + t.seed.getAddress());
// peerActions.saveMySeed();
// return added;
// }
// }
// }
// if we have an address, we do nothing
if ((seedDB.mySeed.isProper()) && (!(force))) return 0;
// still no success: ask own NAT or internet responder
boolean DI604use = switchboard.getConfig("DI604use", "false").equals("true");
String DI604pw = switchboard.getConfig("DI604pw", "");
String ip = switchboard.getConfig("staticIP", "");
if(ip.equals("")){
ip = natLib.retrieveIP(DI604use, DI604pw, (switchboard.getConfig("yacyDebugMode", "false")=="false" ? false : true));
}
//System.out.println("DEBUG: new IP=" + ip);
seedDB.mySeed.put("IP", ip);
if (seedDB.mySeed.get("PeerType", "junior").equals("junior")) // ???????????????
seedDB.mySeed.put("PeerType", "senior"); // to start bootstraping, we need to be recognised as "senior" peer
log.logInfo("publish: no recipient found, asked NAT or responder; our address is " +
((seedDB.mySeed.getAddress() == null) ? "unknown" : seedDB.mySeed.getAddress()));
peerActions.saveMySeed();
return 0;
} catch (InterruptedException e) {
log.logInfo("publish: Interruption detected while publishing my seed.");
// interrupt all already started publishThreads
log.logInfo("publish: Trying to shutdown all remaining publishing threads ...");
yacyCore.publishThreadGroup.interrupt();
// waiting some time for the publishThreads to finish handshake
int threadCount = yacyCore.publishThreadGroup.activeCount();
Thread[] threadList = new Thread[threadCount];
threadCount = yacyCore.publishThreadGroup.enumerate(threadList);
try {
// we need to use a timeout here because of missing interruptable session threads ...
for ( int currentThreadIdx = 0; currentThreadIdx < threadCount; currentThreadIdx++ ) {
if (threadList[currentThreadIdx].isAlive()) {
log.logInfo("publish: Waiting for remaining publishing thread '" + threadList[currentThreadIdx].getName() + "' to finish shutdown");
threadList[currentThreadIdx].join(500);
}
}
}
}
// if we have an address, we do nothing
if ((seedDB.mySeed.isProper()) && (!(force))) return 0;
// still no success: ask own NAT or internet responder
boolean DI604use = switchboard.getConfig("DI604use", "false").equals("true");
String DI604pw = switchboard.getConfig("DI604pw", "");
String ip = switchboard.getConfig("staticIP", "");
if(ip.equals("")){
ip = natLib.retrieveIP(DI604use, DI604pw, (switchboard.getConfig("yacyDebugMode", "false")=="false" ? false : true));
}
//System.out.println("DEBUG: new IP=" + ip);
seedDB.mySeed.put("IP", ip);
if (seedDB.mySeed.get("PeerType", "junior").equals("junior")) // ???????????????
seedDB.mySeed.put("PeerType", "senior"); // to start bootstraping, we need to be recognised as "senior" peer
log.logInfo("publish: no recipient found, asked NAT or responder; our address is " +
((seedDB.mySeed.getAddress() == null) ? "unknown" : seedDB.mySeed.getAddress()));
peerActions.saveMySeed();
return 0;
catch (InterruptedException ee) {
log.logWarning("Interruption while trying to shutdown all remaining publishing threads.");
}
return 0;
}
}
public static Hashtable getSeedUploadMethods() {
@ -459,7 +539,8 @@ public class yacyCore {
String[] neededLibx = ((yacySeedUploader)theUploader).getLibxDependences();
if (neededLibx != null) {
for (int libxId=0; libxId < neededLibx.length; libxId++) {
if (javaClassPath.indexOf(neededLibx[libxId]) == -1) continue;
if (javaClassPath.indexOf(neededLibx[libxId]) == -1)
throw new Exception("Missing dependency");
}
}
availableUploaders.put(className.substring("yacySeedUpload".length()),fullClassName);
@ -519,7 +600,8 @@ public class yacyCore {
sb.setConfig("seedUploadMethod",seedUploadMethod);
} else if (
(seedUploadMethod.equalsIgnoreCase("File")) ||
(sb.getConfig("seedFilePath", "").length() > 0)
((seedUploadMethod.equals("")) &&
(sb.getConfig("seedFilePath", "").length() > 0))
) {
seedUploadMethod = "File";
sb.setConfig("seedUploadMethod",seedUploadMethod);

@ -92,12 +92,12 @@ public class yacySeedDB {
public yacySeedDB(plasmaSwitchboard sb,
File seedActiveDBFile,
File seedPassiveDBFile,
File seedPotentialDBFile,
int bufferkb) throws IOException {
this.seedActiveDBFile = seedActiveDBFile;
File seedActiveDBFile,
File seedPassiveDBFile,
File seedPotentialDBFile,
int bufferkb) throws IOException {
this.seedActiveDBFile = seedActiveDBFile;
this.seedPassiveDBFile = seedPassiveDBFile;
this.seedPotentialDBFile = seedPotentialDBFile;
this.mySeed = null; // my own seed
@ -110,33 +110,34 @@ public class yacySeedDB {
// create or init own seed
myOwnSeedFile = new File(sb.getRootPath(), sb.getConfig("yacyOwnSeedFile", "mySeed.txt"));
if (myOwnSeedFile.exists() && (myOwnSeedFile.length() > 0)) {
// load existing identity
mySeed = yacySeed.load(myOwnSeedFile);
} else {
// create new identity
mySeed = yacySeed.genLocalSeed(sb);
// save of for later use
mySeed.save(myOwnSeedFile); // in a file
//writeMap(mySeed.hash, mySeed.dna, "new"); // in a database
}
if (myOwnSeedFile.exists() && (myOwnSeedFile.length() > 0)) {
// load existing identity
mySeed = yacySeed.load(myOwnSeedFile);
} else {
// create new identity
mySeed = yacySeed.genLocalSeed(sb);
// save of for later use
mySeed.save(myOwnSeedFile); // in a file
//writeMap(mySeed.hash, mySeed.dna, "new"); // in a database
}
if (sb.getConfig("portForwardingEnabled","false").equalsIgnoreCase("true")) {
mySeed.put("Port", sb.getConfig("portForwardingPort","8080"));
mySeed.put("IP", sb.getConfig("portForwardingHost","localhost"));
} else {
mySeed.put("IP", ""); // we delete the old information to see what we have now
mySeed.put("Port", sb.getConfig("port", "8080")); // set my seed's correct port number
}
mySeed.put("PeerType", "virgin"); // markup startup condition
mySeed.put("IP", ""); // we delete the old information to see what we have now
if ((serverCore.portForwardingEnabled) && (serverCore.portForwarding != null)) {
mySeed.put("Port", Integer.toString(serverCore.portForwarding.getPort()));
} else {
mySeed.put("Port", sb.getConfig("port", "8080")); // set my seed's correct port number
}
mySeed.put("PeerType", "virgin"); // markup startup condition
// start our virtual DNS service for yacy peers with empty cache
nameLookupCache = new Hashtable();
// check if we are in the seedCaches: this can happen if someone else published our seed
removeMySeed();
// set up seed queue (for probing candidates)
seedQueue = null;
// set up seed queue (for probing candidates)
seedQueue = null;
}
public synchronized void removeMySeed() {

Loading…
Cancel
Save