NexusForce 1.0.0
A Modern C++ Library with extended functionality, web components, and utility libraries
载入中...
搜索中...
未找到
thread_pool.hpp
浏览该文件的文档.
1#ifndef NEFORCE_CORE_ASYNC_THREAD_POOL_HPP__
2#define NEFORCE_CORE_ASYNC_THREAD_POOL_HPP__
3
11
18NEFORCE_BEGIN_NAMESPACE__
19
25
31
33NEFORCE_BEGIN_INNER__
34
// NOTE(review): this listing is a Doxygen text extraction; the leading digits
// on each code line are source line numbers fused in by extraction and are not
// part of the original header.
// inner::manual_thread — a thin, manually started thread handle: construction
// stores a callback, start() launches it (out-of-line definition not visible
// in this listing).
35class NEFORCE_API manual_thread {
36public:
37 using id_type = uint32_t;
38
39private:
40 using thread_func = function<void(id_type)>;
41
 // Callback owned by the thread; presumably invoked with thread_id_ once
 // start() runs — confirm against the out-of-line definitions.
42 thread_func func_;
 // Identifier assigned at construction; how it is generated is not shown here.
43 id_type thread_id_;
44
45public:
46 explicit manual_thread(thread_func func) noexcept;
47 ~manual_thread() = default;
48
 // Stable id for this thread handle.
49 NEFORCE_NODISCARD id_type id() const noexcept { return thread_id_; }
 // Starts execution (definition not visible in this listing).
50 void start();
51};
52
53NEFORCE_END_INNER__
55
56
// task_group — counts a batch of related tasks: increment() when a task is
// scheduled, decrement() when it finishes, wait() blocks until all complete.
// NOTE(review): this extraction is missing original line 68 (per the symbol
// index: `atomic<size_t> running_count`) and the load/reload of `count`
// inside wait() (lines 90/93), so wait() below references an identifier whose
// declaration is not visible here.
64struct task_group {
65 task_group() = default;
66 ~task_group() = default;
67
69
 // Relaxed order suffices: only atomicity is needed on the way up; the
 // pairing release is in decrement().
73 void increment() noexcept { running_count.fetch_add(1, memory_order_relaxed); }
74
 // fetch_sub returns the prior value, so == 1 means this finished the last
 // running task; wake all waiters via C++20-style atomic notify_all.
80 void decrement() noexcept {
81 if (running_count.fetch_sub(1, memory_order_release) == 1) {
82 running_count.notify_all();
83 }
84 }
85
 // Blocks (atomic wait) until the running count reaches zero.
89 void wait() const noexcept {
91 while (count != 0) {
92 running_count.wait(count);
94 }
95 }
96};
97
// local_queue — bounded ring buffer of tasks owned by one worker thread, from
// which other workers can steal. head_ packs two 32-bit cursors
// (steal, local_head) into a single 64-bit atomic so both advance together;
// tail_ is the producer cursor. queue_size is a power of two, so indices wrap
// via `& mask_`.
// NOTE(review): the try_pop() declaration (original lines 205-215, listed in
// the page's symbol index) is missing from this extraction.
105class NEFORCE_API local_queue {
106public:
111 enum class steal_strategy : uint8_t {
112 half,
113 fixed_batch,
114 single,
115 adaptive
116 };
117
118 static constexpr size_t queue_size = 256;
119
120private:
 // Process-wide knobs shared by all queues (static, set via
 // set_steal_strategy) — not per-queue state.
121 static steal_strategy steal_strategy_;
122 static uint32_t fixed_batch_size_;
123
124 array<function<void()>, queue_size> tasks_{};
125 atomic<uint64_t> head_{0};
126 atomic<uint32_t> tail_{0};
127
128private:
129 constexpr static size_t mask_ = queue_size - 1;
130
 // Pack (steal, local_head) into the 64-bit head_ word: steal in the high
 // 32 bits, local head in the low 32 bits.
131 NEFORCE_NODISCARD static uint64_t pack(const uint32_t steal, const uint32_t local_head) noexcept {
132 return static_cast<uint64_t>(steal) << 32 | static_cast<uint64_t>(local_head);
133 }
134
 // Inverse of pack(): returns {steal, local_head}.
135 NEFORCE_NODISCARD static pair<uint32_t, uint32_t> unpack(const uint64_t head) noexcept {
136 return {static_cast<uint32_t>(head >> 32), static_cast<uint32_t>(head)};
137 }
138
139 uint32_t be_stolen_by_impl(local_queue& dst, uint32_t dst_tail);
140
141public:
142 local_queue() = default;
143 ~local_queue() = default;
 // Non-copyable; movable (move operations defined out of line).
144 local_queue(const local_queue&) = delete;
145 local_queue& operator=(const local_queue&) = delete;
146 local_queue(local_queue&& other) noexcept;
147 local_queue& operator=(local_queue&& other) noexcept;
148
153 NEFORCE_NODISCARD size_t capacity() const noexcept { return tasks_.size(); }
154
159 NEFORCE_NODISCARD bool empty() const noexcept { return size() == 0u; }
160
 // Free slots = capacity minus entries not yet fully stolen (tail - steal).
 // Slots between steal and local_head are still being copied out by thieves
 // and therefore count as used.
165 NEFORCE_NODISCARD size_t remain_size() const noexcept {
166 const auto tail = tail_.load(memory_order_acquire);
167 const auto head = head_.load(memory_order_acquire);
168 const auto steal = unpack(head).first;
169 const size_t used = static_cast<size_t>(tail - steal);
170 const size_t remain = capacity() - used;
171 return remain;
172 }
173
 // Entries visible to the owner: tail - local_head.
178 NEFORCE_NODISCARD size_t size() const noexcept {
179 const auto tail = tail_.load(memory_order_acquire);
180 const auto head = head_.load(memory_order_acquire);
181 const auto local_head = unpack(head).second;
182 return static_cast<size_t>(tail - local_head);
183 }
184
 // Sets the global stealing policy; batch_size applies to fixed_batch.
190 static void set_steal_strategy(const steal_strategy strategy, const uint32_t batch_size = 4) {
191 steal_strategy_ = strategy;
192 fixed_batch_size_ = batch_size;
193 }
194
 // Producer-side push. NOTE(review): no full check here — callers appear to
 // be expected to check remain_size() first (thread_pool::submit_task does);
 // the relaxed load of tail_ suggests a single-producer (owner thread)
 // assumption — confirm against the .cpp.
199 void push_back(function<void()> task) {
200 const uint32_t tail = tail_.load(memory_order_relaxed);
201 tasks_[tail & mask_] = move(task);
202 tail_.store(tail + 1, memory_order_release);
203 }
204
210
 // Called by a thief; moves a batch of tasks into dst_queue and returns one
 // of them (empty optional on failure) — implementation not visible here.
216 optional<function<void()>> be_stolen_by(local_queue& dst_queue);
217};
218
// worker_context — per-worker-thread state used for local task submission and
// work stealing. NOTE(review): this extraction is missing the members at
// original lines 228-231; per the symbol index they include
// `local_queue queue`, `atomic<bool> is_stealing` and
// `size_t consecutive_idle_count`.
225struct NEFORCE_API worker_context {
226 using id_type = inner::manual_thread::id_type;
227
 // Owning worker thread id; 0 is used as "not a worker thread" elsewhere
 // (see submit_task's `get_worker_context() ? ...->id : 0`).
229 id_type id{0};
232
233 worker_context() = default;
 // Non-copyable (holds a local task queue); movable, defined out of line.
234 worker_context(const worker_context&) = delete;
235 worker_context& operator=(const worker_context&) = delete;
236 worker_context(worker_context&& other) noexcept;
237 worker_context& operator=(worker_context&& other) noexcept;
238};
239
240
// task_info — per-task metadata shared between submitter and worker threads.
// NOTE(review): several members are missing from this extraction (original
// lines 248-257, 263-266, 278): per the symbol index they include the
// `status` enum (pending/running/completed/failed-style values, referenced
// below), an atomic status field, `submit_time`/`start_time`/`finish_time`
// timestamps, and the tail of the constructor's initializer list. The
// priority_type enumerator list (line 260) is also cut.
247struct task_info {
258
259 enum class priority_type : uint32_t {
260 };
261
 // Unique, immutable task id (from thread_pool::generate_task_id()).
262 const uint64_t id;
 // Id of the worker that executed the task; stays 0 until it starts.
267 inner::manual_thread::id_type worker_thread_id{0};
 // Populated with exception text when the task body throws (see submit_task).
268 string error{};
269 priority_type priority;
270
276 explicit task_info(const uint64_t task_id, const priority_type priority) :
277 id(task_id),
279
 // Finished means completed or failed; acquire pairs with the worker's
 // release CAS on status.
284 NEFORCE_NODISCARD bool is_finished() const noexcept {
285 const auto s = status.load(memory_order_acquire);
286 return s == status::completed || s == status::failed;
287 }
288
 // Execution duration in timestamp units, or -1 if the task has not both
 // started and finished. NOTE(review): the unit depends on
 // timestamp::value(), which is not visible here — confirm (ms vs us).
293 NEFORCE_NODISCARD int64_t exec_time() const noexcept {
294 if (start_time.value() == 0 || finish_time.value() == 0) {
295 return -1;
296 }
297 return finish_time.value() - start_time.value();
298 }
299};
300
// submit_result<T> — returned by the submit_* functions: the task's future
// plus its shared task_info. NOTE(review): the struct header (original line
// 309) and the second member (line 311, `shared_ptr<_NEFORCE task_info>
// task_info` per the symbol index) are missing from this extraction.
308template <typename T>
310 _NEFORCE future<T> future;
312
 // True only when the future is valid AND task_info is non-null.
317 NEFORCE_NODISCARD explicit operator bool() const noexcept { return future.valid() && task_info; }
318};
319
320
// thread_pool — priority-aware, work-stealing thread pool with a global
// priority queue, per-worker local queues, a timer scheduler for delayed and
// periodic tasks, and an optional "cached" mode that grows the worker set on
// demand. NOTE(review): many documentation lines and a few declarations are
// missing from this extraction (e.g. pool_mode::fixed at line 339, the
// pool_statistics fields at 356-362, periodic_token/steal_strategy aliases at
// 371/373, task_max_threshhold at 376, ctor/dtor at 439-447, stop() at
// 509-513); the symbol index at the bottom of the page confirms they exist.
332class NEFORCE_API thread_pool {
333public:
 // Pool growth policy; `fixed` (enumerator cut from this extraction) keeps
 // the initial thread count, `cached` spawns extra workers under load (see
 // submit_task).
338 enum class pool_mode : uint8_t {
340 cached
341 };
342
350
 // Snapshot of pool counters; istringify presumably provides the
 // stringification interface for to_string() — confirm in its header.
355 struct NEFORCE_API pool_statistics : istringify<pool_statistics> {
359 size_t queue_size;
363
368 NEFORCE_NODISCARD string to_string() const;
369 };
370
372 using id_type = inner::manual_thread::id_type;
374 using priority_type = task_info::priority_type;
375
377 static constexpr size_t max_idle_seconds = 60;
378
379 static size_t max_thread_threshhold();
380
381private:
382 using task_type = function<void()>;
383
 // Entry of the global priority queue: the erased callable plus its
 // priority and shared metadata.
388 struct priority_task {
389 task_type task;
392
393 priority_task(task_type t, const priority_type p, shared_ptr<task_info> info) noexcept :
394 task(move(t)),
395 priority(p),
396 info(_NEFORCE move(info)) {}
397
 // priority_queue is a max-heap, so higher priority values pop first.
398 bool operator<(const priority_task& other) const noexcept { return priority < other.priority; }
399 };
400
401 unordered_map<id_type, unique_ptr<inner::manual_thread>> threads_map_;
402 unordered_map<id_type, worker_context> worker_contexts_;
 // Lock-free lookup table indexed by thread id; growth is serialized by
 // worker_contexts_mtx_ (see submit_task's cached-mode path).
403 vector<atomic<worker_context*>> worker_contexts_ptr_;
404 mutex worker_contexts_mtx_;
405
406 timer_scheduler<steady_clock> timer_{};
407
408 id_type init_thread_size_{0};
409 size_t thread_threshhold_;
410
411 priority_queue<priority_task> task_queue_{};
412 atomic<uint32_t> task_size_{0};
413 atomic<uint32_t> idle_thread_size_{0};
414 size_t task_threshhold_{task_max_threshhold};
415
416 mutable mutex task_queue_mtx_{};
417 condition_variable not_full_{};
418 condition_variable not_empty_{};
419 condition_variable exit_cond_{};
420
421 atomic<pool_mode> pool_mode_{pool_mode::fixed};
422 atomic<bool> is_running_{false};
423
424 atomic<size_t> total_submitted_tasks_{0};
425 atomic<size_t> total_completed_tasks_{0};
426 atomic<size_t> total_stolen_tasks_{0};
427 atomic<size_t> steal_worker_count_{0};
428 atomic<uint64_t> next_task_id_{0};
429
430private:
 // Monotonically increasing, relaxed order — ids only need to be unique.
431 uint64_t generate_task_id() { return next_task_id_.fetch_add(1, memory_order_relaxed); }
432
433 void thread_function(id_type thread_id);
434 optional<task_type> try_steal_task(worker_context& ctx);
435
436 pool_statistics statistics_unsafe() const;
437
438public:
443
448
449 thread_pool(const thread_pool&) = delete;
450 thread_pool& operator=(const thread_pool&) = delete;
451
452 thread_pool(thread_pool&&) = default;
453 thread_pool& operator=(thread_pool&&) = default;
454
460 bool set_mode(pool_mode mode) noexcept;
461
468 bool set_steal_mode(steal_strategy strategy, uint32_t steal_batch = 4) noexcept;
469
475 bool set_task_threshhold(size_t threshhold) noexcept;
476
482 bool set_thread_threshhold(size_t threshhold) noexcept;
483
488 NEFORCE_NODISCARD bool running() const noexcept { return is_running_; }
489
494 NEFORCE_NODISCARD pool_mode mode() const noexcept { return pool_mode_; }
495
500 NEFORCE_NODISCARD pool_statistics statistics() const;
501
507 bool start(size_t init_thread_size = 3);
508
514
 // Submit with explicit priority; defined below in this header.
524 template <typename Func, typename... Args>
525 submit_result<invoke_result_t<Func, Args...>> submit_task(priority_type priority, Func&& func, Args&&... args);
526
 // Convenience overload: priority 0 (lowest — eligible for local queues).
535 template <typename Func, typename... Args>
536 submit_result<invoke_result_t<Func, Args...>> submit_task(Func&& func, Args&&... args) {
537 return this->submit_task(static_cast<priority_type>(0), _NEFORCE forward<Func>(func),
538 _NEFORCE forward<Args>(args)...);
539 }
540
551 template <typename Func, typename... Args>
553 Args&&... args);
554
 // Convenience overload of submit_after with priority 0.
564 template <typename Func, typename... Args>
565 submit_result<invoke_result_t<Func, Args...>> submit_after(int64_t delay_ms, Func&& func, Args&&... args) {
566 return this->submit_after(delay_ms, static_cast<priority_type>(0), _NEFORCE forward<Func>(func),
567 _NEFORCE forward<Args>(args)...);
568 }
569
580 template <typename Func, typename... Args>
581 periodic_token submit_every(int64_t interval_ms, priority_type priority, Func&& func, Args&&... args);
582
 // Convenience overload of submit_every with priority 0.
592 template <typename Func, typename... Args>
593 periodic_token submit_every(int64_t interval_ms, Func&& func, Args&&... args) {
594 return this->submit_every(interval_ms, static_cast<priority_type>(0), _NEFORCE forward<Func>(func),
595 _NEFORCE forward<Args>(args)...);
596 }
597
 // Cooperative cancellation: the periodic handler checks token->cancelled
 // before executing and before re-arming (see submit_every).
602 static void cancel_periodic_task(const periodic_token& token) {
603 if (token) {
604 token->cancelled.store(true);
605 }
606 }
607
 // Blocks on each future in turn and returns all results as a tuple.
614 template <typename... Types>
615 static tuple<future_result_t<Types>...> wait(future<Types>&&... futures) {
616 return _NEFORCE make_tuple(_NEFORCE get(futures)...);
617 }
618};
619
620
625NEFORCE_API worker_context*& get_worker_context() noexcept;
626
632
634
// submit_task — wraps func(args...) in a packaged_task with RAII metadata
// bookkeeping, routes it either to the current worker's local queue
// (priority 0 only) or the global priority queue, and in cached mode may
// spawn an extra worker. Returns the task's future plus its task_info.
// NOTE(review): this extraction is missing original lines 653 (the guard's
// `shared_ptr<task_info> info;` member, implied by its usage below), 673-674
// (the memory-order arguments of the CAS), 690/694 (presumably stores of
// status::failed before rethrowing), 707/730 (doc/comment lines) and 768
// (the `atomic<worker_context*> tmp;` declaration used at lines 769-770).
635template <typename Func, typename... Args>
636submit_result<invoke_result_t<Func, Args...>> thread_pool::submit_task(const priority_type priority, Func&& func,
637 Args&&... args) {
638 static_assert(is_invocable_v<Func, Args...>, "Func must be invocable with Args");
639
640 using Result = invoke_result_t<Func, Args...>;
641
642 auto info = make_shared<task_info>(generate_task_id(), priority);
643
 // If we are inside a task that belongs to a group, the child joins it.
644 const auto current_group = get_current_task_group();
645 if (current_group) {
646 current_group->increment();
647 }
648
649 auto task = _NEFORCE make_shared<packaged_task<Result()>>(
650 [func = _NEFORCE forward<Func>(func), args = _NEFORCE make_tuple(_NEFORCE forward<Args>(args)...),
651 group = current_group, info]() mutable -> Result {
 // RAII guard: stamps start/finish times and the worker id, swaps the
 // thread-local current task group in/out, finalizes status, and
 // decrements the group — runs even when the task body throws.
652 struct context_guard {
654 shared_ptr<task_group> group_inner;
655 shared_ptr<task_group> prev_group_inner;
656
657 explicit context_guard(shared_ptr<task_info> i, shared_ptr<task_group> g) :
658 info(move(i)),
659 group_inner(move(g)) {
661 info->start_time = timestamp::now();
662 info->worker_thread_id = get_worker_context() ? get_worker_context()->id : 0;
663
664 prev_group_inner = get_current_task_group();
665 get_current_task_group() = group_inner;
666 }
667
 // CAS only upgrades running -> completed, so a failed status set in
 // the catch handlers below is not overwritten.
668 ~context_guard() noexcept {
669 try {
670 info->finish_time = timestamp::now();
671 auto expected = task_info::status::running;
672 info->status.compare_exchange_strong(expected, task_info::status::completed,
674
675 get_current_task_group() = prev_group_inner;
676 if (group_inner) {
677 group_inner->decrement();
678 }
679 // NOLINTNEXTLINE(bugprone-empty-catch)
680 } catch (...) {
681 /* ignore */
682 }
683 }
684 };
685
686 context_guard guard(info, group);
687 try {
688 return _NEFORCE apply(func, args);
689 } catch (const exception& e) {
691 info->error = e.what();
692 throw;
693 } catch (...) {
695 info->error = "Unknown exception";
696 throw;
697 }
698 });
699
700 future<Result> res = task->get_future();
701 task_type job([task] { (*task)(); });
702
 // Prioritized tasks always go through the global queue so ordering holds.
703 if (static_cast<uint32_t>(priority) > 0) {
704 unique_lock<mutex> lock(task_queue_mtx_);
705
 // NOTE(review): on timeout a pre-completed future holding a
 // default-constructed Result is returned (requires Result to be
 // default-constructible), and the earlier current_group->increment()
 // is never balanced because the real task will not run — a
 // task_group::wait() could block forever on this path; confirm.
706 if (!not_full_.wait_for(lock, seconds(1), [&]() -> bool { return task_queue_.size() < task_threshhold_; })) {
708 info->error = "Task queue is full";
709
710 auto dummy_task = _NEFORCE make_shared<packaged_task<Result()>>([]() -> Result { return Result(); });
711 (*dummy_task)();
712 return submit_result<Result>{dummy_task->get_future(), info};
713 }
714
715 task_queue_.emplace(move(job), priority, info);
716 ++task_size_;
717 ++total_submitted_tasks_;
718 not_empty_.notify_one();
719
720 } else {
 // Priority-0 fast path: if we are on a worker thread and its local
 // queue has room, push there and skip the global lock entirely.
721 auto* ctx = get_worker_context();
722
723 if (ctx != nullptr && ctx->queue.remain_size() > 0) {
724 ctx->queue.push_back(move(job));
725 ++total_submitted_tasks_;
726 } else {
727 unique_lock<mutex> lock(task_queue_mtx_);
 // Same queue-full fallback (and the same caveats) as above.
728 if (!not_full_.wait_for(lock, seconds(1),
729 [&]() -> bool { return task_queue_.size() < task_threshhold_; })) {
731 info->error = "Task queue is full";
732
733 auto dummy_task = _NEFORCE make_shared<packaged_task<Result()>>([]() -> Result { return Result(); });
734 (*dummy_task)();
735 return submit_result<Result>{dummy_task->get_future(), info};
736 }
737
738 task_queue_.emplace(move(job), static_cast<priority_type>(0), info);
739 ++task_size_;
740 ++total_submitted_tasks_;
741 not_empty_.notify_one();
742 }
743 }
744
 // Cached mode: spawn one more worker when backlog exceeds idle workers and
 // the thread threshold has not been reached.
745 if (pool_mode_.load() == pool_mode::cached && task_size_.load() > idle_thread_size_) {
746
747 inner::manual_thread* t_ptr = nullptr;
748 id_type thread_id = 0;
749
750 {
751 unique_lock<mutex> lock(task_queue_mtx_);
752 if (threads_map_.size() < thread_threshhold_) {
753 auto ptr =
754 _NEFORCE make_unique<inner::manual_thread>([this](const id_type id) { thread_function(id); });
755
756 thread_id = ptr->id();
757 t_ptr = ptr.get();
758 threads_map_.emplace(thread_id, move(ptr));
759 }
760 }
761
762 if (t_ptr != nullptr) {
 // Grow the id-indexed context pointer table before the thread runs
 // so thread_function can publish its context slot.
763 {
764 lock<mutex> ctx_lock(worker_contexts_mtx_);
765 if (thread_id >= worker_contexts_ptr_.size()) {
766 worker_contexts_ptr_.reserve(thread_id + 1);
767 for (size_t i = worker_contexts_ptr_.size(); i <= thread_id; i++) {
769 tmp.store(nullptr, memory_order_relaxed);
770 worker_contexts_ptr_.emplace_back(move(tmp));
771 }
772 }
773 }
774
775 t_ptr->start();
776 }
777 }
778
779 return submit_result<Result>{move(res), move(info)};
780}
781
// submit_after — schedules func(args...) to be handed to submit_task once
// delay_ms milliseconds have elapsed, via the pool's timer scheduler. The
// returned future is tied to the packaged_task created here, so it resolves
// when the delayed task eventually runs.
// NOTE(review): this extraction is missing original lines 799, 808 (the
// memory-order arguments of the CAS) and 817 — the latter may carry a
// catch-all handler; as visible, only `const exception&` is caught, unlike
// submit_task above. Also note this guard does no task-group bookkeeping.
782template <typename Func, typename... Args>
783submit_result<invoke_result_t<Func, Args...>>
784thread_pool::submit_after(const int64_t delay_ms, const priority_type priority, Func&& func, Args&&... args) {
785 static_assert(is_invocable_v<Func, Args...>, "Func must be invocable with Args");
786
787 using Result = invoke_result_t<Func, Args...>;
788
789 auto info = make_shared<task_info>(generate_task_id(), priority);
790
791 auto task = _NEFORCE make_shared<packaged_task<Result()>>(
792 [func = _NEFORCE forward<Func>(func), tup = _NEFORCE make_tuple(_NEFORCE forward<Args>(args)...),
793 info]() mutable {
 // Simplified guard (no task-group handling): stamps start/finish
 // times, records the worker id, and upgrades running -> completed.
794 struct context_guard {
795 shared_ptr<task_info> info;
796
797 explicit context_guard(shared_ptr<task_info> i) :
798 info(move(i)) {
800 info->start_time = timestamp::now();
801 info->worker_thread_id = get_worker_context() ? get_worker_context()->id : 0;
802 }
803
804 ~context_guard() noexcept {
805 info->finish_time = timestamp::now();
806 auto expected = task_info::status::running;
807 info->status.compare_exchange_strong(expected, task_info::status::completed,
809 }
810 };
811
812 context_guard guard(info);
813
814 try {
815 return _NEFORCE apply(func, tup);
816 } catch (const exception& e) {
818 info->error = e.what();
819 throw;
820 }
821 });
822
823 future<Result> res = task->get_future();
824
 // At expiry the timer thread re-enters submit_task with a wrapper that
 // invokes the packaged_task, so normal queueing/priority rules apply.
825 auto expire_time = steady_clock::now() + milliseconds(delay_ms);
826 timer_.add_task(expire_time, [this, task = _NEFORCE move(task), priority]() mutable {
827 this->submit_task(priority, [task]() { (*task)(); });
828 });
829
830 return submit_result<Result>{_NEFORCE move(res), info};
831}
832
// submit_every — schedules func(args...) to run every interval_ms
// milliseconds via a self-re-arming timer handler. Returns a periodic_token
// (per the symbol index: shared_ptr<periodic_task_state> with an
// atomic<bool> cancelled) which cancel_periodic_task() flips to stop the
// chain. The first execution happens one interval after submission.
// NOTE(review): the declaration of `state` (original line 836) is missing
// from this extraction. Also, *handler_ptr captures handler_ptr itself — a
// shared_ptr cycle, so the handler object is never destroyed even after
// cancellation; confirm whether this leak is intentional (e.g. bounded by
// pool lifetime).
833template <typename Func, typename... Args>
834thread_pool::periodic_token thread_pool::submit_every(int64_t interval_ms, const priority_type priority, Func&& func,
835 Args&&... args) {
837 auto task = _NEFORCE make_shared<function<void()>>(
838 [func = _NEFORCE forward<Func>(func),
839 tup = _NEFORCE make_tuple(_NEFORCE forward<Args>(args)...)]() mutable { _NEFORCE apply(func, tup); });
840 auto handler_ptr = _NEFORCE make_shared<task_type>();
841 *handler_ptr = [this, state, task, interval_ms, priority, handler_ptr]() {
 // Checked both before submitting and before re-arming so cancellation
 // takes effect within one interval and skips the pending submit.
842 if (state->cancelled.load()) {
843 return;
844 }
845
846 this->submit_task(priority, [task]() { (*task)(); });
847
848 if (state->cancelled.load()) {
849 return;
850 }
851 auto next_time = steady_clock::now() + milliseconds(interval_ms);
852 timer_.add_task(next_time, [handler_ptr]() { (*handler_ptr)(); });
853 };
854
855 auto first_time = steady_clock::now() + milliseconds(interval_ms);
856 timer_.add_task(first_time, [handler_ptr]() { (*handler_ptr)(); });
857 return state;
858}
859
861 // ThreadPool
863 // AsyncComponents
865
866NEFORCE_END_NAMESPACE__
867#endif // NEFORCE_CORE_ASYNC_THREAD_POOL_HPP__
固定大小数组容器
void notify_one() noexcept
通知一个等待线程
cv_status wait_for(unique_lock< mutex > &lock, const duration< Rep, Period > &rest)
等待指定的持续时间
函数包装器主模板声明
独占future类模板
线程本地任务队列
NEFORCE_NODISCARD size_t size() const noexcept
获取队列当前大小
optional< function< void()> > try_pop()
从队列头部弹出任务
NEFORCE_NODISCARD size_t capacity() const noexcept
获取队列容量
static constexpr size_t queue_size
队列容量
static void set_steal_strategy(const steal_strategy strategy, const uint32_t batch_size=4)
设置窃取策略
void push_back(function< void()> task)
推送任务到队列尾部
NEFORCE_NODISCARD bool empty() const noexcept
检查队列是否为空
optional< function< void()> > be_stolen_by(local_queue &dst_queue)
被其他队列窃取任务
steal_strategy
任务窃取策略
NEFORCE_NODISCARD size_t remain_size() const noexcept
获取剩余容量
锁管理器模板
static NEFORCE_NODISCARD constexpr T max() noexcept
获取类型的最大值
可选值类
NEFORCE_NODISCARD size_type size() const noexcept(noexcept(_NEFORCE declval< Sequence >().size()))
获取优先队列大小
void emplace(Args &&... args)
在优先队列中就地构造元素
共享智能指针类模板
异步任务
static tuple< future_result_t< Types >... > wait(future< Types > &&... futures)
等待多个future完成
task_info::priority_type priority_type
优先级类型别名
bool set_thread_threshhold(size_t threshhold) noexcept
设置线程数阈值
static constexpr size_t max_idle_seconds
最大空闲秒数
periodic_token submit_every(int64_t interval_ms, priority_type priority, Func &&func, Args &&... args)
提交周期性任务
submit_result< invoke_result_t< Func, Args... > > submit_task(Func &&func, Args &&... args)
提交任务(使用默认优先级0)
inner::manual_thread::id_type id_type
线程ID类型别名
pool_statistics stop()
停止线程池
NEFORCE_NODISCARD pool_statistics statistics() const
获取线程池统计信息
NEFORCE_NODISCARD pool_mode mode() const noexcept
获取线程池模式
submit_result< invoke_result_t< Func, Args... > > submit_after(int64_t delay_ms, Func &&func, Args &&... args)
提交延迟任务(使用默认优先级0)
periodic_token submit_every(int64_t interval_ms, Func &&func, Args &&... args)
提交周期性任务(使用默认优先级0)
static void cancel_periodic_task(const periodic_token &token)
取消周期性任务
submit_result< invoke_result_t< Func, Args... > > submit_task(priority_type priority, Func &&func, Args &&... args)
提交任务
shared_ptr< periodic_task_state > periodic_token
周期性任务令牌
pool_mode
线程池运行模式
NEFORCE_NODISCARD bool running() const noexcept
检查线程池是否正在运行
thread_pool()
默认构造函数
local_queue::steal_strategy steal_strategy
窃取策略类型别名
bool set_task_threshhold(size_t threshhold) noexcept
设置任务队列阈值
static constexpr size_t task_max_threshhold
最大任务队列阈值
~thread_pool()
析构函数
bool set_mode(pool_mode mode) noexcept
设置线程池模式
submit_result< invoke_result_t< Func, Args... > > submit_after(int64_t delay_ms, priority_type priority, Func &&func, Args &&... args)
提交延迟任务
bool start(size_t init_thread_size=3)
启动线程池
bool set_steal_mode(steal_strategy strategy, uint32_t steal_batch=4) noexcept
设置窃取策略
时间戳类
static NEFORCE_NODISCARD timestamp now() noexcept
获取当前时间戳
独占锁管理器模板
pair< iterator, bool > emplace(Args &&... args)
在unordered_map中就地构造元素
NEFORCE_NODISCARD size_type size() const noexcept
获取元素数量
NEFORCE_CONSTEXPR20 void reserve(const size_type n)
预留容量
NEFORCE_NODISCARD NEFORCE_CONSTEXPR20 size_type size() const noexcept
获取当前元素数量
NEFORCE_CONSTEXPR20 void emplace_back(Args &&... args)
在末尾构造元素
日期时间处理库
NEFORCE_NODISCARD constexpr T && forward(remove_reference_t< T > &x) noexcept
完美转发左值
NEFORCE_ALWAYS_INLINE enable_if_t< is_void_v< T >, future_result_t< T > > get(future< T > &f)
通用future结果获取函数
@ fixed
固定线程数模式(线程池的 pool_mode::fixed)
unsigned char uint8_t
8位无符号整数类型
unsigned int uint32_t
32位无符号整数类型
long long int64_t
64位有符号整数类型
unsigned long long uint64_t
64位无符号整数类型
constexpr iter_difference_t< Iterator > count(Iterator first, Iterator last, const T &value)
统计范围内等于指定值的元素数量
duration< int64_t, milli > milliseconds
毫秒持续时间
duration< int64_t > seconds
秒持续时间
NEFORCE_INLINE17 constexpr bool is_invocable_v
is_invocable的便捷变量模板
typename inner::__invoke_result_aux< F, Args... >::type invoke_result_t
invoke_result的便捷别名
NEFORCE_INLINE17 constexpr auto memory_order_release
释放内存顺序常量
NEFORCE_INLINE17 constexpr auto memory_order_relaxed
宽松内存顺序常量
NEFORCE_INLINE17 constexpr auto memory_order_acquire
获取内存顺序常量
NEFORCE_NODISCARD constexpr bool operator<(const normal_iterator< LeftIter > &lhs, const normal_iterator< RightIter > &rhs) noexcept
小于比较运算符
enable_if_t<!is_unbounded_array_v< T > &&is_constructible_v< T, Args... >, shared_ptr< T > > make_shared(Args &&... args)
融合分配创建共享指针
constexpr Iterator2 move(Iterator1 first, Iterator1 last, Iterator2 result) noexcept(noexcept(inner::__move_aux(first, last, result)))
移动范围元素
bool NEFORCE_API priority(int priority) noexcept
设置线程优先级
NEFORCE_ALWAYS_INLINE_INLINE thread::id id() noexcept
获取当前线程标识符
NEFORCE_API shared_ptr< task_group > & get_current_task_group() noexcept
获取当前线程的任务组
NEFORCE_API worker_context *& get_worker_context() noexcept
获取当前线程的工作线程上下文
NEFORCE_NODISCARD constexpr tuple< unwrap_ref_decay_t< Types >... > make_tuple(Types &&... args)
从参数创建元组
constexpr decltype(auto) apply(Func &&f, Tuple &&t) noexcept(inner::__apply_unpack_tuple< _NEFORCE is_nothrow_invocable, Func, Tuple >::value)
将元组元素解包作为参数调用函数
NEFORCE_NODISCARD NEFORCE_ALWAYS_INLINE constexpr decltype(auto) size(const Container &cont) noexcept(noexcept(cont.size()))
获取容器的大小
NEFORCE_CONSTEXPR20 unique_ptr< T > make_unique(Args &&... args)
创建unique_ptr
可选值类型
NeForce 异步任务包装器
优先队列容器适配器
通用原子类型模板
T load(const memory_order mo=memory_order_seq_cst) const noexcept
原子加载操作
void store(T value, const memory_order mo=memory_order_seq_cst) noexcept
原子存储操作
异常基类
NEFORCE_NODISCARD const char * what() const noexcept
获取错误信息
可字符串化接口
static time_point now() noexcept
获取当前时间点
任务提交结果
_NEFORCE future< T > future
任务的future
shared_ptr< _NEFORCE task_info > task_info
任务信息
void increment() noexcept
增加运行计数
atomic< size_t > running_count
正在运行的任务计数
void wait() const noexcept
等待组内所有任务完成
void decrement() noexcept
减少运行计数
timestamp submit_time
提交时间
timestamp start_time
开始执行时间
NEFORCE_NODISCARD bool is_finished() const noexcept
检查任务是否已完成
inner::manual_thread::id_type worker_thread_id
执行任务的线程ID
const uint64_t id
任务ID
task_info(const uint64_t task_id, const priority_type priority)
构造函数
status
任务状态枚举
NEFORCE_NODISCARD int64_t exec_time() const noexcept
获取任务执行时间
string error
错误信息
priority_type priority
任务优先级
timestamp finish_time
完成时间
atomic< bool > cancelled
是否已取消
NEFORCE_NODISCARD string to_string() const
转换为字符串
size_t total_completed
总完成任务数
size_t busy_threads
忙碌线程数
size_t idle_threads
空闲线程数
size_t total_stolen
总窃取任务数
size_t queue_size
全局队列大小
size_t total_submitted
总提交任务数
工作线程上下文
atomic< bool > is_stealing
是否正在执行窃取操作
id_type id
线程ID
local_queue queue
本地任务队列
inner::manual_thread::id_type id_type
线程ID类型
size_t consecutive_idle_count
连续空闲次数
异步定时器
无序映射容器