@ -72,13 +72,35 @@ private:
std : : deque < WorkItem * > queue ;
bool running ;
size_t maxDepth ;
int numThreads ;
/** RAII object to keep track of number of running worker threads */
class ThreadCounter
{
public :
WorkQueue & wq ;
ThreadCounter ( WorkQueue & w ) : wq ( w )
{
boost : : lock_guard < boost : : mutex > lock ( wq . cs ) ;
wq . numThreads + = 1 ;
}
~ ThreadCounter ( )
{
boost : : lock_guard < boost : : mutex > lock ( wq . cs ) ;
wq . numThreads - = 1 ;
wq . cond . notify_all ( ) ;
}
} ;
public :
WorkQueue ( size_t maxDepth ) : running ( true ) ,
maxDepth ( maxDepth )
maxDepth ( maxDepth ) ,
numThreads ( 0 )
{
}
/* Precondition: worker threads have all stopped */
/*( Precondition: worker threads have all stopped
* ( call WaitExit )
*/
~ WorkQueue ( )
{
while ( ! queue . empty ( ) ) {
@ -100,6 +122,7 @@ public:
/** Thread function */
void Run ( )
{
ThreadCounter count ( * this ) ;
while ( running ) {
WorkItem * i = 0 ;
{
@ -122,6 +145,13 @@ public:
running = false ;
cond . notify_all ( ) ;
}
/** Wait for worker threads to exit */
void WaitExit ( )
{
boost : : unique_lock < boost : : mutex > lock ( cs ) ;
while ( numThreads > 0 )
cond . wait ( lock ) ;
}
/** Return current depth of queue */
size_t Depth ( )
@ -155,6 +185,8 @@ static std::vector<CSubNet> rpc_allow_subnets;
static WorkQueue < HTTPClosure > * workQueue = 0 ;
//! Handlers for (sub)paths
std : : vector < HTTPPathHandler > pathHandlers ;
//! Bound listening sockets
std : : vector < evhttp_bound_socket * > boundSockets ;
/** Check if a network address is allowed to access the HTTP server */
static bool ClientAllowed ( const CNetAddr & netaddr )
@ -264,6 +296,13 @@ static void http_request_cb(struct evhttp_request* req, void* arg)
}
}
/** Callback to reject HTTP requests after shutdown. */
static void http_reject_request_cb ( struct evhttp_request * req , void * )
{
LogPrint ( " http " , " Rejecting request while shutting down \n " ) ;
evhttp_send_error ( req , HTTP_SERVUNAVAIL , NULL ) ;
}
/** Event dispatcher thread */
static void ThreadHTTP ( struct event_base * base , struct evhttp * http )
{
@ -278,7 +317,6 @@ static void ThreadHTTP(struct event_base* base, struct evhttp* http)
static bool HTTPBindAddresses ( struct evhttp * http )
{
int defaultPort = GetArg ( " -rpcport " , BaseParams ( ) . RPCPort ( ) ) ;
int nBound = 0 ;
std : : vector < std : : pair < std : : string , uint16_t > > endpoints ;
// Determine what addresses to bind to
@ -304,13 +342,14 @@ static bool HTTPBindAddresses(struct evhttp* http)
// Bind addresses
for ( std : : vector < std : : pair < std : : string , uint16_t > > : : iterator i = endpoints . begin ( ) ; i ! = endpoints . end ( ) ; + + i ) {
LogPrint ( " http " , " Binding RPC on address %s port %i \n " , i - > first , i - > second ) ;
if ( evhttp_bind_socket ( http , i - > first . empty ( ) ? NULL : i - > first . c_str ( ) , i - > second ) = = 0 ) {
nBound + = 1 ;
evhttp_bound_socket * bind_handle = evhttp_bind_socket_with_handle ( http , i - > first . empty ( ) ? NULL : i - > first . c_str ( ) , i - > second ) ;
if ( bind_handle ) {
boundSockets . push_back ( bind_handle ) ;
} else {
LogPrintf ( " Binding RPC on address %s port %i failed. \n " , i - > first , i - > second ) ;
}
}
return nBound > 0 ;
return ! boundSockets . empty ( ) ;
}
/** Simple wrapper to set thread name and run work queue */
@ -410,8 +449,21 @@ bool StartHTTPServer(boost::thread_group& threadGroup)
void InterruptHTTPServer ( )
{
LogPrint ( " http " , " Interrupting HTTP server \n " ) ;
if ( eventBase )
event_base_loopbreak ( eventBase ) ;
if ( eventHTTP ) {
// Unlisten sockets
BOOST_FOREACH ( evhttp_bound_socket * socket , boundSockets ) {
evhttp_del_accept_socket ( eventHTTP , socket ) ;
}
// Reject requests on current connections
evhttp_set_gencb ( eventHTTP , http_reject_request_cb , NULL ) ;
}
if ( eventBase ) {
// Force-exit event loop after predefined time
struct timeval tv ;
tv . tv_sec = 10 ;
tv . tv_usec = 0 ;
event_base_loopexit ( eventBase , & tv ) ;
}
if ( workQueue )
workQueue - > Interrupt ( ) ;
}
@ -419,7 +471,11 @@ void InterruptHTTPServer()
void StopHTTPServer ( )
{
LogPrint ( " http " , " Stopping HTTP server \n " ) ;
delete workQueue ;
if ( workQueue ) {
LogPrint ( " http " , " Waiting for HTTP worker threads to exit \n " ) ;
workQueue - > WaitExit ( ) ;
delete workQueue ;
}
if ( eventHTTP ) {
evhttp_free ( eventHTTP ) ;
eventHTTP = 0 ;