diff --git a/source/de/anomic/plasma/plasmaWordIndexDistribution.java b/source/de/anomic/plasma/plasmaWordIndexDistribution.java
index 01aab547d..e9d5f58bc 100644
--- a/source/de/anomic/plasma/plasmaWordIndexDistribution.java
+++ b/source/de/anomic/plasma/plasmaWordIndexDistribution.java
@@ -309,7 +309,7 @@ public final class plasmaWordIndexDistribution {
         final HashMap knownURLs = new HashMap();
         while (
                 (count > 0) &&
-                (currOpenFiles <= maxOpenFiles) &&
+                (currOpenFiles < maxOpenFiles) &&
                 (wordHashIterator.hasNext()) &&
                 ((nexthash = (String) wordHashIterator.next()) != null) &&
                 (nexthash.trim().length() > 0)
@@ -740,7 +740,10 @@ public final class plasmaWordIndexDistribution {
         }
         
         public void performTransferWholeIndex() {
+            plasmaWordIndexEntity[] newIndexEntities = null, oldIndexEntities = null;
             try {
+                // pausing the regular index distribution
+                // TODO: add sync to wait for a still-running index distribution to finish
                 plasmaWordIndexDistribution.this.paused = true;
                 
                 // initial startingpoint of intex transfer is "------------"
@@ -751,9 +754,9 @@ public final class plasmaWordIndexDistribution {
                  *  - detected a server shutdown or user interruption
                  *  - detected a failure
                  */
-                long selectionStart = System.currentTimeMillis(), selectionEnd = 0, selectionTime, iteration = 0;
+                long selectionStart = System.currentTimeMillis(), selectionEnd = 0, selectionTime = 0, iteration = 0;
                 
-                plasmaWordIndexEntity[] newIndexEntities = null, oldIndexEntities = null;
+                Integer openedFiles = new Integer(0);
                 while (!finished && !Thread.currentThread().isInterrupted()) {
                     iteration++;
                     int idxCount = 0;
@@ -762,51 +765,71 @@ public final class plasmaWordIndexDistribution {
                     
                     // selecting 500 words to transfer
                     this.status = "Running: Selecting chunk " + iteration;
-                    Object[] selectResult = selectTransferIndexes(this.startPointHash, this.chunkSize, this.maxOpenFiles);
-                    newIndexEntities = (plasmaWordIndexEntity[]) selectResult[0];
+                    Object[] selectResult = selectTransferIndexes(this.startPointHash, this.chunkSize, this.maxOpenFiles - openedFiles.intValue());
+                    newIndexEntities = (plasmaWordIndexEntity[]) selectResult[0];
+                    HashMap urlCache = (HashMap) selectResult[1]; // String (url-hash) / plasmaCrawlLURL.Entry
+                    openedFiles = (Integer) selectResult[2];
                     
-                    HashMap urlCache = (HashMap) selectResult[1]; // String (url-hash) / plasmaCrawlLURL.Entry
+                    /* If we haven't selected a word chunk this could be because
+                     * a) no words are left in the index
+                     * b) the max open file limit was exceeded
+                     */
                     if ((newIndexEntities == null) || (newIndexEntities.length == 0)) {
-                        // if there are still words in the index we try it again now
                         if (sb.wordIndex.size() > 0) {
+                            // if there are still words in the index we try it again now
                             startPointHash = "------------";
-                            continue;
+                        } else {
+                            // otherwise we can end the transfer now
+                            plasmaWordIndexDistribution.this.log.logFine("No index available for index transfer, hash start-point " + startPointHash);
+                            this.status = "Finished. " + iteration + " chunks transferred.";
+                            finished = true;
                         }
+                    } else {
+                        // count the indexes again, can be smaller than expected
+                        for (int i = 0; i < newIndexEntities.length; i++) idxCount += newIndexEntities[i].size();
-                        plasmaWordIndexDistribution.this.log.logFine("No index available for index transfer, hash start-point " + startPointHash);
-                        this.status = "Finished. " + iteration + " chunks transfered.";
-                        return;
+                        // getting the start point for the next DHT selection
+                        oldStartingPointHash = startPointHash;
+                        startPointHash = newIndexEntities[newIndexEntities.length - 1].wordHash(); // DHT targets must have greater hashes
+                        
+                        selectionEnd = System.currentTimeMillis();
+                        selectionTime = selectionEnd - selectionStart;
+                        plasmaWordIndexDistribution.this.log.logInfo("Index selection of " + idxCount + " words [" + newIndexEntities[0].wordHash() + " .. " + newIndexEntities[newIndexEntities.length-1].wordHash() + "]" +
+                                " in " +
+                                (selectionTime / 1000) + " seconds (" +
+                                (1000 * idxCount / (selectionTime)) + " words/s)");
                     }
-                    // count the indexes again, can be smaller as expected
-                    for (int i = 0; i < newIndexEntities.length; i++) idxCount += newIndexEntities[i].size();
-                    
-                    // getting start point for next DHT-selection
-                    oldStartingPointHash = startPointHash;
-                    startPointHash = newIndexEntities[newIndexEntities.length - 1].wordHash(); // DHT targets must have greater hashes
-                    
-                    selectionEnd = System.currentTimeMillis();
-                    selectionTime = selectionEnd - selectionStart;
-                    plasmaWordIndexDistribution.this.log.logInfo("Index selection of " + idxCount + " words [" + newIndexEntities[0].wordHash() + " .. " + newIndexEntities[newIndexEntities.length-1].wordHash() + "]" +
-                            " in " +
-                            (selectionTime / 1000) + " seconds (" +
-                            (1000 * idxCount / (selectionTime)) + " words/s)");
                     
                     // query status of old worker thread
                     if (worker != null) {
                         this.status = "Finished: Selecting chunk " + iteration;
                         worker.join();
                         if (!worker.success) {
+                            // if the transfer failed we abort the index transfer now
                             this.status = "Aborted because of Transfer error:\n" + worker.getStatus();
+                            
+                            // cleanup: closing all open files
+                            closeEntities(oldIndexEntities);
+                            oldIndexEntities = null;
+                            closeEntities(newIndexEntities);
+                            newIndexEntities = null;
+                            
+                            // abort index transfer
                             return;
                         } else {
+                            /*
+                             * If the index transfer was done successfully we close all remaining open
+                             * files that belong to the old index chunk and hand over a new chunk
+                             * to the transfer thread.
+                             * Additionally we recalculate the chunk size to optimize performance.
+                             */
+                            this.chunkSize = worker.getChunkSize();
                             long transferTime = worker.getTransferTime();
                             //TODO: only increase chunk Size if there is free memory left on the server
                             // we need aprox. 73Byte per IndexEntity and an average URL length of 32 char
-                            //if (ft.freeMemory() < 73*2*100)
-                            
-                            
+                            //if (ft.freeMemory() < 73*2*100)
                             if (transferTime > 60*1000) {
                                 if (chunkSize>200) chunkSize-=100;
                             } else if (selectionTime < transferTime){
@@ -832,23 +855,23 @@ public final class plasmaWordIndexDistribution {
                                    plasmaWordIndexDistribution.this.log.logSevere("Deletion of indexes not possible:" + ee.getMessage(), ee);
                                }
                            } else {
-                                // simply close the indexEntities
-                                for (int i = 0; i < oldIndexEntities.length; i++) try {
-                                    oldIndexEntities[i].close();
-                                } catch (IOException ee) {}
+                                this.closeEntities(oldIndexEntities);
                                 transferedIndexCount += idxCount;
-                            }
-                            
+                            }
+                            oldIndexEntities = null;
                         }
+                        this.worker = null;
                     }
                     
                     // handover chunk to transfer worker
-                    worker = new transferIndexWorkerThread(seed,newIndexEntities,urlCache,gzipBody,timeout,iteration,idxCount,idxCount,startPointHash,oldStartingPointHash);
-                    worker.start();
+                    if (!((newIndexEntities == null) || (newIndexEntities.length == 0))) {
+                        worker = new transferIndexWorkerThread(seed,newIndexEntities,urlCache,gzipBody,timeout,iteration,idxCount,idxCount,startPointHash,oldStartingPointHash);
+                        worker.start();
+                    }
                 }
                 
                 // if we reach this point we were aborted by the user or by server shutdown
-                this.status = "aborted";
+                if (sb.wordIndex.size() > 0) this.status = "aborted";
            } catch (Exception e) {
                this.status = "Error: " + e.getMessage();
                plasmaWordIndexDistribution.this.log.logWarning("Index transfer to peer " + seed.getName() + ":" + seed.hash + " failed:'" + e.getMessage() + "'",e);
@@ -859,10 +882,21 @@ public final class plasmaWordIndexDistribution {
                    try {worker.join();}catch(Exception e){}
                    // worker = null;
                }
+                if (oldIndexEntities != null) closeEntities(oldIndexEntities);
+                if (newIndexEntities != null) closeEntities(newIndexEntities);
+                plasmaWordIndexDistribution.this.paused = false;
            }
        }
        
+        private void closeEntities(plasmaWordIndexEntity[] indexEntities) {
+            if ((indexEntities == null) || (indexEntities.length == 0)) return;
+            
+            for (int i = 0; i < indexEntities.length; i++) try {
+                indexEntities[i].close();
+            } catch (IOException ee) {}
+        }
+        
        private boolean isAborted() {
            if (finished || Thread.currentThread().isInterrupted()) {
                this.status = "aborted";
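
Note on the cleanup pattern this patch introduces: performTransferWholeIndex() now owns the lifecycle of every selected plasmaWordIndexEntity. The entity arrays are hoisted out of the try block, all close calls are funneled through the new closeEntities() helper, and whatever is still open is released on the finally path, so neither a transfer failure nor an exception leaks open files anymore. The standalone sketch below shows the same select/transfer/always-close shape; it is not YaCy code, and Entity, selectChunk and transferChunk are illustrative placeholders.

    import java.io.Closeable;
    import java.io.IOException;
    
    public class TransferSketch {
    
        // stands in for plasmaWordIndexEntity: a file-backed, closeable resource
        interface Entity extends Closeable { int size(); }
    
        static Entity[] selectChunk() { return new Entity[0]; }       // placeholder selection
        static boolean transferChunk(Entity[] chunk) { return true; } // placeholder transfer
    
        // mirrors the new closeEntities() helper: null-safe, ignores close errors
        static void closeAll(Entity[] entities) {
            if (entities == null || entities.length == 0) return;
            for (int i = 0; i < entities.length; i++) {
                try { entities[i].close(); } catch (IOException e) { /* ignore */ }
            }
        }
    
        public static void main(String[] args) {
            Entity[] chunk = null;
            try {
                while ((chunk = selectChunk()).length > 0) {
                    boolean success = transferChunk(chunk);
                    closeAll(chunk); // release the chunk on success and failure paths alike
                    chunk = null;    // mark as released, like the oldIndexEntities = null assignments
                    if (!success) return;
                }
            } finally {
                closeAll(chunk);     // covers exceptions thrown between select and close
            }
        }
    }

The same file-handle discipline motivates the two selection changes: the loop condition now stops strictly below the limit (currOpenFiles < maxOpenFiles), and each selection round only gets the remaining budget (this.maxOpenFiles - openedFiles.intValue()), so the chunk selection can no longer exceed the configured number of open files.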