|
|
|
@ -23,17 +23,15 @@ package net.yacy.cora.federate.solr.connector;
|
|
|
|
|
import java.io.IOException;
|
|
|
|
|
import java.util.ArrayList;
|
|
|
|
|
import java.util.Collection;
|
|
|
|
|
import java.util.HashSet;
|
|
|
|
|
import java.util.Iterator;
|
|
|
|
|
import java.util.Map;
|
|
|
|
|
import java.util.Set;
|
|
|
|
|
import java.util.concurrent.ArrayBlockingQueue;
|
|
|
|
|
import java.util.concurrent.BlockingQueue;
|
|
|
|
|
import java.util.concurrent.LinkedBlockingQueue;
|
|
|
|
|
|
|
|
|
|
import net.yacy.cora.sorting.ReversibleScoreMap;
|
|
|
|
|
import net.yacy.cora.storage.ARH;
|
|
|
|
|
import net.yacy.cora.storage.ConcurrentARH;
|
|
|
|
|
import net.yacy.cora.storage.ARC;
|
|
|
|
|
import net.yacy.cora.storage.ConcurrentARC;
|
|
|
|
|
import net.yacy.cora.util.ConcurrentLog;
|
|
|
|
|
import net.yacy.kelondro.util.MemoryControl;
|
|
|
|
|
import net.yacy.search.schema.CollectionSchema;
|
|
|
|
@ -68,7 +66,7 @@ public class ConcurrentUpdateSolrConnector implements SolrConnector {
|
|
|
|
|
try {
|
|
|
|
|
removeIdFromUpdateQueue(id);
|
|
|
|
|
ConcurrentUpdateSolrConnector.this.connector.deleteById(id);
|
|
|
|
|
ConcurrentUpdateSolrConnector.this.idCache.delete(id);
|
|
|
|
|
ConcurrentUpdateSolrConnector.this.idCache.remove(id);
|
|
|
|
|
} catch (final IOException e) {
|
|
|
|
|
ConcurrentLog.logException(e);
|
|
|
|
|
}
|
|
|
|
@ -90,7 +88,9 @@ public class ConcurrentUpdateSolrConnector implements SolrConnector {
|
|
|
|
|
// accumulate a collection of documents because that is better to send at once to a remote server
|
|
|
|
|
Collection<SolrInputDocument> docs = new ArrayList<SolrInputDocument>(getmore + 1);
|
|
|
|
|
docs.add(doc);
|
|
|
|
|
updateIdCache((String) doc.getFieldValue(CollectionSchema.id.getSolrFieldName()));
|
|
|
|
|
String id = (String) doc.getFieldValue(CollectionSchema.id.getSolrFieldName());
|
|
|
|
|
long date = AbstractSolrConnector.getLoadDate(doc);
|
|
|
|
|
updateIdCache(id, date);
|
|
|
|
|
for (int i = 0; i < getmore; i++) {
|
|
|
|
|
SolrInputDocument d = ConcurrentUpdateSolrConnector.this.updateQueue.take();
|
|
|
|
|
if (d == POISON_DOCUMENT) {
|
|
|
|
@ -98,7 +98,9 @@ public class ConcurrentUpdateSolrConnector implements SolrConnector {
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
docs.add(d);
|
|
|
|
|
updateIdCache((String) d.getFieldValue(CollectionSchema.id.getSolrFieldName()));
|
|
|
|
|
id = (String) d.getFieldValue(CollectionSchema.id.getSolrFieldName());
|
|
|
|
|
date = AbstractSolrConnector.getLoadDate(d);
|
|
|
|
|
updateIdCache(id, date);
|
|
|
|
|
}
|
|
|
|
|
//ConcurrentLog.info("ConcurrentUpdateSolrConnector", "sending " + docs.size() + " documents to solr");
|
|
|
|
|
try {
|
|
|
|
@ -109,7 +111,9 @@ public class ConcurrentUpdateSolrConnector implements SolrConnector {
|
|
|
|
|
} else {
|
|
|
|
|
// if there is only a single document, send this directly to solr
|
|
|
|
|
//ConcurrentLog.info("ConcurrentUpdateSolrConnector", "sending one document to solr");
|
|
|
|
|
updateIdCache((String) doc.getFieldValue(CollectionSchema.id.getSolrFieldName()));
|
|
|
|
|
String id = (String) doc.getFieldValue(CollectionSchema.id.getSolrFieldName());
|
|
|
|
|
long date = AbstractSolrConnector.getLoadDate(doc);
|
|
|
|
|
updateIdCache(id, date);
|
|
|
|
|
try {
|
|
|
|
|
ConcurrentUpdateSolrConnector.this.connector.add(doc);
|
|
|
|
|
} catch (final OutOfMemoryError e) {
|
|
|
|
@ -131,14 +135,14 @@ public class ConcurrentUpdateSolrConnector implements SolrConnector {
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private ARH<String> idCache;
|
|
|
|
|
private ARC<String, Long> idCache;
|
|
|
|
|
private BlockingQueue<SolrInputDocument> updateQueue;
|
|
|
|
|
private BlockingQueue<String> deleteQueue;
|
|
|
|
|
private Thread deletionHandler, updateHandler;
|
|
|
|
|
|
|
|
|
|
public ConcurrentUpdateSolrConnector(SolrConnector connector, int updateCapacity, int idCacheCapacity, int concurrency) {
|
|
|
|
|
this.connector = connector;
|
|
|
|
|
this.idCache = new ConcurrentARH<String>(idCacheCapacity, concurrency);
|
|
|
|
|
this.idCache = new ConcurrentARC<String, Long>(idCacheCapacity, concurrency); // url hash to load time
|
|
|
|
|
this.updateQueue = new ArrayBlockingQueue<SolrInputDocument>(updateCapacity);
|
|
|
|
|
this.deleteQueue = new LinkedBlockingQueue<String>();
|
|
|
|
|
this.deletionHandler = null;
|
|
|
|
@ -162,7 +166,7 @@ public class ConcurrentUpdateSolrConnector implements SolrConnector {
|
|
|
|
|
* used for debugging
|
|
|
|
|
*/
|
|
|
|
|
private static void cacheSuccessSign() {
|
|
|
|
|
//Log.logInfo("ConcurrentUpdate", "**** cache hit");
|
|
|
|
|
//ConcurrentLog.info("ConcurrentUpdate", "**** cache hit");
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private boolean existIdFromDeleteQueue(String id) {
|
|
|
|
@ -188,16 +192,16 @@ public class ConcurrentUpdateSolrConnector implements SolrConnector {
|
|
|
|
|
return null;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private boolean existIdFromUpdateQueue(String id) {
|
|
|
|
|
if (this.updateQueue.size() == 0) return false;
|
|
|
|
|
private long existIdFromUpdateQueue(String id) {
|
|
|
|
|
if (this.updateQueue.size() == 0) return -1;
|
|
|
|
|
Iterator<SolrInputDocument> i = this.updateQueue.iterator();
|
|
|
|
|
while (i.hasNext()) {
|
|
|
|
|
SolrInputDocument doc = i.next();
|
|
|
|
|
if (doc == null) break;
|
|
|
|
|
String docID = (String) doc.getFieldValue(CollectionSchema.id.getSolrFieldName());
|
|
|
|
|
if (docID != null && docID.equals(id)) return true;
|
|
|
|
|
if (docID != null && docID.equals(id)) return AbstractSolrConnector.getLoadDate(doc);
|
|
|
|
|
}
|
|
|
|
|
return false;
|
|
|
|
|
return -1;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private void removeIdFromUpdateQueue(String id) {
|
|
|
|
@ -227,10 +231,10 @@ public class ConcurrentUpdateSolrConnector implements SolrConnector {
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private void updateIdCache(String id) {
|
|
|
|
|
private void updateIdCache(String id, long time) {
|
|
|
|
|
if (id == null) return;
|
|
|
|
|
if (MemoryControl.shortStatus()) this.idCache.clear();
|
|
|
|
|
this.idCache.add(id);
|
|
|
|
|
this.idCache.put(id, time);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public void ensureAliveDeletionHandler() {
|
|
|
|
@ -312,7 +316,7 @@ public class ConcurrentUpdateSolrConnector implements SolrConnector {
|
|
|
|
|
@Override
|
|
|
|
|
public void deleteById(String id) throws IOException {
|
|
|
|
|
removeIdFromUpdateQueue(id);
|
|
|
|
|
this.idCache.delete(id);
|
|
|
|
|
this.idCache.remove(id);
|
|
|
|
|
if (this.deletionHandler.isAlive()) {
|
|
|
|
|
try {this.deleteQueue.put(id);} catch (final InterruptedException e) {}
|
|
|
|
|
} else {
|
|
|
|
@ -324,7 +328,7 @@ public class ConcurrentUpdateSolrConnector implements SolrConnector {
|
|
|
|
|
public void deleteByIds(Collection<String> ids) throws IOException {
|
|
|
|
|
for (String id: ids) {
|
|
|
|
|
removeIdFromUpdateQueue(id);
|
|
|
|
|
this.idCache.delete(id);
|
|
|
|
|
this.idCache.remove(id);
|
|
|
|
|
}
|
|
|
|
|
if (this.deletionHandler.isAlive()) {
|
|
|
|
|
for (String id: ids) try {this.deleteQueue.put(id);} catch (final InterruptedException e) {}
|
|
|
|
@ -335,8 +339,8 @@ public class ConcurrentUpdateSolrConnector implements SolrConnector {
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public void deleteByQuery(final String querystring) throws IOException {
|
|
|
|
|
new Thread() {
|
|
|
|
|
public void run() {
|
|
|
|
|
//new Thread() {
|
|
|
|
|
// public void run() {
|
|
|
|
|
ConcurrentUpdateSolrConnector.this.idCache.clear();
|
|
|
|
|
try {
|
|
|
|
|
ConcurrentUpdateSolrConnector.this.connector.deleteByQuery(querystring);
|
|
|
|
@ -345,47 +349,30 @@ public class ConcurrentUpdateSolrConnector implements SolrConnector {
|
|
|
|
|
ConcurrentLog.severe("ConcurrentUpdateSolrConnector", e.getMessage(), e);
|
|
|
|
|
}
|
|
|
|
|
ConcurrentUpdateSolrConnector.this.connector.commit(true);
|
|
|
|
|
}
|
|
|
|
|
}.start();
|
|
|
|
|
// }
|
|
|
|
|
//}.start();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public boolean existsById(String id) throws IOException {
|
|
|
|
|
if (this.idCache.contains(id)) {cacheSuccessSign(); return true;}
|
|
|
|
|
if (existIdFromDeleteQueue(id)) {cacheSuccessSign(); return false;}
|
|
|
|
|
if (existIdFromUpdateQueue(id)) {cacheSuccessSign(); return true;}
|
|
|
|
|
if (this.connector.existsById(id)) {
|
|
|
|
|
updateIdCache(id);
|
|
|
|
|
return true;
|
|
|
|
|
public long getLoadTime(String id) throws IOException {
|
|
|
|
|
Long date = this.idCache.get(id);
|
|
|
|
|
if (date != null) {cacheSuccessSign(); return date.longValue();}
|
|
|
|
|
if (existIdFromDeleteQueue(id)) {cacheSuccessSign(); return -1;}
|
|
|
|
|
long d = existIdFromUpdateQueue(id);
|
|
|
|
|
if (d >= 0) {cacheSuccessSign(); return d;}
|
|
|
|
|
d = this.connector.getLoadTime(id);
|
|
|
|
|
if (d >= 0) {
|
|
|
|
|
updateIdCache(id, d);
|
|
|
|
|
return d;
|
|
|
|
|
}
|
|
|
|
|
return false;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public Set<String> existsByIds(Set<String> ids) throws IOException {
|
|
|
|
|
HashSet<String> e = new HashSet<String>();
|
|
|
|
|
if (ids == null || ids.size() == 0) return e;
|
|
|
|
|
if (ids.size() == 1) return existsById(ids.iterator().next()) ? ids : e;
|
|
|
|
|
Set<String> idsC = new HashSet<String>();
|
|
|
|
|
for (String id: ids) {
|
|
|
|
|
if (this.idCache.contains(id)) {cacheSuccessSign(); e.add(id); continue;}
|
|
|
|
|
if (existIdFromDeleteQueue(id)) {cacheSuccessSign(); continue;}
|
|
|
|
|
if (existIdFromUpdateQueue(id)) {cacheSuccessSign(); e.add(id); continue;}
|
|
|
|
|
idsC.add(id);
|
|
|
|
|
}
|
|
|
|
|
Set<String> e1 = this.connector.existsByIds(idsC);
|
|
|
|
|
for (String id1: e1) {
|
|
|
|
|
updateIdCache(id1);
|
|
|
|
|
}
|
|
|
|
|
e.addAll(e1);
|
|
|
|
|
return e;
|
|
|
|
|
return -1;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public void add(SolrInputDocument solrdoc) throws IOException, SolrException {
|
|
|
|
|
String id = (String) solrdoc.getFieldValue(CollectionSchema.id.getSolrFieldName());
|
|
|
|
|
removeIdFromDeleteQueue(id);
|
|
|
|
|
updateIdCache(id);
|
|
|
|
|
updateIdCache(id, AbstractSolrConnector.getLoadDate(solrdoc));
|
|
|
|
|
if (this.updateHandler.isAlive()) {
|
|
|
|
|
try {this.updateQueue.put(solrdoc);} catch (final InterruptedException e) {}
|
|
|
|
|
} else {
|
|
|
|
@ -398,7 +385,7 @@ public class ConcurrentUpdateSolrConnector implements SolrConnector {
|
|
|
|
|
for (SolrInputDocument doc: solrdocs) {
|
|
|
|
|
String id = (String) doc.getFieldValue(CollectionSchema.id.getSolrFieldName());
|
|
|
|
|
removeIdFromDeleteQueue(id);
|
|
|
|
|
updateIdCache(id);
|
|
|
|
|
updateIdCache(id, AbstractSolrConnector.getLoadDate(doc));
|
|
|
|
|
}
|
|
|
|
|
if (this.updateHandler.isAlive()) {
|
|
|
|
|
for (SolrInputDocument doc: solrdocs) try {this.updateQueue.put(doc);} catch (final InterruptedException e) {}
|
|
|
|
@ -406,14 +393,14 @@ public class ConcurrentUpdateSolrConnector implements SolrConnector {
|
|
|
|
|
this.connector.add(solrdocs);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public SolrDocument getDocumentById(String id, String... fields) throws IOException {
|
|
|
|
|
public SolrDocument getDocumentById(final String id, String... fields) throws IOException {
|
|
|
|
|
if (existIdFromDeleteQueue(id)) return null;
|
|
|
|
|
SolrInputDocument idoc = getFromUpdateQueue(id);
|
|
|
|
|
if (idoc != null) {cacheSuccessSign(); return ClientUtils.toSolrDocument(idoc);}
|
|
|
|
|
SolrDocument doc = this.connector.getDocumentById(id, fields);
|
|
|
|
|
if (doc != null) updateIdCache(id);
|
|
|
|
|
SolrDocument doc = this.connector.getDocumentById(id, AbstractSolrConnector.ensureEssentialFieldsIncluded(fields));
|
|
|
|
|
if (doc != null) updateIdCache(id, AbstractSolrConnector.getLoadDate(doc));
|
|
|
|
|
return doc;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -436,7 +423,16 @@ public class ConcurrentUpdateSolrConnector implements SolrConnector {
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public SolrDocumentList getDocumentListByQuery(String querystring, int offset, int count, String... fields) throws IOException, SolrException {
|
|
|
|
|
return this.connector.getDocumentListByQuery(querystring, offset, count, fields);
|
|
|
|
|
SolrDocumentList sdl = this.connector.getDocumentListByQuery(querystring, offset, count, AbstractSolrConnector.ensureEssentialFieldsIncluded(fields));
|
|
|
|
|
/*
|
|
|
|
|
Iterator<SolrDocument> i = sdl.iterator();
|
|
|
|
|
while (i.hasNext()) {
|
|
|
|
|
SolrDocument doc = i.next();
|
|
|
|
|
String id = (String) doc.getFieldValue(CollectionSchema.id.getSolrFieldName());
|
|
|
|
|
if (doc != null) updateIdCache(id, AbstractSolrConnector.getLoadDate(doc));
|
|
|
|
|
}
|
|
|
|
|
*/
|
|
|
|
|
return sdl;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|