From 8ad41a882c3b3e6d15289e3bbff0a92730f85620 Mon Sep 17 00:00:00 2001
From: Michael Peter Christen
Date: Thu, 29 May 2014 13:24:24 +0200
Subject: [PATCH] fixed several problems with postprocessing:

- unique-postprocessing was destroying results from other postprocessings; removed the cross-updates as they were not necessary
- unique-postprocessing did not restrict to the same protocol
- the inefficient concurrent update cache was redesigned completely
- increased limits for concurrent blocking queues to prevent early time-outs

---
 htroot/Vocabulary_p.java | 2 +-
 .../federate/solr/SchemaConfiguration.java | 11 +-
 .../solr/connector/AbstractSolrConnector.java | 7 +-
 .../ConcurrentUpdateSolrConnector.java | 276 ++++++++----------
 .../solr/instance/InstanceMirror.java | 13 +-
 .../solr/instance/RemoteInstance.java | 2 +-
 source/net/yacy/search/index/Fulltext.java | 6 +-
 source/net/yacy/search/index/Segment.java | 2 +-
 .../schema/CollectionConfiguration.java | 15 +-
 .../yacy/search/schema/HyperlinkGraph.java | 2 +-
 10 files changed, 146 insertions(+), 190 deletions(-)

diff --git a/htroot/Vocabulary_p.java b/htroot/Vocabulary_p.java
index d660fa10b..dccffef60 100644
--- a/htroot/Vocabulary_p.java
+++ b/htroot/Vocabulary_p.java
@@ -72,7 +72,7 @@ public class Vocabulary_p {
             Segment segment = sb.index;
             String t;
             if (!discoverNot) {
-                Iterator<DigestURL> ui = segment.urlSelector(discoveruri, 600000L, 100000);
+                Iterator<DigestURL> ui = segment.urlSelector(discoveruri, Long.MAX_VALUE, 100000);
                 while (ui.hasNext()) {
                     DigestURL u = ui.next();
                     String u0 = u.toNormalform(true);
diff --git a/source/net/yacy/cora/federate/solr/SchemaConfiguration.java b/source/net/yacy/cora/federate/solr/SchemaConfiguration.java
index e0884ac47..aa8de480d 100644
--- a/source/net/yacy/cora/federate/solr/SchemaConfiguration.java
+++ b/source/net/yacy/cora/federate/solr/SchemaConfiguration.java
@@ -145,6 +145,7 @@ public class SchemaConfiguration extends Configuration implements Serializable {
         boolean changed = false;
         // FIND OUT IF THIS IS A DOUBLE DOCUMENT
         String hostid = url.hosthash();
+        String protocol = url.getProtocol();
         for (CollectionSchema[] checkfields: new CollectionSchema[][]{
                 {CollectionSchema.exact_signature_l, CollectionSchema.exact_signature_unique_b, CollectionSchema.exact_signature_copycount_i},
                 {CollectionSchema.fuzzy_signature_l, CollectionSchema.fuzzy_signature_unique_b, CollectionSchema.fuzzy_signature_copycount_i}}) {
@@ -155,7 +156,7 @@ public class SchemaConfiguration extends Configuration implements Serializable {
                 // lookup the document with the same signature
                 long signature = ((Long) sid.getField(checkfield.getSolrFieldName()).getValue()).longValue();
                 try {
-                    long count = segment.fulltext().getDefaultConnector().getCountByQuery(CollectionSchema.host_id_s + ":\"" + hostid + "\" AND " + checkfield.getSolrFieldName() + ":\"" + Long.toString(signature) + "\"");
+                    long count = segment.fulltext().getDefaultConnector().getCountByQuery(CollectionSchema.url_protocol_s.getSolrFieldName() + ":\"" + protocol + "\" AND " + CollectionSchema.host_id_s.getSolrFieldName() + ":\"" + hostid + "\" AND " + checkfield.getSolrFieldName() + ":\"" + Long.toString(signature) + "\"");
                     if (count > 1) {
                         String urlhash = ASCII.String(url.hash());
                         if (uniqueURLs.contains(urlhash)) {
@@ -172,7 +173,6 @@ public class SchemaConfiguration extends Configuration implements Serializable {
                 } catch (final IOException e) {}
             }
         }
-
        // CHECK IF TITLE AND DESCRIPTION IS UNIQUE (this is by default not switched on)
        if
(segment.fulltext().getDefaultConfiguration().contains(CollectionSchema.host_id_s)) { uniquecheck: for (CollectionSchema[] checkfields: new CollectionSchema[][]{ @@ -191,15 +191,10 @@ public class SchemaConfiguration extends Configuration implements Serializable { continue uniquecheck; } try { - final SolrDocumentList docs = segment.fulltext().getDefaultConnector().getDocumentListByQuery(CollectionSchema.host_id_s + ":\"" + hostid + "\" AND " + signaturefield.getSolrFieldName() + ":\"" + checkhash.toString() + "\"", null, 0, 1); + final SolrDocumentList docs = segment.fulltext().getDefaultConnector().getDocumentListByQuery(CollectionSchema.url_protocol_s.getSolrFieldName() + ":\"" + protocol + "\" AND " + CollectionSchema.host_id_s.getSolrFieldName() + ":\"" + hostid + "\" AND " + signaturefield.getSolrFieldName() + ":\"" + checkhash.toString() + "\"", null, 0, 1); if (docs != null && !docs.isEmpty()) { - SolrDocument doc = docs.get(0); // switch unique attribute in new document sid.setField(uniquefield.getSolrFieldName(), false); - // switch attribute in existing document - SolrInputDocument sidContext = segment.fulltext().getDefaultConfiguration().toSolrInputDocument(doc); - sidContext.setField(uniquefield.getSolrFieldName(), false); - segment.putDocument(sidContext); changed = true; } else { sid.setField(uniquefield.getSolrFieldName(), true); diff --git a/source/net/yacy/cora/federate/solr/connector/AbstractSolrConnector.java b/source/net/yacy/cora/federate/solr/connector/AbstractSolrConnector.java index cb8313aef..20484a62e 100644 --- a/source/net/yacy/cora/federate/solr/connector/AbstractSolrConnector.java +++ b/source/net/yacy/cora/federate/solr/connector/AbstractSolrConnector.java @@ -167,7 +167,10 @@ public abstract class AbstractSolrConnector implements SolrConnector { try {queue.put(d);} catch (final InterruptedException e) {break;} count++; } - if (sdl.size() < pagesize) break; + if (sdl.size() < pagesize) { + //System.out.println("sdl.size() = " + sdl.size() + ", pagesize = " + pagesize); + break; + } o += sdl.size(); } catch (final SolrException e) { break; @@ -175,7 +178,7 @@ public abstract class AbstractSolrConnector implements SolrConnector { break; } } - for (int i = 0; i < concurrency; i++) { + for (int i = 0; i < Math.max(1, concurrency); i++) { try {queue.put(AbstractSolrConnector.POISON_DOCUMENT);} catch (final InterruptedException e1) {} } } diff --git a/source/net/yacy/cora/federate/solr/connector/ConcurrentUpdateSolrConnector.java b/source/net/yacy/cora/federate/solr/connector/ConcurrentUpdateSolrConnector.java index 755819b28..c8e2e62a7 100644 --- a/source/net/yacy/cora/federate/solr/connector/ConcurrentUpdateSolrConnector.java +++ b/source/net/yacy/cora/federate/solr/connector/ConcurrentUpdateSolrConnector.java @@ -21,11 +21,10 @@ package net.yacy.cora.federate.solr.connector; import java.io.IOException; -import java.util.ArrayList; import java.util.Collection; import java.util.Iterator; +import java.util.LinkedHashMap; import java.util.Map; -import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import net.yacy.cora.sorting.ReversibleScoreMap; @@ -54,75 +53,40 @@ import org.apache.solr.common.params.ModifiableSolrParams; */ public class ConcurrentUpdateSolrConnector implements SolrConnector { - SolrConnector connector; - - private final static Object POISON_PROCESS = new Object(); + private final static long AUTOCOMMIT = 3000; // milliseconds - private class ProcessHandler extends Thread { + private class CommitHandler extends 
Thread { @Override public void run() { - try { - Object process; - Collection docs = new ArrayList(); - while ((process = ConcurrentUpdateSolrConnector.this.processQueue.take()) != POISON_PROCESS) { - - if (process instanceof String) { - // delete document - if (docs.size() > 0) addSynchronized(docs); - String id = (String) process; - try { - ConcurrentUpdateSolrConnector.this.connector.deleteById(id); - } catch (final IOException e) { - ConcurrentLog.logException(e); - } - } - - if (process instanceof SolrInputDocument) { - SolrInputDocument doc = (SolrInputDocument) process; - docs.add(doc); - } - - if (docs.size() > 0 && - (ConcurrentUpdateSolrConnector.this.processQueue.size() == 0 || - docs.size() >= ConcurrentUpdateSolrConnector.this.processQueue.size() + ConcurrentUpdateSolrConnector.this.processQueue.remainingCapacity())) { - addSynchronized(docs); + while (ConcurrentUpdateSolrConnector.this.commitProcessRunning) { + commitDocBuffer(); + try {Thread.sleep(AUTOCOMMIT);} catch (final InterruptedException e) { + ConcurrentLog.logException(e); } } - } catch (final InterruptedException e) { - ConcurrentLog.logException(e); + } finally { + commitDocBuffer(); } } - private void addSynchronized(final Collection docs) { - assert docs.size() > 0; - try { - ConcurrentUpdateSolrConnector.this.connector.add(docs); - } catch (final OutOfMemoryError e) { - // clear and try again... - clearCaches(); - try { - ConcurrentUpdateSolrConnector.this.connector.add(docs); - } catch (final IOException ee) { - ConcurrentLog.logException(e); - } - } catch (final IOException e) { - ConcurrentLog.logException(e); - } - docs.clear(); - } } + private SolrConnector connector; private ARC metadataCache; - private ARH missCache; - private BlockingQueue processQueue; - private ProcessHandler processHandler; + private final ARH missCache; + private final LinkedHashMap docBuffer; + private CommitHandler processHandler; + private final int updateCapacity; + private boolean commitProcessRunning; public ConcurrentUpdateSolrConnector(final SolrConnector connector, final int updateCapacity, final int idCacheCapacity, final int concurrency) { this.connector = connector; - this.metadataCache = new ConcurrentARC(idCacheCapacity, concurrency); - this.missCache = new ConcurrentARH(idCacheCapacity, concurrency); - this.processQueue = new ArrayBlockingQueue(updateCapacity); + this.updateCapacity = updateCapacity; + this.metadataCache = new ConcurrentARC<>(idCacheCapacity, concurrency); + this.missCache = new ConcurrentARH<>(idCacheCapacity, concurrency); + this.docBuffer = new LinkedHashMap<>(); this.processHandler = null; + this.commitProcessRunning = true; ensureAliveProcessHandler(); } @@ -136,9 +100,34 @@ public class ConcurrentUpdateSolrConnector implements SolrConnector { return o instanceof ConcurrentUpdateSolrConnector && this.connector.equals(((ConcurrentUpdateSolrConnector) o).connector); } + private void commitDocBuffer() { + synchronized (this.docBuffer) { + //System.out.println("*** commit of " + this.docBuffer.size() + " documents"); + //Thread.dumpStack(); + if (this.docBuffer.size() > 0) try { + this.connector.add(this.docBuffer.values()); + } catch (final OutOfMemoryError e) { + // clear and try again... 
+ clearCaches(); + try { + this.connector.add(this.docBuffer.values()); + } catch (final IOException ee) { + ConcurrentLog.logException(e); + } + } catch (final IOException e) { + ConcurrentLog.logException(e); + } + // move documents to metadata cache + for (Map.Entry entry: this.docBuffer.entrySet()) { + updateCache(entry.getKey(), AbstractSolrConnector.getMetadata(entry.getValue())); + } + this.docBuffer.clear(); + } + } + @Override public int bufferSize() { - return this.processQueue.size(); + return this.updateCapacity; } @Override @@ -148,57 +137,6 @@ public class ConcurrentUpdateSolrConnector implements SolrConnector { this.missCache.clear(); } - /** - * used for debugging - */ - private static void cacheSuccessSign() { - //ConcurrentLog.info("ConcurrentUpdate", "**** cache hit"); - } - - private boolean containsDeleteInProcessQueue(final String id) { - boolean delete = false; - boolean ctch = false; - for (Object o: this.processQueue) { - if (o == null) break; - if (checkDelete(o, id)) delete = true; // do not add a break here! - if (checkAdd(o, id)) {delete = false; ctch = true;} // do not add a break here! - } - if (ctch && delete) removeFromProcessQueue(id); // clean up put+remove - return delete; - } - - private SolrInputDocument getFromProcessQueue(final String id) { - SolrInputDocument d = null; - boolean ctch = false; - for (Object o: this.processQueue) { - if (o == null) break; - if (checkDelete(o, id)) d = null; // do not add a break here! - if (checkAdd(o, id)) {d = (SolrInputDocument) o; ctch = true;} // do not add a break here! - } - if (ctch && d == null) removeFromProcessQueue(id); // clean up put+remove - return d; - } - - private void removeFromProcessQueue(final String id) { - Iterator i = this.processQueue.iterator(); - while (i.hasNext()) { - if (checkAdd(i.next(), id)) {i.remove(); break;} - } - } - - private boolean checkDelete(final Object o, final String id) { - if (!(o instanceof String)) return false; - String docID = (String) o; - return (docID != null && docID.equals(id)); - } - - private boolean checkAdd(final Object o, final String id) { - if (!(o instanceof SolrInputDocument)) return false; - SolrInputDocument doc = (SolrInputDocument) o; - String docID = (String) doc.getFieldValue(CollectionSchema.id.getSolrFieldName()); - return (docID != null && docID.equals(id)); - } - private void updateCache(final String id, final Metadata md) { if (id == null) return; if (MemoryControl.shortStatus()) { @@ -211,7 +149,7 @@ public class ConcurrentUpdateSolrConnector implements SolrConnector { public void ensureAliveProcessHandler() { if (this.processHandler == null || !this.processHandler.isAlive()) { - this.processHandler = new ProcessHandler(); + this.processHandler = new CommitHandler(); this.processHandler.setName(this.getClass().getName() + "_ProcessHandler"); this.processHandler.start(); } @@ -224,22 +162,19 @@ public class ConcurrentUpdateSolrConnector implements SolrConnector { @Override public long getSize() { - return this.connector.getSize() + this.processQueue.size(); + return this.connector.getSize() + this.docBuffer.size(); } @Override public void commit(boolean softCommit) { - long timeout = System.currentTimeMillis() + 1000; ensureAliveProcessHandler(); - while (this.processQueue.size() > 0) { - try {Thread.sleep(10);} catch (final InterruptedException e) {} - if (System.currentTimeMillis() > timeout) break; - } + commitDocBuffer(); this.connector.commit(softCommit); } @Override public void optimize(int maxSegments) { + commitDocBuffer(); 
this.connector.optimize(maxSegments); } @@ -256,7 +191,7 @@ public class ConcurrentUpdateSolrConnector implements SolrConnector { @Override public void close() { ensureAliveProcessHandler(); - try {this.processQueue.put(POISON_PROCESS);} catch (final InterruptedException e) {} + this.commitProcessRunning = false; try {this.processHandler.join();} catch (final InterruptedException e) {} this.connector.close(); this.metadataCache.clear(); @@ -266,21 +201,20 @@ public class ConcurrentUpdateSolrConnector implements SolrConnector { @Override public void clear() throws IOException { - this.processQueue.clear(); + this.docBuffer.clear(); this.connector.clear(); this.metadataCache.clear(); + this.missCache.clear(); } @Override public synchronized void deleteById(String id) throws IOException { this.metadataCache.remove(id); this.missCache.add(id); - ensureAliveProcessHandler(); - if (this.processHandler.isAlive()) { - try {this.processQueue.put(id);} catch (final InterruptedException e) {} - } else { - this.connector.deleteById(id); + synchronized (this.docBuffer) { + this.docBuffer.remove(id); } + this.connector.deleteById(id); } @Override @@ -289,33 +223,40 @@ public class ConcurrentUpdateSolrConnector implements SolrConnector { this.metadataCache.remove(id); this.missCache.add(id); } - ensureAliveProcessHandler(); - if (this.processHandler.isAlive()) { - for (String id: ids) try {this.processQueue.put(id);} catch (final InterruptedException e) {} - } else { - this.connector.deleteByIds(ids); + synchronized (this.docBuffer) { + for (String id: ids) { + this.docBuffer.remove(id); + } } + this.connector.deleteByIds(ids); } @Override public void deleteByQuery(final String querystring) throws IOException { + commitDocBuffer(); try { - ConcurrentUpdateSolrConnector.this.connector.deleteByQuery(querystring); - ConcurrentUpdateSolrConnector.this.metadataCache.clear(); + this.connector.deleteByQuery(querystring); + this.metadataCache.clear(); } catch (final IOException e) { ConcurrentLog.severe("ConcurrentUpdateSolrConnector", e.getMessage(), e); } - ConcurrentUpdateSolrConnector.this.connector.commit(true); } @Override public Metadata getMetadata(String id) throws IOException { - if (this.missCache.contains(id)) {cacheSuccessSign(); return null;} + if (this.missCache.contains(id)) return null; Metadata md = this.metadataCache.get(id); - if (md != null) {cacheSuccessSign(); return md;} - if (containsDeleteInProcessQueue(id)) {cacheSuccessSign(); return null;} - SolrInputDocument doc = getFromProcessQueue(id); - if (doc != null) {cacheSuccessSign(); return AbstractSolrConnector.getMetadata(doc);} + if (md != null) { + //System.out.println("*** metadata cache hit; metadataCache.size() = " + metadataCache.size()); + //Thread.dumpStack(); + return md; + } + SolrInputDocument doc = this.docBuffer.get(id); + if (doc != null) { + //System.out.println("*** docBuffer cache hit; docBuffer.size() = " + docBuffer.size()); + //Thread.dumpStack(); + return AbstractSolrConnector.getMetadata(doc); + } md = this.connector.getMetadata(id); if (md == null) {this.missCache.add(id); return null;} updateCache(id, md); @@ -325,26 +266,34 @@ public class ConcurrentUpdateSolrConnector implements SolrConnector { @Override public void add(SolrInputDocument solrdoc) throws IOException, SolrException { String id = (String) solrdoc.getFieldValue(CollectionSchema.id.getSolrFieldName()); - updateCache(id, AbstractSolrConnector.getMetadata(solrdoc)); + this.metadataCache.remove(id); // remove the id from the metadata cache because it 
will be overwritten by the update process anyway ensureAliveProcessHandler(); if (this.processHandler.isAlive()) { - try {this.processQueue.put(solrdoc);} catch (final InterruptedException e) {} + synchronized (this.docBuffer) {this.docBuffer.put(id, solrdoc);} } else { this.connector.add(solrdoc); + updateCache(id, AbstractSolrConnector.getMetadata(solrdoc)); + } + if (MemoryControl.shortStatus() || this.docBuffer.size() > this.updateCapacity) { + commitDocBuffer(); } } @Override public void add(Collection solrdocs) throws IOException, SolrException { - for (SolrInputDocument doc: solrdocs) { - String id = (String) doc.getFieldValue(CollectionSchema.id.getSolrFieldName()); - updateCache(id, AbstractSolrConnector.getMetadata(doc)); - } ensureAliveProcessHandler(); - if (this.processHandler.isAlive()) { - for (SolrInputDocument doc: solrdocs) try {this.processQueue.put(doc);} catch (final InterruptedException e) {} - } else { - this.connector.add(solrdocs); + synchronized (this.docBuffer) { + for (SolrInputDocument solrdoc: solrdocs) { + String id = (String) solrdoc.getFieldValue(CollectionSchema.id.getSolrFieldName()); + if (this.processHandler.isAlive()) { + this.docBuffer.put(id, solrdoc); + } else { + this.connector.add(solrdoc); + } + } + } + if (MemoryControl.shortStatus() || this.docBuffer.size() > this.updateCapacity) { + commitDocBuffer(); } } @@ -352,26 +301,36 @@ public class ConcurrentUpdateSolrConnector implements SolrConnector { public SolrDocument getDocumentById(final String id, String... fields) throws IOException { assert id.length() == Word.commonHashLength : "wrong id: " + id; if (this.missCache.contains(id)) return null; - if (containsDeleteInProcessQueue(id)) return null; - SolrInputDocument idoc = getFromProcessQueue(id); - if (idoc != null) {cacheSuccessSign(); return ClientUtils.toSolrDocument(idoc);} - SolrDocument doc = this.connector.getDocumentById(id, AbstractSolrConnector.ensureEssentialFieldsIncluded(fields)); - if (doc == null) { + SolrInputDocument idoc = this.docBuffer.get(id); + if (idoc != null) { + //System.out.println("*** docBuffer cache hit; docBuffer.size() = " + docBuffer.size()); + //Thread.dumpStack(); + return ClientUtils.toSolrDocument(idoc); + } + SolrDocument solrdoc = this.connector.getDocumentById(id, AbstractSolrConnector.ensureEssentialFieldsIncluded(fields)); + if (solrdoc == null) { this.missCache.add(id); + this.metadataCache.remove(id); } else { - updateCache(id, AbstractSolrConnector.getMetadata(doc)); + updateCache(id, AbstractSolrConnector.getMetadata(solrdoc)); } - return doc; + return solrdoc; } @Override public QueryResponse getResponseByParams(ModifiableSolrParams query) throws IOException, SolrException { + commitDocBuffer(); return this.connector.getResponseByParams(query); } @Override public SolrDocumentList getDocumentListByParams(ModifiableSolrParams params) throws IOException, SolrException { + commitDocBuffer(); SolrDocumentList sdl = this.connector.getDocumentListByParams(params); + for (SolrDocument doc: sdl) { + String id = (String) doc.getFieldValue(CollectionSchema.id.getSolrFieldName()); + updateCache(id, AbstractSolrConnector.getMetadata(doc)); + } return sdl; } @@ -383,6 +342,7 @@ public class ConcurrentUpdateSolrConnector implements SolrConnector { @Override public SolrDocumentList getDocumentListByQuery(String querystring, String sort, int offset, int count, String... 
fields) throws IOException, SolrException { + commitDocBuffer(); if (offset == 0 && count == 1 && querystring.startsWith("id:") && ((querystring.length() == 17 && querystring.charAt(3) == '"' && querystring.charAt(16) == '"') || querystring.length() == 15)) { @@ -393,34 +353,30 @@ public class ConcurrentUpdateSolrConnector implements SolrConnector { } SolrDocumentList sdl = this.connector.getDocumentListByQuery(querystring, sort, offset, count, AbstractSolrConnector.ensureEssentialFieldsIncluded(fields)); - /* - Iterator i = sdl.iterator(); - while (i.hasNext()) { - SolrDocument doc = i.next(); - String id = (String) doc.getFieldValue(CollectionSchema.id.getSolrFieldName()); - if (doc != null) updateIdCache(id, AbstractSolrConnector.getLoadDate(doc)); - } - */ return sdl; } @Override public long getCountByQuery(String querystring) throws IOException { + commitDocBuffer(); return this.connector.getCountByQuery(querystring); } @Override public Map> getFacets(String query, int maxresults, String... fields) throws IOException { + commitDocBuffer(); return this.connector.getFacets(query, maxresults, fields); } @Override public BlockingQueue concurrentDocumentsByQuery(String querystring, String sort, int offset, int maxcount, long maxtime, int buffersize, final int concurrency, String... fields) { + commitDocBuffer(); return this.connector.concurrentDocumentsByQuery(querystring, sort, offset, maxcount, maxtime, buffersize, concurrency, fields); } @Override public BlockingQueue concurrentIDsByQuery(String querystring, String sort, int offset, int maxcount, long maxtime, int buffersize, final int concurrency) { + commitDocBuffer(); return this.connector.concurrentIDsByQuery(querystring, sort, offset, maxcount, maxtime, buffersize, concurrency); } diff --git a/source/net/yacy/cora/federate/solr/instance/InstanceMirror.java b/source/net/yacy/cora/federate/solr/instance/InstanceMirror.java index d9073f586..0dcca5efd 100644 --- a/source/net/yacy/cora/federate/solr/instance/InstanceMirror.java +++ b/source/net/yacy/cora/federate/solr/instance/InstanceMirror.java @@ -37,16 +37,16 @@ public class InstanceMirror { private EmbeddedInstance embeddedSolrInstance; private ShardInstance remoteSolrInstance; - private Map mirrorConnectorCache; + private Map mirrorConnectorCache; private Map embeddedConnectorCache; private Map remoteConnectorCache; public InstanceMirror() { this.embeddedSolrInstance = null; this.remoteSolrInstance = null; - this.mirrorConnectorCache = new ConcurrentHashMap(); - this.embeddedConnectorCache = new ConcurrentHashMap(); - this.remoteConnectorCache = new ConcurrentHashMap(); + this.mirrorConnectorCache = new ConcurrentHashMap<>(); + this.embeddedConnectorCache = new ConcurrentHashMap<>(); + this.remoteConnectorCache = new ConcurrentHashMap<>(); } public boolean isConnectedEmbedded() { @@ -161,11 +161,12 @@ public class InstanceMirror { } public SolrConnector getGenericMirrorConnector(String corename) { - ConcurrentUpdateSolrConnector msc = this.mirrorConnectorCache.get(corename); + SolrConnector msc = this.mirrorConnectorCache.get(corename); if (msc != null) return msc; EmbeddedSolrConnector esc = getEmbeddedConnector(corename); RemoteSolrConnector rsc = getRemoteConnector(corename); - msc = new ConcurrentUpdateSolrConnector(new MirrorSolrConnector(esc, rsc), RemoteInstance.queueSizeByMemory(), 100000, Runtime.getRuntime().availableProcessors()); + msc = new ConcurrentUpdateSolrConnector(new MirrorSolrConnector(esc, rsc), RemoteInstance.queueSizeByMemory(), 10000, 
Runtime.getRuntime().availableProcessors()); + //msc = new MirrorSolrConnector(esc, rsc); this.mirrorConnectorCache.put(corename, msc); return msc; } diff --git a/source/net/yacy/cora/federate/solr/instance/RemoteInstance.java b/source/net/yacy/cora/federate/solr/instance/RemoteInstance.java index 510f4299e..642fde268 100644 --- a/source/net/yacy/cora/federate/solr/instance/RemoteInstance.java +++ b/source/net/yacy/cora/federate/solr/instance/RemoteInstance.java @@ -256,6 +256,6 @@ public class RemoteInstance implements SolrInstance { } public static int queueSizeByMemory() { - return (int) Math.min(500, Math.max(1, MemoryControl.maxMemory() / 1024 / 1024 / 12)); + return (int) Math.min(30, Math.max(1, MemoryControl.maxMemory() / 1024 / 1024 / 12)); } } diff --git a/source/net/yacy/search/index/Fulltext.java b/source/net/yacy/search/index/Fulltext.java index e62a1b947..d06696c55 100644 --- a/source/net/yacy/search/index/Fulltext.java +++ b/source/net/yacy/search/index/Fulltext.java @@ -241,7 +241,7 @@ public final class Fulltext { public long collectionSize() { long t = System.currentTimeMillis(); if (t - this.collectionSizeLastAccess < 1000) return this.collectionSizeLastValue; - SolrConnector sc = this.solrInstances.getDefaultMirrorConnector(); + SolrConnector sc = getDefaultConnector(); if (sc == null) return 0; long size = sc.getSize(); this.collectionSizeLastAccess = t; @@ -429,7 +429,7 @@ public final class Fulltext { final String collectionQuery = CollectionSchema.host_s.getSolrFieldName() + ":\"" + host + "\"" + ((freshdate != null && freshdate.before(new Date())) ? (" AND " + CollectionSchema.load_date_dt.getSolrFieldName() + ":[* TO " + ISO8601Formatter.FORMATTER.format(freshdate) + "]") : ""); final AtomicInteger count = new AtomicInteger(0); - final BlockingQueue docs = Fulltext.this.getDefaultConnector().concurrentDocumentsByQuery(collectionQuery, null, 0, 1000000, 600000, 100, 1, CollectionSchema.id.getSolrFieldName(), CollectionSchema.sku.getSolrFieldName()); + final BlockingQueue docs = Fulltext.this.getDefaultConnector().concurrentDocumentsByQuery(collectionQuery, null, 0, 1000000, Long.MAX_VALUE, 100, 1, CollectionSchema.id.getSolrFieldName(), CollectionSchema.sku.getSolrFieldName()); try { Set deleteIDs = new HashSet(); SolrDocument doc; @@ -665,7 +665,7 @@ public final class Fulltext { this.count++; } } else { - BlockingQueue docs = Fulltext.this.getDefaultConnector().concurrentDocumentsByQuery(this.query + " AND " + CollectionSchema.httpstatus_i.getSolrFieldName() + ":200", null, 0, 100000000, 10 * 60 * 60 * 1000, 100, 1, + BlockingQueue docs = Fulltext.this.getDefaultConnector().concurrentDocumentsByQuery(this.query + " AND " + CollectionSchema.httpstatus_i.getSolrFieldName() + ":200", null, 0, 100000000, Long.MAX_VALUE, 100, 1, CollectionSchema.id.getSolrFieldName(), CollectionSchema.sku.getSolrFieldName(), CollectionSchema.title.getSolrFieldName(), CollectionSchema.author.getSolrFieldName(), CollectionSchema.description_txt.getSolrFieldName(), CollectionSchema.size_i.getSolrFieldName(), CollectionSchema.last_modified.getSolrFieldName()); SolrDocument doc; diff --git a/source/net/yacy/search/index/Segment.java b/source/net/yacy/search/index/Segment.java index 59d2a8cb3..cb9fc2a8d 100644 --- a/source/net/yacy/search/index/Segment.java +++ b/source/net/yacy/search/index/Segment.java @@ -268,7 +268,7 @@ public class Segment { if ((internalIDs.size() == 0 || !connectedCitation()) && Segment.this.fulltext.useWebgraph()) { // reqd the references from the webgraph 
SolrConnector webgraph = Segment.this.fulltext.getWebgraphConnector(); - BlockingQueue docs = webgraph.concurrentDocumentsByQuery("{!raw f=" + WebgraphSchema.target_id_s.getSolrFieldName() + "}" + ASCII.String(id), WebgraphSchema.source_chars_i.getSolrFieldName() + " asc", 0, 10000000, 1000, 100, 1, WebgraphSchema.source_id_s.getSolrFieldName()); + BlockingQueue docs = webgraph.concurrentDocumentsByQuery("{!raw f=" + WebgraphSchema.target_id_s.getSolrFieldName() + "}" + ASCII.String(id), WebgraphSchema.source_chars_i.getSolrFieldName() + " asc", 0, 10000000, 10000, 100, 1, WebgraphSchema.source_id_s.getSolrFieldName()); SolrDocument doc; try { while ((doc = docs.take()) != AbstractSolrConnector.POISON_DOCUMENT) { diff --git a/source/net/yacy/search/schema/CollectionConfiguration.java b/source/net/yacy/search/schema/CollectionConfiguration.java index fe12f4bf9..ea611012d 100644 --- a/source/net/yacy/search/schema/CollectionConfiguration.java +++ b/source/net/yacy/search/schema/CollectionConfiguration.java @@ -996,7 +996,7 @@ public class CollectionConfiguration extends SchemaConfiguration implements Seri collection.contains(CollectionSchema.cr_host_chance_d) && collection.contains(CollectionSchema.cr_host_norm_i)))) try { int concurrency = Math.min(hostscore.size(), Runtime.getRuntime().availableProcessors()); - ConcurrentLog.info("CollectionConfiguration", "collecting " + hostscore.size() + " hosts, concrrency = " + concurrency); + ConcurrentLog.info("CollectionConfiguration", "collecting " + hostscore.size() + " hosts, concurrency = " + concurrency); int countcheck = 0; for (String host: hostscore.keyList(true)) { // Patch the citation index for links with canonical tags. @@ -1005,7 +1005,7 @@ public class CollectionConfiguration extends SchemaConfiguration implements Seri // To do so, we first must collect all canonical links, find all references to them, get the anchor list of the documents and patch the citation reference of these links String patchquery = CollectionSchema.host_s.getSolrFieldName() + ":" + host + " AND " + CollectionSchema.canonical_s.getSolrFieldName() + AbstractSolrConnector.CATCHALL_DTERM; long patchquerycount = collectionConnector.getCountByQuery(patchquery); - BlockingQueue documents_with_canonical_tag = collectionConnector.concurrentDocumentsByQuery(patchquery, CollectionSchema.url_chars_i.getSolrFieldName() + " asc", 0, 100000000, 86400000, 200, 1, + BlockingQueue documents_with_canonical_tag = collectionConnector.concurrentDocumentsByQuery(patchquery, CollectionSchema.url_chars_i.getSolrFieldName() + " asc", 0, 100000000, Long.MAX_VALUE, 200, 1, CollectionSchema.id.getSolrFieldName(), CollectionSchema.sku.getSolrFieldName(), CollectionSchema.canonical_s.getSolrFieldName()); SolrDocument doc_B; int patchquerycountcheck = 0; @@ -1087,7 +1087,7 @@ public class CollectionConfiguration extends SchemaConfiguration implements Seri final long count = segment.fulltext().getWebgraphConnector().getCountByQuery(query); int concurrency = Math.min((int) count, Math.max(1, Runtime.getRuntime().availableProcessors() / 4)); ConcurrentLog.info("CollectionConfiguration", "collecting " + count + " documents from the webgraph, concurrency = " + concurrency); - final BlockingQueue docs = segment.fulltext().getWebgraphConnector().concurrentDocumentsByQuery(query, WebgraphSchema.source_chars_i.getSolrFieldName() + " asc", 0, 100000000, 86400000, 200, concurrency); + final BlockingQueue docs = segment.fulltext().getWebgraphConnector().concurrentDocumentsByQuery(query, 
WebgraphSchema.source_chars_i.getSolrFieldName() + " asc", 0, 100000000, Long.MAX_VALUE, 200, concurrency); final AtomicInteger proccount = new AtomicInteger(0); Thread[] t = new Thread[concurrency]; for (final AtomicInteger i = new AtomicInteger(0); i.get() < t.length; i.incrementAndGet()) { @@ -1127,7 +1127,7 @@ public class CollectionConfiguration extends SchemaConfiguration implements Seri try { sid.removeField(WebgraphSchema.process_sxt.getSolrFieldName()); sid.removeField(WebgraphSchema.harvestkey_s.getSolrFieldName()); - segment.fulltext().getWebgraphConnector().deleteById((String) sid.getFieldValue(WebgraphSchema.id.getSolrFieldName())); + //segment.fulltext().getWebgraphConnector().deleteById((String) sid.getFieldValue(WebgraphSchema.id.getSolrFieldName())); segment.fulltext().getWebgraphConnector().add(sid); } catch (SolrException e) { ConcurrentLog.logException(e); @@ -1173,9 +1173,9 @@ public class CollectionConfiguration extends SchemaConfiguration implements Seri BlockingQueue docs = collectionConnector.concurrentDocumentsByQuery( query, CollectionSchema.host_subdomain_s.getSolrFieldName() + " asc," + // sort on subdomain to get hosts without subdomain first; that gives an opportunity to set www_unique_b flag to false - CollectionSchema.url_protocol_s.getSolrFieldName() + " asc," + // sort on protocol to get http before htts; that gives an opportunity to set http_unique_b flag to false + CollectionSchema.url_protocol_s.getSolrFieldName() + " asc," + // sort on protocol to get http before https; that gives an opportunity to set http_unique_b flag to false CollectionSchema.url_chars_i.getSolrFieldName() + " asc", - 0, 100000000, 86400000, 200, 1); + 0, 100000000, Long.MAX_VALUE, 200, 1); int countcheck = 0; Collection failids = new ArrayList(); SolrDocument doc; @@ -1232,7 +1232,7 @@ public class CollectionConfiguration extends SchemaConfiguration implements Seri sid.removeField(CollectionSchema.harvestkey_s.getSolrFieldName()); // send back to index - collectionConnector.deleteById(i); + //collectionConnector.deleteById(i); collectionConnector.add(sid); proccount++; allcount.incrementAndGet(); @@ -1260,6 +1260,7 @@ public class CollectionConfiguration extends SchemaConfiguration implements Seri } catch (IOException e3) { ConcurrentLog.warn("CollectionConfiguration", e3.getMessage(), e3); } + collectionConnector.commit(true); // make changes available directly to prevent that the process repeats again return allcount.get(); } diff --git a/source/net/yacy/search/schema/HyperlinkGraph.java b/source/net/yacy/search/schema/HyperlinkGraph.java index 33e2c1936..3a22464a9 100644 --- a/source/net/yacy/search/schema/HyperlinkGraph.java +++ b/source/net/yacy/search/schema/HyperlinkGraph.java @@ -57,7 +57,7 @@ public class HyperlinkGraph implements Iterable { this.hostname = null; } - public void fill(final SolrConnector solrConnector, String hostname, final DigestURL stopURL, final int maxtime, final int maxnodes) { + public void fill(final SolrConnector solrConnector, String hostname, final DigestURL stopURL, final long maxtime, final int maxnodes) { this.hostname = hostname; if (hostname.startsWith("www.")) hostname = hostname.substring(4); StringBuilder q = new StringBuilder();
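
Note on the ConcurrentUpdateSolrConnector rework in this patch: the blocking process queue and its cross-checking helpers (containsDeleteInProcessQueue, getFromProcessQueue) are replaced by an insertion-ordered document buffer plus a background commit thread that flushes every AUTOCOMMIT = 3000 ms. Deletes bypass the buffer, reads consult the buffer first, and every query path calls commitDocBuffer() before delegating, so query results can never miss buffered updates. Below is a minimal, dependency-free sketch of that pattern, not the YaCy API: plain Maps stand in for SolrInputDocument, and the hypothetical Backend interface stands in for the wrapped SolrConnector.

import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Sketch of the buffering pattern introduced by this patch: updates are
 * collected in an insertion-ordered buffer keyed by document id, a background
 * thread flushes the buffer periodically, deletes bypass the buffer, and reads
 * check the buffer before asking the backend. Names are illustrative only.
 */
public class BufferedUpdater implements AutoCloseable {

    /** Stand-in for the wrapped connector; not the YaCy SolrConnector API. */
    public interface Backend {
        void add(Collection<Map<String, Object>> docs);
        Map<String, Object> get(String id);
        void deleteById(String id);
    }

    private static final long AUTOCOMMIT_MS = 3000;   // flush interval used by the patch

    private final Backend backend;
    private final int capacity;                       // plays the role of updateCapacity
    private final LinkedHashMap<String, Map<String, Object>> docBuffer = new LinkedHashMap<>();
    private volatile boolean running = true;          // plays the role of commitProcessRunning
    private final Thread committer;

    public BufferedUpdater(final Backend backend, final int capacity) {
        this.backend = backend;
        this.capacity = capacity;
        this.committer = new Thread(() -> {
            try {
                while (this.running) {
                    flush();
                    Thread.sleep(AUTOCOMMIT_MS);
                }
            } catch (final InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                flush(); // write out whatever is left when the committer stops
            }
        }, "BufferedUpdater.committer");
        this.committer.start();
    }

    /** Buffer one document; flush early when the buffer outgrows its capacity. */
    public void add(final String id, final Map<String, Object> doc) {
        synchronized (this.docBuffer) {
            this.docBuffer.put(id, doc);
        }
        // racy size check, used only as a flush hint (as in the patch)
        if (this.docBuffer.size() > this.capacity) flush();
    }

    /** Deletes drop any buffered copy first, then go straight to the backend. */
    public void deleteById(final String id) {
        synchronized (this.docBuffer) {
            this.docBuffer.remove(id);
        }
        this.backend.deleteById(id);
    }

    /** Reads see buffered documents immediately; only misses hit the backend. */
    public Map<String, Object> get(final String id) {
        synchronized (this.docBuffer) {
            final Map<String, Object> buffered = this.docBuffer.get(id);
            if (buffered != null) return buffered;
        }
        return this.backend.get(id);
    }

    /** Push all buffered documents to the backend in one batch; call before any query. */
    public void flush() {
        synchronized (this.docBuffer) {
            if (this.docBuffer.isEmpty()) return;
            this.backend.add(new ArrayList<>(this.docBuffer.values()));
            this.docBuffer.clear();
        }
    }

    @Override
    public void close() throws InterruptedException {
        this.running = false;      // stop the autocommit loop ...
        this.committer.interrupt();
        this.committer.join();     // ... and wait for the final flush
    }
}

Usage follows the same shape as the patched connector: wrap the real backend once, call add() from any number of producer threads, and rely on the committer thread (or an explicit flush() before queries) to make documents visible; close() stops the committer and writes out the remaining buffer, mirroring how close() in the patch clears commitProcessRunning and joins the handler thread.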