added new concurrent merger class for IndexCell RWI data

git-svn-id: https://svn.berlios.de/svnroot/repos/yacy/trunk@5735 6c8d7289-2bf4-0310-a012-ef5d649a1542
pull/1/head
orbiter 16 years ago
parent 8c494afcfe
commit 37f892b988

@ -150,7 +150,7 @@ public class Latency {
waiting = Math.min(60000, waiting); waiting = Math.min(60000, waiting);
// return time that is remaining // return time that is remaining
System.out.println("Latency: " + (waiting - timeSinceLastAccess)); //System.out.println("Latency: " + (waiting - timeSinceLastAccess));
return Math.max(0, waiting - timeSinceLastAccess); return Math.max(0, waiting - timeSinceLastAccess);
} }

@ -43,7 +43,7 @@ public class NoticedURL {
public static final int STACK_TYPE_MOVIE = 12; // put on movie stack public static final int STACK_TYPE_MOVIE = 12; // put on movie stack
public static final int STACK_TYPE_MUSIC = 13; // put on music stack public static final int STACK_TYPE_MUSIC = 13; // put on music stack
public static final long minimumLocalDeltaInit = 0; // the minimum time difference between access of the same local domain public static final long minimumLocalDeltaInit = 10; // the minimum time difference between access of the same local domain
public static final long minimumGlobalDeltaInit = 500; // the minimum time difference between access of the same global domain public static final long minimumGlobalDeltaInit = 500; // the minimum time difference between access of the same global domain
private Balancer coreStack; // links found by crawling to depth-1 private Balancer coreStack; // links found by crawling to depth-1

@ -160,9 +160,46 @@ public class BLOBArray implements BLOB {
} }
} }
public synchronized File unmountOldestBLOB() { public synchronized File unmountSmallestBLOB() {
if (this.blobs.size() == 0) return null; if (this.blobs.size() == 0) return null;
blobItem b = this.blobs.remove(0); int bestIndex = -1;
long smallest = Long.MAX_VALUE;
for (int i = 0; i < this.blobs.size(); i++) {
if (this.blobs.get(i).location.length() < smallest) {
smallest = this.blobs.get(i).location.length();
bestIndex = i;
}
}
blobItem b = this.blobs.remove(bestIndex);
b.blob.close(false);
return b.location;
}
public synchronized File unmountOldestBLOB(boolean smallestFromFirst2) {
if (this.blobs.size() == 0) return null;
int idx = 0;
if (smallestFromFirst2 && this.blobs.get(1).location.length() < this.blobs.get(0).location.length()) idx = 1;
blobItem b = this.blobs.remove(idx);
b.blob.close(false);
return b.location;
}
public synchronized File unmountSimilarSizeBLOB(long otherSize) {
if (this.blobs.size() == 0 || otherSize == 0) return null;
blobItem b;
double delta, bestDelta = Double.MAX_VALUE;
int bestIndex = -1;
for (int i = 0; i < this.blobs.size(); i++) {
b = this.blobs.get(i);
if (b.location.length() == 0) continue;
delta = ((double) b.location.length()) / ((double) otherSize);
if (delta < 1.0) delta = 1.0 / delta;
if (delta < bestDelta) {
bestDelta = delta;
bestIndex = i;
}
}
b = this.blobs.remove(bestIndex);
b.blob.close(false); b.blob.close(false);
return b.location; return b.location;
} }

@ -61,9 +61,10 @@ public final class IndexCell extends AbstractBufferedIndex implements BufferedIn
final ByteOrder wordOrder, final ByteOrder wordOrder,
final Row payloadrow, final Row payloadrow,
final int maxRamEntries, final int maxRamEntries,
final int maxArrayFiles final int maxArrayFiles,
ReferenceContainerMerger merger
) throws IOException { ) throws IOException {
this.array = new ReferenceContainerArray(cellPath, wordOrder, payloadrow); this.array = new ReferenceContainerArray(cellPath, wordOrder, payloadrow, merger);
this.ram = new ReferenceContainerCache(payloadrow, wordOrder); this.ram = new ReferenceContainerCache(payloadrow, wordOrder);
this.ram.initWriteMode(); this.ram.initWriteMode();
this.maxRamEntries = maxRamEntries; this.maxRamEntries = maxRamEntries;
@ -269,7 +270,7 @@ public final class IndexCell extends AbstractBufferedIndex implements BufferedIn
this.array.mountBLOBContainer(dumpFile); this.array.mountBLOBContainer(dumpFile);
int c = 0; int c = 0;
while (this.array.entries() > this.maxArrayFiles && c++ < 3) { while (this.array.entries() > this.maxArrayFiles && c++ < 3) {
if (!this.array.mergeOldest()) break; if (!this.array.merge(true)) break;
} }
} }

@ -1,9 +1,7 @@
// indexContainerBLOBHeap.java // ReferenceContainerArray.java
// (C) 2009 by Michael Peter Christen; mc@yacy.net, Frankfurt a. M., Germany // (C) 2009 by Michael Peter Christen; mc@yacy.net, Frankfurt a. M., Germany
// first published 04.01.2009 on http://yacy.net // first published 04.01.2009 on http://yacy.net
// //
// This is a part of YaCy, a peer-to-peer based web search engine
//
// $LastChangedDate: 2008-03-14 01:16:04 +0100 (Fr, 14 Mrz 2008) $ // $LastChangedDate: 2008-03-14 01:16:04 +0100 (Fr, 14 Mrz 2008) $
// $LastChangedRevision: 4558 $ // $LastChangedRevision: 4558 $
// $LastChangedBy: orbiter $ // $LastChangedBy: orbiter $
@ -34,17 +32,16 @@ import java.util.List;
import de.anomic.kelondro.blob.BLOB; import de.anomic.kelondro.blob.BLOB;
import de.anomic.kelondro.blob.BLOBArray; import de.anomic.kelondro.blob.BLOBArray;
import de.anomic.kelondro.blob.HeapWriter;
import de.anomic.kelondro.index.Row; import de.anomic.kelondro.index.Row;
import de.anomic.kelondro.index.RowSet; import de.anomic.kelondro.index.RowSet;
import de.anomic.kelondro.order.ByteOrder; import de.anomic.kelondro.order.ByteOrder;
import de.anomic.kelondro.order.CloneableIterator; import de.anomic.kelondro.order.CloneableIterator;
import de.anomic.kelondro.text.ReferenceContainerCache.blobFileEntries;
public final class ReferenceContainerArray { public final class ReferenceContainerArray {
private final Row payloadrow; private final Row payloadrow;
private final BLOBArray array; private final BLOBArray array;
private final ReferenceContainerMerger merger;
/** /**
* open a index container based on a BLOB dump. The content of the BLOB will not be read * open a index container based on a BLOB dump. The content of the BLOB will not be read
@ -59,7 +56,8 @@ public final class ReferenceContainerArray {
public ReferenceContainerArray( public ReferenceContainerArray(
final File heapLocation, final File heapLocation,
final ByteOrder wordOrder, final ByteOrder wordOrder,
final Row payloadrow) throws IOException { final Row payloadrow,
ReferenceContainerMerger merger) throws IOException {
this.payloadrow = payloadrow; this.payloadrow = payloadrow;
this.array = new BLOBArray( this.array = new BLOBArray(
heapLocation, heapLocation,
@ -67,6 +65,7 @@ public final class ReferenceContainerArray {
payloadrow.primaryKeyLength, payloadrow.primaryKeyLength,
wordOrder, wordOrder,
0); 0);
this.merger = merger;
} }
public synchronized void close() { public synchronized void close() {
@ -244,120 +243,13 @@ public final class ReferenceContainerArray {
return this.array.entries(); return this.array.entries();
} }
public synchronized boolean mergeOldest() throws IOException { public synchronized boolean merge(boolean similar) throws IOException {
if (this.array.entries() < 2) return false; if (this.array.entries() < 2) return false;
File f1 = this.array.unmountOldestBLOB(); File f1 = this.array.unmountOldestBLOB(similar);
File f2 = this.array.unmountOldestBLOB(); File f2 = (similar) ? this.array.unmountSimilarSizeBLOB(f1.length()) : this.array.unmountOldestBLOB(false);
System.out.println("*** DEBUG mergeOldest: vvvvvvvvv array has " + this.array.entries() + " entries vvvvvvvvv"); merger.merge(f1, f2, this.array, this.payloadrow, newContainerBLOBFile());
System.out.println("*** DEBUG mergeOldest: unmounted " + f1.getName());
System.out.println("*** DEBUG mergeOldest: unmounted " + f2.getName());
File newFile = merge(f1, f2);
if (newFile == null) return true;
this.array.mountBLOB(newFile);
System.out.println("*** DEBUG mergeOldest: mounted " + newFile.getName());
System.out.println("*** DEBUG mergeOldest: ^^^^^^^^^^^ array has " + this.array.entries() + " entries ^^^^^^^^^^^");
return true; return true;
} }
private synchronized File merge(File f1, File f2) throws IOException {
// iterate both files and write a new one
CloneableIterator<ReferenceContainer> i1 = new blobFileEntries(f1, this.payloadrow);
CloneableIterator<ReferenceContainer> i2 = new blobFileEntries(f2, this.payloadrow);
if (!i1.hasNext()) {
if (i2.hasNext()) {
if (!f1.delete()) f1.deleteOnExit();
return f2;
} else {
if (!f1.delete()) f1.deleteOnExit();
if (!f2.delete()) f2.deleteOnExit();
return null;
}
} else if (!i2.hasNext()) {
if (!f2.delete()) f2.deleteOnExit();
return f1;
}
assert i1.hasNext();
assert i2.hasNext();
File newFile = newContainerBLOBFile();
HeapWriter writer = new HeapWriter(newFile, this.array.keylength(), this.array.ordering());
merge(i1, i2, writer);
writer.close(true);
// we don't need the old files any more
if (!f1.delete()) f1.deleteOnExit();
if (!f2.delete()) f2.deleteOnExit();
return newFile;
}
private synchronized void merge(CloneableIterator<ReferenceContainer> i1, CloneableIterator<ReferenceContainer> i2, HeapWriter writer) throws IOException {
assert i1.hasNext();
assert i2.hasNext();
ReferenceContainer c1, c2, c1o, c2o;
c1 = i1.next();
c2 = i2.next();
int e;
while (true) {
assert c1 != null;
assert c2 != null;
e = this.array.ordering().compare(c1.getWordHash().getBytes(), c2.getWordHash().getBytes());
if (e < 0) {
writer.add(c1.getWordHash().getBytes(), c1.exportCollection());
if (i1.hasNext()) {
c1o = c1;
c1 = i1.next();
assert this.array.ordering().compare(c1.getWordHash().getBytes(), c1o.getWordHash().getBytes()) > 0;
continue;
}
break;
}
if (e > 0) {
writer.add(c2.getWordHash().getBytes(), c2.exportCollection());
if (i2.hasNext()) {
c2o = c2;
c2 = i2.next();
assert this.array.ordering().compare(c2.getWordHash().getBytes(), c2o.getWordHash().getBytes()) > 0;
continue;
}
break;
}
assert e == 0;
// merge the entries
writer.add(c1.getWordHash().getBytes(), (c1.merge(c2)).exportCollection());
if (i1.hasNext() && i2.hasNext()) {
c1 = i1.next();
c2 = i2.next();
continue;
}
if (i1.hasNext()) c1 = i1.next();
if (i2.hasNext()) c2 = i2.next();
break;
}
// catch up remaining entries
assert !(i1.hasNext() && i2.hasNext());
while (i1.hasNext()) {
//System.out.println("FLUSH REMAINING 1: " + c1.getWordHash());
writer.add(c1.getWordHash().getBytes(), c1.exportCollection());
if (i1.hasNext()) {
c1o = c1;
c1 = i1.next();
assert this.array.ordering().compare(c1.getWordHash().getBytes(), c1o.getWordHash().getBytes()) > 0;
continue;
}
break;
}
while (i2.hasNext()) {
//System.out.println("FLUSH REMAINING 2: " + c2.getWordHash());
writer.add(c2.getWordHash().getBytes(), c2.exportCollection());
if (i2.hasNext()) {
c2o = c2;
c2 = i2.next();
assert this.array.ordering().compare(c2.getWordHash().getBytes(), c2o.getWordHash().getBytes()) > 0;
continue;
}
break;
}
// finished with writing
}
} }

@ -0,0 +1,263 @@
// ReferenceContainerMerger.java
// (C) 2009 by Michael Peter Christen; mc@yacy.net, Frankfurt a. M., Germany
// first published 20.03.2009 on http://yacy.net
//
// $LastChangedDate: 2008-03-14 01:16:04 +0100 (Fr, 14 Mrz 2008) $
// $LastChangedRevision: 4558 $
// $LastChangedBy: orbiter $
//
// LICENSE
//
// This program is free software; you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation; either version 2 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
package de.anomic.kelondro.text;
import java.io.File;
import java.io.IOException;
import java.util.concurrent.ArrayBlockingQueue;
import de.anomic.kelondro.blob.BLOBArray;
import de.anomic.kelondro.blob.HeapWriter;
import de.anomic.kelondro.index.Row;
import de.anomic.kelondro.order.ByteOrder;
import de.anomic.kelondro.order.CloneableIterator;
import de.anomic.kelondro.text.ReferenceContainerCache.blobFileEntries;
/**
 * Merger class for files from ReferenceContainerArray.
 * This is a concurrent merger that can merge single files that are queued for merging.
 * When several ReferenceContainerArray classes host their ReferenceContainer file arrays,
 * they may share a single ReferenceContainerMerger object which does the merging for all
 * of them. This is the best way to do the merging, because it performs heavy IO access and
 * such access should not be performed concurrently, but queued. This class is the
 * management class for the queueing of merge jobs.
 *
 * To use this class, first instantiate an object and then start the concurrent execution
 * of merging with a call to the start() method. To shut down all merging, call terminate()
 * only once.
 */
public class ReferenceContainerMerger extends Thread {

    // sentinel job: submitting this to the queue tells the run() loop to stop
    private final Job poison;
    // pending merge jobs; processed strictly sequentially by run()
    private final ArrayBlockingQueue<Job> queue;
    // rendezvous used by run() to signal terminate() that the loop has ended
    private final ArrayBlockingQueue<Job> termi;

    /**
     * Create a merger whose job queue holds at most queueLength pending merges.
     * Submitters block in merge() while the queue is full, which throttles
     * producers to the speed of the (IO-heavy) merge worker.
     *
     * @param queueLength maximum number of queued merge jobs
     */
    public ReferenceContainerMerger(int queueLength) {
        this.poison = new Job();
        this.queue = new ArrayBlockingQueue<Job>(queueLength);
        this.termi = new ArrayBlockingQueue<Job>(1);
    }

    /**
     * Shut down the merge worker: enqueue the poison job, then wait until the
     * run() loop confirms termination. All jobs queued before the poison are
     * still executed. Call this only once.
     */
    public synchronized void terminate() {
        if (queue == null || !this.isAlive()) return;
        try {
            queue.put(poison);
        } catch (InterruptedException e) {
            e.printStackTrace();
            Thread.currentThread().interrupt(); // preserve interrupt status
        }
        // await termination of the worker loop
        try {
            termi.take();
        } catch (InterruptedException e) {
            e.printStackTrace();
            Thread.currentThread().interrupt(); // preserve interrupt status
        }
    }

    /**
     * Queue a merge of the two (already unmounted) files f1 and f2 into newFile;
     * the result is mounted on the given array once the merge has completed.
     * If the merger thread is not running, or the submitting thread is interrupted
     * while waiting for queue space, the merge is performed synchronously in the
     * caller's thread instead, so the unmounted files are never lost.
     *
     * @param f1 first BLOB file to merge
     * @param f2 second BLOB file to merge
     * @param array the BLOBArray that shall mount the merge result
     * @param payloadrow row definition used to decode the reference containers
     * @param newFile target file for the merged BLOB
     */
    public synchronized void merge(File f1, File f2, BLOBArray array, Row payloadrow, File newFile) {
        if (queue == null || !this.isAlive()) {
            // no worker available: merge synchronously
            try {
                mergeMount(f1, f2, array, payloadrow, newFile);
            } catch (IOException e) {
                e.printStackTrace();
            }
        } else {
            Job job = new Job(f1, f2, array, payloadrow, newFile);
            try {
                queue.put(job);
            } catch (InterruptedException e) {
                e.printStackTrace();
                Thread.currentThread().interrupt(); // preserve interrupt status
                // fall back to a synchronous merge so f1/f2 are not left unmounted
                try {
                    mergeMount(f1, f2, array, payloadrow, newFile);
                } catch (IOException ee) {
                    ee.printStackTrace();
                }
            }
        }
    }

    /**
     * Worker loop: take jobs from the queue and execute them one at a time
     * until the poison job arrives, then signal terminate() via termi.
     */
    public void run() {
        Job job;
        try {
            while ((job = queue.take()) != poison) {
                job.merge();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
            Thread.currentThread().interrupt(); // preserve interrupt status
        } finally {
            // always signal terminate(), even if the loop ended by interrupt
            try {
                termi.put(poison);
            } catch (InterruptedException e) {
                e.printStackTrace();
                Thread.currentThread().interrupt(); // preserve interrupt status
            }
        }
    }

    /**
     * A queued merge job: the two source files, the target array, the row
     * definition and the target file. The no-argument constructor creates
     * the poison sentinel, which carries no payload.
     */
    public class Job {
        File f1, f2, newFile;
        BLOBArray array;
        Row payloadrow;
        // poison-sentinel constructor; never executed as a merge
        public Job() {
            this.f1 = null;
            this.f2 = null;
            this.newFile = null;
            this.array = null;
            this.payloadrow = null;
        }
        public Job(File f1, File f2, BLOBArray array, Row payloadrow, File newFile) {
            this.f1 = f1;
            this.f2 = f2;
            this.newFile = newFile;
            this.array = array;
            this.payloadrow = payloadrow;
        }
        /**
         * Execute this job synchronously.
         * @return the merged-and-mounted file, or null if the merge failed
         */
        public File merge() {
            try {
                return mergeMount(f1, f2, array, payloadrow, newFile);
            } catch (IOException e) {
                e.printStackTrace();
            }
            return null;
        }
    }

    /**
     * Merge the two files and mount the result on the array.
     *
     * @return the file that was mounted (newFile, or one of the inputs if the
     *         other was empty and a rename failed), or null if both inputs
     *         were empty and nothing was mounted
     */
    public static File mergeMount(File f1, File f2, BLOBArray array, Row payloadrow, File newFile) throws IOException {
        System.out.println("*** DEBUG mergeMount: vvvvvvvvv array has " + array.entries() + " entries vvvvvvvvv");
        System.out.println("*** DEBUG mergeMount: unmounted " + f1.getName());
        System.out.println("*** DEBUG mergeMount: unmounted " + f2.getName());
        File resultFile = mergeWorker(f1, f2, array, payloadrow, newFile);
        if (resultFile == null) return null;
        array.mountBLOB(resultFile);
        // log the file that was actually mounted; this may differ from newFile
        // when mergeWorker fell back to an input file after a failed rename
        System.out.println("*** DEBUG mergeMount: mounted " + resultFile.getName());
        System.out.println("*** DEBUG mergeMount: ^^^^^^^^^^^ array has " + array.entries() + " entries ^^^^^^^^^^^");
        return resultFile;
    }

    /**
     * Iterate both files and write a merged BLOB to newFile. Empty inputs are
     * short-circuited: the non-empty file is renamed to newFile (or returned
     * as-is if the rename fails, e.g. across file systems) and the empty one
     * is deleted. The input files are deleted after a successful merge.
     *
     * @return the resulting file, or null if both inputs were empty
     */
    private static File mergeWorker(File f1, File f2, BLOBArray array, Row payloadrow, File newFile) throws IOException {
        // iterate both files and write a new one
        CloneableIterator<ReferenceContainer> i1 = new blobFileEntries(f1, payloadrow);
        CloneableIterator<ReferenceContainer> i2 = new blobFileEntries(f2, payloadrow);
        if (!i1.hasNext()) {
            if (i2.hasNext()) {
                // f1 is empty: the merge result is just f2
                if (!f1.delete()) f1.deleteOnExit();
                if (f2.renameTo(newFile)) return newFile;
                return f2;
            } else {
                // both files are empty: nothing to mount
                if (!f1.delete()) f1.deleteOnExit();
                if (!f2.delete()) f2.deleteOnExit();
                return null;
            }
        } else if (!i2.hasNext()) {
            // f2 is empty: the merge result is just f1
            if (!f2.delete()) f2.deleteOnExit();
            if (f1.renameTo(newFile)) return newFile;
            return f1;
        }
        assert i1.hasNext();
        assert i2.hasNext();
        HeapWriter writer = new HeapWriter(newFile, array.keylength(), array.ordering());
        merge(i1, i2, array.ordering(), writer);
        writer.close(true);
        // we don't need the old files any more
        if (!f1.delete()) f1.deleteOnExit();
        if (!f2.delete()) f2.deleteOnExit();
        return newFile;
    }

    /**
     * Classic two-way merge of two iterators that are sorted by word hash in
     * the given ordering. Containers with equal hashes are merged into a
     * single entry; all others are copied through. The asserts check that
     * both inputs are strictly ascending.
     */
    private static void merge(CloneableIterator<ReferenceContainer> i1, CloneableIterator<ReferenceContainer> i2, ByteOrder ordering, HeapWriter writer) throws IOException {
        assert i1.hasNext();
        assert i2.hasNext();
        ReferenceContainer c1, c2, c1o, c2o;
        c1 = i1.next();
        c2 = i2.next();
        int e;
        while (true) {
            assert c1 != null;
            assert c2 != null;
            e = ordering.compare(c1.getWordHash().getBytes(), c2.getWordHash().getBytes());
            if (e < 0) {
                // c1 is smaller: write it and advance i1
                writer.add(c1.getWordHash().getBytes(), c1.exportCollection());
                if (i1.hasNext()) {
                    c1o = c1;
                    c1 = i1.next();
                    assert ordering.compare(c1.getWordHash().getBytes(), c1o.getWordHash().getBytes()) > 0;
                    continue;
                }
                break;
            }
            if (e > 0) {
                // c2 is smaller: write it and advance i2
                writer.add(c2.getWordHash().getBytes(), c2.exportCollection());
                if (i2.hasNext()) {
                    c2o = c2;
                    c2 = i2.next();
                    assert ordering.compare(c2.getWordHash().getBytes(), c2o.getWordHash().getBytes()) > 0;
                    continue;
                }
                break;
            }
            assert e == 0;
            // equal hashes: merge the entries into one container
            writer.add(c1.getWordHash().getBytes(), (c1.merge(c2)).exportCollection());
            if (i1.hasNext() && i2.hasNext()) {
                c1 = i1.next();
                c2 = i2.next();
                continue;
            }
            if (i1.hasNext()) c1 = i1.next();
            if (i2.hasNext()) c2 = i2.next();
            break;
        }
        // catch up remaining entries; at most one iterator still has elements
        assert !(i1.hasNext() && i2.hasNext());
        while (i1.hasNext()) {
            //System.out.println("FLUSH REMAINING 1: " + c1.getWordHash());
            writer.add(c1.getWordHash().getBytes(), c1.exportCollection());
            if (i1.hasNext()) {
                c1o = c1;
                c1 = i1.next();
                assert ordering.compare(c1.getWordHash().getBytes(), c1o.getWordHash().getBytes()) > 0;
                continue;
            }
            break;
        }
        while (i2.hasNext()) {
            //System.out.println("FLUSH REMAINING 2: " + c2.getWordHash());
            writer.add(c2.getWordHash().getBytes(), c2.exportCollection());
            if (i2.hasNext()) {
                c2o = c2;
                c2 = i2.next();
                assert ordering.compare(c2.getWordHash().getBytes(), c2o.getWordHash().getBytes()) > 0;
                continue;
            }
            break;
        }
        // finished with writing
    }
}

@ -47,6 +47,7 @@ import de.anomic.kelondro.text.BufferedIndexCollection;
import de.anomic.kelondro.text.IndexCell; import de.anomic.kelondro.text.IndexCell;
import de.anomic.kelondro.text.MetadataRowContainer; import de.anomic.kelondro.text.MetadataRowContainer;
import de.anomic.kelondro.text.ReferenceContainer; import de.anomic.kelondro.text.ReferenceContainer;
import de.anomic.kelondro.text.ReferenceContainerMerger;
import de.anomic.kelondro.text.ReferenceRow; import de.anomic.kelondro.text.ReferenceRow;
import de.anomic.kelondro.text.MetadataRepository; import de.anomic.kelondro.text.MetadataRepository;
import de.anomic.kelondro.text.Word; import de.anomic.kelondro.text.Word;
@ -96,6 +97,7 @@ public final class plasmaWordIndex {
public CrawlProfile.entry defaultTextSnippetLocalProfile, defaultTextSnippetGlobalProfile; public CrawlProfile.entry defaultTextSnippetLocalProfile, defaultTextSnippetGlobalProfile;
public CrawlProfile.entry defaultMediaSnippetLocalProfile, defaultMediaSnippetGlobalProfile; public CrawlProfile.entry defaultMediaSnippetLocalProfile, defaultMediaSnippetGlobalProfile;
private final File queuesRoot; private final File queuesRoot;
private ReferenceContainerMerger merger;
public plasmaWordIndex( public plasmaWordIndex(
final String networkName, final String networkName,
@ -130,12 +132,14 @@ public final class plasmaWordIndex {
} }
} }
} }
this.merger = (useCell) ? new ReferenceContainerMerger(1) : null;
if (this.merger != null) this.merger.start();
this.index = (useCell) ? this.index = (useCell) ?
new IndexCell( new IndexCell(
new File(indexPrimaryTextLocation, "RICELL"), new File(indexPrimaryTextLocation, "RICELL"),
wordOrder, wordOrder,
ReferenceRow.urlEntryRow, ReferenceRow.urlEntryRow,
entityCacheMaxSize, 10) : entityCacheMaxSize, 10, this.merger) :
new BufferedIndexCollection( new BufferedIndexCollection(
indexPrimaryTextLocation, indexPrimaryTextLocation,
wordOrder, wordOrder,
@ -411,6 +415,7 @@ public final class plasmaWordIndex {
} }
public void close() { public void close() {
if (this.merger != null) this.merger.terminate();
index.close(); index.close();
metadata.close(); metadata.close();
peers.close(); peers.close();

Loading…
Cancel
Save