#!/usr/bin/env python3
# Copyright (c) 2019-2022 The Bitcoin Core developers
# Distributed under the MIT software license, see the accompanying
# file COPYING or http://www.opensource.org/licenses/mit-license.php.
""" Test descriptor wallet function. """
try :
import sqlite3
except ImportError :
pass
import concurrent . futures
from test_framework . blocktools import COINBASE_MATURITY
from test_framework . descriptors import descsum_create
from test_framework . test_framework import BitcoinTestFramework
from test_framework . util import (
assert_equal ,
assert_raises_rpc_error
)
from test_framework . wallet_util import WalletUnlock
class WalletDescriptorTest ( BitcoinTestFramework ) :
def add_options ( self , parser ) :
self . add_wallet_options ( parser , legacy = False )
def set_test_params ( self ) :
self . setup_clean_chain = True
self . num_nodes = 1
self . extra_args = [ [ ' -keypool=100 ' ] ]
self . wallet_names = [ ]
def skip_test_if_missing_module ( self ) :
self . skip_if_no_wallet ( )
self . skip_if_no_sqlite ( )
self . skip_if_no_py_sqlite3 ( )
def test_concurrent_writes ( self ) :
self . log . info ( " Test sqlite concurrent writes are in the correct order " )
self . restart_node ( 0 , extra_args = [ " -unsafesqlitesync=0 " ] )
self . nodes [ 0 ] . createwallet ( wallet_name = " concurrency " , blank = True )
wallet = self . nodes [ 0 ] . get_wallet_rpc ( " concurrency " )
# First import a descriptor that uses hardened dervation so that topping up
# Will require writing a ton to db
wallet . importdescriptors ( [ { " desc " : descsum_create ( " wpkh(tprv8ZgxMBicQKsPeuVhWwi6wuMQGfPKi9Li5GtX35jVNknACgqe3CY4g5xgkfDDJcmtF7o1QnxWDRYw4H5P26PXq7sbcUkEqeR4fg3Kxp2tigg/0h/0h/*h) " ) , " timestamp " : " now " , " active " : True } ] )
with concurrent . futures . ThreadPoolExecutor ( max_workers = 1 ) as thread :
topup = thread . submit ( wallet . keypoolrefill , newsize = 1000 )
# Then while the topup is running, we need to do something that will call
# ChainStateFlushed which will trigger a write to the db, hopefully at the
# same time that the topup still has an open db transaction.
self . nodes [ 0 ] . cli . gettxoutsetinfo ( )
assert_equal ( topup . result ( ) , None )
wallet . unloadwallet ( )
# Check that everything was written
wallet_db = self . nodes [ 0 ] . wallets_path / " concurrency " / self . wallet_data_filename
conn = sqlite3 . connect ( wallet_db )
with conn :
# Retrieve the bestblock_nomerkle record
bestblock_rec = conn . execute ( " SELECT value FROM main WHERE hex(key) = ' 1262657374626C6F636B5F6E6F6D65726B6C65 ' " ) . fetchone ( ) [ 0 ]
# Retrieve the number of descriptor cache records
# Since we store binary data, sqlite's comparison operators don't work everywhere
# so just retrieve all records and process them ourselves.
db_keys = conn . execute ( " SELECT key FROM main " ) . fetchall ( )
cache_records = len ( [ k [ 0 ] for k in db_keys if b " walletdescriptorcache " in k [ 0 ] ] )
conn . close ( )
assert_equal ( bestblock_rec [ 5 : 37 ] [ : : - 1 ] . hex ( ) , self . nodes [ 0 ] . getbestblockhash ( ) )
assert_equal ( cache_records , 1000 )
def run_test ( self ) :
if self . is_bdb_compiled ( ) :
# Make a legacy wallet and check it is BDB
self . nodes [ 0 ] . createwallet ( wallet_name = " legacy1 " , descriptors = False )
wallet_info = self . nodes [ 0 ] . getwalletinfo ( )
assert_equal ( wallet_info [ ' format ' ] , ' bdb ' )
self . nodes [ 0 ] . unloadwallet ( " legacy1 " )
else :
self . log . warning ( " Skipping BDB test " )
# Make a descriptor wallet
self . log . info ( " Making a descriptor wallet " )
self . nodes [ 0 ] . createwallet ( wallet_name = " desc1 " , descriptors = True )
# A descriptor wallet should have 100 addresses * 4 types = 400 keys
self . log . info ( " Checking wallet info " )
wallet_info = self . nodes [ 0 ] . getwalletinfo ( )
assert_equal ( wallet_info [ ' format ' ] , ' sqlite ' )
assert_equal ( wallet_info [ ' keypoolsize ' ] , 400 )
assert_equal ( wallet_info [ ' keypoolsize_hd_internal ' ] , 400 )
assert ' keypoololdest ' not in wallet_info
# Check that getnewaddress works
self . log . info ( " Test that getnewaddress and getrawchangeaddress work " )
addr = self . nodes [ 0 ] . getnewaddress ( " " , " legacy " )
addr_info = self . nodes [ 0 ] . getaddressinfo ( addr )
assert addr_info [ ' desc ' ] . startswith ( ' pkh( ' )
assert_equal ( addr_info [ ' hdkeypath ' ] , ' m/44h/1h/0h/0/0 ' )
addr = self . nodes [ 0 ] . getnewaddress ( " " , " p2sh-segwit " )
addr_info = self . nodes [ 0 ] . getaddressinfo ( addr )
assert addr_info [ ' desc ' ] . startswith ( ' sh(wpkh( ' )
assert_equal ( addr_info [ ' hdkeypath ' ] , ' m/49h/1h/0h/0/0 ' )
addr = self . nodes [ 0 ] . getnewaddress ( " " , " bech32 " )
addr_info = self . nodes [ 0 ] . getaddressinfo ( addr )
assert addr_info [ ' desc ' ] . startswith ( ' wpkh( ' )
assert_equal ( addr_info [ ' hdkeypath ' ] , ' m/84h/1h/0h/0/0 ' )
addr = self . nodes [ 0 ] . getnewaddress ( " " , " bech32m " )
addr_info = self . nodes [ 0 ] . getaddressinfo ( addr )
assert addr_info [ ' desc ' ] . startswith ( ' tr( ' )
assert_equal ( addr_info [ ' hdkeypath ' ] , ' m/86h/1h/0h/0/0 ' )
# Check that getrawchangeaddress works
addr = self . nodes [ 0 ] . getrawchangeaddress ( " legacy " )
addr_info = self . nodes [ 0 ] . getaddressinfo ( addr )
assert addr_info [ ' desc ' ] . startswith ( ' pkh( ' )
assert_equal ( addr_info [ ' hdkeypath ' ] , ' m/44h/1h/0h/1/0 ' )
addr = self . nodes [ 0 ] . getrawchangeaddress ( " p2sh-segwit " )
addr_info = self . nodes [ 0 ] . getaddressinfo ( addr )
assert addr_info [ ' desc ' ] . startswith ( ' sh(wpkh( ' )
assert_equal ( addr_info [ ' hdkeypath ' ] , ' m/49h/1h/0h/1/0 ' )
addr = self . nodes [ 0 ] . getrawchangeaddress ( " bech32 " )
addr_info = self . nodes [ 0 ] . getaddressinfo ( addr )
assert addr_info [ ' desc ' ] . startswith ( ' wpkh( ' )
assert_equal ( addr_info [ ' hdkeypath ' ] , ' m/84h/1h/0h/1/0 ' )
addr = self . nodes [ 0 ] . getrawchangeaddress ( " bech32m " )
addr_info = self . nodes [ 0 ] . getaddressinfo ( addr )
assert addr_info [ ' desc ' ] . startswith ( ' tr( ' )
assert_equal ( addr_info [ ' hdkeypath ' ] , ' m/86h/1h/0h/1/0 ' )
# Make a wallet to receive coins at
self . nodes [ 0 ] . createwallet ( wallet_name = " desc2 " , descriptors = True )
recv_wrpc = self . nodes [ 0 ] . get_wallet_rpc ( " desc2 " )
send_wrpc = self . nodes [ 0 ] . get_wallet_rpc ( " desc1 " )
# Generate some coins
self . generatetoaddress ( self . nodes [ 0 ] , COINBASE_MATURITY + 1 , send_wrpc . getnewaddress ( ) )
# Make transactions
self . log . info ( " Test sending and receiving " )
addr = recv_wrpc . getnewaddress ( )
send_wrpc . sendtoaddress ( addr , 10 )
# Make sure things are disabled
self . log . info ( " Test disabled RPCs " )
assert_raises_rpc_error ( - 4 , " Only legacy wallets are supported by this command " , recv_wrpc . rpc . importprivkey , " cVpF924EspNh8KjYsfhgY96mmxvT6DgdWiTYMtMjuM74hJaU5psW " )
assert_raises_rpc_error ( - 4 , " Only legacy wallets are supported by this command " , recv_wrpc . rpc . importpubkey , send_wrpc . getaddressinfo ( send_wrpc . getnewaddress ( ) ) [ " pubkey " ] )
assert_raises_rpc_error ( - 4 , " Only legacy wallets are supported by this command " , recv_wrpc . rpc . importaddress , recv_wrpc . getnewaddress ( ) )
assert_raises_rpc_error ( - 4 , " Only legacy wallets are supported by this command " , recv_wrpc . rpc . importmulti , [ ] )
assert_raises_rpc_error ( - 4 , " Only legacy wallets are supported by this command " , recv_wrpc . rpc . addmultisigaddress , 1 , [ recv_wrpc . getnewaddress ( ) ] )
assert_raises_rpc_error ( - 4 , " Only legacy wallets are supported by this command " , recv_wrpc . rpc . dumpprivkey , recv_wrpc . getnewaddress ( ) )
assert_raises_rpc_error ( - 4 , " Only legacy wallets are supported by this command " , recv_wrpc . rpc . dumpwallet , ' wallet.dump ' )
assert_raises_rpc_error ( - 4 , " Only legacy wallets are supported by this command " , recv_wrpc . rpc . importwallet , ' wallet.dump ' )
assert_raises_rpc_error ( - 4 , " Only legacy wallets are supported by this command " , recv_wrpc . rpc . sethdseed )
self . log . info ( " Test encryption " )
# Get the master fingerprint before encrypt
info1 = send_wrpc . getaddressinfo ( send_wrpc . getnewaddress ( ) )
# Encrypt wallet 0
send_wrpc . encryptwallet ( ' pass ' )
with WalletUnlock ( send_wrpc , " pass " ) :
addr = send_wrpc . getnewaddress ( )
info2 = send_wrpc . getaddressinfo ( addr )
assert info1 [ ' hdmasterfingerprint ' ] != info2 [ ' hdmasterfingerprint ' ]
assert ' hdmasterfingerprint ' in send_wrpc . getaddressinfo ( send_wrpc . getnewaddress ( ) )
info3 = send_wrpc . getaddressinfo ( addr )
assert_equal ( info2 [ ' desc ' ] , info3 [ ' desc ' ] )
self . log . info ( " Test that getnewaddress still works after keypool is exhausted in an encrypted wallet " )
for _ in range ( 500 ) :
send_wrpc . getnewaddress ( )
self . log . info ( " Test that unlock is needed when deriving only hardened keys in an encrypted wallet " )
with WalletUnlock ( send_wrpc , " pass " ) :
send_wrpc . importdescriptors ( [ {
" desc " : " wpkh(tprv8ZgxMBicQKsPd7Uf69XL1XwhmjHopUGep8GuEiJDZmbQz6o58LninorQAfcKZWARbtRtfnLcJ5MQ2AtHcQJCCRUcMRvmDUjyEmNUWwx8UbK/0h/*h)#y4dfsj7n " ,
" timestamp " : " now " ,
" range " : [ 0 , 10 ] ,
" active " : True
} ] )
# Exhaust keypool of 100
for _ in range ( 100 ) :
send_wrpc . getnewaddress ( address_type = ' bech32 ' )
# This should now error
assert_raises_rpc_error ( - 12 , " Keypool ran out, please call keypoolrefill first " , send_wrpc . getnewaddress , ' ' , ' bech32 ' )
self . log . info ( " Test born encrypted wallets " )
self . nodes [ 0 ] . createwallet ( ' desc_enc ' , False , False , ' pass ' , False , True )
enc_rpc = self . nodes [ 0 ] . get_wallet_rpc ( ' desc_enc ' )
enc_rpc . getnewaddress ( ) # Makes sure that we can get a new address from a born encrypted wallet
self . log . info ( " Test blank descriptor wallets " )
self . nodes [ 0 ] . createwallet ( wallet_name = ' desc_blank ' , blank = True , descriptors = True )
blank_rpc = self . nodes [ 0 ] . get_wallet_rpc ( ' desc_blank ' )
assert_raises_rpc_error ( - 4 , ' This wallet has no available keys ' , blank_rpc . getnewaddress )
self . log . info ( " Test descriptor wallet with disabled private keys " )
self . nodes [ 0 ] . createwallet ( wallet_name = ' desc_no_priv ' , disable_private_keys = True , descriptors = True )
nopriv_rpc = self . nodes [ 0 ] . get_wallet_rpc ( ' desc_no_priv ' )
assert_raises_rpc_error ( - 4 , ' This wallet has no available keys ' , nopriv_rpc . getnewaddress )
self . log . info ( " Test descriptor exports " )
self . nodes [ 0 ] . createwallet ( wallet_name = ' desc_export ' , descriptors = True )
exp_rpc = self . nodes [ 0 ] . get_wallet_rpc ( ' desc_export ' )
self . nodes [ 0 ] . createwallet ( wallet_name = ' desc_import ' , disable_private_keys = True , descriptors = True )
imp_rpc = self . nodes [ 0 ] . get_wallet_rpc ( ' desc_import ' )
addr_types = [ ( ' legacy ' , False , ' pkh( ' , ' 44h/1h/0h ' , - 13 ) ,
( ' p2sh-segwit ' , False , ' sh(wpkh( ' , ' 49h/1h/0h ' , - 14 ) ,
( ' bech32 ' , False , ' wpkh( ' , ' 84h/1h/0h ' , - 13 ) ,
( ' bech32m ' , False , ' tr( ' , ' 86h/1h/0h ' , - 13 ) ,
( ' legacy ' , True , ' pkh( ' , ' 44h/1h/0h ' , - 13 ) ,
( ' p2sh-segwit ' , True , ' sh(wpkh( ' , ' 49h/1h/0h ' , - 14 ) ,
( ' bech32 ' , True , ' wpkh( ' , ' 84h/1h/0h ' , - 13 ) ,
( ' bech32m ' , True , ' tr( ' , ' 86h/1h/0h ' , - 13 ) ]
for addr_type , internal , desc_prefix , deriv_path , int_idx in addr_types :
int_str = ' internal ' if internal else ' external '
self . log . info ( " Testing descriptor address type for {} {} " . format ( addr_type , int_str ) )
if internal :
addr = exp_rpc . getrawchangeaddress ( address_type = addr_type )
else :
addr = exp_rpc . getnewaddress ( address_type = addr_type )
desc = exp_rpc . getaddressinfo ( addr ) [ ' parent_desc ' ]
assert_equal ( desc_prefix , desc [ 0 : len ( desc_prefix ) ] )
idx = desc . index ( ' / ' ) + 1
assert_equal ( deriv_path , desc [ idx : idx + 9 ] )
if internal :
assert_equal ( ' 1 ' , desc [ int_idx ] )
else :
assert_equal ( ' 0 ' , desc [ int_idx ] )
self . log . info ( " Testing the same descriptor is returned for address type {} {} " . format ( addr_type , int_str ) )
for i in range ( 0 , 10 ) :
if internal :
addr = exp_rpc . getrawchangeaddress ( address_type = addr_type )
else :
addr = exp_rpc . getnewaddress ( address_type = addr_type )
test_desc = exp_rpc . getaddressinfo ( addr ) [ ' parent_desc ' ]
assert_equal ( desc , test_desc )
self . log . info ( " Testing import of exported {} descriptor " . format ( addr_type ) )
imp_rpc . importdescriptors ( [ {
' desc ' : desc ,
' active ' : True ,
' next_index ' : 11 ,
' timestamp ' : ' now ' ,
' internal ' : internal
} ] )
for i in range ( 0 , 10 ) :
if internal :
exp_addr = exp_rpc . getrawchangeaddress ( address_type = addr_type )
imp_addr = imp_rpc . getrawchangeaddress ( address_type = addr_type )
else :
exp_addr = exp_rpc . getnewaddress ( address_type = addr_type )
imp_addr = imp_rpc . getnewaddress ( address_type = addr_type )
assert_equal ( exp_addr , imp_addr )
self . log . info ( " Test that loading descriptor wallet containing legacy key types throws error " )
self . nodes [ 0 ] . createwallet ( wallet_name = " crashme " , descriptors = True )
self . nodes [ 0 ] . unloadwallet ( " crashme " )
wallet_db = self . nodes [ 0 ] . wallets_path / " crashme " / self . wallet_data_filename
conn = sqlite3 . connect ( wallet_db )
with conn :
# add "cscript" entry: key type is uint160 (20 bytes), value type is CScript (zero-length here)
conn . execute ( ' INSERT INTO main VALUES(?, ?) ' , ( b ' \x07 cscript ' + b ' \x00 ' * 20 , b ' \x00 ' ) )
conn . close ( )
assert_raises_rpc_error ( - 4 , " Unexpected legacy entry in descriptor wallet found. " , self . nodes [ 0 ] . loadwallet , " crashme " )
self . test_concurrent_writes ( )
if __name__ == ' __main__ ' :
WalletDescriptorTest ( ) . main ( )