Merge #18309: zmq: Add support to listen on multiple interfaces

e66870c5a4 zmq: Append address to notify log output (nthumann)
241803da21 test: Add zmq test to support multiple interfaces (nthumann)
a0b2e5cb6a doc: Add release notes to support multiple interfaces (nthumann)
b1c3f180ec doc: Adjust ZMQ usage to support multiple interfaces (nthumann)
347c94f551 zmq: Add support to listen on multiple interfaces (Nicolas Thumann)

Pull request description:

  This PR adds support for ZeroMQ to listen on multiple interfaces, just like the RPC server.
  Currently, if you specify more than one e.g. `zmqpubhashblock` paramter, only the first one will be used. Therefore a user may be forced to listen on all interfaces (e.g. `zmqpubhashblock=0.0.0.0:28332`), which can result in an increased attack surface.
  With this PR a user can specify multiple interfaces to listen on, e.g.
  `-zmqpubhashblock=tcp://127.0.0.1:28332 -zmqpubhashblock=tcp://192.168.1.123:28332`.

ACKs for top commit:
  laanwj:
    Code review ACK e66870c5a4
  instagibbs:
    reACK e66870c5a4

Tree-SHA512: f38ab4a6ff00dc821e5f4842508cefadb701e70bb3893992c1b32049be20247c8aa9476a1f886050c5f17fe7f2ce99ee30193ce2c81a7482a5a51f8fc22300c7
pull/764/head
Wladimir J. van der Laan 4 years ago
commit a0185d90a7
No known key found for this signature in database
GPG Key ID: 1E4AED62986CD25D

@ -0,0 +1,4 @@
Command-line options
-----------------------------
The same ZeroMQ notification (e.g. `-zmqpubhashtx=address`) can now be specified multiple times to publish the same notification to different ZeroMQ sockets.

@ -67,6 +67,7 @@ Currently, the following notifications are supported:
The socket type is PUB and the address must be a valid ZeroMQ socket
address. The same address can be used in more than one notification.
The same notification can be specified more than once.
The option to set the PUB socket's outbound message high water mark
(SNDHWM) may be set individually for each notification:
@ -82,6 +83,7 @@ The high water mark value must be an integer greater than or equal to 0.
For instance:
$ bitcoind -zmqpubhashtx=tcp://127.0.0.1:28332 \
-zmqpubhashtx=tcp://192.168.1.2:28332 \
-zmqpubrawtx=ipc:///tmp/bitcoind.tx.raw \
-zmqpubhashtxhwm=10000

@ -42,10 +42,8 @@ CZMQNotificationInterface* CZMQNotificationInterface::Create()
for (const auto& entry : factories)
{
std::string arg("-zmq" + entry.first);
if (gArgs.IsArgSet(arg))
{
const auto& factory = entry.second;
const std::string address = gArgs.GetArg(arg, "");
const auto& factory = entry.second;
for (const std::string& address : gArgs.GetArgs(arg)) {
std::unique_ptr<CZMQAbstractNotifier> notifier = factory();
notifier->SetType(entry.first);
notifier->SetAddress(address);

@ -180,7 +180,7 @@ bool CZMQAbstractPublishNotifier::SendZmqMessage(const char *command, const void
bool CZMQPublishHashBlockNotifier::NotifyBlock(const CBlockIndex *pindex)
{
uint256 hash = pindex->GetBlockHash();
LogPrint(BCLog::ZMQ, "zmq: Publish hashblock %s\n", hash.GetHex());
LogPrint(BCLog::ZMQ, "zmq: Publish hashblock %s to %s\n", hash.GetHex(), this->address);
char data[32];
for (unsigned int i = 0; i < 32; i++)
data[31 - i] = hash.begin()[i];
@ -190,7 +190,7 @@ bool CZMQPublishHashBlockNotifier::NotifyBlock(const CBlockIndex *pindex)
bool CZMQPublishHashTransactionNotifier::NotifyTransaction(const CTransaction &transaction)
{
uint256 hash = transaction.GetHash();
LogPrint(BCLog::ZMQ, "zmq: Publish hashtx %s\n", hash.GetHex());
LogPrint(BCLog::ZMQ, "zmq: Publish hashtx %s to %s\n", hash.GetHex(), this->address);
char data[32];
for (unsigned int i = 0; i < 32; i++)
data[31 - i] = hash.begin()[i];
@ -199,7 +199,7 @@ bool CZMQPublishHashTransactionNotifier::NotifyTransaction(const CTransaction &t
bool CZMQPublishRawBlockNotifier::NotifyBlock(const CBlockIndex *pindex)
{
LogPrint(BCLog::ZMQ, "zmq: Publish rawblock %s\n", pindex->GetBlockHash().GetHex());
LogPrint(BCLog::ZMQ, "zmq: Publish rawblock %s to %s\n", pindex->GetBlockHash().GetHex(), this->address);
const Consensus::Params& consensusParams = Params().GetConsensus();
CDataStream ss(SER_NETWORK, PROTOCOL_VERSION | RPCSerializationFlags());
@ -221,7 +221,7 @@ bool CZMQPublishRawBlockNotifier::NotifyBlock(const CBlockIndex *pindex)
bool CZMQPublishRawTransactionNotifier::NotifyTransaction(const CTransaction &transaction)
{
uint256 hash = transaction.GetHash();
LogPrint(BCLog::ZMQ, "zmq: Publish rawtx %s\n", hash.GetHex());
LogPrint(BCLog::ZMQ, "zmq: Publish rawtx %s to %s\n", hash.GetHex(), this->address);
CDataStream ss(SER_NETWORK, PROTOCOL_VERSION | RPCSerializationFlags());
ss << transaction;
return SendZmqMessage(MSG_RAWTX, &(*ss.begin()), ss.size());
@ -232,7 +232,7 @@ bool CZMQPublishRawTransactionNotifier::NotifyTransaction(const CTransaction &tr
bool CZMQPublishSequenceNotifier::NotifyBlockConnect(const CBlockIndex *pindex)
{
uint256 hash = pindex->GetBlockHash();
LogPrint(BCLog::ZMQ, "zmq: Publish sequence block connect %s\n", hash.GetHex());
LogPrint(BCLog::ZMQ, "zmq: Publish sequence block connect %s to %s\n", hash.GetHex(), this->address);
char data[sizeof(uint256)+1];
for (unsigned int i = 0; i < sizeof(uint256); i++)
data[sizeof(uint256) - 1 - i] = hash.begin()[i];
@ -243,7 +243,7 @@ bool CZMQPublishSequenceNotifier::NotifyBlockConnect(const CBlockIndex *pindex)
bool CZMQPublishSequenceNotifier::NotifyBlockDisconnect(const CBlockIndex *pindex)
{
uint256 hash = pindex->GetBlockHash();
LogPrint(BCLog::ZMQ, "zmq: Publish sequence block disconnect %s\n", hash.GetHex());
LogPrint(BCLog::ZMQ, "zmq: Publish sequence block disconnect %s to %s\n", hash.GetHex(), this->address);
char data[sizeof(uint256)+1];
for (unsigned int i = 0; i < sizeof(uint256); i++)
data[sizeof(uint256) - 1 - i] = hash.begin()[i];
@ -254,7 +254,7 @@ bool CZMQPublishSequenceNotifier::NotifyBlockDisconnect(const CBlockIndex *pinde
bool CZMQPublishSequenceNotifier::NotifyTransactionAcceptance(const CTransaction &transaction, uint64_t mempool_sequence)
{
uint256 hash = transaction.GetHash();
LogPrint(BCLog::ZMQ, "zmq: Publish hashtx mempool acceptance %s\n", hash.GetHex());
LogPrint(BCLog::ZMQ, "zmq: Publish hashtx mempool acceptance %s to %s\n", hash.GetHex(), this->address);
unsigned char data[sizeof(uint256)+sizeof(mempool_sequence)+1];
for (unsigned int i = 0; i < sizeof(uint256); i++)
data[sizeof(uint256) - 1 - i] = hash.begin()[i];
@ -266,7 +266,7 @@ bool CZMQPublishSequenceNotifier::NotifyTransactionAcceptance(const CTransaction
bool CZMQPublishSequenceNotifier::NotifyTransactionRemoval(const CTransaction &transaction, uint64_t mempool_sequence)
{
uint256 hash = transaction.GetHash();
LogPrint(BCLog::ZMQ, "zmq: Publish hashtx mempool removal %s\n", hash.GetHex());
LogPrint(BCLog::ZMQ, "zmq: Publish hashtx mempool removal %s to %s\n", hash.GetHex(), this->address);
unsigned char data[sizeof(uint256)+sizeof(mempool_sequence)+1];
for (unsigned int i = 0; i < sizeof(uint256); i++)
data[sizeof(uint256) - 1 - i] = hash.begin()[i];

@ -75,6 +75,7 @@ class ZMQTest (BitcoinTestFramework):
self.test_sequence()
self.test_mempool_sync()
self.test_reorg()
self.test_multiple_interfaces()
finally:
# Destroy the ZMQ context.
self.log.debug("Destroying ZMQ context")
@ -506,5 +507,28 @@ class ZMQTest (BitcoinTestFramework):
self.nodes[0].generatetoaddress(1, ADDRESS_BCRT1_UNSPENDABLE)
def test_multiple_interfaces(self):
# Set up two subscribers with different addresses
subscribers = []
for i in range(2):
address = 'tcp://127.0.0.1:%d' % (28334 + i)
socket = self.ctx.socket(zmq.SUB)
socket.set(zmq.RCVTIMEO, 60000)
hashblock = ZMQSubscriber(socket, b"hashblock")
socket.connect(address)
subscribers.append({'address': address, 'hashblock': hashblock})
self.restart_node(0, ['-zmqpub%s=%s' % (subscriber['hashblock'].topic.decode(), subscriber['address']) for subscriber in subscribers])
# Relax so that the subscriber is ready before publishing zmq messages
sleep(0.2)
# Generate 1 block in nodes[0] and receive all notifications
self.nodes[0].generatetoaddress(1, ADDRESS_BCRT1_UNSPENDABLE)
# Should receive the same block hash on both subscribers
assert_equal(self.nodes[0].getbestblockhash(), subscribers[0]['hashblock'].receive().hex())
assert_equal(self.nodes[0].getbestblockhash(), subscribers[1]['hashblock'].receive().hex())
if __name__ == '__main__':
ZMQTest().main()

Loading…
Cancel
Save