Merge branch 'master' of git://gitorious.org/yacy/rc1.git

pull/1/head
reger 13 years ago
commit 49867fcc8f

2
debian/control vendored

@ -2,7 +2,7 @@ Source: yacy
Section: network
Priority: extra
Maintainer: Michael Peter Christen <mc@yacy.net>
Build-Depends: ant, sun-java6-jdk | openjdk-6-jdk, debhelper (>= 5), m4
Build-Depends: ant, sun-java6-jdk | openjdk-7-jdk, debhelper (>= 5), m4
Standards-Version: 3.7.2
Package: yacy

@ -39,8 +39,8 @@ import net.yacy.cora.storage.ConfigurationSet;
import net.yacy.kelondro.logging.Log;
import net.yacy.search.Switchboard;
import net.yacy.search.index.Segments;
import net.yacy.search.index.SolrField;
import net.yacy.search.index.SolrConfiguration;
import net.yacy.search.index.SolrField;
import de.anomic.server.serverObjects;
import de.anomic.server.serverSwitch;
@ -93,7 +93,7 @@ public class IndexFederated_p {
// switch on
final boolean usesolr = sb.getConfigBool("federated.service.solr.indexing.enabled", false) & solrurls.length() > 0;
try {
sb.indexSegments.segment(Segments.Process.LOCALCRAWLING).connectSolr((usesolr) ? new SolrShardingConnector(solrurls, SolrShardingSelection.Method.MODULO_HOST_MD5, 10000) : null);
sb.indexSegments.segment(Segments.Process.LOCALCRAWLING).connectSolr((usesolr) ? new SolrShardingConnector(solrurls, SolrShardingSelection.Method.MODULO_HOST_MD5, 10000, true) : null);
} catch (final IOException e) {
Log.logException(e);
sb.indexSegments.segment(Segments.Process.LOCALCRAWLING).connectSolr(null);

@ -25,6 +25,7 @@
package net.yacy.cora.services.federated.solr;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import net.yacy.kelondro.data.meta.DigestURI;
@ -71,6 +72,7 @@ public interface SolrConnector {
* @throws SolrException
*/
public void add(final SolrDoc solrdoc) throws IOException, SolrException;
public void add(final Collection<SolrDoc> solrdocs) throws IOException, SolrException;
/**
* register an entry as error document

@ -0,0 +1,123 @@
package net.yacy.cora.services.federated.solr;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import net.yacy.kelondro.data.meta.DigestURI;
import org.apache.solr.common.SolrDocumentList;
import org.apache.solr.common.SolrException;
public class SolrMultipleConnector implements SolrConnector {
private final static SolrDoc POISON_DOC = new SolrDoc();
private final ArrayBlockingQueue<SolrDoc> queue;
private final AddWorker[] worker;
private final SolrConnector solr;
public SolrMultipleConnector(final String url, int connections) throws IOException {
this.solr = new SolrSingleConnector(url);
this.queue = new ArrayBlockingQueue<SolrDoc>(1000);
this.worker = new AddWorker[connections];
for (int i = 0; i < connections; i++) {
this.worker[i] = new AddWorker(url);
this.worker[i].start();
}
}
private class AddWorker extends Thread {
private final SolrConnector solr;
public AddWorker(final String url) throws IOException {
this.solr = new SolrSingleConnector(url);
}
@Override
public void run() {
SolrDoc doc;
try {
while ((doc = SolrMultipleConnector.this.queue.take()) != POISON_DOC) {
try {
this.solr.add(doc);
} catch (SolrException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
}
} catch (InterruptedException e) {
} finally {
this.solr.close();
}
}
}
@Override
public void close() {
for (@SuppressWarnings("unused") AddWorker element : this.worker) {
try {
this.queue.put(POISON_DOC);
} catch (InterruptedException e) {
e.printStackTrace();
}
this.solr.close();
}
}
@Override
public void clear() throws IOException {
this.solr.clear();
}
@Override
public void delete(String id) throws IOException {
this.solr.delete(id);
}
@Override
public void delete(List<String> ids) throws IOException {
this.solr.delete(ids);
}
@Override
public boolean exists(String id) throws IOException {
return this.solr.exists(id);
}
@Override
public void add(final SolrDoc solrdoc) throws IOException, SolrException {
try {
this.queue.put(solrdoc);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Override
public void add(final Collection<SolrDoc> solrdocs) throws IOException, SolrException {
for (SolrDoc d: solrdocs) {
try {
this.queue.put(d);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
@Override
public void err(DigestURI digestURI, String failReason, int httpstatus) throws IOException {
this.solr.err(digestURI, failReason, httpstatus);
}
@Override
public SolrDocumentList get(String querystring, int offset, int count) throws IOException {
return this.solr.get(querystring, offset, count);
}
@Override
public long getSize() {
return this.solr.getSize();
}
}

@ -25,6 +25,7 @@
package net.yacy.cora.services.federated.solr;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import net.yacy.kelondro.data.meta.DigestURI;
@ -122,6 +123,11 @@ public class SolrRetryConnector implements SolrConnector {
if (ee != null) throw (ee instanceof IOException) ? (IOException) ee : new IOException(ee.getMessage());
}
@Override
public void add(final Collection<SolrDoc> solrdocs) throws IOException, SolrException {
for (SolrDoc d: solrdocs) add(d);
}
@Override
public void err(final DigestURI digestURI, final String failReason, final int httpstatus) throws IOException {
final long t = System.currentTimeMillis() + this.retryMaxTime;

@ -35,6 +35,7 @@ import net.yacy.kelondro.data.meta.DigestURI;
import org.apache.solr.common.SolrDocument;
import org.apache.solr.common.SolrDocumentList;
import org.apache.solr.common.SolrException;
public class SolrShardingConnector implements SolrConnector {
@ -43,12 +44,14 @@ public class SolrShardingConnector implements SolrConnector {
private final SolrShardingSelection sharding;
private final String[] urls;
public SolrShardingConnector(final String urlList, final SolrShardingSelection.Method method, final long timeout) throws IOException {
public SolrShardingConnector(final String urlList, final SolrShardingSelection.Method method, final long timeout, boolean multipleConnections) throws IOException {
urlList.replace(' ', ',');
this.urls = urlList.split(",");
this.connectors = new ArrayList<SolrConnector>();
SolrConnector s;
for (final String u: this.urls) {
this.connectors.add(new SolrRetryConnector(new SolrSingleConnector(u.trim()), timeout));
s = multipleConnections ? new SolrMultipleConnector(u.trim(), 2) : new SolrSingleConnector(u.trim());
this.connectors.add(new SolrRetryConnector(s, timeout));
}
this.sharding = new SolrShardingSelection(method, this.urls.length);
}
@ -111,6 +114,11 @@ public class SolrShardingConnector implements SolrConnector {
this.connectors.get(this.sharding.select(solrdoc)).add(solrdoc);
}
@Override
public void add(final Collection<SolrDoc> solrdocs) throws IOException, SolrException {
for (SolrDoc d: solrdocs) this.connectors.get(this.sharding.select(d)).add(d);
}
/**
* add a collection of Solr documents
* @param docs

@ -30,8 +30,6 @@ import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import net.yacy.cora.document.ASCII;
import net.yacy.cora.document.MultiProtocolURI;
@ -50,6 +48,7 @@ import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.client.DefaultHttpClient;
import org.apache.http.protocol.HttpContext;
import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.HttpSolrServer;
import org.apache.solr.client.solrj.request.ContentStreamUpdateRequest;
import org.apache.solr.client.solrj.response.QueryResponse;
@ -64,26 +63,14 @@ public class SolrSingleConnector implements SolrConnector {
private final int port;
private HttpSolrServer server;
private final static int transmissionQueueCount = 4; // allow concurrent http sessions to solr
private final static int transmissionQueueSize = 50; // number of documents that are collected until a commit is sent
private final Worker[] transmissionWorker; // the transmission workers to solr
private final BlockingQueue<SolrDoc>[] transmissionQueue; // the queues quere documents are collected
private int transmissionRoundRobinCounter; // a rount robin counter for the transmission queues
/**
* create a new solr connector
* @param url the solr url, like http://192.168.1.60:8983/solr/ or http://admin:pw@192.168.1.60:8983/solr/
* @param scheme
* @throws IOException
*/
@SuppressWarnings("unchecked")
public SolrSingleConnector(final String url) throws IOException {
this.solrurl = url;
this.transmissionRoundRobinCounter = 0;
this.transmissionQueue = new ArrayBlockingQueue[transmissionQueueCount];
for (int i = 0; i < transmissionQueueCount; i++) {
this.transmissionQueue[i] = new ArrayBlockingQueue<SolrDoc>(transmissionQueueSize);
}
// connect using authentication
final MultiProtocolURI u = new MultiProtocolURI(this.solrurl);
@ -121,65 +108,20 @@ public class SolrSingleConnector implements SolrConnector {
} else {
this.server = new HttpSolrServer(this.solrurl);
}
// start worker
this.transmissionWorker = new Worker[transmissionQueueCount];
for (int i = 0; i < transmissionQueueCount; i++) {
this.transmissionWorker[i] = new Worker(i);
this.transmissionWorker[i].start();
}
}
private class Worker extends Thread {
boolean shallRun;
int idx;
public Worker(final int i) {
this.idx = i;
this.shallRun = true;
}
public void pleaseStop() {
this.shallRun = false;
}
@Override
public void run() {
while (this.shallRun) {
if (SolrSingleConnector.this.transmissionQueue[this.idx].size() > 0) {
try {
flushTransmissionQueue(this.idx);
} catch (final IOException e) {
Log.logSevere("SolrSingleConnector", "flush Transmission failed in worker:IO", e);
continue;
} catch (final SolrException e) {
Log.logSevere("SolrSingleConnector", "flush Transmission failed in worker:Solr", e);
continue;
}
} else {
try {Thread.sleep(1000);} catch (final InterruptedException e) {}
}
}
try {
flushTransmissionQueue(this.idx);
} catch (final IOException e) {}
}
this.server.setAllowCompression(true);
this.server.setConnectionTimeout(60000);
this.server.setMaxRetries(10);
this.server.setSoTimeout(60000);
}
@Override
public void close() {
for (int i = 0; i < transmissionQueueCount; i++) {
if (this.transmissionWorker[i].isAlive()) {
this.transmissionWorker[i].pleaseStop();
try {this.transmissionWorker[i].join();} catch (final InterruptedException e) {}
}
}
for (int i = 0; i < transmissionQueueCount; i++) {
try {
flushTransmissionQueue(i);
} catch (final IOException e) {
Log.logException(e);
} catch (final SolrException e) {
Log.logException(e);
}
try {
this.server.commit();
} catch (SolrServerException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
}
@ -254,35 +196,23 @@ public class SolrSingleConnector implements SolrConnector {
@Override
public void add(final SolrDoc solrdoc) throws IOException, SolrException {
int thisrrc = this.transmissionRoundRobinCounter;
int nextrrc = thisrrc++;
if (nextrrc >= transmissionQueueCount) nextrrc = 0;
this.transmissionRoundRobinCounter = nextrrc;
if (this.transmissionWorker[thisrrc].isAlive()) {
this.transmissionQueue[thisrrc].offer(solrdoc);
} else {
if (this.transmissionQueue[thisrrc].size() > 0) flushTransmissionQueue(thisrrc);
final Collection<SolrInputDocument> docs = new ArrayList<SolrInputDocument>();
docs.add(solrdoc);
addSolr(docs);
try {
this.server.add(solrdoc);
//this.server.commit();
} catch (SolrServerException e) {
Log.logWarning("SolrConnector", e.getMessage() + " DOC=" + solrdoc.toString());
throw new IOException(e);
}
}
protected void addSolr(final Collection<SolrInputDocument> docs) throws IOException, SolrException {
public void add(final Collection<SolrDoc> solrdocs) throws IOException, SolrException {
ArrayList<SolrInputDocument> l = new ArrayList<SolrInputDocument>();
for (SolrDoc d: solrdocs) l.add(d);
try {
if (docs.size() != 0) this.server.add(docs);
this.server.commit();
/* To immediately commit after adding documents, you could use:
UpdateRequest req = new UpdateRequest();
req.setAction( UpdateRequest.ACTION.COMMIT, false, false );
req.add( docs );
UpdateResponse rsp = req.process( server );
*/
} catch (final SolrException e) {
// the field is probably not known
Log.logWarning("SolrConnector", e.getMessage());
} catch (final Throwable e) {
this.server.add(l);
//this.server.commit();
} catch (SolrServerException e) {
Log.logWarning("SolrConnector", e.getMessage() + " DOC=" + solrdocs.toString());
throw new IOException(e);
}
}
@ -303,26 +233,11 @@ public class SolrSingleConnector implements SolrConnector {
final String[] paths = path.split("/");
if (paths.length > 0) solrdoc.addField("attr_paths", paths);
}
solrdoc.addField("failreason_t", failReason);
solrdoc.addField("httpstatus_i", httpstatus);
add(solrdoc);
}
private void flushTransmissionQueue(final int idx) throws IOException, SolrException {
final Collection<SolrInputDocument> c = new ArrayList<SolrInputDocument>();
while (this.transmissionQueue[idx].size() > 0) {
try {
c.add(this.transmissionQueue[idx].take());
} catch (final InterruptedException e) {
continue;
}
}
addSolr(c);
}
/**
* get a query result from solr
* to get all results set the query String to "*:*"

@ -656,7 +656,7 @@ public final class Switchboard extends serverSwitch
(usesolr) ? new SolrShardingConnector(
solrurls,
SolrShardingSelection.Method.MODULO_HOST_MD5,
10000) : null);
10000, true) : null);
} catch ( final IOException e ) {
Log.logException(e);
this.indexSegments.segment(Segments.Process.LOCALCRAWLING).connectSolr(null);

Loading…
Cancel
Save