implemented automated migration indexCache 0.37 -> indexAssortmentCluster

git-svn-id: https://svn.berlios.de/svnroot/repos/yacy/trunk@205 6c8d7289-2bf4-0310-a012-ef5d649a1542
pull/1/head
orbiter 20 years ago
parent e89ded9e41
commit 3771b10b89

@ -181,6 +181,7 @@ public final class plasmaSwitchboard extends serverAbstractSwitch implements ser
public HashMap outgoingCookies, incomingCookies;
public kelondroTables facilityDB;
public plasmaParser parser;
public plasmaWordIndexClassicCacheMigration classicCache;
private serverSemaphore shutdownSync = new serverSemaphore(0);
private boolean terminate = false;
@ -196,7 +197,7 @@ public final class plasmaSwitchboard extends serverAbstractSwitch implements ser
setLog(new serverLog("PLASMA", loglevel));
// load values from configs
plasmaPath = new File(rootPath, getConfig("dbPath", "DATABASE"));
plasmaPath = new File(rootPath, getConfig("dbPath", "PLASMADB"));
listsPath = new File(rootPath, getConfig("listsPath", "LISTS"));
remoteProxyHost = getConfig("remoteProxyHost", "");
try {
@ -336,6 +337,14 @@ public final class plasmaSwitchboard extends serverAbstractSwitch implements ser
indexDistribution = new distributeIndex(100 /*indexCount*/, 8000, 1 /*peerCount*/);
deployThread("20_dhtdistribution", "DHT Distribution (currently by juniors only)", "selection, transfer and deletion of index entries that are not searched on your peer, but on others",
new serverInstantThread(indexDistribution, "job", null), 120000);
// init migratiion from 0.37 -> 0.38
classicCache = new plasmaWordIndexClassicCacheMigration(plasmaPath, wordIndex);
setConfig("99_indexcachemigration_idlesleep" , 10000);
setConfig("99_indexcachemigration_busysleep" , 40);
deployThread("99_indexcachemigration", "index cache migration", "migration of index cache data structures 0.37 -> 0.38",
new serverInstantThread(classicCache, "oneStepMigration", "size"), 30000);
}
private static String ppRamString(int bytes) {
@ -609,7 +618,7 @@ public final class plasmaSwitchboard extends serverAbstractSwitch implements ser
// start a global crawl, if possible
plasmaCrawlNURL.entry urlEntry = noticeURL.limitPop();
if (urlEntry.url() == null) return false;
if (urlEntry.url() == null) return true;
String profileHandle = urlEntry.profileHandle();
//System.out.println("DEBUG plasmaSwitchboard.processCrawling: profileHandle = " + profileHandle + ", urlEntry.url = " + urlEntry.url());
plasmaCrawlProfile.entry profile = profiles.getEntry(profileHandle);
@ -622,6 +631,7 @@ public final class plasmaSwitchboard extends serverAbstractSwitch implements ser
", permission=" + ((yacyCore.seedDB == null) ? "undefined" : (((yacyCore.seedDB.mySeed.isSenior()) || (yacyCore.seedDB.mySeed.isPrincipal())) ? "true" : "false")));
boolean tryRemote =
((noticeURL.coreStackSize() != 0) || (processStack.size() != 0)) /* should do ourself */ &&
(profile.remoteIndexing()) /* granted */ &&
(urlEntry.initiator() != null) && (!(urlEntry.initiator().equals(plasmaURL.dummyHash))) /* not proxy */ &&
((yacyCore.seedDB.mySeed.isSenior()) ||

@ -268,18 +268,8 @@ public final class plasmaWordIndexCache implements plasmaWordIndexInterface {
if (pause) {
try {this.sleep(300);} catch (InterruptedException e) {}
} else {
nextHash = (String) hashDate.getMinObject();
if (nextHash != null) {
try {
flushFromMem(nextHash, true);
} catch (Exception e) {
log.logError("flushThread: " + e.getMessage());
e.printStackTrace();
}
try {this.sleep(10 + java.lang.Math.min(1000, 10 * maxWords/(cache.size() + 1)));} catch (InterruptedException e) {}
} else {
try {this.sleep(2000);} catch (InterruptedException e) {}
}
flushFromMem();
try {this.sleep(10 + java.lang.Math.min(1000, 10 * maxWords/(cache.size() + 1)));} catch (InterruptedException e) {}
}
}
}
@ -297,6 +287,32 @@ public final class plasmaWordIndexCache implements plasmaWordIndexInterface {
}
}
private void flushFromMem() {
// select appropriate hash
// we have 2 different methods to find a good hash:
// - the oldest entry in the cache
// - the entry with maximum count
if (cache.size() == 0) return;
flushThread.pause();
try {
int count = hashScore.getMaxScore();
String hash = (String) hashScore.getMaxObject();
long time = (hash == null) ? System.currentTimeMillis() : longTime(hashDate.getScore(hash));
if ((count > ramcacheLimit) && (System.currentTimeMillis() - time > 10000)) {
// flush high-score entries
flushFromMem(hash, true);
} else {
// flush oldest entries
hash = (String) hashDate.getMinObject();
flushFromMem(hash, true);
}
} catch (Exception e) {
log.logError("flushFromMem: " + e.getMessage());
e.printStackTrace();
}
flushThread.proceed();
}
private int flushFromMem(String key, boolean reintegrate) {
// this method flushes indexes out from the ram to the disc.
// at first we check the singleton database and act accordingly
@ -376,6 +392,7 @@ public final class plasmaWordIndexCache implements plasmaWordIndexInterface {
}
}
/*
private synchronized int flushFromMemToLimit() {
if ((hashScore.size() == 0) || (cache.size() == 0)) return 0;
@ -468,12 +485,13 @@ public final class plasmaWordIndexCache implements plasmaWordIndexInterface {
}
}
}
}
flushThread.proceed();
return count;
}
*/
public plasmaWordIndexEntity getIndex(String wordHash, boolean deleteIfEmpty) {
flushThread.pause();
flushFromMem(wordHash, false);
@ -517,7 +535,7 @@ public final class plasmaWordIndexCache implements plasmaWordIndexInterface {
public synchronized int addEntries(plasmaWordIndexEntryContainer container, long updateTime) {
flushThread.pause();
//serverLog.logDebug("PLASMA INDEXING", "addEntryToIndexMem: cache.size=" + cache.size() + "; hashScore.size=" + hashScore.size());
if (cache.size() >= this.maxWords) flushFromMemToLimit();
while (cache.size() >= this.maxWords) flushFromMem();
//if (flushc > 0) serverLog.logDebug("PLASMA INDEXING", "addEntryToIndexMem - flushed " + flushc + " entries");
// put new words into cache

@ -0,0 +1,176 @@
// plasmaWordIndexFileCacheMigration.java
// --------------------------------------
// part of YACY
// (C) by Michael Peter Christen; mc@anomic.de
// first published on http://www.anomic.de
// Frankfurt, Germany, 2004
// last major change: 22.01.2004
//
// This program is free software; you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation; either version 2 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
//
// Using this software in any meaning (reading, learning, copying, compiling,
// running) means that you agree that the Author(s) is (are) not responsible
// for cost, loss of data or any harm that may be caused directly or indirectly
// by usage of this softare or this documentation. The usage of this software
// is on your own risk. The installation and usage (starting/running) of this
// software may allow other people or application to access your computer and
// any attached devices and is highly dependent on the configuration of the
// software which must be done by the user of the software; the author(s) is
// (are) also not responsible for proper configuration and usage of the
// software, even if provoked by documentation provided together with
// the software.
//
// Any changes to this file according to the GPL as documented in the file
// gpl.txt aside this file in the shipment you received can be done to the
// lines that follows this copyright notice here, but changes must not be
// done inside the copyright notive above. A re-distribution must contain
// the intact and unchanged copyright notice.
// Contributions and changes to the program code must be marked as such.
/*
The plasmaIndexCache manages a database table with a list of
indexEntries in it. This is done in a completely different fashion
as organized by the plasmaIndex tables. The entries are not
sorted and just stored in a buffer.
Whenever during a seach an index is retrieved, first it's buffer
is flushed into the corresponding index table, so that it can be
sorted into the remaining index entry elements.
The cache database consist of
- the word hash as primary key
- one column with a one-byte counter
- a number of more columns with indexEntry elements
*/
// compile with
// javac -classpath classes -sourcepath source -d classes -g source/de/anomic/plasma/*.java
package de.anomic.plasma;
import java.io.*;
import java.util.*;
import de.anomic.server.*;
import de.anomic.kelondro.*;
public class plasmaWordIndexClassicCacheMigration {
private static final String indexCacheFileName = "indexCache.db";
// class variables
private File databaseRoot;
private kelondroTree indexCache;
private plasmaWordIndex fresh;
public plasmaWordIndexClassicCacheMigration(File databaseRoot, plasmaWordIndex fresh) throws IOException {
this.fresh = fresh;
this.databaseRoot = databaseRoot;
File indexCacheFile = new File(databaseRoot, indexCacheFileName);
if (indexCacheFile.exists()) {
// simply open the file
indexCache = new kelondroTree(indexCacheFile, 0x400);
if (indexCache.size() == 0) {
indexCache.close();
indexCacheFile.delete();
indexCache = null;
}
} else {
indexCache = null;
}
}
protected void close() throws IOException {
if (indexCache != null) indexCache.close();
indexCache = null;
}
private byte[][] getCache(String wordHash) throws IOException {
if (indexCache == null) return null;
// read one line from the cache; if none exists: construct one
byte[][] row;
try {
row = indexCache.get(wordHash.getBytes());
} catch (Exception e) {
// we had some negativeSeekOffsetExceptions here, and also loops may cause this
// in that case the indexCache is corrupt
System.out.println("Error in plasmaWordINdexFileCache.getCache: index for hash " + wordHash + " is corrupt:" + e.toString());
//e.printStackTrace();
row = null;
}
if (row == null) {
row = new byte[indexCache.columns()][];
row[0] = wordHash.getBytes();
row[1] = new byte[1];
row[1][0] = (byte) 0;
}
return row;
}
protected Iterator wordHashes() throws IOException {
if (indexCache == null) return new HashSet().iterator();
try {
return indexCache.rows(true, false, null);
} catch (kelondroException e) {
de.anomic.server.serverLog.logError("PLASMA", "kelondro error in plasmaWordIndexFileCache: " + e.getMessage());
return new HashSet().iterator();
}
}
protected void remove(String wordHash) throws IOException {
if (indexCache == null) return;
indexCache.remove(wordHash.getBytes());
}
private int size(String wordHash) throws IOException {
if (indexCache == null) return 0;
// return number of entries in specific cache
byte[][] row = indexCache.get(wordHash.getBytes());
if (row == null) return 0;
return (int) row[1][0];
}
public int size() {
if (indexCache == null) return 0; else return indexCache.size();
}
public boolean oneStepMigration() {
try {
Iterator i = wordHashes();
if (!(i.hasNext())) return false;
byte[][] row = (byte[][]) i.next();
if (row == null) return false;
String hash = new String(row[0]);
if (hash == null) return false;
int size = (int) row[1][0];
plasmaWordIndexEntryContainer container = new plasmaWordIndexEntryContainer(hash);
plasmaWordIndexEntry[] entries = new plasmaWordIndexEntry[size];
for (int j = 0; j < size; j++) {
entries[j] = new plasmaWordIndexEntry(
new String(row[j + 2], 0, plasmaCrawlLURL.urlHashLength),
new String(row[j + 2], plasmaCrawlLURL.urlHashLength, plasmaWordIndexEntry.attrSpaceShort));
}
container.add(entries, System.currentTimeMillis());
fresh.addEntries(container);
i = null;
remove(hash);
return true;
} catch (Exception e) {
serverLog.logError("PLASMA MIGRATION", "oneStepMigration error: " + e.getMessage());
e.printStackTrace();
return false;
}
}
}

@ -10,18 +10,24 @@ public final class serverInstantThread extends serverAbstractThread implements s
private Object environment;
public serverInstantThread(Object env, String jobExec, String jobCount) {
// job is the name of a method of the object 'env'
// jobExec is the name of a method of the object 'env' that executes the one-step-run
// jobCount is the name of a method that returns the size of the job
try {
this.jobExecMethod = env.getClass().getMethod(jobExec, new Class[0]);
} catch (NoSuchMethodException e) {
throw new RuntimeException("serverInstantThread, wrong declaration of jobExec: " + e.getMessage());
}
try {
if (jobCount == null)
this.jobCountMethod = null;
else
this.jobCountMethod = env.getClass().getMethod(jobCount, new Class[0]);
this.environment = env;
this.setName(env.getClass().getName() + "." + jobExec);
} catch (NoSuchMethodException e) {
throw new RuntimeException("Internal Error in serverInstantThread, wrong declaration: " + e.getMessage());
throw new RuntimeException("serverInstantThread, wrong declaration of jobCount: " + e.getMessage());
}
this.environment = env;
this.setName(env.getClass().getName() + "." + jobExec);
}
public int getJobCount() {

Loading…
Cancel
Save