@ -1,4 +1,4 @@
// transferRWI.java
// transferRWI.java
// -----------------------
// part of the AnomicHTTPD caching proxy
// (C) by Michael Peter Christen; mc@yacy.net
@ -35,12 +35,13 @@ import net.yacy.cora.document.RSSMessage;
import net.yacy.cora.document.UTF8 ;
import net.yacy.cora.protocol.RequestHeader ;
import net.yacy.kelondro.data.meta.URIMetadataRow ;
import net.yacy.kelondro.data.word.WordReference ;
import net.yacy.kelondro.data.word.WordReferenceRow ;
import net.yacy.kelondro.index.HandleSet ;
import net.yacy.kelondro.logging.Log ;
import net.yacy.kelondro.rwi.IndexCell ;
import net.yacy.kelondro.util.FileUtils ;
import net.yacy.repository.Blacklist ;
import de.anomic.search.Segments ;
import de.anomic.search.Switchboard ;
import de.anomic.search.SwitchboardConstants ;
@ -56,7 +57,7 @@ import de.anomic.yacy.dht.FlatWordPartitionScheme;
public final class transferRWI {
public static serverObjects respond ( final RequestHeader header , final serverObjects post , final serverSwitch env ) throws InterruptedException {
// return variable that accumulates replacements
final Switchboard sb = ( Switchboard ) env ;
final serverObjects prop = new serverObjects ( ) ;
@ -77,7 +78,7 @@ public final class transferRWI {
logWarning ( contentType , "missing entryc" ) ;
return prop ;
}
// request values
final String iam = post . get ( "iam" , "" ) ; // seed hash of requester
final String youare = post . get ( "youare" , "" ) ; // seed hash of the target peer, needed for network stability
@ -89,13 +90,13 @@ public final class transferRWI {
final boolean blockBlacklist = sb . getConfigBool ( "indexReceiveBlockBlacklist" , false ) ;
final long cachelimit = sb . getConfigLong ( SwitchboardConstants . WORDCACHE_MAX_COUNT , 100000 ) ;
final yacySeed otherPeer = sb . peers . get ( iam ) ;
final String otherPeerName = iam + ":" + ( ( otherPeer = = null ) ? "NULL" : ( otherPeer . getName ( ) + "/" + otherPeer . getVersion ( ) ) ) ;
final String otherPeerName = iam + ":" + ( ( otherPeer = = null ) ? "NULL" : ( otherPeer . getName ( ) + "/" + otherPeer . getVersion ( ) ) ) ;
// response values
int pause = 0 ;
String result = "ok" ;
final StringBuilder unknownURLs = new StringBuilder ( 6000 ) ;
if ( ( youare = = null ) | | ( ! youare . equals ( sb . peers . mySeed ( ) . hash ) ) ) {
sb . getLog ( ) . logInfo ( "Rejecting RWIs from peer " + otherPeerName + ". Wrong target. Wanted peer=" + youare + ", iam=" + sb . peers . mySeed ( ) . hash ) ;
result = "wrong_target" ;
@ -134,7 +135,7 @@ public final class transferRWI {
// decode request
System . out . println ( "STRINGS " + UTF8 . String ( indexes ) ) ;
Iterator < String > it = FileUtils . strings ( indexes ) ;
final Iterator < String > it = FileUtils . strings ( indexes ) ;
// free memory
indexes = null ;
@ -151,10 +152,11 @@ public final class transferRWI {
int received = 0 ;
int blocked = 0 ;
int receivedURL = 0 ;
final IndexCell < WordReference > cell = sb . indexSegments . termIndex ( Segments . Process . DHTIN ) ;
while ( it . hasNext ( ) ) {
serverCore . checkInterruption ( ) ;
estring = it . next ( ) ;
// check if RWI entry is well-formed
p = estring . indexOf ( '{' ) ;
if ( ( p < 0 ) | | ( estring . indexOf ( "x=" ) < 0 ) | | ! ( estring . indexOf ( "[B@" ) < 0 ) ) {
@ -165,14 +167,14 @@ public final class transferRWI {
wordhashes . add ( wordHash ) ;
iEntry = new WordReferenceRow ( estring . substring ( p ) ) ;
urlHash = iEntry . urlhash ( ) ;
// block blacklisted entries
if ( ( blockBlacklist ) & & ( Switchboard . urlBlacklist . hashInBlacklistedCache ( Blacklist . BLACKLIST_DHT , urlHash ) ) ) {
if ( yacyCore . log . isFine ( ) ) yacyCore . log . logFine ( "transferRWI: blocked blacklisted URLHash '" + ASCII . String ( urlHash ) + "' from peer " + otherPeerName ) ;
blocked + + ;
continue ;
}
// check if the entry is in our network domain
final String urlRejectReason = sb . crawlStacker . urlInAcceptedDomainHash ( urlHash ) ;
if ( urlRejectReason ! = null ) {
@ -181,11 +183,11 @@ public final class transferRWI {
blocked + + ;
continue ;
}
// learn entry
try {
sb. indexSegments . termIndex ( Segments . Process . DHTIN ) . add ( wordHash . getBytes ( ) , iEntry ) ;
} catch ( Exception e ) {
cell . add ( wordHash . getBytes ( ) , iEntry ) ;
} catch ( final Exception e ) {
Log . logException ( e ) ;
}
serverCore . checkInterruption ( ) ;
@ -208,7 +210,7 @@ public final class transferRWI {
sb . peers . mySeed ( ) . incRI ( received ) ;
// finally compose the unknownURL hash list
Iterator < byte [ ] > bit = unknownURL . iterator ( ) ;
final Iterator < byte [ ] > bit = unknownURL . iterator ( ) ;
unknownURLs . ensureCapacity ( unknownURL . size ( ) * 25 ) ;
while ( bit . hasNext ( ) ) {
unknownURLs . append ( "," ) . append ( UTF8 . String ( bit . next ( ) ) ) ;
@ -217,14 +219,14 @@ public final class transferRWI {
if ( wordhashes . isEmpty ( ) | | received = = 0 ) {
sb . getLog ( ) . logInfo ( "Received 0 RWIs from " + otherPeerName + ", processed in " + ( System . currentTimeMillis ( ) - startProcess ) + " milliseconds, requesting " + unknownURL . size ( ) + " URLs, blocked " + blocked + " RWIs" ) ;
} else {
String firstHash = wordhashes . get ( 0 ) ;
String lastHash = wordhashes . get ( wordhashes . size ( ) - 1 ) ;
final String firstHash = wordhashes . get ( 0 ) ;
final String lastHash = wordhashes . get ( wordhashes . size ( ) - 1 ) ;
final long avdist = ( FlatWordPartitionScheme . std . dhtDistance ( firstHash . getBytes ( ) , null , sb . peers . mySeed ( ) ) + FlatWordPartitionScheme . std . dhtDistance ( lastHash . getBytes ( ) , null , sb . peers . mySeed ( ) ) ) / 2 ;
sb . getLog ( ) . logInfo ( "Received " + received + " RWIs, " + wordc + " Words [" + firstHash + " .. " + lastHash + "] / " + avdist + ", blocked " + blocked + ", requesting " + unknownURL . size ( ) + "/" + receivedURL + " URLs from " + otherPeerName ) ;
yacyChannel . channels ( yacyChannel . DHTRECEIVE ) . addMessage ( new RSSMessage ( "Received " + received + " RWIs, " + wordc + " Words [" + firstHash + " .. " + lastHash + "] / " + avdist + ", blocked " + blocked + ", requesting " + unknownURL . size ( ) + "/" + receivedURL + " URLs from " + otherPeerName , "" , otherPeer . hash ) ) ;
sb . getLog ( ) . logInfo ( "Received " + received + " RWIs, " + wordc + " Words [" + firstHash + " .. " + lastHash + "] , processed in " + ( System . currentTimeMillis ( ) - startProcess ) + " milliseconds, " + avdist + ", blocked " + blocked + ", requesting " + unknownURL . size ( ) + "/" + receivedURL + " URLs from " + otherPeerName ) ;
yacyChannel . channels ( yacyChannel . DHTRECEIVE ) . addMessage ( new RSSMessage ( "Received " + received + " RWIs, " + wordc + " Words [" + firstHash + " .. " + lastHash + "] , processed in " + ( System . currentTimeMillis ( ) - startProcess ) + " milliseconds, " + avdist + ", blocked " + blocked + ", requesting " + unknownURL . size ( ) + "/" + receivedURL + " URLs from " + otherPeerName , "" , otherPeer . hash ) ) ;
}
result = "ok" ;
pause = ( int ) ( sb . indexSegments . termIndex ( Segments . Process . DHTIN ) . getBufferSize ( ) * 20000 / sb . getConfigLong ( SwitchboardConstants . WORDCACHE_MAX_COUNT , 100000 ) ) ; // estimation of necessary pause time
}