fast database initialization and fast startup of yacy:

- applied knowledge about concurrent file stream reading and index processing from the wikimedia reader
  to the EcoTable initialization process: the file reader now runs concurrently with the index generation
- also changed some initialization processes to avoid pauses during initialization
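The underlying pattern is a bounded producer/consumer pipeline: a reader thread fills a blocking queue with fixed-size records while the caller drains the queue and builds the index, so disk I/O and index generation overlap. A minimal sketch of the idea (hypothetical names and sizes; the actual implementation in kelondroChunkIterator below adds buffer recycling and a separate slicer stage):

import java.io.*;
import java.util.concurrent.*;

class RecordPipelineSketch {
    private static final byte[] POISON = new byte[0]; // end-of-stream marker

    static void buildIndex(final File file, final int recordsize) throws InterruptedException {
        // bounded queue: the reader blocks when the consumer falls behind
        final BlockingQueue<byte[]> queue = new ArrayBlockingQueue<byte[]>(64);
        new Thread() {
            public void run() {
                try {
                    final DataInputStream in = new DataInputStream(
                            new BufferedInputStream(new FileInputStream(file), 64 * 1024));
                    try {
                        while (true) {
                            final byte[] record = new byte[recordsize];
                            in.readFully(record); // throws EOFException at end of file
                            queue.put(record);
                        }
                    } catch (final EOFException e) {
                        // normal termination: file completely read
                    }
                    in.close();
                    queue.put(POISON); // tell the consumer to stop
                } catch (final Exception e) {
                    e.printStackTrace();
                }
            }
        }.start();
        // consumer: process records while the reader thread is still doing I/O
        byte[] record;
        while ((record = queue.take()) != POISON) {
            // e.g. write the primary key of the record into the index table
        }
    }
}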

git-svn-id: https://svn.berlios.de/svnroot/repos/yacy/trunk@5354 6c8d7289-2bf4-0310-a012-ef5d649a1542
parent ba5b274b8c
commit b0f2003792

@@ -0,0 +1,307 @@
// kelondroChunkIterator.java
// (C) 2008 by Michael Peter Christen; mc@yacy.net, Frankfurt a. M., Germany
// first published 14.01.2008 on http://yacy.net
//
// $LastChangedDate: 2006-04-02 22:40:07 +0200 (So, 02 Apr 2006) $
// $LastChangedRevision: 1986 $
// $LastChangedBy: orbiter $
//
// LICENSE
//
// This program is free software; you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation; either version 2 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
package de.anomic.kelondro;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Iterator;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public class kelondroChunkIterator implements Iterator<byte[]> {
private final int chunksize;
/**
* create a ChunkIterator
* a ChunkIterator reads the file in a concurrent producer/consumer pipeline
* and is therefore a fast way to get all elements of the file in sequence
* @param file: the file to iterate through
* @param recordsize: the size of the records in the file
* @param chunksize: the size of the chunks that are returned by next(). The remaining bytes up to the length of recordsize are skipped
* @throws FileNotFoundException
*/
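// Usage sketch (hypothetical file and sizes): iterate over the primary keys of a
// table file with 256-byte records whose first 12 bytes are the primary key:
//
//   final Iterator<byte[]> keys = new kelondroChunkIterator(new File("table.eco"), 256, 12);
//   while (keys.hasNext()) {
//       final byte[] key = keys.next();
//       // ... write the key into the index
//   }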
/*
private final DataInputStream stream;
private byte[] nextBytes;
public kelondroChunkIterator(final File file, final int recordsize, final int chunksize) throws FileNotFoundException {
assert (file.exists());
assert file.length() % recordsize == 0;
this.recordsize = recordsize;
this.chunksize = chunksize;
this.stream = new DataInputStream(new BufferedInputStream(new FileInputStream(file), 64 * 1024));
this.nextBytes = next0();
}
public boolean hasNext() {
return nextBytes != null;
}
public byte[] next0() {
final byte[] chunk = new byte[chunksize];
int r, s;
try {
// read the chunk
this.stream.readFully(chunk);
// skip remaining bytes
r = chunksize;
while (r < recordsize) {
s = (int) this.stream.skip(recordsize - r);
assert s > 0;
if (s <= 0) return null;
r += s;
}
return chunk;
} catch (final IOException e) {
return null;
}
}
public byte[] next() {
final byte[] n = this.nextBytes;
this.nextBytes = next0();
return n;
}
public void remove() {
throw new UnsupportedOperationException();
}
*/
private ExecutorService service;
private filechunkProducer producer;
private filechunkSlicer slicer;
private Future<Integer> producerResult;
private Future<Integer> slicerResult;
private byte[] nextRecord;
public kelondroChunkIterator(final File file, final int recordsize, final int chunksize) throws FileNotFoundException {
assert (file.exists());
assert file.length() % recordsize == 0;
this.chunksize = chunksize;
service = Executors.newFixedThreadPool(2);
// buffer size and count calculation is critical, because wrong values
// will cause blocking of the concurrent consumer/producer threads
int filebuffersize = 1024 * 16;
int chunkbuffercountmin = filebuffersize / recordsize + 1; // minimum
int filebuffercount = 1024 * 1024 / filebuffersize; // max 1 MB
int chunkbuffercount = chunkbuffercountmin * filebuffercount + 1;
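// worked example (hypothetical recordsize = 256): chunkbuffercountmin = 16384 / 256 + 1 = 65,
// filebuffercount = 1048576 / 16384 = 64, chunkbuffercount = 65 * 64 + 1 = 4161, i.e. enough
// slice buffers to absorb every record held in the file buffers without blocking the slicer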
producer = new filechunkProducer(file, filebuffersize, filebuffercount);
slicer = new filechunkSlicer(producer, recordsize, chunksize, chunkbuffercount);
producerResult = service.submit(producer);
slicerResult = service.submit(slicer);
service.shutdown();
nextRecord = slicer.consume();
}
public boolean hasNext() {
return nextRecord != null;
}
public byte[] next() {
if (nextRecord == null) return null;
byte[] n = new byte[chunksize];
System.arraycopy(nextRecord, 0, n, 0, chunksize);
slicer.recycle(nextRecord);
nextRecord = slicer.consume();
return n;
}
public void remove() {
throw new UnsupportedOperationException();
}
private static class filechunkSlicer implements Callable<Integer> {
private filechunkProducer producer;
private static byte[] poison = new byte[0];
private BlockingQueue<byte[]> empty;
private BlockingQueue<byte[]> slices;
private int slicesize, head;
public filechunkSlicer(filechunkProducer producer, final int slicesize, int head, int stacksize) throws FileNotFoundException {
assert producer != null;
this.producer = producer;
this.slices = new ArrayBlockingQueue<byte[]>(stacksize);
this.empty = new ArrayBlockingQueue<byte[]>(stacksize);
this.slicesize = slicesize;
this.head = head;
// fill the empty queue
for (int i = 0; i < stacksize; i++) empty.add(new byte[head]);
}
public void recycle(byte[] c) {
try {
empty.put(c);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public byte[] consume() {
try {
byte[] b = slices.take(); // blocks while the queue is empty
if (b == poison) return null; else return b;
} catch (InterruptedException e) {
e.printStackTrace();
return null;
}
}
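// note: slice() copies at most up to 'head' bytes into the target slice; anything
// beyond 'head' is silently dropped, which implements the skipping of the
// remaining record bytes between chunksize and recordsize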
private void slice(byte[] from, int startfrom, byte[] to, int startto, int len) {
if (startto >= head) return;
if (startto + len > head) len = head - startto;
assert to.length == head;
System.arraycopy(from, startfrom, to, startto, len);
}
public Integer call() {
filechunk c;
int p;
try {
byte[] slice = empty.take();
int slicec = 0;
consumer: while(true) {
c = producer.consume();
if (c == null) {
// finished. put poison into slices queue
slices.put(poison);
break consumer;
}
p = 0;
// copy as much as possible to the current slice
slicefiller: while (true) {
assert slicesize > slicec;
if (c.n - p >= slicesize - slicec) {
// a full slice can be produced
slice(c.b, p, slice, slicec, slicesize - slicec);
// the slice is now full
p += slicesize - slicec;
slices.put(slice);
slice = empty.take();
slicec = 0;
continue slicefiller;
} else {
// fill only a part of the slice and wait for next chunk
slice(c.b, p, slice, slicec, c.n - p);
// the chunk is now fully read
producer.recycle(c);
slicec += c.n - p;
continue consumer;
}
}
}
} catch (InterruptedException e) {
e.printStackTrace();
}
return Integer.valueOf(0);
}
}
private static class filechunk {
public byte[] b;
public int n;
public filechunk(int len) {
b = new byte[len];
n = 0;
}
}
/**
* the filechunkProducer reads an input file and stores chunks of it
* into a buffer. All elements taken from the buffer must be recycled.
* The class never allocates more memory than the given chunk size multiplied
* by the number of chunks that shall be stored in the queue for processing.
*/
private static class filechunkProducer implements Callable<Integer> {
private BlockingQueue<filechunk> empty;
private BlockingQueue<filechunk> filed;
private static filechunk poison = new filechunk(0);
private FileInputStream fis;
public filechunkProducer(File in, int bufferSize, int bufferCount) throws FileNotFoundException {
empty = new ArrayBlockingQueue<filechunk>(bufferCount);
filed = new ArrayBlockingQueue<filechunk>(bufferCount);
fis = new FileInputStream(in);
// fill the empty queue
for (int i = 0; i < bufferCount; i++) empty.add(new filechunk(bufferSize));
}
public void recycle(filechunk c) {
try {
empty.put(c);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public filechunk consume() {
try {
filechunk f = filed.take(); // blocks while the queue is empty
if (f == poison) return null; else return f;
} catch (InterruptedException e) {
e.printStackTrace();
return null;
}
}
public Integer call() {
try {
filechunk c;
while (true) {
c = empty.take(); // blocks while the queue is empty
c.n = fis.read(c.b);
if (c.n <= 0) break;
filed.put(c);
}
fis.close();
// put the poison into the consumer queue so the consumer knows to stop
filed.put(poison);
} catch (InterruptedException e) {
e.printStackTrace();
throw new RuntimeException(e.getMessage());
} catch (IOException e) {
e.printStackTrace();
throw new RuntimeException(e.getMessage());
}
return Integer.valueOf(0);
}
}
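// Usage sketch for filechunkProducer alone (hypothetical sizes): read a file in
// 16 kB chunks on a worker thread and consume/recycle the buffers on this thread:
//
//   final filechunkProducer p = new filechunkProducer(file, 16 * 1024, 64);
//   Executors.newSingleThreadExecutor().submit(p);
//   filechunk c;
//   while ((c = p.consume()) != null) {
//       // ... use c.b[0 .. c.n - 1], then hand the buffer back:
//       p.recycle(c);
//   }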
}

@@ -24,15 +24,11 @@
package de.anomic.kelondro;
import java.io.BufferedInputStream;
import java.io.DataInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.Iterator;
/**
* The EcoFS is a flat file with records of fixed length. The file does not contain
@@ -556,66 +552,7 @@ public class kelondroEcoFS {
assert this.buffercount == 0;
this.raf.setLength((s - 1) * this.recordsize);
}
public static class ChunkIterator implements Iterator<byte[]> {
private final int recordsize, chunksize;
private final DataInputStream stream;
private byte[] nextBytes;
/**
* create a ChunkIterator
* a ChunkIterator uses a BufferedInputStream to iterate through the file
* and is therefore a fast option to get all elements in the file as a sequence
* @param file: the eco-file
* @param recordsize: the size of the elements in the file
* @param chunksize: the size of the chunks that are returned by next(). The remaining bytes up to the length of recordsize are skipped
* @throws FileNotFoundException
*/
public ChunkIterator(final File file, final int recordsize, final int chunksize) throws FileNotFoundException {
assert (file.exists());
assert file.length() % recordsize == 0;
this.recordsize = recordsize;
this.chunksize = chunksize;
this.stream = new DataInputStream(new BufferedInputStream(new FileInputStream(file), 64 * 1024));
this.nextBytes = next0();
}
public boolean hasNext() {
return nextBytes != null;
}
public byte[] next0() {
final byte[] chunk = new byte[chunksize];
int r, s;
try {
// read the chunk
this.stream.readFully(chunk);
// skip remaining bytes
r = chunksize;
while (r < recordsize) {
s = (int) this.stream.skip(recordsize - r);
assert s != 0;
r += s;
}
return chunk;
} catch (final IOException e) {
return null;
}
}
public byte[] next() {
final byte[] n = this.nextBytes;
this.nextBytes = next0();
return n;
}
public void remove() {
throw new UnsupportedOperationException();
}
}
/**
* main - writes some data and checks the table's size (with time measuring)
* @param args

@@ -126,26 +126,19 @@ public class kelondroEcoTable implements kelondroIndex {
int i = 0;
byte[] key;
if (table == null) {
final Iterator<byte[]> ki = keyIterator(tablefile, rowdef);
final Iterator<byte[]> ki = new kelondroChunkIterator(tablefile, rowdef.objectsize, rowdef.primaryKeyLength);
while (ki.hasNext()) {
key = ki.next();
// write the key into the index table
assert key != null;
if (key == null) {i++; continue;}
if (!index.addi(key, i++)) fail++;
assert index.size() + fail == i : "index.size() = " + index.size() + ", i = " + i + ", fail = " + fail + ", key = '" + new String(key) + "'";
/*
if ((i % 10000) == 0) {
System.out.print('.');
System.out.flush();
}
*/
}
} else {
byte[] record;
key = new byte[rowdef.primaryKeyLength];
final Iterator<byte[]> ri = new kelondroEcoFS.ChunkIterator(tablefile, rowdef.objectsize, rowdef.objectsize);
final Iterator<byte[]> ri = new kelondroChunkIterator(tablefile, rowdef.objectsize, rowdef.objectsize);
while (ri.hasNext()) {
record = ri.next();
assert record != null;
@@ -219,18 +212,6 @@ public class kelondroEcoTable implements kelondroIndex {
return serverMemory.available() < minmemremaining;
}
/**
* a KeyIterator
* @param file: the eco-file
* @param rowdef: the row definition
* @throws FileNotFoundException
* @return an iterator for all keys in the file
*/
public Iterator<byte[]> keyIterator(final File file, final kelondroRow rowdef) throws FileNotFoundException {
assert rowdef.primaryKeyIndex == 0;
return new kelondroEcoFS.ChunkIterator(file, rowdef.objectsize, rowdef.primaryKeyLength);
}
public static long tableSize(final File tablefile, final int recordsize) {
// returns number of records in table
return kelondroEcoFS.tableSize(tablefile, recordsize);

@@ -313,7 +313,7 @@ public final class plasmaSwitchboard extends serverAbstractSwitch<IndexingStack.
log.logConfig("Starting YaCy Protocol Core");
this.yc = new yacyCore(this);
serverInstantBusyThread.oneTimeJob(this, "loadSeedLists", yacyCore.log, 0);
final long startedSeedListAquisition = System.currentTimeMillis();
//final long startedSeedListAquisition = System.currentTimeMillis();
// set up local robots.txt
this.robotstxtConfig = httpdRobotsTxtConfig.init(this);
@@ -558,7 +558,7 @@ public final class plasmaSwitchboard extends serverAbstractSwitch<IndexingStack.
// init robinson cluster
// before we do that, we wait some time until the seed list is loaded.
while (((System.currentTimeMillis() - startedSeedListAquisition) < 8000) && (this.webIndex.seedDB.sizeConnected() == 0)) try {Thread.sleep(1000);} catch (final InterruptedException e) {}
//while (((System.currentTimeMillis() - startedSeedListAquisition) < 8000) && (this.webIndex.seedDB.sizeConnected() == 0)) try {Thread.sleep(1000);} catch (final InterruptedException e) {}
try {Thread.sleep(1000);} catch (final InterruptedException e) {}
this.clusterhashes = this.webIndex.seedDB.clusterHashes(getConfig("cluster.peers.yacydomain", ""));

@@ -28,6 +28,7 @@ package de.anomic.server;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UnsupportedEncodingException;
import java.security.MessageDigest;
@@ -118,37 +119,6 @@ public final class serverCodings {
}
return null;
}
/*
public static byte[] encodeMD5Raw(final File file) {
try {
final MessageDigest digest = MessageDigest.getInstance("MD5");
digest.reset();
// we read directly from a FileInputStream
final FileInputStream in = new FileInputStream(file);
int a = in.available();
if (a <= 0) a = 4096;
long free = Runtime.getRuntime().freeMemory();
if (a > free / 4) a = (int) (free / 4);
final byte[] buf = new byte[a];
int n;
while ((n = in.read(buf)) > 0) digest.update(buf, 0, n);
in.close();
// now compute the hex-representation of the md5 digest
return digest.digest();
} catch (final java.security.NoSuchAlgorithmException e) {
System.out.println("Internal Error at md5:" + e.getMessage());
} catch (final java.io.FileNotFoundException e) {
System.out.println("file not found:" + file.toString());
e.printStackTrace();
} catch (final java.io.IOException e) {
System.out.println("file error with " + file.toString() + ": " + e.getMessage());
}
return null;
}
*/
public final static ExecutorService service = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() + 1);
public static byte[] encodeMD5Raw(final File file) {
FileInputStream in;
@@ -163,7 +133,9 @@ public final class serverCodings {
// create a concurrent thread that consumes data as it is read
// and computes the md5 while doing IO
md5DataConsumer md5consumer = new md5DataConsumer(1024 * 64, 8);
ExecutorService service = Executors.newSingleThreadExecutor();
Future<MessageDigest> md5result = service.submit(md5consumer);
service.shutdown();
filechunk c;
try {
@@ -174,14 +146,13 @@ public final class serverCodings {
md5consumer.consume(c);
}
in.close();
} catch (final java.io.IOException e) {
} catch (final IOException e) {
System.out.println("file error with " + file.toString() + ": " + e.getMessage());
md5consumer.consume(md5DataConsumer.poison);
return null;
} finally {
// put the poison into the queue to tell the consumer to stop
md5consumer.consume(md5DataConsumer.poison);
}
// return the md5 digest from future task
try {
@@ -253,43 +224,41 @@ public final class serverCodings {
} catch (InterruptedException e) {
e.printStackTrace();
}
filed.clear();
empty.clear();
return digest;
}
}
private static byte[] encodeMD5Raw(final byte[] b) {
try {
final MessageDigest digest = MessageDigest.getInstance("MD5");
digest.reset();
final InputStream in = new ByteArrayInputStream(b);
final byte[] buf = new byte[2048];
int n;
while ((n = in.read(buf)) > 0) digest.update(buf, 0, n);
in.close();
// now compute the hex-representation of the md5 digest
return digest.digest();
} catch (final java.security.NoSuchAlgorithmException e) {
System.out.println("Internal Error at md5:" + e.getMessage());
} catch (final java.io.IOException e) {
System.out.println("byte[] error: " + e.getMessage());
}
return null;
}
public static Properties s2p(final String s) {
final Properties p = new Properties();
int pos;
final StringTokenizer st = new StringTokenizer(s, ",");
String token;
while (st.hasMoreTokens()) {
token = st.nextToken().trim();
pos = token.indexOf("=");
if (pos > 0) p.setProperty(token.substring(0, pos).trim(), token.substring(pos + 1).trim());
}
return p;
}
public static HashMap<String, String> string2map(String string, final String separator) {
@@ -376,7 +345,7 @@ public final class serverCodings {
}
// usage example:
// java -classpath classes de.anomic.server.serverCodings -md5 DATA/HTDOCS/mediawiki/dewiki-latest-pages-articles.xml
// java -classpath classes de.anomic.server.serverCodings -md5 DATA/HTCACHE/mediawiki/wikipedia.de.xml
// java -classpath classes de.anomic.server.serverCodings -md5 readme.txt
// compare with:
// md5 readme.txt
@@ -385,7 +354,6 @@ public final class serverCodings {
File f = new File(s[1]);
System.out.println("MD5 (" + f.getName() + ") = " + encodeMD5Hex(f));
}
service.shutdown();
}
}

@@ -90,11 +90,12 @@ public class mediawikiIndex {
// init reader, producer and consumer
PositionAwareReader in = new PositionAwareReader(dumpFile);
ExecutorService service = Executors.newFixedThreadPool(2 /*Runtime.getRuntime().availableProcessors() + 1*/);
indexProducer producer = new indexProducer(100, idxFromWikimediaXML(dumpFile));
wikiConsumer consumer = new wikiConsumer(100, producer);
ExecutorService service = Executors.newFixedThreadPool(2);
Future<Integer> producerResult = service.submit(producer);
Future<Integer> consumerResult = service.submit(consumer);
service.shutdown();
// read the wiki dump
long start, stop;
@@ -121,7 +122,6 @@ public class mediawikiIndex {
e.printStackTrace();
return;
}
service.shutdown();
in.close();
}

@@ -139,7 +139,7 @@ public final class yacy {
* Semaphore needed by {@link yacy#setUpdaterCallback(serverUpdaterCallback)} to block
* until the {@link plasmaSwitchboard} object was created.
*/
private static serverSemaphore sbSync = new serverSemaphore(0);
//private static serverSemaphore sbSync = new serverSemaphore(0);
/**
* Semaphore needed by {@link yacy#waitForFinishedStartup()} to block
@@ -222,7 +222,7 @@ public final class yacy {
serverLog.logSevere("STARTUP", "WARNING: the file " + oldconffile + " can not be renamed to "+ newconfFile +"!");
}
sb = new plasmaSwitchboard(homePath, "defaults/yacy.init".replace("/", File.separator), newconf, pro);
sbSync.V(); // signal that the sb reference was set
//sbSync.V(); // signal that the sb reference was set
// save information about available memory at startup time
sb.setConfig("memoryFreeAfterStartup", startupMemFree);
