diff --git a/source/net/yacy/cora/federate/solr/connector/ConcurrentUpdateSolrConnector.java b/source/net/yacy/cora/federate/solr/connector/ConcurrentUpdateSolrConnector.java index 1e743c47f..2185ed535 100644 --- a/source/net/yacy/cora/federate/solr/connector/ConcurrentUpdateSolrConnector.java +++ b/source/net/yacy/cora/federate/solr/connector/ConcurrentUpdateSolrConnector.java @@ -27,7 +27,6 @@ import java.util.Iterator; import java.util.Map; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; import net.yacy.cora.sorting.ReversibleScoreMap; import net.yacy.cora.storage.ARC; @@ -57,117 +56,78 @@ public class ConcurrentUpdateSolrConnector implements SolrConnector { SolrConnector connector; - private final static String POISON_ID = "________"; - private final static SolrInputDocument POISON_DOCUMENT = new SolrInputDocument(); + private final static Object POISON_PROCESS = new Object(); - private class DeletionHandler implements Runnable { + private class ProcessHandler implements Runnable { @Override public void run() { - String id; try { - while ((id = ConcurrentUpdateSolrConnector.this.deleteQueue.take()) != POISON_ID) { - try { - removeIdFromUpdateQueue(id); - ConcurrentUpdateSolrConnector.this.connector.deleteById(id); - ConcurrentUpdateSolrConnector.this.metadataCache.remove(id); - } catch (final IOException e) { - ConcurrentLog.logException(e); + Object process; + Collection docs = new ArrayList(); + while ((process = ConcurrentUpdateSolrConnector.this.processQueue.take()) != POISON_PROCESS) { + + if (process instanceof String) { + // delete document + if (docs.size() > 0) addSynchronized(docs); + String id = (String) process; + try { + ConcurrentUpdateSolrConnector.this.connector.deleteById(id); + } catch (final IOException e) { + ConcurrentLog.logException(e); + } + } + + if (process instanceof SolrInputDocument) { + SolrInputDocument doc = (SolrInputDocument) process; + docs.add(doc); + } + + if (docs.size() > 0 && + (ConcurrentUpdateSolrConnector.this.processQueue.size() == 0 || + docs.size() >= ConcurrentUpdateSolrConnector.this.processQueue.size() + ConcurrentUpdateSolrConnector.this.processQueue.remainingCapacity())) { + addSynchronized(docs); } } } catch (final InterruptedException e) { ConcurrentLog.logException(e); } } - } - - private class UpdateHandler implements Runnable { - @Override - public void run() { - SolrInputDocument doc; + private void addSynchronized(final Collection docs) { + assert docs.size() > 0; try { - while ((doc = ConcurrentUpdateSolrConnector.this.updateQueue.take()) != POISON_DOCUMENT) { - int getmore = ConcurrentUpdateSolrConnector.this.updateQueue.size(); - if (getmore > 0) { - // accumulate a collection of documents because that is better to send at once to a remote server - Collection docs = new ArrayList(getmore + 1); - docs.add(doc); - String id = (String) doc.getFieldValue(CollectionSchema.id.getSolrFieldName()); - Metadata md = AbstractSolrConnector.getMetadata(doc); - updateCache(id, md); - SolrInputDocument d; - while (getmore-- > 0 && (d = ConcurrentUpdateSolrConnector.this.updateQueue.poll()) != null) { - if (d == POISON_DOCUMENT) { - ConcurrentUpdateSolrConnector.this.updateQueue.put(POISON_DOCUMENT); // make sure that the outer loop terminates as well - break; - } - docs.add(d); - id = (String) d.getFieldValue(CollectionSchema.id.getSolrFieldName()); - md = AbstractSolrConnector.getMetadata(d); - updateCache(id, md); - } - //ConcurrentLog.info("ConcurrentUpdateSolrConnector", "sending " + docs.size() + " documents to solr"); - try { - ConcurrentUpdateSolrConnector.this.connector.add(docs); - } catch (final OutOfMemoryError e) { - // clear and try again... - clearCaches(); - try { - ConcurrentUpdateSolrConnector.this.connector.add(docs); - } catch (final IOException ee) { - ConcurrentLog.logException(e); - } - } catch (final IOException e) { - ConcurrentLog.logException(e); - } - } else { - // if there is only a single document, send this directly to solr - //ConcurrentLog.info("ConcurrentUpdateSolrConnector", "sending one document to solr"); - String id = (String) doc.getFieldValue(CollectionSchema.id.getSolrFieldName()); - Metadata md = AbstractSolrConnector.getMetadata(doc); - updateCache(id, md); - try { - ConcurrentUpdateSolrConnector.this.connector.add(doc); - } catch (final OutOfMemoryError e) { - // clear and try again... - clearCaches(); - try { - ConcurrentUpdateSolrConnector.this.connector.add(doc); - } catch (final IOException ee) { - ConcurrentLog.logException(e); - } - } catch (final IOException e) { - ConcurrentLog.logException(e); - } - } + ConcurrentUpdateSolrConnector.this.connector.add(docs); + } catch (final OutOfMemoryError e) { + // clear and try again... + clearCaches(); + try { + ConcurrentUpdateSolrConnector.this.connector.add(docs); + } catch (final IOException ee) { + ConcurrentLog.logException(e); } - } catch (final InterruptedException e) { + } catch (final IOException e) { ConcurrentLog.logException(e); } + docs.clear(); } } private ARC metadataCache; private ARH missCache; - private BlockingQueue updateQueue; - private BlockingQueue deleteQueue; - private Thread deletionHandler; - private Thread[] updateHandler; + private BlockingQueue processQueue; + private Thread processHandler; - public ConcurrentUpdateSolrConnector(SolrConnector connector, int updateCapacity, int idCacheCapacity, int concurrency) { + public ConcurrentUpdateSolrConnector(final SolrConnector connector, final int updateCapacity, final int idCacheCapacity, final int concurrency) { this.connector = connector; this.metadataCache = new ConcurrentARC(idCacheCapacity, concurrency); this.missCache = new ConcurrentARH(idCacheCapacity, concurrency); - this.updateQueue = new ArrayBlockingQueue(updateCapacity); - this.deleteQueue = new LinkedBlockingQueue(); - this.deletionHandler = null; - this.updateHandler = null; - ensureAliveDeletionHandler(); - ensureAliveUpdateHandler(); + this.processQueue = new ArrayBlockingQueue(updateCapacity); + this.processHandler = null; + ensureAliveProcessHandler(); } @Override public int bufferSize() { - return this.updateQueue.size() + this.deleteQueue.size(); + return this.processQueue.size(); } @Override @@ -183,71 +143,51 @@ public class ConcurrentUpdateSolrConnector implements SolrConnector { private static void cacheSuccessSign() { //ConcurrentLog.info("ConcurrentUpdate", "**** cache hit"); } - - private boolean existIdFromDeleteQueue(String id) { - if (this.deleteQueue.size() == 0) return false; - Iterator i = this.deleteQueue.iterator(); - while (i.hasNext()) { - String docID = i.next(); - if (docID == null) break; - if (docID.equals(id)) return true; - } - return false; - } - private SolrInputDocument getFromUpdateQueue(String id) { - if (this.updateQueue.size() == 0) return null; - Iterator i = this.updateQueue.iterator(); - while (i.hasNext()) { - SolrInputDocument doc = i.next(); - if (doc == null) break; - String docID = (String) doc.getFieldValue(CollectionSchema.id.getSolrFieldName()); - if (docID != null && docID.equals(id)) return doc; + private boolean containsDeleteInProcessQueue(final String id) { + boolean delete = false; + boolean ctch = false; + for (Object o: this.processQueue) { + if (o == null) break; + if (checkDelete(o, id)) delete = true; // do not add a break here! + if (checkAdd(o, id)) {delete = false; ctch = true;} // do not add a break here! } - return null; + if (ctch && delete) removeFromProcessQueue(id); // clean up put+remove + return delete; } - private Metadata existIdFromUpdateQueue(String id) { - if (this.updateQueue.size() == 0) return null; - Iterator i = this.updateQueue.iterator(); - while (i.hasNext()) { - SolrInputDocument doc = i.next(); - if (doc == null) break; - String docID = (String) doc.getFieldValue(CollectionSchema.id.getSolrFieldName()); - if (docID != null && docID.equals(id)) { - return AbstractSolrConnector.getMetadata(doc); - } + private SolrInputDocument getFromProcessQueue(final String id) { + SolrInputDocument d = null; + boolean ctch = false; + for (Object o: this.processQueue) { + if (o == null) break; + if (checkDelete(o, id)) d = null; // do not add a break here! + if (checkAdd(o, id)) {d = (SolrInputDocument) o; ctch = true;} // do not add a break here! } - return null; + if (ctch && d == null) removeFromProcessQueue(id); // clean up put+remove + return d; } - private void removeIdFromUpdateQueue(String id) { - if (this.updateQueue.size() == 0) return; - Iterator i = this.updateQueue.iterator(); + private void removeFromProcessQueue(final String id) { + Iterator i = this.processQueue.iterator(); while (i.hasNext()) { - SolrInputDocument doc = i.next(); - if (doc == null) break; - String docID = (String) doc.getFieldValue(CollectionSchema.id.getSolrFieldName()); - if (docID != null && docID.equals(id)) { - i.remove(); - break; - } + if (checkAdd(i.next(), id)) {i.remove(); break;} } } - private void removeIdFromDeleteQueue(String id) { - if (this.updateQueue.size() == 0) return; - Iterator i = this.deleteQueue.iterator(); - while (i.hasNext()) { - String docID = i.next(); - if (docID == null) break; - if (docID.equals(id)) { - i.remove(); - break; - } - } + private boolean checkDelete(final Object o, final String id) { + if (!(o instanceof String)) return false; + String docID = (String) o; + return (docID != null && docID.equals(id)); } + private boolean checkAdd(final Object o, final String id) { + if (!(o instanceof SolrInputDocument)) return false; + SolrInputDocument doc = (SolrInputDocument) o; + String docID = (String) doc.getFieldValue(CollectionSchema.id.getSolrFieldName()); + return (docID != null && docID.equals(id)); + } + private void updateCache(final String id, final Metadata md) { if (id == null) return; if (MemoryControl.shortStatus()) { @@ -258,37 +198,14 @@ public class ConcurrentUpdateSolrConnector implements SolrConnector { this.missCache.delete(id); } - public void ensureAliveDeletionHandler() { - if (this.deletionHandler == null || !this.deletionHandler.isAlive()) { - this.deletionHandler = new Thread(new DeletionHandler()); - this.deletionHandler.setName(this.getClass().getName() + "_DeletionHandler"); - this.deletionHandler.start(); - } - } - - public void ensureAliveUpdateHandler() { - if (this.updateHandler == null) { - this.updateHandler = new Thread[1/*Runtime.getRuntime().availableProcessors()*/]; - } - for (int i = 0; i < this.updateHandler.length; i++) { - if (this.updateHandler[i] == null || !this.updateHandler[i].isAlive()) { - this.updateHandler[i] = new Thread(new UpdateHandler()); - this.updateHandler[i].setName(this.getClass().getName() + "_UpdateHandler_" + i); - this.updateHandler[i].start(); - } + public void ensureAliveProcessHandler() { + if (this.processHandler == null || !this.processHandler.isAlive()) { + this.processHandler = new Thread(new ProcessHandler()); + this.processHandler.setName(this.getClass().getName() + "_ProcessHandler"); + this.processHandler.start(); } } - public boolean anyUpdateHandlerAlive() { - if (this.updateHandler == null) { - return false; - } - for (int i = 0; i < this.updateHandler.length; i++) { - if (this.updateHandler[i] != null && this.updateHandler[i].isAlive()) return true; - } - return false; - } - @Override public Iterator iterator() { return this.connector.iterator(); @@ -296,19 +213,14 @@ public class ConcurrentUpdateSolrConnector implements SolrConnector { @Override public long getSize() { - return this.connector.getSize() + this.updateQueue.size(); + return this.connector.getSize() + this.processQueue.size(); } @Override public void commit(boolean softCommit) { long timeout = System.currentTimeMillis() + 1000; - ensureAliveDeletionHandler(); - while (this.deleteQueue.size() > 0) { - try {Thread.sleep(10);} catch (final InterruptedException e) {} - if (System.currentTimeMillis() > timeout) break; - } - ensureAliveUpdateHandler(); - while (this.updateQueue.size() > 0) { + ensureAliveProcessHandler(); + while (this.processQueue.size() > 0) { try {Thread.sleep(10);} catch (final InterruptedException e) {} if (System.currentTimeMillis() > timeout) break; } @@ -332,14 +244,9 @@ public class ConcurrentUpdateSolrConnector implements SolrConnector { @Override public void close() { - ensureAliveDeletionHandler(); - try {this.deleteQueue.put(POISON_ID);} catch (final InterruptedException e) {} - ensureAliveUpdateHandler(); - for (int i = 0; i < this.updateHandler.length; i++) { - try {this.updateQueue.put(POISON_DOCUMENT);} catch (final InterruptedException e) {} - } - try {this.deletionHandler.join();} catch (final InterruptedException e) {} - for (Thread t: this.updateHandler) try {t.join();} catch (final InterruptedException e) {} + ensureAliveProcessHandler(); + try {this.processQueue.put(POISON_PROCESS);} catch (final InterruptedException e) {} + try {this.processHandler.join();} catch (final InterruptedException e) {} this.connector.close(); this.metadataCache.clear(); this.connector = null; @@ -348,22 +255,18 @@ public class ConcurrentUpdateSolrConnector implements SolrConnector { @Override public void clear() throws IOException { - this.deleteQueue.clear(); - this.updateQueue.clear(); - try {this.updateQueue.put(POISON_DOCUMENT);} catch (final InterruptedException e) {} - for (Thread t: this.updateHandler) try {t.join();} catch (final InterruptedException e) {} + this.processQueue.clear(); this.connector.clear(); this.metadataCache.clear(); } @Override public synchronized void deleteById(String id) throws IOException { - removeIdFromUpdateQueue(id); this.metadataCache.remove(id); this.missCache.add(id); - ensureAliveDeletionHandler(); - if (this.deletionHandler.isAlive()) { - try {this.deleteQueue.put(id);} catch (final InterruptedException e) {} + ensureAliveProcessHandler(); + if (this.processHandler.isAlive()) { + try {this.processQueue.put(id);} catch (final InterruptedException e) {} } else { this.connector.deleteById(id); } @@ -372,13 +275,12 @@ public class ConcurrentUpdateSolrConnector implements SolrConnector { @Override public synchronized void deleteByIds(Collection ids) throws IOException { for (String id: ids) { - removeIdFromUpdateQueue(id); this.metadataCache.remove(id); this.missCache.add(id); } - ensureAliveDeletionHandler(); - if (this.deletionHandler.isAlive()) { - for (String id: ids) try {this.deleteQueue.put(id);} catch (final InterruptedException e) {} + ensureAliveProcessHandler(); + if (this.processHandler.isAlive()) { + for (String id: ids) try {this.processQueue.put(id);} catch (final InterruptedException e) {} } else { this.connector.deleteByIds(ids); } @@ -389,7 +291,6 @@ public class ConcurrentUpdateSolrConnector implements SolrConnector { try { ConcurrentUpdateSolrConnector.this.connector.deleteByQuery(querystring); ConcurrentUpdateSolrConnector.this.metadataCache.clear(); - ConcurrentUpdateSolrConnector.this.missCache.clear(); } catch (final IOException e) { ConcurrentLog.severe("ConcurrentUpdateSolrConnector", e.getMessage(), e); } @@ -401,22 +302,22 @@ public class ConcurrentUpdateSolrConnector implements SolrConnector { if (this.missCache.contains(id)) {cacheSuccessSign(); return null;} Metadata md = this.metadataCache.get(id); if (md != null) {cacheSuccessSign(); return md;} - if (existIdFromDeleteQueue(id)) {cacheSuccessSign(); return null;} - md = existIdFromUpdateQueue(id); - if (md != null) {cacheSuccessSign(); return md;} + if (containsDeleteInProcessQueue(id)) {cacheSuccessSign(); return null;} + SolrInputDocument doc = getFromProcessQueue(id); + if (doc != null) {cacheSuccessSign(); return AbstractSolrConnector.getMetadata(doc);} md = this.connector.getMetadata(id); if (md == null) {this.missCache.add(id); return null;} updateCache(id, md); return md; } - + @Override public void add(SolrInputDocument solrdoc) throws IOException, SolrException { String id = (String) solrdoc.getFieldValue(CollectionSchema.id.getSolrFieldName()); - removeIdFromDeleteQueue(id); updateCache(id, AbstractSolrConnector.getMetadata(solrdoc)); - if (anyUpdateHandlerAlive()) { - try {this.updateQueue.put(solrdoc);} catch (final InterruptedException e) {} + ensureAliveProcessHandler(); + if (this.processHandler.isAlive()) { + try {this.processQueue.put(solrdoc);} catch (final InterruptedException e) {} } else { this.connector.add(solrdoc); } @@ -426,11 +327,11 @@ public class ConcurrentUpdateSolrConnector implements SolrConnector { public void add(Collection solrdocs) throws IOException, SolrException { for (SolrInputDocument doc: solrdocs) { String id = (String) doc.getFieldValue(CollectionSchema.id.getSolrFieldName()); - removeIdFromDeleteQueue(id); updateCache(id, AbstractSolrConnector.getMetadata(doc)); } - if (anyUpdateHandlerAlive()) { - for (SolrInputDocument doc: solrdocs) try {this.updateQueue.put(doc);} catch (final InterruptedException e) {} + ensureAliveProcessHandler(); + if (this.processHandler.isAlive()) { + for (SolrInputDocument doc: solrdocs) try {this.processQueue.put(doc);} catch (final InterruptedException e) {} } else { this.connector.add(solrdocs); } @@ -440,8 +341,8 @@ public class ConcurrentUpdateSolrConnector implements SolrConnector { public SolrDocument getDocumentById(final String id, String... fields) throws IOException { assert id.length() == Word.commonHashLength : "wrong id: " + id; if (this.missCache.contains(id)) return null; - if (existIdFromDeleteQueue(id)) return null; - SolrInputDocument idoc = getFromUpdateQueue(id); + if (containsDeleteInProcessQueue(id)) return null; + SolrInputDocument idoc = getFromProcessQueue(id); if (idoc != null) {cacheSuccessSign(); return ClientUtils.toSolrDocument(idoc);} SolrDocument doc = this.connector.getDocumentById(id, AbstractSolrConnector.ensureEssentialFieldsIncluded(fields)); if (doc == null) {