@ -76,11 +76,20 @@ public final class plasmaWordIndexDistribution {
private boolean closed ;
private boolean gzipBody ;
private int timeout ;
private int maxOpenFiles ;
public transferIndexThread transferIdxThread = null ;
public plasmaWordIndexDistribution ( plasmaURLPool urlPool , plasmaWordIndex wordIndex , serverLog log ,
boolean enable , boolean enabledWhileCrawling , boolean gzipBody , int timeout ) {
public plasmaWordIndexDistribution (
plasmaURLPool urlPool ,
plasmaWordIndex wordIndex ,
serverLog log ,
boolean enable ,
boolean enabledWhileCrawling ,
boolean gzipBody ,
int timeout ,
int maxOpenFiles
) {
this . urlPool = urlPool ;
this . wordIndex = wordIndex ;
this . enabled = enable ;
@ -90,6 +99,7 @@ public final class plasmaWordIndexDistribution {
setCounts ( 100 /*indexCount*/ , 1 /*juniorPeerCount*/ , 3 /*seniorPeerCount*/ , 8000 ) ;
this . gzipBody = gzipBody ;
this . timeout = timeout ;
this . maxOpenFiles = maxOpenFiles ;
}
public void enable ( ) {
@ -190,8 +200,9 @@ public final class plasmaWordIndexDistribution {
// collect index
String startPointHash = selectTransferStart ( ) ;
log . logFine ( "Selected hash " + startPointHash + " as start point for index distribution, distance = " + yacyDHTAction . dhtDistance ( yacyCore . seedDB . mySeed . hash , startPointHash ) ) ;
Object [ ] selectResult = selectTransferIndexes ( startPointHash , indexCount );
Object [ ] selectResult = selectTransferIndexes ( startPointHash , indexCount , this . maxOpenFiles );
plasmaWordIndexEntity [ ] indexEntities = ( plasmaWordIndexEntity [ ] ) selectResult [ 0 ] ;
Integer openedFiles = ( Integer ) selectResult [ 2 ] ;
HashMap urlCache = ( HashMap ) selectResult [ 1 ] ; // String (url-hash) / plasmaCrawlLURL.Entry
if ( ( indexEntities = = null ) | | ( indexEntities . length = = 0 ) ) {
log . logFine ( "No index available for index transfer, hash start-point " + startPointHash ) ;
@ -282,12 +293,13 @@ public final class plasmaWordIndexDistribution {
}
Object [ ] /* of {plasmaWordIndexEntity[], HashMap(String, plasmaCrawlLURL.Entry)}*/
selectTransferIndexes ( String hash , int count ) {
selectTransferIndexes ( String hash , int count , int maxOpenFiles ) {
// the hash is a start hash from where the indexes are picked
ArrayList tmpEntities = new ArrayList ( count ) ;
String nexthash = "" ;
try {
Iterator wordHashIterator = wordIndex . wordHashes ( hash , true , true ) ;
int currOpenFiles = 0 ;
Iterator wordHashIterator = this . wordIndex . wordHashes ( hash , true , true ) ;
plasmaWordIndexEntity indexEntity , tmpEntity ;
Enumeration urlEnum ;
Iterator hashIter ;
@ -295,9 +307,14 @@ public final class plasmaWordIndexDistribution {
plasmaCrawlLURL . Entry lurl ;
final HashSet unknownURLEntries = new HashSet ( ) ;
final HashMap knownURLs = new HashMap ( ) ;
while ( ( count > 0 ) & & ( wordHashIterator . hasNext ( ) ) & &
( ( nexthash = ( String ) wordHashIterator . next ( ) ) ! = null ) & & ( nexthash . trim ( ) . length ( ) > 0 ) ) {
indexEntity = wordIndex . getEntity ( nexthash , true ) ;
while (
( count > 0 ) & &
( currOpenFiles < = maxOpenFiles ) & &
( wordHashIterator . hasNext ( ) ) & &
( ( nexthash = ( String ) wordHashIterator . next ( ) ) ! = null ) & &
( nexthash . trim ( ) . length ( ) > 0 )
) {
indexEntity = this . wordIndex . getEntity ( nexthash , true ) ;
if ( indexEntity . size ( ) = = 0 ) {
indexEntity . deleteComplete ( ) ;
} else if ( ( indexEntity . size ( ) < = count ) | | // if we havn't exceeded the limit
@ -309,12 +326,12 @@ public final class plasmaWordIndexDistribution {
unknownURLEntries . clear ( ) ;
while ( urlEnum . hasMoreElements ( ) ) {
indexEntry = ( plasmaWordIndexEntry ) urlEnum . nextElement ( ) ;
lurl = urlPool. loadedURL . getEntry ( indexEntry . getUrlHash ( ) ) ;
lurl = this . urlPool. loadedURL . getEntry ( indexEntry . getUrlHash ( ) ) ;
if ( ( lurl = = null ) | | ( lurl . toString ( ) = = null ) ) {
unknownURLEntries . add ( indexEntry . getUrlHash ( ) ) ;
} else {
if ( lurl . toString ( ) = = null ) {
urlPool. loadedURL . remove ( indexEntry . getUrlHash ( ) ) ;
this . urlPool. loadedURL . remove ( indexEntry . getUrlHash ( ) ) ;
unknownURLEntries . add ( indexEntry . getUrlHash ( ) ) ;
} else {
knownURLs . put ( indexEntry . getUrlHash ( ) , lurl ) ;
@ -334,6 +351,7 @@ public final class plasmaWordIndexDistribution {
tmpEntities . add ( indexEntity ) ;
this . log . logFine ( "Selected whole index (" + indexEntity . size ( ) + " URLs, " + unknownURLEntries . size ( ) + " not bound) for word " + indexEntity . wordHash ( ) ) ;
count - = indexEntity . size ( ) ;
currOpenFiles + + ;
}
} catch ( kelondroException e ) {
this . log . logSevere ( "plasmaWordIndexDistribution/1: deleted DB for word " + indexEntity . wordHash ( ) , e ) ;
@ -347,12 +365,12 @@ public final class plasmaWordIndexDistribution {
unknownURLEntries . clear ( ) ;
while ( ( urlEnum . hasMoreElements ( ) ) & & ( count > 0 ) ) {
indexEntry = ( plasmaWordIndexEntry ) urlEnum . nextElement ( ) ;
lurl = urlPool. loadedURL . getEntry ( indexEntry . getUrlHash ( ) ) ;
lurl = this . urlPool. loadedURL . getEntry ( indexEntry . getUrlHash ( ) ) ;
if ( lurl = = null ) {
unknownURLEntries . add ( indexEntry . getUrlHash ( ) ) ;
} else {
if ( lurl . toString ( ) = = null ) {
urlPool. loadedURL . remove ( indexEntry . getUrlHash ( ) ) ;
this . urlPool. loadedURL . remove ( indexEntry . getUrlHash ( ) ) ;
unknownURLEntries . add ( indexEntry . getUrlHash ( ) ) ;
} else {
knownURLs . put ( indexEntry . getUrlHash ( ) , lurl ) ;
@ -367,10 +385,10 @@ public final class plasmaWordIndexDistribution {
indexEntity . removeEntry ( ( String ) hashIter . next ( ) , true ) ;
}
// use whats remaining
log. logFine ( "Selected partial index (" + tmpEntity . size ( ) + " from " + indexEntity . size ( ) + " URLs, " + unknownURLEntries . size ( ) + " not bound) for word " + tmpEntity . wordHash ( ) ) ;
this . log. logFine ( "Selected partial index (" + tmpEntity . size ( ) + " from " + indexEntity . size ( ) + " URLs, " + unknownURLEntries . size ( ) + " not bound) for word " + tmpEntity . wordHash ( ) ) ;
tmpEntities . add ( tmpEntity ) ;
} catch ( kelondroException e ) {
log. logSevere ( "plasmaWordIndexDistribution/2: deleted DB for word " + indexEntity . wordHash ( ) , e ) ;
this . log. logSevere ( "plasmaWordIndexDistribution/2: deleted DB for word " + indexEntity . wordHash ( ) , e ) ;
try { indexEntity . deleteComplete ( ) ; } catch ( IOException ee ) { }
}
indexEntity . close ( ) ; // important: is not closed elswhere and cannot be deleted afterwards
@ -380,12 +398,12 @@ public final class plasmaWordIndexDistribution {
}
// transfer to array
plasmaWordIndexEntity [ ] indexEntities = ( plasmaWordIndexEntity [ ] ) tmpEntities . toArray ( new plasmaWordIndexEntity [ tmpEntities . size ( ) ] ) ;
return new Object [ ] { indexEntities , knownURLs };
return new Object [ ] { indexEntities , knownURLs , new Integer ( currOpenFiles ) };
} catch ( IOException e ) {
log. logSevere ( "selectTransferIndexes IO-Error (hash=" + nexthash + "): " + e . getMessage ( ) , e ) ;
this . log. logSevere ( "selectTransferIndexes IO-Error (hash=" + nexthash + "): " + e . getMessage ( ) , e ) ;
return new Object [ ] { new plasmaWordIndexEntity [ 0 ] , new HashMap ( 0 ) } ;
} catch ( kelondroException e ) {
log. logSevere ( "selectTransferIndexes database corrupted: " + e . getMessage ( ) , e ) ;
this . log. logSevere ( "selectTransferIndexes database corrupted: " + e . getMessage ( ) , e ) ;
return new Object [ ] { new plasmaWordIndexEntity [ 0 ] , new HashMap ( 0 ) } ;
}
}
@ -641,6 +659,7 @@ public final class plasmaWordIndexDistribution {
private boolean finished = false ;
private boolean gzipBody = false ;
private int timeout = 60000 ;
private int maxOpenFiles = 800 ;
private int transferedIndexCount = 0 ;
private String status = "Running" ;
private String oldStartingPointHash = "------------" , startPointHash = "------------" ;
@ -659,6 +678,7 @@ public final class plasmaWordIndexDistribution {
this . wordsDBSize = sb . wordIndex . size ( ) ;
this . gzipBody = "true" . equalsIgnoreCase ( sb . getConfig ( "indexTransfer.gzipBody" , "false" ) ) ;
this . timeout = ( int ) sb . getConfigLong ( "indexTransfer.timeout" , 60000 ) ;
this . maxOpenFiles = ( int ) sb . getConfigLong ( "indexTransfer.maxOpenFiles" , 800 ) ;
}
public void run ( ) {
@ -742,7 +762,7 @@ public final class plasmaWordIndexDistribution {
// selecting 500 words to transfer
this . status = "Running: Selecting chunk " + iteration ;
Object [ ] selectResult = selectTransferIndexes ( startPointHash , chunkSize ) ;
Object [ ] selectResult = selectTransferIndexes ( this . startPointHash , this . chunkSize , this . maxOpenFiles ) ;
newIndexEntities = ( plasmaWordIndexEntity [ ] ) selectResult [ 0 ] ;
HashMap urlCache = ( HashMap ) selectResult [ 1 ] ; // String (url-hash) / plasmaCrawlLURL.Entry