arduino-audio-tools
SynchronizedBuffers.h
1 
2 #pragma once
3 #include "AudioConfig.h"
4 #include "AudioTools/AudioTypes.h"
5 #include "AudioTools/Buffers.h"
6 #include "AudioTools/AudioLogger.h"
7 
8 #if defined(USE_CONCURRENCY)
9 #ifdef ESP32
10 # include "freertos/FreeRTOS.h"
11 # include "Concurrency/QueueRTOS.h"
12 # if ESP_IDF_VERSION_MAJOR >= 4
13 # include <freertos/stream_buffer.h>
14 # endif
15 #else
16 # include "stream_buffer.h"
17 #endif
18 
19 #include "LockGuard.h"
20 
21 namespace audio_tools {
22 
32 template <typename T>
33 class SynchronizedBuffer : public BaseBuffer<T> {
34 public:
35  SynchronizedBuffer(BaseBuffer<T> &buffer, Mutex &mutex) {
36  p_buffer = &buffer;
37  p_mutex = &mutex;
38  }
39 
40  // reads a single value
41  T read() override {
42  TRACED();
43  LockGuard guard(p_mutex);
44  return p_buffer->read();
45  }
46 
47  // reads multiple values
48  int readArray(T data[], int len) {
49  TRACED();
50  LockGuard guard(p_mutex);
51  int lenResult = MIN(len, available());
52  for (int j = 0; j < lenResult; j++) {
53  data[j] = p_buffer->read();
54  }
55  return lenResult;
56  }
57 
58  int writeArray(const T data[], int len) {
59  LOGD("%s: %d", LOG_METHOD, len);
60  LockGuard guard(p_mutex);
61  int result = 0;
62  for (int j = 0; j < len; j++) {
63  if (p_buffer->write(data[j]) == 0) {
64  break;
65  }
66  result = j + 1;
67  }
68  return result;
69  }
70 
71  // peeks the actual entry from the buffer
72  T peek() override {
73  TRACED();
74  LockGuard guard(p_mutex);
75  return p_buffer->peek();
76  }
77 
78  // checks if the buffer is full
79  bool isFull() override { return p_buffer->isFull(); }
80 
81  bool isEmpty() { return available() == 0; }
82 
83  // write add an entry to the buffer
84  bool write(T data) override {
85  TRACED();
86  LockGuard guard(p_mutex);
87  return p_buffer->write(data);
88  }
89 
90  // clears the buffer
91  void reset() override {
92  TRACED();
93  LockGuard guard(p_mutex);
94  p_buffer->reset();
95  }
96 
97  // provides the number of entries that are available to read
98  int available() override {
99  TRACED();
100  LockGuard guard(p_mutex);
101  return p_buffer->available();
102  }
103 
104  // provides the number of entries that are available to write
105  int availableForWrite() override {
106  TRACED();
107  LockGuard guard(p_mutex);
108  return p_buffer->availableForWrite();
109  }
110 
111  // returns the address of the start of the physical read buffer
112  T *address() override {
113  TRACED();
114  return p_buffer->address();
115  }
116 
117  size_t size() {
118  return p_buffer->size();
119  }
120 
121 protected:
122  BaseBuffer<T> *p_buffer = nullptr;
123  Mutex *p_mutex = nullptr;
124 };
125 
133 template <typename T>
134 class SynchronizedNBuffer : public NBuffer<T> {
135 public:
136  SynchronizedNBuffer(int bufferSize, int bufferCount, int writeMaxWait=portMAX_DELAY, int readMaxWait=portMAX_DELAY) {
137  TRACED();
138  max_size = bufferSize * bufferCount;
139  NBuffer<T>::buffer_count = bufferCount;
140  NBuffer<T>::buffer_size = bufferSize;
141 
142  available_buffers.resize(bufferCount);
143  filled_buffers.resize(bufferCount);
144 
145  setReadMaxWait(readMaxWait);
146  setWriteMaxWait(writeMaxWait);
147 
148  // setup buffers
150  for (int j = 0; j < bufferCount; j++) {
151  BaseBuffer<T> *tmp = new SingleBuffer<T>(bufferSize);
152  if (tmp != nullptr) {
153  available_buffers.enqueue(tmp);
154  } else {
155  LOGE("Not Enough Memory for buffer %d", j);
156  }
157  }
158  }
159 
160  void setReadMaxWait(TickType_t ticks){
161  available_buffers.setReadMaxWait(ticks);
162  filled_buffers.setReadMaxWait(ticks);
163  }
164 
165  void setWriteMaxWait(TickType_t ticks){
166  available_buffers.setWriteMaxWait(ticks);
167  filled_buffers.setWriteMaxWait(ticks);
168  }
169 
170  size_t size() {
171  return max_size;
172  }
173 
174 protected:
175  QueueRTOS<BaseBuffer<T>*> available_buffers{0,portMAX_DELAY,0};
176  QueueRTOS<BaseBuffer<T>*> filled_buffers{0,portMAX_DELAY,0};
177  size_t max_size;
178 
179  BaseBuffer<T> *getNextAvailableBuffer() {
180  TRACED();
181  BaseBuffer<T>* result;
182  return available_buffers.dequeue(result) ? result : nullptr;
183  }
184 
185  bool addAvailableBuffer(BaseBuffer<T> *buffer) {
186  TRACED();
187  return available_buffers.enqueue(buffer);
188  }
189 
190  BaseBuffer<T> *getNextFilledBuffer() {
191  TRACED();
192  BaseBuffer<T>* result;
193  return filled_buffers.dequeue(result) ? result : nullptr;
194  }
195 
196  bool addFilledBuffer(BaseBuffer<T> *buffer) {
197  TRACED();
198  return filled_buffers.enqueue(buffer);
199  }
200 };
201 
202 
203 } // namespace audio_tools
204 
205 
206 #endif
Shared functionality of all buffers.
Definition: Buffers.h:30
RAII implementation using a Mutex: Only a few microcontrollers provide lock guards,...
Definition: LockGuard.h:92
Mutex implementation using FreeRTOS.
Definition: LockGuard.h:58
A lock free N buffer. If count=2 we create a DoubleBuffer, if count=3 a TripleBuffer etc.
Definition: Buffers.h:559
FIFO Queue which is based on the FreeRTOS queue API. The default allocator will allocate the memory fr...
Definition: QueueRTOS.h:26
A simple Buffer implementation which just uses a (dynamically sized) array.
Definition: Buffers.h:162
Wrapper class that can turn any Buffer into a thread safe implementation.
Definition: SynchronizedBuffers.h:33
T * address() override
returns the address of the start of the physical read buffer
Definition: SynchronizedBuffers.h:112
T peek() override
peeks the actual entry from the buffer
Definition: SynchronizedBuffers.h:72
bool write(T data) override
adds an entry to the buffer
Definition: SynchronizedBuffers.h:84
int available() override
provides the number of entries that are available to read
Definition: SynchronizedBuffers.h:98
int availableForWrite() override
provides the number of entries that are available to write
Definition: SynchronizedBuffers.h:105
bool isFull() override
checks if the buffer is full
Definition: SynchronizedBuffers.h:79
int writeArray(const T data[], int len)
Fills the buffer data.
Definition: SynchronizedBuffers.h:58
void reset() override
clears the buffer
Definition: SynchronizedBuffers.h:91
T read() override
reads a single value
Definition: SynchronizedBuffers.h:41
int readArray(T data[], int len)
reads multiple values
Definition: SynchronizedBuffers.h:48
NBuffer which uses some RTOS queues to manage the available and filled buffers.
Definition: SynchronizedBuffers.h:134
Generic Implementation of sound input and output for desktop environments using portaudio.
Definition: AnalogAudio.h:10