MSTL 1.4.0
A Modern C++ Library with extended functionality, web components, and utility libraries
载入中...
搜索中...
未找到
lock_free_queue.hpp
浏览该文件的文档.
1#ifndef MSTL_CORE_ASYNC_LOCK_FREE_QUEUE_HPP__
2#define MSTL_CORE_ASYNC_LOCK_FREE_QUEUE_HPP__
3
11
15
21
32template <typename T>
34private:
/// Packed per-node reference bookkeeping; both fields share one 32-bit unit.
struct node_counter {
    /// 30-bit count adjusted by release_ref() and free_external_counter().
    uint32_t internal_count : 30;

    /// 2-bit count of external counters still attached to the node.
    uint32_t external_counters : 2;
};
45
46 struct node;
47
54 struct counted_node_ptr {
55 int external_count = 0;
56 node* ptr = nullptr;
57
61 counted_node_ptr() noexcept {}
62 };
63
// Queue node. Only the constructor and release_ref() are visible in this
// listing; the code below uses members named `data` (elsewhere in the file),
// `count` and `next`, whose declarations fall on listing lines not shown here.
70 struct node {
74
// Builds a node with internal_count = 0, external_counters = external_count
// (2 unless the caller says otherwise) and a null `next` link whose own
// external_count is 0.
// NOTE(review): external_count is an int stored into a 2-bit field, so only
// the low two bits survive.
79 explicit node(int external_count = 2) {
80 node_counter new_count;
81 new_count.internal_count = 0;
82 new_count.external_counters = external_count;
83 count.store(new_count);
84
85 counted_node_ptr node_ptr;
86 node_ptr.ptr = nullptr;
87 node_ptr.external_count = 0;
88
89 next.store(node_ptr);
90 }
91
// Decrements internal_count through a compare-exchange retry loop and deletes
// the node once both internal_count and external_counters have reached zero.
97 void release_ref() {
98 node_counter old_counter = count.load(memory_order_relaxed);
99 node_counter new_counter;
100 do {
101 new_counter = old_counter;
102 --new_counter.internal_count;
103 }
104 while (!count.compare_exchange_strong(
105 old_counter, new_counter,
// NOTE(review): listing line 106 (the remaining arguments and the closing of
// this call) is not present in this listing.
107 if (!new_counter.internal_count && !new_counter.external_counters) {
108 delete this;
// destruct_count is a static member of the enclosing queue class, so this
// statement does not read the node that was just deleted.
109 destruct_count.fetch_add(1);
110 }
111 }
112 };
113
114private:
115 static atomic<int> destruct_count;
116 static atomic<int> construct_count;
117
120 atomic<size_t> push_count_{0};
121 atomic<size_t> pop_count_{0};
122
123private:
131 void set_new_tail(counted_node_ptr& old_tail, counted_node_ptr const& new_tail) {
132 node* const current_tail_ptr = old_tail.ptr;
133 while (!tail.compare_exchange_weak(old_tail, new_tail) &&
134 old_tail.ptr == current_tail_ptr) {
135 this_thread::relax();
136 }
137 if (old_tail.ptr == current_tail_ptr) {
138 lock_free_queue::free_external_counter(old_tail);
139 } else {
140 current_tail_ptr->release_ref();
141 }
142 }
143
// Retires one external counter of the node referenced by old_node_ptr.
// In a single compare-exchange retry loop it decrements external_counters and
// adds (old_node_ptr.external_count - 2) to internal_count; the node is
// deleted when both fields end up zero.
151 static void free_external_counter(counted_node_ptr& old_node_ptr) {
152 node* const ptr = old_node_ptr.ptr;
153 int const count_increase = old_node_ptr.external_count - 2;
154 node_counter old_counter = ptr->count.load(_MSTL memory_order_relaxed);
155 node_counter new_counter;
156 do {
157 new_counter = old_counter;
158 --new_counter.external_counters;
// internal_count is a 30-bit unsigned field, so a negative count_increase
// is applied modulo 2^30.
159 new_counter.internal_count += count_increase;
160 }
161 while (!ptr->count.compare_exchange_strong(
162 old_counter, new_counter,
// NOTE(review): listing line 163 (the remaining arguments and the closing of
// this call) is not present in this listing.
164 if (!new_counter.internal_count && !new_counter.external_counters) {
165 destruct_count.fetch_add(1);
166 delete ptr;
167 }
168 }
169
// Increments the external_count stored in `counter` with a compare-exchange
// retry loop. On return old_counter carries the incremented count.
// NOTE(review): listing line 179, which should declare the `counter`
// parameter, is not present here; call sites in this file pass `tail` or
// `head` as the first argument.
178 static void increase_external_count(
180 counted_node_ptr& old_counter) {
181 counted_node_ptr new_counter;
182 do {
183 new_counter = old_counter;
184 ++new_counter.external_count;
185 } while (!counter.compare_exchange_strong(
186 old_counter, new_counter,
// NOTE(review): listing line 187 (the remaining arguments and the closing of
// this call) is not present in this listing.
188 old_counter.external_count = new_counter.external_count;
189 }
190
191public:
// NOTE(review): presumably the body of the default constructor; its header
// line is not present in this listing.
// Allocates one node and makes both `tail` and `head` refer to it with an
// external_count of 1, so head and tail start out on the same node.
198 counted_node_ptr new_next;
199 new_next.ptr = new node();
200 new_next.external_count = 1;
201 tail.store(new_next);
202 head.store(new_next);
203 }
204
// NOTE(review): presumably the body of the destructor; its header line is not
// present in this listing.
// Pops until pop() yields a pointer that tests false, then deletes the node
// still referenced by `head`.
// NOTE(review): the visible pop() body returns make_unique<T>() when the queue
// is empty. If that allocates a default-constructed T, the result is never
// null and this loop does not terminate - confirm.
211 while (pop()) { this_thread::relax(); }
212 auto head_counted_node = head.load();
213 delete head_counted_node.ptr;
214 }
215
223 void push(T new_value) {
224 _MSTL unique_ptr<T> new_data(new T(new_value));
225 counted_node_ptr new_next;
226 new_next.ptr = new node;
227 new_next.external_count = 1;
228 counted_node_ptr old_tail = tail.load();
229 for (;;) {
230 lock_free_queue::increase_external_count(tail, old_tail);
231 T* old_data = nullptr;
232 if (old_tail.ptr->data.compare_exchange_strong(old_data, new_data.get())) {
233 counted_node_ptr old_next;
234 counted_node_ptr now_next = old_tail.ptr->next.load();
235 if (!old_tail.ptr->next.compare_exchange_strong(old_next, new_next)) {
236 delete new_next.ptr;
237 new_next = old_next;
238 }
239 this->set_new_tail(old_tail, new_next);
240 new_data.release();
241 break;
242 }
243 counted_node_ptr old_next;
244 if (old_tail.ptr->next.compare_exchange_strong(old_next, new_next)) {
245 old_next = new_next;
246 new_next.ptr = new node;
247 }
248 this->set_new_tail(old_tail, old_next);
249 }
250 ++construct_count;
251 push_count_.fetch_add(1, _MSTL memory_order_relaxed);
252 }
253
// NOTE(review): presumably the body of pop(); its header line is not present
// in this listing.
// Loops until it either finds head and tail on the same node or manages to
// swing `head` to the next node, in which case the detached node's payload is
// handed to the caller.
262 counted_node_ptr old_head = head.load(_MSTL memory_order_relaxed);
263 for (;;) {
264 lock_free_queue::increase_external_count(head, old_head);
265 node* const ptr = old_head.ptr;
// Head and tail on the same node: nothing to dequeue.
266 if (ptr == tail.load().ptr) {
267 ptr->release_ref();
// NOTE(review): make_unique<T>() with no arguments looks like it allocates a
// default-constructed T, so this path would return a non-null pointer and
// callers could not tell "empty" from a real element (loops such as
// `while (pop())` would never stop). A null `_MSTL unique_ptr<T>()` may have
// been intended - confirm.
268 return _MSTL make_unique<T>();
269 }
270 counted_node_ptr next = ptr->next.load();
271 if (head.compare_exchange_strong(old_head, next)) {
// This thread detached the node: take its payload and retire the external
// counter that `head` held on it.
272 T* res = ptr->data.exchange(nullptr);
273 lock_free_queue::free_external_counter(old_head);
274 pop_count_.fetch_add(1, _MSTL memory_order_relaxed);
275 return unique_ptr<T>(res);
276 }
// Lost the race for this node: drop the reference taken above and retry.
277 ptr->release_ref();
278 }
279 }
280
// NOTE(review): presumably the body of try_pop(); its header line is not
// present in this listing.
// Same steps as the unbounded pop loop, but gives up after three attempts and
// reloads `head` at the start of every attempt.
291 counted_node_ptr old_head{};
292
293 for (int retry = 0; retry < 3; ++retry) {
294 old_head = head.load(_MSTL memory_order_relaxed);
295 lock_free_queue::increase_external_count(head, old_head);
296 node* ptr = old_head.ptr;
297
// Head and tail on the same node: nothing to dequeue.
298 if (ptr == tail.load().ptr) {
299 ptr->release_ref();
// NOTE(review): make_unique<T>() looks like it yields a non-null pointer to a
// default-constructed T rather than a null pointer - confirm that this is the
// intended "empty" result.
300 return _MSTL make_unique<T>();
301 }
302
303 counted_node_ptr next = ptr->next.load();
304 if (head.compare_exchange_strong(old_head, next)) {
// This thread detached the node: take its payload and retire the external
// counter that `head` held on it.
305 T* res = ptr->data.exchange(nullptr);
306 lock_free_queue::free_external_counter(old_head);
307 pop_count_.fetch_add(1, _MSTL memory_order_relaxed);
308 return _MSTL unique_ptr<T>(res);
309 }
310
// Lost the race for this node: drop the reference taken above and retry.
311 ptr->release_ref();
312 }
313
// All three attempts lost the head exchange.
// NOTE(review): same make_unique<T>() concern as above; as written the caller
// cannot distinguish "gave up" from a dequeued element.
314 return _MSTL make_unique<T>();
315 }
316
324 bool empty() const noexcept {
325 const counted_node_ptr head_ptr = head.load(_MSTL memory_order_acquire);
326 const counted_node_ptr tail_ptr = tail.load(_MSTL memory_order_acquire);
327 return head_ptr.ptr == tail_ptr.ptr;
328 }
329
338 size_t size() const noexcept {
339 const size_t push_cnt = push_count_.load(_MSTL memory_order_relaxed);
340 const size_t pop_cnt = pop_count_.load(_MSTL memory_order_relaxed);
341 return push_cnt - pop_cnt;
342 }
343
355 void clear() {
356 while (_MSTL unique_ptr<T> ptr = this->pop()) {
357 this_thread::relax();
358 }
359 }
360};
361
362template <typename T>
363_MSTL atomic<int> lock_free_queue<T>::destruct_count{0};
364
365template <typename T>
366_MSTL atomic<int> lock_free_queue<T>::construct_count{0};
367 // LockFreeQueue
369
371#endif // MSTL_CORE_ASYNC_LOCK_FREE_QUEUE_HPP__
MSTL原子类型完整实现
bool empty() const noexcept
检查队列是否为空
unique_ptr< T > try_pop()
尝试出队操作
void push(T new_value)
入队操作
lock_free_queue()
默认构造函数
void clear()
清空队列
size_t size() const noexcept
获取队列中元素的近似数量
unique_ptr< T > pop()
出队操作
独占智能指针
unsigned int uint32_t
32位无符号整数类型
constexpr iter_difference_t< Iterator > count(Iterator first, Iterator last, const T &value)
统计范围内等于指定值的元素数量
constexpr Iterator next(Iterator iter, iter_difference_t< Iterator > n=1)
获取迭代器的后一个位置
MSTL_INLINE17 constexpr auto memory_order_relaxed
宽松内存顺序常量
MSTL_INLINE17 constexpr auto memory_order_acquire
获取内存顺序常量
#define _MSTL
全局命名空间MSTL前缀
#define MSTL_END_NAMESPACE__
结束全局命名空间MSTL
#define MSTL_BEGIN_NAMESPACE__
开始全局命名空间MSTL
MSTL_NODISCARD MSTL_ALWAYS_INLINE constexpr decltype(auto) data(Container &cont) noexcept(noexcept(cont.data()))
获取容器的底层数据指针
MSTL_CONSTEXPR20 unique_ptr< T > make_unique(Args &&... args)
创建unique_ptr
通用原子类型模板
T load(const memory_order mo=memory_order_seq_cst) const noexcept
原子加载操作
bool compare_exchange_strong(T &expected, T desired, const memory_order success, const memory_order failure) noexcept
强比较交换操作
T exchange(T value, const memory_order mo=memory_order_seq_cst) noexcept
原子交换操作
MSTL独占智能指针