@ -54,28 +54,31 @@ class ZMQTest (BitcoinTestFramework):
self . ctx . destroy ( linger = None )
def test_basic ( self ) :
# All messages are received in the same socket which means
# that this test fails if the publishing order changes.
# Note that the publishing order is not defined in the documentation and
# is subject to change.
import zmq
# Invalid zmq arguments don't take down the node, see #17185.
self . restart_node ( 0 , [ " -zmqpubrawtx=foo " , " -zmqpubhashtx=bar " ] )
address = ' tcp://127.0.0.1:28332 '
socket = self . ctx . socket ( zmq . SUB )
socket . set ( zmq . RCVTIMEO , 60000 )
sockets = [ ]
subs = [ ]
services = [ b " hashblock " , b " hashtx " , b " rawblock " , b " rawtx " ]
for service in services :
sockets . append ( self . ctx . socket ( zmq . SUB ) )
sockets [ - 1 ] . set ( zmq . RCVTIMEO , 60000 )
subs . append ( ZMQSubscriber ( sockets [ - 1 ] , service ) )
# Subscribe to all available topics.
hashblock = ZMQSubscriber ( socket , b " hashblock " )
hashtx = ZMQSubscriber ( socket , b " hashtx " )
rawblock = ZMQSubscriber ( socket , b " rawblock " )
rawtx = ZMQSubscriber( socket , b " rawtx " )
hashblock = subs[ 0 ]
hashtx = subs[ 1 ]
rawblock = subs[ 2 ]
rawtx = subs[ 3 ]
self . restart_node ( 0 , [ " -zmqpub %s = %s " % ( sub . topic . decode ( ) , address ) for sub in [ hashblock , hashtx , rawblock , rawtx ] ] )
connect_nodes ( self . nodes [ 0 ] , 1 )
socket . connect ( address )
for socket in sockets :
socket . connect ( address )
# Relax so that the subscriber is ready before publishing zmq messages
sleep ( 0.2 )
@ -96,15 +99,16 @@ class ZMQTest (BitcoinTestFramework):
tx . calc_sha256 ( )
assert_equal ( tx . hash , txid . hex ( ) )
# Should receive the generated raw block.
block = rawblock . receive ( )
assert_equal ( genhashes [ x ] , hash256_reversed ( block [ : 80 ] ) . hex ( ) )
# Should receive the generated block hash.
hash = hashblock . receive ( ) . hex ( )
assert_equal ( genhashes [ x ] , hash )
# The block should only have the coinbase txid.
assert_equal ( [ txid . hex ( ) ] , self . nodes [ 1 ] . getblock ( hash ) [ " tx " ] )
# Should receive the generated raw block.
block = rawblock . receive ( )
assert_equal ( genhashes [ x ] , hash256_reversed ( block [ : 80 ] ) . hex ( ) )
if self . is_wallet_compiled ( ) :
self . log . info ( " Wait for tx from second node " )
@ -119,6 +123,13 @@ class ZMQTest (BitcoinTestFramework):
hex = rawtx . receive ( )
assert_equal ( payment_txid , hash256_reversed ( hex ) . hex ( ) )
# Mining the block with this tx should result in second notification
# after coinbase tx notification
self . nodes [ 0 ] . generatetoaddress ( 1 , ADDRESS_BCRT1_UNSPENDABLE )
hashtx . receive ( )
txid = hashtx . receive ( )
assert_equal ( payment_txid , txid . hex ( ) )
self . log . info ( " Test the getzmqnotifications RPC " )
assert_equal ( self . nodes [ 0 ] . getzmqnotifications ( ) , [
@ -131,30 +142,67 @@ class ZMQTest (BitcoinTestFramework):
assert_equal ( self . nodes [ 1 ] . getzmqnotifications ( ) , [ ] )
def test_reorg ( self ) :
if not self . is_wallet_compiled ( ) :
self . log . info ( " Skipping reorg test because wallet is disabled " )
return
import zmq
address = ' tcp://127.0.0.1:28333 '
socket = self . ctx . socket ( zmq . SUB )
socket . set ( zmq . RCVTIMEO , 60000 )
hashblock = ZMQSubscriber ( socket , b ' hashblock ' )
services = [ b " hashblock " , b " hashtx " ]
sockets = [ ]
subs = [ ]
for service in services :
sockets . append ( self . ctx . socket ( zmq . SUB ) )
# 2 second timeout to check end of notifications
sockets [ - 1 ] . set ( zmq . RCVTIMEO , 2000 )
subs . append ( ZMQSubscriber ( sockets [ - 1 ] , service ) )
# Subscribe to all available topics.
hashblock = subs [ 0 ]
hashtx = subs [ 1 ]
# Should only notify the tip if a reorg occurs
self . restart_node ( 0 , [ ' -zmqpub %s = %s ' % ( hashblock . topic . decode ( ) , address ) ] )
socket . connect ( address )
self . restart_node ( 0 , [ " -zmqpub %s = %s " % ( sub . topic . decode ( ) , address ) for sub in [ hashblock , hashtx ] ] )
for socket in sockets :
socket . connect ( address )
# Relax so that the subscriber is ready before publishing zmq messages
sleep ( 0.2 )
# Generate 1 block in nodes[0] and receive all notifications
self . nodes [ 0 ] . generatetoaddress ( 1 , ADDRESS_BCRT1_UNSPENDABLE )
# Generate 1 block in nodes[0] with 1 mempool tx and receive all notifications
payment_txid = self . nodes [ 0 ] . sendtoaddress ( self . nodes [ 0 ] . getnewaddress ( ) , 1.0 )
disconnect_block = self . nodes [ 0 ] . generatetoaddress ( 1 , ADDRESS_BCRT1_UNSPENDABLE ) [ 0 ]
disconnect_cb = self . nodes [ 0 ] . getblock ( disconnect_block ) [ " tx " ] [ 0 ]
assert_equal ( self . nodes [ 0 ] . getbestblockhash ( ) , hashblock . receive ( ) . hex ( ) )
assert_equal ( hashtx . receive ( ) . hex ( ) , payment_txid )
assert_equal ( hashtx . receive ( ) . hex ( ) , disconnect_cb )
# Generate 2 blocks in nodes[1]
self . nodes [ 1 ] . generatetoaddress ( 2 , ADDRESS_BCRT1_UNSPENDABLE )
connect_blocks = self . nodes [ 1 ] . generatetoaddress ( 2 , ADDRESS_BCRT1_UNSPENDABLE )
# nodes[0] will reorg chain after connecting back nodes[1]
connect_nodes ( self . nodes [ 0 ] , 1 )
self . sync_blocks ( ) # tx in mempool valid but not advertised
# Should receive nodes[1] tip
assert_equal ( self . nodes [ 1 ] . getbestblockhash ( ) , hashblock . receive ( ) . hex ( ) )
# During reorg:
# Get old payment transaction notification from disconnect and disconnected cb
assert_equal ( hashtx . receive ( ) . hex ( ) , payment_txid )
assert_equal ( hashtx . receive ( ) . hex ( ) , disconnect_cb )
# And the payment transaction again due to mempool entry
assert_equal ( hashtx . receive ( ) . hex ( ) , payment_txid )
assert_equal ( hashtx . receive ( ) . hex ( ) , payment_txid )
# And the new connected coinbases
for i in [ 0 , 1 ] :
assert_equal ( hashtx . receive ( ) . hex ( ) , self . nodes [ 1 ] . getblock ( connect_blocks [ i ] ) [ " tx " ] [ 0 ] )
# If we do a simple invalidate we announce the disconnected coinbase
self . nodes [ 0 ] . invalidateblock ( connect_blocks [ 1 ] )
assert_equal ( hashtx . receive ( ) . hex ( ) , self . nodes [ 1 ] . getblock ( connect_blocks [ 1 ] ) [ " tx " ] [ 0 ] )
# And the current tip
assert_equal ( hashtx . receive ( ) . hex ( ) , self . nodes [ 1 ] . getblock ( connect_blocks [ 0 ] ) [ " tx " ] [ 0 ] )
if __name__ == ' __main__ ' :
ZMQTest ( ) . main ( )