From 7e6cdb589326e65011ce2879ee175c761fadbbbc Mon Sep 17 00:00:00 2001 From: Michael Peter Christen Date: Fri, 11 May 2012 11:33:05 +0200 Subject: [PATCH 1/3] moved debian dependency to openjdk-7 --- debian/control | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/debian/control b/debian/control index 918477e85..1a88204d1 100644 --- a/debian/control +++ b/debian/control @@ -2,7 +2,7 @@ Source: yacy Section: network Priority: extra Maintainer: Michael Peter Christen -Build-Depends: ant, sun-java6-jdk | openjdk-6-jdk, debhelper (>= 5), m4 +Build-Depends: ant, sun-java6-jdk | openjdk-7-jdk, debhelper (>= 5), m4 Standards-Version: 3.7.2 Package: yacy From 7740c02c5622e3dd89377d4bf126a668a9bd38eb Mon Sep 17 00:00:00 2001 From: Michael Peter Christen Date: Sat, 12 May 2012 10:32:42 +0200 Subject: [PATCH 2/3] - enhanced the solr connector - added new multiple connector (to replace singleConnector) --- .../federated/solr/SolrConnector.java | 2 + .../federated/solr/SolrMultipleConnector.java | 123 ++++++++++++++++ .../federated/solr/SolrRetryConnector.java | 6 + .../federated/solr/SolrShardingConnector.java | 6 + .../federated/solr/SolrSingleConnector.java | 133 ++++-------------- 5 files changed, 161 insertions(+), 109 deletions(-) create mode 100644 source/net/yacy/cora/services/federated/solr/SolrMultipleConnector.java diff --git a/source/net/yacy/cora/services/federated/solr/SolrConnector.java b/source/net/yacy/cora/services/federated/solr/SolrConnector.java index 10bbcf69b..b983b405d 100644 --- a/source/net/yacy/cora/services/federated/solr/SolrConnector.java +++ b/source/net/yacy/cora/services/federated/solr/SolrConnector.java @@ -25,6 +25,7 @@ package net.yacy.cora.services.federated.solr; import java.io.IOException; +import java.util.Collection; import java.util.List; import net.yacy.kelondro.data.meta.DigestURI; @@ -71,6 +72,7 @@ public interface SolrConnector { * @throws SolrException */ public void add(final SolrDoc solrdoc) throws IOException, SolrException; + public void add(final Collection solrdocs) throws IOException, SolrException; /** * register an entry as error document diff --git a/source/net/yacy/cora/services/federated/solr/SolrMultipleConnector.java b/source/net/yacy/cora/services/federated/solr/SolrMultipleConnector.java new file mode 100644 index 000000000..e38331bac --- /dev/null +++ b/source/net/yacy/cora/services/federated/solr/SolrMultipleConnector.java @@ -0,0 +1,123 @@ +package net.yacy.cora.services.federated.solr; + +import java.io.IOException; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.ArrayBlockingQueue; + +import net.yacy.kelondro.data.meta.DigestURI; + +import org.apache.solr.common.SolrDocumentList; +import org.apache.solr.common.SolrException; + +public class SolrMultipleConnector implements SolrConnector { + + private final static SolrDoc POISON_DOC = new SolrDoc(); + + private final ArrayBlockingQueue queue; + private final AddWorker[] worker; + private final SolrConnector solr; + + public SolrMultipleConnector(final String url, int connections) throws IOException { + this.solr = new SolrSingleConnector(url); + this.queue = new ArrayBlockingQueue(1000); + this.worker = new AddWorker[connections]; + for (int i = 0; i < connections; i++) { + this.worker[i] = new AddWorker(url); + this.worker[i].start(); + } + } + + private class AddWorker extends Thread { + private final SolrConnector solr; + public AddWorker(final String url) throws IOException { + this.solr = new SolrSingleConnector(url); + } + @Override + public void run() { + SolrDoc doc; + try { + while ((doc = SolrMultipleConnector.this.queue.take()) != POISON_DOC) { + try { + this.solr.add(doc); + } catch (SolrException e) { + e.printStackTrace(); + } catch (IOException e) { + e.printStackTrace(); + } + } + } catch (InterruptedException e) { + } finally { + this.solr.close(); + } + } + } + + @Override + public void close() { + for (@SuppressWarnings("unused") AddWorker element : this.worker) { + try { + this.queue.put(POISON_DOC); + } catch (InterruptedException e) { + e.printStackTrace(); + } + this.solr.close(); + } + } + + @Override + public void clear() throws IOException { + this.solr.clear(); + } + + @Override + public void delete(String id) throws IOException { + this.solr.delete(id); + } + + @Override + public void delete(List ids) throws IOException { + this.solr.delete(ids); + } + + @Override + public boolean exists(String id) throws IOException { + return this.solr.exists(id); + } + + @Override + public void add(final SolrDoc solrdoc) throws IOException, SolrException { + try { + this.queue.put(solrdoc); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + + @Override + public void add(final Collection solrdocs) throws IOException, SolrException { + for (SolrDoc d: solrdocs) { + try { + this.queue.put(d); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + } + + @Override + public void err(DigestURI digestURI, String failReason, int httpstatus) throws IOException { + this.solr.err(digestURI, failReason, httpstatus); + } + + @Override + public SolrDocumentList get(String querystring, int offset, int count) throws IOException { + return this.solr.get(querystring, offset, count); + } + + @Override + public long getSize() { + return this.solr.getSize(); + } + +} diff --git a/source/net/yacy/cora/services/federated/solr/SolrRetryConnector.java b/source/net/yacy/cora/services/federated/solr/SolrRetryConnector.java index bbb37f52d..fe9b1fce7 100644 --- a/source/net/yacy/cora/services/federated/solr/SolrRetryConnector.java +++ b/source/net/yacy/cora/services/federated/solr/SolrRetryConnector.java @@ -25,6 +25,7 @@ package net.yacy.cora.services.federated.solr; import java.io.IOException; +import java.util.Collection; import java.util.List; import net.yacy.kelondro.data.meta.DigestURI; @@ -122,6 +123,11 @@ public class SolrRetryConnector implements SolrConnector { if (ee != null) throw (ee instanceof IOException) ? (IOException) ee : new IOException(ee.getMessage()); } + @Override + public void add(final Collection solrdocs) throws IOException, SolrException { + for (SolrDoc d: solrdocs) add(d); + } + @Override public void err(final DigestURI digestURI, final String failReason, final int httpstatus) throws IOException { final long t = System.currentTimeMillis() + this.retryMaxTime; diff --git a/source/net/yacy/cora/services/federated/solr/SolrShardingConnector.java b/source/net/yacy/cora/services/federated/solr/SolrShardingConnector.java index faed823f6..c5536e531 100644 --- a/source/net/yacy/cora/services/federated/solr/SolrShardingConnector.java +++ b/source/net/yacy/cora/services/federated/solr/SolrShardingConnector.java @@ -35,6 +35,7 @@ import net.yacy.kelondro.data.meta.DigestURI; import org.apache.solr.common.SolrDocument; import org.apache.solr.common.SolrDocumentList; +import org.apache.solr.common.SolrException; public class SolrShardingConnector implements SolrConnector { @@ -111,6 +112,11 @@ public class SolrShardingConnector implements SolrConnector { this.connectors.get(this.sharding.select(solrdoc)).add(solrdoc); } + @Override + public void add(final Collection solrdocs) throws IOException, SolrException { + for (SolrDoc d: solrdocs) this.connectors.get(this.sharding.select(d)).add(d); + } + /** * add a collection of Solr documents * @param docs diff --git a/source/net/yacy/cora/services/federated/solr/SolrSingleConnector.java b/source/net/yacy/cora/services/federated/solr/SolrSingleConnector.java index a0fe0af7a..d2f746c15 100644 --- a/source/net/yacy/cora/services/federated/solr/SolrSingleConnector.java +++ b/source/net/yacy/cora/services/federated/solr/SolrSingleConnector.java @@ -30,8 +30,6 @@ import java.net.InetAddress; import java.util.ArrayList; import java.util.Collection; import java.util.List; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingQueue; import net.yacy.cora.document.ASCII; import net.yacy.cora.document.MultiProtocolURI; @@ -50,6 +48,7 @@ import org.apache.http.impl.client.BasicCredentialsProvider; import org.apache.http.impl.client.DefaultHttpClient; import org.apache.http.protocol.HttpContext; import org.apache.solr.client.solrj.SolrQuery; +import org.apache.solr.client.solrj.SolrServerException; import org.apache.solr.client.solrj.impl.HttpSolrServer; import org.apache.solr.client.solrj.request.ContentStreamUpdateRequest; import org.apache.solr.client.solrj.response.QueryResponse; @@ -64,26 +63,14 @@ public class SolrSingleConnector implements SolrConnector { private final int port; private HttpSolrServer server; - private final static int transmissionQueueCount = 4; // allow concurrent http sessions to solr - private final static int transmissionQueueSize = 50; // number of documents that are collected until a commit is sent - private final Worker[] transmissionWorker; // the transmission workers to solr - private final BlockingQueue[] transmissionQueue; // the queues quere documents are collected - private int transmissionRoundRobinCounter; // a rount robin counter for the transmission queues - /** * create a new solr connector * @param url the solr url, like http://192.168.1.60:8983/solr/ or http://admin:pw@192.168.1.60:8983/solr/ * @param scheme * @throws IOException */ - @SuppressWarnings("unchecked") public SolrSingleConnector(final String url) throws IOException { this.solrurl = url; - this.transmissionRoundRobinCounter = 0; - this.transmissionQueue = new ArrayBlockingQueue[transmissionQueueCount]; - for (int i = 0; i < transmissionQueueCount; i++) { - this.transmissionQueue[i] = new ArrayBlockingQueue(transmissionQueueSize); - } // connect using authentication final MultiProtocolURI u = new MultiProtocolURI(this.solrurl); @@ -121,65 +108,20 @@ public class SolrSingleConnector implements SolrConnector { } else { this.server = new HttpSolrServer(this.solrurl); } - - // start worker - this.transmissionWorker = new Worker[transmissionQueueCount]; - for (int i = 0; i < transmissionQueueCount; i++) { - this.transmissionWorker[i] = new Worker(i); - this.transmissionWorker[i].start(); - } - } - - private class Worker extends Thread { - boolean shallRun; - int idx; - public Worker(final int i) { - this.idx = i; - this.shallRun = true; - } - public void pleaseStop() { - this.shallRun = false; - } - @Override - public void run() { - while (this.shallRun) { - if (SolrSingleConnector.this.transmissionQueue[this.idx].size() > 0) { - try { - flushTransmissionQueue(this.idx); - } catch (final IOException e) { - Log.logSevere("SolrSingleConnector", "flush Transmission failed in worker:IO", e); - continue; - } catch (final SolrException e) { - Log.logSevere("SolrSingleConnector", "flush Transmission failed in worker:Solr", e); - continue; - } - } else { - try {Thread.sleep(1000);} catch (final InterruptedException e) {} - } - } - try { - flushTransmissionQueue(this.idx); - } catch (final IOException e) {} - } + this.server.setAllowCompression(true); + this.server.setConnectionTimeout(60000); + this.server.setMaxRetries(10); + this.server.setSoTimeout(60000); } @Override public void close() { - for (int i = 0; i < transmissionQueueCount; i++) { - if (this.transmissionWorker[i].isAlive()) { - this.transmissionWorker[i].pleaseStop(); - try {this.transmissionWorker[i].join();} catch (final InterruptedException e) {} - } - } - for (int i = 0; i < transmissionQueueCount; i++) { - try { - flushTransmissionQueue(i); - } catch (final IOException e) { - Log.logException(e); - } catch (final SolrException e) { - Log.logException(e); - } - + try { + this.server.commit(); + } catch (SolrServerException e) { + e.printStackTrace(); + } catch (IOException e) { + e.printStackTrace(); } } @@ -254,35 +196,23 @@ public class SolrSingleConnector implements SolrConnector { @Override public void add(final SolrDoc solrdoc) throws IOException, SolrException { - int thisrrc = this.transmissionRoundRobinCounter; - int nextrrc = thisrrc++; - if (nextrrc >= transmissionQueueCount) nextrrc = 0; - this.transmissionRoundRobinCounter = nextrrc; - if (this.transmissionWorker[thisrrc].isAlive()) { - this.transmissionQueue[thisrrc].offer(solrdoc); - } else { - if (this.transmissionQueue[thisrrc].size() > 0) flushTransmissionQueue(thisrrc); - final Collection docs = new ArrayList(); - docs.add(solrdoc); - addSolr(docs); + try { + this.server.add(solrdoc); + //this.server.commit(); + } catch (SolrServerException e) { + Log.logWarning("SolrConnector", e.getMessage() + " DOC=" + solrdoc.toString()); + throw new IOException(e); } } - protected void addSolr(final Collection docs) throws IOException, SolrException { - + public void add(final Collection solrdocs) throws IOException, SolrException { + ArrayList l = new ArrayList(); + for (SolrDoc d: solrdocs) l.add(d); try { - if (docs.size() != 0) this.server.add(docs); - this.server.commit(); - /* To immediately commit after adding documents, you could use: - UpdateRequest req = new UpdateRequest(); - req.setAction( UpdateRequest.ACTION.COMMIT, false, false ); - req.add( docs ); - UpdateResponse rsp = req.process( server ); - */ - } catch (final SolrException e) { - // the field is probably not known - Log.logWarning("SolrConnector", e.getMessage()); - } catch (final Throwable e) { + this.server.add(l); + //this.server.commit(); + } catch (SolrServerException e) { + Log.logWarning("SolrConnector", e.getMessage() + " DOC=" + solrdocs.toString()); throw new IOException(e); } } @@ -303,26 +233,11 @@ public class SolrSingleConnector implements SolrConnector { final String[] paths = path.split("/"); if (paths.length > 0) solrdoc.addField("attr_paths", paths); } - solrdoc.addField("failreason_t", failReason); solrdoc.addField("httpstatus_i", httpstatus); - add(solrdoc); } - private void flushTransmissionQueue(final int idx) throws IOException, SolrException { - final Collection c = new ArrayList(); - while (this.transmissionQueue[idx].size() > 0) { - try { - c.add(this.transmissionQueue[idx].take()); - } catch (final InterruptedException e) { - continue; - } - } - addSolr(c); - } - - /** * get a query result from solr * to get all results set the query String to "*:*" From 0d58fea2105f05ab27262ef0ee4279ab36fbccc6 Mon Sep 17 00:00:00 2001 From: Michael Peter Christen Date: Sat, 12 May 2012 10:39:01 +0200 Subject: [PATCH 3/3] made multiple connector default --- htroot/IndexFederated_p.java | 4 ++-- .../cora/services/federated/solr/SolrShardingConnector.java | 6 ++++-- source/net/yacy/search/Switchboard.java | 2 +- 3 files changed, 7 insertions(+), 5 deletions(-) diff --git a/htroot/IndexFederated_p.java b/htroot/IndexFederated_p.java index ce0f8a33b..d0c56afe1 100644 --- a/htroot/IndexFederated_p.java +++ b/htroot/IndexFederated_p.java @@ -39,8 +39,8 @@ import net.yacy.cora.storage.ConfigurationSet; import net.yacy.kelondro.logging.Log; import net.yacy.search.Switchboard; import net.yacy.search.index.Segments; -import net.yacy.search.index.SolrField; import net.yacy.search.index.SolrConfiguration; +import net.yacy.search.index.SolrField; import de.anomic.server.serverObjects; import de.anomic.server.serverSwitch; @@ -93,7 +93,7 @@ public class IndexFederated_p { // switch on final boolean usesolr = sb.getConfigBool("federated.service.solr.indexing.enabled", false) & solrurls.length() > 0; try { - sb.indexSegments.segment(Segments.Process.LOCALCRAWLING).connectSolr((usesolr) ? new SolrShardingConnector(solrurls, SolrShardingSelection.Method.MODULO_HOST_MD5, 10000) : null); + sb.indexSegments.segment(Segments.Process.LOCALCRAWLING).connectSolr((usesolr) ? new SolrShardingConnector(solrurls, SolrShardingSelection.Method.MODULO_HOST_MD5, 10000, true) : null); } catch (final IOException e) { Log.logException(e); sb.indexSegments.segment(Segments.Process.LOCALCRAWLING).connectSolr(null); diff --git a/source/net/yacy/cora/services/federated/solr/SolrShardingConnector.java b/source/net/yacy/cora/services/federated/solr/SolrShardingConnector.java index c5536e531..df4f82f1c 100644 --- a/source/net/yacy/cora/services/federated/solr/SolrShardingConnector.java +++ b/source/net/yacy/cora/services/federated/solr/SolrShardingConnector.java @@ -44,12 +44,14 @@ public class SolrShardingConnector implements SolrConnector { private final SolrShardingSelection sharding; private final String[] urls; - public SolrShardingConnector(final String urlList, final SolrShardingSelection.Method method, final long timeout) throws IOException { + public SolrShardingConnector(final String urlList, final SolrShardingSelection.Method method, final long timeout, boolean multipleConnections) throws IOException { urlList.replace(' ', ','); this.urls = urlList.split(","); this.connectors = new ArrayList(); + SolrConnector s; for (final String u: this.urls) { - this.connectors.add(new SolrRetryConnector(new SolrSingleConnector(u.trim()), timeout)); + s = multipleConnections ? new SolrMultipleConnector(u.trim(), 2) : new SolrSingleConnector(u.trim()); + this.connectors.add(new SolrRetryConnector(s, timeout)); } this.sharding = new SolrShardingSelection(method, this.urls.length); } diff --git a/source/net/yacy/search/Switchboard.java b/source/net/yacy/search/Switchboard.java index b3b52a934..9dfeead92 100644 --- a/source/net/yacy/search/Switchboard.java +++ b/source/net/yacy/search/Switchboard.java @@ -656,7 +656,7 @@ public final class Switchboard extends serverSwitch (usesolr) ? new SolrShardingConnector( solrurls, SolrShardingSelection.Method.MODULO_HOST_MD5, - 10000) : null); + 10000, true) : null); } catch ( final IOException e ) { Log.logException(e); this.indexSegments.segment(Segments.Process.LOCALCRAWLING).connectSolr(null);