// Thread-pool tuning constants.
// NOTE(review): this extract is damaged — original line numbers are fused into
// tokens (e.g. "14MSTL_INLINE17" should read "MSTL_INLINE17"); restore from VCS.
14MSTL_INLINE17
// Seconds an idle worker may linger before reclamation — presumably applies to
// MODE_CACHED threads only; TODO confirm against thread_function().
constexpr size_t THREAD_POOL_MAX_IDLE_SECONDS = 60;
15MSTL_INLINE17
// Capacity of each worker's local ring buffer. local_queue derives
// mask_ = SIZE - 1 from this, so it must stay a power of two.
constexpr size_t THREAD_POOL_LOCAL_QUEUE_SIZE = 256;
// Hard cap on worker threads: one per logical processor.
// NOTE(review): `static const` in a header yields a copy per TU; the constants
// above use MSTL_INLINE17 instead — presumably intentional, verify.
// "THRESHHOLD" spelling is part of the public name; do not rename here.
16static const size_t THREAD_POOL_THREAD_MAX_THRESHHOLD = sysinfo::instance().get_system_info().processor_numbers;
// Pool sizing policy: MODE_FIXED keeps the initial thread count; MODE_CACHED
// may grow toward THREAD_POOL_THREAD_MAX_THRESHHOLD (see submit_task growth
// branch). Enum bodies are truncated in this extract — closing braces missing.
19enum class THREAD_POOL_MODE :
uint8_t {
20 MODE_FIXED, MODE_CACHED
// Selects how local_queue::be_stolen_by picks victims/batch sizes;
// enumerators are not visible in this extract.
23enum class STEAL_STRATEGY {
// Wrapper over a worker thread running a user callable that receives the
// worker's id. Fragment: original lines 34-43 and 46 are not visible here.
33class MSTL_API manual_thread {
38 using thread_func =
_MSTL function<void(id_type)>;
// Takes ownership of the callable; noexcept — presumably only stores it and
// does not start the thread (a separate start() is called in submit_task).
44 explicit manual_thread(thread_func&& func)
noexcept;
// Defaulted destructor — NOTE(review): cannot tell from this extract whether
// the underlying thread is joined elsewhere; confirm no detach-on-destroy leak.
45 ~manual_thread() =
default;
// Returns the id stored in thread_id_.
47 MSTL_NODISCARD id_type
id() const noexcept {
return thread_id_; }
// task_group fragment (class head not visible): tracks in-flight tasks via an
// atomic counter; decrement() wakes waiters with notify_all(), and wait()
// blocks on atomic wait until the count changes. Interior lines are missing.
55 task_group() =
default;
56 ~task_group() =
default;
// Number of tasks currently running under this group.
58 _MSTL atomic<size_t> running_count{0};
// Bodies of increment()/decrement() are only partially visible here.
60 void increment() noexcept {
64 void decrement() noexcept {
// Wake every thread blocked in wait() after the count drops.
66 running_count.notify_all();
// Blocks until running_count leaves the observed value — presumably looped
// until it reaches zero; the surrounding loop is not visible in this extract.
70 void wait() const noexcept {
73 running_count.wait(
count);
// Per-worker task ring of fixed capacity THREAD_POOL_LOCAL_QUEUE_SIZE,
// supporting push/pop by the owner and batched stealing by other workers.
// Fragment: several member bodies (pack, try_pop, push_back) are truncated.
80class MSTL_API local_queue {
// Strategy shared by every local_queue instance (static member).
82 static STEAL_STRATEGY steal_strategy_;
85 _MSTL array<
_MSTL function<void()>, THREAD_POOL_LOCAL_QUEUE_SIZE> tasks_{};
// head_ packs two 32-bit cursors (see pack/unpack): steal cursor in the high
// word, local head in the low word; tail_ is the owner's push cursor.
86 _MSTL atomic<uint64_t> head_{0};
87 _MSTL atomic<uint32_t> tail_{0};
// Index mask — valid only because the queue size is a power of two.
90 constexpr static size_t mask_ = THREAD_POOL_LOCAL_QUEUE_SIZE - 1;
// pack: steal cursor into bits 63..32, local head into bits 31..0.
93 return static_cast<uint64_t>(steal) << 32 |
static_cast<uint64_t>(local_head);
// unpack: returns {steal, local_head} split out of a packed head_ value.
96 MSTL_NODISCARD
static pair<uint32_t, uint32_t> unpack(
const uint64_t head)
noexcept {
103 local_queue() =
default;
104 ~local_queue() =
default;
// Non-copyable (atomics + owner semantics); movable via the declared
// move operations below.
105 local_queue(
const local_queue&) =
delete;
106 local_queue& operator =(
const local_queue&) =
delete;
107 local_queue(local_queue&& other)
noexcept;
108 local_queue& operator =(local_queue&& other)
noexcept;
// Fixed capacity of the backing array.
110 MSTL_NODISCARD
size_t capacity() const noexcept {
return tasks_.size(); }
111 MSTL_NODISCARD
bool empty() const noexcept {
return size() == 0u; }
// Free slots left: capacity minus (tail - steal cursor). Relies on unsigned
// wrap-around of the 32-bit cursors.
113 MSTL_NODISCARD
size_t remain_size() const noexcept {
116 const auto steal = unpack(head).first;
117 const size_t used =
static_cast<size_t>(tail - steal);
118 const size_t remain = capacity() - used;
// Owner-visible element count: tail minus local head.
121 MSTL_NODISCARD
size_t size() const noexcept {
124 const auto local_head = unpack(head).second;
125 return static_cast<size_t>(tail - local_head);
// Sets the class-wide stealing strategy and batch size.
// NOTE(review): writes two statics without synchronization — presumably only
// called before workers start; confirm.
128 static void set_steal_strategy(
const STEAL_STRATEGY strategy,
const uint32_t batch_size = 4) {
129 steal_strategy_ = strategy;
130 fixed_batch_size_ = batch_size;
// Owner-side push; body truncated in this extract.
133 void push_back(
_MSTL function<
void()> task) {
// Owner-side pop; empty optional when nothing is available.
139 _MSTL optional<
_MSTL function<void()>> try_pop();
// Called by a thief: moves task(s) from this queue into dst_queue and
// returns one to run; empty optional when stealing fails.
141 _MSTL optional<
_MSTL function<void()>> be_stolen_by(local_queue& dst_queue);
// Per-worker mutable state: local queue (not visible in this extract),
// stealing flag, and an idle-spin counter. Non-copyable, move-only.
145struct MSTL_API worker_context {
146 using id_type =
_INNER manual_thread::id_type;
// True while this worker is actively stealing from a victim queue.
150 _MSTL atomic<bool> is_stealing{
false};
// Consecutive iterations without work — presumably drives idle-thread
// reclamation against THREAD_POOL_MAX_IDLE_SECONDS; confirm in .cpp.
151 size_t consecutive_idle_count = 0;
153 worker_context() =
default;
154 worker_context(
const worker_context&) =
delete;
155 worker_context& operator =(
const worker_context&) =
delete;
156 worker_context(worker_context&& other)
noexcept;
157 worker_context& operator =(worker_context&& other)
noexcept;
// Lifecycle of a submitted task; enumerator list truncated in this extract
// but PENDING/RUNNING/COMPLETED/FAILED are used below.
161enum class TASK_STATUS {
// Human-readable name for a TASK_STATUS value; unreachable for values
// outside the enumerator set.
168MSTL_CONSTEXPR20
string to_string(
const TASK_STATUS status) {
170 case TASK_STATUS::PENDING:
return "PENDING";
171 case TASK_STATUS::RUNNING:
return "RUNNING";
172 case TASK_STATUS::COMPLETED:
return "COMPLETED";
173 case TASK_STATUS::FAILED:
return "FAILED";
174 default: MSTL_UNREACHABLE;
// task_info fragment (struct head not visible): per-task bookkeeping —
// status, submit/start/finish timestamps, and the executing worker's id.
183 atomic<TASK_STATUS> status{TASK_STATUS::PENDING};
184 timestamp submit_time{timestamp::now()};
185 timestamp start_time{0};
186 timestamp finish_time{0};
187 _INNER manual_thread::id_type worker_thread_id{0};
// A task is finished once it completed or failed.
194 MSTL_NODISCARD
bool is_finished() const noexcept {
196 return s == TASK_STATUS::COMPLETED || s == TASK_STATUS::FAILED;
// Wall time between start and finish; guarded when either timestamp is
// still unset (guard body not visible — presumably returns a sentinel).
199 MSTL_NODISCARD
int64_t exec_time() const noexcept {
200 if (start_time.value() == 0 || finish_time.value() == 0) {
203 return finish_time - start_time;
// What a submit_* call hands back: the future for the task's result plus the
// shared bookkeeping record. Truthy only when both halves are usable.
211struct submit_result {
212 _MSTL future<T> future;
213 task_info_ptr task_info;
215 MSTL_NODISCARD
explicit operator bool() const noexcept {
216 return future.valid() && task_info;
// Work-stealing thread pool with a global priority queue, per-worker local
// queues, a timer for delayed/periodic tasks, and optional cached growth.
// Fragment: many member declarations and bodies are missing from this extract.
221class MSTL_API thread_pool {
// Shared state of a repeating task; holds the cancellation flag used by
// cancel_periodic_task below.
223 struct periodic_task_state {
// Snapshot of pool counters, stringifiable for diagnostics.
227 struct MSTL_API pool_statistics : istringify<pool_statistics> {
228 size_t total_threads;
232 size_t total_submitted;
234 size_t total_completed;
236 MSTL_NODISCARD
string to_string()
const;
239 using id_type =
_INNER manual_thread::id_type;
240 using periodic_token = shared_ptr<periodic_task_state>;
241 using priority_type = task_info::priority_type;
244 using Task =
_MSTL function<void()>;
// Entry of the global priority queue: callable + priority + info record.
// operator< orders the max-heap (comparison body not visible here).
246 struct priority_task {
249 task_info_ptr task_info;
251 priority_task(Task t,
const priority_type p, task_info_ptr info) noexcept
254 bool operator <(
const priority_task& other)
const noexcept {
// Worker bookkeeping: id -> thread, id -> context, and an index-addressed
// vector of context pointers guarded by worker_contexts_mtx_.
259 _MSTL unordered_map<id_type, _MSTL unique_ptr<_INNER manual_thread>> threads_map_;
260 _MSTL unordered_map<id_type, worker_context> worker_contexts_;
261 _MSTL vector<_MSTL atomic<worker_context*>> worker_contexts_ptr_;
262 _MSTL mutex worker_contexts_mtx_;
// Scheduler backing submit_after / submit_every.
264 _MSTL timer_scheduler<steady_clock> timer_{};
266 id_type init_thread_size_{0};
267 size_t thread_threshhold_{THREAD_POOL_THREAD_MAX_THRESHHOLD};
// Global queue plus its capacity bound and synchronization primitives.
269 _MSTL priority_queue<priority_task> task_queue_{};
272 size_t task_threshhold_{THREAD_POOL_TASK_MAX_THRESHHOLD};
274 _MSTL mutex task_queue_mtx_{};
275 _MSTL condition_variable not_full_{};
276 _MSTL condition_variable not_empty_{};
277 _MSTL condition_variable exit_cond_{};
279 _MSTL atomic<THREAD_POOL_MODE> pool_mode_{THREAD_POOL_MODE::MODE_FIXED};
// Worker main loop and the steal attempt it falls back to.
294 void thread_function(id_type thread_id);
295 _MSTL optional<Task> try_steal_task(worker_context& ctx);
// Caller must already hold the relevant lock(s) — hence "_unsafe".
297 pool_statistics statistics_unsafe()
const;
// Pool identity is unique: neither copyable nor movable.
303 thread_pool(
const thread_pool&) =
delete;
304 thread_pool& operator =(
const thread_pool&) =
delete;
305 thread_pool(thread_pool&&) =
delete;
306 thread_pool& operator =(thread_pool&&) =
delete;
// Configuration setters; each returns false on rejection — presumably when
// the pool is already running; confirm in the .cpp.
308 bool set_mode(THREAD_POOL_MODE mode)
noexcept;
309 bool set_steal_mode(STEAL_STRATEGY strategy,
uint32_t steal_batch = 4) noexcept;
310 bool set_task_threshhold(
size_t threshhold) noexcept;
311 bool set_thread_threshhold(
size_t threshhold) noexcept;
313 MSTL_NODISCARD static
size_t max_thread_size() noexcept {
return THREAD_POOL_THREAD_MAX_THRESHHOLD; }
314 MSTL_NODISCARD
bool running() const noexcept {
return is_running_; }
315 MSTL_NODISCARD THREAD_POOL_MODE mode() const noexcept {
return pool_mode_; }
316 MSTL_NODISCARD pool_statistics statistics()
const;
// Lifecycle: start workers, and stop (returning a final statistics snapshot).
318 bool start(
size_t init_thread_size = 3);
319 pool_statistics stop();
// Submit with explicit priority; result type is deduced from the callable.
321 template <
typename Func,
typename... Args,
enable_if_t<is_invocable_v<Func, Args...>,
int> = 0>
322 submit_result<
invoke_result_t<Func, Args...>> submit_task(priority_type
priority, Func&& func, Args&&... args);
// Convenience overload — presumably forwards with a default priority.
324 template <
typename Func,
typename... Args,
enable_if_t<is_invocable_v<Func, Args...>,
int> = 0>
325 submit_result<
invoke_result_t<Func, Args...>> submit_task(Func&& func, Args&&... args) {
329 template <
typename Func,
typename... Args,
enable_if_t<is_invocable_v<Func, Args...>,
int> = 0>
// Delayed submission (default priority overload).
332 template <
typename Func,
typename... Args,
enable_if_t<is_invocable_v<Func, Args...>,
int> = 0>
333 submit_result<
invoke_result_t<Func, Args...>> submit_after(
int64_t delay_ms, Func&& func, Args&&... args) {
// Repeating submission; the returned token cancels the series.
337 template <
typename Func,
typename... Args,
enable_if_t<is_invocable_v<Func, Args...>,
int> = 0>
338 periodic_token submit_every(
int64_t interval_ms, priority_type
priority, Func&& func, Args&&... args);
340 template <
typename Func,
typename... Args,
enable_if_t<is_invocable_v<Func, Args...>,
int> = 0>
341 periodic_token submit_every(
int64_t interval_ms, Func&& func, Args&&... args) {
// Flags the series cancelled; in-flight iterations check the flag before
// re-arming (see submit_every implementation below).
345 static void cancel_periodic_task(
const periodic_token& token) {
346 if (token) token->cancelled.store(
true);
// Blocks on a pack of futures and returns their results as a tuple.
349 template <
typename... Types>
350 static tuple<future_result_t<Types>...>
wait(future<Types>&&... futures) {
// Thread-local access to the current worker's context and active task group.
// On MSVC these are exported accessor functions; elsewhere the thread_local
// variable is extern'd and the accessor is defined inline.
// NOTE(review): "MSTL_ALWAYS_INLINE_INLINE" looks like the same digit-fusion
// damage as above — presumably "MSTL_ALWAYS_INLINE"; restore from VCS.
355#ifdef MSTL_COMPILER_MSVC__
356MSTL_ALWAYS_INLINE_INLINE MSTL_API worker_context*& get_worker_context() noexcept;
357MSTL_ALWAYS_INLINE_INLINE MSTL_API
_MSTL shared_ptr<task_group>& get_current_task_group() noexcept;
// Null outside worker threads (submit_task checks for nullptr).
359extern thread_local worker_context* t_worker_ctx;
361MSTL_ALWAYS_INLINE_INLINE worker_context*& get_worker_context() noexcept {
return t_worker_ctx; }
// submit_task(priority, func, args...): wraps the callable in a packaged task
// whose execution is bracketed by a context_guard (timestamps, worker id,
// task-group propagation, status transition), then enqueues it — preferring
// the calling worker's local queue, falling back to the bounded global queue.
// Fragment: many interior lines (orig 369-380, 384-388, etc.) are missing.
366template <
typename Func,
typename... Args,
enable_if_t<is_invocable_v<Func, Args...>,
int>>
368thread_pool::submit_task(
const priority_type
priority, Func&& func, Args&&... args) {
// If called from inside a running task, the new task joins that task's
// group so task_group::wait covers it.
373 const auto current_group = get_current_task_group();
375 current_group->increment();
381 group = current_group,
382 info]()
mutable -> Result {
// RAII bracket around task execution.
383 struct context_guard {
389 : info(
move(i)), group_inner(
move(g)) {
// On entry: stamp start time and executing worker, and install this
// task's group as the thread's current group.
391 info->start_time = timestamp::now();
392 info->worker_thread_id = get_worker_context() ? get_worker_context()->id : 0;
394 prev_group_inner = get_current_task_group();
395 get_current_task_group() = group_inner;
398 ~context_guard() noexcept {
// On exit: stamp finish time; flip RUNNING -> (target not visible,
// presumably COMPLETED) via CAS so a FAILED status set by the catch
// blocks is not overwritten; restore the previous group and decrement.
400 info->finish_time = timestamp::now();
401 auto expected = TASK_STATUS::RUNNING;
402 info->status.compare_exchange_strong(expected,
405 get_current_task_group() = prev_group_inner;
406 if (group_inner) group_inner->decrement();
413 context_guard guard(info, group);
// Exception text is captured into the info record (FAILED path).
418 info->error = e.
what();
422 info->error =
"Unknown exception";
// Type-erased job sharing the packaged task.
429 Task job([task] { (*task)(); });
// Backpressure: wait up to 1s for queue space; on timeout return a dummy
// completed future with info->error = "Task queue is full".
434 if (!not_full_.wait_for(
lock,
seconds(1), [&]()->
bool {
435 return task_queue_.size() < task_threshhold_;
438 info->error =
"Task queue is full";
441 []() -> Result {
return Result(); });
443 return submit_result<Result>{dummy_task->get_future(), info};
448 ++total_submitted_tasks_;
449 not_empty_.notify_one();
// Fast path: a worker submitting from inside the pool pushes straight into
// its own local queue when there is room.
452 auto* ctx = get_worker_context();
454 if (ctx !=
nullptr && ctx->queue.remain_size() > 0) {
455 ctx->queue.push_back(
move(job));
456 ++total_submitted_tasks_;
// Slow path: same bounded-wait-then-dummy-future dance as above.
459 if (!not_full_.wait_for(
lock,
seconds(1), [&]()->
bool {
460 return task_queue_.size() < task_threshhold_;
463 info->error =
"Task queue is full";
466 []() -> Result {
return Result(); });
468 return submit_result<Result>{dummy_task->get_future(), info};
// NOTE(review): enqueued with priority literal 0 here rather than the
// `priority` parameter — looks suspicious; confirm against the overloads.
471 task_queue_.emplace(
_MSTL move(job), 0, info);
473 ++total_submitted_tasks_;
474 not_empty_.notify_one();
// Cached-mode growth: spawn a new worker while backlog exceeds idle workers
// and the thread cap is not reached.
478 if (pool_mode_.load() == THREAD_POOL_MODE::MODE_CACHED
479 && task_size_.load() > idle_thread_size_
480 && threads_map_.size() < thread_threshhold_) {
483 [
this](
const id_type
id) {
486 id_type thread_id = ptr->id();
// Grow the context-pointer vector to cover the new id.
// NOTE(review): `worker_contexts_ptr_.size() - 1` underflows when the
// vector is empty, and starting at size()-1 re-covers an existing index;
// also reserve() does not change size() — verify this loop's bounds.
488 if (thread_id >= worker_contexts_ptr_.size()) {
489 worker_contexts_ptr_.reserve(thread_id + 1);
490 for (
size_t i = worker_contexts_ptr_.size() - 1; i <= thread_id; i++) {
493 worker_contexts_ptr_.emplace_back(
move(tmp));
497 threads_map_.emplace(thread_id,
_MSTL move(ptr));
498 threads_map_[thread_id]->start();
502 return submit_result<Result>{
_MSTL move(res), info};
// submit_after(delay_ms, priority, func, args...): schedules a one-shot task
// on timer_ that, when it fires, re-submits through submit_task. The task body
// is bracketed by a simpler context_guard (no task-group propagation here).
// Fragment: interior lines (orig 509-515, 534-546, etc.) are missing.
505template <
typename Func,
typename... Args,
enable_if_t<is_invocable_v<Func, Args...>,
int>>
507thread_pool::submit_after(
const int64_t delay_ms,
const priority_type
priority, Func&& func, Args&&... args) {
// NOTE(review): Result deduced via decltype(func(args...)) here, while the
// declarations use invoke_result_t<Func, Args...> — decltype on lvalue-named
// params can differ for ref-qualified callables; confirm they agree.
508 using Result =
decltype(func(args...));
// RAII bracket: stamps start/finish times, worker id, and CASes the status
// from RUNNING on completion (CAS target not visible in this extract).
516 struct context_guard {
519 explicit context_guard(task_info_ptr i) : info(
move(i)) {
521 info->start_time = timestamp::now();
522 info->worker_thread_id = get_worker_context() ? get_worker_context()->id : 0;
525 ~context_guard() noexcept {
526 info->finish_time = timestamp::now();
527 auto expected = TASK_STATUS::RUNNING;
528 info->status.compare_exchange_strong(expected,
533 context_guard guard(info);
539 info->error = e.what();
// Arm the timer; on expiry the stored task is pushed through submit_task
// with the captured priority.
547 timer_.add_task(expire_time, [
this, task =
_MSTL move(task),
priority]()
mutable {
548 this->submit_task(
priority, [task]() {
553 return submit_result<Result>{
_MSTL move(res), info};
// submit_every(interval_ms, priority, func, args...): repeating task. A
// self-referential handler (held via handler_ptr, a shared_ptr — keeps itself
// alive across timer firings) submits the task then re-arms the timer for the
// next interval. Cancellation is checked both before submitting and before
// re-arming, so cancel_periodic_task stops the chain within one interval.
// Fragment: interior lines (orig 558-564, 567, 569-571, etc.) are missing.
556template <
typename Func,
typename... Args,
enable_if_t<is_invocable_v<Func, Args...>,
int>>
557thread_pool::periodic_token thread_pool::submit_every(
int64_t interval_ms,
const priority_type
priority, Func &&func, Args &&...args) {
// NOTE(review): the handler captures `this`; confirm the pool outlives all
// armed timers (e.g. timer_ is drained in stop()/the destructor).
565 *handler_ptr = [
this, state, task, interval_ms,
priority, handler_ptr]() {
// Early-out if the series was cancelled since the last firing.
566 if (state->cancelled.load())
return;
568 this->submit_task(
priority, [task]() {
// Re-check before re-arming: cancellation may have landed mid-iteration.
572 if (state->cancelled.load())
return;
574 timer_.add_task(next_time, [handler_ptr]() { (*handler_ptr)(); });
// First firing is armed here; the handler then re-arms itself.
578 timer_.add_task(first_time, [handler_ptr]() { (*handler_ptr)(); });