From 8b8107b2a36453685279769d3b59691a3ac6eb34 Mon Sep 17 00:00:00 2001 From: orbiter Date: Sun, 18 Apr 2010 21:55:20 +0000 Subject: [PATCH] reduced IO-load and synchronization/blocking - enhanced the Balancer performance when building new domain stacks using a new Table buffer - added the new Table buffer BufferedObjectIndex class - changed order of access to LURL-read (preferring segment over Crawl Queues), which reduces blocking time on balancer - fixed PPM setting in Crawler_p servlet (had doubled values) - reduced synchronization in IndexCell because it is not necessary: reduced blocking during indexing/merging/dumping - removed did-you-mean cache in IndexCell because that caused too much overhead and more memory usage but was not very useful. This also reduced deadlocks that could be caused when searches are performed during indexing. git-svn-id: https://svn.berlios.de/svnroot/repos/yacy/trunk@6819 6c8d7289-2bf4-0310-a012-ef5d649a1542 --- source/de/anomic/crawler/Balancer.java | 11 +- source/de/anomic/search/Switchboard.java | 20 +- .../kelondro/index/BufferedObjectIndex.java | 261 ++++++++++++++++++ .../net/yacy/kelondro/index/ObjectIndex.java | 2 +- .../net/yacy/kelondro/rwi/IODispatcher.java | 2 +- source/net/yacy/kelondro/rwi/IndexCell.java | 38 +-- .../kelondro/rwi/ReferenceContainerArray.java | 16 +- 7 files changed, 293 insertions(+), 57 deletions(-) create mode 100644 source/net/yacy/kelondro/index/BufferedObjectIndex.java diff --git a/source/de/anomic/crawler/Balancer.java b/source/de/anomic/crawler/Balancer.java index e2aedb241..939b02e0e 100644 --- a/source/de/anomic/crawler/Balancer.java +++ b/source/de/anomic/crawler/Balancer.java @@ -32,6 +32,7 @@ import java.util.TreeMap; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; +import net.yacy.kelondro.index.BufferedObjectIndex; import net.yacy.kelondro.index.HandleSet; import net.yacy.kelondro.index.ObjectIndex; import net.yacy.kelondro.index.Row; @@ 
-49,12 +50,13 @@ public class Balancer { private static final String indexSuffix = "9.db"; private static final int EcoFSBufferSize = 1000; + private static final int objectIndexBufferSize = 1000; // class variables private final ConcurrentHashMap> domainStacks; // a map from domain name part to Lists with url hashs private final ConcurrentLinkedQueue top; private final TreeMap delayed; - private ObjectIndex urlFileIndex; + private BufferedObjectIndex urlFileIndex; private final File cacheStacksPath; private long minimumLocalDelta; private long minimumGlobalDelta; @@ -81,10 +83,10 @@ public class Balancer { cacheStacksPath.mkdirs(); final File f = new File(cacheStacksPath, stackname + indexSuffix); try { - urlFileIndex = new Table(f, Request.rowdef, EcoFSBufferSize, 0, useTailCache, exceed134217727); + urlFileIndex = new BufferedObjectIndex(new Table(f, Request.rowdef, EcoFSBufferSize, 0, useTailCache, exceed134217727), objectIndexBufferSize); } catch (RowSpaceExceededException e) { try { - urlFileIndex = new Table(f, Request.rowdef, 0, 0, false, exceed134217727); + urlFileIndex = new BufferedObjectIndex(new Table(f, Request.rowdef, 0, 0, false, exceed134217727), objectIndexBufferSize); } catch (RowSpaceExceededException e1) { Log.logException(e1); } @@ -491,7 +493,8 @@ public class Balancer { this.domainStacks.clear(); //synchronized (this.delayed) { delayed.clear(); } this.lastDomainStackFill = System.currentTimeMillis(); - final CloneableIterator i = this.urlFileIndex.keys(true, null); + final HandleSet handles = this.urlFileIndex.keysFromBuffer(objectIndexBufferSize / 2); + final CloneableIterator i = handles.keys(true, null); while (i.hasNext()) { pushHashToDomainStacks(i.next(), 1000); if (this.domainStacks.size() > maxdomstacksize) break; diff --git a/source/de/anomic/search/Switchboard.java b/source/de/anomic/search/Switchboard.java index 971fc15e0..84e19b51d 100644 --- a/source/de/anomic/search/Switchboard.java +++ 
b/source/de/anomic/search/Switchboard.java @@ -1047,22 +1047,16 @@ public final class Switchboard extends serverSwitch { crawlQueues.urlRemove(hash); } - public void urlRemove(final Segments.Process process, final byte[] hash) { - indexSegments.urlMetadata(process).remove(hash); - crawlResults.remove(new String(hash)); - crawlQueues.urlRemove(hash); - } - public DigestURI getURL(final Segments.Process process, final byte[] urlhash) { if (urlhash == null) return null; if (urlhash.length == 0) return null; - final DigestURI ne = crawlQueues.getURL(urlhash); - if (ne != null) return ne; final URIMetadataRow le = indexSegments.urlMetadata(process).load(urlhash, null, 0); - if (le == null) return null; - Components metadata = le.metadata(); - if (metadata == null) return null; - return metadata.url(); + if (le != null) { + Components metadata = le.metadata(); + if (metadata == null) return null; + return metadata.url(); + } + return crawlQueues.getURL(urlhash); } public RankingProfile getRanking() { @@ -1927,7 +1921,7 @@ public final class Switchboard extends serverSwitch { // 1000 <= wantedPPM : maximum performance if (wPPM <= 10) wPPM = 10; if (wPPM >= 30000) wPPM = 30000; - final int newBusySleep = 30000 / wPPM; // for wantedPPM = 10: 6000; for wantedPPM = 1000: 60 + final int newBusySleep = 60000 / wPPM; // for wantedPPM = 10: 6000; for wantedPPM = 1000: 60 BusyThread thread; diff --git a/source/net/yacy/kelondro/index/BufferedObjectIndex.java b/source/net/yacy/kelondro/index/BufferedObjectIndex.java new file mode 100644 index 000000000..02dea9826 --- /dev/null +++ b/source/net/yacy/kelondro/index/BufferedObjectIndex.java @@ -0,0 +1,261 @@ +/** + * BufferedObjectIndex + * Copyright 2010 by Michael Peter Christen + * First released 18.4.2010 at http://yacy.net + * + * This file is part of YaCy + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Lesser General Public License as published by + * the Free 
Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this program in the file COPYING.LESSER. + * If not, see . + */ + +package net.yacy.kelondro.index; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; + +import net.yacy.kelondro.index.Row.Entry; +import net.yacy.kelondro.logging.Log; +import net.yacy.kelondro.order.CloneableIterator; +import net.yacy.kelondro.order.MergeIterator; + +/** + * a write buffer for ObjectIndex entries + * @author Michael Peter Christen + * + */ +public class BufferedObjectIndex implements ObjectIndex, Iterable { + + private final ObjectIndex backend; + private final RowSet buffer; + private final int buffersize; + private final Row.EntryComparator entryComparator; + + public BufferedObjectIndex(ObjectIndex backend, int buffersize) { + this.backend = backend; + this.buffersize = buffersize; + this.buffer = new RowSet(backend.row()); + this.entryComparator = new Row.EntryComparator(backend.row().objectOrder); + } + + private final void flushBuffer() throws IOException, RowSpaceExceededException { + if (this.buffer.size() > 0) { + for (Row.Entry e: this.buffer) { + this.backend.put(e); + } + this.buffer.clear(); + } + } + + /** + * check size of buffer in such a way that a put into the buffer is possible + * afterwards without exceeding the given maximal buffersize + * @throws RowSpaceExceededException + * @throws IOException + */ + private final void checkBuffer() throws IOException, RowSpaceExceededException { + if (this.buffer.size() >= this.buffersize) flushBuffer(); + } + + public void addUnique(Entry 
row) throws RowSpaceExceededException, IOException { + synchronized (this.backend) { + checkBuffer(); + this.buffer.put(row); + } + } + + public void clear() throws IOException { + synchronized (this.backend) { + this.backend.clear(); + this.buffer.clear(); + } + } + + public void close() { + synchronized (this.backend) { + try { + flushBuffer(); + } catch (IOException e) { + Log.logException(e); + } catch (RowSpaceExceededException e) { + Log.logException(e); + } + this.backend.close(); + } + } + + public void deleteOnExit() { + this.backend.deleteOnExit(); + } + + public String filename() { + return this.backend.filename(); + } + + public int size() { + synchronized (this.backend) { + return this.buffer.size() + this.backend.size(); + } + } + + public Entry get(byte[] key) throws IOException { + synchronized (this.backend) { + Entry entry = this.buffer.get(key); + if (entry != null) return entry; + return this.backend.get(key); + } + } + + public boolean has(byte[] key) { + synchronized (this.backend) { + return this.buffer.has(key) || this.backend.has(key); + } + } + + public boolean isEmpty() { + synchronized (this.backend) { + return this.buffer.isEmpty() && this.backend.isEmpty(); + } + } + + public void put(Entry row) throws IOException, RowSpaceExceededException { + synchronized (this.backend) { + checkBuffer(); + this.buffer.put(row); + } + } + + public Entry remove(byte[] key) throws IOException { + synchronized (this.backend) { + Entry entry = this.buffer.remove(key); + if (entry != null) return entry; + return this.backend.remove(key); + } + } + + public ArrayList removeDoubles() throws IOException, RowSpaceExceededException { + synchronized (this.backend) { + flushBuffer(); + return this.backend.removeDoubles(); + } + } + + public Entry removeOne() throws IOException { + synchronized (this.backend) { + if (!this.buffer.isEmpty()) { + Entry entry = this.buffer.removeOne(); + if (entry != null) return entry; + } + return this.backend.removeOne(); + } + } 
+ + public Entry replace(Entry row) throws RowSpaceExceededException, IOException { + synchronized (this.backend) { + Entry entry = this.buffer.replace(row); + if (entry != null) return entry; + return this.backend.replace(row); + } + } + + public Row row() { + return this.buffer.row(); + } + + public CloneableIterator keys(boolean up, byte[] firstKey) throws IOException { + synchronized (this.backend) { + return new MergeIterator( + this.buffer.keys(up, firstKey), + this.backend.keys(up, firstKey), + this.buffer.rowdef.getOrdering(), + MergeIterator.simpleMerge, + true); + } + } + + public Iterator iterator() { + try { + return this.rows(); + } catch (IOException e) { + Log.logException(e); + return null; + } + } + + public CloneableIterator rows(boolean up, byte[] firstKey) throws IOException { + synchronized (this.backend) { + return new MergeIterator( + this.buffer.rows(up, firstKey), + this.backend.rows(up, firstKey), + this.entryComparator, + MergeIterator.simpleMerge, + true); + } + } + + public CloneableIterator rows() throws IOException { + synchronized (this.backend) { + return new MergeIterator( + this.buffer.rows(), + this.backend.rows(), + this.entryComparator, + MergeIterator.simpleMerge, + true); + } + } + + /** + * special iterator for BufferedObjectIndex: + * iterates only objects from the buffer. The use case for this iterator is given + * if first elements are iterated and then all iterated elements are deleted from the index. + * To minimize the IO load the buffer is filled from the backend in such a way that + * it creates a minimum of Read/Write-Head operations which is done using the removeOne() method. + * The buffer will be filled with the demanded number of records. The given load value does + * not denote the number of removeOne() operations but the number of records that are missing in the + * buffer to provide the give load number of record entries. + * The given load number must not exceed the maximal number of entries in the buffer. 
+ * To give room for put()-inserts while the iterator is running it is recommended to set the load + * value at maximum to the maximum number of entries in the buffer divided by two. + * @param load number of records that shall be in the buffer when returning the buffer iterator + * @return an iterator of the elements in the buffer. + * @throws IOException + */ + public HandleSet keysFromBuffer(int load) throws IOException { + if (load > this.buffersize) throw new IOException("buffer load size exceeded"); + synchronized (this.backend) { + int missing = Math.min(this.backend.size(), load - this.buffer.size()); + while (missing-- > 0) { + try { + this.buffer.put(this.backend.removeOne()); + } catch (RowSpaceExceededException e) { + Log.logException(e); + break; + } + } + HandleSet handles = new HandleSet(this.buffer.row().primaryKeyLength, this.buffer.row().objectOrder, this.buffer.size()); + Iterator i = this.buffer.keys(); + while (i.hasNext()) { + try { + handles.put(i.next()); + } catch (RowSpaceExceededException e) { + Log.logException(e); + break; + } + } + return handles; + } + } + +} diff --git a/source/net/yacy/kelondro/index/ObjectIndex.java b/source/net/yacy/kelondro/index/ObjectIndex.java index 0d91274d5..6de591b3b 100644 --- a/source/net/yacy/kelondro/index/ObjectIndex.java +++ b/source/net/yacy/kelondro/index/ObjectIndex.java @@ -38,7 +38,7 @@ import java.util.Iterator; import net.yacy.kelondro.order.CloneableIterator; -public interface ObjectIndex { +public interface ObjectIndex extends Iterable { public String filename(); // returns a unique identified for this index; can be a real or artificial file name public int size(); diff --git a/source/net/yacy/kelondro/rwi/IODispatcher.java b/source/net/yacy/kelondro/rwi/IODispatcher.java index 3e8ae83df..9c95a631f 100644 --- a/source/net/yacy/kelondro/rwi/IODispatcher.java +++ b/source/net/yacy/kelondro/rwi/IODispatcher.java @@ -66,7 +66,7 @@ public class IODispatcher extends Thread { this.terminate = false; 
} - public synchronized void terminate() { + public void terminate() { if (termination != null && controlQueue != null && this.isAlive()) { this.terminate = true; this.controlQueue.release(); diff --git a/source/net/yacy/kelondro/rwi/IndexCell.java b/source/net/yacy/kelondro/rwi/IndexCell.java index 7a6d05c0f..f797eaefa 100644 --- a/source/net/yacy/kelondro/rwi/IndexCell.java +++ b/source/net/yacy/kelondro/rwi/IndexCell.java @@ -73,7 +73,6 @@ public final class IndexCell extends AbstractBu private long lastCleanup, lastDump; private final long targetFileSize, maxFileSize; private final int writeBufferSize; - private final ARC countCache; private Semaphore dumperSemaphore = new Semaphore(1); private Semaphore cleanerSemaphore = new Semaphore(1); @@ -100,7 +99,6 @@ public final class IndexCell extends AbstractBu this.targetFileSize = targetFileSize; this.maxFileSize = maxFileSize; this.writeBufferSize = writeBufferSize; - this.countCache = new SimpleARC(1000); //cleanCache(); } @@ -163,27 +161,16 @@ public final class IndexCell extends AbstractBu */ public int count(byte[] termHash) { - // check if value is in cache - ByteArray ba = new ByteArray(termHash); - Integer countCache = this.countCache.get(ba); int countFile; - if (countCache == null) { - // read fresh values from file - ReferenceContainer c1; - try { - c1 = this.array.get(termHash); - } catch (Exception e) { - Log.logException(e); - c1 = null; - } - countFile = (c1 == null) ? 0 : c1.size(); - - // store to cache - this.countCache.put(ba, countFile); - } else { - // value was in ram - countFile = countCache.intValue(); + // read fresh values from file + ReferenceContainer c1; + try { + c1 = this.array.get(termHash); + } catch (Exception e) { + Log.logException(e); + c1 = null; } + countFile = (c1 == null) ? 
0 : c1.size(); // count from container in ram ReferenceContainer countRam = this.ram.get(termHash, null); @@ -215,7 +202,6 @@ public final class IndexCell extends AbstractBu return c1.merge(c0); } catch (RowSpaceExceededException e) { // try to free some ram - countCache.clear(); try { return c1.merge(c0); } catch (RowSpaceExceededException e1) { @@ -239,7 +225,6 @@ public final class IndexCell extends AbstractBu } if (c1 != null) { this.array.delete(termHash); - this.countCache.remove(new ByteArray(termHash)); } ReferenceContainer c0 = this.ram.delete(termHash); cleanCache(); @@ -249,7 +234,6 @@ public final class IndexCell extends AbstractBu return c1.merge(c0); } catch (RowSpaceExceededException e) { // try to free some ram - countCache.clear(); try { return c1.merge(c0); } catch (RowSpaceExceededException e1) { @@ -269,21 +253,18 @@ public final class IndexCell extends AbstractBu public int remove(byte[] termHash, HandleSet urlHashes) throws IOException { int removed = this.ram.remove(termHash, urlHashes); int reduced = this.array.replace(termHash, new RemoveRewriter(urlHashes)); - this.countCache.remove(new ByteArray(termHash)); return removed + (reduced / this.array.rowdef().objectsize); } public int remove(byte[] termHash, Set urlHashes) throws IOException { int removed = this.ram.remove(termHash, urlHashes); int reduced = this.array.replace(termHash, new RemoveRewriter(urlHashes)); - this.countCache.remove(new ByteArray(termHash)); return removed + (reduced / this.array.rowdef().objectsize); } public boolean remove(byte[] termHash, byte[] urlHashBytes) throws IOException { boolean removed = this.ram.remove(termHash, urlHashBytes); int reduced = this.array.replace(termHash, new RemoveRewriter(urlHashBytes)); - this.countCache.remove(new ByteArray(termHash)); return removed || (reduced > 0); } @@ -358,7 +339,6 @@ public final class IndexCell extends AbstractBu public synchronized void clear() throws IOException { this.ram.clear(); this.array.clear(); - 
this.countCache.clear(); } /** @@ -371,7 +351,6 @@ public final class IndexCell extends AbstractBu // close all this.ram.close(); this.array.close(); - this.countCache.clear(); } public int size() { @@ -407,7 +386,6 @@ public final class IndexCell extends AbstractBu */ private void cleanCache() { - this.countCache.clear(); // dump the cache if necessary long t = System.currentTimeMillis(); diff --git a/source/net/yacy/kelondro/rwi/ReferenceContainerArray.java b/source/net/yacy/kelondro/rwi/ReferenceContainerArray.java index 226395b55..170646622 100644 --- a/source/net/yacy/kelondro/rwi/ReferenceContainerArray.java +++ b/source/net/yacy/kelondro/rwi/ReferenceContainerArray.java @@ -76,11 +76,11 @@ public final class ReferenceContainerArray { this.merger = merger; } - public synchronized void close() { - this.array.close(true); + public void close() { + this.array.close(true); } - public synchronized void clear() throws IOException { + public void clear() throws IOException { this.array.clear(); } @@ -110,7 +110,7 @@ public final class ReferenceContainerArray { * objects in the cache. 
* @throws IOException */ - public synchronized CloneableIterator> wordContainerIterator(final byte[] startWordHash, final boolean rot) { + public CloneableIterator> wordContainerIterator(final byte[] startWordHash, final boolean rot) { try { return new heapCacheIterator(startWordHash, rot); } catch (IOException e) { @@ -191,7 +191,7 @@ public final class ReferenceContainerArray { * @return true, if the key is used in the heap; false otherwise * @throws IOException */ - public synchronized boolean has(final byte[] termHash) { + public boolean has(final byte[] termHash) { return this.array.has(termHash); } @@ -242,12 +242,12 @@ public final class ReferenceContainerArray { * @return the indexContainer if the cache contained the container, null otherwise * @throws IOException */ - public synchronized void delete(final byte[] termHash) throws IOException { + public void delete(final byte[] termHash) throws IOException { // returns the index that had been deleted array.remove(termHash); } - public synchronized int replace(final byte[] termHash, ContainerRewriter rewriter) throws IOException { + public int replace(final byte[] termHash, ContainerRewriter rewriter) throws IOException { return array.replace(termHash, new BLOBRewriter(termHash, rewriter)); } @@ -279,7 +279,7 @@ public final class ReferenceContainerArray { return this.array.entries(); } - public synchronized boolean shrink(long targetFileSize, long maxFileSize) { + public boolean shrink(long targetFileSize, long maxFileSize) { if (this.array.entries() < 2) return false; boolean donesomething = false;