|
|
|
@ -606,34 +606,21 @@ void CNode::copyStats(CNodeStats &stats, const std::vector<bool> &m_asmap)
|
|
|
|
|
}
|
|
|
|
|
#undef X
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Receive bytes from the buffer and deserialize them into messages.
|
|
|
|
|
*
|
|
|
|
|
* @param[in] pch A pointer to the raw data
|
|
|
|
|
* @param[in] nBytes Size of the data
|
|
|
|
|
* @param[out] complete Set True if at least one message has been
|
|
|
|
|
* deserialized and is ready to be processed
|
|
|
|
|
* @return True if the peer should stay connected,
|
|
|
|
|
* False if the peer should be disconnected from.
|
|
|
|
|
*/
|
|
|
|
|
bool CNode::ReceiveMsgBytes(const char *pch, unsigned int nBytes, bool& complete)
|
|
|
|
|
bool CNode::ReceiveMsgBytes(Span<const char> msg_bytes, bool& complete)
|
|
|
|
|
{
|
|
|
|
|
complete = false;
|
|
|
|
|
const auto time = GetTime<std::chrono::microseconds>();
|
|
|
|
|
LOCK(cs_vRecv);
|
|
|
|
|
nLastRecv = std::chrono::duration_cast<std::chrono::seconds>(time).count();
|
|
|
|
|
nRecvBytes += nBytes;
|
|
|
|
|
while (nBytes > 0) {
|
|
|
|
|
nRecvBytes += msg_bytes.size();
|
|
|
|
|
while (msg_bytes.size() > 0) {
|
|
|
|
|
// absorb network data
|
|
|
|
|
int handled = m_deserializer->Read(pch, nBytes);
|
|
|
|
|
int handled = m_deserializer->Read(msg_bytes);
|
|
|
|
|
if (handled < 0) {
|
|
|
|
|
// Serious header problem, disconnect from the peer.
|
|
|
|
|
return false;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
pch += handled;
|
|
|
|
|
nBytes -= handled;
|
|
|
|
|
|
|
|
|
|
if (m_deserializer->Complete()) {
|
|
|
|
|
// decompose a transport agnostic CNetMessage from the deserializer
|
|
|
|
|
uint32_t out_err_raw_size{0};
|
|
|
|
@ -663,13 +650,13 @@ bool CNode::ReceiveMsgBytes(const char *pch, unsigned int nBytes, bool& complete
|
|
|
|
|
return true;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
int V1TransportDeserializer::readHeader(const char *pch, unsigned int nBytes)
|
|
|
|
|
int V1TransportDeserializer::readHeader(Span<const char> msg_bytes)
|
|
|
|
|
{
|
|
|
|
|
// copy data to temporary parsing buffer
|
|
|
|
|
unsigned int nRemaining = CMessageHeader::HEADER_SIZE - nHdrPos;
|
|
|
|
|
unsigned int nCopy = std::min(nRemaining, nBytes);
|
|
|
|
|
unsigned int nCopy = std::min<unsigned int>(nRemaining, msg_bytes.size());
|
|
|
|
|
|
|
|
|
|
memcpy(&hdrbuf[nHdrPos], pch, nCopy);
|
|
|
|
|
memcpy(&hdrbuf[nHdrPos], msg_bytes.data(), nCopy);
|
|
|
|
|
nHdrPos += nCopy;
|
|
|
|
|
|
|
|
|
|
// if header incomplete, exit
|
|
|
|
@ -703,18 +690,18 @@ int V1TransportDeserializer::readHeader(const char *pch, unsigned int nBytes)
|
|
|
|
|
return nCopy;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
int V1TransportDeserializer::readData(const char *pch, unsigned int nBytes)
|
|
|
|
|
int V1TransportDeserializer::readData(Span<const char> msg_bytes)
|
|
|
|
|
{
|
|
|
|
|
unsigned int nRemaining = hdr.nMessageSize - nDataPos;
|
|
|
|
|
unsigned int nCopy = std::min(nRemaining, nBytes);
|
|
|
|
|
unsigned int nCopy = std::min<unsigned int>(nRemaining, msg_bytes.size());
|
|
|
|
|
|
|
|
|
|
if (vRecv.size() < nDataPos + nCopy) {
|
|
|
|
|
// Allocate up to 256 KiB ahead, but never more than the total message size.
|
|
|
|
|
vRecv.resize(std::min(hdr.nMessageSize, nDataPos + nCopy + 256 * 1024));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
hasher.Write({(const unsigned char*)pch, nCopy});
|
|
|
|
|
memcpy(&vRecv[nDataPos], pch, nCopy);
|
|
|
|
|
hasher.Write(MakeUCharSpan(msg_bytes.first(nCopy)));
|
|
|
|
|
memcpy(&vRecv[nDataPos], msg_bytes.data(), nCopy);
|
|
|
|
|
nDataPos += nCopy;
|
|
|
|
|
|
|
|
|
|
return nCopy;
|
|
|
|
@ -1463,7 +1450,7 @@ void CConnman::SocketHandler()
|
|
|
|
|
if (nBytes > 0)
|
|
|
|
|
{
|
|
|
|
|
bool notify = false;
|
|
|
|
|
if (!pnode->ReceiveMsgBytes(pchBuf, nBytes, notify))
|
|
|
|
|
if (!pnode->ReceiveMsgBytes(Span<const char>(pchBuf, nBytes), notify))
|
|
|
|
|
pnode->CloseSocketDisconnect();
|
|
|
|
|
RecordBytesRecv(nBytes);
|
|
|
|
|
if (notify) {
|
|
|
|
|