@ -68,11 +68,13 @@ static constexpr int32_t MAX_PEER_TX_IN_FLIGHT = 100;
/** Maximum number of announced transactions from a peer */
/** Maximum number of announced transactions from a peer */
static constexpr int32_t MAX_PEER_TX_ANNOUNCEMENTS = 2 * MAX_INV_SZ ;
static constexpr int32_t MAX_PEER_TX_ANNOUNCEMENTS = 2 * MAX_INV_SZ ;
/** How many microseconds to delay requesting transactions from inbound peers */
/** How many microseconds to delay requesting transactions from inbound peers */
static constexpr int64_t INBOUND_PEER_TX_DELAY = 2 * 1000000 ;
static constexpr int64_t INBOUND_PEER_TX_DELAY = 2 * 1000000 ; // 2 seconds
/** How long to wait (in microseconds) before downloading a transaction from an additional peer */
/** How long to wait (in microseconds) before downloading a transaction from an additional peer */
static constexpr int64_t GETDATA_TX_INTERVAL = 60 * 1000000 ;
static constexpr int64_t GETDATA_TX_INTERVAL = 60 * 1000000 ; // 1 minute
/** Maximum delay (in microseconds) for transaction requests to avoid biasing some peers over others. */
/** Maximum delay (in microseconds) for transaction requests to avoid biasing some peers over others. */
static constexpr int64_t MAX_GETDATA_RANDOM_DELAY = 2 * 1000000 ;
static constexpr int64_t MAX_GETDATA_RANDOM_DELAY = 2 * 1000000 ; // 2 seconds
/** How long to wait (in microseconds) before expiring an in-flight getdata request to a peer */
static constexpr int64_t TX_EXPIRY_INTERVAL = 10 * GETDATA_TX_INTERVAL ;
static_assert ( INBOUND_PEER_TX_DELAY > = MAX_GETDATA_RANDOM_DELAY ,
static_assert ( INBOUND_PEER_TX_DELAY > = MAX_GETDATA_RANDOM_DELAY ,
" To preserve security, MAX_GETDATA_RANDOM_DELAY should not exceed INBOUND_PEER_DELAY " ) ;
" To preserve security, MAX_GETDATA_RANDOM_DELAY should not exceed INBOUND_PEER_DELAY " ) ;
/** Limit to avoid sending big packets. Not used in processing incoming GETDATA for compatibility */
/** Limit to avoid sending big packets. Not used in processing incoming GETDATA for compatibility */
@ -343,8 +345,11 @@ struct CNodeState {
//! Store all the transactions a peer has recently announced
//! Store all the transactions a peer has recently announced
std : : set < uint256 > m_tx_announced ;
std : : set < uint256 > m_tx_announced ;
//! Store transactions which were requested by us
//! Store transactions which were requested by us, with timestamp
std : : set < uint256 > m_tx_in_flight ;
std : : map < uint256 , int64_t > m_tx_in_flight ;
//! Periodically check for stuck getdata requests
int64_t m_check_expiry_timer { 0 } ;
} ;
} ;
TxDownloadState m_tx_download ;
TxDownloadState m_tx_download ;
@ -702,30 +707,40 @@ void UpdateTxRequestTime(const uint256& txid, int64_t request_time) EXCLUSIVE_LO
}
}
}
}
int64_t CalculateTxGetDataTime ( const uint256 & txid , int64_t current_time , bool use_inbound_delay ) EXCLUSIVE_LOCKS_REQUIRED ( cs_main )
void RequestTx ( CNodeState * state , const uint256 & txid , int64_t nNow ) EXCLUSIVE_LOCKS_REQUIRED ( cs_main )
{
{
CNodeState : : TxDownloadState & peer_download_state = state - > m_tx_download ;
if ( peer_download_state . m_tx_announced . size ( ) > = MAX_PEER_TX_ANNOUNCEMENTS | | peer_download_state . m_tx_announced . count ( txid ) ) {
// Too many queued announcements from this peer, or we already have
// this announcement
return ;
}
peer_download_state . m_tx_announced . insert ( txid ) ;
int64_t process_time ;
int64_t process_time ;
int64_t last_request_time = GetTxRequestTime ( txid ) ;
int64_t last_request_time = GetTxRequestTime ( txid ) ;
// First time requesting this tx
// First time requesting this tx
if ( last_request_time = = 0 ) {
if ( last_request_time = = 0 ) {
process_time = nNow ;
process_time = current_time ;
} else {
} else {
// Randomize the delay to avoid biasing some peers over others (such as due to
// Randomize the delay to avoid biasing some peers over others (such as due to
// fixed ordering of peer processing in ThreadMessageHandler)
// fixed ordering of peer processing in ThreadMessageHandler)
process_time = last_request_time + GETDATA_TX_INTERVAL + GetRand ( MAX_GETDATA_RANDOM_DELAY ) ;
process_time = last_request_time + GETDATA_TX_INTERVAL + GetRand ( MAX_GETDATA_RANDOM_DELAY ) ;
}
}
// We delay processing announcements from non-preferred (eg inbound) peers
// We delay processing announcements from inbound peers
if ( ! state - > fPreferredDownload ) process_time + = INBOUND_PEER_TX_DELAY ;
if ( use_inbound_delay ) process_time + = INBOUND_PEER_TX_DELAY ;
return process_time ;
}
void RequestTx ( CNodeState * state , const uint256 & txid , int64_t nNow ) EXCLUSIVE_LOCKS_REQUIRED ( cs_main )
{
CNodeState : : TxDownloadState & peer_download_state = state - > m_tx_download ;
if ( peer_download_state . m_tx_announced . size ( ) > = MAX_PEER_TX_ANNOUNCEMENTS | |
peer_download_state . m_tx_process_time . size ( ) > = MAX_PEER_TX_ANNOUNCEMENTS | |
peer_download_state . m_tx_announced . count ( txid ) ) {
// Too many queued announcements from this peer, or we already have
// this announcement
return ;
}
peer_download_state . m_tx_announced . insert ( txid ) ;
// Calculate the time to try requesting this transaction. Use
// fPreferredDownload as a proxy for outbound peers.
int64_t process_time = CalculateTxGetDataTime ( txid , nNow , ! state - > fPreferredDownload ) ;
peer_download_state . m_tx_process_time . emplace ( process_time , txid ) ;
peer_download_state . m_tx_process_time . emplace ( process_time , txid ) ;
}
}
@ -1544,12 +1559,19 @@ void static ProcessGetData(CNode* pfrom, const CChainParams& chainparams, CConnm
if ( ! vNotFound . empty ( ) ) {
if ( ! vNotFound . empty ( ) ) {
// Let the peer know that we didn't find what it asked for, so it doesn't
// Let the peer know that we didn't find what it asked for, so it doesn't
// have to wait around forever. Currently only SPV clients actually care
// have to wait around forever.
// about this message: it's needed when they are recursively walking the
// SPV clients care about this message: it's needed when they are
// dependencies of relevant unconfirmed transactions. SPV clients want to
// recursively walking the dependencies of relevant unconfirmed
// do that because they want to know about (and store and rebroadcast and
// transactions. SPV clients want to do that because they want to know
// risk analyze) the dependencies of transactions relevant to them, without
// about (and store and rebroadcast and risk analyze) the dependencies
// having to download the entire memory pool.
// of transactions relevant to them, without having to download the
// entire memory pool.
// Also, other nodes can use these messages to automatically request a
// transaction from some other peer that annnounced it, and stop
// waiting for us to respond.
// In normal operation, we often send NOTFOUND messages for parents of
// transactions that we relay; if a peer is missing a parent, they may
// assume we have them and request the parents from us.
connman - > PushMessage ( pfrom , msgMaker . Make ( NetMsgType : : NOTFOUND , vNotFound ) ) ;
connman - > PushMessage ( pfrom , msgMaker . Make ( NetMsgType : : NOTFOUND , vNotFound ) ) ;
}
}
}
}
@ -3146,8 +3168,27 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr
}
}
if ( strCommand = = NetMsgType : : NOTFOUND ) {
if ( strCommand = = NetMsgType : : NOTFOUND ) {
// We do not care about the NOTFOUND message, but logging an Unknown Command
// Remove the NOTFOUND transactions from the peer
// message would be undesirable as we transmit it ourselves.
LOCK ( cs_main ) ;
CNodeState * state = State ( pfrom - > GetId ( ) ) ;
std : : vector < CInv > vInv ;
vRecv > > vInv ;
if ( vInv . size ( ) < = MAX_PEER_TX_IN_FLIGHT + MAX_BLOCKS_IN_TRANSIT_PER_PEER ) {
for ( CInv & inv : vInv ) {
if ( inv . type = = MSG_TX | | inv . type = = MSG_WITNESS_TX ) {
// If we receive a NOTFOUND message for a txid we requested, erase
// it from our data structures for this peer.
auto in_flight_it = state - > m_tx_download . m_tx_in_flight . find ( inv . hash ) ;
if ( in_flight_it = = state - > m_tx_download . m_tx_in_flight . end ( ) ) {
// Skip any further work if this is a spurious NOTFOUND
// message.
continue ;
}
state - > m_tx_download . m_tx_in_flight . erase ( in_flight_it ) ;
state - > m_tx_download . m_tx_announced . erase ( inv . hash ) ;
}
}
}
return true ;
return true ;
}
}
@ -3945,9 +3986,33 @@ bool PeerLogicValidation::SendMessages(CNode* pto)
//
//
// Message: getdata (non-blocks)
// Message: getdata (non-blocks)
//
//
// For robustness, expire old requests after a long timeout, so that
// we can resume downloading transactions from a peer even if they
// were unresponsive in the past.
// Eventually we should consider disconnecting peers, but this is
// conservative.
if ( state . m_tx_download . m_check_expiry_timer < = nNow ) {
for ( auto it = state . m_tx_download . m_tx_in_flight . begin ( ) ; it ! = state . m_tx_download . m_tx_in_flight . end ( ) ; ) {
if ( it - > second < = nNow - TX_EXPIRY_INTERVAL ) {
LogPrint ( BCLog : : NET , " timeout of inflight tx %s from peer=%d \n " , it - > first . ToString ( ) , pto - > GetId ( ) ) ;
state . m_tx_download . m_tx_announced . erase ( it - > first ) ;
state . m_tx_download . m_tx_in_flight . erase ( it + + ) ;
} else {
+ + it ;
}
}
// On average, we do this check every TX_EXPIRY_INTERVAL. Randomize
// so that we're not doing this for all peers at the same time.
state . m_tx_download . m_check_expiry_timer = nNow + TX_EXPIRY_INTERVAL / 2 + GetRand ( TX_EXPIRY_INTERVAL ) ;
}
auto & tx_process_time = state . m_tx_download . m_tx_process_time ;
auto & tx_process_time = state . m_tx_download . m_tx_process_time ;
while ( ! tx_process_time . empty ( ) & & tx_process_time . begin ( ) - > first < = nNow & & state . m_tx_download . m_tx_in_flight . size ( ) < MAX_PEER_TX_IN_FLIGHT ) {
while ( ! tx_process_time . empty ( ) & & tx_process_time . begin ( ) - > first < = nNow & & state . m_tx_download . m_tx_in_flight . size ( ) < MAX_PEER_TX_IN_FLIGHT ) {
const uint256 & txid = tx_process_time . begin ( ) - > second ;
const uint256 txid = tx_process_time . begin ( ) - > second ;
// Erase this entry from tx_process_time (it may be added back for
// processing at a later time, see below)
tx_process_time . erase ( tx_process_time . begin ( ) ) ;
CInv inv ( MSG_TX | GetFetchFlags ( pto ) , txid ) ;
CInv inv ( MSG_TX | GetFetchFlags ( pto ) , txid ) ;
if ( ! AlreadyHave ( inv ) ) {
if ( ! AlreadyHave ( inv ) ) {
// If this transaction was last requested more than 1 minute ago,
// If this transaction was last requested more than 1 minute ago,
@ -3961,20 +4026,20 @@ bool PeerLogicValidation::SendMessages(CNode* pto)
vGetData . clear ( ) ;
vGetData . clear ( ) ;
}
}
UpdateTxRequestTime ( inv . hash , nNow ) ;
UpdateTxRequestTime ( inv . hash , nNow ) ;
state . m_tx_download . m_tx_in_flight . insert( inv . hash ) ;
state . m_tx_download . m_tx_in_flight . emplace( inv . hash , nNow ) ;
} else {
} else {
// This transaction is in flight from someone else; queue
// This transaction is in flight from someone else; queue
// up processing to happen after the download times out
// up processing to happen after the download times out
// (with a slight delay for inbound peers, to prefer
// (with a slight delay for inbound peers, to prefer
// requests to outbound peers).
// requests to outbound peers).
RequestTx ( & state , txid , nNow ) ;
int64_t next_process_time = CalculateTxGetDataTime ( txid , nNow , ! state . fPreferredDownload ) ;
tx_process_time . emplace ( next_process_time , txid ) ;
}
}
} else {
} else {
// We have already seen this transaction, no need to download.
// We have already seen this transaction, no need to download.
state . m_tx_download . m_tx_announced . erase ( inv . hash ) ;
state . m_tx_download . m_tx_announced . erase ( inv . hash ) ;
state . m_tx_download . m_tx_in_flight . erase ( inv . hash ) ;
state . m_tx_download . m_tx_in_flight . erase ( inv . hash ) ;
}
}
tx_process_time . erase ( tx_process_time . begin ( ) ) ;
}
}