better robustness of Concurrent Solr Connector against update/deletion

thread failure
pull/1/head
Michael Peter Christen 12 years ago
parent f7f3e28c5e
commit f2c9b0b5f2

@ -65,12 +65,15 @@ public class ConcurrentUpdateSolrConnector implements SolrConnector {
String id;
try {
while ((id = ConcurrentUpdateSolrConnector.this.deleteQueue.take()) != POISON_ID) {
removeIdFromUpdateQueue(id);
ConcurrentUpdateSolrConnector.this.connector.deleteById(id);
ConcurrentUpdateSolrConnector.this.idCache.remove(ASCII.getBytes(id));
try {
removeIdFromUpdateQueue(id);
ConcurrentUpdateSolrConnector.this.connector.deleteById(id);
ConcurrentUpdateSolrConnector.this.idCache.remove(ASCII.getBytes(id));
} catch (IOException e) {
Log.logException(e);
}
}
} catch (InterruptedException e) {
} catch (IOException e) {
Log.logException(e);
}
}
@ -82,11 +85,14 @@ public class ConcurrentUpdateSolrConnector implements SolrConnector {
SolrInputDocument doc;
try {
while ((doc = ConcurrentUpdateSolrConnector.this.updateQueue.take()) != POISON_DOCUMENT) {
ConcurrentUpdateSolrConnector.this.connector.add(doc);
updateIdCache((String) doc.getFieldValue(CollectionSchema.id.getSolrFieldName())); // that should have been done earlier!
try {
updateIdCache((String) doc.getFieldValue(CollectionSchema.id.getSolrFieldName()));
ConcurrentUpdateSolrConnector.this.connector.add(doc);
} catch (IOException e) {
Log.logException(e);
}
}
} catch (InterruptedException e) {
} catch (IOException e) {
Log.logException(e);
}
}
@ -200,7 +206,7 @@ public class ConcurrentUpdateSolrConnector implements SolrConnector {
public void ensureAliveUpdateHandler() {
if (this.updateHandler == null || !this.updateHandler.isAlive()) {
this.updateHandler = new Thread(new UpdateHandler());
this.deletionHandler.setName(this.getClass().getName() + "_UpdateHandler");
this.updateHandler.setName(this.getClass().getName() + "_UpdateHandler");
this.updateHandler.start();
}
}
@ -262,12 +268,26 @@ public class ConcurrentUpdateSolrConnector implements SolrConnector {
@Override
public void deleteById(String id) throws IOException {
try {this.deleteQueue.put(id);} catch (InterruptedException e) {}
removeIdFromUpdateQueue(id);
this.idCache.remove(ASCII.getBytes(id));
if (this.deletionHandler.isAlive()) {
try {this.deleteQueue.put(id);} catch (InterruptedException e) {}
} else {
this.connector.deleteById(id);
}
}
@Override
public void deleteByIds(Collection<String> ids) throws IOException {
for (String id: ids) try {this.deleteQueue.put(id);} catch (InterruptedException e) {}
for (String id: ids) {
removeIdFromUpdateQueue(id);
this.idCache.remove(ASCII.getBytes(id));
}
if (this.deletionHandler.isAlive()) {
for (String id: ids) try {this.deleteQueue.put(id);} catch (InterruptedException e) {}
} else {
this.connector.deleteByIds(ids);
}
}
@Override
@ -308,13 +328,25 @@ public class ConcurrentUpdateSolrConnector implements SolrConnector {
String id = (String) solrdoc.getFieldValue(CollectionSchema.id.getSolrFieldName());
removeIdFromDeleteQueue(id);
updateIdCache(id);
try {this.updateQueue.put(solrdoc);} catch (InterruptedException e) {}
if (this.updateHandler.isAlive()) {
try {this.updateQueue.put(solrdoc);} catch (InterruptedException e) {}
} else {
this.connector.add(solrdoc);
}
}
@Override
public void add(Collection<SolrInputDocument> solrdocs) throws IOException, SolrException {
for (SolrInputDocument doc: solrdocs) updateIdCache((String) doc.getFieldValue(CollectionSchema.id.getSolrFieldName()));
for (SolrInputDocument doc: solrdocs) try {this.updateQueue.put(doc);} catch (InterruptedException e) {}
for (SolrInputDocument doc: solrdocs) {
String id = (String) doc.getFieldValue(CollectionSchema.id.getSolrFieldName());
removeIdFromDeleteQueue(id);
updateIdCache(id);
}
if (this.updateHandler.isAlive()) {
for (SolrInputDocument doc: solrdocs) try {this.updateQueue.put(doc);} catch (InterruptedException e) {}
} else {
this.connector.add(solrdocs);
}
}
@Override

Loading…
Cancel
Save