NexusForce 1.0.0
A Modern C++ Library with extended functionality, web components, and utility libraries
载入中...
搜索中...
未找到
thread_pool.hpp
浏览该文件的文档.
1#ifndef NEFORCE_CORE_ASYNC_THREAD_POOL_HPP__
2#define NEFORCE_CORE_ASYNC_THREAD_POOL_HPP__
3
11
19NEFORCE_BEGIN_NAMESPACE__
20
26
32
40struct task_group {
41 task_group() = default;
42 ~task_group() = default;
43
45
49 void increment() noexcept { running_count.fetch_add(1, memory_order_relaxed); }
50
56 void decrement() noexcept {
57 if (running_count.fetch_sub(1, memory_order_release) == 1) {
58 running_count.notify_all();
59 }
60 }
61
65 void wait() const noexcept {
67 while (count != 0) {
68 running_count.wait(count);
70 }
71 }
72};
73
81class NEFORCE_API local_queue {
82public:
87 enum class steal_strategy : uint8_t {
88 half,
89 fixed_batch,
90 single,
91 adaptive
92 };
93
94 static constexpr size_t queue_size = 256;
95
96private:
97 static steal_strategy steal_strategy_;
98 static uint32_t fixed_batch_size_;
99
100 array<function<void()>, queue_size> tasks_;
101 atomic<uint64_t> head_{0};
102 atomic<uint32_t> tail_{0};
103
104private:
105 constexpr static size_t mask_ = queue_size - 1;
106
107 NEFORCE_NODISCARD static uint64_t pack(const uint32_t steal, const uint32_t local_head) noexcept {
108 return static_cast<uint64_t>(steal) << 32 | static_cast<uint64_t>(local_head);
109 }
110
111 NEFORCE_NODISCARD static pair<uint32_t, uint32_t> unpack(const uint64_t head) noexcept {
112 return {static_cast<uint32_t>(head >> 32), static_cast<uint32_t>(head)};
113 }
114
115 uint32_t be_stolen_by_impl(local_queue& dst, uint32_t dst_tail);
116
117public:
118 local_queue() = default;
119 ~local_queue() = default;
120 local_queue(const local_queue&) = delete;
121 local_queue& operator=(const local_queue&) = delete;
122 local_queue(local_queue&& other) noexcept;
123 local_queue& operator=(local_queue&& other) noexcept;
124
129 NEFORCE_NODISCARD size_t capacity() const noexcept { return tasks_.size(); }
130
135 NEFORCE_NODISCARD bool empty() const noexcept { return size() == 0U; }
136
141 NEFORCE_NODISCARD size_t remain_size() const noexcept {
142 const auto tail = tail_.load(memory_order_acquire);
143 const auto head = head_.load(memory_order_acquire);
144 const auto steal = unpack(head).first;
145 const auto used = static_cast<size_t>(tail - steal);
146 const size_t remain = capacity() - used;
147 return remain;
148 }
149
154 NEFORCE_NODISCARD size_t size() const noexcept {
155 const auto tail = tail_.load(memory_order_acquire);
156 const auto head = head_.load(memory_order_acquire);
157 const auto local_head = unpack(head).second;
158 return static_cast<size_t>(tail - local_head);
159 }
160
166 static void set_steal_strategy(const steal_strategy strategy, const uint32_t batch_size = 4) {
167 steal_strategy_ = strategy;
168 fixed_batch_size_ = batch_size;
169 }
170
175 void push_back(function<void()> task) {
176 const uint32_t tail = tail_.load(memory_order_relaxed);
177 tasks_[tail & mask_] = move(task);
178 tail_.store(tail + 1, memory_order_release);
179 }
180
186
192 optional<function<void()>> be_stolen_by(local_queue& dst_queue);
193};
194
201struct NEFORCE_API worker_context {
203
205 id_type id{0};
208
209 worker_context() = default;
210 worker_context(const worker_context&) = delete;
211 worker_context& operator=(const worker_context&) = delete;
212 worker_context(worker_context&& other) noexcept;
213 worker_context& operator=(worker_context&& other) noexcept;
214};
215
216
223struct task_info {
234
235 enum class priority_type : uint32_t {
236 };
237
238 const uint64_t id;
244 string error;
245 priority_type priority;
246
252 explicit task_info(const uint64_t task_id, const priority_type priority) :
253 id(task_id),
255
260 NEFORCE_NODISCARD bool is_finished() const noexcept {
261 const auto s = status.load(memory_order_acquire);
262 return s == status::completed || s == status::failed;
263 }
264
269 NEFORCE_NODISCARD int64_t exec_time() const noexcept {
270 if (start_time.value() == 0 || finish_time.value() == 0) {
271 return -1;
272 }
273 return finish_time.value() - start_time.value();
274 }
275};
276
284template <typename T>
286 _NEFORCE future<T> future;
288
293 NEFORCE_NODISCARD explicit operator bool() const noexcept { return future.valid() && task_info; }
294};
295
296
308class NEFORCE_API thread_pool {
309public:
314 enum class pool_mode : uint8_t {
316 cached
317 };
318
326
331 struct NEFORCE_API pool_statistics : istringify<pool_statistics> {
335 size_t queue_size;
339
344 NEFORCE_NODISCARD string to_string() const;
345 };
346
350 using priority_type = task_info::priority_type;
351
353 static constexpr size_t max_idle_seconds = 60;
354
355 static size_t max_thread_threshhold() noexcept;
356
357private:
358 using task_type = function<void()>;
359
364 struct priority_task {
365 task_type task;
368
369 priority_task(task_type t, const priority_type p, shared_ptr<task_info> info) noexcept :
370 task(move(t)),
371 priority(p),
372 info(_NEFORCE move(info)) {}
373
374 bool operator<(const priority_task& other) const noexcept { return priority < other.priority; }
375 };
376
377 struct thread_pool_id_generator {
378 static uint32_t& get_id() noexcept {
379 static uint32_t pool_thread_id = 0;
380 return pool_thread_id;
381 }
382 static uint32_t get_new_id() noexcept { return get_id()++; }
383 static void reset_id() noexcept { get_id() = 0; }
384 };
385
386 unordered_map<id_type, unique_ptr<lazy_thread>> threads_map_;
387 unordered_map<id_type, worker_context> worker_contexts_;
388 vector<atomic<worker_context*>> worker_contexts_ptr_;
389 mutex worker_contexts_mtx_;
390
391 timer_scheduler<steady_clock> timer_;
392
393 id_type init_thread_size_{0};
394 size_t thread_threshhold_;
395
396 priority_queue<priority_task> task_queue_;
397 atomic<uint32_t> task_size_{0};
398 atomic<uint32_t> idle_thread_size_{0};
399 size_t task_threshhold_{task_max_threshhold};
400
401 mutable mutex task_queue_mtx_;
402 condition_variable not_full_;
403 condition_variable not_empty_;
404 condition_variable exit_cond_;
405
406 atomic<pool_mode> pool_mode_{pool_mode::fixed};
407 atomic<bool> is_running_{false};
408
409 atomic<size_t> total_submitted_tasks_{0};
410 atomic<size_t> total_completed_tasks_{0};
411 atomic<size_t> total_stolen_tasks_{0};
412 atomic<size_t> steal_worker_count_{0};
413 atomic<uint64_t> next_task_id_{0};
414
415private:
416 uint64_t generate_task_id() { return next_task_id_.fetch_add(1, memory_order_relaxed); }
417
418 void thread_function(id_type thread_id);
419 optional<task_type> try_steal_task(worker_context& ctx);
420
421 pool_statistics statistics_unsafe() const;
422
423public:
428
433
434 thread_pool(const thread_pool&) = delete;
435 thread_pool& operator=(const thread_pool&) = delete;
436
437 thread_pool(thread_pool&&) = delete;
438 thread_pool& operator=(thread_pool&&) = delete;
439
445 bool set_mode(pool_mode mode) noexcept;
446
453 bool set_steal_mode(steal_strategy strategy, uint32_t steal_batch = 4) noexcept;
454
460 bool set_task_threshhold(size_t threshhold) noexcept;
461
467 bool set_thread_threshhold(size_t threshhold) noexcept;
468
473 NEFORCE_NODISCARD bool running() const noexcept { return is_running_; }
474
479 NEFORCE_NODISCARD pool_mode mode() const noexcept { return pool_mode_; }
480
485 NEFORCE_NODISCARD pool_statistics statistics() const;
486
492 bool start(size_t init_thread_size = 3);
493
499
509 template <typename Func, typename... Args>
510 submit_result<invoke_result_t<Func, Args...>> submit_task(priority_type priority, Func&& func, Args&&... args);
511
520 template <typename Func, typename... Args>
521 submit_result<invoke_result_t<Func, Args...>> submit_task(Func&& func, Args&&... args) {
522 return this->submit_task(static_cast<priority_type>(0), _NEFORCE forward<Func>(func),
523 _NEFORCE forward<Args>(args)...);
524 }
525
536 template <typename Func, typename... Args>
538 Args&&... args);
539
549 template <typename Func, typename... Args>
550 submit_result<invoke_result_t<Func, Args...>> submit_after(int64_t delay_ms, Func&& func, Args&&... args) {
551 return this->submit_after(delay_ms, static_cast<priority_type>(0), _NEFORCE forward<Func>(func),
552 _NEFORCE forward<Args>(args)...);
553 }
554
565 template <typename Func, typename... Args>
566 periodic_token submit_every(int64_t interval_ms, priority_type priority, Func&& func, Args&&... args);
567
577 template <typename Func, typename... Args>
578 periodic_token submit_every(int64_t interval_ms, Func&& func, Args&&... args) {
579 return this->submit_every(interval_ms, static_cast<priority_type>(0), _NEFORCE forward<Func>(func),
580 _NEFORCE forward<Args>(args)...);
581 }
582
587 static void cancel_periodic_task(const periodic_token& token) {
588 if (token) {
589 token->cancelled.store(true);
590 }
591 }
592
599 template <typename... Types>
600 static tuple<future_result_t<Types>...> wait(future<Types>&&... futures) {
601 return _NEFORCE make_tuple(_NEFORCE get(futures)...);
602 }
603};
604
605
610NEFORCE_API worker_context*& get_worker_context() noexcept;
611
617
619
620template <typename Func, typename... Args>
621submit_result<invoke_result_t<Func, Args...>> thread_pool::submit_task(const priority_type priority, Func&& func,
622 Args&&... args) {
623 static_assert(is_invocable_v<Func, Args...>, "Func must be invocable with Args");
624
625 using Result = invoke_result_t<Func, Args...>;
626
627 auto info = make_shared<task_info>(generate_task_id(), priority);
628
629 const auto current_group = get_current_task_group();
630 if (current_group) {
631 current_group->increment();
632 }
633
634 auto task = _NEFORCE make_shared<packaged_task<Result()>>(
635 [func = _NEFORCE forward<Func>(func), args = _NEFORCE make_tuple(_NEFORCE forward<Args>(args)...),
636 group = current_group, info]() mutable -> Result {
637 struct context_guard {
639 shared_ptr<task_group> group_inner;
640 shared_ptr<task_group> prev_group_inner;
641
642 explicit context_guard(shared_ptr<task_info> i, shared_ptr<task_group> g) :
643 info(move(i)),
644 group_inner(move(g)) {
646 info->start_time = timestamp::now();
647 info->worker_thread_id = get_worker_context() ? get_worker_context()->id : 0;
648
649 prev_group_inner = get_current_task_group();
650 get_current_task_group() = group_inner;
651 }
652
653 ~context_guard() noexcept {
654 try {
655 info->finish_time = timestamp::now();
656 auto expected = task_info::status::running;
657 info->status.compare_exchange_strong(expected, task_info::status::completed,
659
660 get_current_task_group() = prev_group_inner;
661 if (group_inner) {
662 group_inner->decrement();
663 }
664 // NOLINTNEXTLINE(bugprone-empty-catch)
665 } catch (...) {
666 /* ignore */
667 }
668 }
669 };
670
671 context_guard guard(info, group);
672 try {
673 return _NEFORCE apply(func, args);
674 } catch (const exception& e) {
676 info->error = e.what();
677 throw;
678 } catch (...) {
680 info->error = "Unknown exception";
681 throw;
682 }
683 });
684
685 future<Result> res = task->get_future();
686 task_type job([task] { (*task)(); });
687
688 if (static_cast<uint32_t>(priority) > 0) {
689 unique_lock<mutex> lock(task_queue_mtx_);
690
691 if (!not_full_.wait_for(lock, seconds(1), [&]() -> bool { return task_queue_.size() < task_threshhold_; })) {
693 info->error = "Task queue is full";
694
695 auto dummy_task = _NEFORCE make_shared<packaged_task<Result()>>([]() -> Result { return Result(); });
696 (*dummy_task)();
697 return submit_result<Result>{dummy_task->get_future(), info};
698 }
699
700 task_queue_.emplace(move(job), priority, info);
701 ++task_size_;
702 ++total_submitted_tasks_;
703 not_empty_.notify_one();
704
705 } else {
706 auto* ctx = get_worker_context();
707
708 if (ctx != nullptr && ctx->queue.remain_size() > 0) {
709 ctx->queue.push_back(move(job));
710 ++total_submitted_tasks_;
711 } else {
712 unique_lock<mutex> lock(task_queue_mtx_);
713 if (!not_full_.wait_for(lock, seconds(1),
714 [&]() -> bool { return task_queue_.size() < task_threshhold_; })) {
716 info->error = "Task queue is full";
717
718 auto dummy_task = _NEFORCE make_shared<packaged_task<Result()>>([]() -> Result { return Result(); });
719 (*dummy_task)();
720 return submit_result<Result>{dummy_task->get_future(), info};
721 }
722
723 task_queue_.emplace(move(job), static_cast<priority_type>(0), info);
724 ++task_size_;
725 ++total_submitted_tasks_;
726 not_empty_.notify_one();
727 }
728 }
729
730 if (pool_mode_.load() == pool_mode::cached && task_size_.load() > idle_thread_size_) {
731 id_type thread_id = 0;
732
733 {
734 unique_lock<mutex> lock(task_queue_mtx_);
735 if (threads_map_.size() < thread_threshhold_) {
736 thread_id = thread_pool_id_generator::get_new_id();
737 auto worker_func = [this, thread_id]() { thread_function(thread_id); };
738 auto ptr = _NEFORCE make_unique<lazy_thread>(_NEFORCE move(worker_func));
739 threads_map_.emplace(thread_id, _NEFORCE move(ptr));
740 }
741 }
742
743 if (thread_id != 0) {
744 {
745 lock<mutex> ctx_lock(worker_contexts_mtx_);
746 if (thread_id >= worker_contexts_ptr_.size()) {
747 worker_contexts_ptr_.reserve(thread_id + 1);
748 for (size_t i = worker_contexts_ptr_.size(); i <= thread_id; ++i) {
750 tmp.store(nullptr, memory_order_relaxed);
751 worker_contexts_ptr_.emplace_back(_NEFORCE move(tmp));
752 }
753 }
754 }
755
756 threads_map_[thread_id]->start();
757 threads_map_[thread_id]->detach();
758 }
759 }
760
761 return submit_result<Result>{_NEFORCE move(res), _NEFORCE move(info)};
762}
763
764template <typename Func, typename... Args>
765submit_result<invoke_result_t<Func, Args...>>
766thread_pool::submit_after(const int64_t delay_ms, const priority_type priority, Func&& func, Args&&... args) {
767 static_assert(is_invocable_v<Func, Args...>, "Func must be invocable with Args");
768
769 using Result = invoke_result_t<Func, Args...>;
770
771 auto info = make_shared<task_info>(generate_task_id(), priority);
772
773 auto task = _NEFORCE make_shared<packaged_task<Result()>>(
774 [func = _NEFORCE forward<Func>(func), tup = _NEFORCE make_tuple(_NEFORCE forward<Args>(args)...),
775 info]() mutable {
776 struct context_guard {
777 shared_ptr<task_info> info;
778
779 explicit context_guard(shared_ptr<task_info> i) :
780 info(move(i)) {
782 info->start_time = timestamp::now();
783 info->worker_thread_id = get_worker_context() ? get_worker_context()->id : 0;
784 }
785
786 ~context_guard() noexcept {
787 info->finish_time = timestamp::now();
788 auto expected = task_info::status::running;
789 info->status.compare_exchange_strong(expected, task_info::status::completed,
791 }
792 };
793
794 context_guard guard(info);
795
796 try {
797 return _NEFORCE apply(func, tup);
798 } catch (const exception& e) {
800 info->error = e.what();
801 throw;
802 }
803 });
804
805 future<Result> res = task->get_future();
806
807 auto expire_time = steady_clock::now() + milliseconds(delay_ms);
808 timer_.add_task(expire_time, [this, task = _NEFORCE move(task), priority]() mutable {
809 this->submit_task(priority, [task]() { (*task)(); });
810 });
811
812 return submit_result<Result>{_NEFORCE move(res), _NEFORCE move(info)};
813}
814
815template <typename Func, typename... Args>
816thread_pool::periodic_token thread_pool::submit_every(int64_t interval_ms, const priority_type priority, Func&& func,
817 Args&&... args) {
819 auto task = _NEFORCE make_shared<function<void()>>(
820 [func = _NEFORCE forward<Func>(func),
821 tup = _NEFORCE make_tuple(_NEFORCE forward<Args>(args)...)]() mutable { _NEFORCE apply(func, tup); });
822 auto handler_ptr = _NEFORCE make_shared<task_type>();
823 weak_ptr<task_type> weak_handler(handler_ptr);
824 *handler_ptr = [this, state, task, interval_ms, priority, weak_handler]() {
825 if (state->cancelled.load()) {
826 return;
827 }
828
829 this->submit_task(priority, [task]() { (*task)(); });
830
831 if (state->cancelled.load()) {
832 return;
833 }
834 if (auto locked = weak_handler.lock()) {
835 auto next_time = steady_clock::now() + milliseconds(interval_ms);
836 timer_.add_task(next_time, [locked]() { (*locked)(); });
837 }
838 };
839
840 auto first_time = steady_clock::now() + milliseconds(interval_ms);
841 timer_.add_task(first_time, [handler_ptr]() { (*handler_ptr)(); });
842 return state;
843}
844
846 // ThreadPool
848 // AsyncComponents
850
851NEFORCE_END_NAMESPACE__
852#endif // NEFORCE_CORE_ASYNC_THREAD_POOL_HPP__
固定大小数组容器
void notify_one() noexcept
通知一个等待线程
cv_status wait_for(unique_lock< mutex > &lock, const duration< Rep, Period > &rest)
等待指定的持续时间
函数包装器主模板声明
独占future类模板
线程本地任务队列
size_t remain_size() const noexcept
获取剩余容量
bool empty() const noexcept
检查队列是否为空
size_t size() const noexcept
获取队列当前大小
optional< function< void()> > try_pop()
从队列头部弹出任务
size_t capacity() const noexcept
获取队列容量
static constexpr size_t queue_size
队列容量
static void set_steal_strategy(const steal_strategy strategy, const uint32_t batch_size=4)
设置窃取策略
void push_back(function< void()> task)
推送任务到队列尾部
optional< function< void()> > be_stolen_by(local_queue &dst_queue)
被其他队列窃取任务
steal_strategy
任务窃取策略
锁管理器模板
static constexpr T max() noexcept
获取类型的最大值
可选值类
size_type size() const noexcept(noexcept(_NEFORCE declval< Sequence >().size()))
获取优先队列大小
void emplace(Args &&... args)
在优先队列中就地构造元素
共享智能指针类模板
异步任务
static tuple< future_result_t< Types >... > wait(future< Types > &&... futures)
等待多个future完成
task_info::priority_type priority_type
优先级类型别名
bool set_thread_threshhold(size_t threshhold) noexcept
设置线程数阈值
uint32_t id_type
线程ID类型别名
static constexpr size_t max_idle_seconds
最大空闲秒数
periodic_token submit_every(int64_t interval_ms, priority_type priority, Func &&func, Args &&... args)
提交周期性任务
submit_result< invoke_result_t< Func, Args... > > submit_task(Func &&func, Args &&... args)
提交任务(使用默认优先级0)
pool_statistics stop()
停止线程池
submit_result< invoke_result_t< Func, Args... > > submit_after(int64_t delay_ms, Func &&func, Args &&... args)
提交延迟任务(使用默认优先级0)
pool_statistics statistics() const
获取线程池统计信息
periodic_token submit_every(int64_t interval_ms, Func &&func, Args &&... args)
提交周期性任务(使用默认优先级0)
static void cancel_periodic_task(const periodic_token &token)
取消周期性任务
submit_result< invoke_result_t< Func, Args... > > submit_task(priority_type priority, Func &&func, Args &&... args)
提交任务
shared_ptr< periodic_task_state > periodic_token
周期性任务令牌
pool_mode
线程池运行模式
thread_pool()
默认构造函数
local_queue::steal_strategy steal_strategy
窃取策略类型别名
bool set_task_threshhold(size_t threshhold) noexcept
设置任务队列阈值
static constexpr size_t task_max_threshhold
最大任务队列阈值
bool running() const noexcept
检查线程池是否正在运行
~thread_pool()
析构函数
pool_mode mode() const noexcept
获取线程池模式
bool set_mode(pool_mode mode) noexcept
设置线程池模式
submit_result< invoke_result_t< Func, Args... > > submit_after(int64_t delay_ms, priority_type priority, Func &&func, Args &&... args)
提交延迟任务
bool start(size_t init_thread_size=3)
启动线程池
bool set_steal_mode(steal_strategy strategy, uint32_t steal_batch=4) noexcept
设置窃取策略
时间戳类
static timestamp now() noexcept
获取当前时间戳
独占锁管理器模板
pair< iterator, bool > emplace(Args &&... args)
在unordered_map中就地构造元素
size_type size() const noexcept
获取元素数量
constexpr void reserve(const size_type n)
预留容量
constexpr size_type size() const noexcept
获取当前元素数量
constexpr void emplace_back(Args &&... args)
在末尾构造元素
日期时间处理库
constexpr T && forward(remove_reference_t< T > &x) noexcept
完美转发左值
enable_if_t< is_void_v< T >, future_result_t< T > > get(future< T > &f)
通用future结果获取函数
@ fixed
固定霍夫曼编码
unsigned char uint8_t
8位无符号整数类型
unsigned int uint32_t
32位无符号整数类型
long long int64_t
64位有符号整数类型
unsigned long long uint64_t
64位无符号整数类型
constexpr iter_difference_t< Iterator > count(Iterator first, Iterator last, const T &value)
统计范围内等于指定值的元素数量
duration< int64_t, milli > milliseconds
毫秒持续时间
duration< int64_t > seconds
秒持续时间
constexpr bool is_invocable_v
is_invocable的便捷变量模板
typename inner::__invoke_result_aux< F, Args... >::type invoke_result_t
invoke_result的便捷别名
constexpr auto memory_order_release
释放内存顺序常量
constexpr auto memory_order_relaxed
宽松内存顺序常量
constexpr auto memory_order_acquire
获取内存顺序常量
enable_if_t<!is_unbounded_array_v< T > &&is_constructible_v< T, Args... >, shared_ptr< T > > make_shared(Args &&... args)
融合分配创建共享指针
constexpr Iterator2 move(Iterator1 first, Iterator1 last, Iterator2 result) noexcept(noexcept(inner::__move_aux(first, last, result)))
移动范围元素
int priority() noexcept
获取线程优先级
worker_context *& get_worker_context() noexcept
获取当前线程的工作线程上下文
shared_ptr< task_group > & get_current_task_group() noexcept
获取当前线程的任务组
constexpr tuple< unwrap_ref_decay_t< Types >... > make_tuple(Types &&... args)
从参数创建元组
constexpr decltype(auto) apply(Func &&f, Tuple &&t) noexcept(inner::__apply_unpack_tuple< _NEFORCE is_nothrow_invocable, Func, Tuple >::value)
将元组元素解包作为参数调用函数
constexpr decltype(auto) size(const Container &cont) noexcept(noexcept(cont.size()))
获取容器的大小
constexpr unique_ptr< T > make_unique(Args &&... args)
创建unique_ptr
延迟启动线程实现
可选值类型
优先队列容器适配器
通用原子类型模板
T load(const memory_order mo=memory_order_seq_cst) const noexcept
原子加载操作
void store(T value, const memory_order mo=memory_order_seq_cst) noexcept
原子存储操作
异常基类
const char * what() const noexcept
获取错误信息
可字符串化接口
static time_point now() noexcept
获取当前时间点
任务提交结果
_NEFORCE future< T > future
任务的future
shared_ptr< _NEFORCE task_info > task_info
任务信息
void increment() noexcept
增加运行计数
atomic< size_t > running_count
正在运行的任务计数
void wait() const noexcept
等待组内所有任务完成
void decrement() noexcept
减少运行计数
timestamp submit_time
提交时间
timestamp start_time
开始执行时间
bool is_finished() const noexcept
检查任务是否已完成
const uint64_t id
任务ID
task_info(const uint64_t task_id, const priority_type priority)
构造函数
status
任务状态枚举
int64_t exec_time() const noexcept
获取任务执行时间
string error
错误信息
uint32_t worker_thread_id
执行任务的线程ID
priority_type priority
任务优先级
timestamp finish_time
完成时间
atomic< bool > cancelled
是否已取消
size_t total_completed
总完成任务数
size_t busy_threads
忙碌线程数
size_t idle_threads
空闲线程数
size_t total_stolen
总窃取任务数
size_t queue_size
全局队列大小
string to_string() const
转换为字符串
size_t total_submitted
总提交任务数
工作线程上下文
atomic< bool > is_stealing
是否正在执行窃取操作
id_type id
线程ID
local_queue queue
本地任务队列
uint32_t id_type
线程ID类型
size_t consecutive_idle_count
连续空闲次数
异步定时器
无序映射容器
弱智能指针实现