From cfb647db6ef0f29e830a2eb6be8f914837b654d0 Mon Sep 17 00:00:00 2001 From: orbiter Date: Mon, 24 Feb 2014 23:42:50 +0100 Subject: [PATCH] - introduced a miss cache in ConcurrentUpdateSolrConnector - better usage of cache - bugfix for postprocessing --- .../federate/solr/SchemaConfiguration.java | 6 +- .../ConcurrentUpdateSolrConnector.java | 83 +++++++++++-------- source/net/yacy/search/index/Fulltext.java | 12 +-- .../schema/CollectionConfiguration.java | 5 +- 4 files changed, 57 insertions(+), 49 deletions(-) diff --git a/source/net/yacy/cora/federate/solr/SchemaConfiguration.java b/source/net/yacy/cora/federate/solr/SchemaConfiguration.java index aa6a9ddf6..1b2f832ab 100644 --- a/source/net/yacy/cora/federate/solr/SchemaConfiguration.java +++ b/source/net/yacy/cora/federate/solr/SchemaConfiguration.java @@ -31,6 +31,7 @@ import java.util.Map; import java.util.Set; import org.apache.solr.common.SolrDocument; +import org.apache.solr.common.SolrDocumentList; import org.apache.solr.common.SolrInputDocument; import org.apache.solr.common.SolrInputField; @@ -157,8 +158,9 @@ public class SchemaConfiguration extends Configuration implements Serializable { continue uniquecheck; } try { - final SolrDocument doc = segment.fulltext().getDefaultConnector().getDocumentById(CollectionSchema.host_id_s + ":\"" + hostid + "\" AND " + signaturefield.getSolrFieldName() + ":\"" + checkhash.toString() + "\""); - if (doc != null) { + final SolrDocumentList docs = segment.fulltext().getDefaultConnector().getDocumentListByQuery(CollectionSchema.host_id_s + ":\"" + hostid + "\" AND " + signaturefield.getSolrFieldName() + ":\"" + checkhash.toString() + "\"", 0, 1); + if (docs != null && !docs.isEmpty()) { + SolrDocument doc = docs.get(0); // switch unique attribute in new document sid.setField(uniquefield.getSolrFieldName(), false); // switch attribute in existing document diff --git a/source/net/yacy/cora/federate/solr/connector/ConcurrentUpdateSolrConnector.java b/source/net/yacy/cora/federate/solr/connector/ConcurrentUpdateSolrConnector.java index 113f8cb06..5ebb3c7c2 100644 --- a/source/net/yacy/cora/federate/solr/connector/ConcurrentUpdateSolrConnector.java +++ b/source/net/yacy/cora/federate/solr/connector/ConcurrentUpdateSolrConnector.java @@ -31,7 +31,9 @@ import java.util.concurrent.LinkedBlockingQueue; import net.yacy.cora.sorting.ReversibleScoreMap; import net.yacy.cora.storage.ARC; +import net.yacy.cora.storage.ARH; import net.yacy.cora.storage.ConcurrentARC; +import net.yacy.cora.storage.ConcurrentARH; import net.yacy.cora.util.ConcurrentLog; import net.yacy.kelondro.util.MemoryControl; import net.yacy.search.schema.CollectionSchema; @@ -66,7 +68,7 @@ public class ConcurrentUpdateSolrConnector implements SolrConnector { try { removeIdFromUpdateQueue(id); ConcurrentUpdateSolrConnector.this.connector.deleteById(id); - ConcurrentUpdateSolrConnector.this.idCache.remove(id); + ConcurrentUpdateSolrConnector.this.metadataCache.remove(id); } catch (final IOException e) { ConcurrentLog.logException(e); } @@ -90,7 +92,7 @@ public class ConcurrentUpdateSolrConnector implements SolrConnector { docs.add(doc); String id = (String) doc.getFieldValue(CollectionSchema.id.getSolrFieldName()); Metadata md = AbstractSolrConnector.getMetadata(doc); - updateIdCache(id, md); + updateCache(id, md); for (int i = 0; i < getmore; i++) { SolrInputDocument d = ConcurrentUpdateSolrConnector.this.updateQueue.take(); if (d == POISON_DOCUMENT) { @@ -100,7 +102,7 @@ public class ConcurrentUpdateSolrConnector implements SolrConnector { docs.add(d); id = (String) d.getFieldValue(CollectionSchema.id.getSolrFieldName()); md = AbstractSolrConnector.getMetadata(d); - updateIdCache(id, md); + updateCache(id, md); } //ConcurrentLog.info("ConcurrentUpdateSolrConnector", "sending " + docs.size() + " documents to solr"); try { @@ -113,7 +115,7 @@ public class ConcurrentUpdateSolrConnector implements SolrConnector { //ConcurrentLog.info("ConcurrentUpdateSolrConnector", "sending one document to solr"); String id = (String) doc.getFieldValue(CollectionSchema.id.getSolrFieldName()); Metadata md = AbstractSolrConnector.getMetadata(doc); - updateIdCache(id, md); + updateCache(id, md); try { ConcurrentUpdateSolrConnector.this.connector.add(doc); } catch (final OutOfMemoryError e) { @@ -135,14 +137,16 @@ public class ConcurrentUpdateSolrConnector implements SolrConnector { } } - private ARC idCache; + private ARC metadataCache; + private ARH missCache; private BlockingQueue updateQueue; private BlockingQueue deleteQueue; private Thread deletionHandler, updateHandler; public ConcurrentUpdateSolrConnector(SolrConnector connector, int updateCapacity, int idCacheCapacity, int concurrency) { this.connector = connector; - this.idCache = new ConcurrentARC(idCacheCapacity, concurrency); // url hash to load time + this.metadataCache = new ConcurrentARC(idCacheCapacity, concurrency); + this.missCache = new ConcurrentARH(idCacheCapacity, concurrency); this.updateQueue = new ArrayBlockingQueue(updateCapacity); this.deleteQueue = new LinkedBlockingQueue(); this.deletionHandler = null; @@ -159,7 +163,8 @@ public class ConcurrentUpdateSolrConnector implements SolrConnector { @Override public void clearCaches() { this.connector.clearCaches(); - this.idCache.clear(); + this.metadataCache.clear(); + this.missCache.clear(); } /** @@ -233,10 +238,14 @@ public class ConcurrentUpdateSolrConnector implements SolrConnector { } } - private void updateIdCache(final String id, final Metadata md) { + private void updateCache(final String id, final Metadata md) { if (id == null) return; - if (MemoryControl.shortStatus()) this.idCache.clear(); - this.idCache.put(id, md); + if (MemoryControl.shortStatus()) { + this.metadataCache.clear(); + this.missCache.clear(); + } + this.metadataCache.put(id, md); + this.missCache.delete(id); } public void ensureAliveDeletionHandler() { @@ -307,9 +316,9 @@ public class ConcurrentUpdateSolrConnector implements SolrConnector { try {this.deletionHandler.join();} catch (final InterruptedException e) {} try {this.updateHandler.join();} catch (final InterruptedException e) {} this.connector.close(); - this.idCache.clear(); + this.metadataCache.clear(); this.connector = null; - this.idCache = null; + this.metadataCache = null; } @Override @@ -319,13 +328,14 @@ public class ConcurrentUpdateSolrConnector implements SolrConnector { try {this.updateQueue.put(POISON_DOCUMENT);} catch (final InterruptedException e) {} try {this.updateHandler.join();} catch (final InterruptedException e) {} this.connector.clear(); - this.idCache.clear(); + this.metadataCache.clear(); } @Override - public void deleteById(String id) throws IOException { + public synchronized void deleteById(String id) throws IOException { removeIdFromUpdateQueue(id); - this.idCache.remove(id); + this.metadataCache.remove(id); + this.missCache.add(id); if (this.deletionHandler.isAlive()) { try {this.deleteQueue.put(id);} catch (final InterruptedException e) {} } else { @@ -334,10 +344,11 @@ public class ConcurrentUpdateSolrConnector implements SolrConnector { } @Override - public void deleteByIds(Collection ids) throws IOException { + public synchronized void deleteByIds(Collection ids) throws IOException { for (String id: ids) { removeIdFromUpdateQueue(id); - this.idCache.remove(id); + this.metadataCache.remove(id); + this.missCache.add(id); } if (this.deletionHandler.isAlive()) { for (String id: ids) try {this.deleteQueue.put(id);} catch (final InterruptedException e) {} @@ -348,30 +359,27 @@ public class ConcurrentUpdateSolrConnector implements SolrConnector { @Override public void deleteByQuery(final String querystring) throws IOException { - //new Thread() { - // public void run() { - ConcurrentUpdateSolrConnector.this.idCache.clear(); - try { - ConcurrentUpdateSolrConnector.this.connector.deleteByQuery(querystring); - ConcurrentUpdateSolrConnector.this.idCache.clear(); - } catch (final IOException e) { - ConcurrentLog.severe("ConcurrentUpdateSolrConnector", e.getMessage(), e); - } - ConcurrentUpdateSolrConnector.this.connector.commit(true); - // } - //}.start(); + try { + ConcurrentUpdateSolrConnector.this.connector.deleteByQuery(querystring); + ConcurrentUpdateSolrConnector.this.metadataCache.clear(); + ConcurrentUpdateSolrConnector.this.missCache.clear(); + } catch (final IOException e) { + ConcurrentLog.severe("ConcurrentUpdateSolrConnector", e.getMessage(), e); + } + ConcurrentUpdateSolrConnector.this.connector.commit(true); } @Override public Metadata getMetadata(String id) throws IOException { - Metadata md = this.idCache.get(id); + if (this.missCache.contains(id)) {cacheSuccessSign(); return null;} + Metadata md = this.metadataCache.get(id); if (md != null) {cacheSuccessSign(); return md;} if (existIdFromDeleteQueue(id)) {cacheSuccessSign(); return null;} md = existIdFromUpdateQueue(id); if (md != null) {cacheSuccessSign(); return md;} md = this.connector.getMetadata(id); - if (md == null) return null; - updateIdCache(id, md); + if (md == null) {this.missCache.add(id); return null;} + updateCache(id, md); return md; } @@ -379,7 +387,7 @@ public class ConcurrentUpdateSolrConnector implements SolrConnector { public void add(SolrInputDocument solrdoc) throws IOException, SolrException { String id = (String) solrdoc.getFieldValue(CollectionSchema.id.getSolrFieldName()); removeIdFromDeleteQueue(id); - updateIdCache(id, AbstractSolrConnector.getMetadata(solrdoc)); + updateCache(id, AbstractSolrConnector.getMetadata(solrdoc)); if (this.updateHandler.isAlive()) { try {this.updateQueue.put(solrdoc);} catch (final InterruptedException e) {} } else { @@ -392,7 +400,7 @@ public class ConcurrentUpdateSolrConnector implements SolrConnector { for (SolrInputDocument doc: solrdocs) { String id = (String) doc.getFieldValue(CollectionSchema.id.getSolrFieldName()); removeIdFromDeleteQueue(id); - updateIdCache(id, AbstractSolrConnector.getMetadata(doc)); + updateCache(id, AbstractSolrConnector.getMetadata(doc)); } if (this.updateHandler.isAlive()) { for (SolrInputDocument doc: solrdocs) try {this.updateQueue.put(doc);} catch (final InterruptedException e) {} @@ -403,11 +411,16 @@ public class ConcurrentUpdateSolrConnector implements SolrConnector { @Override public SolrDocument getDocumentById(final String id, String... fields) throws IOException { + if (this.missCache.contains(id)) return null; if (existIdFromDeleteQueue(id)) return null; SolrInputDocument idoc = getFromUpdateQueue(id); if (idoc != null) {cacheSuccessSign(); return ClientUtils.toSolrDocument(idoc);} SolrDocument doc = this.connector.getDocumentById(id, AbstractSolrConnector.ensureEssentialFieldsIncluded(fields)); - if (doc != null) updateIdCache(id, AbstractSolrConnector.getMetadata(doc)); + if (doc == null) { + this.missCache.add(id); + } else { + updateCache(id, AbstractSolrConnector.getMetadata(doc)); + } return doc; } diff --git a/source/net/yacy/search/index/Fulltext.java b/source/net/yacy/search/index/Fulltext.java index d679a91e8..63954f37c 100644 --- a/source/net/yacy/search/index/Fulltext.java +++ b/source/net/yacy/search/index/Fulltext.java @@ -332,8 +332,8 @@ public final class Fulltext { String id = ASCII.String(idb); try { // because node entries are richer than metadata entries we must check if they exist to prevent that they are overwritten - SolrDocument sd = this.getDefaultConnector().getDocumentById(id); - if (sd == null || (new URIMetadataNode(sd)).isOlder(entry)) { + long date = this.getLoadTime(id); + if (date < entry.loaddate().getTime()) { putDocument(getDefaultConfiguration().metadata2solr(entry)); } } catch (final SolrException e) { @@ -496,14 +496,6 @@ public final class Fulltext { } return -1l; } - - public String failReason(final String urlHash) throws IOException { - if (urlHash == null) return null; - SolrDocument doc = this.getDefaultConnector().getDocumentById(urlHash, CollectionSchema.failreason_s.getSolrFieldName()); - Object reason = doc == null ? null : doc.getFieldValue(CollectionSchema.failreason_s.getSolrFieldName()); - if (reason == null) return null; - return reason instanceof String && ((String) reason).length() == 0 ? null : (String) reason; - } public List dumpFiles() { EmbeddedInstance esc = this.solrInstances.getEmbedded(); diff --git a/source/net/yacy/search/schema/CollectionConfiguration.java b/source/net/yacy/search/schema/CollectionConfiguration.java index 2ae516356..b50c6bd0f 100644 --- a/source/net/yacy/search/schema/CollectionConfiguration.java +++ b/source/net/yacy/search/schema/CollectionConfiguration.java @@ -56,6 +56,7 @@ import net.yacy.cora.federate.solr.ProcessType; import net.yacy.cora.federate.solr.SchemaDeclaration; import net.yacy.cora.federate.solr.connector.AbstractSolrConnector; import net.yacy.cora.federate.solr.connector.SolrConnector; +import net.yacy.cora.federate.solr.connector.SolrConnector.Metadata; import net.yacy.cora.order.Base64Order; import net.yacy.cora.protocol.Domains; import net.yacy.cora.protocol.HeaderFramework; @@ -1234,8 +1235,8 @@ public class CollectionConfiguration extends SchemaConfiguration implements Seri for (Map.Entry entry: rm.entrySet()) { if (entry == null || entry.getValue() == null) continue; try { - String url = (String) connector.getDocumentById(ASCII.String(entry.getKey()), CollectionSchema.sku.getSolrFieldName()).getFieldValue(CollectionSchema.sku.getSolrFieldName()); - ConcurrentLog.info("CollectionConfiguration", "CR for " + url); + Metadata md = connector.getMetadata(ASCII.String(entry.getKey())); + ConcurrentLog.info("CollectionConfiguration", "CR for " + md.url); ConcurrentLog.info("CollectionConfiguration", ">> " + entry.getValue().toString()); } catch (final IOException e) { ConcurrentLog.logException(e);