NexusForce 1.0.0
A Modern C++ Library with extended functionality, web components, and utility libraries
载入中...
搜索中...
未找到
lock_free_queue.hpp
浏览该文件的文档.
1#ifndef NEFORCE_CORE_ASYNC_LOCK_FREE_QUEUE_HPP__
2#define NEFORCE_CORE_ASYNC_LOCK_FREE_QUEUE_HPP__
3
11
14NEFORCE_BEGIN_NAMESPACE__
15
21
27
38template <typename T>
40private:
// Reference-count word kept inside every queue node and updated with
// compare-exchange loops (see node::release_ref and free_external_counter).
// The two bit-fields together occupy 32 bits of uint32_t storage.
47 struct node_counter {
// Internal reference count: decremented by node::release_ref() and adjusted
// by free_external_counter() with (external_count - 2).
48 uint32_t internal_count : 30;
// Number of external counters still attached to the node. node's constructor
// sets it (default 2); free_external_counter() decrements it.
49 uint32_t external_counters : 2;
50 };
51
52 struct node;
53
// A node pointer paired with an external reference count. Values of this
// type are what `head`, `tail` and `node::next` store atomically.
// NOTE(review): this struct is used with compare_exchange on
// atomic<counted_node_ptr>; if the platform inserts padding between the int
// and the pointer, confirm that the atomic implementation compares values in
// a padding-safe way.
60 struct counted_node_ptr {
// External count; bumped by increase_external_count() each time a thread
// takes a reference through the atomic pointer.
61 int external_count = 0;
// The referenced node, or nullptr when no node is linked.
62 node* ptr = nullptr;
63
// User-provided default constructor; the members take their default
// member initializers (0 / nullptr).
67 counted_node_ptr() noexcept {}
68 };
69
// Queue node. The members `data`, `count` and `next` are used below but their
// declarations (listing lines 77-79) are not present in this extraction.
76 struct node {
80
// Initializes the counter word with internal_count = 0 and
// external_counters = external_count (default 2), and sets `next` to an
// empty counted pointer (nullptr, external_count 0).
85 explicit node(int external_count = 2) {
86 node_counter new_count;
87 new_count.internal_count = 0;
88 new_count.external_counters = external_count;
89 count.store(new_count);
90
91 counted_node_ptr node_ptr;
92 node_ptr.ptr = nullptr;
93 node_ptr.external_count = 0;
94
95 next.store(node_ptr);
96 }
97
// Drops one internal reference via a compare-exchange loop and deletes the
// node once both internal_count and external_counters have reached zero.
103 void release_ref() {
104 node_counter old_counter = count.load(memory_order_relaxed);
105 node_counter new_counter;
106 do {
107 new_counter = old_counter;
108 --new_counter.internal_count;
// NOTE(review): the continuation of this call (listing line 110, i.e. the
// failure memory order and closing of the loop) is missing from this view.
109 } while (!count.compare_exchange_strong(old_counter, new_counter, memory_order_acquire,
111 if (!new_counter.internal_count && !new_counter.external_counters) {
112 delete this;
// destruct_count is a static member of the queue, not of this node, so it
// is still valid to touch after `delete this`.
113 destruct_count.fetch_add(1);
114 }
115 }
116 };
117
118private:
// Class-wide tally incremented whenever release_ref() or
// free_external_counter() deletes a node.
119 static atomic<int> destruct_count;
// Class-wide tally incremented once per completed push().
120 static atomic<int> construct_count;
121
// Per-queue operation counters: push() increments push_count_, a successful
// pop()/try_pop() increments pop_count_. size() reports their difference.
124 atomic<size_t> push_count_{0};
125 atomic<size_t> pop_count_{0};
126
127private:
135 void set_new_tail(counted_node_ptr& old_tail, counted_node_ptr const& new_tail) {
136 node* const current_tail_ptr = old_tail.ptr;
137 while (!tail.compare_exchange_weak(old_tail, new_tail) && old_tail.ptr == current_tail_ptr) {
138 this_thread::relax();
139 }
140 if (old_tail.ptr == current_tail_ptr) {
141 lock_free_queue::free_external_counter(old_tail);
142 } else {
143 current_tail_ptr->release_ref();
144 }
145 }
146
// Detaches one external counter from the node referenced by old_node_ptr:
// decrements external_counters, adds (external_count - 2) to internal_count,
// and deletes the node when both fields end up zero.
154 static void free_external_counter(counted_node_ptr& old_node_ptr) {
155 node* const ptr = old_node_ptr.ptr;
// Amount to transfer from the external count into the node's internal count.
156 int const count_increase = old_node_ptr.external_count - 2;
157 node_counter old_counter = ptr->count.load(_NEFORCE memory_order_relaxed);
158 node_counter new_counter;
159 do {
160 new_counter = old_counter;
161 --new_counter.external_counters;
162 new_counter.internal_count += count_increase;
// NOTE(review): the continuation of this call (listing line 164) is missing
// from this view.
163 } while (!ptr->count.compare_exchange_strong(old_counter, new_counter, memory_order_acquire,
165 if (!new_counter.internal_count && !new_counter.external_counters) {
// No references of either kind remain: record the destruction and free.
166 destruct_count.fetch_add(1);
167 delete ptr;
168 }
169 }
170
179 static void increase_external_count(atomic<counted_node_ptr>& counter, counted_node_ptr& old_counter) {
180 counted_node_ptr new_counter;
181 do {
182 new_counter = old_counter;
183 ++new_counter.external_count;
184 } while (
185 !counter.compare_exchange_strong(old_counter, new_counter, memory_order_acquire, memory_order_relaxed));
186 old_counter.external_count = new_counter.external_count;
187 }
188
189public:
// Body of the default constructor lock_free_queue() (its signature line is
// not present in this extraction). Allocates one node with the default
// counter settings and makes both `tail` and `head` refer to it with an
// external count of 1, so head.ptr == tail.ptr initially.
196 counted_node_ptr new_next;
197 new_next.ptr = new node();
198 new_next.external_count = 1;
199 tail.store(new_next);
200 head.store(new_next);
201 }
202
// Presumably the body of the destructor (its signature line is not present
// in this extraction): pops until pop() yields a null pointer, then deletes
// the node `head` still refers to.
// NOTE(review): the visible pop() returns make_unique<T>() when the queue is
// empty and unique_ptr<T>(res) otherwise, so this loop condition appears
// never to become false — confirm the intended empty-queue return value.
209 while (pop()) {
210 this_thread::relax();
211 }
212 auto head_counted_node = head.load();
213 delete head_counted_node.ptr;
214 }
215
223 void push(T new_value) {
224 unique_ptr<T> new_data(new T(new_value));
225 counted_node_ptr new_next;
226 new_next.ptr = new node;
227 new_next.external_count = 1;
228 counted_node_ptr old_tail = tail.load();
229 for (;;) {
230 lock_free_queue::increase_external_count(tail, old_tail);
231 T* old_data = nullptr;
232 if (old_tail.ptr->data.compare_exchange_strong(old_data, new_data.get())) {
233 counted_node_ptr old_next;
234 counted_node_ptr now_next = old_tail.ptr->next.load();
235 if (!old_tail.ptr->next.compare_exchange_strong(old_next, new_next)) {
236 delete new_next.ptr;
237 new_next = old_next;
238 }
239 this->set_new_tail(old_tail, new_next);
240 new_data.release();
241 break;
242 }
243 counted_node_ptr old_next;
244 if (old_tail.ptr->next.compare_exchange_strong(old_next, new_next)) {
245 old_next = new_next;
246 new_next.ptr = new node;
247 }
248 this->set_new_tail(old_tail, old_next);
249 }
250 ++construct_count;
251 push_count_.fetch_add(1, memory_order_relaxed);
252 }
253
// Body of pop() (its signature line is not present in this extraction).
// Repeatedly takes a reference on the head node and tries to advance `head`
// to the node's successor; on success the payload pointer is taken out of
// the old head node and returned to the caller.
262 counted_node_ptr old_head = head.load(memory_order_relaxed);
263 for (;;) {
264 lock_free_queue::increase_external_count(head, old_head);
265 node* const ptr = old_head.ptr;
// head and tail referring to the same node is treated as "queue empty".
266 if (ptr == tail.load().ptr) {
267 ptr->release_ref();
// NOTE(review): this returns a pointer to a newly created T rather than a
// null pointer, so callers cannot tell "empty" from a real element by
// testing the result — confirm this is intended.
268 return _NEFORCE make_unique<T>();
269 }
270 counted_node_ptr next = ptr->next.load();
271 if (head.compare_exchange_strong(old_head, next)) {
// This thread unlinked the node: take the payload, drop the external
// counter, and count the pop.
272 T* res = ptr->data.exchange(nullptr);
273 lock_free_queue::free_external_counter(old_head);
274 pop_count_.fetch_add(1, memory_order_relaxed);
275 return unique_ptr<T>(res);
276 }
// Lost the race for this node: release our reference and retry with the
// refreshed old_head.
277 ptr->release_ref();
278 }
279 }
280
// Body of try_pop() (its signature line is not present in this extraction).
// Same steps as pop(), but bounded: head is reloaded and the exchange is
// attempted at most 3 times before giving up.
291 counted_node_ptr old_head{};
292
293 for (int retry = 0; retry < 3; ++retry) {
294 old_head = head.load(memory_order_relaxed);
295 lock_free_queue::increase_external_count(head, old_head);
296 node* ptr = old_head.ptr;
297
// head and tail referring to the same node is treated as "queue empty".
298 if (ptr == tail.load().ptr) {
299 ptr->release_ref();
// NOTE(review): returns a pointer to a newly created T, not a null pointer
// — confirm this is the intended "empty" result.
300 return make_unique<T>();
301 }
302
303 counted_node_ptr next = ptr->next.load();
304 if (head.compare_exchange_strong(old_head, next)) {
305 T* res = ptr->data.exchange(nullptr);
306 lock_free_queue::free_external_counter(old_head);
307 pop_count_.fetch_add(1, _NEFORCE memory_order_relaxed);
308 return unique_ptr<T>(res);
309 }
310
// Exchange failed: release our reference before the next attempt.
311 ptr->release_ref();
312 }
313
// All attempts failed; same result as the empty case above.
314 return make_unique<T>();
315 }
316
324 bool empty() const noexcept {
325 const counted_node_ptr head_ptr = head.load(memory_order_acquire);
326 const counted_node_ptr tail_ptr = tail.load(memory_order_acquire);
327 return head_ptr.ptr == tail_ptr.ptr;
328 }
329
338 size_t size() const noexcept {
339 const size_t push_cnt = push_count_.load(memory_order_relaxed);
340 const size_t pop_cnt = pop_count_.load(memory_order_relaxed);
341 return push_cnt - pop_cnt;
342 }
343
355 void clear() {
356 while (unique_ptr<T> ptr = this->pop()) {
357 this_thread::relax();
358 }
359 }
360};
361
// Out-of-class definition of the per-instantiation static counter of deleted
// nodes, starting at 0.
362template <typename T>
363atomic<int> lock_free_queue<T>::destruct_count{0};
364
// Out-of-class definition of the per-instantiation static counter of
// completed push() calls, starting at 0.
365template <typename T>
366atomic<int> lock_free_queue<T>::construct_count{0};
367 // LockFreeQueue
369 // AsyncComponents
371
372NEFORCE_END_NAMESPACE__
373#endif // NEFORCE_CORE_ASYNC_LOCK_FREE_QUEUE_HPP__
原子类型完整实现
bool empty() const noexcept
检查队列是否为空
unique_ptr< T > try_pop()
尝试出队操作
void push(T new_value)
入队操作
lock_free_queue()
默认构造函数
void clear()
清空队列
size_t size() const noexcept
获取队列中元素的近似数量
unique_ptr< T > pop()
出队操作
独占智能指针
NEFORCE_CONSTEXPR20 pointer release() noexcept
释放所有权
NEFORCE_CONSTEXPR20 pointer get() const noexcept
获取原始指针
unsigned int uint32_t
32位无符号整数类型
task< T > retry(Factory &&factory, const size_t max_attempts, function< bool(const exception_ptr &)> should_retry=nullptr)
带重试的异步操作
constexpr iter_difference_t< Iterator > count(Iterator first, Iterator last, const T &value)
统计范围内等于指定值的元素数量
constexpr Iterator next(Iterator iter, iter_difference_t< Iterator > n=1)
获取迭代器的后一个位置
NEFORCE_INLINE17 constexpr auto memory_order_relaxed
宽松内存顺序常量
NEFORCE_INLINE17 constexpr auto memory_order_acquire
获取内存顺序常量
NEFORCE_NODISCARD NEFORCE_ALWAYS_INLINE constexpr decltype(auto) data(Container &cont) noexcept(noexcept(cont.data()))
获取容器的底层数据指针
NEFORCE_CONSTEXPR20 unique_ptr< T > make_unique(Args &&... args)
创建unique_ptr
通用原子类型模板
T load(const memory_order mo=memory_order_seq_cst) const noexcept
原子加载操作
bool compare_exchange_strong(T &expected, T desired, const memory_order success, const memory_order failure) noexcept
强比较交换操作
T exchange(T value, const memory_order mo=memory_order_seq_cst) noexcept
原子交换操作
独占智能指针