From 3771b10b89062d644b7d535cbcb3486b25f70d52 Mon Sep 17 00:00:00 2001 From: orbiter Date: Wed, 1 Jun 2005 14:24:25 +0000 Subject: [PATCH] implemented automated migration indexCache 0.37 -> indexAssortmentCluster git-svn-id: https://svn.berlios.de/svnroot/repos/yacy/trunk@205 6c8d7289-2bf4-0310-a012-ef5d649a1542 --- .../de/anomic/plasma/plasmaSwitchboard.java | 14 +- .../anomic/plasma/plasmaWordIndexCache.java | 46 +++-- .../plasmaWordIndexClassicCacheMigration.java | 176 ++++++++++++++++++ .../de/anomic/server/serverInstantThread.java | 14 +- 4 files changed, 230 insertions(+), 20 deletions(-) create mode 100644 source/de/anomic/plasma/plasmaWordIndexClassicCacheMigration.java diff --git a/source/de/anomic/plasma/plasmaSwitchboard.java b/source/de/anomic/plasma/plasmaSwitchboard.java index 5f8e181f0..93074a9f4 100644 --- a/source/de/anomic/plasma/plasmaSwitchboard.java +++ b/source/de/anomic/plasma/plasmaSwitchboard.java @@ -181,6 +181,7 @@ public final class plasmaSwitchboard extends serverAbstractSwitch implements ser public HashMap outgoingCookies, incomingCookies; public kelondroTables facilityDB; public plasmaParser parser; + public plasmaWordIndexClassicCacheMigration classicCache; private serverSemaphore shutdownSync = new serverSemaphore(0); private boolean terminate = false; @@ -196,7 +197,7 @@ public final class plasmaSwitchboard extends serverAbstractSwitch implements ser setLog(new serverLog("PLASMA", loglevel)); // load values from configs - plasmaPath = new File(rootPath, getConfig("dbPath", "DATABASE")); + plasmaPath = new File(rootPath, getConfig("dbPath", "PLASMADB")); listsPath = new File(rootPath, getConfig("listsPath", "LISTS")); remoteProxyHost = getConfig("remoteProxyHost", ""); try { @@ -336,6 +337,14 @@ public final class plasmaSwitchboard extends serverAbstractSwitch implements ser indexDistribution = new distributeIndex(100 /*indexCount*/, 8000, 1 /*peerCount*/); deployThread("20_dhtdistribution", "DHT Distribution (currently by juniors only)", "selection, transfer and deletion of index entries that are not searched on your peer, but on others", new serverInstantThread(indexDistribution, "job", null), 120000); + + // init migratiion from 0.37 -> 0.38 + classicCache = new plasmaWordIndexClassicCacheMigration(plasmaPath, wordIndex); + setConfig("99_indexcachemigration_idlesleep" , 10000); + setConfig("99_indexcachemigration_busysleep" , 40); + deployThread("99_indexcachemigration", "index cache migration", "migration of index cache data structures 0.37 -> 0.38", + new serverInstantThread(classicCache, "oneStepMigration", "size"), 30000); + } private static String ppRamString(int bytes) { @@ -609,7 +618,7 @@ public final class plasmaSwitchboard extends serverAbstractSwitch implements ser // start a global crawl, if possible plasmaCrawlNURL.entry urlEntry = noticeURL.limitPop(); - if (urlEntry.url() == null) return false; + if (urlEntry.url() == null) return true; String profileHandle = urlEntry.profileHandle(); //System.out.println("DEBUG plasmaSwitchboard.processCrawling: profileHandle = " + profileHandle + ", urlEntry.url = " + urlEntry.url()); plasmaCrawlProfile.entry profile = profiles.getEntry(profileHandle); @@ -622,6 +631,7 @@ public final class plasmaSwitchboard extends serverAbstractSwitch implements ser ", permission=" + ((yacyCore.seedDB == null) ? "undefined" : (((yacyCore.seedDB.mySeed.isSenior()) || (yacyCore.seedDB.mySeed.isPrincipal())) ? "true" : "false"))); boolean tryRemote = + ((noticeURL.coreStackSize() != 0) || (processStack.size() != 0)) /* should do ourself */ && (profile.remoteIndexing()) /* granted */ && (urlEntry.initiator() != null) && (!(urlEntry.initiator().equals(plasmaURL.dummyHash))) /* not proxy */ && ((yacyCore.seedDB.mySeed.isSenior()) || diff --git a/source/de/anomic/plasma/plasmaWordIndexCache.java b/source/de/anomic/plasma/plasmaWordIndexCache.java index dc99ca7eb..c49cca0c7 100644 --- a/source/de/anomic/plasma/plasmaWordIndexCache.java +++ b/source/de/anomic/plasma/plasmaWordIndexCache.java @@ -268,18 +268,8 @@ public final class plasmaWordIndexCache implements plasmaWordIndexInterface { if (pause) { try {this.sleep(300);} catch (InterruptedException e) {} } else { - nextHash = (String) hashDate.getMinObject(); - if (nextHash != null) { - try { - flushFromMem(nextHash, true); - } catch (Exception e) { - log.logError("flushThread: " + e.getMessage()); - e.printStackTrace(); - } - try {this.sleep(10 + java.lang.Math.min(1000, 10 * maxWords/(cache.size() + 1)));} catch (InterruptedException e) {} - } else { - try {this.sleep(2000);} catch (InterruptedException e) {} - } + flushFromMem(); + try {this.sleep(10 + java.lang.Math.min(1000, 10 * maxWords/(cache.size() + 1)));} catch (InterruptedException e) {} } } } @@ -297,6 +287,32 @@ public final class plasmaWordIndexCache implements plasmaWordIndexInterface { } } + private void flushFromMem() { + // select appropriate hash + // we have 2 different methods to find a good hash: + // - the oldest entry in the cache + // - the entry with maximum count + if (cache.size() == 0) return; + flushThread.pause(); + try { + int count = hashScore.getMaxScore(); + String hash = (String) hashScore.getMaxObject(); + long time = (hash == null) ? System.currentTimeMillis() : longTime(hashDate.getScore(hash)); + if ((count > ramcacheLimit) && (System.currentTimeMillis() - time > 10000)) { + // flush high-score entries + flushFromMem(hash, true); + } else { + // flush oldest entries + hash = (String) hashDate.getMinObject(); + flushFromMem(hash, true); + } + } catch (Exception e) { + log.logError("flushFromMem: " + e.getMessage()); + e.printStackTrace(); + } + flushThread.proceed(); + } + private int flushFromMem(String key, boolean reintegrate) { // this method flushes indexes out from the ram to the disc. // at first we check the singleton database and act accordingly @@ -376,6 +392,7 @@ public final class plasmaWordIndexCache implements plasmaWordIndexInterface { } } + /* private synchronized int flushFromMemToLimit() { if ((hashScore.size() == 0) || (cache.size() == 0)) return 0; @@ -468,12 +485,13 @@ public final class plasmaWordIndexCache implements plasmaWordIndexInterface { } } } - } flushThread.proceed(); return count; } + */ + public plasmaWordIndexEntity getIndex(String wordHash, boolean deleteIfEmpty) { flushThread.pause(); flushFromMem(wordHash, false); @@ -517,7 +535,7 @@ public final class plasmaWordIndexCache implements plasmaWordIndexInterface { public synchronized int addEntries(plasmaWordIndexEntryContainer container, long updateTime) { flushThread.pause(); //serverLog.logDebug("PLASMA INDEXING", "addEntryToIndexMem: cache.size=" + cache.size() + "; hashScore.size=" + hashScore.size()); - if (cache.size() >= this.maxWords) flushFromMemToLimit(); + while (cache.size() >= this.maxWords) flushFromMem(); //if (flushc > 0) serverLog.logDebug("PLASMA INDEXING", "addEntryToIndexMem - flushed " + flushc + " entries"); // put new words into cache diff --git a/source/de/anomic/plasma/plasmaWordIndexClassicCacheMigration.java b/source/de/anomic/plasma/plasmaWordIndexClassicCacheMigration.java new file mode 100644 index 000000000..d9220b1bf --- /dev/null +++ b/source/de/anomic/plasma/plasmaWordIndexClassicCacheMigration.java @@ -0,0 +1,176 @@ +// plasmaWordIndexFileCacheMigration.java +// -------------------------------------- +// part of YACY +// (C) by Michael Peter Christen; mc@anomic.de +// first published on http://www.anomic.de +// Frankfurt, Germany, 2004 +// last major change: 22.01.2004 +// +// This program is free software; you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation; either version 2 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program; if not, write to the Free Software +// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +// +// Using this software in any meaning (reading, learning, copying, compiling, +// running) means that you agree that the Author(s) is (are) not responsible +// for cost, loss of data or any harm that may be caused directly or indirectly +// by usage of this softare or this documentation. The usage of this software +// is on your own risk. The installation and usage (starting/running) of this +// software may allow other people or application to access your computer and +// any attached devices and is highly dependent on the configuration of the +// software which must be done by the user of the software; the author(s) is +// (are) also not responsible for proper configuration and usage of the +// software, even if provoked by documentation provided together with +// the software. +// +// Any changes to this file according to the GPL as documented in the file +// gpl.txt aside this file in the shipment you received can be done to the +// lines that follows this copyright notice here, but changes must not be +// done inside the copyright notive above. A re-distribution must contain +// the intact and unchanged copyright notice. +// Contributions and changes to the program code must be marked as such. + + +/* + The plasmaIndexCache manages a database table with a list of + indexEntries in it. This is done in a completely different fashion + as organized by the plasmaIndex tables. The entries are not + sorted and just stored in a buffer. + Whenever during a seach an index is retrieved, first it's buffer + is flushed into the corresponding index table, so that it can be + sorted into the remaining index entry elements. + The cache database consist of + - the word hash as primary key + - one column with a one-byte counter + - a number of more columns with indexEntry elements +*/ + + +// compile with +// javac -classpath classes -sourcepath source -d classes -g source/de/anomic/plasma/*.java + +package de.anomic.plasma; + +import java.io.*; +import java.util.*; +import de.anomic.server.*; +import de.anomic.kelondro.*; + +public class plasmaWordIndexClassicCacheMigration { + + private static final String indexCacheFileName = "indexCache.db"; + + // class variables + private File databaseRoot; + private kelondroTree indexCache; + private plasmaWordIndex fresh; + + public plasmaWordIndexClassicCacheMigration(File databaseRoot, plasmaWordIndex fresh) throws IOException { + this.fresh = fresh; + this.databaseRoot = databaseRoot; + File indexCacheFile = new File(databaseRoot, indexCacheFileName); + if (indexCacheFile.exists()) { + // simply open the file + indexCache = new kelondroTree(indexCacheFile, 0x400); + if (indexCache.size() == 0) { + indexCache.close(); + indexCacheFile.delete(); + indexCache = null; + } + } else { + indexCache = null; + } + } + + protected void close() throws IOException { + if (indexCache != null) indexCache.close(); + indexCache = null; + } + + private byte[][] getCache(String wordHash) throws IOException { + if (indexCache == null) return null; + // read one line from the cache; if none exists: construct one + byte[][] row; + try { + row = indexCache.get(wordHash.getBytes()); + } catch (Exception e) { + // we had some negativeSeekOffsetExceptions here, and also loops may cause this + // in that case the indexCache is corrupt + System.out.println("Error in plasmaWordINdexFileCache.getCache: index for hash " + wordHash + " is corrupt:" + e.toString()); + //e.printStackTrace(); + row = null; + } + if (row == null) { + row = new byte[indexCache.columns()][]; + row[0] = wordHash.getBytes(); + row[1] = new byte[1]; + row[1][0] = (byte) 0; + } + return row; + } + + protected Iterator wordHashes() throws IOException { + if (indexCache == null) return new HashSet().iterator(); + try { + return indexCache.rows(true, false, null); + } catch (kelondroException e) { + de.anomic.server.serverLog.logError("PLASMA", "kelondro error in plasmaWordIndexFileCache: " + e.getMessage()); + return new HashSet().iterator(); + } + } + + protected void remove(String wordHash) throws IOException { + if (indexCache == null) return; + indexCache.remove(wordHash.getBytes()); + } + + private int size(String wordHash) throws IOException { + if (indexCache == null) return 0; + // return number of entries in specific cache + byte[][] row = indexCache.get(wordHash.getBytes()); + if (row == null) return 0; + return (int) row[1][0]; + } + + public int size() { + if (indexCache == null) return 0; else return indexCache.size(); + } + + public boolean oneStepMigration() { + try { + Iterator i = wordHashes(); + if (!(i.hasNext())) return false; + byte[][] row = (byte[][]) i.next(); + if (row == null) return false; + String hash = new String(row[0]); + if (hash == null) return false; + int size = (int) row[1][0]; + plasmaWordIndexEntryContainer container = new plasmaWordIndexEntryContainer(hash); + plasmaWordIndexEntry[] entries = new plasmaWordIndexEntry[size]; + for (int j = 0; j < size; j++) { + entries[j] = new plasmaWordIndexEntry( + new String(row[j + 2], 0, plasmaCrawlLURL.urlHashLength), + new String(row[j + 2], plasmaCrawlLURL.urlHashLength, plasmaWordIndexEntry.attrSpaceShort)); + } + container.add(entries, System.currentTimeMillis()); + fresh.addEntries(container); + i = null; + remove(hash); + return true; + } catch (Exception e) { + serverLog.logError("PLASMA MIGRATION", "oneStepMigration error: " + e.getMessage()); + e.printStackTrace(); + return false; + } + } + +} diff --git a/source/de/anomic/server/serverInstantThread.java b/source/de/anomic/server/serverInstantThread.java index 87b65551b..345174235 100644 --- a/source/de/anomic/server/serverInstantThread.java +++ b/source/de/anomic/server/serverInstantThread.java @@ -10,18 +10,24 @@ public final class serverInstantThread extends serverAbstractThread implements s private Object environment; public serverInstantThread(Object env, String jobExec, String jobCount) { - // job is the name of a method of the object 'env' + // jobExec is the name of a method of the object 'env' that executes the one-step-run + // jobCount is the name of a method that returns the size of the job try { this.jobExecMethod = env.getClass().getMethod(jobExec, new Class[0]); + } catch (NoSuchMethodException e) { + throw new RuntimeException("serverInstantThread, wrong declaration of jobExec: " + e.getMessage()); + } + try { if (jobCount == null) this.jobCountMethod = null; else this.jobCountMethod = env.getClass().getMethod(jobCount, new Class[0]); - this.environment = env; - this.setName(env.getClass().getName() + "." + jobExec); + } catch (NoSuchMethodException e) { - throw new RuntimeException("Internal Error in serverInstantThread, wrong declaration: " + e.getMessage()); + throw new RuntimeException("serverInstantThread, wrong declaration of jobCount: " + e.getMessage()); } + this.environment = env; + this.setName(env.getClass().getName() + "." + jobExec); } public int getJobCount() {