From 37f892b988b2ecd8da84309ac2bd7afe692d90ea Mon Sep 17 00:00:00 2001 From: orbiter Date: Fri, 20 Mar 2009 14:54:37 +0000 Subject: [PATCH] added new concurrent merger class for IndexCell RWI data git-svn-id: https://svn.berlios.de/svnroot/repos/yacy/trunk@5735 6c8d7289-2bf4-0310-a012-ef5d649a1542 --- source/de/anomic/crawler/Latency.java | 2 +- source/de/anomic/crawler/NoticedURL.java | 2 +- source/de/anomic/kelondro/blob/BLOBArray.java | 41 ++- source/de/anomic/kelondro/text/IndexCell.java | 7 +- .../text/ReferenceContainerArray.java | 126 +-------- .../text/ReferenceContainerMerger.java | 263 ++++++++++++++++++ source/de/anomic/plasma/plasmaWordIndex.java | 9 +- 7 files changed, 324 insertions(+), 126 deletions(-) create mode 100644 source/de/anomic/kelondro/text/ReferenceContainerMerger.java diff --git a/source/de/anomic/crawler/Latency.java b/source/de/anomic/crawler/Latency.java index e42993e0c..06fda7735 100644 --- a/source/de/anomic/crawler/Latency.java +++ b/source/de/anomic/crawler/Latency.java @@ -150,7 +150,7 @@ public class Latency { waiting = Math.min(60000, waiting); // return time that is remaining - System.out.println("Latency: " + (waiting - timeSinceLastAccess)); + //System.out.println("Latency: " + (waiting - timeSinceLastAccess)); return Math.max(0, waiting - timeSinceLastAccess); } diff --git a/source/de/anomic/crawler/NoticedURL.java b/source/de/anomic/crawler/NoticedURL.java index aca55fb94..aebe86074 100755 --- a/source/de/anomic/crawler/NoticedURL.java +++ b/source/de/anomic/crawler/NoticedURL.java @@ -43,7 +43,7 @@ public class NoticedURL { public static final int STACK_TYPE_MOVIE = 12; // put on movie stack public static final int STACK_TYPE_MUSIC = 13; // put on music stack - public static final long minimumLocalDeltaInit = 0; // the minimum time difference between access of the same local domain + public static final long minimumLocalDeltaInit = 10; // the minimum time difference between access of the same local domain public static final long minimumGlobalDeltaInit = 500; // the minimum time difference between access of the same global domain private Balancer coreStack; // links found by crawling to depth-1 diff --git a/source/de/anomic/kelondro/blob/BLOBArray.java b/source/de/anomic/kelondro/blob/BLOBArray.java index bac514baa..06ec6589f 100755 --- a/source/de/anomic/kelondro/blob/BLOBArray.java +++ b/source/de/anomic/kelondro/blob/BLOBArray.java @@ -160,9 +160,46 @@ public class BLOBArray implements BLOB { } } - public synchronized File unmountOldestBLOB() { + public synchronized File unmountSmallestBLOB() { if (this.blobs.size() == 0) return null; - blobItem b = this.blobs.remove(0); + int bestIndex = -1; + long smallest = Long.MAX_VALUE; + for (int i = 0; i < this.blobs.size(); i++) { + if (this.blobs.get(i).location.length() < smallest) { + smallest = this.blobs.get(i).location.length(); + bestIndex = i; + } + } + blobItem b = this.blobs.remove(bestIndex); + b.blob.close(false); + return b.location; + } + + public synchronized File unmountOldestBLOB(boolean smallestFromFirst2) { + if (this.blobs.size() == 0) return null; + int idx = 0; + if (smallestFromFirst2 && this.blobs.get(1).location.length() < this.blobs.get(0).location.length()) idx = 1; + blobItem b = this.blobs.remove(idx); + b.blob.close(false); + return b.location; + } + + public synchronized File unmountSimilarSizeBLOB(long otherSize) { + if (this.blobs.size() == 0 || otherSize == 0) return null; + blobItem b; + double delta, bestDelta = Double.MAX_VALUE; + int bestIndex = -1; + for (int i = 
0; i < this.blobs.size(); i++) { + b = this.blobs.get(i); + if (b.location.length() == 0) continue; + delta = ((double) b.location.length()) / ((double) otherSize); + if (delta < 1.0) delta = 1.0 / delta; + if (delta < bestDelta) { + bestDelta = delta; + bestIndex = i; + } + } + b = this.blobs.remove(bestIndex); b.blob.close(false); return b.location; } diff --git a/source/de/anomic/kelondro/text/IndexCell.java b/source/de/anomic/kelondro/text/IndexCell.java index a5b1af20a..e5b82c77d 100644 --- a/source/de/anomic/kelondro/text/IndexCell.java +++ b/source/de/anomic/kelondro/text/IndexCell.java @@ -61,9 +61,10 @@ public final class IndexCell extends AbstractBufferedIndex implements BufferedIn final ByteOrder wordOrder, final Row payloadrow, final int maxRamEntries, - final int maxArrayFiles + final int maxArrayFiles, + ReferenceContainerMerger merger ) throws IOException { - this.array = new ReferenceContainerArray(cellPath, wordOrder, payloadrow); + this.array = new ReferenceContainerArray(cellPath, wordOrder, payloadrow, merger); this.ram = new ReferenceContainerCache(payloadrow, wordOrder); this.ram.initWriteMode(); this.maxRamEntries = maxRamEntries; @@ -269,7 +270,7 @@ public final class IndexCell extends AbstractBufferedIndex implements BufferedIn this.array.mountBLOBContainer(dumpFile); int c = 0; while (this.array.entries() > this.maxArrayFiles && c++ < 3) { - if (!this.array.mergeOldest()) break; + if (!this.array.merge(true)) break; } } diff --git a/source/de/anomic/kelondro/text/ReferenceContainerArray.java b/source/de/anomic/kelondro/text/ReferenceContainerArray.java index 7b71a9ddc..255e74910 100644 --- a/source/de/anomic/kelondro/text/ReferenceContainerArray.java +++ b/source/de/anomic/kelondro/text/ReferenceContainerArray.java @@ -1,9 +1,7 @@ -// indexContainerBLOBHeap.java +// ReferenceContainerArray.java // (C) 2009 by Michael Peter Christen; mc@yacy.net, Frankfurt a. M., Germany // first published 04.01.2009 on http://yacy.net // -// This is a part of YaCy, a peer-to-peer based web search engine -// // $LastChangedDate: 2008-03-14 01:16:04 +0100 (Fr, 14 Mrz 2008) $ // $LastChangedRevision: 4558 $ // $LastChangedBy: orbiter $ @@ -34,17 +32,16 @@ import java.util.List; import de.anomic.kelondro.blob.BLOB; import de.anomic.kelondro.blob.BLOBArray; -import de.anomic.kelondro.blob.HeapWriter; import de.anomic.kelondro.index.Row; import de.anomic.kelondro.index.RowSet; import de.anomic.kelondro.order.ByteOrder; import de.anomic.kelondro.order.CloneableIterator; -import de.anomic.kelondro.text.ReferenceContainerCache.blobFileEntries; public final class ReferenceContainerArray { private final Row payloadrow; private final BLOBArray array; + private final ReferenceContainerMerger merger; /** * open a index container based on a BLOB dump. 
The content of the BLOB will not be read @@ -59,7 +56,8 @@ public final class ReferenceContainerArray { public ReferenceContainerArray( final File heapLocation, final ByteOrder wordOrder, - final Row payloadrow) throws IOException { + final Row payloadrow, + ReferenceContainerMerger merger) throws IOException { this.payloadrow = payloadrow; this.array = new BLOBArray( heapLocation, @@ -67,6 +65,7 @@ public final class ReferenceContainerArray { payloadrow.primaryKeyLength, wordOrder, 0); + this.merger = merger; } public synchronized void close() { @@ -244,120 +243,13 @@ public final class ReferenceContainerArray { return this.array.entries(); } - public synchronized boolean mergeOldest() throws IOException { + public synchronized boolean merge(boolean similar) throws IOException { if (this.array.entries() < 2) return false; - File f1 = this.array.unmountOldestBLOB(); - File f2 = this.array.unmountOldestBLOB(); - System.out.println("*** DEBUG mergeOldest: vvvvvvvvv array has " + this.array.entries() + " entries vvvvvvvvv"); - System.out.println("*** DEBUG mergeOldest: unmounted " + f1.getName()); - System.out.println("*** DEBUG mergeOldest: unmounted " + f2.getName()); - File newFile = merge(f1, f2); - if (newFile == null) return true; - this.array.mountBLOB(newFile); - System.out.println("*** DEBUG mergeOldest: mounted " + newFile.getName()); - System.out.println("*** DEBUG mergeOldest: ^^^^^^^^^^^ array has " + this.array.entries() + " entries ^^^^^^^^^^^"); + File f1 = this.array.unmountOldestBLOB(similar); + File f2 = (similar) ? this.array.unmountSimilarSizeBLOB(f1.length()) : this.array.unmountOldestBLOB(false); + merger.merge(f1, f2, this.array, this.payloadrow, newContainerBLOBFile()); return true; } - private synchronized File merge(File f1, File f2) throws IOException { - // iterate both files and write a new one - - CloneableIterator i1 = new blobFileEntries(f1, this.payloadrow); - CloneableIterator i2 = new blobFileEntries(f2, this.payloadrow); - if (!i1.hasNext()) { - if (i2.hasNext()) { - if (!f1.delete()) f1.deleteOnExit(); - return f2; - } else { - if (!f1.delete()) f1.deleteOnExit(); - if (!f2.delete()) f2.deleteOnExit(); - return null; - } - } else if (!i2.hasNext()) { - if (!f2.delete()) f2.deleteOnExit(); - return f1; - } - assert i1.hasNext(); - assert i2.hasNext(); - File newFile = newContainerBLOBFile(); - HeapWriter writer = new HeapWriter(newFile, this.array.keylength(), this.array.ordering()); - merge(i1, i2, writer); - writer.close(true); - // we don't need the old files any more - if (!f1.delete()) f1.deleteOnExit(); - if (!f2.delete()) f2.deleteOnExit(); - return newFile; - } - private synchronized void merge(CloneableIterator i1, CloneableIterator i2, HeapWriter writer) throws IOException { - assert i1.hasNext(); - assert i2.hasNext(); - ReferenceContainer c1, c2, c1o, c2o; - c1 = i1.next(); - c2 = i2.next(); - int e; - while (true) { - assert c1 != null; - assert c2 != null; - e = this.array.ordering().compare(c1.getWordHash().getBytes(), c2.getWordHash().getBytes()); - if (e < 0) { - writer.add(c1.getWordHash().getBytes(), c1.exportCollection()); - if (i1.hasNext()) { - c1o = c1; - c1 = i1.next(); - assert this.array.ordering().compare(c1.getWordHash().getBytes(), c1o.getWordHash().getBytes()) > 0; - continue; - } - break; - } - if (e > 0) { - writer.add(c2.getWordHash().getBytes(), c2.exportCollection()); - if (i2.hasNext()) { - c2o = c2; - c2 = i2.next(); - assert this.array.ordering().compare(c2.getWordHash().getBytes(), c2o.getWordHash().getBytes()) > 0; - 
continue;
-                }
-                break;
-            }
-            assert e == 0;
-            // merge the entries
-            writer.add(c1.getWordHash().getBytes(), (c1.merge(c2)).exportCollection());
-            if (i1.hasNext() && i2.hasNext()) {
-                c1 = i1.next();
-                c2 = i2.next();
-                continue;
-            }
-            if (i1.hasNext()) c1 = i1.next();
-            if (i2.hasNext()) c2 = i2.next();
-            break;
-            
-        }
-        // catch up remaining entries
-        assert !(i1.hasNext() && i2.hasNext());
-        while (i1.hasNext()) {
-            //System.out.println("FLUSH REMAINING 1: " + c1.getWordHash());
-            writer.add(c1.getWordHash().getBytes(), c1.exportCollection());
-            if (i1.hasNext()) {
-                c1o = c1;
-                c1 = i1.next();
-                assert this.array.ordering().compare(c1.getWordHash().getBytes(), c1o.getWordHash().getBytes()) > 0;
-                continue;
-            }
-            break;
-        }
-        while (i2.hasNext()) {
-            //System.out.println("FLUSH REMAINING 2: " + c2.getWordHash());
-            writer.add(c2.getWordHash().getBytes(), c2.exportCollection());
-            if (i2.hasNext()) {
-                c2o = c2;
-                c2 = i2.next();
-                assert this.array.ordering().compare(c2.getWordHash().getBytes(), c2o.getWordHash().getBytes()) > 0;
-                continue;
-            }
-            break;
-        }
-        // finished with writing
-    }
-    
 }
diff --git a/source/de/anomic/kelondro/text/ReferenceContainerMerger.java b/source/de/anomic/kelondro/text/ReferenceContainerMerger.java
new file mode 100644
index 000000000..f620c589f
--- /dev/null
+++ b/source/de/anomic/kelondro/text/ReferenceContainerMerger.java
@@ -0,0 +1,263 @@
+// ReferenceContainerMerger.java
+// (C) 2009 by Michael Peter Christen; mc@yacy.net, Frankfurt a. M., Germany
+// first published 20.03.2009 on http://yacy.net
+//
+// $LastChangedDate: 2008-03-14 01:16:04 +0100 (Fr, 14 Mrz 2008) $
+// $LastChangedRevision: 4558 $
+// $LastChangedBy: orbiter $
+//
+// LICENSE
+//
+// This program is free software; you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation; either version 2 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License
+// along with this program; if not, write to the Free Software
+// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+
+package de.anomic.kelondro.text;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.concurrent.ArrayBlockingQueue;
+
+import de.anomic.kelondro.blob.BLOBArray;
+import de.anomic.kelondro.blob.HeapWriter;
+import de.anomic.kelondro.index.Row;
+import de.anomic.kelondro.order.ByteOrder;
+import de.anomic.kelondro.order.CloneableIterator;
+import de.anomic.kelondro.text.ReferenceContainerCache.blobFileEntries;
+
+/**
+ * merger class for files from ReferenceContainerArray.
+ * this is a concurrent merger that can merge single files that are queued for merging.
+ * when several ReferenceContainerArray classes host their ReferenceContainer file arrays,
+ * they may share a single ReferenceContainerMerger object which does the merging for all
+ * of them. This is the best way to do the merging, because it does heavy IO access and
+ * such access should not be performed concurrently, but queued. This class is the
+ * management class for the queueing of merge jobs.
+ *
+ * to use this class, first instantiate an object and then start the concurrent execution
+ * of merging with a call to the start() method. To shut down all merging, call terminate()
+ * only once.
+ */
+public class ReferenceContainerMerger extends Thread {
+    
+    private Job poison;
+    private ArrayBlockingQueue<Job> queue;
+    private ArrayBlockingQueue<Job> termi;
+    
+    public ReferenceContainerMerger(int queueLength) {
+        this.poison = new Job();
+        this.queue = new ArrayBlockingQueue<Job>(queueLength);
+        this.termi = new ArrayBlockingQueue<Job>(1);
+    }
+    
+    public synchronized void terminate() {
+        if (queue == null || !this.isAlive()) return;
+        try {
+            queue.put(poison);
+        } catch (InterruptedException e) {
+            e.printStackTrace();
+        }
+        // await termination
+        try {
+            termi.take();
+        } catch (InterruptedException e) {
+            e.printStackTrace();
+        }
+    }
+    
+    public synchronized void merge(File f1, File f2, BLOBArray array, Row payloadrow, File newFile) {
+        if (queue == null || !this.isAlive()) {
+            try {
+                mergeMount(f1, f2, array, payloadrow, newFile);
+            } catch (IOException e) {
+                e.printStackTrace();
+            }
+        } else {
+            Job job = new Job(f1, f2, array, payloadrow, newFile);
+            try {
+                queue.put(job);
+            } catch (InterruptedException e) {
+                e.printStackTrace();
+                try {
+                    mergeMount(f1, f2, array, payloadrow, newFile);
+                } catch (IOException ee) {
+                    ee.printStackTrace();
+                }
+            }
+        }
+    }
+    
+    public void run() {
+        Job job;
+        try {
+            while ((job = queue.take()) != poison) {
+                job.merge();
+            }
+        } catch (InterruptedException e) {
+            e.printStackTrace();
+        } finally {
+            try {
+                termi.put(poison);
+            } catch (InterruptedException e) {
+                e.printStackTrace();
+            }
+        }
+    }
+    
+    public class Job {
+        
+        File f1, f2, newFile;
+        BLOBArray array;
+        Row payloadrow;
+        
+        public Job() {
+            this.f1 = null;
+            this.f2 = null;
+            this.newFile = null;
+            this.array = null;
+            this.payloadrow = null;
+        }
+        
+        public Job(File f1, File f2, BLOBArray array, Row payloadrow, File newFile) {
+            this.f1 = f1;
+            this.f2 = f2;
+            this.newFile = newFile;
+            this.array = array;
+            this.payloadrow = payloadrow;
+        }
+        
+        public File merge() {
+            try {
+                return mergeMount(f1, f2, array, payloadrow, newFile);
+            } catch (IOException e) {
+                e.printStackTrace();
+            }
+            return null;
+        }
+    }
+    
+    public static File mergeMount(File f1, File f2, BLOBArray array, Row payloadrow, File newFile) throws IOException {
+        System.out.println("*** DEBUG mergeMount: vvvvvvvvv array has " + array.entries() + " entries vvvvvvvvv");
+        System.out.println("*** DEBUG mergeMount: unmounted " + f1.getName());
+        System.out.println("*** DEBUG mergeMount: unmounted " + f2.getName());
+        File resultFile = mergeWorker(f1, f2, array, payloadrow, newFile);
+        if (resultFile == null) return null;
+        array.mountBLOB(resultFile);
+        System.out.println("*** DEBUG mergeMount: mounted " + resultFile.getName());
+        System.out.println("*** DEBUG mergeMount: ^^^^^^^^^^^ array has " + array.entries() + " entries ^^^^^^^^^^^");
+        return resultFile;
+    }
+    
+    private static File mergeWorker(File f1, File f2, BLOBArray array, Row payloadrow, File newFile) throws IOException {
+        // iterate both files and write a new one
+        
+        CloneableIterator i1 = new blobFileEntries(f1, payloadrow);
+        CloneableIterator i2 = new blobFileEntries(f2, payloadrow);
+        if (!i1.hasNext()) {
+            if (i2.hasNext()) {
+                if (!f1.delete()) f1.deleteOnExit();
+                if (f2.renameTo(newFile)) return newFile;
+                return f2;
+            } else {
+                if (!f1.delete()) f1.deleteOnExit();
+                if (!f2.delete()) f2.deleteOnExit();
+                return null;
+            }
+        } else if (!i2.hasNext()) {
+            if 
(!f2.delete()) f2.deleteOnExit(); + if (f1.renameTo(newFile)) return newFile; + return f1; + } + assert i1.hasNext(); + assert i2.hasNext(); + HeapWriter writer = new HeapWriter(newFile, array.keylength(), array.ordering()); + merge(i1, i2, array.ordering(), writer); + writer.close(true); + // we don't need the old files any more + if (!f1.delete()) f1.deleteOnExit(); + if (!f2.delete()) f2.deleteOnExit(); + return newFile; + } + + private static void merge(CloneableIterator i1, CloneableIterator i2, ByteOrder ordering, HeapWriter writer) throws IOException { + assert i1.hasNext(); + assert i2.hasNext(); + ReferenceContainer c1, c2, c1o, c2o; + c1 = i1.next(); + c2 = i2.next(); + int e; + while (true) { + assert c1 != null; + assert c2 != null; + e = ordering.compare(c1.getWordHash().getBytes(), c2.getWordHash().getBytes()); + if (e < 0) { + writer.add(c1.getWordHash().getBytes(), c1.exportCollection()); + if (i1.hasNext()) { + c1o = c1; + c1 = i1.next(); + assert ordering.compare(c1.getWordHash().getBytes(), c1o.getWordHash().getBytes()) > 0; + continue; + } + break; + } + if (e > 0) { + writer.add(c2.getWordHash().getBytes(), c2.exportCollection()); + if (i2.hasNext()) { + c2o = c2; + c2 = i2.next(); + assert ordering.compare(c2.getWordHash().getBytes(), c2o.getWordHash().getBytes()) > 0; + continue; + } + break; + } + assert e == 0; + // merge the entries + writer.add(c1.getWordHash().getBytes(), (c1.merge(c2)).exportCollection()); + if (i1.hasNext() && i2.hasNext()) { + c1 = i1.next(); + c2 = i2.next(); + continue; + } + if (i1.hasNext()) c1 = i1.next(); + if (i2.hasNext()) c2 = i2.next(); + break; + + } + // catch up remaining entries + assert !(i1.hasNext() && i2.hasNext()); + while (i1.hasNext()) { + //System.out.println("FLUSH REMAINING 1: " + c1.getWordHash()); + writer.add(c1.getWordHash().getBytes(), c1.exportCollection()); + if (i1.hasNext()) { + c1o = c1; + c1 = i1.next(); + assert ordering.compare(c1.getWordHash().getBytes(), c1o.getWordHash().getBytes()) > 0; + continue; + } + break; + } + while (i2.hasNext()) { + //System.out.println("FLUSH REMAINING 2: " + c2.getWordHash()); + writer.add(c2.getWordHash().getBytes(), c2.exportCollection()); + if (i2.hasNext()) { + c2o = c2; + c2 = i2.next(); + assert ordering.compare(c2.getWordHash().getBytes(), c2o.getWordHash().getBytes()) > 0; + continue; + } + break; + } + // finished with writing + } + +} diff --git a/source/de/anomic/plasma/plasmaWordIndex.java b/source/de/anomic/plasma/plasmaWordIndex.java index 2da564477..ffa1011b6 100644 --- a/source/de/anomic/plasma/plasmaWordIndex.java +++ b/source/de/anomic/plasma/plasmaWordIndex.java @@ -47,6 +47,7 @@ import de.anomic.kelondro.text.BufferedIndexCollection; import de.anomic.kelondro.text.IndexCell; import de.anomic.kelondro.text.MetadataRowContainer; import de.anomic.kelondro.text.ReferenceContainer; +import de.anomic.kelondro.text.ReferenceContainerMerger; import de.anomic.kelondro.text.ReferenceRow; import de.anomic.kelondro.text.MetadataRepository; import de.anomic.kelondro.text.Word; @@ -96,7 +97,8 @@ public final class plasmaWordIndex { public CrawlProfile.entry defaultTextSnippetLocalProfile, defaultTextSnippetGlobalProfile; public CrawlProfile.entry defaultMediaSnippetLocalProfile, defaultMediaSnippetGlobalProfile; private final File queuesRoot; - + private ReferenceContainerMerger merger; + public plasmaWordIndex( final String networkName, final Log log, @@ -130,12 +132,14 @@ public final class plasmaWordIndex { } } } + this.merger = (useCell) ? 
new ReferenceContainerMerger(1) : null; + if (this.merger != null) this.merger.start(); this.index = (useCell) ? new IndexCell( new File(indexPrimaryTextLocation, "RICELL"), wordOrder, ReferenceRow.urlEntryRow, - entityCacheMaxSize, 10) : + entityCacheMaxSize, 10, this.merger) : new BufferedIndexCollection( indexPrimaryTextLocation, wordOrder, @@ -411,6 +415,7 @@ public final class plasmaWordIndex { } public void close() { + if (this.merger != null) this.merger.terminate(); index.close(); metadata.close(); peers.close();
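
For orientation, the wiring this patch establishes can be condensed into a short sketch. It follows the constructor and lifecycle calls visible in plasmaWordIndex.java and IndexCell.java above; the class name, cell path, word order (Base64Order.enhancedCoder), RAM entry limit and file limit are illustrative assumptions rather than values mandated by the patch.

    import java.io.File;
    import java.io.IOException;

    import de.anomic.kelondro.order.Base64Order;
    import de.anomic.kelondro.text.IndexCell;
    import de.anomic.kelondro.text.ReferenceContainerMerger;
    import de.anomic.kelondro.text.ReferenceRow;

    public class MergerWiringSketch {
        public static void main(final String[] args) throws IOException {
            // one shared merger thread for all index cells; queue length 1 as in plasmaWordIndex
            final ReferenceContainerMerger merger = new ReferenceContainerMerger(1);
            merger.start();

            // the merger is handed to the IndexCell, which passes it on to its
            // ReferenceContainerArray; BLOB merge jobs are then queued instead of run inline
            final IndexCell cell = new IndexCell(
                    new File("DATA/INDEX/freeworld/RICELL"),  // illustrative cell path
                    Base64Order.enhancedCoder,                // assumed word order
                    ReferenceRow.urlEntryRow,
                    10000,                                    // maxRamEntries, illustrative
                    10,                                       // maxArrayFiles, as in plasmaWordIndex
                    merger);

            // ... index and search ...

            // shutdown mirrors plasmaWordIndex.close(): terminate() enqueues a poison job and
            // blocks until all queued merges are finished, then the cell can be closed
            merger.terminate();
            cell.close();
        }
    }

Keeping a single merger thread serializes the heavy file IO of merge jobs while index writes continue concurrently; with a queue length of 1, a caller blocks as soon as one merge is already pending.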