// transferRWI.java
// -----------------------
// part of the AnomicHTTPD caching proxy
// (C) by Michael Peter Christen; mc@yacy.net
// first published on http://www.anomic.de
// Frankfurt, Germany, 2004, 2005
//
// $LastChangedDate$
// $LastChangedRevision$
// $LastChangedBy$
//
// This program is free software; you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation; either version 2 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
// You must compile this file with
// javac -classpath .:../classes transferRWI.java
import java.io.IOException ;
import java.util.HashSet ;
import java.util.Iterator ;
import java.util.List ;
import de.anomic.content.RSSMessage ;
import de.anomic.data.Blacklist ;
import de.anomic.document.parser.xml.RSSFeed ;
import de.anomic.http.metadata.RequestHeader ;
import de.anomic.kelondro.text.referencePrototype.WordReferenceRow ;
import de.anomic.kelondro.util.FileUtils ;
import de.anomic.search.Switchboard ;
import de.anomic.search.SwitchboardConstants ;
import de.anomic.server.serverCore ;
import de.anomic.server.serverObjects ;
import de.anomic.server.serverSwitch ;
import de.anomic.yacy.yacyCore ;
import de.anomic.yacy.yacyNetwork ;
import de.anomic.yacy.yacySeed ;
replaced old DHT transmission method with new method. Many things have changed! some of them:
- after a index selection is made, the index is splitted into its vertical components
- from differrent index selctions the splitted components can be accumulated before they are placed into the transmission queue
- each splitted chunk gets its own transmission thread
- multiple transmission threads are started concurrently
- the process can be monitored with the blocking queue servlet
To implement that, a new package de.anomic.yacy.dht was created. Some old files have been removed.
The new index distribution model using a vertical DHT was implemented. An abstraction of this model
is implemented in the new dht package as interface. The freeworld network has now a configuration
of two vertial partitions; sixteen partitions are planned and will be configured if the process is bug-free.
This modification has three main targets:
- enhance the DHT transmission speed
- with a vertical DHT, a search will speed up. With two partitions, two times. With sixteen, sixteen times.
- the vertical DHT will apply a semi-dht for URLs, and peers will receive a fraction of the overall URLs they received before.
with two partitions, the fractions will be halve. With sixteen partitions, a 1/16 of the previous number of URLs.
BE CAREFULL, THIS IS A MAJOR CODE CHANGE, POSSIBLY FULL OF BUGS AND HARMFUL THINGS.
git-svn-id: https://svn.berlios.de/svnroot/repos/yacy/trunk@5586 6c8d7289-2bf4-0310-a012-ef5d649a1542
16 years ago
import de.anomic.yacy.dht.FlatWordPartitionScheme ;
import de.anomic.yacy.logging.Log ;
public final class transferRWI {
public static serverObjects respond ( final RequestHeader header , final serverObjects post , final serverSwitch env ) throws InterruptedException {
// return variable that accumulates replacements
final Switchboard sb = ( Switchboard ) env ;
final serverObjects prop = new serverObjects ( ) ;
final String contentType = header . getContentType ( ) ;
if ( ( post = = null ) | | ( env = = null ) ) {
logWarning ( contentType , "post or env is null!" ) ;
return prop ;
}
if ( ! yacyNetwork . authentifyRequest ( post , env ) ) {
logWarning ( contentType , "not authentified" ) ;
return prop ;
}
if ( ! post . containsKey ( "wordc" ) ) {
logWarning ( contentType , "missing wordc" ) ;
return prop ;
}
if ( ! post . containsKey ( "entryc" ) ) {
logWarning ( contentType , "missing entryc" ) ;
return prop ;
}
// request values
final String iam = post . get ( "iam" , "" ) ; // seed hash of requester
final String youare = post . get ( "youare" , "" ) ; // seed hash of the target peer, needed for network stability
// final String key = (String) post.get("key", ""); // transmission key
final int wordc = post . getInt ( "wordc" , 0 ) ; // number of different words
final int entryc = post . getInt ( "entryc" , 0 ) ; // number of entries in indexes
byte [ ] indexes = post . get ( "indexes" , "" ) . getBytes ( ) ; // the indexes, as list of word entries
boolean granted = sb . getConfig ( "allowReceiveIndex" , "false" ) . equals ( "true" ) ;
final boolean blockBlacklist = sb . getConfig ( "indexReceiveBlockBlacklist" , "false" ) . equals ( "true" ) ;
final long cachelimit = sb . getConfigLong ( SwitchboardConstants . WORDCACHE_MAX_COUNT , 100000 ) ;
final yacySeed otherPeer = sb . peers . get ( iam ) ;
final String otherPeerName = iam + ":" + ( ( otherPeer = = null ) ? "NULL" : ( otherPeer . getName ( ) + "/" + otherPeer . getVersion ( ) ) ) ;
// response values
int pause = 0 ;
String result = "ok" ;
final StringBuilder unknownURLs = new StringBuilder ( ) ;
if ( ( youare = = null ) | | ( ! youare . equals ( sb . peers . mySeed ( ) . hash ) ) ) {
sb . getLog ( ) . logInfo ( "Rejecting RWIs from peer " + otherPeerName + ". Wrong target. Wanted peer=" + youare + ", iam=" + sb . peers . mySeed ( ) . hash ) ;
result = "wrong_target" ;
pause = 0 ;
} else if ( otherPeer = = null ) {
// we dont want to receive indexes
sb . getLog ( ) . logInfo ( "Rejecting RWIs from peer " + otherPeerName + ". Not granted. Other Peer is unknown" ) ;
result = "not_granted" ;
pause = 60000 ;
} else if ( ! granted ) {
// we dont want to receive indexes
sb . getLog ( ) . logInfo ( "Rejecting RWIs from peer " + otherPeerName + ". Granted is false" ) ;
result = "not_granted" ;
pause = 60000 ;
} else if ( sb . isRobinsonMode ( ) ) {
// we dont want to receive indexes
sb . getLog ( ) . logInfo ( "Rejecting RWIs from peer " + otherPeerName + ". Not granted. This peer is in robinson mode" ) ;
result = "not_granted" ;
pause = 60000 ;
} else if ( sb . indexSegment . termIndex ( ) . getBufferSize ( ) > cachelimit ) {
// we are too busy to receive indexes
sb . getLog ( ) . logInfo ( "Rejecting RWIs from peer " + otherPeerName + ". We are too busy (buffersize=" + sb . indexSegment . termIndex ( ) . getBufferSize ( ) + ")." ) ;
granted = false ; // don't accept more words if there are too many words to flush
result = "busy" ;
pause = 60000 ;
} else if ( otherPeer . getVersion ( ) < 0.75005845 & & otherPeer . getVersion ( ) > = 0.75005821 ) {
// version that sends [B@... hashes
sb . getLog ( ) . logInfo ( "Rejecting RWIs from peer " + otherPeerName + ". Bad version." ) ;
result = "not_granted" ;
pause = 1800000 ;
} else {
// we want and can receive indexes
// log value status (currently added to find outOfMemory error
if ( sb . getLog ( ) . isFine ( ) ) sb . getLog ( ) . logFine ( "Processing " + indexes . length + " bytes / " + wordc + " words / " + entryc + " entries from " + otherPeerName ) ;
final long startProcess = System . currentTimeMillis ( ) ;
// decode request
final List < String > v = FileUtils . strings ( indexes , null ) ;
// free memory
indexes = null ;
// the value-vector should now have the same length as entryc
if ( v . size ( ) ! = entryc ) sb . getLog ( ) . logSevere ( "ERROR WITH ENTRY COUNTER: v=" + v . size ( ) + ", entryc=" + entryc ) ;
// now parse the Strings in the value-vector and write index entries
String estring ;
int p ;
String wordHash ;
String urlHash ;
WordReferenceRow iEntry ;
final HashSet < String > unknownURL = new HashSet < String > ( ) ;
final HashSet < String > knownURL = new HashSet < String > ( ) ;
final String [ ] wordhashes = new String [ v . size ( ) ] ;
int received = 0 ;
int blocked = 0 ;
int receivedURL = 0 ;
final Iterator < String > i = v . iterator ( ) ;
while ( i . hasNext ( ) ) {
serverCore . checkInterruption ( ) ;
estring = i . next ( ) ;
// check if RWI entry is well-formed
p = estring . indexOf ( "{" ) ;
if ( ( p < 0 ) | | ( estring . indexOf ( "x=" ) < 0 ) | | ! ( estring . indexOf ( "[B@" ) < 0 ) ) {
blocked + + ;
continue ;
}
wordHash = estring . substring ( 0 , p ) ;
wordhashes [ received ] = wordHash ;
iEntry = new WordReferenceRow ( estring . substring ( p ) ) ;
urlHash = iEntry . metadataHash ( ) ;
// block blacklisted entries
if ( ( blockBlacklist ) & & ( Switchboard . urlBlacklist . hashInBlacklistedCache ( Blacklist . BLACKLIST_DHT , urlHash ) ) ) {
if ( yacyCore . log . isFine ( ) ) yacyCore . log . logFine ( "transferRWI: blocked blacklisted URLHash '" + urlHash + "' from peer " + otherPeerName ) ;
blocked + + ;
continue ;
}
// check if the entry is in our network domain
final String urlRejectReason = sb . crawlStacker . urlInAcceptedDomainHash ( urlHash ) ;
if ( urlRejectReason ! = null ) {
yacyCore . log . logWarning ( "transferRWI: blocked URL hash '" + urlHash + "' (" + urlRejectReason + ") from peer " + otherPeerName + "; peer is suspected to be a spam-peer (or something is wrong)" ) ;
//if (yacyCore.log.isFine()) yacyCore.log.logFine("transferRWI: blocked URL hash '" + urlHash + "' (" + urlRejectReason + ") from peer " + otherPeerName);
blocked + + ;
continue ;
}
// learn entry
try {
sb . indexSegment . termIndex ( ) . add ( wordHash . getBytes ( ) , iEntry ) ;
} catch ( IOException e ) {
e . printStackTrace ( ) ;
}
serverCore . checkInterruption ( ) ;
// check if we need to ask for the corresponding URL
if ( ! ( knownURL . contains ( urlHash ) | | unknownURL . contains ( urlHash ) ) ) try {
if ( sb . indexSegment . urlMetadata ( ) . exists ( urlHash ) ) {
knownURL . add ( urlHash ) ;
} else {
unknownURL . add ( urlHash ) ;
}
receivedURL + + ;
} catch ( final Exception ex ) {
sb . getLog ( ) . logWarning (
"transferRWI: DB-Error while trying to determine if URL with hash '" +
urlHash + "' is known." , ex ) ;
}
received + + ;
}
sb . peers . mySeed ( ) . incRI ( received ) ;
// finally compose the unknownURL hash list
final Iterator < String > it = unknownURL . iterator ( ) ;
unknownURLs . ensureCapacity ( unknownURL . size ( ) * 25 ) ;
while ( it . hasNext ( ) ) {
unknownURLs . append ( "," ) . append ( it . next ( ) ) ;
}
if ( unknownURLs . length ( ) > 0 ) { unknownURLs . delete ( 0 , 1 ) ; }
if ( ( wordhashes . length = = 0 ) | | ( received = = 0 ) ) {
sb . getLog ( ) . logInfo ( "Received 0 RWIs from " + otherPeerName + ", processed in " + ( System . currentTimeMillis ( ) - startProcess ) + " milliseconds, requesting " + unknownURL . size ( ) + " URLs, blocked " + blocked + " RWIs" ) ;
} else {
final long avdist = ( FlatWordPartitionScheme . std . dhtDistance ( wordhashes [ 0 ] . getBytes ( ) , null , sb . peers . mySeed ( ) ) + FlatWordPartitionScheme . std . dhtDistance ( wordhashes [ received - 1 ] . getBytes ( ) , null , sb . peers . mySeed ( ) ) ) / 2 ;
sb . getLog ( ) . logInfo ( "Received " + received + " Entries " + wordc + " Words [" + wordhashes [ 0 ] + " .. " + wordhashes [ received - 1 ] + "]/" + avdist + " from " + otherPeerName + ", processed in " + ( System . currentTimeMillis ( ) - startProcess ) + " milliseconds, requesting " + unknownURL . size ( ) + "/" + receivedURL + " URLs, blocked " + blocked + " RWIs" ) ;
RSSFeed . channels ( RSSFeed . INDEXRECEIVE ) . addMessage ( new RSSMessage ( "Received " + received + " RWIs [" + wordhashes [ 0 ] + " .. " + wordhashes [ received - 1 ] + "]/" + avdist + " from " + otherPeerName + ", requesting " + unknownURL . size ( ) + " URLs, blocked " + blocked , "" , "" ) ) ;
}
result = "ok" ;
pause = ( int ) ( sb . indexSegment . termIndex ( ) . getBufferSize ( ) * 20000 / sb . getConfigLong ( SwitchboardConstants . WORDCACHE_MAX_COUNT , 100000 ) ) ; // estimation of necessary pause time
}
prop . put ( "unknownURL" , unknownURLs . toString ( ) ) ;
prop . put ( "result" , result ) ;
prop . put ( "pause" , pause ) ;
// return rewrite properties
return prop ;
}
/ * *
* @param requestIdentifier
* @param msg
* /
private static void logWarning ( final String requestIdentifier , final String msg ) {
Log . logWarning ( "transferRWI" , requestIdentifier + " " + msg ) ;
}
}