replaced old caching in SolrConnector with a new one which is better for

concurrency and should prevent 100% CPU usage after a long run of a
peer with a large number of documents.
pull/1/head
Michael Peter Christen 11 years ago
parent 84cf7e8e9f
commit a5d7961812

@ -31,13 +31,10 @@ import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import net.yacy.cora.document.encoding.ASCII;
import net.yacy.cora.sorting.ReversibleScoreMap;
import net.yacy.cora.storage.HandleSet;
import net.yacy.cora.storage.ARH;
import net.yacy.cora.storage.ConcurrentARH;
import net.yacy.cora.util.ConcurrentLog;
import net.yacy.cora.util.SpaceExceededException;
import net.yacy.kelondro.data.meta.URIMetadataRow;
import net.yacy.kelondro.index.RowHandleSet;
import net.yacy.kelondro.util.MemoryControl;
import net.yacy.search.schema.CollectionSchema;
@ -71,7 +68,7 @@ public class ConcurrentUpdateSolrConnector implements SolrConnector {
try {
removeIdFromUpdateQueue(id);
ConcurrentUpdateSolrConnector.this.connector.deleteById(id);
ConcurrentUpdateSolrConnector.this.idCache.remove(ASCII.getBytes(id));
ConcurrentUpdateSolrConnector.this.idCache.delete(id);
} catch (final IOException e) {
ConcurrentLog.logException(e);
}
@ -126,20 +123,18 @@ public class ConcurrentUpdateSolrConnector implements SolrConnector {
}
}
private HandleSet idCache;
private ARH<String> idCache;
private BlockingQueue<SolrInputDocument> updateQueue;
private BlockingQueue<String> deleteQueue;
private Thread deletionHandler, updateHandler;
private int idCacheCapacity;
public ConcurrentUpdateSolrConnector(SolrConnector connector, int updateCapacity, int idCacheCapacity) {
public ConcurrentUpdateSolrConnector(SolrConnector connector, int updateCapacity, int idCacheCapacity, int concurrency) {
this.connector = connector;
this.idCache = new RowHandleSet(URIMetadataRow.rowdef.primaryKeyLength, URIMetadataRow.rowdef.objectOrder, 100);
this.idCache = new ConcurrentARH<String>(idCacheCapacity, concurrency);
this.updateQueue = new ArrayBlockingQueue<SolrInputDocument>(updateCapacity);
this.deleteQueue = new LinkedBlockingQueue<String>();
this.deletionHandler = null;
this.updateHandler = null;
this.idCacheCapacity = idCacheCapacity;
ensureAliveDeletionHandler();
ensureAliveUpdateHandler();
}
@ -221,12 +216,8 @@ public class ConcurrentUpdateSolrConnector implements SolrConnector {
private void updateIdCache(String id) {
if (id == null) return;
if (this.idCache.size() >= this.idCacheCapacity || MemoryControl.shortStatus()) this.idCache.clear();
try {
this.idCache.put(ASCII.getBytes(id));
} catch (final SpaceExceededException e) {
this.idCache.clear();
}
if (MemoryControl.shortStatus()) this.idCache.clear();
this.idCache.add(id);
}
public void ensureAliveDeletionHandler() {
@ -308,7 +299,7 @@ public class ConcurrentUpdateSolrConnector implements SolrConnector {
@Override
public void deleteById(String id) throws IOException {
removeIdFromUpdateQueue(id);
this.idCache.remove(ASCII.getBytes(id));
this.idCache.delete(id);
if (this.deletionHandler.isAlive()) {
try {this.deleteQueue.put(id);} catch (final InterruptedException e) {}
} else {
@ -320,7 +311,7 @@ public class ConcurrentUpdateSolrConnector implements SolrConnector {
public void deleteByIds(Collection<String> ids) throws IOException {
for (String id: ids) {
removeIdFromUpdateQueue(id);
this.idCache.remove(ASCII.getBytes(id));
this.idCache.delete(id);
}
if (this.deletionHandler.isAlive()) {
for (String id: ids) try {this.deleteQueue.put(id);} catch (final InterruptedException e) {}
@ -347,7 +338,7 @@ public class ConcurrentUpdateSolrConnector implements SolrConnector {
@Override
public boolean existsById(String id) throws IOException {
if (this.idCache.has(ASCII.getBytes(id))) {cacheSuccessSign(); return true;}
if (this.idCache.contains(id)) {cacheSuccessSign(); return true;}
if (existIdFromDeleteQueue(id)) {cacheSuccessSign(); return false;}
if (existIdFromUpdateQueue(id)) {cacheSuccessSign(); return true;}
if (this.connector.existsById(id)) {
@ -364,7 +355,7 @@ public class ConcurrentUpdateSolrConnector implements SolrConnector {
if (ids.size() == 1) return existsById(ids.iterator().next()) ? ids : e;
Set<String> idsC = new HashSet<String>();
for (String id: ids) {
if (this.idCache.has(ASCII.getBytes(id))) {cacheSuccessSign(); e.add(id); continue;}
if (this.idCache.contains(id)) {cacheSuccessSign(); e.add(id); continue;}
if (existIdFromDeleteQueue(id)) {cacheSuccessSign(); continue;}
if (existIdFromUpdateQueue(id)) {cacheSuccessSign(); e.add(id); continue;}
idsC.add(id);

@ -145,7 +145,7 @@ public class InstanceMirror {
if (defaultCoreName == null) return null;
EmbeddedSolrConnector esc = this.solr0 == null ? null : new EmbeddedSolrConnector(this.solr0, defaultCoreName);
RemoteSolrConnector rsc = this.solr1 == null ? null : new RemoteSolrConnector(this.solr1, true, defaultCoreName);
this.defaultConnector = new ConcurrentUpdateSolrConnector(new MirrorSolrConnector(esc, rsc), 100, 1000000);
this.defaultConnector = new ConcurrentUpdateSolrConnector(new MirrorSolrConnector(esc, rsc), 100, 100000, Runtime.getRuntime().availableProcessors());
this.connectorCache.put(defaultCoreName, this.defaultConnector);
return this.defaultConnector;
}
@ -155,7 +155,7 @@ public class InstanceMirror {
if (msc != null) return msc;
EmbeddedSolrConnector esc = this.solr0 == null ? null : new EmbeddedSolrConnector(this.solr0, corename);
RemoteSolrConnector rsc = this.solr1 == null ? null : new RemoteSolrConnector(this.solr1, true, corename);
msc = new ConcurrentUpdateSolrConnector(new MirrorSolrConnector(esc, rsc), 100, 1000000);
msc = new ConcurrentUpdateSolrConnector(new MirrorSolrConnector(esc, rsc), 100, 100000, Runtime.getRuntime().availableProcessors());
this.connectorCache.put(corename, msc);
return msc;
}

@ -0,0 +1,81 @@
/**
* ARH
* an interface for Adaptive Replacement Handles
* Copyright 2014 by Michael Peter Christen, mc@yacy.net, Frankfurt a. M., Germany
* First released 15.01.2014 at http://yacy.net
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program in the file lgpl21.txt
* If not, see <http://www.gnu.org/licenses/>.
*/
package net.yacy.cora.storage;
import java.util.Iterator;
import java.util.Set;
/**
 * Contract for an Adaptive Replacement Handle cache: a set-like container of
 * handles of type {@code K} that can be iterated and exposed as a {@link Set}.
 *
 * @param <K> the type of the handles kept in the cache
 */
public interface ARH<K> extends Iterable<K> {

    // ---- modification ----

    /**
     * Puts a handle into the cache. No previously stored content is handed back;
     * only the fact whether the handle was new is reported.
     *
     * @param s the handle to store
     * @return true if the cache did not already contain the given handle
     */
    boolean add(K s);

    /**
     * Removes a handle from the cache.
     *
     * @param s the handle to remove
     */
    void delete(K s);

    /**
     * Removes every handle from the cache.
     */
    void clear();

    // ---- queries ----

    /**
     * Tests whether a handle is present in the cache.
     *
     * @param s the handle to look for
     * @return true if the handle exists in the cache
     */
    boolean contains(Object s);

    /**
     * Reports the number of entries in the cache. The count is the sum of the
     * main cache and the ghost cache.
     *
     * @return the complete number of entries in the ARH cache
     */
    int size();

    /**
     * A hash code for this ARH.
     *
     * @return a hash code
     */
    int hashCode();

    // ---- views ----

    /**
     * Provides the iterator required by the {@link Iterable} interface.
     *
     * @return an iterator over the handles in this cache
     */
    Iterator<K> iterator();

    /**
     * Returns the handles contained in this cache as a {@link Set}.
     *
     * @return a set view of the handles contained in this cache
     */
    Set<K> set();
}

@ -0,0 +1,82 @@
/**
* ConcurrentARH
* a Set-based implementation of the ARH (Adaptive Replacement Handles) interface, backed by a ConcurrentARC
* Copyright 2014 by Michael Peter Christen, mc@yacy.net, Frankfurt a. M., Germany
* First released 15.01.2014 at http://yacy.net
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program in the file lgpl21.txt
* If not, see <http://www.gnu.org/licenses/>.
*/
package net.yacy.cora.storage;
import java.util.AbstractSet;
import java.util.Iterator;
import java.util.Set;
/**
 * A set of handles built on top of the ARC construction (an Adaptive Replacement Handle cache).
 * Every handle is stored as a key of a backing {@link ConcurrentARC}; all keys map to one
 * shared placeholder object, so only the presence of a key carries information.
 *
 * @param <K> the type of the handles kept in this set
 */
public class ConcurrentARH<K> extends AbstractSet<K> implements Set<K>, Iterable<K>, ARH<K> {

    /** Placeholder stored as the value for every handle in the backing cache. */
    private static final Object PRESENT = new Object();

    /** The backing cache; its keys are the members of this set. */
    private final ConcurrentARC<K, Object> cache;

    /**
     * Creates an empty handle set.
     *
     * @param cacheSize  handed unchanged to the backing {@link ConcurrentARC} as its first argument
     * @param partitions handed unchanged to the backing {@link ConcurrentARC} as its second argument
     */
    public ConcurrentARH(final int cacheSize, final int partitions) {
        this.cache = new ConcurrentARC<K, Object>(cacheSize, partitions);
    }

    // ---- modification ----

    /**
     * Stores the handle in the backing cache.
     *
     * @param e the handle to store
     * @return true if the backing cache returned no previous value for the handle
     */
    @Override
    public boolean add(K e) {
        return this.cache.put(e, PRESENT) == null;
    }

    /**
     * Removes the handle from the backing cache.
     *
     * @param o the handle to remove
     */
    @Override
    public void delete(Object o) {
        this.cache.remove(o);
    }

    /**
     * Clears the backing cache.
     */
    @Override
    public void clear() {
        this.cache.clear();
    }

    // ---- queries ----

    /**
     * @param o the handle to look for
     * @return true if the backing cache contains the handle as a key
     */
    @Override
    public boolean contains(Object o) {
        return this.cache.containsKey(o);
    }

    /**
     * @return the size reported by the backing cache
     */
    @Override
    public int size() {
        return this.cache.size();
    }

    /**
     * @return true if the backing cache reports itself as empty
     */
    @Override
    public boolean isEmpty() {
        return this.cache.isEmpty();
    }

    // ---- views ----

    /**
     * @return the key set of the backing cache
     */
    @Override
    public Set<K> set() {
        return this.cache.keySet();
    }

    /**
     * @return an iterator over the key set of the backing cache
     */
    @Override
    public Iterator<K> iterator() {
        final Set<K> keys = this.cache.keySet();
        return keys.iterator();
    }
}
Loading…
Cancel
Save