From 3997ab915451a702eed2153a0727b0a78c0450ac Mon Sep 17 00:00:00 2001 From: Amiti Uttarwar Date: Wed, 10 Jun 2020 13:29:07 -0700 Subject: [PATCH] [test] Add test framework support to create outbound connections. In the interest of increasing our P2P test coverage, add support to create full-relay or block-relay-only connections. To support this, a P2P connection spins up a listening thread & uses a callback to trigger the node initiating the connection. Co-authored-by: Anthony Towns --- test/functional/test_framework/p2p.py | 99 +++++++++++++++++---- test/functional/test_framework/test_node.py | 27 +++++- 2 files changed, 110 insertions(+), 16 deletions(-) diff --git a/test/functional/test_framework/p2p.py b/test/functional/test_framework/p2p.py index ea769ddfa2..fa4a567aac 100755 --- a/test/functional/test_framework/p2p.py +++ b/test/functional/test_framework/p2p.py @@ -71,7 +71,11 @@ from test_framework.messages import ( NODE_WITNESS, sha256, ) -from test_framework.util import wait_until_helper +from test_framework.util import ( + MAX_NODES, + p2p_port, + wait_until_helper, +) logger = logging.getLogger("TestFramework.p2p") @@ -139,7 +143,7 @@ class P2PConnection(asyncio.Protocol): def is_connected(self): return self._transport is not None - def peer_connect(self, dstaddr, dstport, *, net, timeout_factor): + def peer_connect_helper(self, dstaddr, dstport, net, timeout_factor): assert not self.is_connected self.timeout_factor = timeout_factor self.dstaddr = dstaddr @@ -148,12 +152,20 @@ class P2PConnection(asyncio.Protocol): self.on_connection_send_msg = None self.recvbuf = b"" self.magic_bytes = MAGIC_BYTES[net] - logger.debug('Connecting to Bitcoin Node: %s:%d' % (self.dstaddr, self.dstport)) + + def peer_connect(self, dstaddr, dstport, *, net, timeout_factor): + self.peer_connect_helper(dstaddr, dstport, net, timeout_factor) loop = NetworkThread.network_event_loop - conn_gen_unsafe = loop.create_connection(lambda: self, host=self.dstaddr, port=self.dstport) - conn_gen = lambda: loop.call_soon_threadsafe(loop.create_task, conn_gen_unsafe) - return conn_gen + logger.debug('Connecting to Bitcoin Node: %s:%d' % (self.dstaddr, self.dstport)) + coroutine = loop.create_connection(lambda: self, host=self.dstaddr, port=self.dstport) + return lambda: loop.call_soon_threadsafe(loop.create_task, coroutine) + + def peer_accept_connection(self, connect_id, connect_cb=lambda: None, *, net, timeout_factor): + self.peer_connect_helper('0', 0, net, timeout_factor) + + logger.debug('Listening for Bitcoin Node with id: {}'.format(connect_id)) + return lambda: NetworkThread.listen(self, connect_cb, idx=connect_id) def peer_disconnect(self): # Connection could have already been closed by other end. @@ -312,18 +324,27 @@ class P2PInterface(P2PConnection): # If the peer supports wtxid-relay self.wtxidrelay = wtxidrelay - def peer_connect(self, *args, services=NODE_NETWORK|NODE_WITNESS, send_version=True, **kwargs): + def peer_connect_send_version(self, services): + # Send a version msg + vt = msg_version() + vt.nServices = services + vt.addrTo.ip = self.dstaddr + vt.addrTo.port = self.dstport + vt.addrFrom.ip = "0.0.0.0" + vt.addrFrom.port = 0 + self.on_connection_send_msg = vt # Will be sent in connection_made callback + + def peer_connect(self, *args, services=NODE_NETWORK | NODE_WITNESS, send_version=True, **kwargs): create_conn = super().peer_connect(*args, **kwargs) if send_version: - # Send a version msg - vt = msg_version() - vt.nServices = services - vt.addrTo.ip = self.dstaddr - vt.addrTo.port = self.dstport - vt.addrFrom.ip = "0.0.0.0" - vt.addrFrom.port = 0 - self.on_connection_send_msg = vt # Will be sent soon after connection_made + self.peer_connect_send_version(services) + + return create_conn + + def peer_accept_connection(self, *args, services=NODE_NETWORK | NODE_WITNESS, **kwargs): + create_conn = super().peer_accept_connection(*args, **kwargs) + self.peer_connect_send_version(services) return create_conn @@ -414,6 +435,10 @@ class P2PInterface(P2PConnection): wait_until_helper(test_function, timeout=timeout, lock=p2p_lock, timeout_factor=self.timeout_factor) + def wait_for_connect(self, timeout=60): + test_function = lambda: self.is_connected + wait_until_helper(test_function, timeout=timeout, lock=p2p_lock) + def wait_for_disconnect(self, timeout=60): test_function = lambda: not self.is_connected self.wait_until(test_function, timeout=timeout, check_connected=False) @@ -527,6 +552,8 @@ class NetworkThread(threading.Thread): # There is only one event loop and no more than one thread must be created assert not self.network_event_loop + NetworkThread.listeners = {} + NetworkThread.protos = {} NetworkThread.network_event_loop = asyncio.new_event_loop() def run(self): @@ -542,6 +569,48 @@ class NetworkThread(threading.Thread): # Safe to remove event loop. NetworkThread.network_event_loop = None + @classmethod + def listen(cls, p2p, callback, port=None, addr=None, idx=1): + """ Ensure a listening server is running on the given port, and run the + protocol specified by `p2p` on the next connection to it. Once ready + for connections, call `callback`.""" + + if port is None: + assert 0 < idx <= MAX_NODES + port = p2p_port(MAX_NODES - idx) + if addr is None: + addr = '127.0.0.1' + + coroutine = cls.create_listen_server(addr, port, callback, p2p) + cls.network_event_loop.call_soon_threadsafe(cls.network_event_loop.create_task, coroutine) + + @classmethod + async def create_listen_server(cls, addr, port, callback, proto): + def peer_protocol(): + """Returns a function that does the protocol handling for a new + connection. To allow different connections to have different + behaviors, the protocol function is first put in the cls.protos + dict. When the connection is made, the function removes the + protocol function from that dict, and returns it so the event loop + can start executing it.""" + response = cls.protos.get((addr, port)) + cls.protos[(addr, port)] = None + return response + + if (addr, port) not in cls.listeners: + # When creating a listener on a given (addr, port) we only need to + # do it once. If we want different behaviors for different + # connections, we can accomplish this by providing different + # `proto` functions + + listener = await cls.network_event_loop.create_server(peer_protocol, addr, port) + logger.debug("Listening server on %s:%d should be started" % (addr, port)) + cls.listeners[(addr, port)] = listener + + cls.protos[(addr, port)] = proto + callback(addr, port) + + class P2PDataStore(P2PInterface): """A P2P data store class. diff --git a/test/functional/test_framework/test_node.py b/test/functional/test_framework/test_node.py index e10ec1328b..b61d433652 100755 --- a/test/functional/test_framework/test_node.py +++ b/test/functional/test_framework/test_node.py @@ -71,6 +71,7 @@ class TestNode(): """ self.index = i + self.p2p_conn_index = 1 self.datadir = datadir self.bitcoinconf = os.path.join(self.datadir, "bitcoin.conf") self.stdout_dir = os.path.join(self.datadir, "stdout") @@ -517,7 +518,7 @@ class TestNode(): self._raise_assertion_error(assert_msg) def add_p2p_connection(self, p2p_conn, *, wait_for_verack=True, **kwargs): - """Add a p2p connection to the node. + """Add an inbound p2p connection to the node. This method adds the p2p connection to the self.p2ps list and also returns the connection to the caller.""" @@ -546,6 +547,29 @@ class TestNode(): return p2p_conn + def add_outbound_p2p_connection(self, p2p_conn, *, p2p_idx, connection_type="outbound-full-relay", **kwargs): + """Add an outbound p2p connection from node. Either + full-relay("outbound-full-relay") or + block-relay-only("block-relay-only") connection. + + This method adds the p2p connection to the self.p2ps list and returns + the connection to the caller. + """ + + def addconnection_callback(address, port): + self.log.debug("Connecting to %s:%d %s" % (address, port, connection_type)) + self.addconnection('%s:%d' % (address, port), connection_type) + + p2p_conn.peer_accept_connection(connect_cb=addconnection_callback, connect_id=p2p_idx + 1, net=self.chain, timeout_factor=self.timeout_factor, **kwargs)() + + p2p_conn.wait_for_connect() + self.p2ps.append(p2p_conn) + + p2p_conn.wait_for_verack() + p2p_conn.sync_with_ping() + + return p2p_conn + def num_test_p2p_connections(self): """Return number of test framework p2p connections to the node.""" return len([peer for peer in self.getpeerinfo() if peer['subver'] == MY_SUBVERSION.decode("utf-8")]) @@ -555,6 +579,7 @@ class TestNode(): for p in self.p2ps: p.peer_disconnect() del self.p2ps[:] + wait_until_helper(lambda: self.num_test_p2p_connections() == 0, timeout_factor=self.timeout_factor)