From b0f2003792d73444213b88e8835625bd6e90f577 Mon Sep 17 00:00:00 2001
From: orbiter
Date: Fri, 21 Nov 2008 23:21:33 +0000
Subject: [PATCH] fast database initialization and fast start-up of yacy:

- applied the knowledge about concurrent file stream reading and index
  processing from the wikimedia reader to the EcoTable initialization
  process: the file reader now runs concurrently with the index generation
- also changed some initialization processes to avoid pauses during
  initialization

git-svn-id: https://svn.berlios.de/svnroot/repos/yacy/trunk@5354 6c8d7289-2bf4-0310-a012-ef5d649a1542
---
 .../kelondro/kelondroChunkIterator.java       | 307 ++++++++++++++++++
 source/de/anomic/kelondro/kelondroEcoFS.java  |  63 ----
 .../de/anomic/kelondro/kelondroEcoTable.java  |  23 +-
 .../de/anomic/plasma/plasmaSwitchboard.java   |   4 +-
 source/de/anomic/server/serverCodings.java    |  98 ++----
 source/de/anomic/tools/mediawikiIndex.java    |   4 +-
 source/yacy.java                              |   4 +-
 7 files changed, 348 insertions(+), 155 deletions(-)
 create mode 100644 source/de/anomic/kelondro/kelondroChunkIterator.java
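Note on the first diff below: the new kelondroChunkIterator replaces kelondroEcoFS.ChunkIterator. Instead of reading the file in the caller's thread, it starts a producer thread that streams the file into recycled buffers while the caller consumes one record chunk per next() call. A minimal usage sketch, assuming a hypothetical table file with 64-byte records and 12-byte primary keys (only the constructor signature is taken from the patch):

    import java.io.File;
    import java.io.FileNotFoundException;
    import de.anomic.kelondro.kelondroChunkIterator;

    public class ChunkIteratorDemo {
        public static void main(final String[] args) throws FileNotFoundException {
            final File tablefile = new File("DATA/INDEX/demo.table"); // hypothetical path
            final int recordsize = 64; // hypothetical record length
            final int keysize = 12;    // hypothetical primary key length
            // yields the first 12 bytes (the key) of every 64-byte record;
            // the file is read by a background thread, concurrent to this loop
            final kelondroChunkIterator keys = new kelondroChunkIterator(tablefile, recordsize, keysize);
            while (keys.hasNext()) {
                final byte[] key = keys.next();
                // feed the key into index generation while the reader streams ahead
            }
        }
    }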
diff --git a/source/de/anomic/kelondro/kelondroChunkIterator.java b/source/de/anomic/kelondro/kelondroChunkIterator.java
new file mode 100644
index 000000000..a5c18609a
--- /dev/null
+++ b/source/de/anomic/kelondro/kelondroChunkIterator.java
@@ -0,0 +1,307 @@
+// kelondroChunkIterator.java
+// (C) 2008 by Michael Peter Christen; mc@yacy.net, Frankfurt a. M., Germany
+// first published 14.01.2008 on http://yacy.net
+//
+// $LastChangedDate: 2006-04-02 22:40:07 +0200 (So, 02 Apr 2006) $
+// $LastChangedRevision: 1986 $
+// $LastChangedBy: orbiter $
+//
+// LICENSE
+//
+// This program is free software; you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation; either version 2 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License
+// along with this program; if not, write to the Free Software
+// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+
+package de.anomic.kelondro;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+public class kelondroChunkIterator implements Iterator<byte[]> {
+
+    private final int chunksize;
+
+    /**
+     * create a ChunkIterator
+     * a ChunkIterator uses a BufferedInputStream to iterate through the file
+     * and is therefore a fast option to get all elements in the file as a sequence
+     * @param file: the file
+     * @param recordsize: the size of the elements in the file
+     * @param chunksize: the size of the chunks that are returned by next(); the remaining bytes of each record, up to the length of recordsize, are skipped
+     * @throws FileNotFoundException
+     */
+
+    /*
+    private final DataInputStream stream;
+    private byte[] nextBytes;
+    public kelondroChunkIterator(final File file, final int recordsize, final int chunksize) throws FileNotFoundException {
+        assert (file.exists());
+        assert file.length() % recordsize == 0;
+        this.recordsize = recordsize;
+        this.chunksize = chunksize;
+        this.stream = new DataInputStream(new BufferedInputStream(new FileInputStream(file), 64 * 1024));
+        this.nextBytes = next0();
+    }
+
+    public boolean hasNext() {
+        return nextBytes != null;
+    }
+
+    public byte[] next0() {
+        final byte[] chunk = new byte[chunksize];
+        int r, s;
+        try {
+            // read the chunk
+            this.stream.readFully(chunk);
+            // skip remaining bytes
+            r = chunksize;
+            while (r < recordsize) {
+                s = (int) this.stream.skip(recordsize - r);
+                assert s > 0;
+                if (s <= 0) return null;
+                r += s;
+            }
+            return chunk;
+        } catch (final IOException e) {
+            return null;
+        }
+    }
+
+    public byte[] next() {
+        final byte[] n = this.nextBytes;
+        this.nextBytes = next0();
+        return n;
+    }
+
+    public void remove() {
+        throw new UnsupportedOperationException();
+    }
+    */
+
+    ExecutorService service = Executors.newFixedThreadPool(2);
+    filechunkProducer producer;
+    filechunkSlicer slicer;
+    Future<Integer> producerResult;
+    Future<Integer> slicerResult;
+    byte[] nextRecord;
+
+    public kelondroChunkIterator(final File file, final int recordsize, final int chunksize) throws FileNotFoundException {
+        assert (file.exists());
+        assert file.length() % recordsize == 0;
+        this.chunksize = chunksize;
+
+        service = Executors.newFixedThreadPool(2);
+        // buffer size and count calculation is critical, because wrong values
+        // will cause blocking of the concurrent consumer/producer threads
+        int filebuffersize = 1024 * 16;
+        int chunkbuffercountmin = filebuffersize / recordsize + 1; // minimum record buffers per file buffer
+        int filebuffercount = 1024 * 1024 / filebuffersize; // max 1 MB of file buffers
+        int chunkbuffercount = chunkbuffercountmin * filebuffercount + 1;
+        // e.g. recordsize = 64: 257 record buffers per 16 kB file buffer, 64 file buffers, 16449 record buffers
+        producer = new filechunkProducer(file, filebuffersize, filebuffercount);
+        slicer = new filechunkSlicer(producer, recordsize, chunksize, chunkbuffercount);
+        producerResult = service.submit(producer);
+        slicerResult = service.submit(slicer);
+        service.shutdown();
+        nextRecord = slicer.consume();
+    }
+
+    public boolean hasNext() {
+        return nextRecord != null;
+    }
+
+    public byte[] next() {
+        if (nextRecord == null) return null;
+        byte[] n = new byte[chunksize];
+        System.arraycopy(nextRecord, 0, n, 0, chunksize);
+        slicer.recycle(nextRecord);
+        nextRecord = slicer.consume();
+        return n;
+    }
+
+    public void remove() {
+        throw new UnsupportedOperationException();
+    }
+
+    private static class filechunkSlicer implements Callable<Integer> {
+
+        private filechunkProducer producer;
+        private static byte[] poison = new byte[0];
+        private BlockingQueue<byte[]> empty;
+        private BlockingQueue<byte[]> slices;
+        private int slicesize, head;
+
+        public filechunkSlicer(filechunkProducer producer, final int slicesize, int head, int stacksize) throws FileNotFoundException {
+            assert producer != null;
+            this.producer = producer;
+            this.slices = new ArrayBlockingQueue<byte[]>(stacksize);
+            this.empty = new ArrayBlockingQueue<byte[]>(stacksize);
+            this.slicesize = slicesize;
+            this.head = head;
+            // fill the empty queue
+            for (int i = 0; i < stacksize; i++) empty.add(new byte[head]);
+        }
+
+        public void recycle(byte[] c) {
+            try {
+                empty.put(c);
+            } catch (InterruptedException e) {
+                e.printStackTrace();
+            }
+        }
+
+        public byte[] consume() {
+            try {
+                byte[] b = slices.take(); // blocks while the queue is empty
+                if (b == poison) return null; else return b;
+            } catch (InterruptedException e) {
+                e.printStackTrace();
+                return null;
+            }
+        }
+
+        private void slice(byte[] from, int startfrom, byte[] to, int startto, int len) {
+            if (startto >= head) return;
+            if (startto + len > head) len = head - startto;
+            assert to.length == head;
+            System.arraycopy(from, startfrom, to, startto, len);
+        }
+
+        public Integer call() {
+            filechunk c;
+            int p;
+            try {
+                byte[] slice = empty.take();
+                int slicec = 0;
+                consumer: while (true) {
+                    c = producer.consume();
+                    if (c == null) {
+                        // finished. put poison into slices queue
+                        slices.put(poison);
+                        break consumer;
+                    }
+                    p = 0;
+                    // copy as much as possible to the current slice
+                    slicefiller: while (true) {
+                        assert slicesize > slicec;
+                        if (c.n - p >= slicesize - slicec) {
+                            // a full slice can be produced
+                            slice(c.b, p, slice, slicec, slicesize - slicec);
+                            // the slice is now full
+                            p += slicesize - slicec;
+                            slices.put(slice);
+                            slice = empty.take();
+                            slicec = 0;
+                            continue slicefiller;
+                        } else {
+                            // fill only a part of the slice and wait for the next chunk
+                            slice(c.b, p, slice, slicec, c.n - p);
+                            // the chunk is now fully read
+                            producer.recycle(c);
+                            slicec += c.n - p;
+                            continue consumer;
+                        }
+                    }
+                }
+            } catch (InterruptedException e) {
+                e.printStackTrace();
+            }
+
+            return new Integer(0);
+        }
+
+    }
+
+    private static class filechunk {
+        public byte[] b;
+        public int n;
+        public filechunk(int len) {
+            b = new byte[len];
+            n = 0;
+        }
+    }
+
+    /**
+     * the filechunkProducer reads an input file and stores chunks of it
+     * into a buffer. All elements stored in the buffer must be recycled.
+     * The class does not allocate more memory than the given chunk size multiplied by the
+     * number of chunks that shall be stored in the queue for processing.
+     */
+    private static class filechunkProducer implements Callable<Integer> {
+
+        private BlockingQueue<filechunk> empty;
+        private BlockingQueue<filechunk> filed;
+        private static filechunk poison = new filechunk(0);
+        private FileInputStream fis;
+
+        public filechunkProducer(File in, int bufferSize, int bufferCount) throws FileNotFoundException {
+            empty = new ArrayBlockingQueue<filechunk>(bufferCount);
+            filed = new ArrayBlockingQueue<filechunk>(bufferCount);
+            fis = new FileInputStream(in);
+            // fill the empty queue
+            for (int i = 0; i < bufferCount; i++) empty.add(new filechunk(bufferSize));
+        }
+
+        public void recycle(filechunk c) {
+            try {
+                empty.put(c);
+            } catch (InterruptedException e) {
+                e.printStackTrace();
+            }
+        }
+
+        public filechunk consume() {
+            try {
+                filechunk f = filed.take(); // blocks while the queue is empty
+                if (f == poison) return null; else return f;
+            } catch (InterruptedException e) {
+                e.printStackTrace();
+                return null;
+            }
+        }
+
+        public Integer call() {
+            try {
+                filechunk c;
+                while (true) {
+                    c = empty.take(); // blocks while no empty buffer is available
+                    c.n = fis.read(c.b);
+                    if (c.n <= 0) break;
+                    filed.put(c);
+                }
+                // put poison into the consumer queue so it can stop consuming
+                filed.put(poison);
+            } catch (InterruptedException e) {
+                e.printStackTrace();
+                throw new RuntimeException(e.getMessage());
+            } catch (IOException e) {
+                e.printStackTrace();
+                throw new RuntimeException(e.getMessage());
+            }
+            return new Integer(0);
+        }
+
+    }
+
+}
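Both helper classes above follow the same buffer-recycling scheme: two bounded queues pass a fixed set of pre-allocated buffers back and forth between a producer and a consumer, and a shared poison object marks the end of the stream. A stripped-down sketch of just that pattern; the names here are illustrative, not part of the patch:

    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;

    // illustrative only: the core of the filechunkProducer/filechunkSlicer scheme
    final class BufferPipe {
        static final byte[] POISON = new byte[0];  // end-of-stream marker
        final BlockingQueue<byte[]> empty;         // buffers ready to be filled
        final BlockingQueue<byte[]> filled;        // buffers ready to be consumed

        BufferPipe(final int bufferCount, final int bufferSize) {
            empty = new ArrayBlockingQueue<byte[]>(bufferCount);
            filled = new ArrayBlockingQueue<byte[]>(bufferCount);
            // pre-allocate all buffers once; no allocation happens during streaming
            for (int i = 0; i < bufferCount; i++) empty.add(new byte[bufferSize]);
        }

        // producer side: fetch an empty buffer, fill it, publish it
        byte[] takeEmpty() throws InterruptedException { return empty.take(); }
        void publish(final byte[] b) throws InterruptedException { filled.put(b); }
        void close() throws InterruptedException { filled.put(POISON); }

        // consumer side: take a filled buffer, use it, give it back
        byte[] takeFilled() throws InterruptedException { return filled.take(); }
        void recycle(final byte[] b) throws InterruptedException { empty.put(b); }
    }

Because both queues are bounded and every buffer is returned through recycle(), memory use stays fixed at bufferCount * bufferSize, and a slow consumer simply blocks the producer on takeEmpty() instead of letting the read-ahead grow without limit. This is also why the buffer count calculation in the constructor above is critical: if the slicer can need more record buffers per file chunk than exist, both threads block forever.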
diff --git a/source/de/anomic/kelondro/kelondroEcoFS.java b/source/de/anomic/kelondro/kelondroEcoFS.java
index 3463dc629..85234cab8 100644
--- a/source/de/anomic/kelondro/kelondroEcoFS.java
+++ b/source/de/anomic/kelondro/kelondroEcoFS.java
@@ -24,15 +24,11 @@
 package de.anomic.kelondro;
 
-import java.io.BufferedInputStream;
-import java.io.DataInputStream;
 import java.io.File;
-import java.io.FileInputStream;
 import java.io.FileNotFoundException;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.RandomAccessFile;
-import java.util.Iterator;
 
 /**
  * The EcoFS is a flat file with records of fixed length. The file does not contain
@@ -556,66 +552,7 @@ public class kelondroEcoFS {
         assert this.buffercount == 0;
         this.raf.setLength((s - 1) * this.recordsize);
     }
-
-    public static class ChunkIterator implements Iterator<byte[]> {
-
-        private final int recordsize, chunksize;
-        private final DataInputStream stream;
-        private byte[] nextBytes;
-
-        /**
-         * create a ChunkIterator
-         * a ChunkIterator uses a BufferedInputStream to iterate through the file
-         * and is therefore a fast option to get all elements in the file as a sequence
-         * @param file: the eco-file
-         * @param recordsize: the size of the elements in the file
-         * @param chunksize: the size of the chunks that are returned by next(); the remaining bytes up to the length of recordsize are skipped
-         * @throws FileNotFoundException
-         */
-        public ChunkIterator(final File file, final int recordsize, final int chunksize) throws FileNotFoundException {
-            assert (file.exists());
-            assert file.length() % recordsize == 0;
-            this.recordsize = recordsize;
-            this.chunksize = chunksize;
-            this.stream = new DataInputStream(new BufferedInputStream(new FileInputStream(file), 64 * 1024));
-            this.nextBytes = next0();
-        }
-
-        public boolean hasNext() {
-            return nextBytes != null;
-        }
-
-        public byte[] next0() {
-            final byte[] chunk = new byte[chunksize];
-            int r, s;
-            try {
-                // read the chunk
-                this.stream.readFully(chunk);
-                // skip remaining bytes
-                r = chunksize;
-                while (r < recordsize) {
-                    s = (int) this.stream.skip(recordsize - r);
-                    assert s != 0;
-                    r += s;
-                }
-                return chunk;
-            } catch (final IOException e) {
-                return null;
-            }
-        }
-
-        public byte[] next() {
-            final byte[] n = this.nextBytes;
-            this.nextBytes = next0();
-            return n;
-        }
-
-        public void remove() {
-            throw new UnsupportedOperationException();
-        }
-
-    }
-
     /**
      * main - writes some data and checks the tables size (with time measuring)
      * @param args
diff --git a/source/de/anomic/kelondro/kelondroEcoTable.java b/source/de/anomic/kelondro/kelondroEcoTable.java
index c55dac2a6..9330d7681 100644
--- a/source/de/anomic/kelondro/kelondroEcoTable.java
+++ b/source/de/anomic/kelondro/kelondroEcoTable.java
@@ -126,26 +126,19 @@ public class kelondroEcoTable implements kelondroIndex {
         int i = 0;
         byte[] key;
         if (table == null) {
-            final Iterator<byte[]> ki = keyIterator(tablefile, rowdef);
+            final Iterator<byte[]> ki = new kelondroChunkIterator(tablefile, rowdef.objectsize, rowdef.primaryKeyLength);
             while (ki.hasNext()) {
                 key = ki.next();
-                // write the key into the index table
                 assert key != null;
                 if (key == null) {i++; continue;}
                 if (!index.addi(key, i++)) fail++;
                 assert index.size() + fail == i : "index.size() = " + index.size() + ", i = " + i + ", fail = " + fail + ", key = '" + new String(key) + "'";
-                /*
-                if ((i % 10000) == 0) {
-                    System.out.print('.');
-                    System.out.flush();
-                }
-                */
             }
         } else {
             byte[] record;
             key = new byte[rowdef.primaryKeyLength];
-            final Iterator<byte[]> ri = new kelondroEcoFS.ChunkIterator(tablefile, rowdef.objectsize, rowdef.objectsize);
+            final Iterator<byte[]> ri = new kelondroChunkIterator(tablefile, rowdef.objectsize, rowdef.objectsize);
             while (ri.hasNext()) {
                 record = ri.next();
                 assert record != null;
@@ -219,18 +212,6 @@ public class kelondroEcoTable implements kelondroIndex {
         return serverMemory.available() < minmemremaining;
     }
 
-    /**
-     * a KeyIterator
-     * @param file: the eco-file
-     * @param rowdef: the row definition
-     * @throws FileNotFoundException
-     * @return an iterator for all keys in the file
-     */
-    public Iterator<byte[]> keyIterator(final File file, final kelondroRow rowdef) throws FileNotFoundException {
-        assert rowdef.primaryKeyIndex == 0;
-        return new kelondroEcoFS.ChunkIterator(file, rowdef.objectsize, rowdef.primaryKeyLength);
-    }
-
     public static long tableSize(final File tablefile, final int recordsize) {
         // returns number of records in table
         return kelondroEcoFS.tableSize(tablefile, recordsize);
diff --git a/source/de/anomic/plasma/plasmaSwitchboard.java b/source/de/anomic/plasma/plasmaSwitchboard.java
index 7721a316c..2f3aa720b 100644
--- a/source/de/anomic/plasma/plasmaSwitchboard.java
+++ b/source/de/anomic/plasma/plasmaSwitchboard.java
@@ -313,7 +313,7 @@ public final class plasmaSwitchboard extends serverAbstractSwitch<plasmaSwitchboard>
diff --git a/source/de/anomic/server/serverCodings.java b/source/de/anomic/server/serverCodings.java
--- a/source/de/anomic/server/serverCodings.java
+++ b/source/de/anomic/server/serverCodings.java
-    /*
-    public static byte[] encodeMD5Raw(final File file) {
-        try {
-            final MessageDigest digest = MessageDigest.getInstance("MD5");
-            digest.reset();
-            final InputStream in = new FileInputStream(file);
-            final long free = Runtime.getRuntime().freeMemory();
-            int a = in.available();
-            if ((a <= 0) || (a > free / 4)) a = (int) (free / 4);
-            final byte[] buf = new byte[a];
-            int n;
-            while ((n = in.read(buf)) > 0) digest.update(buf, 0, n);
-            in.close();
-            // now compute the hex-representation of the md5 digest
-            return digest.digest();
-        } catch (final java.security.NoSuchAlgorithmException e) {
-            System.out.println("Internal Error at md5:" + e.getMessage());
-        } catch (final java.io.FileNotFoundException e) {
-            System.out.println("file not found:" + file.toString());
-            e.printStackTrace();
-        } catch (final java.io.IOException e) {
-            System.out.println("file error with " + file.toString() + ": " + e.getMessage());
-        }
-        return null;
-    }
-    */
-
-    public final static ExecutorService service = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() + 1);
 
     public static byte[] encodeMD5Raw(final File file) {
         FileInputStream in;
@@ -163,7 +133,9 @@ public final class serverCodings {
         // create a concurrent thread that consumes data as it is read
         // and computes the md5 while doing IO
         md5DataConsumer md5consumer = new md5DataConsumer(1024 * 64, 8);
+        ExecutorService service = Executors.newSingleThreadExecutor();
         Future<byte[]> md5result = service.submit(md5consumer);
+        service.shutdown();
         filechunk c;
         try {
@@ -174,14 +146,13 @@ public final class serverCodings {
                 md5consumer.consume(c);
             }
             in.close();
-        } catch (final java.io.IOException e) {
+        } catch (final IOException e) {
             System.out.println("file error with " + file.toString() + ": " + e.getMessage());
             md5consumer.consume(md5DataConsumer.poison);
             return null;
-        } finally {
-            // put in poison into queue to tell the consumer to stop
-            md5consumer.consume(md5DataConsumer.poison);
         }
+        // put the poison into the queue to tell the consumer to stop
+        md5consumer.consume(md5DataConsumer.poison);
 
         // return the md5 digest from future task
         try {
@@ -253,43 +224,41 @@ public final class serverCodings {
             } catch (InterruptedException e) {
                 e.printStackTrace();
             }
-            filed.clear();
-            empty.clear();
             return digest;
         }
     }
 
     private static byte[] encodeMD5Raw(final byte[] b) {
-            try {
-                final MessageDigest digest = MessageDigest.getInstance("MD5");
-                digest.reset();
-                final InputStream in = new ByteArrayInputStream(b);
-                final byte[] buf = new byte[2048];
-                int n;
-                while ((n = in.read(buf)) > 0) digest.update(buf, 0, n);
-                in.close();
-                // now compute the hex-representation of the md5 digest
-                return digest.digest();
-            } catch (final java.security.NoSuchAlgorithmException e) {
-                System.out.println("Internal Error at md5:" + e.getMessage());
-            } catch (final java.io.IOException e) {
-                System.out.println("byte[] error: " + e.getMessage());
-            }
-            return null;
+        try {
+            final MessageDigest digest = MessageDigest.getInstance("MD5");
+            digest.reset();
+            final InputStream in = new ByteArrayInputStream(b);
+            final byte[] buf = new byte[2048];
+            int n;
+            while ((n = in.read(buf)) > 0) digest.update(buf, 0, n);
+            in.close();
+            // now compute the hex-representation of the md5 digest
+            return digest.digest();
+        } catch (final java.security.NoSuchAlgorithmException e) {
+            System.out.println("Internal Error at md5:" + e.getMessage());
+        } catch (final java.io.IOException e) {
+            System.out.println("byte[] error: " + e.getMessage());
+        }
+        return null;
     }
 
     public static Properties s2p(final String s) {
-            final Properties p = new Properties();
-            int pos;
-            final StringTokenizer st = new StringTokenizer(s, ",");
-            String token;
-            while (st.hasMoreTokens()) {
-                token = st.nextToken().trim();
-                pos = token.indexOf("=");
-                if (pos > 0) p.setProperty(token.substring(0, pos).trim(), token.substring(pos + 1).trim());
-            }
-            return p;
+        final Properties p = new Properties();
+        int pos;
+        final StringTokenizer st = new StringTokenizer(s, ",");
+        String token;
+        while (st.hasMoreTokens()) {
+            token = st.nextToken().trim();
+            pos = token.indexOf("=");
+            if (pos > 0) p.setProperty(token.substring(0, pos).trim(), token.substring(pos + 1).trim());
+        }
+        return p;
     }
 
     public static HashMap<String, String> string2map(String string, final String separator) {
@@ -376,7 +345,7 @@ public final class serverCodings {
     }
 
     // usage example:
-    // java -classpath classes de.anomic.server.serverCodings -md5 DATA/HTCACHE/mediawiki/wikipedia.de.xml
     // java -classpath classes de.anomic.server.serverCodings -md5 readme.txt
     // compare with:
     // md5 readme.txt
@@ -385,7 +354,6 @@ public final class serverCodings {
         File f = new File(s[1]);
         System.out.println("MD5 (" + f.getName() + ") = " + encodeMD5Hex(f));
     }
-    service.shutdown();
 }
 }
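The reworked encodeMD5Raw(File) above overlaps disk I/O with digest computation: the calling thread reads filechunk buffers while a single-threaded executor runs the md5DataConsumer, and a poison chunk terminates the stream. A condensed, self-contained sketch of the same idea using only JDK types; the names and the 64 kB buffer size are illustrative, and the buffer recycling of the original is omitted for brevity:

    import java.io.FileInputStream;
    import java.security.MessageDigest;
    import java.security.NoSuchAlgorithmException;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;

    // illustrative sketch: compute an MD5 digest while reading, with a bounded
    // queue between the I/O thread and the digest thread
    public class ConcurrentMD5 {
        private static final byte[] POISON = new byte[0]; // end-of-stream marker

        public static byte[] md5(final String path) throws Exception {
            final BlockingQueue<byte[]> queue = new ArrayBlockingQueue<byte[]>(8);
            final ExecutorService service = Executors.newSingleThreadExecutor();
            // digest thread: drains the queue until it sees the poison object
            final Future<byte[]> result = service.submit(new Callable<byte[]>() {
                public byte[] call() throws NoSuchAlgorithmException, InterruptedException {
                    final MessageDigest digest = MessageDigest.getInstance("MD5");
                    byte[] chunk;
                    while ((chunk = queue.take()) != POISON) digest.update(chunk);
                    return digest.digest();
                }
            });
            service.shutdown(); // the running task is unaffected; no new tasks accepted
            // I/O thread (here: the caller) reads the file and hands chunks over
            final FileInputStream in = new FileInputStream(path);
            try {
                final byte[] buf = new byte[64 * 1024];
                int n;
                while ((n = in.read(buf)) > 0) {
                    final byte[] chunk = new byte[n];
                    System.arraycopy(buf, 0, chunk, 0, n);
                    queue.put(chunk); // blocks if the digest thread falls behind
                }
            } finally {
                queue.put(POISON);
                in.close();
            }
            return result.get();
        }
    }

Note that calling service.shutdown() directly after submit() is safe: it only prevents further submissions, while the already-running digest task continues until the poison object arrives.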
diff --git a/source/de/anomic/tools/mediawikiIndex.java b/source/de/anomic/tools/mediawikiIndex.java
index 03ae4a298..ed9dfeb98 100644
--- a/source/de/anomic/tools/mediawikiIndex.java
+++ b/source/de/anomic/tools/mediawikiIndex.java
@@ -90,11 +90,12 @@ public class mediawikiIndex {
 
         // init reader, producer and consumer
         PositionAwareReader in = new PositionAwareReader(dumpFile);
-        ExecutorService service = Executors.newFixedThreadPool(2 /*Runtime.getRuntime().availableProcessors() + 1*/);
         indexProducer producer = new indexProducer(100, idxFromWikimediaXML(dumpFile));
         wikiConsumer consumer = new wikiConsumer(100, producer);
+        ExecutorService service = Executors.newFixedThreadPool(2);
         Future<Integer> producerResult = service.submit(consumer);
         Future<Integer> consumerResult = service.submit(producer);
+        service.shutdown();
 
         // read the wiki dump
         long start, stop;
@@ -121,7 +122,6 @@ public class mediawikiIndex {
             e.printStackTrace();
             return;
         }
-        service.shutdown();
         in.close();
     }
diff --git a/source/yacy.java b/source/yacy.java
index 35fec125b..d4a67cbea 100644
--- a/source/yacy.java
+++ b/source/yacy.java
@@ -139,7 +139,7 @@ public final class yacy {
      * Semaphore needed by {@link yacy#setUpdaterCallback(serverUpdaterCallback)} to block
      * until the {@link plasmaSwitchboard} object was created.
      */
-    private static serverSemaphore sbSync = new serverSemaphore(0);
+    //private static serverSemaphore sbSync = new serverSemaphore(0);
 
     /**
      * Semaphore needed by {@link yacy#waitForFinishedStartup()} to block
@@ -222,7 +222,7 @@ public final class yacy {
             serverLog.logSevere("STARTUP", "WARNING: the file " + oldconffile + " can not be renamed to " + newconfFile + "!");
         }
         sb = new plasmaSwitchboard(homePath, "defaults/yacy.init".replace("/", File.separator), newconf, pro);
-        sbSync.V(); // signal that the sb reference was set
+        //sbSync.V(); // signal that the sb reference was set
 
         // save information about available memory at startup time
         sb.setConfig("memoryFreeAfterStartup", startupMemFree);
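The mediawikiIndex change above shows the start-up pattern this commit applies throughout: submit both ends of a pipeline to a two-thread pool, call shutdown() immediately, and join later through the Future results. A minimal sketch with placeholder task bodies, not code from the patch:

    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;

    public class PipelineStartup {
        public static void main(final String[] args) throws Exception {
            final ExecutorService service = Executors.newFixedThreadPool(2);
            final Future<Integer> producerResult = service.submit(new Callable<Integer>() {
                public Integer call() { /* produce until done, then send the poison */ return Integer.valueOf(0); }
            });
            final Future<Integer> consumerResult = service.submit(new Callable<Integer>() {
                public Integer call() { /* consume until the poison arrives */ return Integer.valueOf(0); }
            });
            // safe right after submit: shutdown() rejects new tasks but lets the
            // two submitted tasks run to completion
            service.shutdown();
            // join both ends; get() also rethrows any exception thrown inside a task
            producerResult.get();
            consumerResult.get();
        }
    }

Joining through get() instead of awaitTermination() is what lets the callers in this commit keep a result value (the record count, the digest) while still shutting the pool down as early as possible.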