1#ifndef NEFORCE_CORE_ASYNC_VIRTUAL_THREAD_HPP__
2#define NEFORCE_CORE_ASYNC_VIRTUAL_THREAD_HPP__
17#ifdef NEFORCE_STANDARD_20
26NEFORCE_BEGIN_NAMESPACE__
38template <
typename T =
void>
62struct task_shared_state_base {
67struct task_shared_state : task_shared_state_base {
68 aligned_buffer<T> result_buffer_;
69 bool has_value_{
false};
70 exception_ptr exception_{
nullptr};
72 atomic<coroutine_handle<>> continuation_{
nullptr};
73 atomic<bool> completed_{
false};
75 condition_variable cv_;
76 atomic<unsigned long> ref_count_{1};
80 void release() noexcept {
83 result_buffer_.ptr()->~T();
89 static task_shared_state* create() {
return new task_shared_state(); }
93struct task_shared_state<void> : task_shared_state_base {
94 exception_ptr exception_{
nullptr};
96 atomic<coroutine_handle<>> continuation_{
nullptr};
97 atomic<bool> completed_{
false};
99 condition_variable cv_;
100 atomic<unsigned long> ref_count_{1};
104 void release() noexcept {
110 static task_shared_state* create() {
return new task_shared_state(); }
158 for (
size_t i = 0; i < num_threads; ++i) {
159 workers_.emplace_back([
this] { worker_loop(); });
175 for (
auto& worker: workers_) {
176 if (worker.joinable()) {
200 cv_.
wait(
lock, [
this] {
return shutdown_ || !task_queue_.
empty(); });
202 if (shutdown_ && task_queue_.empty()) {
206 if (!task_queue_.empty()) {
207 handle = task_queue_.front();
245 inner::task_shared_state<void>* shared_state_{inner::task_shared_state<void>::create()};
268 struct final_awaiter {
272 auto& p = h.promise();
273 auto* state = p.shared_state_;
276 state->cv_.notify_all();
281 inner::mark_continuation_scheduled(cont);
293 return final_awaiter{};
301 struct yield_awaiter {
302 inner::task_shared_state<void>* shared_state_;
304 NEFORCE_NODISCARD
bool await_ready()
const noexcept {
return false; }
313 return yield_awaiter{shared_state_};
322 struct sleep_awaiter_impl {
323 inner::task_shared_state<void>* shared_state_;
326 NEFORCE_NODISCARD
bool await_ready()
const noexcept {
return ms_ <= 0; }
338 return sleep_awaiter_impl{shared_state_, tag.ms_};
344 template <
typename Awaiter>
363 if (shared_state_ !=
nullptr) {
364 shared_state_->release();
400 if (!completed && !was_scheduled) {
429 if (!completed && !was_scheduled) {
493 NEFORCE_NODISCARD
bool is_done() const noexcept {
527 inner::task_shared_state<T>* shared_state_{inner::task_shared_state<T>::create()};
548 struct final_awaiter {
552 auto& p = h.promise();
553 auto* state = p.shared_state_;
556 state->cv_.notify_all();
561 inner::mark_continuation_scheduled(cont);
573 return final_awaiter{};
580 struct yield_awaiter {
581 inner::task_shared_state<T>* shared_state_;
583 NEFORCE_NODISCARD
bool await_ready()
const noexcept {
return false; }
592 return yield_awaiter{shared_state_};
600 struct sleep_awaiter_impl {
601 inner::task_shared_state<T>* shared_state_;
604 NEFORCE_NODISCARD
bool await_ready()
const noexcept {
return ms_ <= 0; }
616 return sleep_awaiter_impl{shared_state_, tag.ms_};
623 ::new (shared_state_->result_buffer_.addr()) T(value);
624 shared_state_->has_value_ =
true;
631 ::new (shared_state_->result_buffer_.addr()) T(
move(value));
632 shared_state_->has_value_ =
true;
638 template <
typename Awaiter>
653 shared_state_->release();
686 if (!completed && !was_scheduled) {
715 if (!completed && !was_scheduled) {
778 NEFORCE_NODISCARD
bool is_done() const noexcept {
803 template <
typename Func>
816 template <
typename Func>
829 static inner::yield_tag
yield() {
return inner::yield_tag{}; }
835 static inner::sleep_tag
sleep(
const int64_t ms) {
return inner::sleep_tag{ms}; }
851NEFORCE_END_NAMESPACE__
void wait(unique_lock< mutex > &lock)
无限期等待
bool empty() const noexcept(noexcept(seq_.empty()))
检查队列是否为空
~virtual_thread_scheduler()
析构函数,自动调用 shutdown()
void start_workers(size_t num_threads)
启动指定数量的工作线程
static virtual_thread_scheduler & get_instance()
获取调度器单例实例
void schedule(coroutine_handle<> handle)
将协程加入调度队列
static inner::yield_tag yield()
创建 yield 标记,用于 co_await 让出执行权
static auto start(Func &&func)
启动异步任务
static void shutdown()
关闭调度器
static void initialize(size_t num_threads)
初始化调度器并启动工作线程
static inner::sleep_tag sleep(const int64_t ms)
创建 sleep 标记,用于 co_await 休眠
constexpr T && forward(remove_reference_t< T > &x) noexcept
完美转发左值
constexpr T * addressof(T &x) noexcept
获取对象的地址
long long int64_t
64位有符号整数类型
std::coroutine_handle< Promise > coroutine_handle
协程句柄
duration< int64_t, milli > milliseconds
毫秒持续时间
NEFORCE_NORETURN void rethrow_exception(const exception_ptr &p)
重新抛出异常
exception_ptr current_exception() noexcept
获取当前异常
typename inner::__invoke_result_aux< F, Args... >::type invoke_result_t
invoke_result的便捷别名
constexpr auto memory_order_release
释放内存顺序常量
constexpr auto memory_order_relaxed
宽松内存顺序常量
constexpr auto memory_order_acquire
获取内存顺序常量
constexpr auto memory_order_acq_rel
获取-释放内存顺序常量
constexpr Iterator2 move(Iterator1 first, Iterator1 last, Iterator2 result) noexcept(noexcept(inner::__move_aux(first, last, result)))
移动范围元素
constexpr T exchange(T &val, U &&new_val) noexcept(is_nothrow_move_constructible_v< T > &&is_nothrow_assignable_v< T &, U >)
将新值赋给对象并返回旧值
thread::native_handle_type handle() noexcept
获取当前线程句柄
bool_constant< true > true_type
表示true的类型
bool_constant< false > false_type
表示false的类型
constexpr bool is_virtual_thread_task_v
is_virtual_thread_task 的便捷变量模板
static constexpr bool value
判断类型是否为 virtual_thread_task 特化
协程 promise_type,管理任务生命周期与返回值存储
suspend_never initial_suspend() noexcept
初始挂起点 — 不暂停
~promise_type()
析构时释放共享状态的引用
void return_value(T &&value)
co_return 值,移动存储
void unhandled_exception()
未处理异常的捕获入口
decltype(auto) await_transform(Awaiter &&a)
通用 await_transform,透传自定义等待器
void return_value(const T &value)
co_return 值,拷贝存储
auto final_suspend() noexcept
最终挂起点
auto await_transform(inner::sleep_tag tag)
处理 co_await sleep
virtual_thread_task get_return_object()
创建返回给调用者的任务对象
auto await_transform(inner::yield_tag)
处理 co_await yield
协程 promise_type,管理任务生命周期与状态
auto final_suspend() noexcept
最终挂起点
void return_void()
co_return 无返回值
auto await_transform(inner::yield_tag)
处理 co_await yield
auto await_transform(inner::sleep_tag tag)
处理 co_await sleep
decltype(auto) await_transform(Awaiter &&a)
通用 await_transform
~promise_type()
析构时释放共享状态的引用
suspend_never initial_suspend() noexcept
初始挂起点
void unhandled_exception()
未处理异常的捕获入口
virtual_thread_task get_return_object()
创建返回给调用者的任务对象
void await_resume()
co_await 恢复时检查异常
virtual_thread_task & operator=(virtual_thread_task &&other) noexcept
移动赋值运算符
virtual_thread_task(coroutine_handle< promise_type > h)
从协程句柄构造任务
coroutine_handle< promise_type > handle_
协程句柄
bool is_done() const noexcept
检查任务是否已完成
void get_result()
阻塞等待任务完成并获取结果
bool await_ready() noexcept
co_await 就绪检查
bool await_suspend(coroutine_handle<> caller) noexcept
co_await 挂起时注册 continuation
inner::task_shared_state< void > * shared_state_
共享状态指针
virtual_thread_task(virtual_thread_task &&other) noexcept
移动构造函数
~virtual_thread_task()
析构函数
bool valid() const noexcept
检查任务是否关联有效共享状态
virtual_thread_task()=default
默认构造,创建空任务
bool is_done() const noexcept
检查任务是否已完成
coroutine_handle< promise_type > handle_
协程句柄
virtual_thread_task()=default
默认构造,创建空任务
T await_resume()
co_await 恢复时返回结果或抛出异常
bool await_ready() noexcept
co_await 就绪检查
bool await_suspend(coroutine_handle<> caller) noexcept
co_await 挂起时注册 continuation
virtual_thread_task(coroutine_handle< promise_type > h)
从协程句柄构造任务
bool valid() const noexcept
检查任务是否关联有效共享状态
virtual_thread_task & operator=(virtual_thread_task &&other) noexcept
移动赋值运算符
virtual_thread_task(virtual_thread_task &&other) noexcept
移动构造函数
~virtual_thread_task()
析构函数 — 清理未调度的帧,释放共享状态引用
inner::task_shared_state< T > * shared_state_
共享状态指针