... migrating to HttpComponents-Client-4.x ...

monitoring: replaced unused 'idletime' by uploading bytes
added some kind of 'upload-throttling' at dht-out :-)

git-svn-id: https://svn.berlios.de/svnroot/repos/yacy/trunk@6983 6c8d7289-2bf4-0310-a012-ef5d649a1542
pull/1/head
sixcooler 15 years ago
parent b143f6b169
commit 52718e6dcb

@ -41,7 +41,7 @@
<tr class="TableHeader" valign="bottom">
<td>Protocol</td>
<td>Duration</td>
<td>Idle Time</td>
<td>Up-Bytes</td>
<td>Dest. IP[:Port]</td>
<td>Command</td>
<td>ID</td>
@ -50,7 +50,7 @@
<tr class="TableCell#(dark)#Light::Dark#(/dark)#">
<td>#[clientProtocol]#</td>
<td>#[clientLifetime]#</td>
<td>#[clientIdletime]#</td>
<td>#[clientUpbytes]#</td>
<td>#[clientTargetHost]#</td>
<td>#[clientCommand]#</td>
<td>#[clientID]#</td>

@ -160,7 +160,7 @@ public final class Connections_p {
for (final ConnectionInfo conInfo: allConnections) {
prop.put("clientList_" + c + "_clientProtocol", conInfo.getProtocol());
prop.putNum("clientList_" + c + "_clientLifetime", conInfo.getLifetime());
prop.putNum("clientList_" + c + "_clientIdletime", conInfo.getIdletime());
prop.putNum("clientList_" + c + "_clientUpbytes", conInfo.getUpbytes());
prop.put("clientList_" + c + "_clientTargetHost", conInfo.getTargetHost());
prop.putHTML("clientList_" + c + "_clientCommand", conInfo.getCommand());
prop.put("clientList_" + c + "_clientID", conInfo.getID());

@ -2121,7 +2121,7 @@ public final class Switchboard extends serverSwitch {
return false;
}
boolean hasDoneSomething = false;
final long kbytesUp = ConnectionInfo.getActiveUpbytes() / 1024;
// accumulate RWIs to transmission cloud
if (this.dhtDispatcher.cloudSize() > this.peers.scheme.verticalPartitions() * 2) {
log.logInfo("dhtTransferJob: no selection, too many entries in transmission cloud: " + this.dhtDispatcher.cloudSize());
@ -2131,6 +2131,8 @@ public final class Switchboard extends serverSwitch {
log.logInfo("dhtTransferJob: too many connections in httpc pool : " + ConnectionInfo.getCount());
// close unused connections
// Client.cleanup();
} else if (kbytesUp > 512) {
log.logInfo("dhtTransferJob: too much upload, currently uploading: " + kbytesUp + " Kb");
} else {
byte[] startHash = null, limitHash = null;
int tries = 10;
@ -2163,6 +2165,8 @@ public final class Switchboard extends serverSwitch {
log.logInfo("dhtTransferJob: too many connections in httpc pool : " + ConnectionInfo.getCount());
// close unused connections
// Client.cleanup();
} else if (kbytesUp > 512) {
log.logInfo("dhtTransferJob: too much upload, currently uploading: " + kbytesUp + " Kb");
} else {
boolean dequeued = this.dhtDispatcher.dequeueContainer();
hasDoneSomething = hasDoneSomething | dequeued;

@ -3,15 +3,18 @@ package net.yacy.cora.protocol;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.LinkedHashMap;
import java.util.Set;
import java.util.Map.Entry;
import java.util.concurrent.TimeUnit;
import org.apache.http.Header;
import org.apache.http.HttpEntity;
import org.apache.http.HttpHost;
import org.apache.http.HttpResponse;
import org.apache.http.HttpVersion;
import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpHead;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.methods.HttpUriRequest;
import org.apache.http.conn.ClientConnectionManager;
@ -29,6 +32,7 @@ import org.apache.http.entity.mime.content.StringBody;
import org.apache.http.impl.client.AbstractHttpClient;
import org.apache.http.impl.client.DefaultHttpClient;
import org.apache.http.impl.conn.tsccm.ThreadSafeClientConnManager;
import org.apache.http.message.BasicHeader;
import org.apache.http.params.BasicHttpParams;
import org.apache.http.params.HttpConnectionParams;
import org.apache.http.params.HttpParams;
@ -49,6 +53,9 @@ public class Client {
private final static int maxcon = 20;
private static IdledConnectionEvictor idledConnectionEvictor = null;
private static HttpClient httpClient = null;
private Header[] headers = null;
private HttpResponse httpResponse;
private long upbytes = 0L;
private int timeout = 10000;
private String userAgent = null;
private String host = null;
@ -62,7 +69,7 @@ public class Client {
private static void initConnectionManager() {
// Create and initialize HTTP parameters
HttpParams httpParams = new BasicHttpParams();
final HttpParams httpParams = new BasicHttpParams();
/**
* ConnectionManager settings
*/
@ -71,7 +78,7 @@ public class Client {
// for statistics same value should also be set here
ConnectionInfo.setMaxcount(maxcon);
// perhaps we need more than 2(default) connections per host?
ConnPerRouteBean connPerRoute = new ConnPerRouteBean(2);
final ConnPerRouteBean connPerRoute = new ConnPerRouteBean(2);
// Increase max connections for localhost to 100
HttpHost localhost = new HttpHost("locahost");
connPerRoute.setMaxForRoute(new HttpRoute(localhost), maxcon);
@ -102,7 +109,7 @@ public class Client {
// TODO: testing noreuse - there will be HttpConnectionParams.setSoReuseaddr(HttpParams params, boolean reuseaddr) in core-4.1
// Create and initialize scheme registry
SchemeRegistry schemeRegistry = new SchemeRegistry();
final SchemeRegistry schemeRegistry = new SchemeRegistry();
schemeRegistry.register(new Scheme("http", PlainSocketFactory.getSocketFactory(), 80));
schemeRegistry.register(new Scheme("https", SSLSocketFactory.getSocketFactory(), 443));
@ -132,6 +139,18 @@ public class Client {
}
/**
*
* @param entrys to be set as request header
*/
public void setHeader(final Set<Entry<String, String>> entrys) {
int i = 0;
headers = new Header[entrys.size()];
for (final Entry<String, String> entry : entrys) {
headers[i++] = new BasicHeader(entry.getKey(),entry.getValue());
}
}
/**
*
* @param timeout in milliseconds
@ -174,10 +193,23 @@ public class Client {
* @throws IOException
*/
public byte[] GETbytes(final String uri, long maxBytes) throws IOException {
HttpGet httpGet = new HttpGet(uri);
final HttpGet httpGet = new HttpGet(uri);
return getContentBytes(httpGet, maxBytes);
}
/**
* This method gets HEAD response
*
* @param uri the url to Response from
* @return the HttpResponse
* @throws IOException
*/
public HttpResponse HEADResponse(final String uri) throws IOException {
final HttpHead httpHead = new HttpHead(uri);
getContentBytes(httpHead, Long.MAX_VALUE);
return httpResponse;
}
/**
*
* @param uri the url to post
@ -186,29 +218,40 @@ public class Client {
* @throws IOException
*/
public byte[] POSTbytes(final String uri, LinkedHashMap<String,ContentBody> parts) throws IOException {
HttpPost httpPost = new HttpPost(uri);
final HttpPost httpPost = new HttpPost(uri);
MultipartEntity multipartEntity = new MultipartEntity();
final MultipartEntity multipartEntity = new MultipartEntity();
for (Entry<String,ContentBody> part : parts.entrySet())
multipartEntity.addPart(part.getKey(), part.getValue());
// statistics
upbytes = multipartEntity.getContentLength();
httpPost.setEntity(multipartEntity);
return getContentBytes(httpPost, Long.MAX_VALUE);
}
/**
*
* @return HttpResponse from call
*/
public HttpResponse getHttpResponse() {
return httpResponse;
}
private byte[] getContentBytes(HttpUriRequest httpUriRequest, long maxBytes) throws IOException {
byte[] content = null;
final HttpContext httpContext = new BasicHttpContext();
setHeaders(httpUriRequest);
setParams(httpUriRequest.getParams());
setProxy(httpUriRequest.getParams());
// statistics
storeConnectionInfo(httpUriRequest);
try {
// execute the method
HttpResponse httpResponse = httpClient.execute(httpUriRequest, httpContext);
httpResponse = httpClient.execute(httpUriRequest, httpContext);
// get the response body
HttpEntity httpEntity = httpResponse.getEntity();
final HttpEntity httpEntity = httpResponse.getEntity();
if (httpEntity != null) {
if (httpEntity.getContentLength() < maxBytes) {
content = EntityUtils.toByteArray(httpEntity);
@ -226,6 +269,14 @@ public class Client {
return content;
}
private void setHeaders(HttpUriRequest httpUriRequest) {
if (headers != null) {
for (Header header : headers) {
httpUriRequest.addHeader(header);
}
}
}
private void setParams(HttpParams httpParams) {
HttpConnectionParams.setConnectionTimeout(httpParams, timeout);
HttpConnectionParams.setSoTimeout(httpParams, timeout);
@ -243,14 +294,15 @@ public class Client {
}
private void storeConnectionInfo(HttpUriRequest httpUriRequest) {
int port = httpUriRequest.getURI().getPort();
String thost = httpUriRequest.getURI().getHost();
final int port = httpUriRequest.getURI().getPort();
final String thost = httpUriRequest.getURI().getHost();
ConnectionInfo.addConnection(new ConnectionInfo(
httpUriRequest.getURI().getScheme(),
port == 80 ? thost : thost + ":" + port,
httpUriRequest.getMethod() + " " + httpUriRequest.getURI().getPath(),
httpUriRequest.hashCode(),
System.currentTimeMillis()));
System.currentTimeMillis(),
upbytes));
}
/**
@ -305,12 +357,23 @@ public class Client {
e.printStackTrace();
}
}
// Post some
// Head some
try {
System.out.println(new String(client.POSTbytes(url, newparts)));
} catch (IOException e1) {
e1.printStackTrace();
for (Header header: client.HEADResponse(url).getAllHeaders())
System.out.println(header.getName() + " : " + header.getValue());
System.out.println(client.getHttpResponse().getLocale());
System.out.println(client.getHttpResponse().getProtocolVersion());
System.out.println(client.getHttpResponse().getStatusLine());
} catch (IOException e) {
e.printStackTrace();
}
// Post some
// try {
// System.out.println(new String(client.POSTbytes(url, newparts)));
// } catch (IOException e1) {
// e1.printStackTrace();
// }
// Close out connection manager
try {
Client.closeConnectionManager();
} catch (InterruptedException e) {

@ -53,6 +53,7 @@ public class ConnectionInfo {
private final String command;
private final int id;
private final long initTime;
private final long upbytes;
/**
* constructor setting all data
@ -64,12 +65,13 @@ public class ConnectionInfo {
* @param initTime
*/
public ConnectionInfo(final String protocol, final String targetHost, final String command, final int id,
final long initTime) {
final long initTime, final long upbytes) {
this.protocol = protocol;
this.targetHost = targetHost;
this.command = command;
this.id = id;
this.initTime = initTime;
this.upbytes = upbytes;
}
/**
@ -87,10 +89,10 @@ public class ConnectionInfo {
}
/**
* @return dummy 0
* @return
*/
public int getIdletime() {
return 0;
public long getUpbytes() {
return upbytes;
}
/**
@ -143,6 +145,23 @@ public class ConnectionInfo {
return getCount() * 100 / maxcount;
}
/**
* removes stale connections
*/
public static long getActiveUpbytes() {
long up = 0L;
try {
synchronized (allConnections) {
for(final ConnectionInfo con: allConnections) {
up += con.getUpbytes();
}
}
} catch (final java.util.ConcurrentModificationException e) {
// there will be another try :-)
}
return up;
}
/**
* gets the max connection count of the Client connection manager
*
@ -186,14 +205,14 @@ public class ConnectionInfo {
* @param id
*/
protected static void removeConnection(final int id) {
removeConnection(new ConnectionInfo(null, null, null, id, 0));
removeConnection(new ConnectionInfo(null, null, null, id, 0, 0));
}
/**
* removes stale connections
*/
public static void cleanUp() {
try {
try {
synchronized (allConnections) {
for(final ConnectionInfo con: allConnections) {
if(con.getLifetime() > staleAfterMillis) {

@ -122,6 +122,14 @@ public class HttpConnector {
}
/**
* get data from the server named by url
*
* @param url address of the server
* @param timeout in milliseconds
* @return response body
* @throws IOException
*/
public static byte[] wget(final MultiProtocolURI url, final int timeout) throws IOException {
return wget(url.toNormalform(false, false), url.getHost(), timeout);
}

Loading…
Cancel
Save