@ -4,6 +4,7 @@
# include <compat.h>
# include <compat.h>
# include <logging.h>
# include <logging.h>
# include <threadinterrupt.h>
# include <tinyformat.h>
# include <tinyformat.h>
# include <util/sock.h>
# include <util/sock.h>
# include <util/system.h>
# include <util/system.h>
@ -12,12 +13,18 @@
# include <codecvt>
# include <codecvt>
# include <cwchar>
# include <cwchar>
# include <locale>
# include <locale>
# include <stdexcept>
# include <string>
# include <string>
# ifdef USE_POLL
# ifdef USE_POLL
# include <poll.h>
# include <poll.h>
# endif
# endif
static inline bool IOErrorIsPermanent ( int err )
{
return err ! = WSAEAGAIN & & err ! = WSAEINTR & & err ! = WSAEWOULDBLOCK & & err ! = WSAEINPROGRESS ;
}
Sock : : Sock ( ) : m_socket ( INVALID_SOCKET ) { }
Sock : : Sock ( ) : m_socket ( INVALID_SOCKET ) { }
Sock : : Sock ( SOCKET s ) : m_socket ( s ) { }
Sock : : Sock ( SOCKET s ) : m_socket ( s ) { }
@ -125,6 +132,124 @@ bool Sock::Wait(std::chrono::milliseconds timeout, Event requested, Event* occur
# endif /* USE_POLL */
# endif /* USE_POLL */
}
}
void Sock : : SendComplete ( const std : : string & data ,
std : : chrono : : milliseconds timeout ,
CThreadInterrupt & interrupt ) const
{
const auto deadline = GetTime < std : : chrono : : milliseconds > ( ) + timeout ;
size_t sent { 0 } ;
for ( ; ; ) {
const ssize_t ret { Send ( data . data ( ) + sent , data . size ( ) - sent , MSG_NOSIGNAL ) } ;
if ( ret > 0 ) {
sent + = static_cast < size_t > ( ret ) ;
if ( sent = = data . size ( ) ) {
break ;
}
} else {
const int err { WSAGetLastError ( ) } ;
if ( IOErrorIsPermanent ( err ) ) {
throw std : : runtime_error ( strprintf ( " send(): %s " , NetworkErrorString ( err ) ) ) ;
}
}
const auto now = GetTime < std : : chrono : : milliseconds > ( ) ;
if ( now > = deadline ) {
throw std : : runtime_error ( strprintf (
" Send timeout (sent only %u of %u bytes before that) " , sent , data . size ( ) ) ) ;
}
if ( interrupt ) {
throw std : : runtime_error ( strprintf (
" Send interrupted (sent only %u of %u bytes before that) " , sent , data . size ( ) ) ) ;
}
// Wait for a short while (or the socket to become ready for sending) before retrying
// if nothing was sent.
const auto wait_time = std : : min ( deadline - now , std : : chrono : : milliseconds { MAX_WAIT_FOR_IO } ) ;
Wait ( wait_time , SEND ) ;
}
}
std : : string Sock : : RecvUntilTerminator ( uint8_t terminator ,
std : : chrono : : milliseconds timeout ,
CThreadInterrupt & interrupt ) const
{
const auto deadline = GetTime < std : : chrono : : milliseconds > ( ) + timeout ;
std : : string data ;
bool terminator_found { false } ;
// We must not consume any bytes past the terminator from the socket.
// One option is to read one byte at a time and check if we have read a terminator.
// However that is very slow. Instead, we peek at what is in the socket and only read
// as many bytes as possible without crossing the terminator.
// Reading 64 MiB of random data with 262526 terminator chars takes 37 seconds to read
// one byte at a time VS 0.71 seconds with the "peek" solution below. Reading one byte
// at a time is about 50 times slower.
for ( ; ; ) {
char buf [ 512 ] ;
const ssize_t peek_ret { Recv ( buf , sizeof ( buf ) , MSG_PEEK ) } ;
switch ( peek_ret ) {
case - 1 : {
const int err { WSAGetLastError ( ) } ;
if ( IOErrorIsPermanent ( err ) ) {
throw std : : runtime_error ( strprintf ( " recv(): %s " , NetworkErrorString ( err ) ) ) ;
}
break ;
}
case 0 :
throw std : : runtime_error ( " Connection unexpectedly closed by peer " ) ;
default :
auto end = buf + peek_ret ;
auto terminator_pos = std : : find ( buf , end , terminator ) ;
terminator_found = terminator_pos ! = end ;
const size_t try_len { terminator_found ? terminator_pos - buf + 1 :
static_cast < size_t > ( peek_ret ) } ;
const ssize_t read_ret { Recv ( buf , try_len , 0 ) } ;
if ( read_ret < 0 | | static_cast < size_t > ( read_ret ) ! = try_len ) {
throw std : : runtime_error (
strprintf ( " recv() returned %u bytes on attempt to read %u bytes but previous "
" peek claimed %u bytes are available " ,
read_ret , try_len , peek_ret ) ) ;
}
// Don't include the terminator in the output.
const size_t append_len { terminator_found ? try_len - 1 : try_len } ;
data . append ( buf , buf + append_len ) ;
if ( terminator_found ) {
return data ;
}
}
const auto now = GetTime < std : : chrono : : milliseconds > ( ) ;
if ( now > = deadline ) {
throw std : : runtime_error ( strprintf (
" Receive timeout (received %u bytes without terminator before that) " , data . size ( ) ) ) ;
}
if ( interrupt ) {
throw std : : runtime_error ( strprintf (
" Receive interrupted (received %u bytes without terminator before that) " ,
data . size ( ) ) ) ;
}
// Wait for a short while (or the socket to become ready for reading) before retrying.
const auto wait_time = std : : min ( deadline - now , std : : chrono : : milliseconds { MAX_WAIT_FOR_IO } ) ;
Wait ( wait_time , RECV ) ;
}
}
# ifdef WIN32
# ifdef WIN32
std : : string NetworkErrorString ( int err )
std : : string NetworkErrorString ( int err )
{
{