MSTL 1.4.0
A Modern C++ Library with extended functionality, web components, and utility libraries
载入中...
搜索中...
未找到
virtual_thread.hpp
浏览该文件的文档.
1#ifndef MSTL_CORE_ASYNC_VIRTUAL_THREAD_HPP__
2#define MSTL_CORE_ASYNC_VIRTUAL_THREAD_HPP__
3
11
12#include "MSTL/core/container/queue.hpp"
16#ifdef MSTL_STANDARD_20__
17#include <coroutine>
19
25
32struct virtual_thread_task {
39 struct promise_type {
40 _MSTL exception_ptr exception_;
41
46 virtual_thread_task get_return_object() {
47 return virtual_thread_task{
48 std::coroutine_handle<promise_type>::from_promise(*this)
49 };
50 }
51
56 std::suspend_never initial_suspend() {
57 return std::suspend_never{};
58 }
59
64 std::suspend_always final_suspend() noexcept {
65 return std::suspend_always{};
66 }
67
71 void return_void() {}
72
78 void unhandled_exception() {
79 exception_ = _MSTL current_exception();
80 }
81 };
82
83 std::coroutine_handle<promise_type> handle_;
84
89 virtual_thread_task(std::coroutine_handle<promise_type> h)
90 : handle_(h) {}
91
97 ~virtual_thread_task() {
98 if (handle_ && !handle_.done()) {
99 handle_.destroy();
100 }
101 }
102
103 virtual_thread_task(const virtual_thread_task&) = delete;
104 virtual_thread_task& operator =(const virtual_thread_task&) = delete;
105
110 virtual_thread_task(virtual_thread_task&& other) noexcept
111 : handle_(_MSTL exchange(other.handle_, nullptr)) {}
112
118 virtual_thread_task& operator =(virtual_thread_task&& other) noexcept {
119 if (this != &other) {
120 if (handle_ && !handle_.done()) {
121 handle_.destroy();
122 }
123 handle_ = _MSTL exchange(other.handle_, nullptr);
124 }
125 return *this;
126 }
127};
128
135class virtual_thread_scheduler {
136private:
137 queue<std::coroutine_handle<>> task_queue_;
138 vector<thread> workers_;
139 mutex mutex_;
140 condition_variable cv_;
141 atomic_bool shutdown_;
142
146 virtual_thread_scheduler()
147 : shutdown_(false) {}
148
154 void worker_loop() {
155 while (true) {
156 std::coroutine_handle<> handle;
157
158 {
159 smart_lock<mutex> lock(mutex_);
160 cv_.wait(lock, [this] {
161 return shutdown_ || !task_queue_.empty();
162 });
163
164 if (shutdown_ && task_queue_.empty()) {
165 return;
166 }
167
168 if (!task_queue_.empty()) {
169 handle = task_queue_.front();
170 task_queue_.pop();
171 }
172 }
173
174 if (handle) {
175 handle.resume();
176 }
177 }
178 }
179
180public:
185 static virtual_thread_scheduler& get_instance() {
186 static virtual_thread_scheduler instance;
187 return instance;
188 }
189
196 void schedule(std::coroutine_handle<> handle) {
197 {
198 lock<mutex> lock(mutex_);
199 task_queue_.push(handle);
200 }
201 cv_.notify_one();
202 }
203
210 void start_workers(size_t num_threads) {
211 for (size_t i = 0; i < num_threads; ++i) {
212 workers_.emplace_back([this] { worker_loop(); });
213 }
214 }
215
221 void shutdown() {
222 {
223 lock<mutex> lock(mutex_);
224 shutdown_ = true;
225 }
226 cv_.notify_all();
227
228 for (auto& worker : workers_) {
229 if (worker.joinable()) {
230 worker.join();
231 }
232 }
233 }
234
240 ~virtual_thread_scheduler() {
241 shutdown();
242 }
243};
244
251struct virtual_thread_awaiter {
252 std::coroutine_handle<> handle_;
253
258 bool await_ready() const noexcept {
259 return false;
260 }
261
268 void await_suspend(std::coroutine_handle<> handle) {
269 virtual_thread_scheduler::get_instance().schedule(handle);
270 }
271
275 void await_resume() const noexcept {}
276};
277
278
286class virtual_thread {
287private:
288 optional<virtual_thread_task> task_;
289
298 template<typename Func>
299 static virtual_thread_task create_task(Func func) {
300 co_await std::suspend_never{};
301 func();
302 }
303
304public:
305 virtual_thread() = default;
306 virtual_thread(const virtual_thread&) = delete;
307 virtual_thread& operator =(const virtual_thread&) = delete;
308 virtual_thread(virtual_thread&& other) noexcept = default;
309 virtual_thread& operator =(virtual_thread&& other) noexcept = default;
310
319 template <typename Func>
320 static virtual_thread start(Func&& func) {
321 virtual_thread vt;
322 vt.task_ = virtual_thread::create_task(_MSTL forward<Func>(func));
323 return vt;
324 }
325
332 static virtual_thread_awaiter yield() {
333 return virtual_thread_awaiter{};
334 }
335
343 static virtual_thread_task sleep(const int64_t ms) {
344 co_await yield();
345 this_thread::sleep_for(milliseconds(ms));
346 }
347
354 static void initialize(size_t num_threads) {
355 virtual_thread_scheduler::get_instance().start_workers(num_threads);
356 }
357
363 static void shutdown() {
364 virtual_thread_scheduler::get_instance().shutdown();
365 }
366};
367 // Coroutine
369
371#endif
372#endif // MSTL_CORE_ASYNC_VIRTUAL_THREAD_HPP__
MSTL原子类型完整实现
void notify_all() noexcept
通知所有等待线程
void notify_one() noexcept
通知一个等待线程
void wait(smart_lock< mutex > &lock)
无限期等待
MSTL条件变量行为
MSTL_NODISCARD constexpr T && forward(remove_reference_t< T > &x) noexcept
完美转发左值
atomic< bool > atomic_bool
布尔原子类型
long long int64_t
64位有符号整数类型
duration< int64_t, milli > milliseconds
毫秒持续时间
exception_ptr MSTL_API current_exception() noexcept
获取当前异常
lock< Mutex, true > smart_lock
智能锁管理器的便捷类型别名
#define _MSTL
全局命名空间MSTL前缀
#define MSTL_END_NAMESPACE__
结束全局命名空间MSTL
#define MSTL_BEGIN_NAMESPACE__
开始全局命名空间MSTL
MSTL_CONSTEXPR14 T exchange(T &val, U &&new_val) noexcept(conjunction< is_nothrow_move_constructible< T >, is_nothrow_assignable< T &, U > >::value)
将新值赋给对象并返回旧值
MSTL_ALWAYS_INLINE_INLINE void yield() noexcept
让出当前线程的时间片
constexpr T initialize() noexcept(is_nothrow_default_constructible< T >::value)
返回类型T的默认初始化值
MSTL可选值类型