added concurrent iterator methods to the solr connectors

pull/1/head
Michael Peter Christen 13 years ago
parent d54b80327a
commit 0904afe8fb

@ -1,7 +1,7 @@
/** /**
* AbstractSolrConnector * AbstractSolrConnector
* Copyright 2012 by Michael Peter Christen * Copyright 2012 by Michael Peter Christen
* First released 21.06.2012 at http://yacy.net * First released 27.06.2012 at http://yacy.net
* *
* This library is free software; you can redistribute it and/or * This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public * modify it under the terms of the GNU Lesser General Public
@ -20,140 +20,33 @@
package net.yacy.cora.services.federated.solr; package net.yacy.cora.services.federated.solr;
import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.Iterator;
import java.util.Collection; import java.util.concurrent.BlockingQueue;
import java.util.List; import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import net.yacy.kelondro.logging.Log; import net.yacy.kelondro.logging.Log;
import net.yacy.kelondro.util.LookAheadIterator;
import net.yacy.search.index.YaCySchema; import net.yacy.search.index.YaCySchema;
import org.apache.solr.client.solrj.SolrQuery; import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.SolrServer;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.request.ContentStreamUpdateRequest;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.common.SolrDocument; import org.apache.solr.common.SolrDocument;
import org.apache.solr.common.SolrDocumentList; import org.apache.solr.common.SolrDocumentList;
import org.apache.solr.common.SolrException; import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrInputDocument;
public class AbstractSolrConnector implements SolrConnector { public abstract class AbstractSolrConnector implements SolrConnector {
private final static SolrQuery catchallQuery = new SolrQuery(); public final SolrDocument POISON_DOCUMENT = new SolrDocument();
public final String POISON_ID = "POISON_ID";
public final static SolrQuery catchallQuery = new SolrQuery();
static { static {
catchallQuery.setQuery("*:*"); catchallQuery.setQuery("*:*");
catchallQuery.setFields(YaCySchema.id.name()); catchallQuery.setFields(YaCySchema.id.name());
catchallQuery.setRows(1); catchallQuery.setRows(1);
catchallQuery.setStart(0); catchallQuery.setStart(0);
}
protected SolrServer server;
protected int commitWithinMs; // max time (in ms) before a commit will happen
protected AbstractSolrConnector() {
this.server = null;
this.commitWithinMs = 180000;
}
protected void init(SolrServer server) {
this.server = server;
}
public SolrServer getServer() {
return this.server;
}
/**
* get the solr autocommit delay
* @return the maximum waiting time after a solr command until it is transported to the server
*/
@Override
public int getCommitWithinMs() {
return this.commitWithinMs;
}
/**
* set the solr autocommit delay
* @param c the maximum waiting time after a solr command until it is transported to the server
*/
@Override
public void setCommitWithinMs(int c) {
this.commitWithinMs = c;
}
@Override
public synchronized void close() {
try {
this.server.commit();
this.server = null;
} catch (SolrServerException e) {
Log.logException(e);
} catch (IOException e) {
Log.logException(e);
}
}
@Override
public long getSize() {
try {
final QueryResponse rsp = this.server.query(catchallQuery);
if (rsp == null) return 0;
final SolrDocumentList docs = rsp.getResults();
if (docs == null) return 0;
return docs.getNumFound();
} catch (final Throwable e) {
Log.logException(e);
return 0;
}
}
/**
* delete everything in the solr index
* @throws IOException
*/
@Override
public void clear() throws IOException {
try {
this.server.deleteByQuery("*:*");
this.server.commit();
} catch (final Throwable e) {
throw new IOException(e);
}
}
@Override
public void delete(final String id) throws IOException {
try {
this.server.deleteById(id, this.commitWithinMs);
} catch (final Throwable e) {
throw new IOException(e);
}
}
@Override
public void delete(final List<String> ids) throws IOException {
try {
this.server.deleteById(ids, this.commitWithinMs);
} catch (final Throwable e) {
throw new IOException(e);
}
}
/**
* delete entries from solr according the given solr query string
* @param id the url hash of the entry
* @throws IOException
*/
@Override
public void deleteByQuery(final String querystring) throws IOException {
try {
this.server.deleteByQuery(querystring, this.commitWithinMs);
} catch (final Throwable e) {
throw new IOException(e);
}
} }
private final static int pagesize = 10;
@Override @Override
public boolean exists(final String id) throws IOException { public boolean exists(final String id) throws IOException {
@ -166,115 +59,78 @@ public class AbstractSolrConnector implements SolrConnector {
} }
} }
public void add(final File file, final String solrId) throws IOException {
final ContentStreamUpdateRequest up = new ContentStreamUpdateRequest("/update/extract");
up.addFile(file);
up.setParam("literal.id", solrId);
up.setParam("uprefix", "attr_");
up.setParam("fmap.content", "attr_content");
up.setCommitWithin(this.commitWithinMs);
//up.setAction(AbstractUpdateRequest.ACTION.COMMIT, true, true);
try {
this.server.request(up);
this.server.commit();
} catch (final Throwable e) {
throw new IOException(e);
}
}
@Override
public void add(final SolrInputDocument solrdoc) throws IOException, SolrException {
try {
this.server.add(solrdoc, this.commitWithinMs);
//this.server.commit();
} catch (SolrServerException e) {
Log.logWarning("SolrConnector", e.getMessage() + " DOC=" + solrdoc.toString());
throw new IOException(e);
}
}
@Override @Override
public void add(final Collection<SolrInputDocument> solrdocs) throws IOException, SolrException { public BlockingQueue<SolrDocument> concurrentQuery(final String querystring, final int offset, final int maxcount, final long maxtime) {
ArrayList<SolrInputDocument> l = new ArrayList<SolrInputDocument>(); final BlockingQueue<SolrDocument> queue = new LinkedBlockingQueue<SolrDocument>();
for (SolrInputDocument d: solrdocs) l.add(d); final long endtime = System.currentTimeMillis() + maxtime;
try { final Thread t = new Thread() {
this.server.add(l, this.commitWithinMs); @Override
//this.server.commit(); public void run() {
} catch (SolrServerException e) { int o = offset;
Log.logWarning("SolrConnector", e.getMessage() + " DOC=" + solrdocs.toString()); while (System.currentTimeMillis() < endtime) {
throw new IOException(e); try {
} SolrDocumentList sdl = query(querystring, o, pagesize);
for (SolrDocument d: sdl) {
try {queue.put(d);} catch (InterruptedException e) {break;}
}
if (sdl.size() < pagesize) break;
} catch (SolrException e) {
break;
} catch (IOException e) {
break;
}
}
try {queue.put(AbstractSolrConnector.this.POISON_DOCUMENT);} catch (InterruptedException e1) {}
}
};
t.start();
return queue;
} }
/**
* get a query result from solr
* to get all results set the query String to "*:*"
* @param querystring
* @throws IOException
*/
@Override @Override
public SolrDocumentList query(final String querystring, final int offset, final int count) throws IOException { public BlockingQueue<String> concurrentIDs(final String querystring, final int offset, final int maxcount, final long maxtime) {
// construct query final BlockingQueue<String> queue = new LinkedBlockingQueue<String>();
final SolrQuery query = new SolrQuery(); final long endtime = System.currentTimeMillis() + maxtime;
query.setQuery(querystring); final Thread t = new Thread() {
query.setRows(count); @Override
query.setStart(offset); public void run() {
//query.addSortField( "price", SolrQuery.ORDER.asc ); int o = offset;
while (System.currentTimeMillis() < endtime) {
// query the server try {
//SearchResult result = new SearchResult(count); SolrDocumentList sdl = query(querystring, o, pagesize);
Throwable error = null; // well this is a bad hack; see https://issues.apache.org/jira/browse/LUCENE-2239 for (SolrDocument d: sdl) {
// also: https://issues.apache.org/jira/browse/SOLR-2247 try {queue.put((String) d.getFieldValue(YaCySchema.id.name()));} catch (InterruptedException e) {break;}
// we might try also: $JAVA_OPTS -Dsolr.directoryFactory=solr.MMapDirectoryFactory }
for (int retry = 30; retry > 0; retry--) { if (sdl.size() < pagesize) break;
try { } catch (SolrException e) {
final QueryResponse rsp = this.server.query(query); break;
final SolrDocumentList docs = rsp.getResults(); } catch (IOException e) {
if (error != null) Log.logWarning("AbstractSolrConnector", "produced search result by silently ignoring an error before, message = " + error.getMessage()); break;
return docs; }
} catch (final Throwable e) { }
Log.logWarning("AbstractSolrConnection", "problem with query=" + querystring, e); try {queue.put(AbstractSolrConnector.this.POISON_ID);} catch (InterruptedException e1) {}
error = e;
continue;
} }
} };
throw new IOException(error.getMessage(), error); t.start();
return queue;
} }
/**
* get a document from solr by given id
* @param id
* @return one result or null if no result exists
* @throws IOException
*/
@Override @Override
public SolrDocument get(final String id) throws IOException { public Iterator<String> iterator() {
// construct query final BlockingQueue<String> queue = concurrentIDs("*:*", 0, Integer.MAX_VALUE, 60000);
StringBuffer sb = new StringBuffer(id.length() + 5); return new LookAheadIterator<String>() {
sb.append(YaCySchema.id.getSolrFieldName()).append(':').append('"').append(id).append('"'); @Override
final SolrQuery query = new SolrQuery(); protected String next0() {
query.setQuery(sb.toString()); try {
query.setRows(1); String s = queue.poll(60000, TimeUnit.MILLISECONDS);
query.setStart(0); if (s == AbstractSolrConnector.this.POISON_ID) return null;
return s;
// query the server } catch (InterruptedException e) {
Throwable error = null; // well this is a bad hack; see https://issues.apache.org/jira/browse/LUCENE-2239 return null;
// also: https://issues.apache.org/jira/browse/SOLR-2247 }
// we might try also: $JAVA_OPTS -Dsolr.directoryFactory=solr.MMapDirectoryFactory
for (int retry = 30; retry > 0; retry--) {
try {
final QueryResponse rsp = this.server.query(query);
final SolrDocumentList docs = rsp.getResults();
if (docs.isEmpty()) return null;
if (error != null) Log.logWarning("AbstractSolrConnector", "produced search result by silently ignoring an error before, message = " + error.getMessage());
return docs.get(0);
} catch (final Throwable e) {
Log.logWarning("AbstractSolrConnection", "problem with id=" + id, e);
error = e;
continue;
} }
}
throw new IOException(error.getMessage(), error); };
} }
} }

@ -43,7 +43,7 @@ import org.apache.solr.common.SolrInputDocument;
* Because it is not possible to set a cache in front of this class (the single connect methods would need to be passed through the cache class), * Because it is not possible to set a cache in front of this class (the single connect methods would need to be passed through the cache class),
* this class also contains an object and hit/miss cache. * this class also contains an object and hit/miss cache.
*/ */
public class MirrorSolrConnector implements SolrConnector { public class MirrorSolrConnector extends AbstractSolrConnector implements SolrConnector {
private final static Object EXIST = new Object(); private final static Object EXIST = new Object();

@ -30,7 +30,7 @@ import org.apache.solr.common.SolrDocumentList;
import org.apache.solr.common.SolrException; import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrInputDocument; import org.apache.solr.common.SolrInputDocument;
public class MultipleSolrConnector implements SolrConnector { public class MultipleSolrConnector extends AbstractSolrConnector implements SolrConnector {
private final static SolrInputDocument POISON_DOC = new SolrInputDocument(); private final static SolrInputDocument POISON_DOC = new SolrInputDocument();
@ -133,11 +133,6 @@ public class MultipleSolrConnector implements SolrConnector {
this.solr.deleteByQuery(querystring); this.solr.deleteByQuery(querystring);
} }
@Override
public boolean exists(String id) throws IOException {
return this.solr.exists(id);
}
@Override @Override
public SolrDocument get(String id) throws IOException { public SolrDocument get(String id) throws IOException {
return this.solr.get(id); return this.solr.get(id);

@ -44,7 +44,7 @@ import org.apache.http.protocol.HttpContext;
import org.apache.solr.client.solrj.impl.HttpSolrServer; import org.apache.solr.client.solrj.impl.HttpSolrServer;
public class RemoteSolrConnector extends AbstractSolrConnector implements SolrConnector { public class RemoteSolrConnector extends SolrServerConnector implements SolrConnector {
private final String solrurl, host, solrpath, solraccount, solrpw; private final String solrurl, host, solrpath, solraccount, solrpw;
private final int port; private final int port;

@ -29,7 +29,7 @@ import org.apache.solr.common.SolrDocumentList;
import org.apache.solr.common.SolrException; import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrInputDocument; import org.apache.solr.common.SolrInputDocument;
public class RetrySolrConnector implements SolrConnector { public class RetrySolrConnector extends AbstractSolrConnector implements SolrConnector {
private final SolrConnector solrConnector; private final SolrConnector solrConnector;
private final long retryMaxTime; private final long retryMaxTime;

@ -38,7 +38,7 @@ import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrInputDocument; import org.apache.solr.common.SolrInputDocument;
public class ShardSolrConnector implements SolrConnector { public class ShardSolrConnector extends AbstractSolrConnector implements SolrConnector {
private final List<SolrConnector> connectors; private final List<SolrConnector> connectors;
private final ShardSelection sharding; private final ShardSelection sharding;

@ -27,13 +27,14 @@ package net.yacy.cora.services.federated.solr;
import java.io.IOException; import java.io.IOException;
import java.util.Collection; import java.util.Collection;
import java.util.List; import java.util.List;
import java.util.concurrent.BlockingQueue;
import org.apache.solr.common.SolrDocument; import org.apache.solr.common.SolrDocument;
import org.apache.solr.common.SolrDocumentList; import org.apache.solr.common.SolrDocumentList;
import org.apache.solr.common.SolrException; import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrInputDocument; import org.apache.solr.common.SolrInputDocument;
public interface SolrConnector { public interface SolrConnector extends Iterable<String> /* Iterable of document IDs */ {
/** /**
* get the solr autocommit delay * get the solr autocommit delay
@ -112,6 +113,28 @@ public interface SolrConnector {
*/ */
public SolrDocumentList query(final String querystring, final int offset, final int count) throws IOException, SolrException; public SolrDocumentList query(final String querystring, final int offset, final int count) throws IOException, SolrException;
/**
* Get a query result from solr as a stream of documents.
* The result queue is considered as terminated if AbstractSolrConnectro.POISON_DOCUMENT is returned.
* The method returns immediately and feeds the search results into the queue
* @param querystring
* @param offset
* @param maxcount
* @return
*/
public BlockingQueue<SolrDocument> concurrentQuery(final String querystring, final int offset, final int maxcount, final long maxtime);
/**
* get a document id result stream from a solr query.
* The result queue is considered as terminated if AbstractSolrConnectro.POISON_ID is returned.
* The method returns immediately and feeds the search results into the queue
* @param querystring
* @param offset
* @param maxcount
* @return
*/
public BlockingQueue<String> concurrentIDs(final String querystring, final int offset, final int maxcount, final long maxtime);
/** /**
* get the size of the index * get the size of the index
* @return number of results if solr is queries with a catch-all pattern * @return number of results if solr is queries with a catch-all pattern

@ -0,0 +1,261 @@
/**
* SolrServerConnector
* Copyright 2012 by Michael Peter Christen
* First released 21.06.2012 at http://yacy.net
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program in the file lgpl21.txt
* If not, see <http://www.gnu.org/licenses/>.
*/
package net.yacy.cora.services.federated.solr;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import net.yacy.kelondro.logging.Log;
import net.yacy.search.index.YaCySchema;
import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.SolrServer;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.request.ContentStreamUpdateRequest;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.common.SolrDocument;
import org.apache.solr.common.SolrDocumentList;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrInputDocument;
public class SolrServerConnector extends AbstractSolrConnector implements SolrConnector {
protected SolrServer server;
protected int commitWithinMs; // max time (in ms) before a commit will happen
protected SolrServerConnector() {
this.server = null;
this.commitWithinMs = 180000;
}
protected void init(SolrServer server) {
this.server = server;
}
public SolrServer getServer() {
return this.server;
}
/**
* get the solr autocommit delay
* @return the maximum waiting time after a solr command until it is transported to the server
*/
@Override
public int getCommitWithinMs() {
return this.commitWithinMs;
}
/**
* set the solr autocommit delay
* @param c the maximum waiting time after a solr command until it is transported to the server
*/
@Override
public void setCommitWithinMs(int c) {
this.commitWithinMs = c;
}
@Override
public synchronized void close() {
try {
this.server.commit();
this.server = null;
} catch (SolrServerException e) {
Log.logException(e);
} catch (IOException e) {
Log.logException(e);
}
}
@Override
public long getSize() {
try {
final QueryResponse rsp = this.server.query(AbstractSolrConnector.catchallQuery);
if (rsp == null) return 0;
final SolrDocumentList docs = rsp.getResults();
if (docs == null) return 0;
return docs.getNumFound();
} catch (final Throwable e) {
Log.logException(e);
return 0;
}
}
/**
* delete everything in the solr index
* @throws IOException
*/
@Override
public void clear() throws IOException {
try {
this.server.deleteByQuery("*:*");
this.server.commit();
} catch (final Throwable e) {
throw new IOException(e);
}
}
@Override
public void delete(final String id) throws IOException {
try {
this.server.deleteById(id, this.commitWithinMs);
} catch (final Throwable e) {
throw new IOException(e);
}
}
@Override
public void delete(final List<String> ids) throws IOException {
try {
this.server.deleteById(ids, this.commitWithinMs);
} catch (final Throwable e) {
throw new IOException(e);
}
}
/**
* delete entries from solr according the given solr query string
* @param id the url hash of the entry
* @throws IOException
*/
@Override
public void deleteByQuery(final String querystring) throws IOException {
try {
this.server.deleteByQuery(querystring, this.commitWithinMs);
} catch (final Throwable e) {
throw new IOException(e);
}
}
public void add(final File file, final String solrId) throws IOException {
final ContentStreamUpdateRequest up = new ContentStreamUpdateRequest("/update/extract");
up.addFile(file);
up.setParam("literal.id", solrId);
up.setParam("uprefix", "attr_");
up.setParam("fmap.content", "attr_content");
up.setCommitWithin(this.commitWithinMs);
//up.setAction(AbstractUpdateRequest.ACTION.COMMIT, true, true);
try {
this.server.request(up);
this.server.commit();
} catch (final Throwable e) {
throw new IOException(e);
}
}
@Override
public void add(final SolrInputDocument solrdoc) throws IOException, SolrException {
try {
this.server.add(solrdoc, this.commitWithinMs);
//this.server.commit();
} catch (SolrServerException e) {
Log.logWarning("SolrConnector", e.getMessage() + " DOC=" + solrdoc.toString());
throw new IOException(e);
}
}
@Override
public void add(final Collection<SolrInputDocument> solrdocs) throws IOException, SolrException {
ArrayList<SolrInputDocument> l = new ArrayList<SolrInputDocument>();
for (SolrInputDocument d: solrdocs) l.add(d);
try {
this.server.add(l, this.commitWithinMs);
//this.server.commit();
} catch (SolrServerException e) {
Log.logWarning("SolrConnector", e.getMessage() + " DOC=" + solrdocs.toString());
throw new IOException(e);
}
}
/**
* get a query result from solr
* to get all results set the query String to "*:*"
* @param querystring
* @throws IOException
*/
@Override
public SolrDocumentList query(final String querystring, final int offset, final int count) throws IOException {
// construct query
final SolrQuery query = new SolrQuery();
query.setQuery(querystring);
query.setRows(count);
query.setStart(offset);
//query.addSortField( "price", SolrQuery.ORDER.asc );
// query the server
//SearchResult result = new SearchResult(count);
Throwable error = null; // well this is a bad hack; see https://issues.apache.org/jira/browse/LUCENE-2239
// also: https://issues.apache.org/jira/browse/SOLR-2247
// we might try also: $JAVA_OPTS -Dsolr.directoryFactory=solr.MMapDirectoryFactory
for (int retry = 30; retry > 0; retry--) {
try {
final QueryResponse rsp = this.server.query(query);
final SolrDocumentList docs = rsp.getResults();
if (error != null) Log.logWarning("AbstractSolrConnector", "produced search result by silently ignoring an error before, message = " + error.getMessage());
return docs;
} catch (final Throwable e) {
Log.logWarning("AbstractSolrConnection", "problem with query=" + querystring, e);
error = e;
continue;
}
}
throw new IOException(error.getMessage(), error);
}
/**
* get a document from solr by given id
* @param id
* @return one result or null if no result exists
* @throws IOException
*/
@Override
public SolrDocument get(final String id) throws IOException {
// construct query
StringBuffer sb = new StringBuffer(id.length() + 5);
sb.append(YaCySchema.id.getSolrFieldName()).append(':').append('"').append(id).append('"');
final SolrQuery query = new SolrQuery();
query.setQuery(sb.toString());
query.setRows(1);
query.setStart(0);
// query the server
Throwable error = null; // well this is a bad hack; see https://issues.apache.org/jira/browse/LUCENE-2239
// also: https://issues.apache.org/jira/browse/SOLR-2247
// we might try also: $JAVA_OPTS -Dsolr.directoryFactory=solr.MMapDirectoryFactory
for (int retry = 30; retry > 0; retry--) {
try {
final QueryResponse rsp = this.server.query(query);
final SolrDocumentList docs = rsp.getResults();
if (docs.isEmpty()) return null;
if (error != null) Log.logWarning("AbstractSolrConnector", "produced search result by silently ignoring an error before, message = " + error.getMessage());
return docs.get(0);
} catch (final Throwable e) {
Log.logWarning("AbstractSolrConnection", "problem with id=" + id, e);
error = e;
continue;
}
}
throw new IOException(error.getMessage(), error);
}
}

@ -26,7 +26,7 @@ import java.io.IOException;
import javax.xml.parsers.ParserConfigurationException; import javax.xml.parsers.ParserConfigurationException;
import net.yacy.cora.services.federated.solr.AbstractSolrConnector; import net.yacy.cora.services.federated.solr.SolrServerConnector;
import net.yacy.cora.services.federated.solr.SolrConnector; import net.yacy.cora.services.federated.solr.SolrConnector;
import net.yacy.cora.services.federated.solr.SolrDoc; import net.yacy.cora.services.federated.solr.SolrDoc;
import net.yacy.kelondro.logging.Log; import net.yacy.kelondro.logging.Log;
@ -52,7 +52,7 @@ import org.xml.sax.SAXException;
import com.google.common.io.Files; import com.google.common.io.Files;
public class EmbeddedSolrConnector extends AbstractSolrConnector implements SolrConnector { public class EmbeddedSolrConnector extends SolrServerConnector implements SolrConnector {
public static final String SELECT = "/select"; public static final String SELECT = "/select";
public static final String CONTEXT = "/solr"; public static final String CONTEXT = "/solr";

Loading…
Cancel
Save