diff --git a/qa/pull-tester/rpc-tests.sh b/qa/pull-tester/rpc-tests.sh index dd2f8d4e5e..6d57d58755 100755 --- a/qa/pull-tester/rpc-tests.sh +++ b/qa/pull-tester/rpc-tests.sh @@ -30,6 +30,7 @@ testScripts=( 'proxy_test.py' 'merkle_blocks.py' # 'forknotify.py' + 'maxblocksinflight.py' ); if [ "x${ENABLE_BITCOIND}${ENABLE_UTILS}${ENABLE_WALLET}" = "x111" ]; then for (( i = 0; i < ${#testScripts[@]}; i++ )) diff --git a/qa/rpc-tests/maxblocksinflight.py b/qa/rpc-tests/maxblocksinflight.py new file mode 100755 index 0000000000..4bfe203990 --- /dev/null +++ b/qa/rpc-tests/maxblocksinflight.py @@ -0,0 +1,99 @@ +#!/usr/bin/env python2 +# +# Distributed under the MIT/X11 software license, see the accompanying +# file COPYING or http://www.opensource.org/licenses/mit-license.php. +# + +from mininode import * +from test_framework import BitcoinTestFramework +from util import * +import logging + +''' +In this test we connect to one node over p2p, send it numerous inv's, and +compare the resulting number of getdata requests to a max allowed value. We +test for exceeding 128 blocks in flight, which was the limit an 0.9 client will +reach. [0.10 clients shouldn't request more than 16 from a single peer.] +''' +MAX_REQUESTS = 128 + +class TestManager(NodeConnCB): + # set up NodeConnCB callbacks, overriding base class + def on_getdata(self, conn, message): + self.log.debug("got getdata %s" % repr(message)) + # Log the requests + for inv in message.inv: + if inv.hash not in self.blockReqCounts: + self.blockReqCounts[inv.hash] = 0 + self.blockReqCounts[inv.hash] += 1 + + def on_close(self, conn): + if not self.disconnectOkay: + raise EarlyDisconnectError(0) + + def __init__(self): + NodeConnCB.__init__(self) + self.log = logging.getLogger("BlockRelayTest") + self.create_callback_map() + + def add_new_connection(self, connection): + self.connection = connection + self.blockReqCounts = {} + self.disconnectOkay = False + + def run(self): + try: + fail = False + self.connection.rpc.generate(1) # Leave IBD + + numBlocksToGenerate = [ 8, 16, 128, 1024 ] + for count in range(len(numBlocksToGenerate)): + current_invs = [] + for i in range(numBlocksToGenerate[count]): + current_invs.append(CInv(2, random.randrange(0, 1<<256))) + if len(current_invs) >= 50000: + self.connection.send_message(msg_inv(current_invs)) + current_invs = [] + if len(current_invs) > 0: + self.connection.send_message(msg_inv(current_invs)) + + # Wait and see how many blocks were requested + time.sleep(2) + + total_requests = 0 + for key in self.blockReqCounts: + total_requests += self.blockReqCounts[key] + if self.blockReqCounts[key] > 1: + raise AssertionError("Error, test failed: block %064x requested more than once" % key) + if total_requests > MAX_REQUESTS: + raise AssertionError("Error, too many blocks (%d) requested" % total_requests) + print "Round %d: success (total requests: %d)" % (count, total_requests) + except AssertionError as e: + print "TEST FAILED: ", e.args + + self.disconnectOkay = True + self.connection.disconnect_node() + + +class MaxBlocksInFlightTest(BitcoinTestFramework): + def add_options(self, parser): + parser.add_option("--testbinary", dest="testbinary", default="bitcoind", + help="Binary to test max block requests behavior") + + def setup_chain(self): + print "Initializing test directory "+self.options.tmpdir + initialize_chain_clean(self.options.tmpdir, 1) + + def setup_network(self): + self.nodes = start_nodes(1, self.options.tmpdir, + extra_args=[['-debug', '-whitelist=127.0.0.1']], + binary=[self.options.testbinary]) + + def run_test(self): + test = TestManager() + test.add_new_connection(NodeConn('127.0.0.1', p2p_port(0), self.nodes[0], test)) + NetworkThread().start() # Start up network handling in another thread + test.run() + +if __name__ == '__main__': + MaxBlocksInFlightTest().main() diff --git a/qa/rpc-tests/mininode.py b/qa/rpc-tests/mininode.py new file mode 100755 index 0000000000..c5c1bcfbbe --- /dev/null +++ b/qa/rpc-tests/mininode.py @@ -0,0 +1,1247 @@ +# mininode.py - Bitcoin P2P network half-a-node +# +# Distributed under the MIT/X11 software license, see the accompanying +# file COPYING or http://www.opensource.org/licenses/mit-license.php. +# +# This python code was modified from ArtForz' public domain half-a-node, as +# found in the mini-node branch of http://github.com/jgarzik/pynode. +# +# NodeConn: an object which manages p2p connectivity to a bitcoin node +# NodeConnCB: a base class that describes the interface for receiving +# callbacks with network messages from a NodeConn +# CBlock, CTransaction, CBlockHeader, CTxIn, CTxOut, etc....: +# data structures that should map to corresponding structures in +# bitcoin/primitives +# msg_block, msg_tx, msg_headers, etc.: +# data structures that represent network messages +# ser_*, deser_*: functions that handle serialization/deserialization + + +import struct +import socket +import asyncore +import binascii +import time +import sys +import random +import cStringIO +import hashlib +from threading import Lock +from threading import Thread +import logging +import copy + +BIP0031_VERSION = 60000 +MY_VERSION = 60001 # past bip-31 for ping/pong +MY_SUBVERSION = "/python-mininode-tester:0.0.1/" + +MAX_INV_SZ = 50000 + +# Serialization/deserialization tools +def sha256(s): + return hashlib.new('sha256', s).digest() + + +def hash256(s): + return sha256(sha256(s)) + + +def deser_string(f): + nit = struct.unpack(">= 32 + return rs + + +def uint256_from_str(s): + r = 0L + t = struct.unpack("> 24) & 0xFF + v = (c & 0xFFFFFFL) << (8 * (nbytes - 3)) + return v + + +def deser_vector(f, c): + nit = struct.unpack("H", f.read(2))[0] + + def serialize(self): + r = "" + r += struct.pack("H", self.port) + return r + + def __repr__(self): + return "CAddress(nServices=%i ip=%s port=%i)" % (self.nServices, + self.ip, self.port) + + +class CInv(object): + typemap = { + 0: "Error", + 1: "TX", + 2: "Block"} + + def __init__(self, t=0, h=0L): + self.type = t + self.hash = h + + def deserialize(self, f): + self.type = struct.unpack(" 21000000L * 100000000L: + return False + return True + + def __repr__(self): + return "CTransaction(nVersion=%i vin=%s vout=%s nLockTime=%i)" \ + % (self.nVersion, repr(self.vin), repr(self.vout), self.nLockTime) + + +class CBlockHeader(object): + def __init__(self, header=None): + if header is None: + self.set_null() + else: + self.nVersion = header.nVersion + self.hashPrevBlock = header.hashPrevBlock + self.hashMerkleRoot = header.hashMerkleRoot + self.nTime = header.nTime + self.nBits = header.nBits + self.nNonce = header.nNonce + self.sha256 = header.sha256 + self.hash = header.hash + self.calc_sha256() + + def set_null(self): + self.nVersion = 1 + self.hashPrevBlock = 0 + self.hashMerkleRoot = 0 + self.nTime = 0 + self.nBits = 0 + self.nNonce = 0 + self.sha256 = None + self.hash = None + + def deserialize(self, f): + self.nVersion = struct.unpack(" 1: + newhashes = [] + for i in xrange(0, len(hashes), 2): + i2 = min(i+1, len(hashes)-1) + newhashes.append(hash256(hashes[i] + hashes[i2])) + hashes = newhashes + return uint256_from_str(hashes[0]) + + def is_valid(self): + self.calc_sha256() + target = uint256_from_compact(self.nBits) + if self.sha256 > target: + return False + for tx in self.vtx: + if not tx.is_valid(): + return False + if self.calc_merkle_root() != self.hashMerkleRoot: + return False + return True + + def solve(self): + self.calc_sha256() + target = uint256_from_compact(self.nBits) + while self.sha256 > target: + self.nNonce += 1 + self.rehash() + + def __repr__(self): + return "CBlock(nVersion=%i hashPrevBlock=%064x hashMerkleRoot=%064x nTime=%s nBits=%08x nNonce=%08x vtx=%s)" \ + % (self.nVersion, self.hashPrevBlock, self.hashMerkleRoot, + time.ctime(self.nTime), self.nBits, self.nNonce, repr(self.vtx)) + + +class CUnsignedAlert(object): + def __init__(self): + self.nVersion = 1 + self.nRelayUntil = 0 + self.nExpiration = 0 + self.nID = 0 + self.nCancel = 0 + self.setCancel = [] + self.nMinVer = 0 + self.nMaxVer = 0 + self.setSubVer = [] + self.nPriority = 0 + self.strComment = "" + self.strStatusBar = "" + self.strReserved = "" + + def deserialize(self, f): + self.nVersion = struct.unpack("= 106: + self.addrFrom = CAddress() + self.addrFrom.deserialize(f) + self.nNonce = struct.unpack("= 209: + self.nStartingHeight = struct.unpack(" +class msg_headers(object): + command = "headers" + + def __init__(self): + self.headers = [] + + def deserialize(self, f): + # comment in bitcoind indicates these should be deserialized as blocks + blocks = deser_vector(f, CBlock) + for x in blocks: + self.headers.append(CBlockHeader(x)) + + def serialize(self): + blocks = [CBlock(x) for x in self.headers] + return ser_vector(blocks) + + def __repr__(self): + return "msg_headers(headers=%s)" % repr(self.headers) + + +class msg_reject(object): + command = "reject" + + def __init__(self): + self.message = "" + self.code = "" + self.reason = "" + self.data = 0L + + def deserialize(self, f): + self.message = deser_string(f) + self.code = struct.unpack("= 209: + conn.send_message(msg_verack()) + conn.ver_send = min(MY_VERSION, message.nVersion) + if message.nVersion < 209: + conn.ver_recv = conn.ver_send + + def on_verack(self, conn, message): + conn.ver_recv = conn.ver_send + self.verack_received = True + + def on_inv(self, conn, message): + want = msg_getdata() + for i in message.inv: + if i.type != 0: + want.inv.append(i) + if len(want.inv): + conn.send_message(want) + + def on_addr(self, conn, message): pass + def on_alert(self, conn, message): pass + def on_getdata(self, conn, message): pass + def on_getblocks(self, conn, message): pass + def on_tx(self, conn, message): pass + def on_block(self, conn, message): pass + def on_getaddr(self, conn, message): pass + def on_headers(self, conn, message): pass + def on_getheaders(self, conn, message): pass + def on_ping(self, conn, message): + if conn.ver_send > BIP0031_VERSION: + conn.send_message(msg_pong(message.nonce)) + def on_reject(self, conn, message): pass + def on_close(self, conn): pass + def on_mempool(self, conn): pass + def on_pong(self, conn, message): pass + + +# The actual NodeConn class +# This class provides an interface for a p2p connection to a specified node +class NodeConn(asyncore.dispatcher): + messagemap = { + "version": msg_version, + "verack": msg_verack, + "addr": msg_addr, + "alert": msg_alert, + "inv": msg_inv, + "getdata": msg_getdata, + "getblocks": msg_getblocks, + "tx": msg_tx, + "block": msg_block, + "getaddr": msg_getaddr, + "ping": msg_ping, + "pong": msg_pong, + "headers": msg_headers, + "getheaders": msg_getheaders, + "reject": msg_reject, + "mempool": msg_mempool + } + MAGIC_BYTES = { + "mainnet": "\xf9\xbe\xb4\xd9", # mainnet + "testnet3": "\x0b\x11\x09\x07", # testnet3 + "regtest": "\xfa\xbf\xb5\xda" # regtest + } + + def __init__(self, dstaddr, dstport, rpc, callback, net="regtest"): + asyncore.dispatcher.__init__(self) + self.log = logging.getLogger("NodeConn(%s:%d)" % (dstaddr, dstport)) + self.dstaddr = dstaddr + self.dstport = dstport + self.create_socket(socket.AF_INET, socket.SOCK_STREAM) + self.sendbuf = "" + self.recvbuf = "" + self.ver_send = 209 + self.ver_recv = 209 + self.last_sent = 0 + self.state = "connecting" + self.network = net + self.cb = callback + self.sendbufLock = Lock() # for protecting the sendbuffer + self.disconnect = False + + # stuff version msg into sendbuf + vt = msg_version() + vt.addrTo.ip = self.dstaddr + vt.addrTo.port = self.dstport + vt.addrFrom.ip = "0.0.0.0" + vt.addrFrom.port = 0 + self.send_message(vt, True) + print 'MiniNode: Connecting to Bitcoin Node IP # ' + dstaddr + ':' \ + + str(dstport) + + try: + self.connect((dstaddr, dstport)) + except: + self.handle_close() + self.rpc = rpc + + def show_debug_msg(self, msg): + self.log.debug(msg) + + def handle_connect(self): + self.show_debug_msg("MiniNode: Connected & Listening: \n") + self.state = "connected" + + def handle_close(self): + self.show_debug_msg("MiniNode: Closing Connection to %s:%d... " + % (self.dstaddr, self.dstport)) + self.state = "closed" + self.recvbuf = "" + self.sendbuf = "" + try: + self.close() + except: + pass + self.cb.on_close(self) + + def handle_read(self): + try: + t = self.recv(8192) + if len(t) > 0: + self.recvbuf += t + self.got_data() + except: + pass + + def readable(self): + return True + + def writable(self): + if self.disconnect: + self.handle_close() + return False + else: + self.sendbufLock.acquire() + length = len(self.sendbuf) + self.sendbufLock.release() + return (length > 0) + + def handle_write(self): + self.sendbufLock.acquire() + try: + sent = self.send(self.sendbuf) + except: + self.handle_close() + return + self.sendbuf = self.sendbuf[sent:] + self.sendbufLock.release() + + def got_data(self): + while True: + if len(self.recvbuf) < 4: + return + if self.recvbuf[:4] != self.MAGIC_BYTES[self.network]: + raise ValueError("got garbage %s" % repr(self.recvbuf)) + if self.ver_recv < 209: + if len(self.recvbuf) < 4 + 12 + 4: + return + command = self.recvbuf[4:4+12].split("\x00", 1)[0] + msglen = struct.unpack("= 209: + th = sha256(data) + h = sha256(th) + tmsg += h[:4] + tmsg += data + self.sendbuf += tmsg + self.last_sent = time.time() + self.sendbufLock.release() + + def got_message(self, message): + if message.command == "version": + if message.nVersion <= BIP0031_VERSION: + self.messagemap['ping'] = msg_ping_prebip31 + if self.last_sent + 30 * 60 < time.time(): + self.send_message(self.messagemap['ping']()) + self.show_debug_msg("Recv %s" % repr(message)) + self.cb.deliver(self, message) + + def disconnect_node(self): + self.disconnect = True + self.send_message(self.messagemap['ping']()) + + +class NetworkThread(Thread): + def run(self): + asyncore.loop(0.1, True) + + +# An exception we can raise if we detect a potential disconnect +# (p2p or rpc) before the test is complete +class EarlyDisconnectError(Exception): + def __init__(self, value): + self.value = value + + def __str__(self): + return repr(self.value) diff --git a/qa/rpc-tests/util.py b/qa/rpc-tests/util.py index cf789f48e2..385215a088 100644 --- a/qa/rpc-tests/util.py +++ b/qa/rpc-tests/util.py @@ -158,12 +158,14 @@ def _rpchost_to_args(rpchost): rv += ['-rpcport=' + rpcport] return rv -def start_node(i, dirname, extra_args=None, rpchost=None, timewait=None): +def start_node(i, dirname, extra_args=None, rpchost=None, timewait=None, binary=None): """ Start a bitcoind and return RPC connection to it """ datadir = os.path.join(dirname, "node"+str(i)) - args = [ os.getenv("BITCOIND", "bitcoind"), "-datadir="+datadir, "-keypool=1", "-discover=0", "-rest" ] + if binary is None: + binary = os.getenv("BITCOIND", "bitcoind") + args = [ binary, "-datadir="+datadir, "-keypool=1", "-discover=0", "-rest" ] if extra_args is not None: args.extend(extra_args) bitcoind_processes[i] = subprocess.Popen(args) devnull = open("/dev/null", "w+") @@ -179,12 +181,13 @@ def start_node(i, dirname, extra_args=None, rpchost=None, timewait=None): proxy.url = url # store URL on proxy for info return proxy -def start_nodes(num_nodes, dirname, extra_args=None, rpchost=None): +def start_nodes(num_nodes, dirname, extra_args=None, rpchost=None, binary=None): """ Start multiple bitcoinds, return RPC connections to them """ if extra_args is None: extra_args = [ None for i in range(num_nodes) ] - return [ start_node(i, dirname, extra_args[i], rpchost) for i in range(num_nodes) ] + if binary is None: binary = [ None for i in range(num_nodes) ] + return [ start_node(i, dirname, extra_args[i], rpchost, binary=binary[i]) for i in range(num_nodes) ] def log_filename(dirname, n_node, logname): return os.path.join(dirname, "node"+str(n_node), "regtest", logname)