diff --git a/test/functional/test_framework/netutil.py b/test/functional/test_framework/netutil.py index 08d41fe97fd..f6acf926fa8 100644 --- a/test/functional/test_framework/netutil.py +++ b/test/functional/test_framework/netutil.py @@ -167,3 +167,10 @@ def test_unix_socket(): return False else: return True + +def format_addr_port(addr, port): + '''Return either "addr:port" or "[addr]:port" based on whether addr looks like an IPv6 address.''' + if ":" in addr: + return f"[{addr}]:{port}" + else: + return f"{addr}:{port}" diff --git a/test/functional/test_framework/p2p.py b/test/functional/test_framework/p2p.py index 4f1265eb548..4a03fdd16d8 100755 --- a/test/functional/test_framework/p2p.py +++ b/test/functional/test_framework/p2p.py @@ -188,6 +188,7 @@ class P2PConnection(asyncio.Protocol): self.on_connection_send_msg = None self.recvbuf = b"" self.magic_bytes = MAGIC_BYTES[net] + self.p2p_connected_to_node = dstport != 0 def peer_connect(self, dstaddr, dstport, *, net, timeout_factor, supports_v2_p2p): self.peer_connect_helper(dstaddr, dstport, net, timeout_factor) @@ -217,7 +218,12 @@ class P2PConnection(asyncio.Protocol): def connection_made(self, transport): """asyncio callback when a connection is opened.""" assert not self._transport - logger.debug("Connected & Listening: %s:%d" % (self.dstaddr, self.dstport)) + info = transport.get_extra_info("socket") + us = info.getsockname() + them = info.getpeername() + logger.debug(f"Connected: us={us[0]}:{us[1]}, them={them[0]}:{them[1]}") + self.dstaddr = them[0] + self.dstport = them[1] self._transport = transport # in an inbound connection to the TestNode with P2PConnection as the initiator, [TestNode <---- P2PConnection] # send the initial handshake immediately @@ -803,12 +809,13 @@ class P2PDataStore(P2PInterface): self.getdata_requests = [] def on_getdata(self, message): - """Check for the tx/block in our stores and if found, reply with an inv message.""" + """Check for the tx/block in our stores and if found, reply with MSG_TX or MSG_BLOCK.""" for inv in message.inv: self.getdata_requests.append(inv.hash) - if (inv.type & MSG_TYPE_MASK) == MSG_TX and inv.hash in self.tx_store.keys(): + invtype = inv.type & MSG_TYPE_MASK + if (invtype == MSG_TX or invtype == MSG_WTX) and inv.hash in self.tx_store.keys(): self.send_message(msg_tx(self.tx_store[inv.hash])) - elif (inv.type & MSG_TYPE_MASK) == MSG_BLOCK and inv.hash in self.block_store.keys(): + elif invtype == MSG_BLOCK and inv.hash in self.block_store.keys(): self.send_message(msg_block(self.block_store[inv.hash])) else: logger.debug('getdata message type {} received.'.format(hex(inv.type))) diff --git a/test/functional/test_framework/socks5.py b/test/functional/test_framework/socks5.py index 0ca06a73961..3387c8a1fef 100644 --- a/test/functional/test_framework/socks5.py +++ b/test/functional/test_framework/socks5.py @@ -4,11 +4,16 @@ # file COPYING or http://www.opensource.org/licenses/mit-license.php. """Dummy Socks5 server for testing.""" +import select import socket import threading import queue import logging +from .netutil import ( + format_addr_port +) + logger = logging.getLogger("TestFramework.socks5") # Protocol constants @@ -32,6 +37,42 @@ def recvall(s, n): n -= len(d) return rv +def sendall(s, data): + """Send all data to a socket, or fail.""" + sent = 0 + while sent < len(data): + _, wlist, _ = select.select([], [s], []) + if len(wlist) > 0: + n = s.send(data[sent:]) + if n == 0: + raise IOError('send() on socket returned 0') + sent += n + +def forward_sockets(a, b): + """Forward data received on socket a to socket b and vice versa, until EOF is received on one of the sockets.""" + # Mark as non-blocking so that we do not end up in a deadlock-like situation + # where we block and wait on data from `a` while there is data ready to be + # received on `b` and forwarded to `a`. And at the same time the application + # at `a` is not sending anything because it waits for the data from `b` to + # respond. + a.setblocking(False) + b.setblocking(False) + sockets = [a, b] + done = False + while not done: + rlist, _, xlist = select.select(sockets, [], sockets) + if len(xlist) > 0: + raise IOError('Exceptional condition on socket') + for s in rlist: + data = s.recv(4096) + if data is None or len(data) == 0: + done = True + break + if s == a: + sendall(b, data) + else: + sendall(a, data) + # Implementation classes class Socks5Configuration(): """Proxy configuration.""" @@ -41,6 +82,19 @@ class Socks5Configuration(): self.unauth = False # Support unauthenticated self.auth = False # Support authentication self.keep_alive = False # Do not automatically close connections + # This function is called whenever a new connection arrives to the proxy + # and it decides where the connection is redirected to. It is passed: + # - the address the client requested to connect to + # - the port the client requested to connect to + # It is supposed to return an object like: + # { + # "actual_to_addr": "127.0.0.1" + # "actual_to_port": 28276 + # } + # or None. + # If it returns an object then the connection is redirected to actual_to_addr:actual_to_port. + # If it returns None, or destinations_factory itself is None then the connection is closed. + self.destinations_factory = None class Socks5Command(): """Information about an incoming socks5 command.""" @@ -117,6 +171,22 @@ class Socks5Connection(): cmdin = Socks5Command(cmd, atyp, addr, port, username, password) self.serv.queue.put(cmdin) logger.debug('Proxy: %s', cmdin) + + requested_to_addr = addr.decode("utf-8") + requested_to = format_addr_port(requested_to_addr, port) + + if self.serv.conf.destinations_factory is not None: + dest = self.serv.conf.destinations_factory(requested_to_addr, port) + if dest is not None: + logger.debug(f"Serving connection to {requested_to}, will redirect it to " + f"{dest['actual_to_addr']}:{dest['actual_to_port']} instead") + with socket.create_connection((dest["actual_to_addr"], dest["actual_to_port"])) as conn_to: + forward_sockets(self.conn, conn_to) + else: + logger.debug(f"Closing connection to {requested_to}: the destinations factory returned None") + else: + logger.debug(f"Closing connection to {requested_to}: no destinations factory") + # Fall through to disconnect except Exception as e: logger.exception("socks5 request handling failed.") diff --git a/test/functional/test_framework/test_node.py b/test/functional/test_framework/test_node.py index e59ff4e3925..f1083dc1ce6 100755 --- a/test/functional/test_framework/test_node.py +++ b/test/functional/test_framework/test_node.py @@ -715,7 +715,6 @@ class TestNode(): if supports_v2_p2p is None: supports_v2_p2p = self.use_v2transport - p2p_conn.p2p_connected_to_node = True if self.use_v2transport: kwargs['services'] = kwargs.get('services', P2P_SERVICES) | NODE_P2P_V2 supports_v2_p2p = self.use_v2transport and supports_v2_p2p @@ -782,7 +781,6 @@ class TestNode(): self.log.debug("Connecting to %s:%d %s" % (address, port, connection_type)) self.addconnection('%s:%d' % (address, port), connection_type, advertise_v2_p2p) - p2p_conn.p2p_connected_to_node = False if supports_v2_p2p is None: supports_v2_p2p = self.use_v2transport if advertise_v2_p2p is None: