From d0ebd93270758ea97ea956b8821e17a2d001ea94 Mon Sep 17 00:00:00 2001 From: Anthony Towns Date: Fri, 28 Feb 2020 12:04:04 +1000 Subject: [PATCH] scheduler: switch from boost to std Changes from boost::chrono to std::chrono, boost::condition_var to std::condition_var, boost::mutex to sync.h Mutex, and reverselock.h to sync.h REVERSE_LOCK. Also adds threadsafety annotations to CScheduler members. --- src/rpc/misc.cpp | 2 +- src/scheduler.cpp | 58 +++++++++++++----------------------- src/scheduler.h | 33 ++++++++++---------- src/test/scheduler_tests.cpp | 28 ++++++++--------- test/lint/lint-includes.sh | 1 - 5 files changed, 52 insertions(+), 70 deletions(-) diff --git a/src/rpc/misc.cpp b/src/rpc/misc.cpp index 60a3a0dda93..83571839343 100644 --- a/src/rpc/misc.cpp +++ b/src/rpc/misc.cpp @@ -393,7 +393,7 @@ static UniValue mockscheduler(const JSONRPCRequest& request) // protect against null pointer dereference CHECK_NONFATAL(g_rpc_node); CHECK_NONFATAL(g_rpc_node->scheduler); - g_rpc_node->scheduler->MockForward(boost::chrono::seconds(delta_seconds)); + g_rpc_node->scheduler->MockForward(std::chrono::seconds(delta_seconds)); return NullUniValue; } diff --git a/src/scheduler.cpp b/src/scheduler.cpp index 72cca89d99c..7cb7754fdec 100644 --- a/src/scheduler.cpp +++ b/src/scheduler.cpp @@ -5,7 +5,6 @@ #include #include -#include #include #include @@ -20,18 +19,9 @@ CScheduler::~CScheduler() } -#if BOOST_VERSION < 105000 -static boost::system_time toPosixTime(const boost::chrono::system_clock::time_point& t) -{ - // Creating the posix_time using from_time_t loses sub-second precision. So rather than exporting the time_point to time_t, - // start with a posix_time at the epoch (0) and add the milliseconds that have passed since then. - return boost::posix_time::from_time_t(0) + boost::posix_time::milliseconds(boost::chrono::duration_cast(t.time_since_epoch()).count()); -} -#endif - void CScheduler::serviceQueue() { - boost::unique_lock lock(newTaskMutex); + WAIT_LOCK(newTaskMutex, lock); ++nThreadsServicingQueue; // newTaskMutex is locked throughout this loop EXCEPT @@ -40,7 +30,7 @@ void CScheduler::serviceQueue() while (!shouldStop()) { try { if (!shouldStop() && taskQueue.empty()) { - reverse_lock > rlock(lock); + REVERSE_LOCK(lock); } while (!shouldStop() && taskQueue.empty()) { // Wait until there is something to do. @@ -50,21 +40,13 @@ void CScheduler::serviceQueue() // Wait until either there is a new task, or until // the time of the first item on the queue: -// wait_until needs boost 1.50 or later; older versions have timed_wait: -#if BOOST_VERSION < 105000 - while (!shouldStop() && !taskQueue.empty() && - newTaskScheduled.timed_wait(lock, toPosixTime(taskQueue.begin()->first))) { - // Keep waiting until timeout - } -#else - // Some boost versions have a conflicting overload of wait_until that returns void. - // Explicitly use a template here to avoid hitting that overload. while (!shouldStop() && !taskQueue.empty()) { - boost::chrono::system_clock::time_point timeToWaitFor = taskQueue.begin()->first; - if (newTaskScheduled.wait_until<>(lock, timeToWaitFor) == boost::cv_status::timeout) + std::chrono::system_clock::time_point timeToWaitFor = taskQueue.begin()->first; + if (newTaskScheduled.wait_until(lock, timeToWaitFor) == std::cv_status::timeout) { break; // Exit loop after timeout, it means we reached the time of the event + } } -#endif + // If there are multiple threads, the queue can empty while we're waiting (another // thread may service the task we were waiting on). if (shouldStop() || taskQueue.empty()) @@ -76,7 +58,7 @@ void CScheduler::serviceQueue() { // Unlock before calling f, so it can reschedule itself or another task // without deadlocking: - reverse_lock > rlock(lock); + REVERSE_LOCK(lock); f(); } } catch (...) { @@ -91,7 +73,7 @@ void CScheduler::serviceQueue() void CScheduler::stop(bool drain) { { - boost::unique_lock lock(newTaskMutex); + LOCK(newTaskMutex); if (drain) stopWhenEmpty = true; else @@ -100,10 +82,10 @@ void CScheduler::stop(bool drain) newTaskScheduled.notify_all(); } -void CScheduler::schedule(CScheduler::Function f, boost::chrono::system_clock::time_point t) +void CScheduler::schedule(CScheduler::Function f, std::chrono::system_clock::time_point t) { { - boost::unique_lock lock(newTaskMutex); + LOCK(newTaskMutex); taskQueue.insert(std::make_pair(t, f)); } newTaskScheduled.notify_one(); @@ -111,18 +93,18 @@ void CScheduler::schedule(CScheduler::Function f, boost::chrono::system_clock::t void CScheduler::scheduleFromNow(CScheduler::Function f, int64_t deltaMilliSeconds) { - schedule(f, boost::chrono::system_clock::now() + boost::chrono::milliseconds(deltaMilliSeconds)); + schedule(f, std::chrono::system_clock::now() + std::chrono::milliseconds(deltaMilliSeconds)); } -void CScheduler::MockForward(boost::chrono::seconds delta_seconds) +void CScheduler::MockForward(std::chrono::seconds delta_seconds) { - assert(delta_seconds.count() > 0 && delta_seconds < boost::chrono::hours{1}); + assert(delta_seconds.count() > 0 && delta_seconds < std::chrono::hours{1}); { - boost::unique_lock lock(newTaskMutex); + LOCK(newTaskMutex); // use temp_queue to maintain updated schedule - std::multimap temp_queue; + std::multimap temp_queue; for (const auto& element : taskQueue) { temp_queue.emplace_hint(temp_queue.cend(), element.first - delta_seconds, element.second); @@ -147,10 +129,10 @@ void CScheduler::scheduleEvery(CScheduler::Function f, int64_t deltaMilliSeconds scheduleFromNow(std::bind(&Repeat, this, f, deltaMilliSeconds), deltaMilliSeconds); } -size_t CScheduler::getQueueInfo(boost::chrono::system_clock::time_point &first, - boost::chrono::system_clock::time_point &last) const +size_t CScheduler::getQueueInfo(std::chrono::system_clock::time_point &first, + std::chrono::system_clock::time_point &last) const { - boost::unique_lock lock(newTaskMutex); + LOCK(newTaskMutex); size_t result = taskQueue.size(); if (!taskQueue.empty()) { first = taskQueue.begin()->first; @@ -160,7 +142,7 @@ size_t CScheduler::getQueueInfo(boost::chrono::system_clock::time_point &first, } bool CScheduler::AreThreadsServicingQueue() const { - boost::unique_lock lock(newTaskMutex); + LOCK(newTaskMutex); return nThreadsServicingQueue; } @@ -174,7 +156,7 @@ void SingleThreadedSchedulerClient::MaybeScheduleProcessQueue() { if (m_are_callbacks_running) return; if (m_callbacks_pending.empty()) return; } - m_pscheduler->schedule(std::bind(&SingleThreadedSchedulerClient::ProcessQueue, this)); + m_pscheduler->schedule(std::bind(&SingleThreadedSchedulerClient::ProcessQueue, this), std::chrono::system_clock::now()); } void SingleThreadedSchedulerClient::ProcessQueue() { diff --git a/src/scheduler.h b/src/scheduler.h index d18be0ea5eb..4d5aa3068e6 100644 --- a/src/scheduler.h +++ b/src/scheduler.h @@ -7,11 +7,12 @@ // // NOTE: -// boost::thread / boost::chrono should be ported to std::thread / std::chrono +// boost::thread should be ported to std::thread // when we support C++11. // -#include -#include +#include +#include +#include #include #include @@ -27,8 +28,8 @@ // s->scheduleFromNow(std::bind(Class::func, this, argument), 3); // boost::thread* t = new boost::thread(std::bind(CScheduler::serviceQueue, s)); // -// ... then at program shutdown, clean up the thread running serviceQueue: -// t->interrupt(); +// ... then at program shutdown, make sure to call stop() to clean up the thread(s) running serviceQueue: +// s->stop(); // t->join(); // delete t; // delete s; // Must be done after thread is interrupted/joined. @@ -43,7 +44,7 @@ public: typedef std::function Function; // Call func at/after time t - void schedule(Function f, boost::chrono::system_clock::time_point t=boost::chrono::system_clock::now()); + void schedule(Function f, std::chrono::system_clock::time_point t); // Convenience method: call f once deltaMilliSeconds from now void scheduleFromNow(Function f, int64_t deltaMilliSeconds); @@ -60,7 +61,7 @@ public: * Iterates through items on taskQueue and reschedules them * to be delta_seconds sooner. */ - void MockForward(boost::chrono::seconds delta_seconds); + void MockForward(std::chrono::seconds delta_seconds); // To keep things as simple as possible, there is no unschedule. @@ -75,20 +76,20 @@ public: // Returns number of tasks waiting to be serviced, // and first and last task times - size_t getQueueInfo(boost::chrono::system_clock::time_point &first, - boost::chrono::system_clock::time_point &last) const; + size_t getQueueInfo(std::chrono::system_clock::time_point &first, + std::chrono::system_clock::time_point &last) const; // Returns true if there are threads actively running in serviceQueue() bool AreThreadsServicingQueue() const; private: - std::multimap taskQueue; - boost::condition_variable newTaskScheduled; - mutable boost::mutex newTaskMutex; - int nThreadsServicingQueue; - bool stopRequested; - bool stopWhenEmpty; - bool shouldStop() const { return stopRequested || (stopWhenEmpty && taskQueue.empty()); } + mutable Mutex newTaskMutex; + std::condition_variable newTaskScheduled; + std::multimap taskQueue GUARDED_BY(newTaskMutex); + int nThreadsServicingQueue GUARDED_BY(newTaskMutex); + bool stopRequested GUARDED_BY(newTaskMutex); + bool stopWhenEmpty GUARDED_BY(newTaskMutex); + bool shouldStop() const EXCLUSIVE_LOCKS_REQUIRED(newTaskMutex) { return stopRequested || (stopWhenEmpty && taskQueue.empty()); } }; /** diff --git a/src/test/scheduler_tests.cpp b/src/test/scheduler_tests.cpp index 00eec58bdfe..90810595b2d 100644 --- a/src/test/scheduler_tests.cpp +++ b/src/test/scheduler_tests.cpp @@ -11,13 +11,13 @@ BOOST_AUTO_TEST_SUITE(scheduler_tests) -static void microTask(CScheduler& s, boost::mutex& mutex, int& counter, int delta, boost::chrono::system_clock::time_point rescheduleTime) +static void microTask(CScheduler& s, boost::mutex& mutex, int& counter, int delta, std::chrono::system_clock::time_point rescheduleTime) { { boost::unique_lock lock(mutex); counter += delta; } - boost::chrono::system_clock::time_point noTime = boost::chrono::system_clock::time_point::min(); + std::chrono::system_clock::time_point noTime = std::chrono::system_clock::time_point::min(); if (rescheduleTime != noTime) { CScheduler::Function f = std::bind(µTask, std::ref(s), std::ref(mutex), std::ref(counter), -delta + 1, noTime); s.schedule(f, rescheduleTime); @@ -45,15 +45,15 @@ BOOST_AUTO_TEST_CASE(manythreads) auto randomMsec = [](FastRandomContext& rc) -> int { return -11 + (int)rc.randrange(1012); }; // [-11, 1000] auto randomDelta = [](FastRandomContext& rc) -> int { return -1000 + (int)rc.randrange(2001); }; // [-1000, 1000] - boost::chrono::system_clock::time_point start = boost::chrono::system_clock::now(); - boost::chrono::system_clock::time_point now = start; - boost::chrono::system_clock::time_point first, last; + std::chrono::system_clock::time_point start = std::chrono::system_clock::now(); + std::chrono::system_clock::time_point now = start; + std::chrono::system_clock::time_point first, last; size_t nTasks = microTasks.getQueueInfo(first, last); BOOST_CHECK(nTasks == 0); for (int i = 0; i < 100; ++i) { - boost::chrono::system_clock::time_point t = now + boost::chrono::microseconds(randomMsec(rng)); - boost::chrono::system_clock::time_point tReschedule = now + boost::chrono::microseconds(500 + randomMsec(rng)); + std::chrono::system_clock::time_point t = now + std::chrono::microseconds(randomMsec(rng)); + std::chrono::system_clock::time_point tReschedule = now + std::chrono::microseconds(500 + randomMsec(rng)); int whichCounter = zeroToNine(rng); CScheduler::Function f = std::bind(µTask, std::ref(microTasks), std::ref(counterMutex[whichCounter]), std::ref(counter[whichCounter]), @@ -71,14 +71,14 @@ BOOST_AUTO_TEST_CASE(manythreads) microThreads.create_thread(std::bind(&CScheduler::serviceQueue, µTasks)); UninterruptibleSleep(std::chrono::microseconds{600}); - now = boost::chrono::system_clock::now(); + now = std::chrono::system_clock::now(); // More threads and more tasks: for (int i = 0; i < 5; i++) microThreads.create_thread(std::bind(&CScheduler::serviceQueue, µTasks)); for (int i = 0; i < 100; i++) { - boost::chrono::system_clock::time_point t = now + boost::chrono::microseconds(randomMsec(rng)); - boost::chrono::system_clock::time_point tReschedule = now + boost::chrono::microseconds(500 + randomMsec(rng)); + std::chrono::system_clock::time_point t = now + std::chrono::microseconds(randomMsec(rng)); + std::chrono::system_clock::time_point tReschedule = now + std::chrono::microseconds(500 + randomMsec(rng)); int whichCounter = zeroToNine(rng); CScheduler::Function f = std::bind(µTask, std::ref(microTasks), std::ref(counterMutex[whichCounter]), std::ref(counter[whichCounter]), @@ -157,14 +157,14 @@ BOOST_AUTO_TEST_CASE(mockforward) scheduler.scheduleFromNow(dummy, 8*min_in_milli); // check taskQueue - boost::chrono::system_clock::time_point first, last; + std::chrono::system_clock::time_point first, last; size_t num_tasks = scheduler.getQueueInfo(first, last); BOOST_CHECK_EQUAL(num_tasks, 3ul); std::thread scheduler_thread([&]() { scheduler.serviceQueue(); }); // bump the scheduler forward 5 minutes - scheduler.MockForward(boost::chrono::seconds(5*60)); + scheduler.MockForward(std::chrono::seconds(5*60)); // ensure scheduler has chance to process all tasks queued for before 1 ms from now. scheduler.scheduleFromNow([&scheduler]{ scheduler.stop(false); }, 1); @@ -178,8 +178,8 @@ BOOST_AUTO_TEST_CASE(mockforward) BOOST_CHECK_EQUAL(counter, 2); // check that the time of the remaining job has been updated - boost::chrono::system_clock::time_point now = boost::chrono::system_clock::now(); - int delta = boost::chrono::duration_cast(first - now).count(); + std::chrono::system_clock::time_point now = std::chrono::system_clock::now(); + int delta = std::chrono::duration_cast(first - now).count(); // should be between 2 & 3 minutes from now BOOST_CHECK(delta > 2*60 && delta < 3*60); } diff --git a/test/lint/lint-includes.sh b/test/lint/lint-includes.sh index ced2fd2bb69..1cece6a5252 100755 --- a/test/lint/lint-includes.sh +++ b/test/lint/lint-includes.sh @@ -53,7 +53,6 @@ EXPECTED_BOOST_INCLUDES=( boost/algorithm/string/classification.hpp boost/algorithm/string/replace.hpp boost/algorithm/string/split.hpp - boost/chrono/chrono.hpp boost/date_time/posix_time/posix_time.hpp boost/filesystem.hpp boost/filesystem/fstream.hpp