NexusForce 1.0.0
A Modern C++ Library with extended functionality, web components, and utility libraries
载入中...
搜索中...
未找到
virtual_thread.hpp
浏览该文件的文档.
1#ifndef NEFORCE_CORE_ASYNC_VIRTUAL_THREAD_HPP__
2#define NEFORCE_CORE_ASYNC_VIRTUAL_THREAD_HPP__
3
11
12#ifdef NEFORCE_STANDARD_20
18NEFORCE_BEGIN_NAMESPACE__
19
25
74
76
83
90 if (handle_ && !handle_.done()) {
91 handle_.destroy();
92 }
93 }
94
97
103 handle_(_NEFORCE exchange(other.handle_, nullptr)) {}
104
111 if (addressof(other) == this) {
112 return *this;
113 }
114 if (handle_ && !handle_.done()) {
115 handle_.destroy();
116 }
117 handle_ = _NEFORCE exchange(other.handle_, nullptr);
118 return *this;
119 }
120};
121
128class virtual_thread_scheduler {
129private:
130 queue<coroutine_handle<>> task_queue_;
131 vector<thread> workers_;
132 mutex mutex_;
134 atomic<bool> shutdown_;
135
139 virtual_thread_scheduler() :
140 shutdown_(false) {}
141
147 void worker_loop() {
148 while (true) {
150
151 {
152 unique_lock<mutex> lock(mutex_);
153 cv_.wait(lock, [this] { return shutdown_ || !task_queue_.empty(); });
154
155 if (shutdown_ && task_queue_.empty()) {
156 return;
157 }
158
159 if (!task_queue_.empty()) {
160 handle = task_queue_.front();
161 task_queue_.pop();
162 }
163 }
164
165 if (handle) {
166 handle.resume();
167 }
168 }
169 }
170
171public:
176 static virtual_thread_scheduler& get_instance() {
177 static virtual_thread_scheduler instance;
178 return instance;
179 }
180
188 {
189 lock<mutex> lock(mutex_);
190 task_queue_.push(handle);
191 }
192 cv_.notify_one();
193 }
194
201 void start_workers(size_t num_threads) {
202 for (size_t i = 0; i < num_threads; ++i) {
203 workers_.emplace_back([this] { worker_loop(); });
204 }
205 }
206
212 void shutdown() {
213 {
214 lock<mutex> lock(mutex_);
215 shutdown_ = true;
216 }
217 cv_.notify_all();
218
219 for (auto& worker: workers_) {
220 if (worker.joinable()) {
221 worker.join();
222 }
223 }
224 }
225
232};
233
262
263
272private:
274
283 template <typename Func>
284 static virtual_thread_task create_task(Func func) {
285 co_await suspend_never{};
286 func();
287 }
288
289public:
290 virtual_thread() = default;
293 virtual_thread(virtual_thread&& other) noexcept = default;
294 virtual_thread& operator=(virtual_thread&& other) noexcept = default;
295
304 template <typename Func>
305 static virtual_thread start(Func&& func) {
307 vt.task_ = virtual_thread::create_task(_NEFORCE forward<Func>(func));
308 return vt;
309 }
310
318
327 co_await yield();
328 this_thread::sleep_for(milliseconds(ms));
329 }
330
337 static void initialize(size_t num_threads) { virtual_thread_scheduler::get_instance().start_workers(num_threads); }
338
345};
346 // Coroutine
348
349NEFORCE_END_NAMESPACE__
350#endif
351#endif // NEFORCE_CORE_ASYNC_VIRTUAL_THREAD_HPP__
原子类型完整实现
锁管理器模板
非递归互斥锁
可选值类
队列容器适配器
独占锁管理器模板
动态大小数组容器
void start_workers(size_t num_threads)
启动工作线程
static virtual_thread_scheduler & get_instance()
获取调度器实例
void schedule(coroutine_handle<> handle)
调度协程任务
virtual_thread & operator=(virtual_thread &&other) noexcept=default
移动赋值运算符
virtual_thread(const virtual_thread &)=delete
禁止拷贝构造
static virtual_thread_awaiter yield()
让出执行权
static void shutdown()
关闭调度器
virtual_thread & operator=(const virtual_thread &)=delete
禁止拷贝赋值
static virtual_thread start(Func &&func)
启动虚拟线程
static void initialize(size_t num_threads)
初始化调度器
static virtual_thread_task sleep(const int64_t ms)
睡眠指定毫秒数
virtual_thread()=default
默认构造函数
virtual_thread(virtual_thread &&other) noexcept=default
移动构造函数
条件变量行为
协程支持
NEFORCE_NODISCARD constexpr T * addressof(T &x) noexcept
获取对象的地址
NEFORCE_NODISCARD constexpr T && forward(remove_reference_t< T > &x) noexcept
完美转发左值
long long int64_t
64位有符号整数类型
std::coroutine_handle< Promise > coroutine_handle
协程句柄
duration< int64_t, milli > milliseconds
毫秒持续时间
exception_ptr NEFORCE_API current_exception() noexcept
获取当前异常
NEFORCE_CONSTEXPR14 T exchange(T &val, U &&new_val) noexcept(is_nothrow_move_constructible_v< T > &&is_nothrow_assignable_v< T &, U >)
将新值赋给对象并返回旧值
NEFORCE_ALWAYS_INLINE_INLINE thread::native_handle_type handle() noexcept
获取当前线程句柄
可选值类型
队列容器适配器
通用原子类型模板
始终暂停的等待器
从不暂停的等待器
coroutine_handle handle_
协程句柄
void await_resume() const noexcept
恢复协程
void await_suspend(coroutine_handle<> handle)
挂起协程
bool await_ready() const noexcept
检查是否准备就绪
suspend_never initial_suspend()
初始挂起点
void unhandled_exception()
未处理异常处理
suspend_always final_suspend() noexcept
最终挂起点
virtual_thread_task get_return_object()
获取返回对象
coroutine_handle< promise_type > handle_
协程句柄
virtual_thread_task & operator=(const virtual_thread_task &)=delete
禁止拷贝赋值
virtual_thread_task(const virtual_thread_task &)=delete
禁止拷贝构造
virtual_thread_task(coroutine_handle< promise_type > h)
构造函数
virtual_thread_task(virtual_thread_task &&other) noexcept
移动构造函数
virtual_thread_task & operator=(virtual_thread_task &&other) noexcept
移动赋值运算符