arduino-audio-tools
SynchronizedBuffers.h
1 
2 #pragma once
3 #include "AudioConfig.h"
4 #include "AudioTools/CoreAudio/AudioTypes.h"
5 #include "AudioTools/CoreAudio/Buffers.h"
6 #include "AudioTools/CoreAudio/AudioLogger.h"
7 
8 #ifdef ESP32
9 # include "freertos/FreeRTOS.h"
10 # include "AudioTools/Concurrency/QueueRTOS.h"
11 # if ESP_IDF_VERSION_MAJOR >= 4
12 # include <freertos/stream_buffer.h>
13 # endif
14 #else
15 # include "stream_buffer.h"
16 #endif
17 
18 #include "LockGuard.h"
19 
20 namespace audio_tools {
21 
31 template <typename T>
32 class SynchronizedBuffer : public BaseBuffer<T> {
33 public:
34  SynchronizedBuffer(BaseBuffer<T> &buffer, Mutex &mutex, bool syncAvailable=false) {
35  p_buffer = &buffer;
36  p_mutex = &mutex;
37  is_sync_available = syncAvailable;
38  }
39 
40  // reads a single value
41  T read() override {
42  TRACED();
43  LockGuard guard(p_mutex);
44  return p_buffer->read();
45  }
46 
47  // reads multiple values
48  int readArray(T data[], int len) {
49  TRACED();
50  LockGuard guard(p_mutex);
51  int lenResult = MIN(len, available());
52  for (int j = 0; j < lenResult; j++) {
53  data[j] = p_buffer->read();
54  }
55  return lenResult;
56  }
57 
58  int writeArray(const T data[], int len) {
59  LOGD("%s: %d", LOG_METHOD, len);
60  LockGuard guard(p_mutex);
61  int result = 0;
62  for (int j = 0; j < len; j++) {
63  if (p_buffer->write(data[j]) == 0) {
64  break;
65  }
66  result = j + 1;
67  }
68  return result;
69  }
70 
71  // peeks the actual entry from the buffer
72  T peek() override {
73  TRACED();
74  LockGuard guard(p_mutex);
75  return p_buffer->peek();
76  }
77 
78  // checks if the buffer is full
79  bool isFull() override { return p_buffer->isFull(); }
80 
81  bool isEmpty() { return available() == 0; }
82 
83  // write add an entry to the buffer
84  bool write(T data) override {
85  TRACED();
86  LockGuard guard(p_mutex);
87  return p_buffer->write(data);
88  }
89 
90  // clears the buffer
91  void reset() override {
92  TRACED();
93  LockGuard guard(p_mutex);
94  p_buffer->reset();
95  }
96 
97  // provides the number of entries that are available to read
98  int available() override {
99  TRACED();
100  if (is_sync_available) LockGuard guard(p_mutex);
101  return p_buffer->available();
102  }
103 
104  // provides the number of entries that are available to write
105  int availableForWrite() override {
106  TRACED();
107  if (is_sync_available) LockGuard guard(p_mutex);
108  return p_buffer->availableForWrite();
109  }
110 
111  // returns the address of the start of the physical read buffer
112  T *address() override {
113  TRACED();
114  return p_buffer->address();
115  }
116 
117  size_t size() {
118  return p_buffer->size();
119  }
120 
121 protected:
122  BaseBuffer<T> *p_buffer = nullptr;
123  Mutex *p_mutex = nullptr;
124  bool is_sync_available = false;
125 };
126 
134 template <typename T>
135 class SynchronizedNBuffer : public NBuffer<T> {
136 public:
137  SynchronizedNBuffer(int bufferSize, int bufferCount, int writeMaxWait=portMAX_DELAY, int readMaxWait=portMAX_DELAY) {
138  TRACED();
139  read_max_wait = readMaxWait;
140  write_max_wait = writeMaxWait;
141  resize(bufferSize, bufferCount);
142  }
144  cleanup();
145  }
146 
147  void resize(int bufferSize, int bufferCount) {
148  TRACED();
149  if (buffer_size == bufferSize && buffer_count == bufferCount){
150  return;
151  }
152 
153  max_size = bufferSize * bufferCount;
154  NBuffer<T>::buffer_count = bufferCount;
155  NBuffer<T>::buffer_size = bufferSize;
156 
157  cleanup();
158  available_buffers.resize(bufferCount);
159  filled_buffers.resize(bufferCount);
160 
161  setReadMaxWait(read_max_wait);
162  setWriteMaxWait(write_max_wait);
163 
164  // setup buffers
165  for (int j = 0; j < bufferCount; j++) {
166  BaseBuffer<T> *tmp = new SingleBuffer<T>(bufferSize);
167  if (tmp != nullptr) {
168  available_buffers.enqueue(tmp);
169  } else {
170  LOGE("Not Enough Memory for buffer %d", j);
171  }
172  }
173  }
174 
175  void setReadMaxWait(TickType_t ticks){
176  available_buffers.setReadMaxWait(ticks);
177  filled_buffers.setReadMaxWait(ticks);
178  }
179 
180  void setWriteMaxWait(TickType_t ticks){
181  available_buffers.setWriteMaxWait(ticks);
182  filled_buffers.setWriteMaxWait(ticks);
183  }
184 
185  size_t size() {
186  return max_size;
187  }
188 
189  int bufferCountFilled() {
190  return filled_buffers.size();
191  }
192 
193  int bufferCountEmpty() {
194  return available_buffers.size();
195  }
196 
197 protected:
198  QueueRTOS<BaseBuffer<T>*> available_buffers{0,portMAX_DELAY,0};
199  QueueRTOS<BaseBuffer<T>*> filled_buffers{0,portMAX_DELAY,0};
200  size_t max_size;
201  size_t read_max_wait, write_max_wait;
202  int buffer_size = 0, buffer_count = 0;
203 
205  void cleanup(){
206  TRACED();
207  BaseBuffer<T>* buffer = nullptr;;
208  while (available_buffers.dequeue(buffer)){
209  delete buffer;
210  }
211  while (filled_buffers.dequeue(buffer)){
212  delete buffer;
213  }
214  }
215 
216  BaseBuffer<T> *getNextAvailableBuffer() {
217  TRACED();
218  BaseBuffer<T>* result;
219  return available_buffers.dequeue(result) ? result : nullptr;
220  }
221 
222  bool addAvailableBuffer(BaseBuffer<T> *buffer) {
223  TRACED();
224  return available_buffers.enqueue(buffer);
225  }
226 
227  BaseBuffer<T> *getNextFilledBuffer() {
228  TRACED();
229  BaseBuffer<T>* result;
230  return filled_buffers.dequeue(result) ? result : nullptr;
231  }
232 
233  bool addFilledBuffer(BaseBuffer<T> *buffer) {
234  TRACED();
235  return filled_buffers.enqueue(buffer);
236  }
237 };
238 
239 
240 } // namespace audio_tools
241 
Shared functionality of all buffers.
Definition: Buffers.h:30
RAII implementaion using a Mutex: Only a few microcontrollers provide lock guards,...
Definition: LockGuard.h:91
Mutex implemntation using FreeRTOS.
Definition: LockGuard.h:57
A lock free N buffer. If count=2 we create a DoubleBuffer, if count=3 a TripleBuffer etc.
Definition: Buffers.h:563
FIFO Queue whch is based on the FreeRTOS queue API. The default allocator will allocate the memory fr...
Definition: QueueRTOS.h:25
A simple Buffer implementation which just uses a (dynamically sized) array.
Definition: Buffers.h:169
Wrapper class that can turn any Buffer into a thread save implementation.
Definition: SynchronizedBuffers.h:32
T * address() override
returns the address of the start of the physical read buffer
Definition: SynchronizedBuffers.h:112
T peek() override
peeks the actual entry from the buffer
Definition: SynchronizedBuffers.h:72
bool write(T data) override
write add an entry to the buffer
Definition: SynchronizedBuffers.h:84
int available() override
provides the number of entries that are available to read
Definition: SynchronizedBuffers.h:98
int availableForWrite() override
provides the number of entries that are available to write
Definition: SynchronizedBuffers.h:105
bool isFull() override
checks if the buffer is full
Definition: SynchronizedBuffers.h:79
int writeArray(const T data[], int len)
Fills the buffer data.
Definition: SynchronizedBuffers.h:58
void reset() override
clears the buffer
Definition: SynchronizedBuffers.h:91
T read() override
reads a single value
Definition: SynchronizedBuffers.h:41
int readArray(T data[], int len)
reads multiple values
Definition: SynchronizedBuffers.h:48
NBuffer which uses some RTOS queues to manage the available and filled buffers.
Definition: SynchronizedBuffers.h:135
void cleanup()
Removes all allocated buffers.
Definition: SynchronizedBuffers.h:205
Generic Implementation of sound input and output for desktop environments using portaudio.
Definition: AudioConfig.h:823