net: Move SocketSendData lock annotation to header

Also, add lock annotation to SendMessages

Can be reviewed with "--word-diff-regex=."
pull/826/head
MarcoFalke 4 years ago
parent fa0a71781a
commit fa210689e2
No known key found for this signature in database
GPG Key ID: CE2B75697E69A548

@ -790,30 +790,30 @@ void V1TransportSerializer::prepareForTransport(CSerializedNetMsg& msg, std::vec
CVectorWriter{SER_NETWORK, INIT_PROTO_VERSION, header, 0, hdr}; CVectorWriter{SER_NETWORK, INIT_PROTO_VERSION, header, 0, hdr};
} }
size_t CConnman::SocketSendData(CNode *pnode) const EXCLUSIVE_LOCKS_REQUIRED(pnode->cs_vSend) size_t CConnman::SocketSendData(CNode& node) const
{ {
auto it = pnode->vSendMsg.begin(); auto it = node.vSendMsg.begin();
size_t nSentSize = 0; size_t nSentSize = 0;
while (it != pnode->vSendMsg.end()) { while (it != node.vSendMsg.end()) {
const auto &data = *it; const auto& data = *it;
assert(data.size() > pnode->nSendOffset); assert(data.size() > node.nSendOffset);
int nBytes = 0; int nBytes = 0;
{ {
LOCK(pnode->cs_hSocket); LOCK(node.cs_hSocket);
if (pnode->hSocket == INVALID_SOCKET) if (node.hSocket == INVALID_SOCKET)
break; break;
nBytes = send(pnode->hSocket, reinterpret_cast<const char*>(data.data()) + pnode->nSendOffset, data.size() - pnode->nSendOffset, MSG_NOSIGNAL | MSG_DONTWAIT); nBytes = send(node.hSocket, reinterpret_cast<const char*>(data.data()) + node.nSendOffset, data.size() - node.nSendOffset, MSG_NOSIGNAL | MSG_DONTWAIT);
} }
if (nBytes > 0) { if (nBytes > 0) {
pnode->nLastSend = GetSystemTimeInSeconds(); node.nLastSend = GetSystemTimeInSeconds();
pnode->nSendBytes += nBytes; node.nSendBytes += nBytes;
pnode->nSendOffset += nBytes; node.nSendOffset += nBytes;
nSentSize += nBytes; nSentSize += nBytes;
if (pnode->nSendOffset == data.size()) { if (node.nSendOffset == data.size()) {
pnode->nSendOffset = 0; node.nSendOffset = 0;
pnode->nSendSize -= data.size(); node.nSendSize -= data.size();
pnode->fPauseSend = pnode->nSendSize > nSendBufferMaxSize; node.fPauseSend = node.nSendSize > nSendBufferMaxSize;
it++; it++;
} else { } else {
// could not send full message; stop sending more // could not send full message; stop sending more
@ -823,10 +823,9 @@ size_t CConnman::SocketSendData(CNode *pnode) const EXCLUSIVE_LOCKS_REQUIRED(pno
if (nBytes < 0) { if (nBytes < 0) {
// error // error
int nErr = WSAGetLastError(); int nErr = WSAGetLastError();
if (nErr != WSAEWOULDBLOCK && nErr != WSAEMSGSIZE && nErr != WSAEINTR && nErr != WSAEINPROGRESS) if (nErr != WSAEWOULDBLOCK && nErr != WSAEMSGSIZE && nErr != WSAEINTR && nErr != WSAEINPROGRESS) {
{
LogPrintf("socket send error %s\n", NetworkErrorString(nErr)); LogPrintf("socket send error %s\n", NetworkErrorString(nErr));
pnode->CloseSocketDisconnect(); node.CloseSocketDisconnect();
} }
} }
// couldn't send anything at all // couldn't send anything at all
@ -834,11 +833,11 @@ size_t CConnman::SocketSendData(CNode *pnode) const EXCLUSIVE_LOCKS_REQUIRED(pno
} }
} }
if (it == pnode->vSendMsg.end()) { if (it == node.vSendMsg.end()) {
assert(pnode->nSendOffset == 0); assert(node.nSendOffset == 0);
assert(pnode->nSendSize == 0); assert(node.nSendSize == 0);
} }
pnode->vSendMsg.erase(pnode->vSendMsg.begin(), it); node.vSendMsg.erase(node.vSendMsg.begin(), it);
return nSentSize; return nSentSize;
} }
@ -1508,7 +1507,7 @@ void CConnman::SocketHandler()
if (sendSet) { if (sendSet) {
// Send data // Send data
size_t bytes_sent = WITH_LOCK(pnode->cs_vSend, return SocketSendData(pnode)); size_t bytes_sent = WITH_LOCK(pnode->cs_vSend, return SocketSendData(*pnode));
if (bytes_sent) RecordBytesSent(bytes_sent); if (bytes_sent) RecordBytesSent(bytes_sent);
} }
@ -2992,7 +2991,7 @@ void CConnman::PushMessage(CNode* pnode, CSerializedNetMsg&& msg)
// If write queue empty, attempt "optimistic write" // If write queue empty, attempt "optimistic write"
if (optimisticSend == true) if (optimisticSend == true)
nBytesSent = SocketSendData(pnode); nBytesSent = SocketSendData(*pnode);
} }
if (nBytesSent) if (nBytesSent)
RecordBytesSent(nBytesSent); RecordBytesSent(nBytesSent);

@ -778,7 +778,7 @@ class NetEventsInterface
{ {
public: public:
virtual bool ProcessMessages(CNode* pnode, std::atomic<bool>& interrupt) = 0; virtual bool ProcessMessages(CNode* pnode, std::atomic<bool>& interrupt) = 0;
virtual bool SendMessages(CNode* pnode) = 0; virtual bool SendMessages(CNode* pnode) EXCLUSIVE_LOCKS_REQUIRED(pnode->cs_sendProcessing) = 0;
virtual void InitializeNode(CNode* pnode) = 0; virtual void InitializeNode(CNode* pnode) = 0;
virtual void FinalizeNode(const CNode& node, bool& update_connection_time) = 0; virtual void FinalizeNode(const CNode& node, bool& update_connection_time) = 0;
@ -1057,7 +1057,7 @@ private:
NodeId GetNewNodeId(); NodeId GetNewNodeId();
size_t SocketSendData(CNode *pnode) const; size_t SocketSendData(CNode& node) const EXCLUSIVE_LOCKS_REQUIRED(node.cs_vSend);
void DumpAddresses(); void DumpAddresses();
// Network stats // Network stats

Loading…
Cancel
Save