|
//facebook的工业级实现:https://github.com/facebook/folly/blob/master/folly/experimental/FunctionScheduler.h//本文对原代码做些许删减、调整,保留主体,便于阅读//.h#pragma once#include <chrono>#include <condition_variable>#include <mutex>#include <thread>#include <unordered_map>#include <vector>#include <string>/** * Schedules any number of functions to run at various intervals. E.g., * * FunctionScheduler fs; * * fs.addFunction([&] { LOG(INFO) << &#34;tick...&#34;; }, seconds(1), &#34;ticker&#34;); * fs.addFunction(std::bind(&TestClass::doStuff, this), minutes(5), &#34;stuff&#34;); * fs.start(); * ........ * fs.cancelFunction(&#34;ticker&#34;); * fs.addFunction([&] { LOG(INFO) << &#34;tock...&#34;; }, minutes(3), &#34;tocker&#34;); * ........ * fs.shutdown(); * * * Note: the class uses only one thread - if you want to use more than one * thread, either use multiple FunctionScheduler objects, or check out * ThreadedRepeatingFunctionRunner.h for a much simpler contract of * &#34;run each function periodically in its own thread&#34;. * * start() schedules the functions, while shutdown() terminates further * scheduling. */ //A type alias for function that is called to determine the time interval for the next scheduled run.using IntervalDistributionFunc = std::function<std::chrono::microseconds()>;//A type alias for function that returns the next run time, given the current start time.using NextRunTimeFunc = std::function<std::chrono::steady_clock::time_point(std::chrono::steady_clock::time_point)>;struct RepeatFunc{ std::function<void()> cb; NextRunTimeFunc nextRunTimeFunc; std::chrono::steady_clock::time_point nextRunTime; std::string name; std::chrono::microseconds startDelay; std::string intervalDescr; bool runOnce; RepeatFunc(std::function<void()>&& cback, IntervalDistributionFunc&& intervalFn, const std::string& nameID, const std::string& intervalDistDescription, std::chrono::microseconds delay, bool once) : cb(std::move(cback)), nextRunTimeFunc(getNextRunTimeFunc(std::move(intervalFn))), name(nameID), intervalDescr(intervalDistDescription), startDelay(delay), runOnce(once) {} static NextRunTimeFunc getNextRunTimeFunc(IntervalDistributionFunc&& intervalFn) { return[intervalFn = std::move(intervalFn)](std::chrono::steady_clock::time_point curTime) mutable { return curTime + intervalFn(); }; } std::chrono::steady_clock::time_point getNextRunTime() const { return nextRunTime; } void setNextRunTimeSteady() { nextRunTime = nextRunTimeFunc(nextRunTime); } void setNextRunTimeStrict(std::chrono::steady_clock::time_point curTime) { nextRunTime = nextRunTimeFunc(curTime); } void resetNextRunTime(std::chrono::steady_clock::time_point curTime) { nextRunTime = curTime + startDelay; } void cancel() { // Simply reset cb to an empty function. cb = {}; } bool isValid() const { return bool(cb); }};class FunctionScheduler {public: FunctionScheduler(); ~FunctionScheduler(); /** * Starts the scheduler. * Returns false if the scheduler was already running. */ bool start(); /** * Stops the FunctionScheduler. * It may be restarted later by calling start() again. * Returns false if the scheduler was not running. */ bool shutdown(); /** * By default steady is false, meaning schedules may lag behind overtime. * This could be due to long running tasks or time drift because of randomness * in thread wakeup time. * By setting steady to true, FunctionScheduler will attempt to catch up. * i.e. more like a cronjob * * NOTE: it&#39;s only safe to set this before calling start() */ void setSteady(bool steady) { steady_ = steady; } /** * Adds a new function to the FunctionScheduler. * Functions will not be run until start() is called. When start() is called, each function will be run after its specified startDelay. * Functions may also be added after start() has been called, in which case startDelay is still honored. * Throws an exception on error. In particular, each function must have a unique name--two functions cannot be added with the same name. */ void addFunction(std::function<void()>&& cb, std::chrono::microseconds interval, std::string nameID, std::chrono::microseconds startDelay = std::chrono::microseconds(0)); //Adds a new function to the FunctionScheduler to run only once. void addFunctionOnce(std::function<void()>&& cb, std::string nameID, std::chrono::microseconds startDelay = std::chrono::microseconds(0)); /** * Cancels the function with the specified name, so it will no longer be run. * Returns false if no function exists with the specified name. */ bool cancelFunction(std::string nameID); bool cancelFunctionAndWait(std::string nameID);private: struct RunTimeOrder { bool operator()(const std::unique_ptr<RepeatFunc>& f1,const std::unique_ptr<RepeatFunc>& f2) const { return f1->getNextRunTime() > f2->getNextRunTime(); } }; typedef std::vector<std::unique_ptr<RepeatFunc>> FunctionHeap; typedef std::unordered_map<std::string, RepeatFunc*> FunctionMap; void run(); void runOneFunction(std::unique_lock<std::mutex>& lock,std::chrono::steady_clock::time_point now); template <typename IntervalFunc> void addFunctionToHeapChecked(std::function<void()>&& cb,IntervalFunc&& fn,const std::string& nameID, const std::string& intervalDescr, std::chrono::microseconds startDelay, bool runOnce); std::thread thread_; std::mutex mutex_; bool running_{ false }; FunctionHeap functions_;// This is a heap, ordered by next run time. FunctionMap functionsMap_; RunTimeOrder fnCmp_; // The function currently being invoked by the running thread.This is null when the running thread is idle RepeatFunc* currentFunction_{ nullptr }; // Condition variable that is signalled whenever a new function is added or when the FunctionScheduler is stopped. std::condition_variable runningCondvar_; bool steady_{ false }; bool cancellingCurrentFunction_{ false };};//.cpp#include <random>#include <iostream>#include <algorithm>#include <cassert>#include <stdexcept>#include &#34;timer.h&#34;using std::chrono::microseconds;using std::chrono::steady_clock;struct ConstIntervalFunctor { const microseconds constInterval; explicit ConstIntervalFunctor(microseconds interval): constInterval(interval) { if (interval < microseconds::zero()) { throw std::invalid_argument(&#34;FunctionScheduler: time interval must be non-negative&#34;); } } microseconds operator()() const { return constInterval; }};FunctionScheduler::FunctionScheduler() = default;FunctionScheduler::~FunctionScheduler() { shutdown();}bool FunctionScheduler::start(){ std::unique_lock<std::mutex> lock(mutex_); if (running_) { return false; } std::cout << &#34;Starting FunctionScheduler with &#34; << functions_.size() << &#34; functions.&#34;; auto now = steady_clock::now(); // Reset the next run time. for all functions. this is needed since one can shutdown() and start() again for (const auto& f : functions_) { f->resetNextRunTime(now); std::cout << &#34; - func: &#34; << (f->name.empty() ? &#34;(anon)&#34; : f->name.c_str()) << &#34;, period = &#34; << f->intervalDescr << &#34;, delay = &#34; << f->startDelay.count() << &#34;ms&#34; << std::endl; } std::make_heap(functions_.begin(), functions_.end(), fnCmp_); thread_ = std::thread([&] { this->run(); }); running_ = true; return true;}bool FunctionScheduler::shutdown(){ { std::lock_guard<std::mutex> lock(mutex_); if (!running_) { return false; } running_ = false; runningCondvar_.notify_one(); } thread_.join(); return true;}void FunctionScheduler::addFunction(std::function<void()>&& cb, microseconds interval, std::string nameID, microseconds startDelay){ addFunctionToHeapChecked(std::move(cb), ConstIntervalFunctor(interval), nameID, std::to_string(interval.count()) + &#34;us&#34;, startDelay, false /*runOnce*/);}void FunctionScheduler::addFunctionOnce(std::function<void()>&& cb, std::string nameID, microseconds startDelay) { addFunctionToHeapChecked(std::move(cb), ConstIntervalFunctor(microseconds::zero()), nameID, &#34;once&#34;, startDelay, true /*runOnce*/);}template <typename IntervalFunc>void FunctionScheduler::addFunctionToHeapChecked(std::function<void()>&& cb, IntervalFunc&& fn, const std::string& nameID, const std::string& intervalDescr, microseconds startDelay, bool runOnce){ if (!cb) { throw std::invalid_argument(&#34;FunctionScheduler: Scheduled function must be set&#34;); } if (startDelay < microseconds::zero()) { throw std::invalid_argument(&#34;FunctionScheduler: start delay must be non-negative&#34;); } std::unique_lock<std::mutex> lock(mutex_); auto it = functionsMap_.find(nameID); if (it != functionsMap_.end() && it->second->isValid()) { throw std::invalid_argument(&#34;FunctionScheduler: a function named \&#34;&#34; + nameID +&#34;\&#34; already exists&#34;); } if (currentFunction_ && currentFunction_->name == nameID) { throw std::invalid_argument(&#34;FunctionScheduler: a function named \&#34;&#34; + nameID + &#34;\&#34; already exists&#34;); } std::unique_ptr<RepeatFunc> func = std::make_unique<RepeatFunc>(std::move(cb), std::forward<IntervalFunc>(fn), nameID, intervalDescr, startDelay, runOnce); assert(lock.mutex() == &mutex_); assert(lock.owns_lock()); functions_.push_back(std::move(func)); functionsMap_[functions_.back()->name] = functions_.back().get(); if (running_) { functions_.back()->resetNextRunTime(steady_clock::now()); std::push_heap(functions_.begin(), functions_.end(), fnCmp_); // Signal the running thread to wake up and see if it needs to change its current scheduling decision. runningCondvar_.notify_one(); }}bool FunctionScheduler::cancelFunction(std::string nameID) { std::unique_lock<std::mutex> lock(mutex_); if (currentFunction_ && currentFunction_->name == nameID) { functionsMap_.erase(currentFunction_->name); // This function is currently being run. Clear currentFunction_ // The running thread will see this and won&#39;t reschedule the function. currentFunction_ = nullptr; cancellingCurrentFunction_ = true; return true; } auto it = functionsMap_.find(nameID); if (it != functionsMap_.end() && it->second->isValid()) { functionsMap_.erase(it->second->name); it->second->cancel(); return true; } return false;}bool FunctionScheduler::cancelFunctionAndWait(std::string nameID) { std::unique_lock<std::mutex> lock(mutex_); if (currentFunction_ && currentFunction_->name == nameID) { functionsMap_.erase(currentFunction_->name); // This function is currently being run. Clear currentFunction_ // The running thread will see this and won&#39;t reschedule the function. currentFunction_ = nullptr; cancellingCurrentFunction_ = true; runningCondvar_.wait(lock, [this]() { return !cancellingCurrentFunction_; }); return true; } auto it = functionsMap_.find(nameID); if (it != functionsMap_.end() && it->second->isValid()) { functionsMap_.erase(it->second->name); it->second->cancel(); return true; } return false;}void FunctionScheduler::run() { std::unique_lock<std::mutex> lock(mutex_); while (running_) { if (functions_.empty()) { runningCondvar_.wait(lock); continue; } const auto now = steady_clock::now(); std::pop_heap(functions_.begin(), functions_.end(), fnCmp_); if (!functions_.back()->isValid()) { functions_.pop_back(); continue; } const auto sleepTime = functions_.back()->getNextRunTime() - now; if (sleepTime < microseconds::zero()) { runOneFunction(lock, now); runningCondvar_.notify_all(); } else { // Re-add the function to the heap, and wait until we actually need to run it. std::push_heap(functions_.begin(), functions_.end(), fnCmp_); runningCondvar_.wait_for(lock, sleepTime); } }}void FunctionScheduler::runOneFunction(std::unique_lock<std::mutex>& lock, steady_clock::time_point now) { assert(lock.mutex() == &mutex_); assert(lock.owns_lock()); // The function to run will be at the end of functions_ already. // Fully remove it from functions_ now. // We need to release mutex_ while we invoke this function, and we need to maintain the heap property on functions_ while mutex_ is unlocked. auto func = std::move(functions_.back()); functions_.pop_back(); if (!func->cb) { std::cout << func->name << &#34;function has been canceled while waiting&#34; << std::endl; return; } currentFunction_ = func.get(); if (steady_) { // This allows scheduler to catch up func->setNextRunTimeSteady(); } else { // Note that we set nextRunTime based on the current time where we started // the function call, rather than the time when the function finishes. // This ensures that we call the function once every time interval, as // opposed to waiting time interval seconds between calls. (These can be // different if the function takes a significant amount of time to run.) func->setNextRunTimeStrict(now); } lock.unlock(); try { std::cout << &#34;Now running &#34; << func->name << std::endl; func->cb(); } catch (const std::exception& ex) { std::cout << &#34;Error running the scheduled function <&#34; << func->name<< &#34;>: &#34; << ex.what() << std::endl; } lock.lock(); if (!currentFunction_) { // The function was cancelled while we were running it. We shouldn&#39;t reschedule it; cancellingCurrentFunction_ = false; return; } if (currentFunction_->runOnce) { // Don&#39;t reschedule if the function only needed to run once. functionsMap_.erase(currentFunction_->name); currentFunction_ = nullptr; return; } // Re-insert the function into our functions_ heap. // We only maintain the heap property while running_ is set. (running_ may have been cleared while we were invoking the user&#39;s function.) functions_.push_back(std::move(func)); // Clear currentFunction_ currentFunction_ = nullptr; if (running_) { std::push_heap(functions_.begin(), functions_.end(), fnCmp_); }}
一些解析:
思路:启动一个单独的线程,借助condition_variable::wait_for(mutex&, std::chrono::duration<>&)让线程睡眠要定时的时间。
定时精度:实际是利用内核的线程调度实现计时,因此受内核精度限制,毫秒级的准确度应该没有问题
语法:
(1)Lambda捕获列表初始化
Lambda捕获列表初始化最重要的一点是“支持Capture by Move”。在C++14之前,Lambda是不支持捕获一个Move-Only的对象的,比如:
std::unique_ptr<int> uptr = std::make_unique<int>(123);
auto callback = [uptr]() { // 编译错误,uptr is move-only
std::cout << *uptr << std::endl;
};
通过捕获列表初始化,完成Move-Only对象的“Capture by Move”:
std::unique_ptr<int> uptr = std::make_unique<int>(123);
auto callback = [uptr = std::move(uptr)]() { //将uptr移动给Lambda表达式中的参数
std::cout << *uptr << std::endl;
};
//... 将callback传给另一个线程
//return=>uptr是nullptr
具体见FOCUS:现代 C++:Lambda 表达式
(2)堆
heap本质上是用array或者vector实现的完全二叉树,二叉树的root节点代表整个heap的最大值(大根堆)或最小值(小根堆)
C++并没有将heap作为容器,而是作为算法放到<algorithm>中,默认是大根堆,可以通过指定比较算法构造小根堆。常用的API有以下几个:
• std::make_heap(RandomIt first, RandomIt last, Compare comp): 在范围 [first, last) 中构造最大堆。
• std::push_heap(RandomIt first, RandomIt last, Compare comp): 范围[first, last-1)已经是最大堆,将位于位置last-1的元素插入堆中。
• std::pop_heap(RandomIt first, RandomIt last, Compare comp): 交换在位置 first 的值和在位置 last-1 的值,并令子范围 [first, last-1) 变为堆。这拥有从范围 [first, last) 所定义的堆移除首个元素的效果。
make_heap之后的堆顶元素需要使用front(),push_heap之后max元素在front(),pop_heap之后max元素在back()
typedef std::vector<int> Heap;
Heap ints = {10,8,9,3,5,6,7,1,2,4};
std::make_heap(ints.begin(), ints.end()); //front是10
std::pop_heap(ints.begin(), ints.end()); //back是10,front是9
ints.pop_back();
ints.push_back(12);
std::push_heap(ints.begin(), ints.end()); //front是12
具体见:龙虾天天:Heap题型刷题套路
(3)std::function置空
std::function<void()> func;
std::cout << (bool)func << std::endl; //0
func = {};
std::cout << (bool)func << std::endl; //0
func = nullptr;
std::cout << (bool)func << std::endl; //0
using std::chrono::microseconds;
struct ConstIntervalFunctor
{
const std::chrono::microseconds constInterval;
explicit ConstIntervalFunctor(std::chrono::microseconds interval) : constInterval(interval)
{
if (interval < std::chrono::microseconds::zero())
{
throw std::invalid_argument(&#34;FunctionScheduler: time interval must be non-negative&#34;);
}
}
std::chrono::microseconds operator()() const { return constInterval; }
};
using IntervalDistributionFunc = std::function<std::chrono::microseconds()>;
using NextRunTimeFunc = std::function<std::chrono::steady_clock::time_point(std::chrono::steady_clock::time_point)>;
NextRunTimeFunc getNextRunTimeFunc(IntervalDistributionFunc&& intervalFn)
{
return[intervalFn = std::move(intervalFn)](std::chrono::steady_clock::time_point curTime) mutable
{
return curTime + intervalFn();
};
}
int main(int argc, char*argv[])
{
IntervalDistributionFunc intFunc = ConstIntervalFunctor(microseconds(3000));
NextRunTimeFunc nextFunc = getNextRunTimeFunc(std::move(intFunc));
std::chrono::steady_clock::time_point curTime = std::chrono::steady_clock::now();
std::cout << curTime.time_since_epoch().count() << std::endl; //241856668842400
std::chrono::steady_clock::time_point retTime = nextFunc(curTime);
std::cout << retTime.time_since_epoch().count() << std::endl; //241856671842400
} |
|