1#ifndef NEFORCE_CORE_ASYNC_THREAD_POOL_HPP__
2#define NEFORCE_CORE_ASYNC_THREAD_POOL_HPP__
18NEFORCE_BEGIN_NAMESPACE__
35class NEFORCE_API manual_thread {
40 using thread_func = function<void(id_type)>;
46 explicit manual_thread(thread_func func)
noexcept;
47 ~manual_thread() =
default;
49 NEFORCE_NODISCARD id_type
id() const noexcept {
return thread_id_; }
65 task_group() =
default;
66 ~task_group() =
default;
89 void wait() const noexcept {
105class NEFORCE_API local_queue {
126 atomic<uint32_t> tail_{0};
129 constexpr static size_t mask_ = queue_size - 1;
132 return static_cast<uint64_t>(steal) << 32 |
static_cast<uint64_t>(local_head);
135 NEFORCE_NODISCARD
static pair<uint32_t, uint32_t> unpack(
const uint64_t head)
noexcept {
142 local_queue() =
default;
143 ~local_queue() =
default;
144 local_queue(
const local_queue&) =
delete;
145 local_queue& operator=(
const local_queue&) =
delete;
146 local_queue(local_queue&& other)
noexcept;
147 local_queue& operator=(local_queue&& other)
noexcept;
153 NEFORCE_NODISCARD
size_t capacity() const noexcept {
return tasks_.size(); }
159 NEFORCE_NODISCARD
bool empty() const noexcept {
return size() == 0u; }
168 const auto steal = unpack(head).first;
169 const size_t used =
static_cast<size_t>(tail - steal);
170 const size_t remain =
capacity() - used;
178 NEFORCE_NODISCARD
size_t size() const noexcept {
181 const auto local_head = unpack(head).second;
182 return static_cast<size_t>(tail - local_head);
191 steal_strategy_ = strategy;
192 fixed_batch_size_ = batch_size;
225struct NEFORCE_API worker_context {
226 using id_type = inner::manual_thread::id_type;
233 worker_context() =
default;
234 worker_context(
const worker_context&) =
delete;
235 worker_context& operator=(
const worker_context&) =
delete;
236 worker_context(worker_context&& other)
noexcept;
237 worker_context& operator=(worker_context&& other)
noexcept;
259 enum class priority_type :
uint32_t {
317 NEFORCE_NODISCARD
explicit operator bool() const noexcept {
return future.valid() &&
task_info; }
372 using id_type = inner::manual_thread::id_type;
379 static size_t max_thread_threshhold();
388 struct priority_task {
396 info(_NEFORCE
move(info)) {}
398 bool operator<(
const priority_task& other)
const noexcept {
return priority < other.priority; }
401 unordered_map<id_type, unique_ptr<inner::manual_thread>> threads_map_;
402 unordered_map<id_type, worker_context> worker_contexts_;
403 vector<atomic<worker_context*>> worker_contexts_ptr_;
404 mutex worker_contexts_mtx_;
406 timer_scheduler<steady_clock> timer_{};
408 id_type init_thread_size_{0};
409 size_t thread_threshhold_;
411 priority_queue<priority_task> task_queue_{};
412 atomic<uint32_t> task_size_{0};
413 atomic<uint32_t> idle_thread_size_{0};
414 size_t task_threshhold_{task_max_threshhold};
416 mutable mutex task_queue_mtx_{};
417 condition_variable not_full_{};
418 condition_variable not_empty_{};
419 condition_variable exit_cond_{};
421 atomic<pool_mode> pool_mode_{pool_mode::fixed};
422 atomic<bool> is_running_{
false};
424 atomic<size_t> total_submitted_tasks_{0};
425 atomic<size_t> total_completed_tasks_{0};
426 atomic<size_t> total_stolen_tasks_{0};
427 atomic<size_t> steal_worker_count_{0};
428 atomic<uint64_t> next_task_id_{0};
433 void thread_function(id_type thread_id);
434 optional<task_type> try_steal_task(worker_context& ctx);
436 pool_statistics statistics_unsafe()
const;
488 NEFORCE_NODISCARD
bool running() const noexcept {
return is_running_; }
507 bool start(
size_t init_thread_size = 3);
524 template <
typename Func,
typename... Args>
535 template <
typename Func,
typename... Args>
551 template <
typename Func,
typename... Args>
564 template <
typename Func,
typename... Args>
580 template <
typename Func,
typename... Args>
592 template <
typename Func,
typename... Args>
604 token->cancelled.store(
true);
614 template <
typename... Types>
635template <typename Func, typename... Args>
638 static_assert(
is_invocable_v<Func, Args...>,
"Func must be invocable with Args");
646 current_group->increment();
651 group = current_group, info]()
mutable -> Result {
652 struct context_guard {
659 group_inner(
move(g)) {
668 ~context_guard()
noexcept {
677 group_inner->decrement();
686 context_guard guard(info, group);
688 return _NEFORCE
apply(func, args);
691 info->error = e.
what();
695 info->error =
"Unknown exception";
701 task_type job([
task] { (*task)(); });
708 info->error =
"Task queue is full";
710 auto dummy_task = _NEFORCE
make_shared<packaged_task<Result()>>([]() -> Result {
return Result(); });
717 ++total_submitted_tasks_;
723 if (ctx !=
nullptr && ctx->queue.remain_size() > 0) {
724 ctx->queue.push_back(
move(job));
725 ++total_submitted_tasks_;
729 [&]() ->
bool {
return task_queue_.
size() < task_threshhold_; })) {
731 info->error =
"Task queue is full";
733 auto dummy_task = _NEFORCE
make_shared<packaged_task<Result()>>([]() -> Result {
return Result(); });
738 task_queue_.
emplace(
move(job),
static_cast<priority_type
>(0), info);
740 ++total_submitted_tasks_;
745 if (pool_mode_.
load() == pool_mode::cached && task_size_.
load() > idle_thread_size_) {
747 inner::manual_thread* t_ptr =
nullptr;
748 id_type thread_id = 0;
752 if (threads_map_.
size() < thread_threshhold_) {
756 thread_id = ptr->id();
762 if (t_ptr !=
nullptr) {
765 if (thread_id >= worker_contexts_ptr_.
size()) {
766 worker_contexts_ptr_.
reserve(thread_id + 1);
767 for (
size_t i = worker_contexts_ptr_.
size(); i <= thread_id; i++) {
782template <
typename Func,
typename... Args>
785 static_assert(
is_invocable_v<Func, Args...>,
"Func must be invocable with Args");
791 auto task = _NEFORCE
make_shared<packaged_task<Result()>>(
794 struct context_guard {
795 shared_ptr<task_info> info;
797 explicit context_guard(shared_ptr<task_info> i) :
804 ~context_guard() noexcept {
812 context_guard guard(info);
815 return _NEFORCE
apply(func, tup);
816 }
catch (
const exception& e) {
818 info->error = e.
what();
823 future<Result> res = task->get_future();
826 timer_.add_task(expire_time, [
this, task = _NEFORCE
move(task),
priority]()
mutable {
830 return submit_result<Result>{_NEFORCE
move(res), info};
833template <
typename Func,
typename... Args>
837 auto task = _NEFORCE
make_shared<function<void()>>(
841 *handler_ptr = [
this, state, task, interval_ms,
priority, handler_ptr]() {
842 if (state->cancelled.load()) {
848 if (state->cancelled.load()) {
852 timer_.add_task(next_time, [handler_ptr]() { (*handler_ptr)(); });
856 timer_.add_task(first_time, [handler_ptr]() { (*handler_ptr)(); });
866NEFORCE_END_NAMESPACE__
void notify_one() noexcept
通知一个等待线程
cv_status wait_for(unique_lock< mutex > &lock, const duration< Rep, Period > &rest)
等待指定的持续时间
NEFORCE_NODISCARD size_t size() const noexcept
获取队列当前大小
optional< function< void()> > try_pop()
从队列头部弹出任务
NEFORCE_NODISCARD size_t capacity() const noexcept
获取队列容量
static constexpr size_t queue_size
队列容量
static void set_steal_strategy(const steal_strategy strategy, const uint32_t batch_size=4)
设置窃取策略
void push_back(function< void()> task)
推送任务到队列尾部
NEFORCE_NODISCARD bool empty() const noexcept
检查队列是否为空
optional< function< void()> > be_stolen_by(local_queue &dst_queue)
被其他队列窃取任务
NEFORCE_NODISCARD size_t remain_size() const noexcept
获取剩余容量
static NEFORCE_NODISCARD constexpr T max() noexcept
获取类型的最大值
NEFORCE_NODISCARD size_type size() const noexcept(noexcept(_NEFORCE declval< Sequence >().size()))
获取优先队列大小
void emplace(Args &&... args)
在优先队列中就地构造元素
static tuple< future_result_t< Types >... > wait(future< Types > &&... futures)
等待多个future完成
task_info::priority_type priority_type
优先级类型别名
bool set_thread_threshhold(size_t threshhold) noexcept
设置线程数阈值
static constexpr size_t max_idle_seconds
最大空闲秒数
periodic_token submit_every(int64_t interval_ms, priority_type priority, Func &&func, Args &&... args)
提交周期性任务
submit_result< invoke_result_t< Func, Args... > > submit_task(Func &&func, Args &&... args)
提交任务(使用默认优先级0)
inner::manual_thread::id_type id_type
线程ID类型别名
pool_statistics stop()
停止线程池
NEFORCE_NODISCARD pool_statistics statistics() const
获取线程池统计信息
NEFORCE_NODISCARD pool_mode mode() const noexcept
获取线程池模式
submit_result< invoke_result_t< Func, Args... > > submit_after(int64_t delay_ms, Func &&func, Args &&... args)
提交延迟任务(使用默认优先级0)
periodic_token submit_every(int64_t interval_ms, Func &&func, Args &&... args)
提交周期性任务(使用默认优先级0)
static void cancel_periodic_task(const periodic_token &token)
取消周期性任务
submit_result< invoke_result_t< Func, Args... > > submit_task(priority_type priority, Func &&func, Args &&... args)
提交任务
shared_ptr< periodic_task_state > periodic_token
周期性任务令牌
NEFORCE_NODISCARD bool running() const noexcept
检查线程池是否正在运行
local_queue::steal_strategy steal_strategy
窃取策略类型别名
bool set_task_threshhold(size_t threshhold) noexcept
设置任务队列阈值
static constexpr size_t task_max_threshhold
最大任务队列阈值
bool set_mode(pool_mode mode) noexcept
设置线程池模式
submit_result< invoke_result_t< Func, Args... > > submit_after(int64_t delay_ms, priority_type priority, Func &&func, Args &&... args)
提交延迟任务
bool start(size_t init_thread_size=3)
启动线程池
bool set_steal_mode(steal_strategy strategy, uint32_t steal_batch=4) noexcept
设置窃取策略
static NEFORCE_NODISCARD timestamp now() noexcept
获取当前时间戳
pair< iterator, bool > emplace(Args &&... args)
在unordered_map中就地构造元素
NEFORCE_NODISCARD size_type size() const noexcept
获取元素数量
NEFORCE_CONSTEXPR20 void reserve(const size_type n)
预留容量
NEFORCE_NODISCARD NEFORCE_CONSTEXPR20 size_type size() const noexcept
获取当前元素数量
NEFORCE_CONSTEXPR20 void emplace_back(Args &&... args)
在末尾构造元素
NEFORCE_NODISCARD constexpr T && forward(remove_reference_t< T > &x) noexcept
完美转发左值
NEFORCE_ALWAYS_INLINE enable_if_t< is_void_v< T >, future_result_t< T > > get(future< T > &f)
通用future结果获取函数
unsigned char uint8_t
8位无符号整数类型
unsigned int uint32_t
32位无符号整数类型
long long int64_t
64位有符号整数类型
unsigned long long uint64_t
64位无符号整数类型
constexpr iter_difference_t< Iterator > count(Iterator first, Iterator last, const T &value)
统计范围内等于指定值的元素数量
duration< int64_t, milli > milliseconds
毫秒持续时间
duration< int64_t > seconds
秒持续时间
NEFORCE_INLINE17 constexpr bool is_invocable_v
is_invocable的便捷变量模板
typename inner::__invoke_result_aux< F, Args... >::type invoke_result_t
invoke_result的便捷别名
NEFORCE_INLINE17 constexpr auto memory_order_release
释放内存顺序常量
NEFORCE_INLINE17 constexpr auto memory_order_relaxed
宽松内存顺序常量
NEFORCE_INLINE17 constexpr auto memory_order_acquire
获取内存顺序常量
NEFORCE_NODISCARD constexpr bool operator<(const normal_iterator< LeftIter > &lhs, const normal_iterator< RightIter > &rhs) noexcept
小于比较运算符
enable_if_t<!is_unbounded_array_v< T > &&is_constructible_v< T, Args... >, shared_ptr< T > > make_shared(Args &&... args)
融合分配创建共享指针
constexpr Iterator2 move(Iterator1 first, Iterator1 last, Iterator2 result) noexcept(noexcept(inner::__move_aux(first, last, result)))
移动范围元素
bool NEFORCE_API priority(int priority) noexcept
设置线程优先级
NEFORCE_ALWAYS_INLINE_INLINE thread::id id() noexcept
获取当前线程标识符
NEFORCE_API shared_ptr< task_group > & get_current_task_group() noexcept
获取当前线程的任务组
NEFORCE_API worker_context *& get_worker_context() noexcept
获取当前线程的工作线程上下文
NEFORCE_NODISCARD constexpr tuple< unwrap_ref_decay_t< Types >... > make_tuple(Types &&... args)
从参数创建元组
constexpr decltype(auto) apply(Func &&f, Tuple &&t) noexcept(inner::__apply_unpack_tuple< _NEFORCE is_nothrow_invocable, Func, Tuple >::value)
将元组元素解包作为参数调用函数
NEFORCE_NODISCARD NEFORCE_ALWAYS_INLINE constexpr decltype(auto) size(const Container &cont) noexcept(noexcept(cont.size()))
获取容器的大小
NEFORCE_CONSTEXPR20 unique_ptr< T > make_unique(Args &&... args)
创建unique_ptr
T load(const memory_order mo=memory_order_seq_cst) const noexcept
原子加载操作
void store(T value, const memory_order mo=memory_order_seq_cst) noexcept
原子存储操作
NEFORCE_NODISCARD const char * what() const noexcept
获取错误信息
static time_point now() noexcept
获取当前时间点
_NEFORCE future< T > future
任务的future
shared_ptr< _NEFORCE task_info > task_info
任务信息
void increment() noexcept
增加运行计数
atomic< size_t > running_count
正在运行的任务计数
void wait() const noexcept
等待组内所有任务完成
void decrement() noexcept
减少运行计数
timestamp submit_time
提交时间
timestamp start_time
开始执行时间
NEFORCE_NODISCARD bool is_finished() const noexcept
检查任务是否已完成
inner::manual_thread::id_type worker_thread_id
执行任务的线程ID
task_info(const uint64_t task_id, const priority_type priority)
构造函数
NEFORCE_NODISCARD int64_t exec_time() const noexcept
获取任务执行时间
priority_type priority
任务优先级
timestamp finish_time
完成时间
atomic< bool > cancelled
是否已取消
NEFORCE_NODISCARD string to_string() const
转换为字符串
size_t total_completed
总完成任务数
size_t total_stolen
总窃取任务数
size_t total_submitted
总提交任务数
atomic< bool > is_stealing
是否正在执行窃取操作
inner::manual_thread::id_type id_type
线程ID类型
size_t consecutive_idle_count
连续空闲次数