NexusForce 1.0.0
A Modern C++ Library with extended functionality, web components, and utility libraries
载入中...
搜索中...
未找到
virtual_thread.hpp
浏览该文件的文档.
1#ifndef NEFORCE_CORE_ASYNC_VIRTUAL_THREAD_HPP__
2#define NEFORCE_CORE_ASYNC_VIRTUAL_THREAD_HPP__
3
15
17#ifdef NEFORCE_STANDARD_20
26NEFORCE_BEGIN_NAMESPACE__
27
33
38template <typename T = void>
40
45template <typename T>
47
49template <typename T>
52
56template <typename T>
58
59
60NEFORCE_BEGIN_INNER__
61
62struct task_shared_state_base {
63 atomic<bool> scheduled_{false};
64};
65
66template <typename T>
67struct task_shared_state : task_shared_state_base {
68 aligned_buffer<T> result_buffer_;
69 bool has_value_{false};
70 exception_ptr exception_{nullptr};
71
72 atomic<coroutine_handle<>> continuation_{nullptr};
73 atomic<bool> completed_{false};
74 mutex mtx_;
75 condition_variable cv_;
76 atomic<unsigned long> ref_count_{1};
77
78 void add_ref() noexcept { ref_count_.fetch_add(1, memory_order_relaxed); }
79
80 void release() noexcept {
81 if (ref_count_.fetch_sub(1, memory_order_acq_rel) == 1) {
82 if (has_value_) {
83 result_buffer_.ptr()->~T();
84 }
85 delete this;
86 }
87 }
88
89 static task_shared_state* create() { return new task_shared_state(); }
90};
91
92template <>
93struct task_shared_state<void> : task_shared_state_base {
94 exception_ptr exception_{nullptr};
95
96 atomic<coroutine_handle<>> continuation_{nullptr};
97 atomic<bool> completed_{false};
98 mutex mtx_;
99 condition_variable cv_;
100 atomic<unsigned long> ref_count_{1};
101
102 void add_ref() noexcept { ref_count_.fetch_add(1, memory_order_relaxed); }
103
104 void release() noexcept {
105 if (ref_count_.fetch_sub(1, memory_order_acq_rel) == 1) {
106 delete this;
107 }
108 }
109
110 static task_shared_state* create() { return new task_shared_state(); }
111};
112
113NEFORCE_END_INNER__
114
115
124private:
125 queue<coroutine_handle<>> task_queue_;
126 vector<thread> workers_;
127 mutex mutex_;
129 atomic<bool> shutdown_{false};
130
131public:
137 static virtual_thread_scheduler instance;
138 return instance;
139 }
140
146 {
147 lock<mutex> lock(mutex_);
148 task_queue_.push(handle);
149 }
150 cv_.notify_one();
151 }
152
157 void start_workers(size_t num_threads) {
158 for (size_t i = 0; i < num_threads; ++i) {
159 workers_.emplace_back([this] { worker_loop(); });
160 }
161 }
162
168 void shutdown() {
169 {
170 lock<mutex> lock(mutex_);
171 shutdown_ = true;
172 }
173 cv_.notify_all();
174
175 for (auto& worker: workers_) {
176 if (worker.joinable()) {
177 worker.join();
178 }
179 }
180 }
181
186
187private:
194 void worker_loop() {
195 while (true) {
197
198 {
199 unique_lock<mutex> lock(mutex_);
200 cv_.wait(lock, [this] { return shutdown_ || !task_queue_.empty(); });
201
202 if (shutdown_ && task_queue_.empty()) {
203 return;
204 }
205
206 if (!task_queue_.empty()) {
207 handle = task_queue_.front();
208 task_queue_.pop();
209 }
210 }
211
212 if (handle) {
213 handle.resume();
214 }
215 }
216 }
217};
218
219
220NEFORCE_BEGIN_INNER__
221
/// Empty marker type: `co_await`-ing it selects the promise's yield await_transform overload.
struct yield_tag {};
223
/**
 * @brief Marker type: `co_await`-ing it selects the promise's sleep
 *        await_transform overload.
 *
 * Remains an aggregate, so `sleep_tag{ms}` keeps working; a
 * default-constructed tag now carries a well-defined zero duration instead
 * of an indeterminate value.
 */
struct sleep_tag {
    int64_t ms_{0}; ///< Requested sleep duration in milliseconds; values <= 0 make the awaiter ready immediately.
};
227
228void mark_continuation_scheduled(coroutine_handle<> cont);
229
230NEFORCE_END_INNER__
231
232
239template <>
245 inner::task_shared_state<void>* shared_state_{inner::task_shared_state<void>::create()};
246
255
259 suspend_never initial_suspend() noexcept { return {}; }
260
267 auto final_suspend() noexcept {
268 struct final_awaiter {
269 bool await_ready() noexcept { return false; }
270
272 auto& p = h.promise();
273 auto* state = p.shared_state_;
274
275 state->completed_.store(true, memory_order_release);
276 state->cv_.notify_all();
277
278 auto cont = state->continuation_.exchange(nullptr, memory_order_acq_rel);
279 if (cont) {
280 try {
281 inner::mark_continuation_scheduled(cont);
283 // NOLINTNEXTLINE(bugprone-empty-catch)
284 } catch (...) {
285 // ignore
286 }
287 }
288 return false;
289 }
290
291 void await_resume() noexcept {}
292 };
293 return final_awaiter{};
294 }
295
300 auto await_transform(inner::yield_tag /*unused*/) {
301 struct yield_awaiter {
302 inner::task_shared_state<void>* shared_state_;
303
304 NEFORCE_NODISCARD bool await_ready() const noexcept { return false; }
305
307 shared_state_->scheduled_.store(true, memory_order_release);
309 }
310
311 void await_resume() const noexcept {}
312 };
313 return yield_awaiter{shared_state_};
314 }
315
321 auto await_transform(inner::sleep_tag tag) {
322 struct sleep_awaiter_impl {
323 inner::task_shared_state<void>* shared_state_;
324 int64_t ms_;
325
326 NEFORCE_NODISCARD bool await_ready() const noexcept { return ms_ <= 0; }
327
329 shared_state_->scheduled_.store(true, memory_order_release);
330 thread([handle, ms = ms_] {
331 this_thread::sleep_for(milliseconds(ms));
333 }).detach();
334 }
335
336 void await_resume() const noexcept {}
337 };
338 return sleep_awaiter_impl{shared_state_, tag.ms_};
339 }
340
/**
 * @brief Catch-all await_transform: hands any other awaiter back unchanged.
 * @param a Awaiter supplied to `co_await`.
 * @return The same object, with its value category preserved.
 */
template <typename Awaiter>
decltype(auto) await_transform(Awaiter&& a) {
    // Casting to Awaiter&& is what perfect forwarding does: lvalues stay lvalues, rvalues stay rvalues.
    return static_cast<Awaiter&&>(a);
}
348
/// Handles `co_return;` — a void task has no result to store.
void return_void() { /* intentionally empty */ }
353
357 void unhandled_exception() { shared_state_->exception_ = _NEFORCE current_exception(); }
358
363 if (shared_state_ != nullptr) {
364 shared_state_->release();
365 }
366 }
367 };
368
370 inner::task_shared_state<void>* shared_state_{nullptr};
371
376
382 handle_(h) {
383 if (handle_) {
384 shared_state_ = handle_.promise().shared_state_;
385 if (shared_state_ != nullptr) {
386 shared_state_->add_ref();
387 }
388 }
389 }
390
397 if (handle_) {
398 bool completed = shared_state_ != nullptr && shared_state_->completed_.load(memory_order_acquire);
399 bool was_scheduled = shared_state_ != nullptr && shared_state_->scheduled_.load(memory_order_acquire);
400 if (!completed && !was_scheduled) {
401 handle_.destroy();
402 }
403 }
404 if (shared_state_ != nullptr) {
405 shared_state_->release();
406 }
407 }
408
410 virtual_thread_task& operator=(const virtual_thread_task&) = delete;
411
416 handle_(_NEFORCE exchange(other.handle_, nullptr)),
417 shared_state_(_NEFORCE exchange(other.shared_state_, nullptr)) {}
418
423 if (addressof(other) == this) {
424 return *this;
425 }
426 if (handle_) {
427 bool completed = shared_state_ != nullptr && shared_state_->completed_.load(memory_order_acquire);
428 bool was_scheduled = shared_state_ != nullptr && shared_state_->scheduled_.load(memory_order_acquire);
429 if (!completed && !was_scheduled) {
430 handle_.destroy();
431 }
432 }
433 if (shared_state_ != nullptr) {
434 shared_state_->release();
435 }
436 handle_ = _NEFORCE exchange(other.handle_, nullptr);
437 shared_state_ = _NEFORCE exchange(other.shared_state_, nullptr);
438 return *this;
439 }
440
445 bool await_ready() noexcept {
446 return shared_state_ != nullptr && shared_state_->completed_.load(memory_order_acquire);
447 }
448
454 bool await_suspend(coroutine_handle<> caller) noexcept {
455 shared_state_->continuation_.store(caller, memory_order_release);
456
457 if (shared_state_->completed_.load(memory_order_acquire)) {
458 if (shared_state_->continuation_.exchange(nullptr, memory_order_acq_rel)) {
459 return false;
460 }
461 }
462 return true;
463 }
464
469 if (shared_state_->exception_) {
470 rethrow_exception(shared_state_->exception_);
471 }
472 }
473
480 void get_result() {
481 if (!shared_state_->completed_.load(memory_order_acquire)) {
483 shared_state_->cv_.wait(lock, [this] { return shared_state_->completed_.load(memory_order_acquire); });
484 }
485 if (shared_state_->exception_) {
486 rethrow_exception(shared_state_->exception_);
487 }
488 }
489
493 NEFORCE_NODISCARD bool is_done() const noexcept {
494 return shared_state_ != nullptr && shared_state_->completed_.load(memory_order_acquire);
495 }
496
500 NEFORCE_NODISCARD bool valid() const noexcept { return shared_state_ != nullptr; }
501};
502
503
504NEFORCE_BEGIN_INNER__
505
506inline void mark_continuation_scheduled(coroutine_handle<> cont) {
507 auto vh = coroutine_handle<virtual_thread_task<void>::promise_type>::from_address(cont.address());
508 vh.promise().shared_state_->scheduled_.store(true, memory_order_release);
509}
510
511NEFORCE_END_INNER__
512
513
521template <typename T>
527 inner::task_shared_state<T>* shared_state_{inner::task_shared_state<T>::create()};
528
536
540 suspend_never initial_suspend() noexcept { return {}; }
541
547 auto final_suspend() noexcept {
548 struct final_awaiter {
549 bool await_ready() noexcept { return false; }
550
552 auto& p = h.promise();
553 auto* state = p.shared_state_;
554
555 state->completed_.store(true, memory_order_release);
556 state->cv_.notify_all();
557
558 auto cont = state->continuation_.exchange(nullptr, memory_order_acq_rel);
559 if (cont) {
560 try {
561 inner::mark_continuation_scheduled(cont);
563 // NOLINTNEXTLINE(bugprone-empty-catch)
564 } catch (...) {
565 // ignore
566 }
567 }
568 return false;
569 }
570
571 void await_resume() noexcept {}
572 };
573 return final_awaiter{};
574 }
575
579 auto await_transform(inner::yield_tag /*unused*/) {
580 struct yield_awaiter {
581 inner::task_shared_state<T>* shared_state_;
582
583 NEFORCE_NODISCARD bool await_ready() const noexcept { return false; }
584
586 shared_state_->scheduled_.store(true, memory_order_release);
588 }
589
590 void await_resume() const noexcept {}
591 };
592 return yield_awaiter{shared_state_};
593 }
594
599 auto await_transform(inner::sleep_tag tag) {
600 struct sleep_awaiter_impl {
601 inner::task_shared_state<T>* shared_state_;
602 int64_t ms_;
603
604 NEFORCE_NODISCARD bool await_ready() const noexcept { return ms_ <= 0; }
605
607 shared_state_->scheduled_.store(true, memory_order_release);
608 thread([handle, ms = ms_] {
609 this_thread::sleep_for(milliseconds(ms));
611 }).detach();
612 }
613
614 void await_resume() const noexcept {}
615 };
616 return sleep_awaiter_impl{shared_state_, tag.ms_};
617 }
618
622 void return_value(const T& value) {
623 ::new (shared_state_->result_buffer_.addr()) T(value);
624 shared_state_->has_value_ = true;
625 }
626
630 void return_value(T&& value) {
631 ::new (shared_state_->result_buffer_.addr()) T(move(value));
632 shared_state_->has_value_ = true;
633 }
634
/**
 * @brief Catch-all await_transform: hands any other awaiter back unchanged.
 * @param a Awaiter supplied to `co_await`.
 * @return The same object, with its value category preserved.
 */
template <typename Awaiter>
decltype(auto) await_transform(Awaiter&& a) {
    // Casting to Awaiter&& is what perfect forwarding does: lvalues stay lvalues, rvalues stay rvalues.
    return static_cast<Awaiter&&>(a);
}
642
646 void unhandled_exception() { shared_state_->exception_ = _NEFORCE current_exception(); }
647
652 if (shared_state_) {
653 shared_state_->release();
654 }
655 }
656 };
657
659 inner::task_shared_state<T>* shared_state_{nullptr};
660
665
670 handle_(h) {
671 if (handle_) {
672 shared_state_ = handle_.promise().shared_state_;
673 if (shared_state_) {
674 shared_state_->add_ref();
675 }
676 }
677 }
678
683 if (handle_) {
684 bool completed = shared_state_ && shared_state_->completed_.load(memory_order_acquire);
685 bool was_scheduled = shared_state_ && shared_state_->scheduled_.load(memory_order_acquire);
686 if (!completed && !was_scheduled) {
687 handle_.destroy();
688 }
689 }
690 if (shared_state_) {
691 shared_state_->release();
692 }
693 }
694
696 virtual_thread_task& operator=(const virtual_thread_task&) = delete;
697
702 handle_(_NEFORCE exchange(other.handle_, nullptr)),
703 shared_state_(_NEFORCE exchange(other.shared_state_, nullptr)) {}
704
709 if (addressof(other) == this) {
710 return *this;
711 }
712 if (handle_) {
713 bool completed = shared_state_ && shared_state_->completed_.load(memory_order_acquire);
714 bool was_scheduled = shared_state_ && shared_state_->scheduled_.load(memory_order_acquire);
715 if (!completed && !was_scheduled) {
716 handle_.destroy();
717 }
718 }
719 if (shared_state_) {
720 shared_state_->release();
721 }
722 handle_ = _NEFORCE exchange(other.handle_, nullptr);
723 shared_state_ = _NEFORCE exchange(other.shared_state_, nullptr);
724 return *this;
725 }
726
731 bool await_ready() noexcept { return shared_state_ && shared_state_->completed_.load(memory_order_acquire); }
732
736 bool await_suspend(coroutine_handle<> caller) noexcept {
737 shared_state_->continuation_.store(caller, memory_order_release);
738
739 if (shared_state_->completed_.load(memory_order_acquire)) {
740 if (shared_state_->continuation_.exchange(nullptr, memory_order_acq_rel)) {
741 return false;
742 }
743 }
744 return true;
745 }
746
752 if (shared_state_->exception_) {
753 rethrow_exception(shared_state_->exception_);
754 }
755 return move(*shared_state_->result_buffer_.ptr());
756 }
757
765 if (!shared_state_->completed_.load(memory_order_acquire)) {
767 shared_state_->cv_.wait(lock, [this] { return shared_state_->completed_.load(memory_order_acquire); });
768 }
769 if (shared_state_->exception_) {
770 rethrow_exception(shared_state_->exception_);
771 }
772 return move(*shared_state_->result_buffer_.ptr());
773 }
774
778 NEFORCE_NODISCARD bool is_done() const noexcept {
779 return shared_state_ && shared_state_->completed_.load(memory_order_acquire);
780 }
781
785 NEFORCE_NODISCARD bool valid() const noexcept { return shared_state_ != nullptr; }
786};
787
796private:
803 template <typename Func>
804 static virtual_thread_task<void> create_task(Func func) {
805 func();
806 co_return;
807 }
808
809public:
816 template <typename Func>
817 static auto start(Func&& func) {
818 using result_type = invoke_result_t<decay_t<Func>>;
820 return _NEFORCE forward<Func>(func)();
821 } else {
822 return create_task(_NEFORCE forward<Func>(func));
823 }
824 }
825
829 static inner::yield_tag yield() { return inner::yield_tag{}; }
830
835 static inner::sleep_tag sleep(const int64_t ms) { return inner::sleep_tag{ms}; }
836
841 static void initialize(size_t num_threads) { virtual_thread_scheduler::get_instance().start_workers(num_threads); }
842
847};
848 // VirtualThread
850
851NEFORCE_END_NAMESPACE__
852#endif
853#endif // NEFORCE_CORE_ASYNC_VIRTUAL_THREAD_HPP__
对齐缓冲区实现
原子类型完整实现
void wait(unique_lock< mutex > &lock)
无限期等待
锁管理器模板
非递归互斥锁
队列容器适配器
bool empty() const noexcept(noexcept(seq_.empty()))
检查队列是否为空
异步任务
线程类
独占锁管理器模板
动态大小数组容器
~virtual_thread_scheduler()
析构函数,自动调用 shutdown()
void start_workers(size_t num_threads)
启动指定数量的工作线程
static virtual_thread_scheduler & get_instance()
获取调度器单例实例
void schedule(coroutine_handle<> handle)
将协程加入调度队列
虚拟线程用户门面类
static inner::yield_tag yield()
创建 yield 标记,用于 co_await 让出执行权
static auto start(Func &&func)
启动异步任务
static void shutdown()
关闭调度器
static void initialize(size_t num_threads)
初始化调度器并启动工作线程
static inner::sleep_tag sleep(const int64_t ms)
创建 sleep 标记,用于 co_await 休眠
条件变量行为
协程支持
异常指针实现
constexpr T && forward(remove_reference_t< T > &x) noexcept
完美转发左值
constexpr T * addressof(T &x) noexcept
获取对象的地址
long long int64_t
64位有符号整数类型
std::coroutine_handle< Promise > coroutine_handle
协程句柄
duration< int64_t, milli > milliseconds
毫秒持续时间
NEFORCE_NORETURN void rethrow_exception(const exception_ptr &p)
重新抛出异常
exception_ptr current_exception() noexcept
获取当前异常
typename inner::__invoke_result_aux< F, Args... >::type invoke_result_t
invoke_result的便捷别名
constexpr auto memory_order_release
释放内存顺序常量
constexpr auto memory_order_relaxed
宽松内存顺序常量
constexpr auto memory_order_acquire
获取内存顺序常量
constexpr auto memory_order_acq_rel
获取-释放内存顺序常量
constexpr Iterator2 move(Iterator1 first, Iterator1 last, Iterator2 result) noexcept(noexcept(inner::__move_aux(first, last, result)))
移动范围元素
constexpr T exchange(T &val, U &&new_val) noexcept(is_nothrow_move_constructible_v< T > &&is_nothrow_assignable_v< T &, U >)
将新值赋给对象并返回旧值
thread::native_handle_type handle() noexcept
获取当前线程句柄
bool_constant< true > true_type
表示true的类型
bool_constant< false > false_type
表示false的类型
constexpr bool is_virtual_thread_task_v
is_virtual_thread_task 的便捷变量模板
互斥锁
队列容器适配器
通用原子类型模板
判断类型是否为 virtual_thread_task 特化
从不暂停的等待器
协程 promise_type,管理任务生命周期与返回值存储
suspend_never initial_suspend() noexcept
初始挂起点 — 不暂停
~promise_type()
析构时释放共享状态的引用
void return_value(T &&value)
co_return 值,移动存储
void unhandled_exception()
未处理异常的捕获入口
decltype(auto) await_transform(Awaiter &&a)
通用 await_transform,透传自定义等待器
void return_value(const T &value)
co_return 值,拷贝存储
auto final_suspend() noexcept
最终挂起点
auto await_transform(inner::sleep_tag tag)
处理 co_await sleep
virtual_thread_task get_return_object()
创建返回给调用者的任务对象
auto await_transform(inner::yield_tag)
处理 co_await yield
协程 promise_type,管理任务生命周期与状态
auto await_transform(inner::yield_tag)
处理 co_await yield
auto await_transform(inner::sleep_tag tag)
处理 co_await sleep
decltype(auto) await_transform(Awaiter &&a)
通用 await_transform
~promise_type()
析构时释放共享状态的引用
suspend_never initial_suspend() noexcept
初始挂起点
void unhandled_exception()
未处理异常的捕获入口
virtual_thread_task get_return_object()
创建返回给调用者的任务对象
void await_resume()
co_await 恢复时检查异常
virtual_thread_task & operator=(virtual_thread_task &&other) noexcept
移动赋值运算符
virtual_thread_task(coroutine_handle< promise_type > h)
从协程句柄构造任务
coroutine_handle< promise_type > handle_
协程句柄
bool is_done() const noexcept
检查任务是否已完成
void get_result()
阻塞等待任务完成并获取结果
bool await_ready() noexcept
co_await 就绪检查
bool await_suspend(coroutine_handle<> caller) noexcept
co_await 挂起时注册 continuation
inner::task_shared_state< void > * shared_state_
共享状态指针
virtual_thread_task(virtual_thread_task &&other) noexcept
移动构造函数
bool valid() const noexcept
检查任务是否关联有效共享状态
virtual_thread_task()=default
默认构造,创建空任务
异步任务主模板
bool is_done() const noexcept
检查任务是否已完成
coroutine_handle< promise_type > handle_
协程句柄
virtual_thread_task()=default
默认构造,创建空任务
T await_resume()
co_await 恢复时返回结果或抛出异常
bool await_ready() noexcept
co_await 就绪检查
bool await_suspend(coroutine_handle<> caller) noexcept
co_await 挂起时注册 continuation
virtual_thread_task(coroutine_handle< promise_type > h)
从协程句柄构造任务
bool valid() const noexcept
检查任务是否关联有效共享状态
virtual_thread_task & operator=(virtual_thread_task &&other) noexcept
移动赋值运算符
virtual_thread_task(virtual_thread_task &&other) noexcept
移动构造函数
~virtual_thread_task()
析构函数 — 清理未调度的帧,释放共享状态引用
T get_result()
阻塞获取任务结果
inner::task_shared_state< T > * shared_state_
共享状态指针
线程管理类
动态大小数组容器