[net processing] Move m_wtxid_relay to Peer

Also, remove cs_main guard from m_wtxid_relay_peers and make it atomic.
This should be fine since we don't need m_wtxid_relay_peers to be
synchronized with m_wtxid_relay exactly at all times.

After this change, RelayTransaction no longer requires cs_main.
pull/21160/head
John Newbery 4 years ago
parent 36346703f8
commit 785f55f7ee

@ -232,6 +232,9 @@ struct Peer {
/** Whether a ping has been requested by the user */ /** Whether a ping has been requested by the user */
std::atomic<bool> m_ping_queued{false}; std::atomic<bool> m_ping_queued{false};
/** Whether this peer relays txs via wtxid */
std::atomic<bool> m_wtxid_relay{false};
/** A vector of addresses to send to the peer, limited to MAX_ADDR_TO_SEND. */ /** A vector of addresses to send to the peer, limited to MAX_ADDR_TO_SEND. */
std::vector<CAddress> m_addrs_to_send; std::vector<CAddress> m_addrs_to_send;
/** Probabilistic filter to track recent addr messages relayed with this /** Probabilistic filter to track recent addr messages relayed with this
@ -331,9 +334,6 @@ public:
const std::chrono::microseconds time_received, const std::atomic<bool>& interruptMsgProc) override; const std::chrono::microseconds time_received, const std::atomic<bool>& interruptMsgProc) override;
private: private:
void _RelayTransaction(const uint256& txid, const uint256& wtxid)
EXCLUSIVE_LOCKS_REQUIRED(cs_main);
/** Consider evicting an outbound peer based on the amount of time they've been behind our tip */ /** Consider evicting an outbound peer based on the amount of time they've been behind our tip */
void ConsiderEviction(CNode& pto, std::chrono::seconds time_in_seconds) EXCLUSIVE_LOCKS_REQUIRED(cs_main); void ConsiderEviction(CNode& pto, std::chrono::seconds time_in_seconds) EXCLUSIVE_LOCKS_REQUIRED(cs_main);
@ -464,7 +464,7 @@ private:
std::map<uint256, std::pair<NodeId, bool>> mapBlockSource GUARDED_BY(cs_main); std::map<uint256, std::pair<NodeId, bool>> mapBlockSource GUARDED_BY(cs_main);
/** Number of peers with wtxid relay. */ /** Number of peers with wtxid relay. */
int m_wtxid_relay_peers GUARDED_BY(cs_main) = 0; std::atomic<int> m_wtxid_relay_peers{0};
/** Number of outbound peers with m_chain_sync.m_protect. */ /** Number of outbound peers with m_chain_sync.m_protect. */
int m_outbound_peers_with_protect_from_disconnect GUARDED_BY(cs_main) = 0; int m_outbound_peers_with_protect_from_disconnect GUARDED_BY(cs_main) = 0;
@ -779,9 +779,6 @@ struct CNodeState {
//! A rolling bloom filter of all announced tx CInvs to this peer. //! A rolling bloom filter of all announced tx CInvs to this peer.
CRollingBloomFilter m_recently_announced_invs = CRollingBloomFilter{INVENTORY_MAX_RECENT_RELAY, 0.000001}; CRollingBloomFilter m_recently_announced_invs = CRollingBloomFilter{INVENTORY_MAX_RECENT_RELAY, 0.000001};
//! Whether this peer relays txs via wtxid
bool m_wtxid_relay{false};
CNodeState(bool is_inbound) : m_is_inbound(is_inbound) {} CNodeState(bool is_inbound) : m_is_inbound(is_inbound) {}
}; };
@ -1211,8 +1208,7 @@ void PeerManagerImpl::ReattemptInitialBroadcast(CScheduler& scheduler)
CTransactionRef tx = m_mempool.get(txid); CTransactionRef tx = m_mempool.get(txid);
if (tx != nullptr) { if (tx != nullptr) {
LOCK(cs_main); RelayTransaction(txid, tx->GetWitnessHash());
_RelayTransaction(txid, tx->GetWitnessHash());
} else { } else {
m_mempool.RemoveUnbroadcastTx(txid, true); m_mempool.RemoveUnbroadcastTx(txid, true);
} }
@ -1239,6 +1235,8 @@ void PeerManagerImpl::FinalizeNode(const CNode& node)
PeerRef peer = RemovePeer(nodeid); PeerRef peer = RemovePeer(nodeid);
assert(peer != nullptr); assert(peer != nullptr);
misbehavior = WITH_LOCK(peer->m_misbehavior_mutex, return peer->m_misbehavior_score); misbehavior = WITH_LOCK(peer->m_misbehavior_mutex, return peer->m_misbehavior_score);
m_wtxid_relay_peers -= peer->m_wtxid_relay;
assert(m_wtxid_relay_peers >= 0);
} }
CNodeState *state = State(nodeid); CNodeState *state = State(nodeid);
assert(state != nullptr); assert(state != nullptr);
@ -1256,8 +1254,6 @@ void PeerManagerImpl::FinalizeNode(const CNode& node)
assert(m_peers_downloading_from >= 0); assert(m_peers_downloading_from >= 0);
m_outbound_peers_with_protect_from_disconnect -= state->m_chain_sync.m_protect; m_outbound_peers_with_protect_from_disconnect -= state->m_chain_sync.m_protect;
assert(m_outbound_peers_with_protect_from_disconnect >= 0); assert(m_outbound_peers_with_protect_from_disconnect >= 0);
m_wtxid_relay_peers -= state->m_wtxid_relay;
assert(m_wtxid_relay_peers >= 0);
mapNodeState.erase(nodeid); mapNodeState.erase(nodeid);
@ -1742,21 +1738,22 @@ void PeerManagerImpl::SendPings()
void PeerManagerImpl::RelayTransaction(const uint256& txid, const uint256& wtxid) void PeerManagerImpl::RelayTransaction(const uint256& txid, const uint256& wtxid)
{ {
WITH_LOCK(cs_main, _RelayTransaction(txid, wtxid);); std::map<const NodeId, const uint256&> relay_peers;
} {
// Don't hold m_peer_mutex while calling ForEachNode() to avoid an
// m_peer_mutex/cs_vNodes lock inversion. During shutdown, FinalizeNode()
// is called while holding cs_vNodes.
LOCK(m_peer_mutex);
for (auto& it : m_peer_map) {
relay_peers.emplace(it.first, it.second->m_wtxid_relay ? wtxid : txid);
}
}
void PeerManagerImpl::_RelayTransaction(const uint256& txid, const uint256& wtxid) m_connman.ForEachNode([&relay_peers](CNode* node) {
{ auto it = relay_peers.find(node->GetId());
m_connman.ForEachNode([&txid, &wtxid](CNode* pnode) EXCLUSIVE_LOCKS_REQUIRED(::cs_main) { if (it == relay_peers.end()) return; // Should never happen
AssertLockHeld(::cs_main);
CNodeState* state = State(pnode->GetId()); node->PushTxInventory(it->second);
if (state == nullptr) return;
if (state->m_wtxid_relay) {
pnode->PushTxInventory(wtxid);
} else {
pnode->PushTxInventory(txid);
}
}); });
} }
@ -2317,7 +2314,7 @@ void PeerManagerImpl::ProcessOrphanTx(std::set<uint256>& orphan_work_set)
if (result.m_result_type == MempoolAcceptResult::ResultType::VALID) { if (result.m_result_type == MempoolAcceptResult::ResultType::VALID) {
LogPrint(BCLog::MEMPOOL, " accepted orphan tx %s\n", orphanHash.ToString()); LogPrint(BCLog::MEMPOOL, " accepted orphan tx %s\n", orphanHash.ToString());
_RelayTransaction(orphanHash, porphanTx->GetWitnessHash()); RelayTransaction(orphanHash, porphanTx->GetWitnessHash());
m_orphanage.AddChildrenToWorkSet(*porphanTx, orphan_work_set); m_orphanage.AddChildrenToWorkSet(*porphanTx, orphan_work_set);
m_orphanage.EraseTx(orphanHash); m_orphanage.EraseTx(orphanHash);
for (const CTransactionRef& removedTx : result.m_replaced_transactions.value()) { for (const CTransactionRef& removedTx : result.m_replaced_transactions.value()) {
@ -2864,9 +2861,8 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type,
return; return;
} }
if (pfrom.GetCommonVersion() >= WTXID_RELAY_VERSION) { if (pfrom.GetCommonVersion() >= WTXID_RELAY_VERSION) {
LOCK(cs_main); if (!peer->m_wtxid_relay) {
if (!State(pfrom.GetId())->m_wtxid_relay) { peer->m_wtxid_relay = true;
State(pfrom.GetId())->m_wtxid_relay = true;
m_wtxid_relay_peers++; m_wtxid_relay_peers++;
} else { } else {
LogPrint(BCLog::NET, "ignoring duplicate wtxidrelay from peer=%d\n", pfrom.GetId()); LogPrint(BCLog::NET, "ignoring duplicate wtxidrelay from peer=%d\n", pfrom.GetId());
@ -3020,7 +3016,7 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type,
// Ignore INVs that don't match wtxidrelay setting. // Ignore INVs that don't match wtxidrelay setting.
// Note that orphan parent fetching always uses MSG_TX GETDATAs regardless of the wtxidrelay setting. // Note that orphan parent fetching always uses MSG_TX GETDATAs regardless of the wtxidrelay setting.
// This is fine as no INV messages are involved in that process. // This is fine as no INV messages are involved in that process.
if (State(pfrom.GetId())->m_wtxid_relay) { if (peer->m_wtxid_relay) {
if (inv.IsMsgTx()) continue; if (inv.IsMsgTx()) continue;
} else { } else {
if (inv.IsMsgWtx()) continue; if (inv.IsMsgWtx()) continue;
@ -3298,13 +3294,9 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type,
const uint256& txid = ptx->GetHash(); const uint256& txid = ptx->GetHash();
const uint256& wtxid = ptx->GetWitnessHash(); const uint256& wtxid = ptx->GetWitnessHash();
LOCK2(cs_main, g_cs_orphans); const uint256& hash = peer->m_wtxid_relay ? wtxid : txid;
CNodeState* nodestate = State(pfrom.GetId());
const uint256& hash = nodestate->m_wtxid_relay ? wtxid : txid;
pfrom.AddKnownTx(hash); pfrom.AddKnownTx(hash);
if (nodestate->m_wtxid_relay && txid != wtxid) { if (peer->m_wtxid_relay && txid != wtxid) {
// Insert txid into filterInventoryKnown, even for // Insert txid into filterInventoryKnown, even for
// wtxidrelay peers. This prevents re-adding of // wtxidrelay peers. This prevents re-adding of
// unconfirmed parents to the recently_announced // unconfirmed parents to the recently_announced
@ -3313,6 +3305,8 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type,
pfrom.AddKnownTx(txid); pfrom.AddKnownTx(txid);
} }
LOCK2(cs_main, g_cs_orphans);
m_txrequest.ReceivedResponse(pfrom.GetId(), txid); m_txrequest.ReceivedResponse(pfrom.GetId(), txid);
if (tx.HasWitness()) m_txrequest.ReceivedResponse(pfrom.GetId(), wtxid); if (tx.HasWitness()) m_txrequest.ReceivedResponse(pfrom.GetId(), wtxid);
@ -3337,7 +3331,7 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type,
LogPrintf("Not relaying non-mempool transaction %s from forcerelay peer=%d\n", tx.GetHash().ToString(), pfrom.GetId()); LogPrintf("Not relaying non-mempool transaction %s from forcerelay peer=%d\n", tx.GetHash().ToString(), pfrom.GetId());
} else { } else {
LogPrintf("Force relaying tx %s from peer=%d\n", tx.GetHash().ToString(), pfrom.GetId()); LogPrintf("Force relaying tx %s from peer=%d\n", tx.GetHash().ToString(), pfrom.GetId());
_RelayTransaction(tx.GetHash(), tx.GetWitnessHash()); RelayTransaction(tx.GetHash(), tx.GetWitnessHash());
} }
} }
return; return;
@ -3351,7 +3345,7 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type,
// requests for it. // requests for it.
m_txrequest.ForgetTxHash(tx.GetHash()); m_txrequest.ForgetTxHash(tx.GetHash());
m_txrequest.ForgetTxHash(tx.GetWitnessHash()); m_txrequest.ForgetTxHash(tx.GetWitnessHash());
_RelayTransaction(tx.GetHash(), tx.GetWitnessHash()); RelayTransaction(tx.GetHash(), tx.GetWitnessHash());
m_orphanage.AddChildrenToWorkSet(tx, peer->m_orphan_work_set); m_orphanage.AddChildrenToWorkSet(tx, peer->m_orphan_work_set);
pfrom.m_last_tx_time = GetTime<std::chrono::seconds>(); pfrom.m_last_tx_time = GetTime<std::chrono::seconds>();
@ -4841,8 +4835,8 @@ bool PeerManagerImpl::SendMessages(CNode* pto)
LOCK(pto->m_tx_relay->cs_filter); LOCK(pto->m_tx_relay->cs_filter);
for (const auto& txinfo : vtxinfo) { for (const auto& txinfo : vtxinfo) {
const uint256& hash = state.m_wtxid_relay ? txinfo.tx->GetWitnessHash() : txinfo.tx->GetHash(); const uint256& hash = peer->m_wtxid_relay ? txinfo.tx->GetWitnessHash() : txinfo.tx->GetHash();
CInv inv(state.m_wtxid_relay ? MSG_WTX : MSG_TX, hash); CInv inv(peer->m_wtxid_relay ? MSG_WTX : MSG_TX, hash);
pto->m_tx_relay->setInventoryTxToSend.erase(hash); pto->m_tx_relay->setInventoryTxToSend.erase(hash);
// Don't send transactions that peers will not put into their mempool // Don't send transactions that peers will not put into their mempool
if (txinfo.fee < filterrate.GetFee(txinfo.vsize)) { if (txinfo.fee < filterrate.GetFee(txinfo.vsize)) {
@ -4873,7 +4867,7 @@ bool PeerManagerImpl::SendMessages(CNode* pto)
const CFeeRate filterrate{pto->m_tx_relay->minFeeFilter.load()}; const CFeeRate filterrate{pto->m_tx_relay->minFeeFilter.load()};
// Topologically and fee-rate sort the inventory we send for privacy and priority reasons. // Topologically and fee-rate sort the inventory we send for privacy and priority reasons.
// A heap is used so that not all items need sorting if only a few are being sent. // A heap is used so that not all items need sorting if only a few are being sent.
CompareInvMempoolOrder compareInvMempoolOrder(&m_mempool, state.m_wtxid_relay); CompareInvMempoolOrder compareInvMempoolOrder(&m_mempool, peer->m_wtxid_relay);
std::make_heap(vInvTx.begin(), vInvTx.end(), compareInvMempoolOrder); std::make_heap(vInvTx.begin(), vInvTx.end(), compareInvMempoolOrder);
// No reason to drain out at many times the network's capacity, // No reason to drain out at many times the network's capacity,
// especially since we have many peers and some will draw much shorter delays. // especially since we have many peers and some will draw much shorter delays.
@ -4885,7 +4879,7 @@ bool PeerManagerImpl::SendMessages(CNode* pto)
std::set<uint256>::iterator it = vInvTx.back(); std::set<uint256>::iterator it = vInvTx.back();
vInvTx.pop_back(); vInvTx.pop_back();
uint256 hash = *it; uint256 hash = *it;
CInv inv(state.m_wtxid_relay ? MSG_WTX : MSG_TX, hash); CInv inv(peer->m_wtxid_relay ? MSG_WTX : MSG_TX, hash);
// Remove it from the to-be-sent set // Remove it from the to-be-sent set
pto->m_tx_relay->setInventoryTxToSend.erase(it); pto->m_tx_relay->setInventoryTxToSend.erase(it);
// Check if not in the filter already // Check if not in the filter already

Loading…
Cancel
Save