Use a common pooled http connection manager for remote solr instances

This gives better control over the maximum number of simultaneous outgoing
HTTP connections, as is already done for all other HTTP connections (crawls, RWI
search, P2P protocol), which use net.yacy.cora.protocol.http.HTTPClient.
pull/137/merge
luccioman 7 years ago
parent d28f9ba0f6
commit ee6670fb8f

@ -43,6 +43,7 @@ import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.AuthCache; import org.apache.http.client.AuthCache;
import org.apache.http.client.HttpClient; import org.apache.http.client.HttpClient;
import org.apache.http.client.entity.GzipDecompressingEntity; import org.apache.http.client.entity.GzipDecompressingEntity;
import org.apache.http.client.protocol.ClientContext;
import org.apache.http.conn.scheme.PlainSocketFactory; import org.apache.http.conn.scheme.PlainSocketFactory;
import org.apache.http.conn.scheme.Scheme; import org.apache.http.conn.scheme.Scheme;
import org.apache.http.conn.scheme.SchemeRegistry; import org.apache.http.conn.scheme.SchemeRegistry;
@ -55,6 +56,8 @@ import org.apache.http.protocol.HttpContext;
import org.apache.http.ssl.SSLContextBuilder; import org.apache.http.ssl.SSLContextBuilder;
import org.apache.solr.client.solrj.SolrClient; import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.impl.ConcurrentUpdateSolrClient; import org.apache.solr.client.solrj.impl.ConcurrentUpdateSolrClient;
import org.apache.solr.client.solrj.impl.HttpClientUtil;
import org.apache.solr.common.params.ModifiableSolrParams;
import net.yacy.cora.document.id.MultiProtocolURL; import net.yacy.cora.document.id.MultiProtocolURL;
import net.yacy.cora.protocol.HeaderFramework; import net.yacy.cora.protocol.HeaderFramework;
@ -64,8 +67,17 @@ import net.yacy.kelondro.util.MemoryControl;
import net.yacy.search.schema.CollectionSchema; import net.yacy.search.schema.CollectionSchema;
import net.yacy.search.schema.WebgraphSchema; import net.yacy.search.schema.WebgraphSchema;
/**
* Handle access to a remote Solr instance.
*/
@SuppressWarnings("deprecation") @SuppressWarnings("deprecation")
public class RemoteInstance implements SolrInstance { public class RemoteInstance implements SolrInstance {
/** The connection manager used to handle the common HTTP connections pool. */
private static final org.apache.http.impl.conn.PoolingClientConnectionManager CONNECTION_MANAGER = buildConnectionManager();
/** A custom scheme registry allowing https connections to servers using self-signed certificate */
private static final SchemeRegistry SCHEME_REGISTRY = buildTrustSelfSignedSchemeRegistry();
private String solrurl; private String solrurl;
private final HttpClient client; private final HttpClient client;
@ -176,52 +188,71 @@ public class RemoteInstance implements SolrInstance {
/* Here we must trust self-signed certificates as most peers with SSL enabled use such certificates */ /* Here we must trust self-signed certificates as most peers with SSL enabled use such certificates */
this.client = buildCustomHttpClient(timeout, u, solraccount, solrpw, host, true); this.client = buildCustomHttpClient(timeout, u, solraccount, solrpw, host, true);
} else { } else {
// The default HttpSolrClient will be used /* Build a http client using the Solr utils, as in the HttpSolrClient constructor implementation.
this.client = null; * The only difference is that the common connection manager is used and configured in the buildConnectionManager() function */
final ModifiableSolrParams params = new ModifiableSolrParams();
params.set(HttpClientUtil.PROP_FOLLOW_REDIRECTS, false);
this.client = HttpClientUtil.createClient(params, CONNECTION_MANAGER);
} }
this.defaultServer = (ConcurrentUpdateSolrClient) getServer(this.defaultCoreName); this.defaultServer = (ConcurrentUpdateSolrClient) getServer(this.defaultCoreName);
if (this.defaultServer == null) throw new IOException("cannot connect to url " + url + " and connect core " + defaultCoreName); if (this.defaultServer == null) throw new IOException("cannot connect to url " + url + " and connect core " + defaultCoreName);
} }
/**
 * Creates the pooling connection manager used for the common HTTP connection
 * pool. The pool is limited to 100 connections in total and to 100
 * connections per route.
 *
 * @return a new connection manager holding a HTTP connection pool
 */
private static org.apache.http.impl.conn.PoolingClientConnectionManager buildConnectionManager() {
    /* Important note: the deprecated Apache classes are used on purpose, because SolrJ still
     * relies on them internally (see HttpClientUtil). Upgrade only once the Solr
     * implementation becomes compatible. */
    final org.apache.http.impl.conn.PoolingClientConnectionManager pool =
            new org.apache.http.impl.conn.PoolingClientConnectionManager(); // try also: ThreadSafeClientConnManager
    pool.setMaxTotal(100);
    pool.setDefaultMaxPerRoute(100);
    return pool;
}
/**
 * Builds a scheme registry with a plain "http" scheme (port 80) and an "https"
 * scheme (port 443) whose SSL context is loaded with
 * {@link TrustSelfSignedStrategy} and whose socket factory uses
 * {@link AllowAllHostnameVerifier}.
 *
 * @return the custom scheme registry allowing https connections to servers
 *         using a self-signed certificate, or null when the SSL context could
 *         not be initialized
 */
private static SchemeRegistry buildTrustSelfSignedSchemeRegistry() {
    /* Important note: the deprecated Apache classes are used on purpose, because SolrJ still
     * relies on them internally (see HttpClientUtil). Upgrade only once the Solr
     * implementation becomes compatible. */
    try {
        final SSLContext trustingContext = SSLContextBuilder.create()
                .loadTrustMaterial(TrustSelfSignedStrategy.INSTANCE)
                .build();
        final SchemeRegistry schemes = new SchemeRegistry();
        schemes.register(new Scheme("http", 80, PlainSocketFactory.getSocketFactory()));
        schemes.register(new Scheme("https", 443,
                new SSLSocketFactory(trustingContext, AllowAllHostnameVerifier.INSTANCE)));
        return schemes;
    } catch (final Exception e) {
        // Should not happen; whatever fails, the caller gets no registry at all
        ConcurrentLog.warn("RemoteInstance",
                "Error when initializing SSL context trusting self-signed certificates.", e);
        return null;
    }
}
/** /**
* @param solraccount eventual user name used to authenticate on the target Solr * @param solraccount eventual user name used to authenticate on the target Solr
* @param solrpw eventual password used to authenticate on the target Solr * @param solrpw eventual password used to authenticate on the target Solr
* @param trustSelfSignedCertificates when true, https connections to an host rpviding a self-signed certificate are accepted * @param trustSelfSignedCertificates when true, https connections to an host providing a self-signed certificate are accepted
* @return a new apache HttpClient instance usable as a custom http client by SolrJ * @return a new apache HttpClient instance usable as a custom http client by SolrJ
*/ */
private static HttpClient buildCustomHttpClient(final int timeout, final MultiProtocolURL u, final String solraccount, final String solrpw, private static HttpClient buildCustomHttpClient(final int timeout, final MultiProtocolURL u, final String solraccount, final String solrpw,
final String host, final boolean trustSelfSignedCertificates) { final String host, final boolean trustSelfSignedCertificates) {
/* Important note : deprecated use of Apache classes is required because SolrJ still use them internally (see HttpClientUtil). /* Important note : use of deprecated Apache classes is required because SolrJ still use them internally (see HttpClientUtil).
* Upgrade only when Solr implementation will become compatible */ * Upgrade only when Solr implementation will become compatible */
org.apache.http.impl.conn.PoolingClientConnectionManager cm;
SchemeRegistry registry = null;
if(trustSelfSignedCertificates) {
SSLContext sslContext;
try {
sslContext = SSLContextBuilder.create().loadTrustMaterial(TrustSelfSignedStrategy.INSTANCE).build();
registry = new SchemeRegistry();
registry.register(
new Scheme("http", 80, PlainSocketFactory.getSocketFactory()));
registry.register(
new Scheme("https", 443, new SSLSocketFactory(sslContext, AllowAllHostnameVerifier.INSTANCE)));
} catch (final Exception e) {
// Should not happen
ConcurrentLog.warn("RemoteInstance", "Error when initializing SSL context trusting self-signed certificates.", e);
registry = null;
}
}
if(registry != null) {
cm = new org.apache.http.impl.conn.PoolingClientConnectionManager(registry);
} else {
cm = new org.apache.http.impl.conn.PoolingClientConnectionManager(); // try also: ThreadSafeClientConnManager
}
cm.setMaxTotal(100);
cm.setDefaultMaxPerRoute(100);
org.apache.http.impl.client.DefaultHttpClient result = new org.apache.http.impl.client.DefaultHttpClient(cm) { org.apache.http.impl.client.DefaultHttpClient result = new org.apache.http.impl.client.DefaultHttpClient(CONNECTION_MANAGER) {
@Override @Override
protected HttpContext createHttpContext() { protected HttpContext createHttpContext() {
HttpContext context = super.createHttpContext(); HttpContext context = super.createHttpContext();
@ -230,6 +261,9 @@ public class RemoteInstance implements SolrInstance {
HttpHost targetHost = new HttpHost(u.getHost(), u.getPort(), u.getProtocol()); HttpHost targetHost = new HttpHost(u.getHost(), u.getPort(), u.getProtocol());
authCache.put(targetHost, basicAuth); authCache.put(targetHost, basicAuth);
context.setAttribute(org.apache.http.client.protocol.HttpClientContext.AUTH_CACHE, authCache); context.setAttribute(org.apache.http.client.protocol.HttpClientContext.AUTH_CACHE, authCache);
if (trustSelfSignedCertificates && SCHEME_REGISTRY != null) {
context.setAttribute(ClientContext.SCHEME_REGISTRY, SCHEME_REGISTRY);
}
this.setHttpRequestRetryHandler(new org.apache.http.impl.client.DefaultHttpRequestRetryHandler(0, false)); // no retries needed; we expect connections to fail; therefore we should not retry this.setHttpRequestRetryHandler(new org.apache.http.impl.client.DefaultHttpRequestRetryHandler(0, false)); // no retries needed; we expect connections to fail; therefore we should not retry
return context; return context;
} }
@ -356,7 +390,7 @@ public class RemoteInstance implements SolrInstance {
} }
ConcurrentUpdateSolrClient.Builder builder = new ConcurrentUpdateSolrClient.Builder(solrServerURL); ConcurrentUpdateSolrClient.Builder builder = new ConcurrentUpdateSolrClient.Builder(solrServerURL);
builder.withHttpClient(this.client); builder.withHttpClient(this.client);
builder.withQueueSize(10); builder.withQueueSize(queueSizeByMemory());
builder.withThreadCount(Runtime.getRuntime().availableProcessors()); builder.withThreadCount(Runtime.getRuntime().availableProcessors());
s = builder.build(); s = builder.build();
} else { } else {
@ -376,10 +410,31 @@ public class RemoteInstance implements SolrInstance {
return s; return s;
} }
@Override /**
public void close() { * Closes each eventually open Solr client and its associated resources. The
if (this.client != null) ((org.apache.http.impl.client.DefaultHttpClient) this.client).getConnectionManager().shutdown(); * common connections manager is not closed here as it will be reused for other
} * RemoteInstances. To shut down the connection manager at YaCy shutdown, use
* the {@link #closeConnectionManager()} function.
*/
@Override
public void close() {
for (final ConcurrentUpdateSolrClient solrClient : this.server.values()) {
/*
* Close every open Solr client : this is important as it shutdowns client's
* internal asynchronous tasks executor. To release the common connection
* manager, see closeConnectionManager().
*/
solrClient.close();
}
}
/**
* Shutdown the common connection manager (the static CONNECTION_MANAGER
* instance) and close all its active and inactive HTTP connections.
* Must be called once, at the end of the application: closing a single
* RemoteInstance with close() only closes its Solr clients and leaves the
* connection manager open.
*/
public static void closeConnectionManager() {
// Delegates to the Apache connection manager's own shutdown
CONNECTION_MANAGER.shutdown();
}
public static int queueSizeByMemory() { public static int queueSizeByMemory() {
return (int) Math.min(30, Math.max(1, MemoryControl.maxMemory() / 1024 / 1024 / 12)); return (int) Math.min(30, Math.max(1, MemoryControl.maxMemory() / 1024 / 1024 / 12));

@ -2001,6 +2001,7 @@ public final class Switchboard extends serverSwitch {
} catch (final InterruptedException e ) { } catch (final InterruptedException e ) {
ConcurrentLog.logException(e); ConcurrentLog.logException(e);
} }
RemoteInstance.closeConnectionManager();
this.log.config("SWITCHBOARD SHUTDOWN TERMINATED"); this.log.config("SWITCHBOARD SHUTDOWN TERMINATED");
/* Print also to the standard output : when this method is triggered by the shutdown hook thread, the LogManager is likely to have /* Print also to the standard output : when this method is triggered by the shutdown hook thread, the LogManager is likely to have
* been concurrently reset by its own shutdown hook thread */ * been concurrently reset by its own shutdown hook thread */

@ -56,6 +56,7 @@ import net.yacy.cora.date.GenericFormatter;
import net.yacy.cora.document.encoding.UTF8; import net.yacy.cora.document.encoding.UTF8;
import net.yacy.cora.document.id.DigestURL; import net.yacy.cora.document.id.DigestURL;
import net.yacy.cora.document.id.MultiProtocolURL; import net.yacy.cora.document.id.MultiProtocolURL;
import net.yacy.cora.federate.solr.instance.RemoteInstance;
import net.yacy.cora.federate.yacy.CacheStrategy; import net.yacy.cora.federate.yacy.CacheStrategy;
import net.yacy.cora.order.Digest; import net.yacy.cora.order.Digest;
import net.yacy.cora.protocol.ClientIdentification; import net.yacy.cora.protocol.ClientIdentification;
@ -621,6 +622,8 @@ public final class yacy {
} catch (final InterruptedException e1) { } catch (final InterruptedException e1) {
e1.printStackTrace(); e1.printStackTrace();
} }
RemoteInstance.closeConnectionManager();
System.exit(-1); System.exit(-1);
} }
@ -632,6 +635,7 @@ public final class yacy {
} catch (final InterruptedException e1) { } catch (final InterruptedException e1) {
e1.printStackTrace(); e1.printStackTrace();
} }
RemoteInstance.closeConnectionManager();
System.exit(-1); System.exit(-1);
} }
@ -641,6 +645,7 @@ public final class yacy {
} catch (final InterruptedException e) { } catch (final InterruptedException e) {
e.printStackTrace(); e.printStackTrace();
} }
RemoteInstance.closeConnectionManager();
// finished // finished
ConcurrentLog.config("COMMAND-STEERING", "SUCCESSFULLY FINISHED COMMAND: " + processdescription); ConcurrentLog.config("COMMAND-STEERING", "SUCCESSFULLY FINISHED COMMAND: " + processdescription);
@ -678,6 +683,7 @@ public final class yacy {
} catch (final InterruptedException e) { } catch (final InterruptedException e) {
e.printStackTrace(); e.printStackTrace();
} }
RemoteInstance.closeConnectionManager();
// finished // finished
ConcurrentLog.config("COMMAND-STEERING", "SUCCESSFULLY FINISHED COMMAND: " + processdescription); ConcurrentLog.config("COMMAND-STEERING", "SUCCESSFULLY FINISHED COMMAND: " + processdescription);

Loading…
Cancel
Save