@ -71,7 +71,11 @@ from test_framework.messages import (
NODE_WITNESS ,
sha256 ,
)
from test_framework . util import wait_until_helper
from test_framework . util import (
MAX_NODES ,
p2p_port ,
wait_until_helper ,
)
logger = logging . getLogger ( " TestFramework.p2p " )
@ -139,7 +143,7 @@ class P2PConnection(asyncio.Protocol):
def is_connected ( self ) :
return self . _transport is not None
def peer_connect ( self , dstaddr , dstport , * , net , timeout_factor ) :
def peer_connect _helper ( self , dstaddr , dstport , net , timeout_factor ) :
assert not self . is_connected
self . timeout_factor = timeout_factor
self . dstaddr = dstaddr
@ -148,12 +152,20 @@ class P2PConnection(asyncio.Protocol):
self . on_connection_send_msg = None
self . recvbuf = b " "
self . magic_bytes = MAGIC_BYTES [ net ]
logger . debug ( ' Connecting to Bitcoin Node: %s : %d ' % ( self . dstaddr , self . dstport ) )
def peer_connect ( self , dstaddr , dstport , * , net , timeout_factor ) :
self . peer_connect_helper ( dstaddr , dstport , net , timeout_factor )
loop = NetworkThread . network_event_loop
conn_gen_unsafe = loop . create_connection ( lambda : self , host = self . dstaddr , port = self . dstport )
conn_gen = lambda : loop . call_soon_threadsafe ( loop . create_task , conn_gen_unsafe )
return conn_gen
logger . debug ( ' Connecting to Bitcoin Node: %s : %d ' % ( self . dstaddr , self . dstport ) )
coroutine = loop . create_connection ( lambda : self , host = self . dstaddr , port = self . dstport )
return lambda : loop . call_soon_threadsafe ( loop . create_task , coroutine )
def peer_accept_connection ( self , connect_id , connect_cb = lambda : None , * , net , timeout_factor ) :
self . peer_connect_helper ( ' 0 ' , 0 , net , timeout_factor )
logger . debug ( ' Listening for Bitcoin Node with id: {} ' . format ( connect_id ) )
return lambda : NetworkThread . listen ( self , connect_cb , idx = connect_id )
def peer_disconnect ( self ) :
# Connection could have already been closed by other end.
@ -312,18 +324,27 @@ class P2PInterface(P2PConnection):
# If the peer supports wtxid-relay
self . wtxidrelay = wtxidrelay
def peer_connect ( self , * args , services = NODE_NETWORK | NODE_WITNESS , send_version = True , * * kwargs ) :
def peer_connect_send_version ( self , services ) :
# Send a version msg
vt = msg_version ( )
vt . nServices = services
vt . addrTo . ip = self . dstaddr
vt . addrTo . port = self . dstport
vt . addrFrom . ip = " 0.0.0.0 "
vt . addrFrom . port = 0
self . on_connection_send_msg = vt # Will be sent in connection_made callback
def peer_connect ( self , * args , services = NODE_NETWORK | NODE_WITNESS , send_version = True , * * kwargs ) :
create_conn = super ( ) . peer_connect ( * args , * * kwargs )
if send_version :
# Send a version msg
vt = msg_version ( )
vt . nServices = services
vt . addrTo . ip = self . dstaddr
vt . addrTo . port = self . dstport
vt . addrFrom . ip = " 0.0.0.0 "
vt . addrFrom . port = 0
self . on_connection_send_msg = vt # Will be sent soon after connection_made
self . peer_connect_send_version ( services )
return create_conn
def peer_accept_connection ( self , * args , services = NODE_NETWORK | NODE_WITNESS , * * kwargs ) :
create_conn = super ( ) . peer_accept_connection ( * args , * * kwargs )
self . peer_connect_send_version ( services )
return create_conn
@ -414,6 +435,10 @@ class P2PInterface(P2PConnection):
wait_until_helper ( test_function , timeout = timeout , lock = p2p_lock , timeout_factor = self . timeout_factor )
def wait_for_connect ( self , timeout = 60 ) :
test_function = lambda : self . is_connected
wait_until_helper ( test_function , timeout = timeout , lock = p2p_lock )
def wait_for_disconnect ( self , timeout = 60 ) :
test_function = lambda : not self . is_connected
self . wait_until ( test_function , timeout = timeout , check_connected = False )
@ -527,6 +552,8 @@ class NetworkThread(threading.Thread):
# There is only one event loop and no more than one thread must be created
assert not self . network_event_loop
NetworkThread . listeners = { }
NetworkThread . protos = { }
NetworkThread . network_event_loop = asyncio . new_event_loop ( )
def run ( self ) :
@ -542,6 +569,48 @@ class NetworkThread(threading.Thread):
# Safe to remove event loop.
NetworkThread . network_event_loop = None
@classmethod
def listen ( cls , p2p , callback , port = None , addr = None , idx = 1 ) :
""" Ensure a listening server is running on the given port, and run the
protocol specified by ` p2p ` on the next connection to it . Once ready
for connections , call ` callback ` . """
if port is None :
assert 0 < idx < = MAX_NODES
port = p2p_port ( MAX_NODES - idx )
if addr is None :
addr = ' 127.0.0.1 '
coroutine = cls . create_listen_server ( addr , port , callback , p2p )
cls . network_event_loop . call_soon_threadsafe ( cls . network_event_loop . create_task , coroutine )
@classmethod
async def create_listen_server ( cls , addr , port , callback , proto ) :
def peer_protocol ( ) :
""" Returns a function that does the protocol handling for a new
connection . To allow different connections to have different
behaviors , the protocol function is first put in the cls . protos
dict . When the connection is made , the function removes the
protocol function from that dict , and returns it so the event loop
can start executing it . """
response = cls . protos . get ( ( addr , port ) )
cls . protos [ ( addr , port ) ] = None
return response
if ( addr , port ) not in cls . listeners :
# When creating a listener on a given (addr, port) we only need to
# do it once. If we want different behaviors for different
# connections, we can accomplish this by providing different
# `proto` functions
listener = await cls . network_event_loop . create_server ( peer_protocol , addr , port )
logger . debug ( " Listening server on %s : %d should be started " % ( addr , port ) )
cls . listeners [ ( addr , port ) ] = listener
cls . protos [ ( addr , port ) ] = proto
callback ( addr , port )
class P2PDataStore ( P2PInterface ) :
""" A P2P data store class.