test: Add p2p test for forcerelay permission

pull/17984/head
MarcoFalke 5 years ago
parent fa6b57bcaa
commit aaaae4d0eb
No known key found for this signature in database
GPG Key ID: CE2B75697E69A548

@ -7,21 +7,35 @@
Test that permissions are correctly calculated and applied
"""
from test_framework.address import ADDRESS_BCRT1_P2WSH_OP_TRUE
from test_framework.messages import (
CTransaction,
CTxInWitness,
FromHex,
)
from test_framework.mininode import P2PDataStore
from test_framework.script import (
CScript,
OP_TRUE,
)
from test_framework.test_node import ErrorMatch
from test_framework.test_framework import BitcoinTestFramework
from test_framework.util import (
assert_equal,
connect_nodes,
p2p_port,
wait_until,
)
class P2PPermissionsTests(BitcoinTestFramework):
def set_test_params(self):
self.num_nodes = 2
self.setup_clean_chain = True
self.extra_args = [[],[]]
def run_test(self):
self.check_tx_relay()
self.checkpermission(
# default permissions (no specific permissions)
["-whitelist=127.0.0.1"],
@ -83,6 +97,41 @@ class P2PPermissionsTests(BitcoinTestFramework):
self.nodes[1].assert_start_raises_init_error(["-whitelist=noban@127.0.0.1:230"], "Invalid netmask specified in", match=ErrorMatch.PARTIAL_REGEX)
self.nodes[1].assert_start_raises_init_error(["-whitebind=noban@127.0.0.1/10"], "Cannot resolve -whitebind address", match=ErrorMatch.PARTIAL_REGEX)
def check_tx_relay(self):
block_op_true = self.nodes[0].getblock(self.nodes[0].generatetoaddress(100, ADDRESS_BCRT1_P2WSH_OP_TRUE)[0])
self.sync_all()
self.log.debug("Create a connection from a whitelisted wallet that rebroadcasts raw txs")
# A python mininode is needed to send the raw transaction directly. If a full node was used, it could only
# rebroadcast via the inv-getdata mechanism. However, even for whitelisted connections, a full node would
# currently not request a txid that is already in the mempool.
self.restart_node(1, extra_args=["-whitelist=forcerelay@127.0.0.1"])
p2p_rebroadcast_wallet = self.nodes[1].add_p2p_connection(P2PDataStore())
self.log.debug("Send a tx from the wallet initially")
tx = FromHex(
CTransaction(),
self.nodes[0].createrawtransaction(
inputs=[{
'txid': block_op_true['tx'][0],
'vout': 0,
}], outputs=[{
ADDRESS_BCRT1_P2WSH_OP_TRUE: 5,
}]),
)
tx.wit.vtxinwit = [CTxInWitness()]
tx.wit.vtxinwit[0].scriptWitness.stack = [CScript([OP_TRUE])]
txid = tx.rehash()
self.log.debug("Wait until tx is in node[1]'s mempool")
p2p_rebroadcast_wallet.send_txs_and_test([tx], self.nodes[1])
self.log.debug("Check that node[1] will send the tx to node[0] even though it is already in the mempool")
connect_nodes(self.nodes[1], 0)
with self.nodes[1].assert_debug_log(["Force relaying tx {} from whitelisted peer=0".format(txid)]):
p2p_rebroadcast_wallet.send_txs_and_test([tx], self.nodes[1])
wait_until(lambda: txid in self.nodes[0].getrawmempool())
def checkpermission(self, args, expectedPermissions, whitelisted):
self.restart_node(1, args)
connect_nodes(self.nodes[0], 1)

@ -13,6 +13,8 @@ from . import segwit_addr
ADDRESS_BCRT1_UNSPENDABLE = 'bcrt1qqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqq3xueyj'
ADDRESS_BCRT1_UNSPENDABLE_DESCRIPTOR = 'addr(bcrt1qqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqqq3xueyj)#juyq9d97'
# Coins sent to this address can be spent with a witness stack of just OP_TRUE
ADDRESS_BCRT1_P2WSH_OP_TRUE = 'bcrt1qft5p2uhsdcdc3l2ua4ap5qqfg4pjaqlp250x7us7a8qqhrxrxfsqseac85'
class AddressType(enum.Enum):

Loading…
Cancel
Save