added partial updates to solr during postprocessing: during

postprocessing the solr documents are now not completely retrieved.
instead, only the fields needed for the postprocessing are extracted. When
Solr documents are written, this is done using partial updates.

This increases postprocessing speed by about 50% for embedded Solr
configurations. For external Solr configurations the improvement should
be much greater, because postprocessing with a remote Solr is very slow.
When doing partial updates to a remote Solr, this method should perform
much better than before; the speed-up is expected to be even higher than
the increase measured with local Solr.
pull/1/head
Michael Peter Christen 10 years ago
parent b1cfbc4a04
commit d80418f1b1

@ -506,8 +506,11 @@ public abstract class AbstractSolrConnector implements SolrConnector {
docOut.setField(CollectionSchema.id.name(), docIn.getFieldValue(CollectionSchema.id.name()));
for (Entry<String, SolrInputField> entry: docIn.entrySet()) {
if (entry.getKey().equals(CollectionSchema.id.name())) continue;
SolrInputField sif = entry.getValue();
Map<String, Object> partialUpdate = new HashMap<>(1);
partialUpdate.put("set", entry.getValue());
Object value = sif.getValue();
docOut.removeField(entry.getKey());
partialUpdate.put("set", value);
docOut.setField(entry.getKey(), partialUpdate);
}
return docOut;

@ -27,6 +27,7 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import net.yacy.cora.federate.solr.instance.EmbeddedInstance;
@ -261,11 +262,26 @@ public class EmbeddedSolrConnector extends SolrServerConnector implements SolrCo
return sdl;
}
/**
 * The following schemaFieldCache is a hack-patch of a Solr internal request which is really slow.
 * The Solr-internal method is flexible because it may respond on a real-time schema change, but that
 * effectively never happens. In our case the schema declaration against Solr never changes.
 */
private final Map<String, SchemaField> schemaFieldCache = new ConcurrentHashMap<>();

/**
 * Resolve a schema field by name, caching the result to avoid the slow
 * Solr-internal schema lookup on every call.
 * @param fieldName the name of the schema field to look up
 * @return the SchemaField for the given name, or null if the schema does not declare such a field
 */
private final SchemaField getSchemaField(final String fieldName) {
    SchemaField sf = this.schemaFieldCache.get(fieldName);
    if (sf == null) {
        sf = this.core.getLatestSchema().getFieldOrNull(fieldName);
        // ConcurrentHashMap rejects null values: putting the null returned by
        // getFieldOrNull for an undeclared field would throw a NullPointerException.
        // Only cache successful lookups; misses simply fall through to null.
        if (sf != null) this.schemaFieldCache.put(fieldName, sf);
    }
    return sf;
}
public SolrDocument doc2SolrDoc(Document doc) {
SolrDocument solrDoc = new SolrDocument();
for (IndexableField field : doc) {
String fieldName = field.name();
SchemaField sf = this.core.getLatestSchema().getFieldOrNull(fieldName);
SchemaField sf = getSchemaField(fieldName); // hack-patch of this.core.getLatestSchema().getFieldOrNull(fieldName); makes it a lot faster!!
Object val = null;
try {
FieldType ft = null;

@ -208,9 +208,9 @@ public abstract class SolrServerConnector extends AbstractSolrConnector implemen
@Override
public void add(final SolrInputDocument solrdoc) throws IOException, SolrException {
if (this.server == null) return;
if (solrdoc.containsKey("_version_")) solrdoc.setField("_version_",0L); // prevent Solr "version conflict"
synchronized (this.server) {
try {
if (solrdoc.containsKey("_version_")) solrdoc.setField("_version_",0L); // prevent Solr "version conflict"
this.server.add(solrdoc, -1);
} catch (final Throwable e) {
clearCaches(); // prevent further OOM if this was caused by OOM
@ -245,11 +245,11 @@ public abstract class SolrServerConnector extends AbstractSolrConnector implemen
@Override
public void add(final Collection<SolrInputDocument> solrdocs) throws IOException, SolrException {
if (this.server == null) return;
for (SolrInputDocument solrdoc : solrdocs) {
if (solrdoc.containsKey("_version_")) solrdoc.setField("_version_",0L); // prevent Solr "version conflict"
}
synchronized (this.server) {
try {
for (SolrInputDocument solrdoc : solrdocs) {
if (solrdoc.containsKey("_version_")) solrdoc.setField("_version_",0L); // prevent Solr "version conflict"
}
this.server.add(solrdocs, -1);
} catch (final Throwable e) {
clearCaches(); // prevent further OOM if this was caused by OOM

@ -1148,7 +1148,12 @@ public class CollectionConfiguration extends SchemaConfiguration implements Seri
final long count = segment.fulltext().getWebgraphConnector().getCountByQuery(patchquery);
int concurrency = Math.min((int) count, Math.max(1, Runtime.getRuntime().availableProcessors() / 4));
ConcurrentLog.info("CollectionConfiguration", "collecting " + count + " documents from the webgraph, concurrency = " + concurrency);
final BlockingQueue<SolrDocument> docs = segment.fulltext().getWebgraphConnector().concurrentDocumentsByQuery(patchquery, WebgraphSchema.source_chars_i.getSolrFieldName() + " asc", 0, 100000000, Long.MAX_VALUE, concurrency + 1, concurrency, true);
final BlockingQueue<SolrDocument> docs = segment.fulltext().getWebgraphConnector().concurrentDocumentsByQuery(
patchquery,
WebgraphSchema.source_chars_i.getSolrFieldName() + " asc",
0, 100000000, Long.MAX_VALUE, concurrency + 1, concurrency, true
// TODO: add field list and do partial updates
);
final AtomicInteger proccount = new AtomicInteger(0);
Thread[] t = new Thread[concurrency];
for (final AtomicInteger i = new AtomicInteger(0); i.get() < t.length; i.incrementAndGet()) {
@ -1256,7 +1261,25 @@ public class CollectionConfiguration extends SchemaConfiguration implements Seri
CollectionSchema.host_subdomain_s.getSolrFieldName() + " asc," + // sort on subdomain to get hosts without subdomain first; that gives an opportunity to set www_unique_b flag to false
CollectionSchema.url_protocol_s.getSolrFieldName() + " asc" // sort on protocol to get http before https; that gives an opportunity to set http_unique_b flag to false
: null, // null sort is faster!
0, 100000000, Long.MAX_VALUE, concurrency + 1, concurrency, true);
0, 100000000, Long.MAX_VALUE, concurrency + 1, concurrency, true,
CollectionSchema.id.getSolrFieldName(),
CollectionSchema.sku.getSolrFieldName(),
CollectionSchema.harvestkey_s.getSolrFieldName(),
CollectionSchema.process_sxt.getSolrFieldName(),
CollectionSchema.canonical_equal_sku_b.getSolrFieldName(),
CollectionSchema.canonical_s.getSolrFieldName(),
CollectionSchema.exact_signature_l.getSolrFieldName(),
CollectionSchema.fuzzy_signature_l.getSolrFieldName(),
CollectionSchema.title_exact_signature_l.getSolrFieldName(),
CollectionSchema.description_exact_signature_l.getSolrFieldName(),
CollectionSchema.host_id_s.getSolrFieldName(),
CollectionSchema.host_s.getSolrFieldName(),
CollectionSchema.host_subdomain_s.getSolrFieldName(),
CollectionSchema.url_chars_i.getSolrFieldName(),
CollectionSchema.url_protocol_s.getSolrFieldName(),
CollectionSchema.httpstatus_i.getSolrFieldName(),
CollectionSchema.inboundlinkscount_i.getSolrFieldName(),
CollectionSchema.robots_i.getSolrFieldName());
final AtomicInteger proccount = new AtomicInteger();
final AtomicInteger proccount_referencechange = new AtomicInteger();
final AtomicInteger proccount_citationchange = new AtomicInteger();
@ -1282,8 +1305,8 @@ public class CollectionConfiguration extends SchemaConfiguration implements Seri
try {
DigestURL url = new DigestURL(u, ASCII.getBytes(i));
byte[] id = url.hash();
SolrInputDocument sid = collection.toSolrInputDocument(doc, omitFields);
SolrInputDocument sid = new SolrInputDocument(); //collection.toSolrInputDocument(doc, omitFields);
sid.setField(CollectionSchema.id.getSolrFieldName(), i);
for (Object tag: proctags) try {
// switch over tag types
@ -1323,17 +1346,17 @@ public class CollectionConfiguration extends SchemaConfiguration implements Seri
}
// all processing steps checked, remove the processing and harvesting key
sid.removeField(CollectionSchema.process_sxt.getSolrFieldName());
sid.removeField(CollectionSchema.harvestkey_s.getSolrFieldName());
sid.setField(CollectionSchema.process_sxt.getSolrFieldName(), null); // setting this to null will cause a removal when doing a partial update
sid.setField(CollectionSchema.harvestkey_s.getSolrFieldName(), null);
// send back to index
//collectionConnector.deleteById(i);
collectionConnector.add(sid);
collectionConnector.update(sid);
int thiscount = proccount.incrementAndGet(); allcount.incrementAndGet();
long thiscount = proccount.incrementAndGet(); allcount.incrementAndGet();
if (thiscount % 100 == 0) {
postprocessingActivity = "postprocessed " + thiscount + " from " + count + " collection documents; " +
(thiscount * 60000 / (System.currentTimeMillis() - start)) + " ppm; " +
(thiscount * 60000L / (System.currentTimeMillis() - start)) + " ppm; " +
((System.currentTimeMillis() - start) * (count - thiscount) / thiscount / 60000) + " minutes remaining";
ConcurrentLog.info("CollectionConfiguration", postprocessingActivity);
}

Loading…
Cancel
Save