From 6bc1ff915dd495f05985d3402a34dbfc3b6a08b4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Barbosa?= Date: Wed, 17 Jul 2019 14:38:15 +0100 Subject: [PATCH 1/3] doc: Add note regarding ZMQ block notification --- doc/zmq.md | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/doc/zmq.md b/doc/zmq.md index 7ffc5623b63..a309abd0cc5 100644 --- a/doc/zmq.md +++ b/doc/zmq.md @@ -111,7 +111,9 @@ using other means such as firewalling. Note that when the block chain tip changes, a reorganisation may occur and just the tip will be notified. It is up to the subscriber to -retrieve the chain from the last known block to the new tip. +retrieve the chain from the last known block to the new tip. Also note +that no notification occurs if the tip was in the active chain - this +is the case after calling invalidateblock RPC. There are several possibilities that ZMQ notification can get lost during transmission depending on the communication type you are From aa2622a726bc0f02152d79c888a332694678a989 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Barbosa?= Date: Wed, 17 Jul 2019 16:13:37 +0100 Subject: [PATCH 2/3] qa: Refactor ZMQ test --- test/functional/interface_zmq.py | 69 ++++++++++++++------------------ 1 file changed, 31 insertions(+), 38 deletions(-) diff --git a/test/functional/interface_zmq.py b/test/functional/interface_zmq.py index 1ba781c539c..d1304e77650 100755 --- a/test/functional/interface_zmq.py +++ b/test/functional/interface_zmq.py @@ -8,11 +8,9 @@ import struct from test_framework.address import ADDRESS_BCRT1_UNSPENDABLE from test_framework.test_framework import BitcoinTestFramework from test_framework.messages import CTransaction, hash256 -from test_framework.util import assert_equal +from test_framework.util import assert_equal, connect_nodes from io import BytesIO -ADDRESS = "tcp://127.0.0.1:28332" - def hash256_reversed(byte_str): return hash256(byte_str)[::-1] @@ -43,66 +41,61 @@ class ZMQTest (BitcoinTestFramework): self.skip_if_no_py3_zmq() self.skip_if_no_bitcoind_zmq() - def setup_nodes(self): + def run_test(self): import zmq + self.ctx = zmq.Context() + try: + self.test_basic() + finally: + # Destroy the ZMQ context. + self.log.debug("Destroying ZMQ context") + self.ctx.destroy(linger=None) - # Initialize ZMQ context and socket. + def test_basic(self): # All messages are received in the same socket which means # that this test fails if the publishing order changes. # Note that the publishing order is not defined in the documentation and # is subject to change. - self.zmq_context = zmq.Context() - socket = self.zmq_context.socket(zmq.SUB) + import zmq + address = 'tcp://127.0.0.1:28332' + socket = self.ctx.socket(zmq.SUB) socket.set(zmq.RCVTIMEO, 60000) - socket.connect(ADDRESS) + socket.connect(address) # Subscribe to all available topics. - self.hashblock = ZMQSubscriber(socket, b"hashblock") - self.hashtx = ZMQSubscriber(socket, b"hashtx") - self.rawblock = ZMQSubscriber(socket, b"rawblock") - self.rawtx = ZMQSubscriber(socket, b"rawtx") - - self.extra_args = [ - ["-zmqpub%s=%s" % (sub.topic.decode(), ADDRESS) for sub in [self.hashblock, self.hashtx, self.rawblock, self.rawtx]], - [], - ] - self.add_nodes(self.num_nodes, self.extra_args) - self.start_nodes() - self.import_deterministic_coinbase_privkeys() + hashblock = ZMQSubscriber(socket, b"hashblock") + hashtx = ZMQSubscriber(socket, b"hashtx") + rawblock = ZMQSubscriber(socket, b"rawblock") + rawtx = ZMQSubscriber(socket, b"rawtx") - def run_test(self): - try: - self._zmq_test() - finally: - # Destroy the ZMQ context. - self.log.debug("Destroying ZMQ context") - self.zmq_context.destroy(linger=None) + self.restart_node(0, ["-zmqpub%s=%s" % (sub.topic.decode(), address) for sub in [hashblock, hashtx, rawblock, rawtx]]) + connect_nodes(self.nodes[0], 1) - def _zmq_test(self): num_blocks = 5 self.log.info("Generate %(n)d blocks (and %(n)d coinbase txes)" % {"n": num_blocks}) genhashes = self.nodes[0].generatetoaddress(num_blocks, ADDRESS_BCRT1_UNSPENDABLE) + self.sync_all() for x in range(num_blocks): # Should receive the coinbase txid. - txid = self.hashtx.receive() + txid = hashtx.receive() # Should receive the coinbase raw transaction. - hex = self.rawtx.receive() + hex = rawtx.receive() tx = CTransaction() tx.deserialize(BytesIO(hex)) tx.calc_sha256() assert_equal(tx.hash, txid.hex()) # Should receive the generated block hash. - hash = self.hashblock.receive().hex() + hash = hashblock.receive().hex() assert_equal(genhashes[x], hash) # The block should only have the coinbase txid. assert_equal([txid.hex()], self.nodes[1].getblock(hash)["tx"]) # Should receive the generated raw block. - block = self.rawblock.receive() + block = rawblock.receive() assert_equal(genhashes[x], hash256_reversed(block[:80]).hex()) if self.is_wallet_compiled(): @@ -111,20 +104,20 @@ class ZMQTest (BitcoinTestFramework): self.sync_all() # Should receive the broadcasted txid. - txid = self.hashtx.receive() + txid = hashtx.receive() assert_equal(payment_txid, txid.hex()) # Should receive the broadcasted raw transaction. - hex = self.rawtx.receive() + hex = rawtx.receive() assert_equal(payment_txid, hash256_reversed(hex).hex()) self.log.info("Test the getzmqnotifications RPC") assert_equal(self.nodes[0].getzmqnotifications(), [ - {"type": "pubhashblock", "address": ADDRESS, "hwm": 1000}, - {"type": "pubhashtx", "address": ADDRESS, "hwm": 1000}, - {"type": "pubrawblock", "address": ADDRESS, "hwm": 1000}, - {"type": "pubrawtx", "address": ADDRESS, "hwm": 1000}, + {"type": "pubhashblock", "address": address, "hwm": 1000}, + {"type": "pubhashtx", "address": address, "hwm": 1000}, + {"type": "pubrawblock", "address": address, "hwm": 1000}, + {"type": "pubrawtx", "address": address, "hwm": 1000}, ]) assert_equal(self.nodes[1].getzmqnotifications(), []) From abdfc5e89b687f73de4ab97e924c29cc27e71c15 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Barbosa?= Date: Wed, 17 Jul 2019 14:41:42 +0100 Subject: [PATCH 3/3] qa: Test ZMQ notification after chain reorg --- test/functional/interface_zmq.py | 25 +++++++++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/test/functional/interface_zmq.py b/test/functional/interface_zmq.py index d1304e77650..912b667b0c7 100755 --- a/test/functional/interface_zmq.py +++ b/test/functional/interface_zmq.py @@ -46,6 +46,7 @@ class ZMQTest (BitcoinTestFramework): self.ctx = zmq.Context() try: self.test_basic() + self.test_reorg() finally: # Destroy the ZMQ context. self.log.debug("Destroying ZMQ context") @@ -122,5 +123,29 @@ class ZMQTest (BitcoinTestFramework): assert_equal(self.nodes[1].getzmqnotifications(), []) + def test_reorg(self): + import zmq + address = 'tcp://127.0.0.1:28333' + socket = self.ctx.socket(zmq.SUB) + socket.set(zmq.RCVTIMEO, 60000) + socket.connect(address) + hashblock = ZMQSubscriber(socket, b'hashblock') + + # Should only notify the tip if a reorg occurs + self.restart_node(0, ['-zmqpub%s=%s' % (hashblock.topic.decode(), address)]) + + # Generate 1 block in nodes[0] and receive all notifications + self.nodes[0].generatetoaddress(1, ADDRESS_BCRT1_UNSPENDABLE) + assert_equal(self.nodes[0].getbestblockhash(), hashblock.receive().hex()) + + # Generate 2 blocks in nodes[1] + self.nodes[1].generatetoaddress(2, ADDRESS_BCRT1_UNSPENDABLE) + + # nodes[0] will reorg chain after connecting back nodes[1] + connect_nodes(self.nodes[0], 1) + + # Should receive nodes[1] tip + assert_equal(self.nodes[1].getbestblockhash(), hashblock.receive().hex()) + if __name__ == '__main__': ZMQTest().main()