(again) full redesign of ConcurrentUpdateSolrConnector to remove

out-of-order transactions regarding add and delete operations. Now all
operations (add and delete) are executed concurrently in-order.
pull/1/head
orbiter 11 years ago
parent 1960aafd6c
commit 133d41386c

@ -27,7 +27,6 @@ import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import net.yacy.cora.sorting.ReversibleScoreMap;
import net.yacy.cora.storage.ARC;
@ -57,117 +56,78 @@ public class ConcurrentUpdateSolrConnector implements SolrConnector {
SolrConnector connector;
private final static String POISON_ID = "________";
private final static SolrInputDocument POISON_DOCUMENT = new SolrInputDocument();
private final static Object POISON_PROCESS = new Object();
private class DeletionHandler implements Runnable {
private class ProcessHandler implements Runnable {
@Override
public void run() {
String id;
try {
while ((id = ConcurrentUpdateSolrConnector.this.deleteQueue.take()) != POISON_ID) {
Object process;
Collection<SolrInputDocument> docs = new ArrayList<SolrInputDocument>();
while ((process = ConcurrentUpdateSolrConnector.this.processQueue.take()) != POISON_PROCESS) {
if (process instanceof String) {
// delete document
if (docs.size() > 0) addSynchronized(docs);
String id = (String) process;
try {
removeIdFromUpdateQueue(id);
ConcurrentUpdateSolrConnector.this.connector.deleteById(id);
ConcurrentUpdateSolrConnector.this.metadataCache.remove(id);
} catch (final IOException e) {
ConcurrentLog.logException(e);
}
}
} catch (final InterruptedException e) {
ConcurrentLog.logException(e);
}
}
}
private class UpdateHandler implements Runnable {
@Override
public void run() {
SolrInputDocument doc;
try {
while ((doc = ConcurrentUpdateSolrConnector.this.updateQueue.take()) != POISON_DOCUMENT) {
int getmore = ConcurrentUpdateSolrConnector.this.updateQueue.size();
if (getmore > 0) {
// accumulate a collection of documents because that is better to send at once to a remote server
Collection<SolrInputDocument> docs = new ArrayList<SolrInputDocument>(getmore + 1);
if (process instanceof SolrInputDocument) {
SolrInputDocument doc = (SolrInputDocument) process;
docs.add(doc);
String id = (String) doc.getFieldValue(CollectionSchema.id.getSolrFieldName());
Metadata md = AbstractSolrConnector.getMetadata(doc);
updateCache(id, md);
SolrInputDocument d;
while (getmore-- > 0 && (d = ConcurrentUpdateSolrConnector.this.updateQueue.poll()) != null) {
if (d == POISON_DOCUMENT) {
ConcurrentUpdateSolrConnector.this.updateQueue.put(POISON_DOCUMENT); // make sure that the outer loop terminates as well
break;
}
docs.add(d);
id = (String) d.getFieldValue(CollectionSchema.id.getSolrFieldName());
md = AbstractSolrConnector.getMetadata(d);
updateCache(id, md);
}
//ConcurrentLog.info("ConcurrentUpdateSolrConnector", "sending " + docs.size() + " documents to solr");
try {
ConcurrentUpdateSolrConnector.this.connector.add(docs);
} catch (final OutOfMemoryError e) {
// clear and try again...
clearCaches();
try {
ConcurrentUpdateSolrConnector.this.connector.add(docs);
} catch (final IOException ee) {
ConcurrentLog.logException(e);
if (docs.size() > 0 &&
(ConcurrentUpdateSolrConnector.this.processQueue.size() == 0 ||
docs.size() >= ConcurrentUpdateSolrConnector.this.processQueue.size() + ConcurrentUpdateSolrConnector.this.processQueue.remainingCapacity())) {
addSynchronized(docs);
}
} catch (final IOException e) {
}
} catch (final InterruptedException e) {
ConcurrentLog.logException(e);
}
} else {
// if there is only a single document, send this directly to solr
//ConcurrentLog.info("ConcurrentUpdateSolrConnector", "sending one document to solr");
String id = (String) doc.getFieldValue(CollectionSchema.id.getSolrFieldName());
Metadata md = AbstractSolrConnector.getMetadata(doc);
updateCache(id, md);
}
private void addSynchronized(final Collection<SolrInputDocument> docs) {
assert docs.size() > 0;
try {
ConcurrentUpdateSolrConnector.this.connector.add(doc);
ConcurrentUpdateSolrConnector.this.connector.add(docs);
} catch (final OutOfMemoryError e) {
// clear and try again...
clearCaches();
try {
ConcurrentUpdateSolrConnector.this.connector.add(doc);
ConcurrentUpdateSolrConnector.this.connector.add(docs);
} catch (final IOException ee) {
ConcurrentLog.logException(e);
}
} catch (final IOException e) {
ConcurrentLog.logException(e);
}
}
}
} catch (final InterruptedException e) {
ConcurrentLog.logException(e);
}
docs.clear();
}
}
private ARC<String, Metadata> metadataCache;
private ARH<String> missCache;
private BlockingQueue<SolrInputDocument> updateQueue;
private BlockingQueue<String> deleteQueue;
private Thread deletionHandler;
private Thread[] updateHandler;
private BlockingQueue<Object> processQueue;
private Thread processHandler;
public ConcurrentUpdateSolrConnector(SolrConnector connector, int updateCapacity, int idCacheCapacity, int concurrency) {
public ConcurrentUpdateSolrConnector(final SolrConnector connector, final int updateCapacity, final int idCacheCapacity, final int concurrency) {
this.connector = connector;
this.metadataCache = new ConcurrentARC<String, Metadata>(idCacheCapacity, concurrency);
this.missCache = new ConcurrentARH<String>(idCacheCapacity, concurrency);
this.updateQueue = new ArrayBlockingQueue<SolrInputDocument>(updateCapacity);
this.deleteQueue = new LinkedBlockingQueue<String>();
this.deletionHandler = null;
this.updateHandler = null;
ensureAliveDeletionHandler();
ensureAliveUpdateHandler();
this.processQueue = new ArrayBlockingQueue<Object>(updateCapacity);
this.processHandler = null;
ensureAliveProcessHandler();
}
@Override
public int bufferSize() {
return this.updateQueue.size() + this.deleteQueue.size();
return this.processQueue.size();
}
@Override
@ -184,68 +144,48 @@ public class ConcurrentUpdateSolrConnector implements SolrConnector {
//ConcurrentLog.info("ConcurrentUpdate", "**** cache hit");
}
private boolean existIdFromDeleteQueue(String id) {
if (this.deleteQueue.size() == 0) return false;
Iterator<String> i = this.deleteQueue.iterator();
while (i.hasNext()) {
String docID = i.next();
if (docID == null) break;
if (docID.equals(id)) return true;
private boolean containsDeleteInProcessQueue(final String id) {
boolean delete = false;
boolean ctch = false;
for (Object o: this.processQueue) {
if (o == null) break;
if (checkDelete(o, id)) delete = true; // do not add a break here!
if (checkAdd(o, id)) {delete = false; ctch = true;} // do not add a break here!
}
return false;
if (ctch && delete) removeFromProcessQueue(id); // clean up put+remove
return delete;
}
private SolrInputDocument getFromUpdateQueue(String id) {
if (this.updateQueue.size() == 0) return null;
Iterator<SolrInputDocument> i = this.updateQueue.iterator();
while (i.hasNext()) {
SolrInputDocument doc = i.next();
if (doc == null) break;
String docID = (String) doc.getFieldValue(CollectionSchema.id.getSolrFieldName());
if (docID != null && docID.equals(id)) return doc;
private SolrInputDocument getFromProcessQueue(final String id) {
SolrInputDocument d = null;
boolean ctch = false;
for (Object o: this.processQueue) {
if (o == null) break;
if (checkDelete(o, id)) d = null; // do not add a break here!
if (checkAdd(o, id)) {d = (SolrInputDocument) o; ctch = true;} // do not add a break here!
}
return null;
if (ctch && d == null) removeFromProcessQueue(id); // clean up put+remove
return d;
}
private Metadata existIdFromUpdateQueue(String id) {
if (this.updateQueue.size() == 0) return null;
Iterator<SolrInputDocument> i = this.updateQueue.iterator();
private void removeFromProcessQueue(final String id) {
Iterator<Object> i = this.processQueue.iterator();
while (i.hasNext()) {
SolrInputDocument doc = i.next();
if (doc == null) break;
String docID = (String) doc.getFieldValue(CollectionSchema.id.getSolrFieldName());
if (docID != null && docID.equals(id)) {
return AbstractSolrConnector.getMetadata(doc);
if (checkAdd(i.next(), id)) {i.remove(); break;}
}
}
return null;
}
private void removeIdFromUpdateQueue(String id) {
if (this.updateQueue.size() == 0) return;
Iterator<SolrInputDocument> i = this.updateQueue.iterator();
while (i.hasNext()) {
SolrInputDocument doc = i.next();
if (doc == null) break;
String docID = (String) doc.getFieldValue(CollectionSchema.id.getSolrFieldName());
if (docID != null && docID.equals(id)) {
i.remove();
break;
}
}
private boolean checkDelete(final Object o, final String id) {
if (!(o instanceof String)) return false;
String docID = (String) o;
return (docID != null && docID.equals(id));
}
private void removeIdFromDeleteQueue(String id) {
if (this.updateQueue.size() == 0) return;
Iterator<String> i = this.deleteQueue.iterator();
while (i.hasNext()) {
String docID = i.next();
if (docID == null) break;
if (docID.equals(id)) {
i.remove();
break;
}
}
private boolean checkAdd(final Object o, final String id) {
if (!(o instanceof SolrInputDocument)) return false;
SolrInputDocument doc = (SolrInputDocument) o;
String docID = (String) doc.getFieldValue(CollectionSchema.id.getSolrFieldName());
return (docID != null && docID.equals(id));
}
private void updateCache(final String id, final Metadata md) {
@ -258,37 +198,14 @@ public class ConcurrentUpdateSolrConnector implements SolrConnector {
this.missCache.delete(id);
}
public void ensureAliveDeletionHandler() {
if (this.deletionHandler == null || !this.deletionHandler.isAlive()) {
this.deletionHandler = new Thread(new DeletionHandler());
this.deletionHandler.setName(this.getClass().getName() + "_DeletionHandler");
this.deletionHandler.start();
}
}
public void ensureAliveUpdateHandler() {
if (this.updateHandler == null) {
this.updateHandler = new Thread[1/*Runtime.getRuntime().availableProcessors()*/];
}
for (int i = 0; i < this.updateHandler.length; i++) {
if (this.updateHandler[i] == null || !this.updateHandler[i].isAlive()) {
this.updateHandler[i] = new Thread(new UpdateHandler());
this.updateHandler[i].setName(this.getClass().getName() + "_UpdateHandler_" + i);
this.updateHandler[i].start();
}
public void ensureAliveProcessHandler() {
if (this.processHandler == null || !this.processHandler.isAlive()) {
this.processHandler = new Thread(new ProcessHandler());
this.processHandler.setName(this.getClass().getName() + "_ProcessHandler");
this.processHandler.start();
}
}
public boolean anyUpdateHandlerAlive() {
if (this.updateHandler == null) {
return false;
}
for (int i = 0; i < this.updateHandler.length; i++) {
if (this.updateHandler[i] != null && this.updateHandler[i].isAlive()) return true;
}
return false;
}
@Override
public Iterator<String> iterator() {
return this.connector.iterator();
@ -296,19 +213,14 @@ public class ConcurrentUpdateSolrConnector implements SolrConnector {
@Override
public long getSize() {
return this.connector.getSize() + this.updateQueue.size();
return this.connector.getSize() + this.processQueue.size();
}
@Override
public void commit(boolean softCommit) {
long timeout = System.currentTimeMillis() + 1000;
ensureAliveDeletionHandler();
while (this.deleteQueue.size() > 0) {
try {Thread.sleep(10);} catch (final InterruptedException e) {}
if (System.currentTimeMillis() > timeout) break;
}
ensureAliveUpdateHandler();
while (this.updateQueue.size() > 0) {
ensureAliveProcessHandler();
while (this.processQueue.size() > 0) {
try {Thread.sleep(10);} catch (final InterruptedException e) {}
if (System.currentTimeMillis() > timeout) break;
}
@ -332,14 +244,9 @@ public class ConcurrentUpdateSolrConnector implements SolrConnector {
@Override
public void close() {
ensureAliveDeletionHandler();
try {this.deleteQueue.put(POISON_ID);} catch (final InterruptedException e) {}
ensureAliveUpdateHandler();
for (int i = 0; i < this.updateHandler.length; i++) {
try {this.updateQueue.put(POISON_DOCUMENT);} catch (final InterruptedException e) {}
}
try {this.deletionHandler.join();} catch (final InterruptedException e) {}
for (Thread t: this.updateHandler) try {t.join();} catch (final InterruptedException e) {}
ensureAliveProcessHandler();
try {this.processQueue.put(POISON_PROCESS);} catch (final InterruptedException e) {}
try {this.processHandler.join();} catch (final InterruptedException e) {}
this.connector.close();
this.metadataCache.clear();
this.connector = null;
@ -348,22 +255,18 @@ public class ConcurrentUpdateSolrConnector implements SolrConnector {
@Override
public void clear() throws IOException {
this.deleteQueue.clear();
this.updateQueue.clear();
try {this.updateQueue.put(POISON_DOCUMENT);} catch (final InterruptedException e) {}
for (Thread t: this.updateHandler) try {t.join();} catch (final InterruptedException e) {}
this.processQueue.clear();
this.connector.clear();
this.metadataCache.clear();
}
@Override
public synchronized void deleteById(String id) throws IOException {
removeIdFromUpdateQueue(id);
this.metadataCache.remove(id);
this.missCache.add(id);
ensureAliveDeletionHandler();
if (this.deletionHandler.isAlive()) {
try {this.deleteQueue.put(id);} catch (final InterruptedException e) {}
ensureAliveProcessHandler();
if (this.processHandler.isAlive()) {
try {this.processQueue.put(id);} catch (final InterruptedException e) {}
} else {
this.connector.deleteById(id);
}
@ -372,13 +275,12 @@ public class ConcurrentUpdateSolrConnector implements SolrConnector {
@Override
public synchronized void deleteByIds(Collection<String> ids) throws IOException {
for (String id: ids) {
removeIdFromUpdateQueue(id);
this.metadataCache.remove(id);
this.missCache.add(id);
}
ensureAliveDeletionHandler();
if (this.deletionHandler.isAlive()) {
for (String id: ids) try {this.deleteQueue.put(id);} catch (final InterruptedException e) {}
ensureAliveProcessHandler();
if (this.processHandler.isAlive()) {
for (String id: ids) try {this.processQueue.put(id);} catch (final InterruptedException e) {}
} else {
this.connector.deleteByIds(ids);
}
@ -389,7 +291,6 @@ public class ConcurrentUpdateSolrConnector implements SolrConnector {
try {
ConcurrentUpdateSolrConnector.this.connector.deleteByQuery(querystring);
ConcurrentUpdateSolrConnector.this.metadataCache.clear();
ConcurrentUpdateSolrConnector.this.missCache.clear();
} catch (final IOException e) {
ConcurrentLog.severe("ConcurrentUpdateSolrConnector", e.getMessage(), e);
}
@ -401,9 +302,9 @@ public class ConcurrentUpdateSolrConnector implements SolrConnector {
if (this.missCache.contains(id)) {cacheSuccessSign(); return null;}
Metadata md = this.metadataCache.get(id);
if (md != null) {cacheSuccessSign(); return md;}
if (existIdFromDeleteQueue(id)) {cacheSuccessSign(); return null;}
md = existIdFromUpdateQueue(id);
if (md != null) {cacheSuccessSign(); return md;}
if (containsDeleteInProcessQueue(id)) {cacheSuccessSign(); return null;}
SolrInputDocument doc = getFromProcessQueue(id);
if (doc != null) {cacheSuccessSign(); return AbstractSolrConnector.getMetadata(doc);}
md = this.connector.getMetadata(id);
if (md == null) {this.missCache.add(id); return null;}
updateCache(id, md);
@ -413,10 +314,10 @@ public class ConcurrentUpdateSolrConnector implements SolrConnector {
@Override
public void add(SolrInputDocument solrdoc) throws IOException, SolrException {
String id = (String) solrdoc.getFieldValue(CollectionSchema.id.getSolrFieldName());
removeIdFromDeleteQueue(id);
updateCache(id, AbstractSolrConnector.getMetadata(solrdoc));
if (anyUpdateHandlerAlive()) {
try {this.updateQueue.put(solrdoc);} catch (final InterruptedException e) {}
ensureAliveProcessHandler();
if (this.processHandler.isAlive()) {
try {this.processQueue.put(solrdoc);} catch (final InterruptedException e) {}
} else {
this.connector.add(solrdoc);
}
@ -426,11 +327,11 @@ public class ConcurrentUpdateSolrConnector implements SolrConnector {
public void add(Collection<SolrInputDocument> solrdocs) throws IOException, SolrException {
for (SolrInputDocument doc: solrdocs) {
String id = (String) doc.getFieldValue(CollectionSchema.id.getSolrFieldName());
removeIdFromDeleteQueue(id);
updateCache(id, AbstractSolrConnector.getMetadata(doc));
}
if (anyUpdateHandlerAlive()) {
for (SolrInputDocument doc: solrdocs) try {this.updateQueue.put(doc);} catch (final InterruptedException e) {}
ensureAliveProcessHandler();
if (this.processHandler.isAlive()) {
for (SolrInputDocument doc: solrdocs) try {this.processQueue.put(doc);} catch (final InterruptedException e) {}
} else {
this.connector.add(solrdocs);
}
@ -440,8 +341,8 @@ public class ConcurrentUpdateSolrConnector implements SolrConnector {
public SolrDocument getDocumentById(final String id, String... fields) throws IOException {
assert id.length() == Word.commonHashLength : "wrong id: " + id;
if (this.missCache.contains(id)) return null;
if (existIdFromDeleteQueue(id)) return null;
SolrInputDocument idoc = getFromUpdateQueue(id);
if (containsDeleteInProcessQueue(id)) return null;
SolrInputDocument idoc = getFromProcessQueue(id);
if (idoc != null) {cacheSuccessSign(); return ClientUtils.toSolrDocument(idoc);}
SolrDocument doc = this.connector.getDocumentById(id, AbstractSolrConnector.ensureEssentialFieldsIncluded(fields));
if (doc == null) {

Loading…
Cancel
Save