arduino-audio-tools
All Classes Namespaces Files Functions Variables Typedefs Enumerations Friends Modules Pages
BaseStream.h
1#pragma once
2#include "AudioTools/CoreAudio/Buffers.h"
3#include "AudioTools/CoreAudio/AudioTypes.h"
4#include "AudioTools/CoreAudio/BaseConverter.h"
5
6#ifdef ARDUINO
7#include "Stream.h"
8#endif
9
// Optional override specifiers: each macro expands to `override` only when
// the matching USE_STREAM_*_OVERRIDE flag is defined and to nothing otherwise.
#if defined(USE_STREAM_WRITE_OVERRIDE)
#  define STREAM_WRITE_OVERRIDE override
#else
#  define STREAM_WRITE_OVERRIDE
#endif

#if defined(USE_STREAM_READ_OVERRIDE)
#  define STREAM_READ_OVERRIDE override
#else
#  define STREAM_READ_OVERRIDE
#endif

#if defined(USE_STREAM_READCHAR_OVERRIDE)
#  define STREAM_READCHAR_OVERRIDE override
#else
#  define STREAM_READCHAR_OVERRIDE
#endif
27
28namespace audio_tools {
29
36class BaseStream : public Stream {
37 public:
38 BaseStream() = default;
39 virtual ~BaseStream() = default;
40 BaseStream(BaseStream const &) = delete;
41 BaseStream &operator=(BaseStream const &) = delete;
42
43 virtual bool begin(){return true;}
44 virtual void end(){}
45
46 virtual size_t readBytes(uint8_t *data,
47 size_t len) STREAM_READ_OVERRIDE = 0;
48 virtual size_t write(const uint8_t *data, size_t len) override = 0;
49
50 virtual size_t write(uint8_t ch) override {
51 tmp_out.resize(MAX_SINGLE_CHARS);
52 if (tmp_out.isFull()) {
53 flush();
54 }
55 return tmp_out.write(ch);
56 }
57
58 virtual int available() override { return DEFAULT_BUFFER_SIZE; };
59
60 virtual int availableForWrite() override { return DEFAULT_BUFFER_SIZE; }
61
62 virtual void flush() override {
63 if (tmp_out.available() > 0) {
64 write((const uint8_t *)tmp_out.address(), tmp_out.available());
65 }
66 }
67
68// Methods which should be suppressed in the documentation
69#ifndef DOXYGEN
70
71 virtual size_t readBytes(char *data, size_t len) STREAM_READCHAR_OVERRIDE {
72 return readBytes((uint8_t *)data, len);
73 }
74
75 virtual int read() override {
76 refillReadBuffer();
77 // if it is empty we need to return an int -1
78 if (tmp_in.isEmpty()) return -1;
79 uint8_t result = 0;
80 if (!tmp_in.read(result)) return -1;
81 return result;
82 }
83
84 virtual int peek() override {
85 refillReadBuffer();
86 // if it is empty we need to return an int -1
87 if (tmp_in.isEmpty()) return -1;
88 uint8_t result = 0;
89 if (!tmp_in.peek(result)) return -1;
90 return result;
91 }
92
93#endif
94
95 protected:
96 RingBuffer<uint8_t> tmp_in{0};
97 RingBuffer<uint8_t> tmp_out{0};
98
99 void refillReadBuffer() {
100 tmp_in.resize(DEFAULT_BUFFER_SIZE);
101 if (tmp_in.isEmpty()) {
102 TRACED();
103 const int len = tmp_in.size();
104 uint8_t bytes[len];
105 int len_eff = readBytes(bytes, len);
106 // LOGD("tmp_in available: %d / size: %d / to be written %d",
107 // tmp_in.available(), tmp_in.size(), len_eff);
108 tmp_in.writeArray(bytes, len_eff);
109 }
110 }
111};
112
120 public:
121 AudioStream() = default;
122 virtual ~AudioStream() = default;
123 AudioStream(AudioStream const&) = delete;
124 AudioStream& operator=(AudioStream const&) = delete;
125
126 // Call from subclass or overwrite to do something useful
127 virtual void setAudioInfo(AudioInfo newInfo) override {
128 TRACED();
129
130 if (info != newInfo){
131 info = newInfo;
132 info.logInfo("in:");
133 }
134 // replicate information
135 AudioInfo out_new = audioInfoOut();
136 if (out_new) {
137 out_new.logInfo("out:");
138 notifyAudioChange(out_new);
139 }
140
141 }
142
143 virtual size_t readBytes(uint8_t *data, size_t len) override { return not_supported(0, "readBytes"); }
144
145 virtual size_t write(const uint8_t *data, size_t len) override{ return not_supported(0,"write"); }
146
147
148 virtual operator bool() { return info && available() > 0; }
149
150 virtual AudioInfo audioInfo() override {
151 return info;
152 }
153
154
156 virtual void writeSilence(size_t len){
157 int16_t zero = 0;
158 for (int j=0;j<len/2;j++){
159 write((uint8_t*)&zero,2);
160 }
161 }
162
164 virtual size_t readSilence(uint8_t *buffer, size_t length) {
165 memset(buffer, 0, length);
166 return length;
167 }
168
169 protected:
170 AudioInfo info;
171
172 virtual int not_supported(int out, const char *msg = "") {
173 LOGE("AudioStream: %s unsupported operation!", msg);
174 // trigger stacktrace
175 assert(false);
176 return out;
177 }
178
179};
180
181
192class CatStream : public BaseStream {
193 public:
194 CatStream() = default;
195
196 void add(Stream *stream) { input_streams.push_back(stream); }
197 void add(Stream &stream) { input_streams.push_back(&stream); }
198
199 bool begin() override {
200 is_active = true;
201 return true;
202 }
203
204 void end() override { is_active = false; }
205
206 int available() override {
207 if (!is_active) return 0;
208 if (!moveToNextStreamOnEnd()) {
209 return 0;
210 }
211 return availableWithTimout();
212 }
213
214 size_t readBytes(uint8_t *data, size_t len) override {
215 if (!is_active) return 0;
216 if (!moveToNextStreamOnEnd()) {
217 return 0;
218 }
219 return p_current_stream->readBytes(data, len);
220 }
221
223 operator bool() { return is_active && available() > 0; }
224
225 void setOnBeginCallback(void (*callback)(Stream *stream)) {
226 begin_callback = callback;
227 }
228 void setOnEndCallback(void (*callback)(Stream *stream)) {
229 end_callback = callback;
230 }
231
233 void setTimeout(size_t t) { _timeout = t; }
234
236 size_t write(const uint8_t *data, size_t size) override { return 0;};
237
238 protected:
239 Vector<Stream *> input_streams;
240 Stream *p_current_stream = nullptr;
241 bool is_active = false;
242 void (*begin_callback)(Stream *stream) = nullptr;
243 void (*end_callback)(Stream *stream) = nullptr;
244 size_t _timeout = 0;
245
249 // keep on running
250 if (p_current_stream != nullptr && p_current_stream->available() > 0)
251 return true;
252 // at end?
253 if ((p_current_stream == nullptr || availableWithTimout() == 0)) {
254 if (end_callback && p_current_stream) end_callback(p_current_stream);
255 if (!input_streams.empty()) {
256 LOGI("using next stream");
257 p_current_stream = input_streams[0];
258 input_streams.pop_front();
259 if (begin_callback && p_current_stream)
260 begin_callback(p_current_stream);
261 } else {
262 p_current_stream = nullptr;
263 }
264 }
265 // returns true if we have a valid stream
266 return p_current_stream != nullptr;
267 }
268
269 int availableWithTimout() {
270 int result = p_current_stream->available();
271 if (result == 0) {
272 for (int j = 0; j < _timeout / 10; j++) {
273 delay(10);
274 result = p_current_stream->available();
275 if (result != 0) break;
276 }
277 }
278 return result;
279 }
280};
281
289class NullStream : public BaseStream {
290 public:
291 size_t write(const uint8_t *data, size_t len) override { return len; }
292
293 size_t readBytes(uint8_t *data, size_t len) override {
294 memset(data, 0, len);
295 return len;
296 }
297};
298
299
307template <class T>
308class QueueStream : public BaseStream {
309 public:
311 QueueStream() = default;
313 QueueStream(int bufferSize, int bufferCount,
314 bool autoRemoveOldestDataIfFull = false) {
315 owns_buffer = true;
316 callback_buffer_ptr = new NBuffer<T>(bufferSize, bufferCount);
317 remove_oldest_data = autoRemoveOldestDataIfFull;
318 }
321 setBuffer(buffer);
322 }
323
324 virtual ~QueueStream() {
325 if (owns_buffer) {
326 delete callback_buffer_ptr;
327 }
328 }
329
330 void setBuffer(BaseBuffer<T> &buffer){
331 owns_buffer = false;
332 callback_buffer_ptr = &buffer;
333 }
334
336 virtual bool begin() override {
337 TRACED();
338 total_written = 0;
339 active = true;
340 return true;
341 }
342
344 virtual bool begin(size_t activeWhenPercentFilled) {
345 total_written = 0;
346 // determine total buffer size in bytes
347 size_t size = callback_buffer_ptr->size() * sizeof(T);
348 // calculate limit
349 active_limit = size * activeWhenPercentFilled / 100;
350 LOGI("activate after: %u bytes",(unsigned)active_limit);
351 return true;
352 }
353
355 virtual void end() override {
356 TRACED();
357 active = false;
358 };
359
360 int available() override {
361 return active ? callback_buffer_ptr->available() * sizeof(T) : 0;
362 }
363
364 int availableForWrite() override {
365 if (!active && active_limit > 0) return DEFAULT_BUFFER_SIZE;
366 return callback_buffer_ptr->availableForWrite() * sizeof(T);
367 }
368
369 virtual size_t write(const uint8_t *data, size_t len) override {
370 if (len == 0) return 0;
371 if (active_limit == 0 && !active) return 0;
372
373 // activate automaticaly when limit has been reached
374 total_written += len;
375 if (!active && active_limit > 0 && total_written >= active_limit) {
376 this->active = true;
377 LOGI("setting active");
378 }
379
380 // make space by deleting oldest entries
381 if (remove_oldest_data) {
382 int available_bytes =
383 callback_buffer_ptr->availableForWrite() * sizeof(T);
384 if ((int)len > available_bytes) {
385 int gap = len - available_bytes;
386 uint8_t tmp[gap];
387 readBytes(tmp, gap);
388 }
389 }
390
391 return callback_buffer_ptr->writeArray(data, len / sizeof(T));
392 }
393
394 virtual size_t readBytes(uint8_t *data, size_t len) override {
395 if (!active) return 0;
396 return callback_buffer_ptr->readArray(data, len / sizeof(T));
397 }
398
400 void clear() {
401 if (active) {
402 callback_buffer_ptr->reset();
403 }
404 }
405
407 operator bool() { return active; }
408
410 int levelPercent() {return callback_buffer_ptr->levelPercent();}
411
412 protected:
413 BaseBuffer<T> *callback_buffer_ptr;
414 size_t active_limit = 0;
415 size_t total_written = 0;
416 bool active = false;
417 bool remove_oldest_data = false;
418 bool owns_buffer = false;
419};
420
421
422#ifndef SWIG
423
424struct DataNode {
425 size_t len=0;
426 Vector<uint8_t> data{0};
427
428 DataNode() = default;
430 DataNode(void*inData, int len){
431 this->len = len;
432 this->data.resize(len);
433 memcpy(&data[0], inData, len);
434 }
435};
436
445public:
446
447 DynamicMemoryStream() = default;
448
449 DynamicMemoryStream(bool isLoop, int defaultBufferSize=DEFAULT_BUFFER_SIZE ) {
450 this->default_buffer_size = defaultBufferSize;
451 is_loop = isLoop;
452 }
453 // Assign values from ref, clearing the original ref
454 void assign(DynamicMemoryStream &ref){
455 audio_list.swap(ref.audio_list);
456 it = ref.it;
457 total_available=ref.total_available;
458 default_buffer_size = ref.default_buffer_size;
459 alloc_failed = ref.alloc_failed;;
460 is_loop = ref.is_loop;
461 ref.clear();
462 }
463
465 virtual bool begin() override {
466 clear();
467 temp_audio.resize(default_buffer_size);
468 return true;
469 }
470
471 virtual void end() override {
472 clear();
473 }
474
476 virtual void setLoop(bool loop){
477 is_loop = loop;
478 }
479
480 void clear() {
481 DataNode *p_node;
482 bool ok;
483 do{
484 ok = audio_list.pop_front(p_node);
485 if (ok){
486 delete p_node;
487 }
488 } while (ok);
489
490 temp_audio.reset();
491 total_available = 0;
492 alloc_failed = false;
493 rewind();
494 }
495
496 size_t size(){
497 return total_available;
498 }
499
501 void rewind() {
502 it = audio_list.begin();
503 }
504
505 virtual size_t write(const uint8_t *data, size_t len) override {
506 DataNode *p_node = new DataNode((void*)data, len);
507 if (p_node->data){
508 alloc_failed = false;
509 total_available += len;
510 audio_list.push_back(p_node);
511
512 // setup interator to point to first record
513 if (it == audio_list.end()){
514 it = audio_list.begin();
515 }
516
517 return len;
518 }
519 alloc_failed = true;
520 return 0;
521 }
522
523 virtual int availableForWrite() override {
524 return alloc_failed ? 0 : default_buffer_size;
525 }
526
527 virtual int available() override {
528 if (it == audio_list.end()){
529 if (is_loop) rewind();
530 if (it == audio_list.end()) {
531 return 0;
532 }
533 }
534 return (*it)->len;
535 }
536
537 virtual size_t readBytes(uint8_t *data, size_t len) override {
538 // provide unprocessed data
539 if (temp_audio.available()>0){
540 return temp_audio.readArray(data, len);
541 }
542
543 // We have no more data
544 if (it==audio_list.end()){
545 if (is_loop){
546 rewind();
547 } else {
548 // stop the processing
549 return 0;
550 }
551 }
552
553 // provide data from next node
554 DataNode *p_node = *it;
555 int result_len = min(len, (size_t) p_node->len);
556 memcpy(data, &p_node->data[0], result_len);
557 // save unprocessed data to temp buffer
558 if (p_node->len>len){
559 uint8_t *start = &p_node->data[result_len];
560 int uprocessed_len = p_node->len - len;
561 temp_audio.writeArray(start, uprocessed_len);
562 }
563 //move to next pos
564 ++it;
565 return result_len;
566 }
567
568 List<DataNode*> &list() {
569 return audio_list;
570 }
571
575 template<typename T>
576 void postProcessSmoothTransition(int channels, float factor = 0.01, int remove=0){
577 if (remove>0){
578 for (int j=0;j<remove;j++){
579 DataNode* node = nullptr;
580 audio_list.pop_front(node);
581 if (node!=nullptr) delete node;
582 node = nullptr;
583 audio_list.pop_back(node);
584 if (node!=nullptr) delete node;
585 }
586 }
587
588 // Remove popping noise
589 SmoothTransition<T> clean_start(channels, true, false, factor);
590 auto first = *list().begin();
591 if (first!=nullptr){
592 clean_start.convert(&(first->data[0]),first->len);
593 }
594
595 SmoothTransition<T> clean_end(channels, false, true, factor);
596 auto last = * (--(list().end()));
597 if (last!=nullptr){
598 clean_end.convert(&(last->data[0]),last->len);
599 }
600 }
601
602
603protected:
604 List<DataNode*> audio_list;
605 List<DataNode*>::Iterator it = audio_list.end();
606 size_t total_available=0;
607 int default_buffer_size=DEFAULT_BUFFER_SIZE;
608 bool alloc_failed = false;
609 RingBuffer<uint8_t> temp_audio{DEFAULT_BUFFER_SIZE};
610 bool is_loop = false;
611
612};
613
614#endif
615
616} // namespace audio_tools
Supports the subscription to audio change notifications.
Definition AudioTypes.h:148
Supports changes to the sampling rate, bits and channels.
Definition AudioTypes.h:133
virtual AudioInfo audioInfoOut()
Definition AudioTypes.h:141
Base class for all Audio Streams. It support the boolean operator to test if the object is ready with...
Definition BaseStream.h:119
virtual size_t readSilence(uint8_t *buffer, size_t length)
Source to generate silence: just sets the buffer to 0.
Definition BaseStream.h:164
virtual void setAudioInfo(AudioInfo newInfo) override
Defines the input AudioInfo.
Definition BaseStream.h:127
virtual void writeSilence(size_t len)
Writes len bytes of silence (=0).
Definition BaseStream.h:156
virtual AudioInfo audioInfo() override
provides the actual input AudioInfo
Definition BaseStream.h:150
Shared functionality of all buffers.
Definition Buffers.h:22
virtual int readArray(T data[], int len)
reads multiple values
Definition Buffers.h:33
virtual int writeArray(const T data[], int len)
Fills the buffer data.
Definition Buffers.h:55
Base class for all Streams. It relies on write(const uint8_t *buffer, size_t size) and readBytes(uint...
Definition BaseStream.h:36
Provides data from a concatenation of Streams. Please note that the provided Streams can be played on...
Definition BaseStream.h:192
size_t write(const uint8_t *data, size_t size) override
not supported
Definition BaseStream.h:236
void setTimeout(size_t t)
Defines the timeout the system waits for data when moving to the next stream.
Definition BaseStream.h:233
bool moveToNextStreamOnEnd()
Definition BaseStream.h:248
MemoryStream which is written and read using the internal RAM. For each write the data is allocated o...
Definition BaseStream.h:444
void postProcessSmoothTransition(int channels, float factor=0.01, int remove=0)
Post processing after the recording. We add a smooth transition at the beginning and at the end.
Definition BaseStream.h:576
virtual bool begin() override
Initializes the processing.
Definition BaseStream.h:465
virtual void setLoop(bool loop)
Automatically rewinds to the beginning when reaching the end.
Definition BaseStream.h:476
void rewind()
Sets the read position to the beginning.
Definition BaseStream.h:501
Definition List.h:26
Double linked list.
Definition List.h:18
A lock free N buffer. If count=2 we create a DoubleBuffer, if count=3 a TripleBuffer etc.
Definition Buffers.h:621
The Arduino Stream which provides silence and simulates a null device when used as audio target or au...
Definition BaseStream.h:289
Stream class which stores the data in a temporary queue buffer. The queue can be consumed e....
Definition BaseStream.h:308
QueueStream(BaseBuffer< T > &buffer)
Create stream from any BaseBuffer subclass.
Definition BaseStream.h:320
virtual bool begin() override
Activates the output.
Definition BaseStream.h:336
virtual bool begin(size_t activeWhenPercentFilled)
Activate only when filled buffer reached %.
Definition BaseStream.h:344
QueueStream()=default
Empty Constructor: call setBuffer() to set the buffer.
void clear()
Clears the data in the buffer.
Definition BaseStream.h:400
QueueStream(int bufferSize, int bufferCount, bool autoRemoveOldestDataIfFull=false)
Constructor which allocates an internal NBuffer with the given buffer size and buffer count.
Definition BaseStream.h:313
int levelPercent()
Returns the fill level in percent.
Definition BaseStream.h:410
virtual void end() override
stops the processing
Definition BaseStream.h:355
Implements a typed Ringbuffer.
Definition Buffers.h:312
bool peek(T &result) override
peeks the actual entry from the buffer
Definition Buffers.h:332
bool read(T &result) override
reads a single value
Definition Buffers.h:319
virtual T * address() override
returns the address of the start of the physical read buffer
Definition Buffers.h:387
virtual bool write(T data) override
write add an entry to the buffer
Definition Buffers.h:362
virtual size_t size() override
Returns the maximum capacity of the buffer.
Definition Buffers.h:398
virtual void reset() override
clears the buffer
Definition Buffers.h:374
virtual bool isFull() override
checks if the buffer is full
Definition Buffers.h:357
virtual int available() override
provides the number of entries that are available to read
Definition Buffers.h:381
Changes the samples at the beginning or at the end to slowly ramp up the volume.
Definition BaseConverter.h:1781
Definition NoArduino.h:142
Vector implementation which provides the most important methods as defined by std::vector....
Definition Vector.h:21
Generic Implementation of sound input and output for desktop environments using portaudio.
Definition AudioCodecsBase.h:10
Basic Audio information which drives e.g. I2S.
Definition AudioTypes.h:53
Definition BaseStream.h:424
DataNode(void *inData, int len)
Constructor.
Definition BaseStream.h:430