@ -20,6 +20,17 @@ import string
import configparser
import sys
def call_with_auth ( node , user , password ) :
url = urllib . parse . urlparse ( node . url )
headers = { " Authorization " : " Basic " + str_to_b64str ( ' {} : {} ' . format ( user , password ) ) }
conn = http . client . HTTPConnection ( url . hostname , url . port )
conn . connect ( )
conn . request ( ' POST ' , ' / ' , ' { " method " : " getbestblockhash " } ' , headers )
resp = conn . getresponse ( )
conn . close ( )
return resp
class HTTPBasicsTest ( BitcoinTestFramework ) :
def set_test_params ( self ) :
@ -28,15 +39,24 @@ class HTTPBasicsTest(BitcoinTestFramework):
def setup_chain ( self ) :
super ( ) . setup_chain ( )
#Append rpcauth to bitcoin.conf before initialization
self . rtpassword = " cA773lm788buwYe4g4WT+05pKyNruVKjQ25x3n0DQcM= "
rpcauth = " rpcauth=rt:93648e835a54c573682c2eb19f882535$7681e9c5b74bdd85e78166031d2058e1069b3ed7ed967c93fc63abba06f31144 "
rpcauth2 = " rpcauth=rt2:f8607b1a88861fac29dfccf9b52ff9f$ff36a0c23c8c62b4846112e50fa888416e94c17bfd4c42f88fd8f55ec6a3137e "
rpcuser = " rpcuser=rpcuser💻 "
rpcpassword = " rpcpassword=rpcpassword🔑 "
self . user = ' ' . join ( SystemRandom ( ) . choice ( string . ascii_letters + string . digits ) for _ in range ( 10 ) )
self . rpcuser = " rpcuser💻 "
self . rpcpassword = " rpcpassword🔑 "
config = configparser . ConfigParser ( )
config . read_file ( open ( self . options . configfile ) )
gen_rpcauth = config [ ' environment ' ] [ ' RPCAUTH ' ]
# Generate RPCAUTH with specified password
self . rt2password = " 8/F3uMDw4KSEbw96U3CA1C4X05dkHDN2BPFjTgZW4KI= "
p = subprocess . Popen ( [ sys . executable , gen_rpcauth , ' rt2 ' , self . rt2password ] , stdout = subprocess . PIPE , universal_newlines = True )
lines = p . stdout . read ( ) . splitlines ( )
rpcauth2 = lines [ 1 ]
# Generate RPCAUTH without specifying password
self . user = ' ' . join ( SystemRandom ( ) . choice ( string . ascii_letters + string . digits ) for _ in range ( 10 ) )
p = subprocess . Popen ( [ sys . executable , gen_rpcauth , self . user ] , stdout = subprocess . PIPE , universal_newlines = True )
lines = p . stdout . read ( ) . splitlines ( )
rpcauth3 = lines [ 1 ]
@ -47,160 +67,40 @@ class HTTPBasicsTest(BitcoinTestFramework):
f . write ( rpcauth2 + " \n " )
f . write ( rpcauth3 + " \n " )
with open ( os . path . join ( get_datadir_path ( self . options . tmpdir , 1 ) , " bitcoin.conf " ) , ' a ' , encoding = ' utf8 ' ) as f :
f . write ( rpcuser + " \n " )
f . write ( rpcpassword + " \n " )
def run_test ( self ) :
##################################################
# Check correctness of the rpcauth config option #
##################################################
url = urllib . parse . urlparse ( self . nodes [ 0 ] . url )
#Old authpair
authpair = url . username + ' : ' + url . password
#New authpair generated via share/rpcauth tool
password = " cA773lm788buwYe4g4WT+05pKyNruVKjQ25x3n0DQcM= "
#Second authpair with different username
password2 = " 8/F3uMDw4KSEbw96U3CA1C4X05dkHDN2BPFjTgZW4KI= "
authpairnew = " rt: " + password
f . write ( " rpcuser= {} \n " . format ( self . rpcuser ) )
f . write ( " rpcpassword= {} \n " . format ( self . rpcpassword ) )
def test_auth ( self , node , user , password ) :
self . log . info ( ' Correct... ' )
headers = { " Authorization " : " Basic " + str_to_b64str ( authpair ) }
assert_equal ( 200 , call_with_auth ( node , user , password ) . status )
conn = http . client . HTTPConnection ( url . hostname , url . port )
conn . connect ( )
conn . request ( ' POST ' , ' / ' , ' { " method " : " getbestblockhash " } ' , headers )
resp = conn . getresponse ( )
assert_equal ( resp . status , 200 )
conn . close ( )
#Use new authpair to confirm both work
self . log . info ( ' Correct... ' )
headers = { " Authorization " : " Basic " + str_to_b64str ( authpairnew ) }
conn = http . client . HTTPConnection ( url . hostname , url . port )
conn . connect ( )
conn . request ( ' POST ' , ' / ' , ' { " method " : " getbestblockhash " } ' , headers )
resp = conn . getresponse ( )
assert_equal ( resp . status , 200 )
conn . close ( )
#Wrong login name with rt's password
self . log . info ( ' Wrong... ' )
authpairnew = " rtwrong: " + password
headers = { " Authorization " : " Basic " + str_to_b64str ( authpairnew ) }
conn = http . client . HTTPConnection ( url . hostname , url . port )
conn . connect ( )
conn . request ( ' POST ' , ' / ' , ' { " method " : " getbestblockhash " } ' , headers )
resp = conn . getresponse ( )
assert_equal ( resp . status , 401 )
conn . close ( )
assert_equal ( 401 , call_with_auth ( node , user , password + ' wrong ' ) . status )
#Wrong password for rt
self . log . info ( ' Wrong... ' )
authpairnew = " rt: " + password + " wrong "
headers = { " Authorization " : " Basic " + str_to_b64str ( authpairnew ) }
assert_equal ( 401 , call_with_auth ( node , user + ' wrong ' , password ) . status )
conn = http . client . HTTPConnection ( url . hostname , url . port )
conn . connect ( )
conn . request ( ' POST ' , ' / ' , ' { " method " : " getbestblockhash " } ' , headers )
resp = conn . getresponse ( )
assert_equal ( resp . status , 401 )
conn . close ( )
#Correct for rt2
self . log . info ( ' Correct... ' )
authpairnew = " rt2: " + password2
headers = { " Authorization " : " Basic " + str_to_b64str ( authpairnew ) }
conn = http . client . HTTPConnection ( url . hostname , url . port )
conn . connect ( )
conn . request ( ' POST ' , ' / ' , ' { " method " : " getbestblockhash " } ' , headers )
resp = conn . getresponse ( )
assert_equal ( resp . status , 200 )
conn . close ( )
#Wrong password for rt2
self . log . info ( ' Wrong... ' )
authpairnew = " rt2: " + password2 + " wrong "
headers = { " Authorization " : " Basic " + str_to_b64str ( authpairnew ) }
conn = http . client . HTTPConnection ( url . hostname , url . port )
conn . connect ( )
conn . request ( ' POST ' , ' / ' , ' { " method " : " getbestblockhash " } ' , headers )
resp = conn . getresponse ( )
assert_equal ( resp . status , 401 )
conn . close ( )
assert_equal ( 401 , call_with_auth ( node , user + ' wrong ' , password + ' wrong ' ) . status )
#Correct for randomly generated user
self . log . info ( ' Correct... ' )
authpairnew = self . user + " : " + self . password
headers = { " Authorization " : " Basic " + str_to_b64str ( authpairnew ) }
conn = http . client . HTTPConnection ( url . hostname , url . port )
conn . connect ( )
conn . request ( ' POST ' , ' / ' , ' { " method " : " getbestblockhash " } ' , headers )
resp = conn . getresponse ( )
assert_equal ( resp . status , 200 )
conn . close ( )
def run_test ( self ) :
# Wrong password for randomly generated user
self . log . info ( ' Wrong... ' )
authpairnew = self . user + " : " + self . password + " Wrong "
headers = { " Authorization " : " Basic " + str_to_b64str ( authpairnew ) }
##################################################
# Check correctness of the rpcauth config option #
##################################################
url = urllib . parse . urlparse ( self . nodes [ 0 ] . url )
conn = http . client . HTTPConnection ( url . hostname , url . port )
conn . connect ( )
conn . request ( ' POST ' , ' / ' , ' { " method " : " getbestblockhash " } ' , headers )
resp = conn . getresponse ( )
assert_equal ( resp . status , 401 )
conn . close ( )
self . test_auth ( self . nodes [ 0 ] , url . username , url . password )
self . test_auth ( self . nodes [ 0 ] , ' rt ' , self . rtpassword )
self . test_auth ( self . nodes [ 0 ] , ' rt2 ' , self . rt2password )
self . test_auth ( self . nodes [ 0 ] , self . user , self . password )
###############################################################
# Check correctness of the rpcuser/rpcpassword config options #
###############################################################
url = urllib . parse . urlparse ( self . nodes [ 1 ] . url )
# rpcuser and rpcpassword authpair
self . log . info ( ' Correct... ' )
rpcuserauthpair = " rpcuser💻:rpcpassword🔑 "
headers = { " Authorization " : " Basic " + str_to_b64str ( rpcuserauthpair ) }
conn = http . client . HTTPConnection ( url . hostname , url . port )
conn . connect ( )
conn . request ( ' POST ' , ' / ' , ' { " method " : " getbestblockhash " } ' , headers )
resp = conn . getresponse ( )
assert_equal ( resp . status , 200 )
conn . close ( )
#Wrong login name with rpcuser's password
rpcuserauthpair = " rpcuserwrong:rpcpassword "
headers = { " Authorization " : " Basic " + str_to_b64str ( rpcuserauthpair ) }
conn = http . client . HTTPConnection ( url . hostname , url . port )
conn . connect ( )
conn . request ( ' POST ' , ' / ' , ' { " method " : " getbestblockhash " } ' , headers )
resp = conn . getresponse ( )
assert_equal ( resp . status , 401 )
conn . close ( )
#Wrong password for rpcuser
self . log . info ( ' Wrong... ' )
rpcuserauthpair = " rpcuser:rpcpasswordwrong "
headers = { " Authorization " : " Basic " + str_to_b64str ( rpcuserauthpair ) }
conn = http . client . HTTPConnection ( url . hostname , url . port )
conn . connect ( )
conn . request ( ' POST ' , ' / ' , ' { " method " : " getbestblockhash " } ' , headers )
resp = conn . getresponse ( )
assert_equal ( resp . status , 401 )
conn . close ( )
self . test_auth ( self . nodes [ 1 ] , self . rpcuser , self . rpcpassword )
if __name__ == ' __main__ ' :
HTTPBasicsTest ( ) . main ( )