Merge bitcoin/bitcoin#29420: test: extend the SOCKS5 Python proxy to actually connect to a destination

57529ac4db test: set P2PConnection.p2p_connected_to_node in peer_connect_helper() (Vasil Dimov)
22cd0e888c test: support WTX INVs from P2PDataStore and fix a comment (Vasil Dimov)
ebe42c00aa test: extend the SOCKS5 Python proxy to actually connect to a destination (Vasil Dimov)
ba621ffb9c test: improve debug log message from P2PConnection::connection_made() (Vasil Dimov)

Pull request description:

  If requested, make the SOCKS5 Python proxy redirect connections to a set of given destinations. Actually act as a real proxy, connecting the client to a destination, except that the destination is not what the client asked for.

  This would enable us to "connect" to Tor addresses from the functional tests.

  Plus a few other minor improvements in the test framework as individual commits.

  ---

  These changes are part of https://github.com/bitcoin/bitcoin/pull/29415 but they make sense on their own and would be good to have them, regardless of the fate of #29415. Also, if this is merged, that would reduce the size of #29415, thus the current standalone PR.

ACKs for top commit:
  jonatack:
    Approach ACK 57529ac4db
  achow101:
    ACK 57529ac4db
  tdb3:
    CR and test ACK 57529ac4db
  mzumsande:
    Code review / tested ACK 57529ac4db

Tree-SHA512: a2892c97bff2d337b37455c409c6136cb62423ce6cc32b197b36f220c1eec9ca046b599135b9a2603c0eb6c1ac4d9795e73831ef0f04378aeea8b245ea733399
pull/31160/merge
Ava Chow 1 week ago
commit 97b790e844
No known key found for this signature in database
GPG Key ID: 17565732E08E5E41

@ -167,3 +167,10 @@ def test_unix_socket():
return False
else:
return True
def format_addr_port(addr, port):
'''Return either "addr:port" or "[addr]:port" based on whether addr looks like an IPv6 address.'''
if ":" in addr:
return f"[{addr}]:{port}"
else:
return f"{addr}:{port}"

@ -188,6 +188,7 @@ class P2PConnection(asyncio.Protocol):
self.on_connection_send_msg = None
self.recvbuf = b""
self.magic_bytes = MAGIC_BYTES[net]
self.p2p_connected_to_node = dstport != 0
def peer_connect(self, dstaddr, dstport, *, net, timeout_factor, supports_v2_p2p):
self.peer_connect_helper(dstaddr, dstport, net, timeout_factor)
@ -217,7 +218,12 @@ class P2PConnection(asyncio.Protocol):
def connection_made(self, transport):
"""asyncio callback when a connection is opened."""
assert not self._transport
logger.debug("Connected & Listening: %s:%d" % (self.dstaddr, self.dstport))
info = transport.get_extra_info("socket")
us = info.getsockname()
them = info.getpeername()
logger.debug(f"Connected: us={us[0]}:{us[1]}, them={them[0]}:{them[1]}")
self.dstaddr = them[0]
self.dstport = them[1]
self._transport = transport
# in an inbound connection to the TestNode with P2PConnection as the initiator, [TestNode <---- P2PConnection]
# send the initial handshake immediately
@ -803,12 +809,13 @@ class P2PDataStore(P2PInterface):
self.getdata_requests = []
def on_getdata(self, message):
"""Check for the tx/block in our stores and if found, reply with an inv message."""
"""Check for the tx/block in our stores and if found, reply with MSG_TX or MSG_BLOCK."""
for inv in message.inv:
self.getdata_requests.append(inv.hash)
if (inv.type & MSG_TYPE_MASK) == MSG_TX and inv.hash in self.tx_store.keys():
invtype = inv.type & MSG_TYPE_MASK
if (invtype == MSG_TX or invtype == MSG_WTX) and inv.hash in self.tx_store.keys():
self.send_message(msg_tx(self.tx_store[inv.hash]))
elif (inv.type & MSG_TYPE_MASK) == MSG_BLOCK and inv.hash in self.block_store.keys():
elif invtype == MSG_BLOCK and inv.hash in self.block_store.keys():
self.send_message(msg_block(self.block_store[inv.hash]))
else:
logger.debug('getdata message type {} received.'.format(hex(inv.type)))

@ -4,11 +4,16 @@
# file COPYING or http://www.opensource.org/licenses/mit-license.php.
"""Dummy Socks5 server for testing."""
import select
import socket
import threading
import queue
import logging
from .netutil import (
format_addr_port
)
logger = logging.getLogger("TestFramework.socks5")
# Protocol constants
@ -32,6 +37,42 @@ def recvall(s, n):
n -= len(d)
return rv
def sendall(s, data):
"""Send all data to a socket, or fail."""
sent = 0
while sent < len(data):
_, wlist, _ = select.select([], [s], [])
if len(wlist) > 0:
n = s.send(data[sent:])
if n == 0:
raise IOError('send() on socket returned 0')
sent += n
def forward_sockets(a, b):
"""Forward data received on socket a to socket b and vice versa, until EOF is received on one of the sockets."""
# Mark as non-blocking so that we do not end up in a deadlock-like situation
# where we block and wait on data from `a` while there is data ready to be
# received on `b` and forwarded to `a`. And at the same time the application
# at `a` is not sending anything because it waits for the data from `b` to
# respond.
a.setblocking(False)
b.setblocking(False)
sockets = [a, b]
done = False
while not done:
rlist, _, xlist = select.select(sockets, [], sockets)
if len(xlist) > 0:
raise IOError('Exceptional condition on socket')
for s in rlist:
data = s.recv(4096)
if data is None or len(data) == 0:
done = True
break
if s == a:
sendall(b, data)
else:
sendall(a, data)
# Implementation classes
class Socks5Configuration():
"""Proxy configuration."""
@ -41,6 +82,19 @@ class Socks5Configuration():
self.unauth = False # Support unauthenticated
self.auth = False # Support authentication
self.keep_alive = False # Do not automatically close connections
# This function is called whenever a new connection arrives to the proxy
# and it decides where the connection is redirected to. It is passed:
# - the address the client requested to connect to
# - the port the client requested to connect to
# It is supposed to return an object like:
# {
# "actual_to_addr": "127.0.0.1"
# "actual_to_port": 28276
# }
# or None.
# If it returns an object then the connection is redirected to actual_to_addr:actual_to_port.
# If it returns None, or destinations_factory itself is None then the connection is closed.
self.destinations_factory = None
class Socks5Command():
"""Information about an incoming socks5 command."""
@ -117,6 +171,22 @@ class Socks5Connection():
cmdin = Socks5Command(cmd, atyp, addr, port, username, password)
self.serv.queue.put(cmdin)
logger.debug('Proxy: %s', cmdin)
requested_to_addr = addr.decode("utf-8")
requested_to = format_addr_port(requested_to_addr, port)
if self.serv.conf.destinations_factory is not None:
dest = self.serv.conf.destinations_factory(requested_to_addr, port)
if dest is not None:
logger.debug(f"Serving connection to {requested_to}, will redirect it to "
f"{dest['actual_to_addr']}:{dest['actual_to_port']} instead")
with socket.create_connection((dest["actual_to_addr"], dest["actual_to_port"])) as conn_to:
forward_sockets(self.conn, conn_to)
else:
logger.debug(f"Closing connection to {requested_to}: the destinations factory returned None")
else:
logger.debug(f"Closing connection to {requested_to}: no destinations factory")
# Fall through to disconnect
except Exception as e:
logger.exception("socks5 request handling failed.")

@ -715,7 +715,6 @@ class TestNode():
if supports_v2_p2p is None:
supports_v2_p2p = self.use_v2transport
p2p_conn.p2p_connected_to_node = True
if self.use_v2transport:
kwargs['services'] = kwargs.get('services', P2P_SERVICES) | NODE_P2P_V2
supports_v2_p2p = self.use_v2transport and supports_v2_p2p
@ -782,7 +781,6 @@ class TestNode():
self.log.debug("Connecting to %s:%d %s" % (address, port, connection_type))
self.addconnection('%s:%d' % (address, port), connection_type, advertise_v2_p2p)
p2p_conn.p2p_connected_to_node = False
if supports_v2_p2p is None:
supports_v2_p2p = self.use_v2transport
if advertise_v2_p2p is None:

Loading…
Cancel
Save