NexusForce 1.0.0
A Modern C++ Library with extended functionality, web components, and utility libraries
载入中...
搜索中...
未找到
timer.hpp
浏览该文件的文档.
1#ifndef NEFORCE_CORE_ASYNC_TIMER_HPP__
2#define NEFORCE_CORE_ASYNC_TIMER_HPP__
3
11
20NEFORCE_BEGIN_NAMESPACE__
21
27
33
42template <typename Clock>
44public:
45 using clock_type = Clock;
46 using time_point = typename clock_type::time_point;
47 using duration = typename clock_type::duration;
48 using token = size_t;
49 using handler_type = function<void()>;
50
51private:
58 struct node {
59 time_point expire;
60 token id;
61 handler_type handler;
62
63 node(time_point exp, const token tid, handler_type&& h) :
64 expire(exp),
65 id(tid),
66 handler(move(h)) {}
67
68 node(const node&) = default;
69 node& operator=(const node&) = default;
70 node(node&&) = default;
71 node& operator=(node&&) = default;
72
73 ~node() = default;
74
78 bool operator<(const node& other) const {
79 if (expire < other.expire) {
80 return true;
81 }
82 if (expire > other.expire) {
83 return false;
84 }
85 return id < other.id;
86 }
87 };
88
89 set<node> nodes_;
93
94 thread thread_;
95 mutable mutex mutex_;
97 token next_id_{1};
98 atomic<bool> stopped_{false};
99
100 friend class thread_pool;
101
102private:
111 void run() {
112 while (!stopped_.load()) {
113 unique_lock<mutex> lock(mutex_);
114
115 if (nodes_.empty()) {
116 cv_.wait_for(lock, 500_ms, [this] { return stopped_.load() || !nodes_.empty(); });
117 if (stopped_.load()) {
118 break;
119 }
120 }
121
122 time_point now = clock_type::now();
123 while (!nodes_.empty() && nodes_.begin()->expire <= now) {
124 auto it = nodes_.begin();
125 node current_node = *it;
126 nodes_.erase(it);
127 node_map_.erase(current_node.id);
128
129 lock.unlock_quiet();
130 if (!stopped_.load()) {
131 current_node.handler();
132 }
133 lock.lock_quiet();
134
135 cancel_flags_.erase(current_node.id);
136 promises_.erase(current_node.id);
137 now = clock_type::now();
138 }
139
140 if (!nodes_.empty()) {
141 auto expire_time = nodes_.begin()->expire;
142 cv_.wait_until(lock, expire_time, [this] {
143 return stopped_.load(memory_order_acquire) || nodes_.empty() ||
144 nodes_.begin()->expire <= clock_type::now();
145 });
146 }
147 }
148 }
149
150public:
154 timer_scheduler() { thread_ = thread(&timer_scheduler::run, this); }
155
160 stopped_.store(true);
161 cv_.notify_one();
162 if (thread_.joinable()) {
163 thread_.join();
164 }
165 }
166
167 timer_scheduler(const timer_scheduler&) = delete;
168 timer_scheduler& operator=(const timer_scheduler&) = delete;
170 timer_scheduler& operator=(timer_scheduler&&) = delete;
171
181 unique_lock<mutex> lock(mutex_);
182 token id = next_id_++;
183
184 auto flag = make_shared<atomic<bool>>(false);
186 handler_type wrapped = [flag, h = move(handler), promise]() {
187 if (!flag->load(memory_order_acquire)) {
188 h();
189 }
191 };
192
193 const bool is_earliest = nodes_.empty() || expire < nodes_.begin()->expire;
194
195 node new_node(expire, id, _NEFORCE move(wrapped));
196 auto result = nodes_.insert(new_node);
197 node_map_[id] = result.first;
198 cancel_flags_[id] = move(flag);
199 promises_[id] = promise;
200
201 if (is_earliest) {
202 cv_.notify_one();
203 }
204 lock.unlock_quiet();
205 return id;
206 }
207
215 bool cancel(token id) {
216 unique_lock<mutex> lock(mutex_);
217
218 const auto flag_it = cancel_flags_.find(id);
219 if (flag_it == cancel_flags_.end()) {
220 return false;
221 }
222 flag_it->second->store(true, memory_order_release);
223
224 auto node_it = node_map_.find(id);
225 if (node_it != node_map_.end()) {
226 const bool is_earliest = (node_it->second == nodes_.begin());
227 nodes_.erase(node_it->second);
228 node_map_.erase(node_it);
229
230 if (is_earliest) {
231 cv_.notify_one();
232 }
233
234 cancel_flags_.erase(flag_it);
235 promises_.erase(id);
236 } else {
237 const auto prom_it = promises_.find(id);
239 if (prom_it != promises_.end()) {
240 prom = prom_it->second;
241 promises_.erase(prom_it);
242 }
243 lock.unlock_quiet();
244
245 if (prom) {
246 prom->get_future().wait();
247 }
248 }
249
250 return true;
251 }
252
256 void cancel_all() {
257 unique_lock<mutex> lock(mutex_);
258 for (auto& cancel_flag: cancel_flags_) {
259 cancel_flag.second->store(true, memory_order_release);
260 }
261 nodes_.clear();
262 node_map_.clear();
263 cancel_flags_.clear();
264 lock.unlock_quiet();
265 cv_.notify_one();
266 }
267
272 NEFORCE_NODISCARD size_t size() const {
273 lock<mutex> lock(mutex_);
274 return nodes_.size();
275 }
276
282 NEFORCE_NODISCARD bool is_pending(token id) const {
283 lock<mutex> lock(mutex_);
284 return node_map_.find(id) != node_map_.end();
285 }
286};
287
296template <typename Clock>
297class basic_timer {
298public:
299 using clock_type = Clock;
300 using time_point = typename clock_type::time_point;
301 using duration = typename clock_type::duration;
304
305private:
307 token task_id_{0};
308 time_point expire_{clock_type::now()};
309
310public:
311 basic_timer() :
312 scheduler_(make_shared<timer_scheduler<Clock>>()) {}
313
318 if (scheduler_) {
319 cancel();
320 }
321 }
322
323 basic_timer(const basic_timer&) = delete;
324 basic_timer& operator=(const basic_timer&) = delete;
325
329 basic_timer(basic_timer&& other) noexcept :
330 scheduler_(_NEFORCE move(other.scheduler_)),
331 task_id_(other.task_id_),
332 expire_(other.expire_) {
333 other.task_id_ = 0;
334 }
335
339 basic_timer& operator=(basic_timer&& other) noexcept {
340 if (_NEFORCE addressof(other) == this) {
341 return *this;
342 }
343
344 cancel();
345 scheduler_ = _NEFORCE move(other.scheduler_);
346 task_id_ = other.task_id_;
347 expire_ = other.expire_;
348 other.task_id_ = 0;
349
350 return *this;
351 }
352
359 void expires_at(const time_point& expiry_time) {
360 cancel();
361 expire_ = expiry_time;
362 }
363
370 void expires_after(const duration& expiry_duration) {
371 cancel();
372 expire_ = clock_type::now() + expiry_duration;
373 }
374
380
385 NEFORCE_NODISCARD time_point expiry() const { return expire_; }
386
391 NEFORCE_NODISCARD bool is_active() const { return task_id_ != 0 && scheduler_ && scheduler_->is_pending(task_id_); }
392
401 template <typename WaitHandler>
402 void async_wait(WaitHandler&& handler) {
403 cancel();
404 task_id_ = scheduler_->add_task(expire_, handler_type(_NEFORCE forward<WaitHandler>(handler)));
405 }
406
412 void cancel() {
413 if (scheduler_ && task_id_ != 0) {
414 scheduler_->cancel(task_id_);
415 task_id_ = 0;
416 }
417 }
418};
419
424
429 // AsyncTimer
431 // AsyncComponents
433
434NEFORCE_END_NAMESPACE__
435#endif // NEFORCE_CORE_ASYNC_TIMER_HPP__
原子类型完整实现
基本定时器
void expires_from_now(const int64_t ms)
设置从当前时间开始的毫秒数
void async_wait(WaitHandler &&handler)
异步等待定时器到期
void expires_after(const duration &expiry_duration)
设置相对到期时间
void expires_at(const time_point &expiry_time)
设置绝对到期时间
void cancel()
取消定时任务
typename timer_scheduler< Clock >::token token
任务标识符类型
typename timer_scheduler< Clock >::handler_type handler_type
回调函数类型
typename clock_type::duration duration
时长类型
basic_timer & operator=(basic_timer &&other) noexcept
移动赋值运算符
Clock clock_type
时钟类型
basic_timer(basic_timer &&other) noexcept
移动构造函数
~basic_timer()
析构函数,自动取消未完成的任务
typename clock_type::time_point time_point
时间点类型
time_point expiry() const
获取到期时间点
bool is_active() const
检查定时器是否活跃(有待执行的任务)
函数包装器主模板声明
锁管理器模板
映射容器
定义 map.hpp:37
非递归互斥锁
Promise类模板
void set_value(Res value)
设置结果值
集合容器
共享智能指针类模板
线程类
定时任务调度器
size_t token
任务标识符类型
typename clock_type::time_point time_point
时间点类型
bool is_pending(token id) const
检查任务是否仍在队列中等待执行(已出队、正在执行的任务返回 false)
timer_scheduler()
构造函数,启动调度线程
~timer_scheduler()
析构函数,停止调度线程并等待其结束
Clock clock_type
时钟类型
typename clock_type::duration duration
时长类型
bool cancel(token id)
取消定时任务
size_t size() const
获取当前待处理的任务数量
void cancel_all()
取消所有定时任务
token add_task(time_point expire, handler_type &&handler)
添加定时任务
function< void()> handler_type
回调函数类型
独占锁管理器模板
条件变量行为
集合容器
通用函数包装器
constexpr T && forward(remove_reference_t< T > &x) noexcept
完美转发左值
constexpr T * addressof(T &x) noexcept
获取对象的地址
basic_timer< system_clock > system_timer
基于系统时钟的定时器
basic_timer< steady_clock > steady_timer
基于稳定时钟的定时器
long long int64_t
64位有符号整数类型
duration< int64_t, milli > milliseconds
毫秒持续时间
constexpr auto memory_order_release
释放内存顺序常量
constexpr auto memory_order_acquire
获取内存顺序常量
uint64_t size_t
无符号大小类型
enable_if_t<!is_unbounded_array_v< T > &&is_constructible_v< T, Args... >, shared_ptr< T > > make_shared(Args &&... args)
融合分配创建共享指针
constexpr Iterator2 move(Iterator1 first, Iterator1 last, Iterator2 result) noexcept(noexcept(inner::__move_aux(first, last, result)))
移动范围元素
thread::id id() noexcept
获取当前线程标识符
映射容器
NeForce 异步任务包装器
共享智能指针实现
static time_point now() noexcept
获取当前时间点
时间点类模板
线程管理类