1#ifndef MSTL_CORE_ASYNC_VIRTUAL_THREAD_HPP__
2#define MSTL_CORE_ASYNC_VIRTUAL_THREAD_HPP__
12#include "MSTL/core/container/queue.hpp"
16#ifdef MSTL_STANDARD_20__
32struct virtual_thread_task {
40 _MSTL exception_ptr exception_;
46 virtual_thread_task get_return_object() {
47 return virtual_thread_task{
48 std::coroutine_handle<promise_type>::from_promise(*
this)
56 std::suspend_never initial_suspend() {
57 return std::suspend_never{};
64 std::suspend_always final_suspend() noexcept {
65 return std::suspend_always{};
78 void unhandled_exception() {
83 std::coroutine_handle<promise_type> handle_;
89 virtual_thread_task(std::coroutine_handle<promise_type> h)
97 ~virtual_thread_task() {
98 if (handle_ && !handle_.done()) {
103 virtual_thread_task(
const virtual_thread_task&) =
delete;
104 virtual_thread_task& operator =(
const virtual_thread_task&) =
delete;
110 virtual_thread_task(virtual_thread_task&& other) noexcept
118 virtual_thread_task& operator =(virtual_thread_task&& other)
noexcept {
119 if (
this != &other) {
120 if (handle_ && !handle_.done()) {
135class virtual_thread_scheduler {
137 queue<std::coroutine_handle<>> task_queue_;
138 vector<thread> workers_;
140 condition_variable cv_;
146 virtual_thread_scheduler()
147 : shutdown_(false) {}
156 std::coroutine_handle<> handle;
160 cv_.
wait(lock, [
this] {
161 return shutdown_ || !task_queue_.empty();
164 if (shutdown_ && task_queue_.empty()) {
168 if (!task_queue_.empty()) {
169 handle = task_queue_.front();
185 static virtual_thread_scheduler& get_instance() {
186 static virtual_thread_scheduler instance;
196 void schedule(std::coroutine_handle<> handle) {
198 lock<mutex> lock(mutex_);
199 task_queue_.push(handle);
210 void start_workers(
size_t num_threads) {
211 for (
size_t i = 0; i < num_threads; ++i) {
212 workers_.emplace_back([
this] { worker_loop(); });
223 lock<mutex> lock(mutex_);
228 for (
auto& worker : workers_) {
229 if (worker.joinable()) {
240 ~virtual_thread_scheduler() {
251struct virtual_thread_awaiter {
252 std::coroutine_handle<> handle_;
258 bool await_ready() const noexcept {
268 void await_suspend(std::coroutine_handle<> handle) {
269 virtual_thread_scheduler::get_instance().schedule(handle);
275 void await_resume() const noexcept {}
286class virtual_thread {
288 optional<virtual_thread_task> task_;
298 template<
typename Func>
299 static virtual_thread_task create_task(Func func) {
300 co_await std::suspend_never{};
305 virtual_thread() =
default;
306 virtual_thread(
const virtual_thread&) =
delete;
307 virtual_thread& operator =(
const virtual_thread&) =
delete;
308 virtual_thread(virtual_thread&& other)
noexcept =
default;
309 virtual_thread& operator =(virtual_thread&& other)
noexcept =
default;
319 template <
typename Func>
320 static virtual_thread start(Func&& func) {
332 static virtual_thread_awaiter
yield() {
333 return virtual_thread_awaiter{};
343 static virtual_thread_task sleep(
const int64_t ms) {
355 virtual_thread_scheduler::get_instance().start_workers(num_threads);
363 static void shutdown() {
364 virtual_thread_scheduler::get_instance().shutdown();
void notify_all() noexcept
通知所有等待线程
void notify_one() noexcept
通知一个等待线程
void wait(smart_lock< mutex > &lock)
无限期等待
MSTL_NODISCARD constexpr T && forward(remove_reference_t< T > &x) noexcept
完美转发左值
atomic< bool > atomic_bool
布尔原子类型
long long int64_t
64位有符号整数类型
duration< int64_t, milli > milliseconds
毫秒持续时间
exception_ptr MSTL_API current_exception() noexcept
获取当前异常
lock< Mutex, true > smart_lock
智能锁管理器的便捷类型别名
#define _MSTL
全局命名空间MSTL前缀
#define MSTL_END_NAMESPACE__
结束全局命名空间MSTL
#define MSTL_BEGIN_NAMESPACE__
开始全局命名空间MSTL
MSTL_CONSTEXPR14 T exchange(T &val, U &&new_val) noexcept(conjunction< is_nothrow_move_constructible< T >, is_nothrow_assignable< T &, U > >::value)
将新值赋给对象并返回旧值
MSTL_ALWAYS_INLINE_INLINE void yield() noexcept
让出当前线程的时间片
constexpr T initialize() noexcept(is_nothrow_default_constructible< T >::value)
返回类型T的默认初始化值