Merge #20953: test: dedup zmq test setup code (node restart, topics subscription)

4efb6c2d3b zmq test: deduplicate test setup code (node restart, topics subscription) (Sebastian Falbesoner)

Pull request description:

  This PR deduplicates common setup code for the ZMQ functional test. The following steps, previously duplicated in each sub-test, are put into a new method `setup_zmq_test(...)`:
  - create subscriber sockets (`zmq.SUB`) for each topic with the specified timeout (default 60s)
  - restart node0 with specified zmq notifications enabled (`-zmqpub...=tcp://127.0.0.1:...`...)
  - if desired, connect node0 with node1 (note done by default)
  - connect all susbcriber sockets to publisher (running on node0)
  - wait a bit (currently 200ms), to _"Relax so that the subscribers are ready before publishing zmq messages"_

  Note that the last point should be repaced by a more robust method, as this test is still flaky, see #20934 (also #20590 and #20538).

ACKs for top commit:
  instagibbs:
    ACK 4efb6c2d3b
  laanwj:
    Code review ACK 4efb6c2d3b

Tree-SHA512: d49626756a9c669f1133f1b73ce273994b58c760ce0d6a4bdaa384f043a74149dc2b9fa66fe990413d9105f9c3b6ea973e099669e8e02f2902a5b84fa995028c
pull/826/head
fanquake 4 years ago
commit 3734adba39
No known key found for this signature in database
GPG Key ID: 2EEB9F5CC09526C1

@ -80,34 +80,43 @@ class ZMQTest (BitcoinTestFramework):
self.log.debug("Destroying ZMQ context") self.log.debug("Destroying ZMQ context")
self.ctx.destroy(linger=None) self.ctx.destroy(linger=None)
# Restart node with the specified zmq notifications enabled, subscribe to
# all of them and return the corresponding ZMQSubscriber objects.
def setup_zmq_test(self, services, recv_timeout=60, connect_nodes=False):
subscribers = []
for topic, address in services:
socket = self.ctx.socket(zmq.SUB)
socket.set(zmq.RCVTIMEO, recv_timeout*1000)
subscribers.append(ZMQSubscriber(socket, topic.encode()))
self.restart_node(0, ["-zmqpub%s=%s" % (topic, address) for topic, address in services])
if connect_nodes:
self.connect_nodes(0, 1)
for i, sub in enumerate(subscribers):
sub.socket.connect(services[i][1])
# Relax so that the subscribers are ready before publishing zmq messages
sleep(0.2)
return subscribers
def test_basic(self): def test_basic(self):
# Invalid zmq arguments don't take down the node, see #17185. # Invalid zmq arguments don't take down the node, see #17185.
self.restart_node(0, ["-zmqpubrawtx=foo", "-zmqpubhashtx=bar"]) self.restart_node(0, ["-zmqpubrawtx=foo", "-zmqpubhashtx=bar"])
address = 'tcp://127.0.0.1:28332' address = 'tcp://127.0.0.1:28332'
sockets = [] subs = self.setup_zmq_test(
subs = [] [(topic, address) for topic in ["hashblock", "hashtx", "rawblock", "rawtx"]],
services = [b"hashblock", b"hashtx", b"rawblock", b"rawtx"] connect_nodes=True)
for service in services:
sockets.append(self.ctx.socket(zmq.SUB))
sockets[-1].set(zmq.RCVTIMEO, 60000)
subs.append(ZMQSubscriber(sockets[-1], service))
# Subscribe to all available topics.
hashblock = subs[0] hashblock = subs[0]
hashtx = subs[1] hashtx = subs[1]
rawblock = subs[2] rawblock = subs[2]
rawtx = subs[3] rawtx = subs[3]
self.restart_node(0, ["-zmqpub%s=%s" % (sub.topic.decode(), address) for sub in [hashblock, hashtx, rawblock, rawtx]])
self.connect_nodes(0, 1)
for socket in sockets:
socket.connect(address)
# Relax so that the subscriber is ready before publishing zmq messages
sleep(0.2)
num_blocks = 5 num_blocks = 5
self.log.info("Generate %(n)d blocks (and %(n)d coinbase txes)" % {"n": num_blocks}) self.log.info("Generate %(n)d blocks (and %(n)d coinbase txes)" % {"n": num_blocks})
genhashes = self.nodes[0].generatetoaddress(num_blocks, ADDRESS_BCRT1_UNSPENDABLE) genhashes = self.nodes[0].generatetoaddress(num_blocks, ADDRESS_BCRT1_UNSPENDABLE)
@ -174,25 +183,10 @@ class ZMQTest (BitcoinTestFramework):
address = 'tcp://127.0.0.1:28333' address = 'tcp://127.0.0.1:28333'
services = [b"hashblock", b"hashtx"]
sockets = []
subs = []
for service in services:
sockets.append(self.ctx.socket(zmq.SUB))
# 2 second timeout to check end of notifications
sockets[-1].set(zmq.RCVTIMEO, 2000)
subs.append(ZMQSubscriber(sockets[-1], service))
# Subscribe to all available topics.
hashblock = subs[0]
hashtx = subs[1]
# Should only notify the tip if a reorg occurs # Should only notify the tip if a reorg occurs
self.restart_node(0, ["-zmqpub%s=%s" % (sub.topic.decode(), address) for sub in [hashblock, hashtx]]) hashblock, hashtx = self.setup_zmq_test(
for socket in sockets: [(topic, address) for topic in ["hashblock", "hashtx"]],
socket.connect(address) recv_timeout=2) # 2 second timeout to check end of notifications
# Relax so that the subscriber is ready before publishing zmq messages
sleep(0.2)
# Generate 1 block in nodes[0] with 1 mempool tx and receive all notifications # Generate 1 block in nodes[0] with 1 mempool tx and receive all notifications
payment_txid = self.nodes[0].sendtoaddress(self.nodes[0].getnewaddress(), 1.0) payment_txid = self.nodes[0].sendtoaddress(self.nodes[0].getnewaddress(), 1.0)
@ -240,15 +234,7 @@ class ZMQTest (BitcoinTestFramework):
<32-byte hash>A<8-byte LE uint> : Transactionhash added mempool <32-byte hash>A<8-byte LE uint> : Transactionhash added mempool
""" """
self.log.info("Testing 'sequence' publisher") self.log.info("Testing 'sequence' publisher")
address = 'tcp://127.0.0.1:28333' [seq] = self.setup_zmq_test([("sequence", "tcp://127.0.0.1:28333")])
socket = self.ctx.socket(zmq.SUB)
socket.set(zmq.RCVTIMEO, 60000)
seq = ZMQSubscriber(socket, b'sequence')
self.restart_node(0, ['-zmqpub%s=%s' % (seq.topic.decode(), address)])
socket.connect(address)
# Relax so that the subscriber is ready before publishing zmq messages
sleep(0.2)
# Mempool sequence number starts at 1 # Mempool sequence number starts at 1
seq_num = 1 seq_num = 1
@ -399,16 +385,7 @@ class ZMQTest (BitcoinTestFramework):
return return
self.log.info("Testing 'mempool sync' usage of sequence notifier") self.log.info("Testing 'mempool sync' usage of sequence notifier")
address = 'tcp://127.0.0.1:28333' [seq] = self.setup_zmq_test([("sequence", "tcp://127.0.0.1:28333")], connect_nodes=True)
socket = self.ctx.socket(zmq.SUB)
socket.set(zmq.RCVTIMEO, 60000)
seq = ZMQSubscriber(socket, b'sequence')
self.restart_node(0, ['-zmqpub%s=%s' % (seq.topic.decode(), address)])
self.connect_nodes(0, 1)
socket.connect(address)
# Relax so that the subscriber is ready before publishing zmq messages
sleep(0.2)
# In-memory counter, should always start at 1 # In-memory counter, should always start at 1
next_mempool_seq = self.nodes[0].getrawmempool(mempool_sequence=True)["mempool_sequence"] next_mempool_seq = self.nodes[0].getrawmempool(mempool_sequence=True)["mempool_sequence"]
@ -508,26 +485,17 @@ class ZMQTest (BitcoinTestFramework):
def test_multiple_interfaces(self): def test_multiple_interfaces(self):
# Set up two subscribers with different addresses # Set up two subscribers with different addresses
subscribers = [] subscribers = self.setup_zmq_test([
for i in range(2): ("hashblock", "tcp://127.0.0.1:28334"),
address = 'tcp://127.0.0.1:%d' % (28334 + i) ("hashblock", "tcp://127.0.0.1:28335"),
socket = self.ctx.socket(zmq.SUB) ])
socket.set(zmq.RCVTIMEO, 60000)
hashblock = ZMQSubscriber(socket, b"hashblock")
socket.connect(address)
subscribers.append({'address': address, 'hashblock': hashblock})
self.restart_node(0, ['-zmqpub%s=%s' % (subscriber['hashblock'].topic.decode(), subscriber['address']) for subscriber in subscribers])
# Relax so that the subscriber is ready before publishing zmq messages
sleep(0.2)
# Generate 1 block in nodes[0] and receive all notifications # Generate 1 block in nodes[0] and receive all notifications
self.nodes[0].generatetoaddress(1, ADDRESS_BCRT1_UNSPENDABLE) self.nodes[0].generatetoaddress(1, ADDRESS_BCRT1_UNSPENDABLE)
# Should receive the same block hash on both subscribers # Should receive the same block hash on both subscribers
assert_equal(self.nodes[0].getbestblockhash(), subscribers[0]['hashblock'].receive().hex()) assert_equal(self.nodes[0].getbestblockhash(), subscribers[0].receive().hex())
assert_equal(self.nodes[0].getbestblockhash(), subscribers[1]['hashblock'].receive().hex()) assert_equal(self.nodes[0].getbestblockhash(), subscribers[1].receive().hex())
if __name__ == '__main__': if __name__ == '__main__':
ZMQTest().main() ZMQTest().main()

Loading…
Cancel
Save