diff --git a/source/de/anomic/index/indexContainerHeap.java b/source/de/anomic/index/indexContainerHeap.java index c1411e94e..5f7d5f953 100755 --- a/source/de/anomic/index/indexContainerHeap.java +++ b/source/de/anomic/index/indexContainerHeap.java @@ -29,11 +29,11 @@ package de.anomic.index; import java.io.BufferedInputStream; import java.io.BufferedOutputStream; import java.io.DataInputStream; +import java.io.DataOutputStream; import java.io.File; import java.io.FileInputStream; import java.io.FileOutputStream; import java.io.IOException; -import java.io.OutputStream; import java.io.RandomAccessFile; import java.util.Collections; import java.util.Iterator; @@ -62,7 +62,8 @@ public final class indexContainerHeap { private serverLog log; private kelondroBytesIntMap index; private SortedMap cache; - private File heapFile; + private File backupFile; + private boolean readOnlyMode; // index xor cache is used. If one is not null, then the other must be null /* @@ -79,39 +80,103 @@ public final class indexContainerHeap { */ /** - * open a new container heap and prepare it as a heap to be written + * opens an existing heap file in undefined mode + * after this a initialization should be made to use the heap: + * either a read-only or read/write mode inititalization * @param payloadrow * @param log */ public indexContainerHeap(kelondroRow payloadrow, serverLog log) { this.payloadrow = payloadrow; this.log = log; - this.cache = Collections.synchronizedSortedMap(new TreeMap(new kelondroByteOrder.StringOrder(payloadrow.getOrdering()))); + this.cache = null; this.index = null; - this.heapFile = null; + this.backupFile = null; + this.readOnlyMode = false; } /** - * opens an existing heap file in read-only mode - * @param indexHeapFile - * @param payloadrow - * @param log + * initializes the heap in read/write mode without reading of a dump first + * another dump reading afterwards is not possible */ - public indexContainerHeap(kelondroRow payloadrow, serverLog log, File heapFile) { - this.payloadrow = payloadrow; - this.log = log; - this.cache = null; - this.index = null; - this.heapFile = heapFile; + public void initWriteMode() { + this.cache = Collections.synchronizedSortedMap(new TreeMap(new kelondroByteOrder.StringOrder(payloadrow.getOrdering()))); + this.readOnlyMode = false; + } + + /** + * restore a heap dump: this is a heap in write mode. There should no heap file + * be assigned in initialization; the file name is given here in this method + * when the heap is once dumped again, the heap file name may be different + * @param heapFile + * @throws IOException + */ + public void initWriteMode(File heapFile) throws IOException { + this.readOnlyMode = false; + if (log != null) log.logInfo("restoring dump for rwi heap '" + heapFile.getName() + "'"); + long start = System.currentTimeMillis(); + this.cache = Collections.synchronizedSortedMap(new TreeMap(new kelondroByteOrder.StringOrder(payloadrow.getOrdering()))); + int urlCount = 0; + synchronized (cache) { + for (indexContainer container : new heapFileEntries(heapFile, this.payloadrow)) { + if (container == null) break; + cache.put(container.getWordHash(), container); + urlCount += container.size(); + } + } + if (log != null) log.logInfo("finished rwi heap restore: " + cache.size() + " words, " + urlCount + " word/URL relations in " + ((System.currentTimeMillis() - start) / 1000) + " seconds"); + } + + /** + * init a heap file in read-only mode + * this initiates a heap index generation which is then be used to access elements of the heap + * during the life-time of this object, the file is _not_ open; it is opened each time + * the heap is accessed for reading + * @param heapFile + * @throws IOException + */ + public void initReadMode(File heapFile) throws IOException { + this.readOnlyMode = true; + assert this.cache == null; + assert this.index == null; + this.backupFile = heapFile; + if (!(heapFile.exists())) throw new IOException("file " + heapFile + " does not exist"); + if (heapFile.length() >= (long) Integer.MAX_VALUE) throw new IOException("file " + heapFile + " too large, index can only be crated for files less than 2GB"); + if (log != null) log.logInfo("creating index for rwi heap '" + heapFile.getName() + "'"); + + long start = System.currentTimeMillis(); + this.index = new kelondroBytesIntMap(payloadrow.primaryKeyLength, (kelondroByteOrder) payloadrow.getOrdering(), 0); + DataInputStream is = null; + long urlCount = 0; + String wordHash; + byte[] word = new byte[payloadrow.primaryKeyLength]; + int seek = 0, seek0; + synchronized (index) { + is = new DataInputStream(new BufferedInputStream(new FileInputStream(heapFile), 64*1024)); + + while (is.available() > 0) { + // remember seek position + seek0 = seek; + + // read word + is.readFully(word); + wordHash = new String(word); + seek += wordHash.length(); + + // read collection + seek += kelondroRowSet.skipNextRowSet(is, payloadrow); + index.addi(word, seek0); + } + } + is.close(); + if (log != null) log.logInfo("finished rwi heap indexing: " + urlCount + " word/URL relations in " + ((System.currentTimeMillis() - start) / 1000) + " seconds"); } - public void dumpHeap(File heapFile) throws IOException { - assert this.heapFile == null; + public void dump(File heapFile) throws IOException { assert this.cache != null; - this.heapFile = heapFile; if (log != null) log.logInfo("creating rwi heap dump '" + heapFile.getName() + "', " + cache.size() + " rwi's"); if (heapFile.exists()) heapFile.delete(); - OutputStream os = new BufferedOutputStream(new FileOutputStream(heapFile), 64 * 1024); + DataOutputStream os = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(heapFile), 64 * 1024)); long startTime = System.currentTimeMillis(); long wordcount = 0, urlcount = 0; String wordHash; @@ -142,6 +207,7 @@ public final class indexContainerHeap { // finally delete the internal cache to switch handling to read-only mode this.cache = null; + // if the cache will be used in read-only mode afterwards, it must be initiated with initReadMode(file); } public int size() { @@ -174,7 +240,7 @@ public final class indexContainerHeap { public indexContainer next() { try { - is.read(word); + is.readFully(word); return new indexContainer(new String(word), kelondroRowSet.importRowSet(is, payloadrow)); } catch (IOException e) { return null; @@ -198,64 +264,6 @@ public final class indexContainerHeap { this.close(); } } - - /** - * restore a heap dump: this is a heap in write mode. There should no heap file - * be assigned in initialization; the file name is given here in this method - * when the heap is once dumped again, the heap file name may be different - * @param heapFile - * @throws IOException - */ - public void restoreHeap(File heapFile) throws IOException { - assert this.heapFile == null; // the heap must be opened on write-mode - - if (log != null) log.logInfo("restoring dump for rwi heap '" + heapFile.getName() + "'"); - long start = System.currentTimeMillis(); - this.cache = Collections.synchronizedSortedMap(new TreeMap(new kelondroByteOrder.StringOrder(payloadrow.getOrdering()))); - int urlCount = 0; - synchronized (cache) { - for (indexContainer container : new heapFileEntries(heapFile, this.payloadrow)) { - cache.put(container.getWordHash(), container); - urlCount += container.size(); - } - } - if (log != null) log.logInfo("finished rwi heap restore: " + cache.size() + " words, " + urlCount + " word/URL relations in " + ((System.currentTimeMillis() - start) / 1000) + " seconds"); - } - - private void indexHeap() throws IOException { - assert this.cache == null; - if (this.index != null) return; - if (!(heapFile.exists())) throw new IOException("file " + heapFile + " does not exist"); - if (heapFile.length() >= (long) Integer.MAX_VALUE) throw new IOException("file " + heapFile + " too large, index can only be crated for files less than 2GB"); - if (log != null) log.logInfo("creating index for rwi heap '" + heapFile.getName() + "'"); - - long start = System.currentTimeMillis(); - this.index = new kelondroBytesIntMap(payloadrow.primaryKeyLength, (kelondroByteOrder) payloadrow.getOrdering(), 0); - DataInputStream is = null; - long urlCount = 0; - String wordHash; - byte[] word = new byte[payloadrow.primaryKeyLength]; - int seek = 0, seek0; - synchronized (index) { - is = new DataInputStream(new BufferedInputStream(new FileInputStream(heapFile), 64*1024)); - - while (is.available() > 0) { - // remember seek position - seek0 = seek; - - // read word - is.read(word); - wordHash = new String(word); - seek += wordHash.length(); - - // read collection - seek += kelondroRowSet.skipNextRowSet(is, payloadrow); - index.addi(word, seek0); - } - } - is.close(); - if (log != null) log.logInfo("finished rwi heap indexing: " + urlCount + " word/URL relations in " + ((System.currentTimeMillis() - start) / 1000) + " seconds"); - } /** * return an iterator object that creates top-level-clones of the indexContainers @@ -326,17 +334,17 @@ public final class indexContainerHeap { * @return true, if the key is used in the heap; false othervise */ public boolean has(String key) { - if (this.cache == null) try { - if (!(heapFile.exists())) throw new IOException("file " + heapFile + " does not exist"); - if (index == null) indexHeap(); + if (this.readOnlyMode) { assert index != null; assert index.row().primaryKeyLength == key.length(); // check if the index contains the key - return index.geti(key.getBytes()) >= 0; - } catch (IOException e) { - log.logSevere("error accessing entry in heap file " + this.heapFile + ": " + e.getMessage()); - return false; + try { + return index.geti(key.getBytes()) >= 0; + } catch (IOException e) { + e.printStackTrace(); + return false; + } } else { return this.cache.containsKey(key); } @@ -348,9 +356,7 @@ public final class indexContainerHeap { * @return the indexContainer if one exist, null otherwise */ public indexContainer get(String key) { - if (this.cache == null) try { - if (!(heapFile.exists())) throw new IOException("file " + heapFile + " does not exist"); - if (index == null) indexHeap(); + if (this.readOnlyMode) try { assert index != null; assert index.row().primaryKeyLength == key.length(); @@ -359,7 +365,7 @@ public final class indexContainerHeap { if (pos < 0) return null; // access the file and read the container - RandomAccessFile raf = new RandomAccessFile(heapFile, "r"); + RandomAccessFile raf = new RandomAccessFile(backupFile, "r"); byte[] word = new byte[index.row().primaryKeyLength]; raf.seek(pos); @@ -371,7 +377,7 @@ public final class indexContainerHeap { raf.close(); return container; } catch (IOException e) { - log.logSevere("error accessing entry in heap file " + this.heapFile + ": " + e.getMessage()); + log.logSevere("error accessing entry in heap file " + this.backupFile + ": " + e.getMessage()); return null; } else { return this.cache.get(key); @@ -386,11 +392,14 @@ public final class indexContainerHeap { public synchronized indexContainer delete(String wordHash) { // returns the index that had been deleted assert this.cache != null; + assert !this.readOnlyMode; return cache.remove(wordHash); } public synchronized boolean removeReference(String wordHash, String urlHash) { + assert this.cache != null; + assert !this.readOnlyMode; indexContainer c = (indexContainer) cache.get(wordHash); if ((c != null) && (c.remove(urlHash) != null)) { // removal successful @@ -408,6 +417,8 @@ public final class indexContainerHeap { // this tries to delete an index from the cache that has this // urlHash assigned. This can only work if the entry is really fresh // Such entries must be searched in the latest entries + assert this.cache != null; + assert !this.readOnlyMode; int delCount = 0; Iterator> i = cache.entrySet().iterator(); Map.Entry entry; @@ -432,6 +443,8 @@ public final class indexContainerHeap { } public synchronized int removeReferences(String wordHash, Set urlHashes) { + assert this.cache != null; + assert !this.readOnlyMode; if (urlHashes.size() == 0) return 0; indexContainer c = (indexContainer) cache.get(wordHash); int count; @@ -451,7 +464,9 @@ public final class indexContainerHeap { // this puts the entries into the cache int added = 0; if ((container == null) || (container.size() == 0)) return 0; - + assert this.cache != null; + assert !this.readOnlyMode; + // put new words into cache String wordHash = container.getWordHash(); indexContainer entries = (indexContainer) cache.get(wordHash); // null pointer exception? wordhash != null! must be cache==null @@ -469,6 +484,8 @@ public final class indexContainerHeap { } public synchronized void addEntry(String wordHash, indexRWIRowEntry newEntry) { + assert this.cache != null; + assert !this.readOnlyMode; indexContainer container = (indexContainer) cache.get(wordHash); if (container == null) container = new indexContainer(wordHash, this.payloadrow, 1); container.put(newEntry); @@ -483,6 +500,7 @@ public final class indexContainerHeap { public void restoreArray(File indexArrayFile) throws IOException { // is only here to read old array data structures if (!(indexArrayFile.exists())) return; + this.readOnlyMode = false; kelondroFixedWidthArray dumpArray; kelondroBufferedRA readBuffer = null; kelondroRow bufferStructureBasis = new kelondroRow( diff --git a/source/de/anomic/index/indexRAMRI.java b/source/de/anomic/index/indexRAMRI.java index e82a7b09c..541b1442e 100644 --- a/source/de/anomic/index/indexRAMRI.java +++ b/source/de/anomic/index/indexRAMRI.java @@ -80,7 +80,7 @@ public final class indexRAMRI implements indexRI, indexRIReader { if (indexArrayFile.exists()) log.logSevere("cannot delete old array file: " + indexArrayFile.toString() + "; please delete manually"); } else if (indexHeapFile.exists()) { try { - heap.restoreHeap(indexHeapFile); + heap.initWriteMode(indexHeapFile); for (indexContainer ic : (Iterable) heap.wordContainers(null, false)) { this.hashDate.setScore(ic.getWordHash(), intTime(ic.lastWrote())); this.hashScore.setScore(ic.getWordHash(), ic.size()); @@ -88,6 +88,8 @@ public final class indexRAMRI implements indexRI, indexRIReader { } catch (IOException e){ log.logSevere("unable to restore cache dump: " + e.getMessage(), e); } + } else { + heap.initWriteMode(); } } @@ -291,7 +293,7 @@ public final class indexRAMRI implements indexRI, indexRIReader { public synchronized void close() { // dump cache try { - heap.dumpHeap(this.indexHeapFile); + heap.dump(this.indexHeapFile); } catch (IOException e){ log.logSevere("unable to dump cache: " + e.getMessage(), e); } diff --git a/source/de/anomic/kelondro/kelondroEcoFS.java b/source/de/anomic/kelondro/kelondroEcoFS.java index 898710548..a0e85f2b7 100644 --- a/source/de/anomic/kelondro/kelondroEcoFS.java +++ b/source/de/anomic/kelondro/kelondroEcoFS.java @@ -566,14 +566,16 @@ public class kelondroEcoFS { public byte[] next() { byte[] chunk = new byte[chunksize]; - int r; + int r, s; try { // read the chunk this.stream.readFully(chunk); // skip remaining bytes r = chunksize; while (r < recordsize) { - r += this.stream.skip(recordsize - r); + s = (int) this.stream.skip(recordsize - r); + assert s != 0; + r += s; } return chunk; } catch (IOException e) { diff --git a/source/de/anomic/plasma/plasmaCrawlBalancer.java b/source/de/anomic/plasma/plasmaCrawlBalancer.java index 797d1182f..0cf9111cb 100644 --- a/source/de/anomic/plasma/plasmaCrawlBalancer.java +++ b/source/de/anomic/plasma/plasmaCrawlBalancer.java @@ -243,7 +243,7 @@ public class plasmaCrawlBalancer { return urlRAMStack.size() > 0 || urlFileStack.size() > 0 || domainStacksNotEmpty(); } - public int size() { + public synchronized int size() { int componentsize = urlFileStack.size() + urlRAMStack.size() + sizeDomainStacks(); if (componentsize != urlFileIndex.size()) { // here is urlIndexFile.size() always smaller. why? diff --git a/source/de/anomic/server/serverAbstractBusyThread.java b/source/de/anomic/server/serverAbstractBusyThread.java index ffd90497f..6fd4e31ac 100644 --- a/source/de/anomic/server/serverAbstractBusyThread.java +++ b/source/de/anomic/server/serverAbstractBusyThread.java @@ -24,6 +24,8 @@ package de.anomic.server; +import java.net.SocketException; + import de.anomic.server.logging.serverLog; public abstract class serverAbstractBusyThread extends serverAbstractThread implements serverBusyThread { @@ -160,6 +162,9 @@ public abstract class serverAbstractBusyThread extends serverAbstractThread impl timestamp = System.currentTimeMillis(); ratz((isBusy) ? this.busyPause : this.idlePause); idletime += System.currentTimeMillis() - timestamp; + } catch (SocketException e) { + // in case that a socket is interrupted, this method must die silently (shutdown) + this.log.logFine("socket-job interrupted: " + e.getMessage()); } catch (Exception e) { // handle exceptions: thread must not die on any unexpected exceptions // if the exception is too bad it should call terminate() diff --git a/source/de/anomic/server/serverCore.java b/source/de/anomic/server/serverCore.java index 746e86427..faef71dc8 100644 --- a/source/de/anomic/server/serverCore.java +++ b/source/de/anomic/server/serverCore.java @@ -443,13 +443,13 @@ public final class serverCore extends serverAbstractBusyThread implements server controlSocket.setSoTimeout(this.timeout); // keep-alive: if set to true, the server frequently sends keep-alive packets to the client which the client must respond to // we set this to false to prevent that a missing ack from the client forces the server to close the connection - controlSocket.setKeepAlive(false); + // controlSocket.setKeepAlive(false); // disable Nagle's algorithm (waiting for more data until packet is full) - controlSocket.setTcpNoDelay(true); + // controlSocket.setTcpNoDelay(true); // set a non-zero linger, that means that a socket.close() blocks until all data is written - controlSocket.setSoLinger(false, this.timeout); + // controlSocket.setSoLinger(false, this.timeout); // ensure that MTU-48 is not exceeded to prevent that routers cannot handle large data packets // read http://www.cisco.com/warp/public/105/38.shtml for explanation @@ -479,6 +479,13 @@ public final class serverCore extends serverAbstractBusyThread implements server // consuming the isInterrupted Flag. Otherwise we could not properly close the session pool Thread.interrupted(); + // shut down all busySessions + for (Session session: this.busySessions) { + try {session.notify();} catch (IllegalMonitorStateException e) {e.printStackTrace();} + try {session.notifyAll();} catch (IllegalMonitorStateException e) {e.printStackTrace();} + try {session.interrupt();} catch (SecurityException e ) {e.printStackTrace();} + } + // closing the port forwarding channel if ((portForwardingEnabled) && (portForwarding != null) ) { try { diff --git a/source/yacy.java b/source/yacy.java index 7928c240f..f4d1a0921 100644 --- a/source/yacy.java +++ b/source/yacy.java @@ -63,6 +63,8 @@ import java.util.regex.Pattern; import java.util.zip.ZipEntry; import java.util.zip.ZipOutputStream; +import org.apache.commons.httpclient.MultiThreadedHttpConnectionManager; + import de.anomic.data.translator; import de.anomic.http.HttpClient; import de.anomic.http.HttpFactory; @@ -70,6 +72,7 @@ import de.anomic.http.HttpResponse; import de.anomic.http.httpHeader; import de.anomic.http.httpd; import de.anomic.http.HttpResponse.Saver; +import de.anomic.http.JakartaCommonsHttpClient; import de.anomic.index.indexContainer; import de.anomic.index.indexRWIEntry; import de.anomic.index.indexRWIRowEntry; @@ -423,6 +426,7 @@ public final class yacy { serverLog.logConfig("SHUTDOWN", "caught termination signal"); server.terminate(false); server.interrupt(); + server.close(); if (server.isAlive()) try { // TODO only send request, don't read response (cause server is already down resulting in error) yacyURL u = new yacyURL((server.withSSL()?"https":"http")+"://localhost:" + serverCore.getPortNr(port), null); @@ -431,13 +435,18 @@ public final class yacy { } catch (IOException ee) { serverLog.logConfig("SHUTDOWN", "termination signal to server socket missed (server shutdown, ok)"); } - + JakartaCommonsHttpClient.closeAllConnections(); + MultiThreadedHttpConnectionManager.shutdownAll(); + // idle until the processes are down - while (server.isAlive()) { + if (server.isAlive()) { Thread.sleep(2000); // wait a while + server.interrupt(); + MultiThreadedHttpConnectionManager.shutdownAll(); } serverLog.logConfig("SHUTDOWN", "server has terminated"); sb.close(); + MultiThreadedHttpConnectionManager.shutdownAll(); } } catch (Exception e) { serverLog.logSevere("STARTUP", "Unexpected Error: " + e.getClass().getName(),e);