reduced IO-load and synchronization/blocking

- enhanced the Balancer performance when building new domain stacks using a new Table buffer
- added the new Table buffer BufferedObjectIndex class
- changed order of access to LURL-read (prefereing segment over Crawl Queues) will reduced blocking time on balancer
- fixed PPM setting in Crawler_p servlet (had doubled values)
- reduced synchronization in IndexCell because it is not necessary: reduced blocking during indexing/merging/dumping
- removed did-you-mean cache in IndexCell because that caused too much overhead and more memory usage but was not very useful. This reduced also deadlocks that could be causes when searched are performed during indexing.




git-svn-id: https://svn.berlios.de/svnroot/repos/yacy/trunk@6819 6c8d7289-2bf4-0310-a012-ef5d649a1542
pull/1/head
orbiter 15 years ago
parent 7417425e6a
commit 8b8107b2a3

@ -32,6 +32,7 @@ import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import net.yacy.kelondro.index.BufferedObjectIndex;
import net.yacy.kelondro.index.HandleSet;
import net.yacy.kelondro.index.ObjectIndex;
import net.yacy.kelondro.index.Row;
@ -49,12 +50,13 @@ public class Balancer {
private static final String indexSuffix = "9.db";
private static final int EcoFSBufferSize = 1000;
private static final int objectIndexBufferSize = 1000;
// class variables
private final ConcurrentHashMap<String, LinkedList<byte[]>> domainStacks; // a map from domain name part to Lists with url hashs
private final ConcurrentLinkedQueue<byte[]> top;
private final TreeMap<Long, byte[]> delayed;
private ObjectIndex urlFileIndex;
private BufferedObjectIndex urlFileIndex;
private final File cacheStacksPath;
private long minimumLocalDelta;
private long minimumGlobalDelta;
@ -81,10 +83,10 @@ public class Balancer {
cacheStacksPath.mkdirs();
final File f = new File(cacheStacksPath, stackname + indexSuffix);
try {
urlFileIndex = new Table(f, Request.rowdef, EcoFSBufferSize, 0, useTailCache, exceed134217727);
urlFileIndex = new BufferedObjectIndex(new Table(f, Request.rowdef, EcoFSBufferSize, 0, useTailCache, exceed134217727), objectIndexBufferSize);
} catch (RowSpaceExceededException e) {
try {
urlFileIndex = new Table(f, Request.rowdef, 0, 0, false, exceed134217727);
urlFileIndex = new BufferedObjectIndex(new Table(f, Request.rowdef, 0, 0, false, exceed134217727), objectIndexBufferSize);
} catch (RowSpaceExceededException e1) {
Log.logException(e1);
}
@ -491,7 +493,8 @@ public class Balancer {
this.domainStacks.clear();
//synchronized (this.delayed) { delayed.clear(); }
this.lastDomainStackFill = System.currentTimeMillis();
final CloneableIterator<byte[]> i = this.urlFileIndex.keys(true, null);
final HandleSet handles = this.urlFileIndex.keysFromBuffer(objectIndexBufferSize / 2);
final CloneableIterator<byte[]> i = handles.keys(true, null);
while (i.hasNext()) {
pushHashToDomainStacks(i.next(), 1000);
if (this.domainStacks.size() > maxdomstacksize) break;

@ -1047,22 +1047,16 @@ public final class Switchboard extends serverSwitch {
crawlQueues.urlRemove(hash);
}
public void urlRemove(final Segments.Process process, final byte[] hash) {
indexSegments.urlMetadata(process).remove(hash);
crawlResults.remove(new String(hash));
crawlQueues.urlRemove(hash);
}
public DigestURI getURL(final Segments.Process process, final byte[] urlhash) {
if (urlhash == null) return null;
if (urlhash.length == 0) return null;
final DigestURI ne = crawlQueues.getURL(urlhash);
if (ne != null) return ne;
final URIMetadataRow le = indexSegments.urlMetadata(process).load(urlhash, null, 0);
if (le == null) return null;
Components metadata = le.metadata();
if (metadata == null) return null;
return metadata.url();
if (le != null) {
Components metadata = le.metadata();
if (metadata == null) return null;
return metadata.url();
}
return crawlQueues.getURL(urlhash);
}
public RankingProfile getRanking() {
@ -1927,7 +1921,7 @@ public final class Switchboard extends serverSwitch {
// 1000 <= wantedPPM : maximum performance
if (wPPM <= 10) wPPM = 10;
if (wPPM >= 30000) wPPM = 30000;
final int newBusySleep = 30000 / wPPM; // for wantedPPM = 10: 6000; for wantedPPM = 1000: 60
final int newBusySleep = 60000 / wPPM; // for wantedPPM = 10: 6000; for wantedPPM = 1000: 60
BusyThread thread;

@ -0,0 +1,261 @@
/**
* BufferedObjectIndex
* Copyright 2010 by Michael Peter Christen
* First released 18.4.2010 at http://yacy.net
*
* This file is part of YaCy
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program in the file COPYING.LESSER.
* If not, see <http://www.gnu.org/licenses/>.
*/
package net.yacy.kelondro.index;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import net.yacy.kelondro.index.Row.Entry;
import net.yacy.kelondro.logging.Log;
import net.yacy.kelondro.order.CloneableIterator;
import net.yacy.kelondro.order.MergeIterator;
/**
* a write buffer for ObjectIndex entries
* @author Michael Peter Christen
*
*/
public class BufferedObjectIndex implements ObjectIndex, Iterable<Row.Entry> {
private final ObjectIndex backend;
private final RowSet buffer;
private final int buffersize;
private final Row.EntryComparator entryComparator;
public BufferedObjectIndex(ObjectIndex backend, int buffersize) {
this.backend = backend;
this.buffersize = buffersize;
this.buffer = new RowSet(backend.row());
this.entryComparator = new Row.EntryComparator(backend.row().objectOrder);
}
private final void flushBuffer() throws IOException, RowSpaceExceededException {
if (this.buffer.size() > 0) {
for (Row.Entry e: this.buffer) {
this.backend.put(e);
}
this.buffer.clear();
}
}
/**
* check size of buffer in such a way that a put into the buffer is possible
* afterwards without exceeding the given maximal buffersize
* @throws RowSpaceExceededException
* @throws IOException
*/
private final void checkBuffer() throws IOException, RowSpaceExceededException {
if (this.buffer.size() >= this.buffersize) flushBuffer();
}
public void addUnique(Entry row) throws RowSpaceExceededException, IOException {
synchronized (this.backend) {
checkBuffer();
this.buffer.put(row);
}
}
public void clear() throws IOException {
synchronized (this.backend) {
this.backend.clear();
this.buffer.clear();
}
}
public void close() {
synchronized (this.backend) {
try {
flushBuffer();
} catch (IOException e) {
Log.logException(e);
} catch (RowSpaceExceededException e) {
Log.logException(e);
}
this.backend.close();
}
}
public void deleteOnExit() {
this.backend.deleteOnExit();
}
public String filename() {
return this.backend.filename();
}
public int size() {
synchronized (this.backend) {
return this.buffer.size() + this.backend.size();
}
}
public Entry get(byte[] key) throws IOException {
synchronized (this.backend) {
Entry entry = this.buffer.get(key);
if (entry != null) return entry;
return this.backend.get(key);
}
}
public boolean has(byte[] key) {
synchronized (this.backend) {
return this.buffer.has(key) || this.backend.has(key);
}
}
public boolean isEmpty() {
synchronized (this.backend) {
return this.buffer.isEmpty() && this.backend.isEmpty();
}
}
public void put(Entry row) throws IOException, RowSpaceExceededException {
synchronized (this.backend) {
checkBuffer();
this.buffer.put(row);
}
}
public Entry remove(byte[] key) throws IOException {
synchronized (this.backend) {
Entry entry = this.buffer.remove(key);
if (entry != null) return entry;
return this.backend.remove(key);
}
}
public ArrayList<RowCollection> removeDoubles() throws IOException, RowSpaceExceededException {
synchronized (this.backend) {
flushBuffer();
return this.backend.removeDoubles();
}
}
public Entry removeOne() throws IOException {
synchronized (this.backend) {
if (!this.buffer.isEmpty()) {
Entry entry = this.buffer.removeOne();
if (entry != null) return entry;
}
return this.backend.removeOne();
}
}
public Entry replace(Entry row) throws RowSpaceExceededException, IOException {
synchronized (this.backend) {
Entry entry = this.buffer.replace(row);
if (entry != null) return entry;
return this.backend.replace(row);
}
}
public Row row() {
return this.buffer.row();
}
public CloneableIterator<byte[]> keys(boolean up, byte[] firstKey) throws IOException {
synchronized (this.backend) {
return new MergeIterator<byte[]>(
this.buffer.keys(up, firstKey),
this.backend.keys(up, firstKey),
this.buffer.rowdef.getOrdering(),
MergeIterator.simpleMerge,
true);
}
}
public Iterator<Entry> iterator() {
try {
return this.rows();
} catch (IOException e) {
Log.logException(e);
return null;
}
}
public CloneableIterator<Entry> rows(boolean up, byte[] firstKey) throws IOException {
synchronized (this.backend) {
return new MergeIterator<Entry>(
this.buffer.rows(up, firstKey),
this.backend.rows(up, firstKey),
this.entryComparator,
MergeIterator.simpleMerge,
true);
}
}
public CloneableIterator<Entry> rows() throws IOException {
synchronized (this.backend) {
return new MergeIterator<Entry>(
this.buffer.rows(),
this.backend.rows(),
this.entryComparator,
MergeIterator.simpleMerge,
true);
}
}
/**
* special iterator for BufferedObjectIndex:
* iterates only objects from the buffer. The use case for this iterator is given
* if first elements are iterated and then all iterated elements are deleted from the index.
* To minimize the IO load the buffer is filled from the backend in such a way that
* it creates a minimum of Read/Write-Head operations which is done using the removeOne() method.
* The buffer will be filled with the demanded number of records. The given load value does
* not denote the number of removeOne() operations but the number of records that are missing in the
* buffer to provide the give load number of record entries.
* The given load number must not exceed the maximal number of entries in the buffer.
* To give room for put()-inserts while the iterator is running it is recommended to set the load
* value at maximum to the maximum number of entries in the buffer divided by two.
* @param load number of records that shall be in the buffer when returning the buffer iterator
* @return an iterator of the elements in the buffer.
* @throws IOException
*/
public HandleSet keysFromBuffer(int load) throws IOException {
if (load > this.buffersize) throw new IOException("buffer load size exceeded");
synchronized (this.backend) {
int missing = Math.min(this.backend.size(), load - this.buffer.size());
while (missing-- > 0) {
try {
this.buffer.put(this.backend.removeOne());
} catch (RowSpaceExceededException e) {
Log.logException(e);
break;
}
}
HandleSet handles = new HandleSet(this.buffer.row().primaryKeyLength, this.buffer.row().objectOrder, this.buffer.size());
Iterator<byte[]> i = this.buffer.keys();
while (i.hasNext()) {
try {
handles.put(i.next());
} catch (RowSpaceExceededException e) {
Log.logException(e);
break;
}
}
return handles;
}
}
}

@ -38,7 +38,7 @@ import java.util.Iterator;
import net.yacy.kelondro.order.CloneableIterator;
public interface ObjectIndex {
public interface ObjectIndex extends Iterable<Row.Entry> {
public String filename(); // returns a unique identified for this index; can be a real or artificial file name
public int size();

@ -66,7 +66,7 @@ public class IODispatcher extends Thread {
this.terminate = false;
}
public synchronized void terminate() {
public void terminate() {
if (termination != null && controlQueue != null && this.isAlive()) {
this.terminate = true;
this.controlQueue.release();

@ -73,7 +73,6 @@ public final class IndexCell<ReferenceType extends Reference> extends AbstractBu
private long lastCleanup, lastDump;
private final long targetFileSize, maxFileSize;
private final int writeBufferSize;
private final ARC<ByteArray, Integer> countCache;
private Semaphore dumperSemaphore = new Semaphore(1);
private Semaphore cleanerSemaphore = new Semaphore(1);
@ -100,7 +99,6 @@ public final class IndexCell<ReferenceType extends Reference> extends AbstractBu
this.targetFileSize = targetFileSize;
this.maxFileSize = maxFileSize;
this.writeBufferSize = writeBufferSize;
this.countCache = new SimpleARC<ByteArray, Integer>(1000);
//cleanCache();
}
@ -163,27 +161,16 @@ public final class IndexCell<ReferenceType extends Reference> extends AbstractBu
*/
public int count(byte[] termHash) {
// check if value is in cache
ByteArray ba = new ByteArray(termHash);
Integer countCache = this.countCache.get(ba);
int countFile;
if (countCache == null) {
// read fresh values from file
ReferenceContainer<ReferenceType> c1;
try {
c1 = this.array.get(termHash);
} catch (Exception e) {
Log.logException(e);
c1 = null;
}
countFile = (c1 == null) ? 0 : c1.size();
// store to cache
this.countCache.put(ba, countFile);
} else {
// value was in ram
countFile = countCache.intValue();
// read fresh values from file
ReferenceContainer<ReferenceType> c1;
try {
c1 = this.array.get(termHash);
} catch (Exception e) {
Log.logException(e);
c1 = null;
}
countFile = (c1 == null) ? 0 : c1.size();
// count from container in ram
ReferenceContainer<ReferenceType> countRam = this.ram.get(termHash, null);
@ -215,7 +202,6 @@ public final class IndexCell<ReferenceType extends Reference> extends AbstractBu
return c1.merge(c0);
} catch (RowSpaceExceededException e) {
// try to free some ram
countCache.clear();
try {
return c1.merge(c0);
} catch (RowSpaceExceededException e1) {
@ -239,7 +225,6 @@ public final class IndexCell<ReferenceType extends Reference> extends AbstractBu
}
if (c1 != null) {
this.array.delete(termHash);
this.countCache.remove(new ByteArray(termHash));
}
ReferenceContainer<ReferenceType> c0 = this.ram.delete(termHash);
cleanCache();
@ -249,7 +234,6 @@ public final class IndexCell<ReferenceType extends Reference> extends AbstractBu
return c1.merge(c0);
} catch (RowSpaceExceededException e) {
// try to free some ram
countCache.clear();
try {
return c1.merge(c0);
} catch (RowSpaceExceededException e1) {
@ -269,21 +253,18 @@ public final class IndexCell<ReferenceType extends Reference> extends AbstractBu
public int remove(byte[] termHash, HandleSet urlHashes) throws IOException {
int removed = this.ram.remove(termHash, urlHashes);
int reduced = this.array.replace(termHash, new RemoveRewriter<ReferenceType>(urlHashes));
this.countCache.remove(new ByteArray(termHash));
return removed + (reduced / this.array.rowdef().objectsize);
}
public int remove(byte[] termHash, Set<String> urlHashes) throws IOException {
int removed = this.ram.remove(termHash, urlHashes);
int reduced = this.array.replace(termHash, new RemoveRewriter<ReferenceType>(urlHashes));
this.countCache.remove(new ByteArray(termHash));
return removed + (reduced / this.array.rowdef().objectsize);
}
public boolean remove(byte[] termHash, byte[] urlHashBytes) throws IOException {
boolean removed = this.ram.remove(termHash, urlHashBytes);
int reduced = this.array.replace(termHash, new RemoveRewriter<ReferenceType>(urlHashBytes));
this.countCache.remove(new ByteArray(termHash));
return removed || (reduced > 0);
}
@ -358,7 +339,6 @@ public final class IndexCell<ReferenceType extends Reference> extends AbstractBu
public synchronized void clear() throws IOException {
this.ram.clear();
this.array.clear();
this.countCache.clear();
}
/**
@ -371,7 +351,6 @@ public final class IndexCell<ReferenceType extends Reference> extends AbstractBu
// close all
this.ram.close();
this.array.close();
this.countCache.clear();
}
public int size() {
@ -407,7 +386,6 @@ public final class IndexCell<ReferenceType extends Reference> extends AbstractBu
*/
private void cleanCache() {
this.countCache.clear();
// dump the cache if necessary
long t = System.currentTimeMillis();

@ -76,11 +76,11 @@ public final class ReferenceContainerArray<ReferenceType extends Reference> {
this.merger = merger;
}
public synchronized void close() {
this.array.close(true);
public void close() {
this.array.close(true);
}
public synchronized void clear() throws IOException {
public void clear() throws IOException {
this.array.clear();
}
@ -110,7 +110,7 @@ public final class ReferenceContainerArray<ReferenceType extends Reference> {
* objects in the cache.
* @throws IOException
*/
public synchronized CloneableIterator<ReferenceContainer<ReferenceType>> wordContainerIterator(final byte[] startWordHash, final boolean rot) {
public CloneableIterator<ReferenceContainer<ReferenceType>> wordContainerIterator(final byte[] startWordHash, final boolean rot) {
try {
return new heapCacheIterator(startWordHash, rot);
} catch (IOException e) {
@ -191,7 +191,7 @@ public final class ReferenceContainerArray<ReferenceType extends Reference> {
* @return true, if the key is used in the heap; false otherwise
* @throws IOException
*/
public synchronized boolean has(final byte[] termHash) {
public boolean has(final byte[] termHash) {
return this.array.has(termHash);
}
@ -242,12 +242,12 @@ public final class ReferenceContainerArray<ReferenceType extends Reference> {
* @return the indexContainer if the cache contained the container, null otherwise
* @throws IOException
*/
public synchronized void delete(final byte[] termHash) throws IOException {
public void delete(final byte[] termHash) throws IOException {
// returns the index that had been deleted
array.remove(termHash);
}
public synchronized int replace(final byte[] termHash, ContainerRewriter<ReferenceType> rewriter) throws IOException {
public int replace(final byte[] termHash, ContainerRewriter<ReferenceType> rewriter) throws IOException {
return array.replace(termHash, new BLOBRewriter(termHash, rewriter));
}
@ -279,7 +279,7 @@ public final class ReferenceContainerArray<ReferenceType extends Reference> {
return this.array.entries();
}
public synchronized boolean shrink(long targetFileSize, long maxFileSize) {
public boolean shrink(long targetFileSize, long maxFileSize) {
if (this.array.entries() < 2) return false;
boolean donesomething = false;

Loading…
Cancel
Save