1#ifndef NEFORCE_CORE_ASYNC_THREAD_POOL_HPP__
2#define NEFORCE_CORE_ASYNC_THREAD_POOL_HPP__
19NEFORCE_BEGIN_NAMESPACE__
41 task_group() =
default;
42 ~task_group() =
default;
65 void wait() const noexcept {
81class NEFORCE_API local_queue {
105 constexpr static size_t mask_ = queue_size - 1;
108 return static_cast<uint64_t>(steal) << 32 |
static_cast<uint64_t>(local_head);
111 NEFORCE_NODISCARD
static pair<uint32_t, uint32_t> unpack(
const uint64_t head)
noexcept {
118 local_queue() =
default;
119 ~local_queue() =
default;
120 local_queue(
const local_queue&) =
delete;
121 local_queue& operator=(
const local_queue&) =
delete;
122 local_queue(local_queue&& other)
noexcept;
123 local_queue& operator=(local_queue&& other)
noexcept;
129 NEFORCE_NODISCARD
size_t capacity() const noexcept {
return tasks_.size(); }
135 NEFORCE_NODISCARD
bool empty() const noexcept {
return size() == 0U; }
144 const auto steal = unpack(head).first;
145 const auto used =
static_cast<size_t>(tail - steal);
146 const size_t remain =
capacity() - used;
154 NEFORCE_NODISCARD
size_t size() const noexcept {
157 const auto local_head = unpack(head).second;
158 return static_cast<size_t>(tail - local_head);
167 steal_strategy_ = strategy;
168 fixed_batch_size_ = batch_size;
201struct NEFORCE_API worker_context {
209 worker_context() =
default;
210 worker_context(
const worker_context&) =
delete;
211 worker_context& operator=(
const worker_context&) =
delete;
212 worker_context(worker_context&& other)
noexcept;
213 worker_context& operator=(worker_context&& other)
noexcept;
235 enum class priority_type :
uint32_t {
293 NEFORCE_NODISCARD
explicit operator bool() const noexcept {
return future.valid() &&
task_info; }
355 static size_t max_thread_threshhold() noexcept;
364 struct priority_task {
372 info(_NEFORCE
move(info)) {}
374 bool operator<(
const priority_task& other)
const noexcept {
return priority < other.priority; }
377 struct thread_pool_id_generator {
378 static uint32_t& get_id() noexcept {
380 return pool_thread_id;
382 static uint32_t get_new_id() noexcept {
return get_id()++; }
383 static void reset_id() noexcept { get_id() = 0; }
386 unordered_map<id_type, unique_ptr<lazy_thread>> threads_map_;
387 unordered_map<id_type, worker_context> worker_contexts_;
388 vector<atomic<worker_context*>> worker_contexts_ptr_;
389 mutex worker_contexts_mtx_;
391 timer_scheduler<steady_clock> timer_;
393 id_type init_thread_size_{0};
394 size_t thread_threshhold_;
396 priority_queue<priority_task> task_queue_;
397 atomic<uint32_t> task_size_{0};
398 atomic<uint32_t> idle_thread_size_{0};
399 size_t task_threshhold_{task_max_threshhold};
401 mutable mutex task_queue_mtx_;
402 condition_variable not_full_;
403 condition_variable not_empty_;
404 condition_variable exit_cond_;
406 atomic<pool_mode> pool_mode_{pool_mode::fixed};
407 atomic<bool> is_running_{
false};
409 atomic<size_t> total_submitted_tasks_{0};
410 atomic<size_t> total_completed_tasks_{0};
411 atomic<size_t> total_stolen_tasks_{0};
412 atomic<size_t> steal_worker_count_{0};
413 atomic<uint64_t> next_task_id_{0};
418 void thread_function(id_type thread_id);
419 optional<task_type> try_steal_task(worker_context& ctx);
421 pool_statistics statistics_unsafe()
const;
473 NEFORCE_NODISCARD
bool running() const noexcept {
return is_running_; }
492 bool start(
size_t init_thread_size = 3);
509 template <
typename Func,
typename... Args>
520 template <
typename Func,
typename... Args>
536 template <
typename Func,
typename... Args>
549 template <
typename Func,
typename... Args>
565 template <
typename Func,
typename... Args>
577 template <
typename Func,
typename... Args>
589 token->cancelled.store(
true);
599 template <
typename... Types>
620template <typename Func, typename... Args>
623 static_assert(
is_invocable_v<Func, Args...>,
"Func must be invocable with Args");
631 current_group->increment();
636 group = current_group, info]()
mutable -> Result {
637 struct context_guard {
644 group_inner(
move(g)) {
653 ~context_guard()
noexcept {
662 group_inner->decrement();
671 context_guard guard(info, group);
673 return _NEFORCE
apply(func, args);
676 info->error = e.
what();
680 info->error =
"Unknown exception";
686 task_type job([
task] { (*task)(); });
693 info->error =
"Task queue is full";
695 auto dummy_task = _NEFORCE
make_shared<packaged_task<Result()>>([]() -> Result {
return Result(); });
702 ++total_submitted_tasks_;
708 if (ctx !=
nullptr && ctx->queue.remain_size() > 0) {
709 ctx->queue.push_back(
move(job));
710 ++total_submitted_tasks_;
714 [&]() ->
bool {
return task_queue_.
size() < task_threshhold_; })) {
716 info->error =
"Task queue is full";
718 auto dummy_task = _NEFORCE
make_shared<packaged_task<Result()>>([]() -> Result {
return Result(); });
723 task_queue_.
emplace(
move(job),
static_cast<priority_type
>(0), info);
725 ++total_submitted_tasks_;
730 if (pool_mode_.
load() == pool_mode::cached && task_size_.
load() > idle_thread_size_) {
731 id_type thread_id = 0;
735 if (threads_map_.
size() < thread_threshhold_) {
736 thread_id = thread_pool_id_generator::get_new_id();
737 auto worker_func = [
this, thread_id]() { thread_function(thread_id); };
739 threads_map_.
emplace(thread_id, _NEFORCE
move(ptr));
743 if (thread_id != 0) {
746 if (thread_id >= worker_contexts_ptr_.
size()) {
747 worker_contexts_ptr_.
reserve(thread_id + 1);
748 for (
size_t i = worker_contexts_ptr_.
size(); i <= thread_id; ++i) {
756 threads_map_[thread_id]->start();
757 threads_map_[thread_id]->detach();
764template <
typename Func,
typename... Args>
767 static_assert(
is_invocable_v<Func, Args...>,
"Func must be invocable with Args");
773 auto task = _NEFORCE
make_shared<packaged_task<Result()>>(
776 struct context_guard {
777 shared_ptr<task_info> info;
779 explicit context_guard(shared_ptr<task_info> i) :
786 ~context_guard() noexcept {
794 context_guard guard(info);
797 return _NEFORCE
apply(func, tup);
798 }
catch (
const exception& e) {
800 info->error = e.
what();
805 future<Result> res = task->get_future();
808 timer_.add_task(expire_time, [
this, task = _NEFORCE
move(task),
priority]()
mutable {
812 return submit_result<Result>{_NEFORCE
move(res), _NEFORCE
move(info)};
815template <
typename Func,
typename... Args>
819 auto task = _NEFORCE
make_shared<function<void()>>(
823 weak_ptr<task_type> weak_handler(handler_ptr);
824 *handler_ptr = [
this, state, task, interval_ms,
priority, weak_handler]() {
825 if (state->cancelled.load()) {
831 if (state->cancelled.load()) {
834 if (
auto locked = weak_handler.lock()) {
836 timer_.add_task(next_time, [locked]() { (*locked)(); });
841 timer_.add_task(first_time, [handler_ptr]() { (*handler_ptr)(); });
851NEFORCE_END_NAMESPACE__
void notify_one() noexcept
通知一个等待线程
cv_status wait_for(unique_lock< mutex > &lock, const duration< Rep, Period > &rest)
等待指定的持续时间
size_t remain_size() const noexcept
获取剩余容量
bool empty() const noexcept
检查队列是否为空
size_t size() const noexcept
获取队列当前大小
optional< function< void()> > try_pop()
从队列头部弹出任务
size_t capacity() const noexcept
获取队列容量
static constexpr size_t queue_size
队列容量
static void set_steal_strategy(const steal_strategy strategy, const uint32_t batch_size=4)
设置窃取策略
void push_back(function< void()> task)
推送任务到队列尾部
optional< function< void()> > be_stolen_by(local_queue &dst_queue)
被其他队列窃取任务
static constexpr T max() noexcept
获取类型的最大值
size_type size() const noexcept(noexcept(_NEFORCE declval< Sequence >().size()))
获取优先队列大小
void emplace(Args &&... args)
在优先队列中就地构造元素
static tuple< future_result_t< Types >... > wait(future< Types > &&... futures)
等待多个future完成
task_info::priority_type priority_type
优先级类型别名
bool set_thread_threshhold(size_t threshhold) noexcept
设置线程数阈值
static constexpr size_t max_idle_seconds
最大空闲秒数
periodic_token submit_every(int64_t interval_ms, priority_type priority, Func &&func, Args &&... args)
提交周期性任务
submit_result< invoke_result_t< Func, Args... > > submit_task(Func &&func, Args &&... args)
提交任务(使用默认优先级0)
pool_statistics stop()
停止线程池
submit_result< invoke_result_t< Func, Args... > > submit_after(int64_t delay_ms, Func &&func, Args &&... args)
提交延迟任务(使用默认优先级0)
pool_statistics statistics() const
获取线程池统计信息
periodic_token submit_every(int64_t interval_ms, Func &&func, Args &&... args)
提交周期性任务(使用默认优先级0)
static void cancel_periodic_task(const periodic_token &token)
取消周期性任务
submit_result< invoke_result_t< Func, Args... > > submit_task(priority_type priority, Func &&func, Args &&... args)
提交任务
shared_ptr< periodic_task_state > periodic_token
周期性任务令牌
local_queue::steal_strategy steal_strategy
窃取策略类型别名
bool set_task_threshhold(size_t threshhold) noexcept
设置任务队列阈值
static constexpr size_t task_max_threshhold
最大任务队列阈值
bool running() const noexcept
检查线程池是否正在运行
pool_mode mode() const noexcept
获取线程池模式
bool set_mode(pool_mode mode) noexcept
设置线程池模式
submit_result< invoke_result_t< Func, Args... > > submit_after(int64_t delay_ms, priority_type priority, Func &&func, Args &&... args)
提交延迟任务
bool start(size_t init_thread_size=3)
启动线程池
bool set_steal_mode(steal_strategy strategy, uint32_t steal_batch=4) noexcept
设置窃取策略
static timestamp now() noexcept
获取当前时间戳
pair< iterator, bool > emplace(Args &&... args)
在unordered_map中就地构造元素
size_type size() const noexcept
获取元素数量
constexpr void reserve(const size_type n)
预留容量
constexpr size_type size() const noexcept
获取当前元素数量
constexpr void emplace_back(Args &&... args)
在末尾构造元素
constexpr T && forward(remove_reference_t< T > &x) noexcept
完美转发左值
enable_if_t< is_void_v< T >, future_result_t< T > > get(future< T > &f)
通用future结果获取函数
unsigned char uint8_t
8位无符号整数类型
unsigned int uint32_t
32位无符号整数类型
long long int64_t
64位有符号整数类型
unsigned long long uint64_t
64位无符号整数类型
constexpr iter_difference_t< Iterator > count(Iterator first, Iterator last, const T &value)
统计范围内等于指定值的元素数量
duration< int64_t, milli > milliseconds
毫秒持续时间
duration< int64_t > seconds
秒持续时间
constexpr bool is_invocable_v
is_invocable的便捷变量模板
typename inner::__invoke_result_aux< F, Args... >::type invoke_result_t
invoke_result的便捷别名
constexpr auto memory_order_release
释放内存顺序常量
constexpr auto memory_order_relaxed
宽松内存顺序常量
constexpr auto memory_order_acquire
获取内存顺序常量
enable_if_t<!is_unbounded_array_v< T > &&is_constructible_v< T, Args... >, shared_ptr< T > > make_shared(Args &&... args)
融合分配创建共享指针
constexpr Iterator2 move(Iterator1 first, Iterator1 last, Iterator2 result) noexcept(noexcept(inner::__move_aux(first, last, result)))
移动范围元素
int priority() noexcept
获取线程优先级
worker_context *& get_worker_context() noexcept
获取当前线程的工作线程上下文
shared_ptr< task_group > & get_current_task_group() noexcept
获取当前线程的任务组
constexpr tuple< unwrap_ref_decay_t< Types >... > make_tuple(Types &&... args)
从参数创建元组
constexpr decltype(auto) apply(Func &&f, Tuple &&t) noexcept(inner::__apply_unpack_tuple< _NEFORCE is_nothrow_invocable, Func, Tuple >::value)
将元组元素解包作为参数调用函数
constexpr decltype(auto) size(const Container &cont) noexcept(noexcept(cont.size()))
获取容器的大小
constexpr unique_ptr< T > make_unique(Args &&... args)
创建unique_ptr
T load(const memory_order mo=memory_order_seq_cst) const noexcept
原子加载操作
void store(T value, const memory_order mo=memory_order_seq_cst) noexcept
原子存储操作
const char * what() const noexcept
获取错误信息
static time_point now() noexcept
获取当前时间点
_NEFORCE future< T > future
任务的future
shared_ptr< _NEFORCE task_info > task_info
任务信息
void increment() noexcept
增加运行计数
atomic< size_t > running_count
正在运行的任务计数
void wait() const noexcept
等待组内所有任务完成
void decrement() noexcept
减少运行计数
timestamp submit_time
提交时间
timestamp start_time
开始执行时间
bool is_finished() const noexcept
检查任务是否已完成
task_info(const uint64_t task_id, const priority_type priority)
构造函数
int64_t exec_time() const noexcept
获取任务执行时间
uint32_t worker_thread_id
执行任务的线程ID
priority_type priority
任务优先级
timestamp finish_time
完成时间
atomic< bool > cancelled
是否已取消
size_t total_completed
总完成任务数
size_t total_stolen
总窃取任务数
string to_string() const
转换为字符串
size_t total_submitted
总提交任务数
atomic< bool > is_stealing
是否正在执行窃取操作
size_t consecutive_idle_count
连续空闲次数