diff --git a/source/net/yacy/cora/federate/solr/connector/ConcurrentUpdateSolrConnector.java b/source/net/yacy/cora/federate/solr/connector/ConcurrentUpdateSolrConnector.java index ba99a5449..1775859b4 100644 --- a/source/net/yacy/cora/federate/solr/connector/ConcurrentUpdateSolrConnector.java +++ b/source/net/yacy/cora/federate/solr/connector/ConcurrentUpdateSolrConnector.java @@ -31,13 +31,10 @@ import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; -import net.yacy.cora.document.encoding.ASCII; import net.yacy.cora.sorting.ReversibleScoreMap; -import net.yacy.cora.storage.HandleSet; +import net.yacy.cora.storage.ARH; +import net.yacy.cora.storage.ConcurrentARH; import net.yacy.cora.util.ConcurrentLog; -import net.yacy.cora.util.SpaceExceededException; -import net.yacy.kelondro.data.meta.URIMetadataRow; -import net.yacy.kelondro.index.RowHandleSet; import net.yacy.kelondro.util.MemoryControl; import net.yacy.search.schema.CollectionSchema; @@ -71,7 +68,7 @@ public class ConcurrentUpdateSolrConnector implements SolrConnector { try { removeIdFromUpdateQueue(id); ConcurrentUpdateSolrConnector.this.connector.deleteById(id); - ConcurrentUpdateSolrConnector.this.idCache.remove(ASCII.getBytes(id)); + ConcurrentUpdateSolrConnector.this.idCache.delete(id); } catch (final IOException e) { ConcurrentLog.logException(e); } @@ -126,20 +123,18 @@ public class ConcurrentUpdateSolrConnector implements SolrConnector { } } - private HandleSet idCache; + private ARH idCache; private BlockingQueue updateQueue; private BlockingQueue deleteQueue; private Thread deletionHandler, updateHandler; - private int idCacheCapacity; - public ConcurrentUpdateSolrConnector(SolrConnector connector, int updateCapacity, int idCacheCapacity) { + public ConcurrentUpdateSolrConnector(SolrConnector connector, int updateCapacity, int idCacheCapacity, int concurrency) { this.connector = connector; - this.idCache = new RowHandleSet(URIMetadataRow.rowdef.primaryKeyLength, URIMetadataRow.rowdef.objectOrder, 100); + this.idCache = new ConcurrentARH(idCacheCapacity, concurrency); this.updateQueue = new ArrayBlockingQueue(updateCapacity); this.deleteQueue = new LinkedBlockingQueue(); this.deletionHandler = null; this.updateHandler = null; - this.idCacheCapacity = idCacheCapacity; ensureAliveDeletionHandler(); ensureAliveUpdateHandler(); } @@ -221,12 +216,8 @@ public class ConcurrentUpdateSolrConnector implements SolrConnector { private void updateIdCache(String id) { if (id == null) return; - if (this.idCache.size() >= this.idCacheCapacity || MemoryControl.shortStatus()) this.idCache.clear(); - try { - this.idCache.put(ASCII.getBytes(id)); - } catch (final SpaceExceededException e) { - this.idCache.clear(); - } + if (MemoryControl.shortStatus()) this.idCache.clear(); + this.idCache.add(id); } public void ensureAliveDeletionHandler() { @@ -308,7 +299,7 @@ public class ConcurrentUpdateSolrConnector implements SolrConnector { @Override public void deleteById(String id) throws IOException { removeIdFromUpdateQueue(id); - this.idCache.remove(ASCII.getBytes(id)); + this.idCache.delete(id); if (this.deletionHandler.isAlive()) { try {this.deleteQueue.put(id);} catch (final InterruptedException e) {} } else { @@ -320,7 +311,7 @@ public class ConcurrentUpdateSolrConnector implements SolrConnector { public void deleteByIds(Collection ids) throws IOException { for (String id: ids) { removeIdFromUpdateQueue(id); - this.idCache.remove(ASCII.getBytes(id)); + this.idCache.delete(id); } if (this.deletionHandler.isAlive()) { for (String id: ids) try {this.deleteQueue.put(id);} catch (final InterruptedException e) {} @@ -347,7 +338,7 @@ public class ConcurrentUpdateSolrConnector implements SolrConnector { @Override public boolean existsById(String id) throws IOException { - if (this.idCache.has(ASCII.getBytes(id))) {cacheSuccessSign(); return true;} + if (this.idCache.contains(id)) {cacheSuccessSign(); return true;} if (existIdFromDeleteQueue(id)) {cacheSuccessSign(); return false;} if (existIdFromUpdateQueue(id)) {cacheSuccessSign(); return true;} if (this.connector.existsById(id)) { @@ -364,7 +355,7 @@ public class ConcurrentUpdateSolrConnector implements SolrConnector { if (ids.size() == 1) return existsById(ids.iterator().next()) ? ids : e; Set idsC = new HashSet(); for (String id: ids) { - if (this.idCache.has(ASCII.getBytes(id))) {cacheSuccessSign(); e.add(id); continue;} + if (this.idCache.contains(id)) {cacheSuccessSign(); e.add(id); continue;} if (existIdFromDeleteQueue(id)) {cacheSuccessSign(); continue;} if (existIdFromUpdateQueue(id)) {cacheSuccessSign(); e.add(id); continue;} idsC.add(id); diff --git a/source/net/yacy/cora/federate/solr/instance/InstanceMirror.java b/source/net/yacy/cora/federate/solr/instance/InstanceMirror.java index 42771a03d..c11375de8 100644 --- a/source/net/yacy/cora/federate/solr/instance/InstanceMirror.java +++ b/source/net/yacy/cora/federate/solr/instance/InstanceMirror.java @@ -145,7 +145,7 @@ public class InstanceMirror { if (defaultCoreName == null) return null; EmbeddedSolrConnector esc = this.solr0 == null ? null : new EmbeddedSolrConnector(this.solr0, defaultCoreName); RemoteSolrConnector rsc = this.solr1 == null ? null : new RemoteSolrConnector(this.solr1, true, defaultCoreName); - this.defaultConnector = new ConcurrentUpdateSolrConnector(new MirrorSolrConnector(esc, rsc), 100, 1000000); + this.defaultConnector = new ConcurrentUpdateSolrConnector(new MirrorSolrConnector(esc, rsc), 100, 100000, Runtime.getRuntime().availableProcessors()); this.connectorCache.put(defaultCoreName, this.defaultConnector); return this.defaultConnector; } @@ -155,7 +155,7 @@ public class InstanceMirror { if (msc != null) return msc; EmbeddedSolrConnector esc = this.solr0 == null ? null : new EmbeddedSolrConnector(this.solr0, corename); RemoteSolrConnector rsc = this.solr1 == null ? null : new RemoteSolrConnector(this.solr1, true, corename); - msc = new ConcurrentUpdateSolrConnector(new MirrorSolrConnector(esc, rsc), 100, 1000000); + msc = new ConcurrentUpdateSolrConnector(new MirrorSolrConnector(esc, rsc), 100, 100000, Runtime.getRuntime().availableProcessors()); this.connectorCache.put(corename, msc); return msc; } diff --git a/source/net/yacy/cora/storage/ARH.java b/source/net/yacy/cora/storage/ARH.java new file mode 100644 index 000000000..cd60605d7 --- /dev/null +++ b/source/net/yacy/cora/storage/ARH.java @@ -0,0 +1,81 @@ +/** + * ARH + * an interface for Adaptive Replacement Handles + * Copyright 2014 by Michael Peter Christen, mc@yacy.net, Frankfurt a. M., Germany + * First released 15.01.2014 at http://yacy.net + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this program in the file lgpl21.txt + * If not, see . + */ + +package net.yacy.cora.storage; + +import java.util.Iterator; +import java.util.Set; + + +public interface ARH extends Iterable { + + /** + * get the size of the ARH. this returns the sum of main and ghost cache + * @return the complete number of entries in the ARH cache + */ + public int size(); + + /** + * add a value to the cache. + * do not return a previous content value + * @param s + * @return true if this set did not already contain the specified element + */ + public boolean add(K s); + + /** + * check if a value in the cache exist + * @param s + * @return true if the value exist + */ + public boolean contains(Object s); + + /** + * delete an entry from the cache + * @param s + */ + public void delete(K s); + + /** + * clear the cache + */ + public void clear(); + + /** + * iterator implements the Iterable interface + */ + public Iterator iterator(); + + /** + * Return a Set view of the mappings contained in this map. + * This method is the basis for all methods that are implemented + * by a AbstractMap implementation + * + * @return a set view of the mappings contained in this map + */ + public Set set(); + + /** + * a hash code for this ARH + * @return a hash code + */ + int hashCode(); +} diff --git a/source/net/yacy/cora/storage/ConcurrentARH.java b/source/net/yacy/cora/storage/ConcurrentARH.java new file mode 100644 index 000000000..6225869fd --- /dev/null +++ b/source/net/yacy/cora/storage/ConcurrentARH.java @@ -0,0 +1,82 @@ +/** + * ConcurrentARH + * an interface for Adaptive Replacement Handles + * Copyright 2014 by Michael Peter Christen, mc@yacy.net, Frankfurt a. M., Germany + * First released 15.01.2014 at http://yacy.net + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this program in the file lgpl21.txt + * If not, see . + */ + +package net.yacy.cora.storage; + +import java.util.AbstractSet; +import java.util.Iterator; +import java.util.Set; + +/** + * An ARH is set for handles in respect to the ARC construction: an Adaptive Replacement Handle Cache. + */ +public class ConcurrentARH extends AbstractSet implements Set, Iterable, ARH { + + private static final Object _EXIST = new Object(); + + private final ConcurrentARC cache; + + public ConcurrentARH(final int cacheSize, final int partitions) { + this.cache = new ConcurrentARC(cacheSize, partitions); + } + + @Override + public int size() { + return this.cache.size(); + } + + @Override + public boolean contains(Object o) { + return this.cache.containsKey(o); + } + + @Override + public void clear() { + this.cache.clear(); + } + + @Override + public Set set() { + return this.cache.keySet(); + } + + @Override + public Iterator iterator() { + return this.cache.keySet().iterator(); + } + + @Override + public boolean isEmpty() { + return this.cache.isEmpty(); + } + + @Override + public boolean add(K e) { + Object o = this.cache.put(e, _EXIST); + return o == null; + } + + @Override + public void delete(Object o) { + this.cache.remove(o); + } + +}