a concurrency enhancement which was not used because tests showed worse

indexing speed. I leave the code there since it may be useful in
SolrCloud environments.
pull/1/head
Michael Peter Christen 11 years ago
parent e644981697
commit 3cc5c0ffdd

@ -94,8 +94,8 @@ public class ConcurrentUpdateSolrConnector implements SolrConnector {
String id = (String) doc.getFieldValue(CollectionSchema.id.getSolrFieldName());
Metadata md = AbstractSolrConnector.getMetadata(doc);
updateCache(id, md);
for (int i = 0; i < getmore; i++) {
SolrInputDocument d = ConcurrentUpdateSolrConnector.this.updateQueue.take();
SolrInputDocument d;
while (getmore-- > 0 && (d = ConcurrentUpdateSolrConnector.this.updateQueue.poll()) != null) {
if (d == POISON_DOCUMENT) {
ConcurrentUpdateSolrConnector.this.updateQueue.put(POISON_DOCUMENT); // make sure that the outer loop terminates as well
break;
@ -150,7 +150,8 @@ public class ConcurrentUpdateSolrConnector implements SolrConnector {
private ARH<String> missCache;
private BlockingQueue<SolrInputDocument> updateQueue;
private BlockingQueue<String> deleteQueue;
private Thread deletionHandler, updateHandler;
private Thread deletionHandler;
private Thread[] updateHandler;
public ConcurrentUpdateSolrConnector(SolrConnector connector, int updateCapacity, int idCacheCapacity, int concurrency) {
this.connector = connector;
@ -266,11 +267,26 @@ public class ConcurrentUpdateSolrConnector implements SolrConnector {
}
public void ensureAliveUpdateHandler() {
if (this.updateHandler == null || !this.updateHandler.isAlive()) {
this.updateHandler = new Thread(new UpdateHandler());
this.updateHandler.setName(this.getClass().getName() + "_UpdateHandler");
this.updateHandler.start();
if (this.updateHandler == null) {
this.updateHandler = new Thread[1/*Runtime.getRuntime().availableProcessors()*/];
}
for (int i = 0; i < this.updateHandler.length; i++) {
if (this.updateHandler[i] == null || !this.updateHandler[i].isAlive()) {
this.updateHandler[i] = new Thread(new UpdateHandler());
this.updateHandler[i].setName(this.getClass().getName() + "_UpdateHandler_" + i);
this.updateHandler[i].start();
}
}
}
public boolean anyUpdateHandlerAlive() {
if (this.updateHandler == null) {
return false;
}
for (int i = 0; i < this.updateHandler.length; i++) {
if (this.updateHandler[i] != null && this.updateHandler[i].isAlive()) return true;
}
return false;
}
@Override
@ -323,7 +339,7 @@ public class ConcurrentUpdateSolrConnector implements SolrConnector {
ensureAliveUpdateHandler();
try {this.updateQueue.put(POISON_DOCUMENT);} catch (final InterruptedException e) {}
try {this.deletionHandler.join();} catch (final InterruptedException e) {}
try {this.updateHandler.join();} catch (final InterruptedException e) {}
for (Thread t: this.updateHandler) try {t.join();} catch (final InterruptedException e) {}
this.connector.close();
this.metadataCache.clear();
this.connector = null;
@ -335,7 +351,7 @@ public class ConcurrentUpdateSolrConnector implements SolrConnector {
this.deleteQueue.clear();
this.updateQueue.clear();
try {this.updateQueue.put(POISON_DOCUMENT);} catch (final InterruptedException e) {}
try {this.updateHandler.join();} catch (final InterruptedException e) {}
for (Thread t: this.updateHandler) try {t.join();} catch (final InterruptedException e) {}
this.connector.clear();
this.metadataCache.clear();
}
@ -397,7 +413,7 @@ public class ConcurrentUpdateSolrConnector implements SolrConnector {
String id = (String) solrdoc.getFieldValue(CollectionSchema.id.getSolrFieldName());
removeIdFromDeleteQueue(id);
updateCache(id, AbstractSolrConnector.getMetadata(solrdoc));
if (this.updateHandler.isAlive()) {
if (anyUpdateHandlerAlive()) {
try {this.updateQueue.put(solrdoc);} catch (final InterruptedException e) {}
} else {
this.connector.add(solrdoc);
@ -411,7 +427,7 @@ public class ConcurrentUpdateSolrConnector implements SolrConnector {
removeIdFromDeleteQueue(id);
updateCache(id, AbstractSolrConnector.getMetadata(doc));
}
if (this.updateHandler.isAlive()) {
if (anyUpdateHandlerAlive()) {
for (SolrInputDocument doc: solrdocs) try {this.updateQueue.put(doc);} catch (final InterruptedException e) {}
} else {
this.connector.add(solrdocs);

Loading…
Cancel
Save