arduino-audio-tools
Loading...
Searching...
No Matches
QueueLockFree.h
Go to the documentation of this file.
1
2#pragma once
3#include <stdint.h>
4
5#include <atomic>
6#include <cstddef>
7#include <utility>
8
9namespace audio_tools {
10
17template <typename T>
19 public:
21 setAllocator(allocator);
23 }
24
26 size_t head = head_pos.load(std::memory_order_acquire);
27 size_t tail = tail_pos.load(std::memory_order_acquire);
28 for (size_t i = head; i != tail; ++i)
29 p_node[i & capacity_mask].ptr()->~T();
30 }
31
32 void setAllocator(Allocator& allocator) { vector.setAllocator(allocator); }
33
34 bool resize(size_t capacity) {
35 if (capacity == 0) capacity = 1;
36
37 // Destroy any live elements in the current queue before reinitialising.
38 if (p_node) {
39 size_t head = head_pos.load(std::memory_order_relaxed);
40 size_t tail = tail_pos.load(std::memory_order_relaxed);
41 for (size_t i = head; i != tail; ++i)
42 p_node[i & capacity_mask].ptr()->~T();
43 }
44
45 // Round capacity up to the next power of two so that the bitmask index
46 // wrapping always stays within the allocated array.
48 for (size_t i = 1; i <= sizeof(void*) * 4; i <<= 1)
51
52 vector.resize(capacity_value);
53 p_node = vector.data();
54
55 for (size_t i = 0; i < capacity_value; ++i) {
56 p_node[i].tail.store(i, std::memory_order_relaxed);
57 p_node[i].head.store(size_t(-1), std::memory_order_relaxed);
58 }
59
60 tail_pos.store(0, std::memory_order_relaxed);
61 head_pos.store(0, std::memory_order_relaxed);
62 return true;
63 }
64
65 size_t capacity() const { return capacity_value; }
66
67 size_t availableForWrite() const {
68 size_t head = head_pos.load(std::memory_order_seq_cst);
69 size_t tail = tail_pos.load(std::memory_order_seq_cst);
70 return capacity_value - (tail - head);
71 }
72
73 bool empty() const { return size() == 0; }
74
75 size_t size() const {
76 size_t head = head_pos.load(std::memory_order_seq_cst);
77 size_t tail = tail_pos.load(std::memory_order_seq_cst);
78 return tail - head;
79 }
80
81 bool enqueue(const T& data) { return emplace(data); }
82 bool enqueue(T&& data) { return emplace(std::move(data)); }
83
84 bool dequeue(T& result) {
85 Node* node;
86 size_t head = head_pos.load(std::memory_order_relaxed);
87 for (;;) {
88 node = &p_node[head & capacity_mask];
89 // acquire pairs with enqueue's release store on node->head,
90 // ensuring node->storage is visible before we read it.
91 if (node->head.load(std::memory_order_acquire) != head) return false;
92 if (head_pos.compare_exchange_weak(head, head + 1,
93 std::memory_order_relaxed))
94 break;
95 }
96 result = std::move(*node->ptr());
97 node->ptr()->~T();
98 node->tail.store(head + capacity_value, std::memory_order_release);
99 return true;
100 }
101
102 void clear() {
103 size_t head = head_pos.load(std::memory_order_acquire);
104 size_t tail = tail_pos.load(std::memory_order_acquire);
105 for (size_t i = head; i != tail; ++i) {
106 Node* node = &p_node[i & capacity_mask];
107 node->ptr()->~T();
108 node->tail.store(i + capacity_value, std::memory_order_release);
109 }
110 head_pos.store(tail, std::memory_order_release);
111 }
112
113 protected:
114 struct Node {
115 alignas(T) unsigned char storage[sizeof(T)];
116 std::atomic<size_t> tail;
117 std::atomic<size_t> head;
118 T* ptr() { return reinterpret_cast<T*>(storage); }
119 const T* ptr() const { return reinterpret_cast<const T*>(storage); }
120 };
121
122 // Single enqueue implementation for both lvalue and rvalue paths.
123 template <typename U>
124 bool emplace(U&& val) {
125 Node* node;
126 size_t tail = tail_pos.load(std::memory_order_relaxed);
127 for (;;) {
128 node = &p_node[tail & capacity_mask];
129 // acquire pairs with dequeue's release store on node->tail,
130 // ensuring ~T() in the consumer is complete before we reuse the slot.
131 if (node->tail.load(std::memory_order_acquire) != tail) return false;
132 if (tail_pos.compare_exchange_weak(tail, tail + 1,
133 std::memory_order_relaxed))
134 break;
135 }
136 new (node->ptr()) T(std::forward<U>(val));
137 node->head.store(tail, std::memory_order_release);
138 return true;
139 }
140
141 Node* p_node = nullptr;
142 size_t capacity_mask = 0;
143 size_t capacity_value = 0;
144 std::atomic<size_t> tail_pos{0};
145 std::atomic<size_t> head_pos{0};
147};
148
149} // namespace audio_tools
Memory allocator which uses malloc.
Definition Allocator.h:24
Lock-free MPMC queue.
Definition QueueLockFree.h:18
size_t availableForWrite() const
Definition QueueLockFree.h:67
bool enqueue(T &&data)
Definition QueueLockFree.h:82
bool dequeue(T &result)
Definition QueueLockFree.h:84
size_t size() const
Definition QueueLockFree.h:75
size_t capacity_value
Definition QueueLockFree.h:143
bool resize(size_t capacity)
Definition QueueLockFree.h:34
bool empty() const
Definition QueueLockFree.h:73
size_t capacity() const
Definition QueueLockFree.h:65
bool enqueue(const T &data)
Definition QueueLockFree.h:81
std::atomic< size_t > tail_pos
Definition QueueLockFree.h:144
bool emplace(U &&val)
Definition QueueLockFree.h:124
void clear()
Definition QueueLockFree.h:102
void setAllocator(Allocator &allocator)
Definition QueueLockFree.h:32
std::atomic< size_t > head_pos
Definition QueueLockFree.h:145
~QueueLockFree()
Definition QueueLockFree.h:25
size_t capacity_mask
Definition QueueLockFree.h:142
QueueLockFree(size_t capacity, Allocator &allocator=DefaultAllocator)
Definition QueueLockFree.h:20
Vector< Node > vector
Definition QueueLockFree.h:146
Node * p_node
Definition QueueLockFree.h:141
Vector implementation which provides the most important methods as defined by std::vector....
Definition Vector.h:21
Generic Implementation of sound input and output for desktop environments using portaudio.
Definition AudioCodecsBase.h:10
static TAllocatorExt DefaultAllocator
Definition Allocator.h:207
size_t writeData(Print *p_out, T *data, int samples, int maxSamples=512)
Definition AudioTypes.h:508
Definition QueueLockFree.h:114
unsigned char storage[sizeof(T)]
Definition QueueLockFree.h:115
const T * ptr() const
Definition QueueLockFree.h:119
std::atomic< size_t > head
Definition QueueLockFree.h:117
T * ptr()
Definition QueueLockFree.h:118
std::atomic< size_t > tail
Definition QueueLockFree.h:116