diff --git a/source/de/anomic/crawler/Balancer.java b/source/de/anomic/crawler/Balancer.java index e2aedb241..939b02e0e 100644 --- a/source/de/anomic/crawler/Balancer.java +++ b/source/de/anomic/crawler/Balancer.java @@ -32,6 +32,7 @@ import java.util.TreeMap; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; +import net.yacy.kelondro.index.BufferedObjectIndex; import net.yacy.kelondro.index.HandleSet; import net.yacy.kelondro.index.ObjectIndex; import net.yacy.kelondro.index.Row; @@ -49,12 +50,13 @@ public class Balancer { private static final String indexSuffix = "9.db"; private static final int EcoFSBufferSize = 1000; + private static final int objectIndexBufferSize = 1000; // class variables private final ConcurrentHashMap> domainStacks; // a map from domain name part to Lists with url hashs private final ConcurrentLinkedQueue top; private final TreeMap delayed; - private ObjectIndex urlFileIndex; + private BufferedObjectIndex urlFileIndex; private final File cacheStacksPath; private long minimumLocalDelta; private long minimumGlobalDelta; @@ -81,10 +83,10 @@ public class Balancer { cacheStacksPath.mkdirs(); final File f = new File(cacheStacksPath, stackname + indexSuffix); try { - urlFileIndex = new Table(f, Request.rowdef, EcoFSBufferSize, 0, useTailCache, exceed134217727); + urlFileIndex = new BufferedObjectIndex(new Table(f, Request.rowdef, EcoFSBufferSize, 0, useTailCache, exceed134217727), objectIndexBufferSize); } catch (RowSpaceExceededException e) { try { - urlFileIndex = new Table(f, Request.rowdef, 0, 0, false, exceed134217727); + urlFileIndex = new BufferedObjectIndex(new Table(f, Request.rowdef, 0, 0, false, exceed134217727), objectIndexBufferSize); } catch (RowSpaceExceededException e1) { Log.logException(e1); } @@ -491,7 +493,8 @@ public class Balancer { this.domainStacks.clear(); //synchronized (this.delayed) { delayed.clear(); } this.lastDomainStackFill = System.currentTimeMillis(); - final 
CloneableIterator i = this.urlFileIndex.keys(true, null); + final HandleSet handles = this.urlFileIndex.keysFromBuffer(objectIndexBufferSize / 2); + final CloneableIterator i = handles.keys(true, null); while (i.hasNext()) { pushHashToDomainStacks(i.next(), 1000); if (this.domainStacks.size() > maxdomstacksize) break; diff --git a/source/de/anomic/search/Switchboard.java b/source/de/anomic/search/Switchboard.java index 971fc15e0..84e19b51d 100644 --- a/source/de/anomic/search/Switchboard.java +++ b/source/de/anomic/search/Switchboard.java @@ -1047,22 +1047,16 @@ public final class Switchboard extends serverSwitch { crawlQueues.urlRemove(hash); } - public void urlRemove(final Segments.Process process, final byte[] hash) { - indexSegments.urlMetadata(process).remove(hash); - crawlResults.remove(new String(hash)); - crawlQueues.urlRemove(hash); - } - public DigestURI getURL(final Segments.Process process, final byte[] urlhash) { if (urlhash == null) return null; if (urlhash.length == 0) return null; - final DigestURI ne = crawlQueues.getURL(urlhash); - if (ne != null) return ne; final URIMetadataRow le = indexSegments.urlMetadata(process).load(urlhash, null, 0); - if (le == null) return null; - Components metadata = le.metadata(); - if (metadata == null) return null; - return metadata.url(); + if (le != null) { + Components metadata = le.metadata(); + if (metadata == null) return null; + return metadata.url(); + } + return crawlQueues.getURL(urlhash); } public RankingProfile getRanking() { @@ -1927,7 +1921,7 @@ public final class Switchboard extends serverSwitch { // 1000 <= wantedPPM : maximum performance if (wPPM <= 10) wPPM = 10; if (wPPM >= 30000) wPPM = 30000; - final int newBusySleep = 30000 / wPPM; // for wantedPPM = 10: 6000; for wantedPPM = 1000: 60 + final int newBusySleep = 60000 / wPPM; // for wantedPPM = 10: 6000; for wantedPPM = 1000: 60 BusyThread thread; diff --git a/source/net/yacy/kelondro/index/BufferedObjectIndex.java 
b/source/net/yacy/kelondro/index/BufferedObjectIndex.java new file mode 100644 index 000000000..02dea9826 --- /dev/null +++ b/source/net/yacy/kelondro/index/BufferedObjectIndex.java @@ -0,0 +1,261 @@ +/** + * BufferedObjectIndex + * Copyright 2010 by Michael Peter Christen + * First released 18.4.2010 at http://yacy.net + * + * This file is part of YaCy + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Lesser General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this program in the file COPYING.LESSER. + * If not, see <http://www.gnu.org/licenses/>. 
+ */ + +package net.yacy.kelondro.index; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; + +import net.yacy.kelondro.index.Row.Entry; +import net.yacy.kelondro.logging.Log; +import net.yacy.kelondro.order.CloneableIterator; +import net.yacy.kelondro.order.MergeIterator; + +/** + * a write buffer for ObjectIndex entries + * @author Michael Peter Christen + * + */ +public class BufferedObjectIndex implements ObjectIndex, Iterable { + + private final ObjectIndex backend; + private final RowSet buffer; + private final int buffersize; + private final Row.EntryComparator entryComparator; + + public BufferedObjectIndex(ObjectIndex backend, int buffersize) { + this.backend = backend; + this.buffersize = buffersize; + this.buffer = new RowSet(backend.row()); + this.entryComparator = new Row.EntryComparator(backend.row().objectOrder); + } + + private final void flushBuffer() throws IOException, RowSpaceExceededException { + if (this.buffer.size() > 0) { + for (Row.Entry e: this.buffer) { + this.backend.put(e); + } + this.buffer.clear(); + } + } + + /** + * check size of buffer in such a way that a put into the buffer is possible + * afterwards without exceeding the given maximal buffersize + * @throws RowSpaceExceededException + * @throws IOException + */ + private final void checkBuffer() throws IOException, RowSpaceExceededException { + if (this.buffer.size() >= this.buffersize) flushBuffer(); + } + + public void addUnique(Entry row) throws RowSpaceExceededException, IOException { + synchronized (this.backend) { + checkBuffer(); + this.buffer.put(row); + } + } + + public void clear() throws IOException { + synchronized (this.backend) { + this.backend.clear(); + this.buffer.clear(); + } + } + + public void close() { + synchronized (this.backend) { + try { + flushBuffer(); + } catch (IOException e) { + Log.logException(e); + } catch (RowSpaceExceededException e) { + Log.logException(e); + } + this.backend.close(); + } + } + + public 
void deleteOnExit() { + this.backend.deleteOnExit(); + } + + public String filename() { + return this.backend.filename(); + } + + public int size() { + synchronized (this.backend) { + return this.buffer.size() + this.backend.size(); + } + } + + public Entry get(byte[] key) throws IOException { + synchronized (this.backend) { + Entry entry = this.buffer.get(key); + if (entry != null) return entry; + return this.backend.get(key); + } + } + + public boolean has(byte[] key) { + synchronized (this.backend) { + return this.buffer.has(key) || this.backend.has(key); + } + } + + public boolean isEmpty() { + synchronized (this.backend) { + return this.buffer.isEmpty() && this.backend.isEmpty(); + } + } + + public void put(Entry row) throws IOException, RowSpaceExceededException { + synchronized (this.backend) { + checkBuffer(); + this.buffer.put(row); + } + } + + public Entry remove(byte[] key) throws IOException { + synchronized (this.backend) { + Entry entry = this.buffer.remove(key); + if (entry != null) return entry; + return this.backend.remove(key); + } + } + + public ArrayList removeDoubles() throws IOException, RowSpaceExceededException { + synchronized (this.backend) { + flushBuffer(); + return this.backend.removeDoubles(); + } + } + + public Entry removeOne() throws IOException { + synchronized (this.backend) { + if (!this.buffer.isEmpty()) { + Entry entry = this.buffer.removeOne(); + if (entry != null) return entry; + } + return this.backend.removeOne(); + } + } + + public Entry replace(Entry row) throws RowSpaceExceededException, IOException { + synchronized (this.backend) { + Entry entry = this.buffer.replace(row); + if (entry != null) return entry; + return this.backend.replace(row); + } + } + + public Row row() { + return this.buffer.row(); + } + + public CloneableIterator keys(boolean up, byte[] firstKey) throws IOException { + synchronized (this.backend) { + return new MergeIterator( + this.buffer.keys(up, firstKey), + this.backend.keys(up, firstKey), + 
this.buffer.rowdef.getOrdering(), + MergeIterator.simpleMerge, + true); + } + } + + public Iterator iterator() { + try { + return this.rows(); + } catch (IOException e) { + Log.logException(e); + return null; + } + } + + public CloneableIterator rows(boolean up, byte[] firstKey) throws IOException { + synchronized (this.backend) { + return new MergeIterator( + this.buffer.rows(up, firstKey), + this.backend.rows(up, firstKey), + this.entryComparator, + MergeIterator.simpleMerge, + true); + } + } + + public CloneableIterator rows() throws IOException { + synchronized (this.backend) { + return new MergeIterator( + this.buffer.rows(), + this.backend.rows(), + this.entryComparator, + MergeIterator.simpleMerge, + true); + } + } + + /** + * special iterator for BufferedObjectIndex: + * iterates only objects from the buffer. The use case for this iterator is given + * if first elements are iterated and then all iterated elements are deleted from the index. + * To minimize the IO load the buffer is filled from the backend in such a way that + * it creates a minimum of Read/Write-Head operations which is done using the removeOne() method. + * The buffer will be filled with the demanded number of records. The given load value does + * not denote the number of removeOne() operations but the number of records that are missing in the + * buffer to provide the given load number of record entries. + * The given load number must not exceed the maximal number of entries in the buffer. + * To give room for put()-inserts while the iterator is running it is recommended to set the load + * value at maximum to the maximum number of entries in the buffer divided by two. + * @param load number of records that shall be in the buffer when returning the buffer iterator + * @return a HandleSet holding the keys of the elements in the buffer. 
+ * @throws IOException + */ + public HandleSet keysFromBuffer(int load) throws IOException { + if (load > this.buffersize) throw new IOException("buffer load size exceeded"); + synchronized (this.backend) { + int missing = Math.min(this.backend.size(), load - this.buffer.size()); + while (missing-- > 0) { + try { + this.buffer.put(this.backend.removeOne()); + } catch (RowSpaceExceededException e) { + Log.logException(e); + break; + } + } + HandleSet handles = new HandleSet(this.buffer.row().primaryKeyLength, this.buffer.row().objectOrder, this.buffer.size()); + Iterator i = this.buffer.keys(); + while (i.hasNext()) { + try { + handles.put(i.next()); + } catch (RowSpaceExceededException e) { + Log.logException(e); + break; + } + } + return handles; + } + } + +} diff --git a/source/net/yacy/kelondro/index/ObjectIndex.java b/source/net/yacy/kelondro/index/ObjectIndex.java index 0d91274d5..6de591b3b 100644 --- a/source/net/yacy/kelondro/index/ObjectIndex.java +++ b/source/net/yacy/kelondro/index/ObjectIndex.java @@ -38,7 +38,7 @@ import java.util.Iterator; import net.yacy.kelondro.order.CloneableIterator; -public interface ObjectIndex { +public interface ObjectIndex extends Iterable { public String filename(); // returns a unique identified for this index; can be a real or artificial file name public int size(); diff --git a/source/net/yacy/kelondro/rwi/IODispatcher.java b/source/net/yacy/kelondro/rwi/IODispatcher.java index 3e8ae83df..9c95a631f 100644 --- a/source/net/yacy/kelondro/rwi/IODispatcher.java +++ b/source/net/yacy/kelondro/rwi/IODispatcher.java @@ -66,7 +66,7 @@ public class IODispatcher extends Thread { this.terminate = false; } - public synchronized void terminate() { + public void terminate() { if (termination != null && controlQueue != null && this.isAlive()) { this.terminate = true; this.controlQueue.release(); diff --git a/source/net/yacy/kelondro/rwi/IndexCell.java b/source/net/yacy/kelondro/rwi/IndexCell.java index 7a6d05c0f..f797eaefa 100644 --- 
a/source/net/yacy/kelondro/rwi/IndexCell.java +++ b/source/net/yacy/kelondro/rwi/IndexCell.java @@ -73,7 +73,6 @@ public final class IndexCell extends AbstractBu private long lastCleanup, lastDump; private final long targetFileSize, maxFileSize; private final int writeBufferSize; - private final ARC countCache; private Semaphore dumperSemaphore = new Semaphore(1); private Semaphore cleanerSemaphore = new Semaphore(1); @@ -100,7 +99,6 @@ public final class IndexCell extends AbstractBu this.targetFileSize = targetFileSize; this.maxFileSize = maxFileSize; this.writeBufferSize = writeBufferSize; - this.countCache = new SimpleARC(1000); //cleanCache(); } @@ -163,27 +161,16 @@ public final class IndexCell extends AbstractBu */ public int count(byte[] termHash) { - // check if value is in cache - ByteArray ba = new ByteArray(termHash); - Integer countCache = this.countCache.get(ba); int countFile; - if (countCache == null) { - // read fresh values from file - ReferenceContainer c1; - try { - c1 = this.array.get(termHash); - } catch (Exception e) { - Log.logException(e); - c1 = null; - } - countFile = (c1 == null) ? 0 : c1.size(); - - // store to cache - this.countCache.put(ba, countFile); - } else { - // value was in ram - countFile = countCache.intValue(); + // read fresh values from file + ReferenceContainer c1; + try { + c1 = this.array.get(termHash); + } catch (Exception e) { + Log.logException(e); + c1 = null; } + countFile = (c1 == null) ? 
0 : c1.size(); // count from container in ram ReferenceContainer countRam = this.ram.get(termHash, null); @@ -215,7 +202,6 @@ public final class IndexCell extends AbstractBu return c1.merge(c0); } catch (RowSpaceExceededException e) { // try to free some ram - countCache.clear(); try { return c1.merge(c0); } catch (RowSpaceExceededException e1) { @@ -239,7 +225,6 @@ public final class IndexCell extends AbstractBu } if (c1 != null) { this.array.delete(termHash); - this.countCache.remove(new ByteArray(termHash)); } ReferenceContainer c0 = this.ram.delete(termHash); cleanCache(); @@ -249,7 +234,6 @@ public final class IndexCell extends AbstractBu return c1.merge(c0); } catch (RowSpaceExceededException e) { // try to free some ram - countCache.clear(); try { return c1.merge(c0); } catch (RowSpaceExceededException e1) { @@ -269,21 +253,18 @@ public final class IndexCell extends AbstractBu public int remove(byte[] termHash, HandleSet urlHashes) throws IOException { int removed = this.ram.remove(termHash, urlHashes); int reduced = this.array.replace(termHash, new RemoveRewriter(urlHashes)); - this.countCache.remove(new ByteArray(termHash)); return removed + (reduced / this.array.rowdef().objectsize); } public int remove(byte[] termHash, Set urlHashes) throws IOException { int removed = this.ram.remove(termHash, urlHashes); int reduced = this.array.replace(termHash, new RemoveRewriter(urlHashes)); - this.countCache.remove(new ByteArray(termHash)); return removed + (reduced / this.array.rowdef().objectsize); } public boolean remove(byte[] termHash, byte[] urlHashBytes) throws IOException { boolean removed = this.ram.remove(termHash, urlHashBytes); int reduced = this.array.replace(termHash, new RemoveRewriter(urlHashBytes)); - this.countCache.remove(new ByteArray(termHash)); return removed || (reduced > 0); } @@ -358,7 +339,6 @@ public final class IndexCell extends AbstractBu public synchronized void clear() throws IOException { this.ram.clear(); this.array.clear(); - 
this.countCache.clear(); } /** @@ -371,7 +351,6 @@ public final class IndexCell extends AbstractBu // close all this.ram.close(); this.array.close(); - this.countCache.clear(); } public int size() { @@ -407,7 +386,6 @@ public final class IndexCell extends AbstractBu */ private void cleanCache() { - this.countCache.clear(); // dump the cache if necessary long t = System.currentTimeMillis(); diff --git a/source/net/yacy/kelondro/rwi/ReferenceContainerArray.java b/source/net/yacy/kelondro/rwi/ReferenceContainerArray.java index 226395b55..170646622 100644 --- a/source/net/yacy/kelondro/rwi/ReferenceContainerArray.java +++ b/source/net/yacy/kelondro/rwi/ReferenceContainerArray.java @@ -76,11 +76,11 @@ public final class ReferenceContainerArray { this.merger = merger; } - public synchronized void close() { - this.array.close(true); + public void close() { + this.array.close(true); } - public synchronized void clear() throws IOException { + public void clear() throws IOException { this.array.clear(); } @@ -110,7 +110,7 @@ public final class ReferenceContainerArray { * objects in the cache. 
* @throws IOException */ - public synchronized CloneableIterator> wordContainerIterator(final byte[] startWordHash, final boolean rot) { + public CloneableIterator> wordContainerIterator(final byte[] startWordHash, final boolean rot) { try { return new heapCacheIterator(startWordHash, rot); } catch (IOException e) { @@ -191,7 +191,7 @@ public final class ReferenceContainerArray { * @return true, if the key is used in the heap; false otherwise * @throws IOException */ - public synchronized boolean has(final byte[] termHash) { + public boolean has(final byte[] termHash) { return this.array.has(termHash); } @@ -242,12 +242,12 @@ public final class ReferenceContainerArray { * @return the indexContainer if the cache contained the container, null otherwise * @throws IOException */ - public synchronized void delete(final byte[] termHash) throws IOException { + public void delete(final byte[] termHash) throws IOException { // returns the index that had been deleted array.remove(termHash); } - public synchronized int replace(final byte[] termHash, ContainerRewriter rewriter) throws IOException { + public int replace(final byte[] termHash, ContainerRewriter rewriter) throws IOException { return array.replace(termHash, new BLOBRewriter(termHash, rewriter)); } @@ -279,7 +279,7 @@ public final class ReferenceContainerArray { return this.array.entries(); } - public synchronized boolean shrink(long targetFileSize, long maxFileSize) { + public boolean shrink(long targetFileSize, long maxFileSize) { if (this.array.entries() < 2) return false; boolean donesomething = false;