various fixes

- shutdown behavior (killing of client sessions)
- EcoFS reading better
- another synchronization in balancer.size()


git-svn-id: https://svn.berlios.de/svnroot/repos/yacy/trunk@4662 6c8d7289-2bf4-0310-a012-ef5d649a1542
pull/1/head
orbiter 17 years ago
parent 6e36c156e8
commit 225f9fd429

@ -29,11 +29,11 @@ package de.anomic.index;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.RandomAccessFile;
import java.util.Collections;
import java.util.Iterator;
@ -62,7 +62,8 @@ public final class indexContainerHeap {
private serverLog log;
private kelondroBytesIntMap index;
private SortedMap<String, indexContainer> cache;
private File heapFile;
private File backupFile;
private boolean readOnlyMode;
// index xor cache is used. If one is not null, then the other must be null
/*
@ -79,39 +80,103 @@ public final class indexContainerHeap {
*/
/**
* open a new container heap and prepare it as a heap to be written
* opens an existing heap file in undefined mode
after this an initialization should be made to use the heap:
either a read-only or read/write mode initialization
* @param payloadrow
* @param log
*/
public indexContainerHeap(kelondroRow payloadrow, serverLog log) {
this.payloadrow = payloadrow;
this.log = log;
this.cache = Collections.synchronizedSortedMap(new TreeMap<String, indexContainer>(new kelondroByteOrder.StringOrder(payloadrow.getOrdering())));
this.cache = null;
this.index = null;
this.heapFile = null;
this.backupFile = null;
this.readOnlyMode = false;
}
/**
* opens an existing heap file in read-only mode
* @param indexHeapFile
* @param payloadrow
* @param log
* initializes the heap in read/write mode without reading of a dump first
* another dump reading afterwards is not possible
*/
public indexContainerHeap(kelondroRow payloadrow, serverLog log, File heapFile) {
this.payloadrow = payloadrow;
this.log = log;
this.cache = null;
this.index = null;
this.heapFile = heapFile;
public void initWriteMode() {
this.cache = Collections.synchronizedSortedMap(new TreeMap<String, indexContainer>(new kelondroByteOrder.StringOrder(payloadrow.getOrdering())));
this.readOnlyMode = false;
}
/**
 * Initialize this heap in read/write mode by restoring a previously written
 * heap dump from the given file. No heap file should be assigned during
 * construction; the dump file name is passed here. When the heap is dumped
 * again later, the target file name may differ from this one.
 * @param heapFile the dump file to restore the cache from
 * @throws IOException if the dump cannot be read
 */
public void initWriteMode(File heapFile) throws IOException {
    this.readOnlyMode = false;
    if (log != null) log.logInfo("restoring dump for rwi heap '" + heapFile.getName() + "'");
    final long startTime = System.currentTimeMillis();
    this.cache = Collections.synchronizedSortedMap(new TreeMap<String, indexContainer>(new kelondroByteOrder.StringOrder(payloadrow.getOrdering())));
    int referenceCount = 0;
    synchronized (cache) {
        final Iterator<indexContainer> containers = new heapFileEntries(heapFile, this.payloadrow).iterator();
        while (containers.hasNext()) {
            final indexContainer container = containers.next();
            // the entry iterator yields null on a read error; stop restoring then
            if (container == null) break;
            cache.put(container.getWordHash(), container);
            referenceCount += container.size();
        }
    }
    if (log != null) log.logInfo("finished rwi heap restore: " + cache.size() + " words, " + referenceCount + " word/URL relations in " + ((System.currentTimeMillis() - startTime) / 1000) + " seconds");
}
public void dumpHeap(File heapFile) throws IOException {
assert this.heapFile == null;
/**
 * Initialize the heap file in read-only mode.
 * This generates an in-memory index (word hash -> file seek position) that is
 * then used to access single elements of the heap. During the life-time of this
 * object the file is _not_ kept open; it is re-opened for each read access.
 * @param heapFile the heap file to build the index for
 * @throws IOException if the file is missing, too large, or cannot be read
 */
public void initReadMode(File heapFile) throws IOException {
    this.readOnlyMode = true;
    assert this.cache == null;
    assert this.index == null;
    this.backupFile = heapFile;
    if (!(heapFile.exists())) throw new IOException("file " + heapFile + " does not exist");
    // seek positions are stored as int, so files of 2GB or more cannot be indexed
    if (heapFile.length() >= (long) Integer.MAX_VALUE) throw new IOException("file " + heapFile + " too large, index can only be created for files less than 2GB");
    if (log != null) log.logInfo("creating index for rwi heap '" + heapFile.getName() + "'");
    long start = System.currentTimeMillis();
    this.index = new kelondroBytesIntMap(payloadrow.primaryKeyLength, (kelondroByteOrder) payloadrow.getOrdering(), 0);
    long urlCount = 0; // NOTE(review): never incremented here; the closing log line always reports 0 -- TODO compute or drop
    byte[] word = new byte[payloadrow.primaryKeyLength];
    int seek = 0, seek0;
    synchronized (index) {
        DataInputStream is = new DataInputStream(new BufferedInputStream(new FileInputStream(heapFile), 64*1024));
        try {
            // the heap is a sequence of (word, collection) records; remember
            // the seek position of each record start in the index
            while (is.available() > 0) {
                // remember seek position of this record
                seek0 = seek;
                // read word
                is.readFully(word);
                // advance by the number of bytes actually read, not by the
                // length of the decoded string (charset-safe)
                seek += word.length;
                // skip the collection, adding its byte size to the seek position
                seek += kelondroRowSet.skipNextRowSet(is, payloadrow);
                index.addi(word, seek0);
            }
        } finally {
            is.close(); // do not leak the stream if indexing fails half-way
        }
    }
    if (log != null) log.logInfo("finished rwi heap indexing: " + urlCount + " word/URL relations in " + ((System.currentTimeMillis() - start) / 1000) + " seconds");
}
public void dump(File heapFile) throws IOException {
assert this.cache != null;
this.heapFile = heapFile;
if (log != null) log.logInfo("creating rwi heap dump '" + heapFile.getName() + "', " + cache.size() + " rwi's");
if (heapFile.exists()) heapFile.delete();
OutputStream os = new BufferedOutputStream(new FileOutputStream(heapFile), 64 * 1024);
DataOutputStream os = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(heapFile), 64 * 1024));
long startTime = System.currentTimeMillis();
long wordcount = 0, urlcount = 0;
String wordHash;
@ -142,6 +207,7 @@ public final class indexContainerHeap {
// finally delete the internal cache to switch handling to read-only mode
this.cache = null;
// if the cache will be used in read-only mode afterwards, it must be initiated with initReadMode(file);
}
public int size() {
@ -174,7 +240,7 @@ public final class indexContainerHeap {
public indexContainer next() {
try {
is.read(word);
is.readFully(word);
return new indexContainer(new String(word), kelondroRowSet.importRowSet(is, payloadrow));
} catch (IOException e) {
return null;
@ -199,64 +265,6 @@ public final class indexContainerHeap {
}
}
/**
 * Restore a heap dump: this turns the heap into write mode. No heap file
 * should be assigned during initialization; the file name is given here.
 * When the heap is dumped again later, the heap file name may be different.
 * @param heapFile the dump file to restore the cache from
 * @throws IOException if the dump cannot be read
 */
public void restoreHeap(File heapFile) throws IOException {
    assert this.heapFile == null; // the heap must be opened on write-mode
    if (log != null) log.logInfo("restoring dump for rwi heap '" + heapFile.getName() + "'");
    long start = System.currentTimeMillis();
    this.cache = Collections.synchronizedSortedMap(new TreeMap<String, indexContainer>(new kelondroByteOrder.StringOrder(payloadrow.getOrdering())));
    int urlCount = 0;
    synchronized (cache) {
        for (indexContainer container : new heapFileEntries(heapFile, this.payloadrow)) {
            // the entry iterator returns null when the dump is truncated or
            // unreadable; stop restoring instead of failing with a NullPointerException
            if (container == null) break;
            cache.put(container.getWordHash(), container);
            urlCount += container.size();
        }
    }
    if (log != null) log.logInfo("finished rwi heap restore: " + cache.size() + " words, " + urlCount + " word/URL relations in " + ((System.currentTimeMillis() - start) / 1000) + " seconds");
}
/**
 * Generate the in-memory index (word hash -> seek position) for the assigned
 * heap file; used to access the heap in read-only mode. A no-op if the index
 * already exists.
 * @throws IOException if the heap file is missing, too large, or unreadable
 */
private void indexHeap() throws IOException {
    assert this.cache == null;
    if (this.index != null) return; // index already generated
    if (!(heapFile.exists())) throw new IOException("file " + heapFile + " does not exist");
    // seek positions are stored as int, so files of 2GB or more cannot be indexed
    if (heapFile.length() >= (long) Integer.MAX_VALUE) throw new IOException("file " + heapFile + " too large, index can only be created for files less than 2GB");
    if (log != null) log.logInfo("creating index for rwi heap '" + heapFile.getName() + "'");
    long start = System.currentTimeMillis();
    this.index = new kelondroBytesIntMap(payloadrow.primaryKeyLength, (kelondroByteOrder) payloadrow.getOrdering(), 0);
    long urlCount = 0; // NOTE(review): never incremented here; the closing log line always reports 0
    byte[] word = new byte[payloadrow.primaryKeyLength];
    int seek = 0, seek0;
    synchronized (index) {
        DataInputStream is = new DataInputStream(new BufferedInputStream(new FileInputStream(heapFile), 64*1024));
        try {
            while (is.available() > 0) {
                // remember seek position of this record
                seek0 = seek;
                // readFully instead of read: a single read() may return fewer
                // bytes than the word length and would corrupt the index
                is.readFully(word);
                // advance by the number of bytes actually read, not by the
                // length of the decoded string (charset-safe)
                seek += word.length;
                // skip the collection, adding its byte size to the seek position
                seek += kelondroRowSet.skipNextRowSet(is, payloadrow);
                index.addi(word, seek0);
            }
        } finally {
            is.close(); // do not leak the stream if indexing fails half-way
        }
    }
    if (log != null) log.logInfo("finished rwi heap indexing: " + urlCount + " word/URL relations in " + ((System.currentTimeMillis() - start) / 1000) + " seconds");
}
/**
* return an iterator object that creates top-level-clones of the indexContainers
* in the cache, so that manipulations of the iterated objects do not change
@ -326,17 +334,17 @@ public final class indexContainerHeap {
* @return true, if the key is used in the heap; false otherwise
*/
public boolean has(String key) {
if (this.cache == null) try {
if (!(heapFile.exists())) throw new IOException("file " + heapFile + " does not exist");
if (index == null) indexHeap();
if (this.readOnlyMode) {
assert index != null;
assert index.row().primaryKeyLength == key.length();
// check if the index contains the key
try {
return index.geti(key.getBytes()) >= 0;
} catch (IOException e) {
log.logSevere("error accessing entry in heap file " + this.heapFile + ": " + e.getMessage());
e.printStackTrace();
return false;
}
} else {
return this.cache.containsKey(key);
}
@ -348,9 +356,7 @@ public final class indexContainerHeap {
* @return the indexContainer if one exist, null otherwise
*/
public indexContainer get(String key) {
if (this.cache == null) try {
if (!(heapFile.exists())) throw new IOException("file " + heapFile + " does not exist");
if (index == null) indexHeap();
if (this.readOnlyMode) try {
assert index != null;
assert index.row().primaryKeyLength == key.length();
@ -359,7 +365,7 @@ public final class indexContainerHeap {
if (pos < 0) return null;
// access the file and read the container
RandomAccessFile raf = new RandomAccessFile(heapFile, "r");
RandomAccessFile raf = new RandomAccessFile(backupFile, "r");
byte[] word = new byte[index.row().primaryKeyLength];
raf.seek(pos);
@ -371,7 +377,7 @@ public final class indexContainerHeap {
raf.close();
return container;
} catch (IOException e) {
log.logSevere("error accessing entry in heap file " + this.heapFile + ": " + e.getMessage());
log.logSevere("error accessing entry in heap file " + this.backupFile + ": " + e.getMessage());
return null;
} else {
return this.cache.get(key);
@ -386,11 +392,14 @@ public final class indexContainerHeap {
/**
 * Delete the whole index container assigned to the given word hash.
 * Only allowed while the heap is in write mode (the in-memory cache exists).
 * @param wordHash key of the container to remove
 * @return the container that had been deleted, or null if none was present
 */
public synchronized indexContainer delete(String wordHash) {
    // returns the index that had been deleted
    assert this.cache != null;
    assert !this.readOnlyMode; // deletions are not possible in read-only mode
    return cache.remove(wordHash);
}
public synchronized boolean removeReference(String wordHash, String urlHash) {
assert this.cache != null;
assert !this.readOnlyMode;
indexContainer c = (indexContainer) cache.get(wordHash);
if ((c != null) && (c.remove(urlHash) != null)) {
// removal successful
@ -408,6 +417,8 @@ public final class indexContainerHeap {
// this tries to delete an index from the cache that has this
// urlHash assigned. This can only work if the entry is really fresh
// Such entries must be searched in the latest entries
assert this.cache != null;
assert !this.readOnlyMode;
int delCount = 0;
Iterator<Map.Entry<String, indexContainer>> i = cache.entrySet().iterator();
Map.Entry<String, indexContainer> entry;
@ -432,6 +443,8 @@ public final class indexContainerHeap {
}
public synchronized int removeReferences(String wordHash, Set<String> urlHashes) {
assert this.cache != null;
assert !this.readOnlyMode;
if (urlHashes.size() == 0) return 0;
indexContainer c = (indexContainer) cache.get(wordHash);
int count;
@ -451,6 +464,8 @@ public final class indexContainerHeap {
// this puts the entries into the cache
int added = 0;
if ((container == null) || (container.size() == 0)) return 0;
assert this.cache != null;
assert !this.readOnlyMode;
// put new words into cache
String wordHash = container.getWordHash();
@ -469,6 +484,8 @@ public final class indexContainerHeap {
}
public synchronized void addEntry(String wordHash, indexRWIRowEntry newEntry) {
assert this.cache != null;
assert !this.readOnlyMode;
indexContainer container = (indexContainer) cache.get(wordHash);
if (container == null) container = new indexContainer(wordHash, this.payloadrow, 1);
container.put(newEntry);
@ -483,6 +500,7 @@ public final class indexContainerHeap {
public void restoreArray(File indexArrayFile) throws IOException {
// is only here to read old array data structures
if (!(indexArrayFile.exists())) return;
this.readOnlyMode = false;
kelondroFixedWidthArray dumpArray;
kelondroBufferedRA readBuffer = null;
kelondroRow bufferStructureBasis = new kelondroRow(

@ -80,7 +80,7 @@ public final class indexRAMRI implements indexRI, indexRIReader {
if (indexArrayFile.exists()) log.logSevere("cannot delete old array file: " + indexArrayFile.toString() + "; please delete manually");
} else if (indexHeapFile.exists()) {
try {
heap.restoreHeap(indexHeapFile);
heap.initWriteMode(indexHeapFile);
for (indexContainer ic : (Iterable<indexContainer>) heap.wordContainers(null, false)) {
this.hashDate.setScore(ic.getWordHash(), intTime(ic.lastWrote()));
this.hashScore.setScore(ic.getWordHash(), ic.size());
@ -88,6 +88,8 @@ public final class indexRAMRI implements indexRI, indexRIReader {
} catch (IOException e){
log.logSevere("unable to restore cache dump: " + e.getMessage(), e);
}
} else {
heap.initWriteMode();
}
}
@ -291,7 +293,7 @@ public final class indexRAMRI implements indexRI, indexRIReader {
public synchronized void close() {
// dump cache
try {
heap.dumpHeap(this.indexHeapFile);
heap.dump(this.indexHeapFile);
} catch (IOException e){
log.logSevere("unable to dump cache: " + e.getMessage(), e);
}

@ -566,14 +566,16 @@ public class kelondroEcoFS {
public byte[] next() {
byte[] chunk = new byte[chunksize];
int r;
int r, s;
try {
// read the chunk
this.stream.readFully(chunk);
// skip remaining bytes
r = chunksize;
while (r < recordsize) {
r += this.stream.skip(recordsize - r);
s = (int) this.stream.skip(recordsize - r);
assert s != 0;
r += s;
}
return chunk;
} catch (IOException e) {

@ -243,7 +243,7 @@ public class plasmaCrawlBalancer {
return urlRAMStack.size() > 0 || urlFileStack.size() > 0 || domainStacksNotEmpty();
}
public int size() {
public synchronized int size() {
int componentsize = urlFileStack.size() + urlRAMStack.size() + sizeDomainStacks();
if (componentsize != urlFileIndex.size()) {
// here is urlIndexFile.size() always smaller. why?

@ -24,6 +24,8 @@
package de.anomic.server;
import java.net.SocketException;
import de.anomic.server.logging.serverLog;
public abstract class serverAbstractBusyThread extends serverAbstractThread implements serverBusyThread {
@ -160,6 +162,9 @@ public abstract class serverAbstractBusyThread extends serverAbstractThread impl
timestamp = System.currentTimeMillis();
ratz((isBusy) ? this.busyPause : this.idlePause);
idletime += System.currentTimeMillis() - timestamp;
} catch (SocketException e) {
// in case that a socket is interrupted, this method must die silently (shutdown)
this.log.logFine("socket-job interrupted: " + e.getMessage());
} catch (Exception e) {
// handle exceptions: thread must not die on any unexpected exceptions
// if the exception is too bad it should call terminate()

@ -443,13 +443,13 @@ public final class serverCore extends serverAbstractBusyThread implements server
controlSocket.setSoTimeout(this.timeout);
// keep-alive: if set to true, the server frequently sends keep-alive packets to the client which the client must respond to
// we set this to false to prevent that a missing ack from the client forces the server to close the connection
controlSocket.setKeepAlive(false);
// controlSocket.setKeepAlive(false);
// disable Nagle's algorithm (waiting for more data until packet is full)
controlSocket.setTcpNoDelay(true);
// controlSocket.setTcpNoDelay(true);
// set a non-zero linger, that means that a socket.close() blocks until all data is written
controlSocket.setSoLinger(false, this.timeout);
// controlSocket.setSoLinger(false, this.timeout);
// ensure that MTU-48 is not exceeded to prevent that routers cannot handle large data packets
// read http://www.cisco.com/warp/public/105/38.shtml for explanation
@ -479,6 +479,13 @@ public final class serverCore extends serverAbstractBusyThread implements server
// consuming the isInterrupted Flag. Otherwise we could not properly close the session pool
Thread.interrupted();
// shut down all busySessions
for (Session session: this.busySessions) {
try {session.notify();} catch (IllegalMonitorStateException e) {e.printStackTrace();}
try {session.notifyAll();} catch (IllegalMonitorStateException e) {e.printStackTrace();}
try {session.interrupt();} catch (SecurityException e ) {e.printStackTrace();}
}
// closing the port forwarding channel
if ((portForwardingEnabled) && (portForwarding != null) ) {
try {

@ -63,6 +63,8 @@ import java.util.regex.Pattern;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;
import org.apache.commons.httpclient.MultiThreadedHttpConnectionManager;
import de.anomic.data.translator;
import de.anomic.http.HttpClient;
import de.anomic.http.HttpFactory;
@ -70,6 +72,7 @@ import de.anomic.http.HttpResponse;
import de.anomic.http.httpHeader;
import de.anomic.http.httpd;
import de.anomic.http.HttpResponse.Saver;
import de.anomic.http.JakartaCommonsHttpClient;
import de.anomic.index.indexContainer;
import de.anomic.index.indexRWIEntry;
import de.anomic.index.indexRWIRowEntry;
@ -423,6 +426,7 @@ public final class yacy {
serverLog.logConfig("SHUTDOWN", "caught termination signal");
server.terminate(false);
server.interrupt();
server.close();
if (server.isAlive()) try {
// TODO only send request, don't read response (cause server is already down resulting in error)
yacyURL u = new yacyURL((server.withSSL()?"https":"http")+"://localhost:" + serverCore.getPortNr(port), null);
@ -431,13 +435,18 @@ public final class yacy {
} catch (IOException ee) {
serverLog.logConfig("SHUTDOWN", "termination signal to server socket missed (server shutdown, ok)");
}
JakartaCommonsHttpClient.closeAllConnections();
MultiThreadedHttpConnectionManager.shutdownAll();
// idle until the processes are down
while (server.isAlive()) {
if (server.isAlive()) {
Thread.sleep(2000); // wait a while
server.interrupt();
MultiThreadedHttpConnectionManager.shutdownAll();
}
serverLog.logConfig("SHUTDOWN", "server has terminated");
sb.close();
MultiThreadedHttpConnectionManager.shutdownAll();
}
} catch (Exception e) {
serverLog.logSevere("STARTUP", "Unexpected Error: " + e.getClass().getName(),e);

Loading…
Cancel
Save