arduino-audio-tools
Loading...
Searching...
No Matches
BaseStream.h
1#pragma once
2#include "AudioTools/CoreAudio/Buffers.h"
3#include "AudioTools/CoreAudio/AudioTypes.h"
4#include "AudioTools/CoreAudio/BaseConverter.h"
5
6#ifdef ARDUINO
7#include "Stream.h"
8#endif
9
10#ifdef USE_STREAM_WRITE_OVERRIDE
11#define STREAM_WRITE_OVERRIDE override
12#else
13#define STREAM_WRITE_OVERRIDE
14#endif
15
16#ifdef USE_STREAM_READ_OVERRIDE
17#define STREAM_READ_OVERRIDE override
18#else
19#define STREAM_READ_OVERRIDE
20#endif
21
22#ifdef USE_STREAM_READCHAR_OVERRIDE
23#define STREAM_READCHAR_OVERRIDE override
24#else
25#define STREAM_READCHAR_OVERRIDE
26#endif
27
28namespace audio_tools {
29
36class BaseStream : public Stream {
37 public:
38 BaseStream() = default;
39 virtual ~BaseStream() = default;
40 BaseStream(BaseStream &) = default;
41 BaseStream &operator=(BaseStream &) = default;
42
43 virtual bool begin(){return true;}
44 virtual void end(){}
45
46 virtual size_t readBytes(uint8_t *data,
47 size_t len) STREAM_READ_OVERRIDE = 0;
48 virtual size_t write(const uint8_t *data, size_t len) override = 0;
49
50 virtual size_t write(uint8_t ch) override {
51 tmp_out.resize(write_buffer_size);
52 if (tmp_out.isFull()) {
53 flush();
54 }
55 return tmp_out.write(ch);
56 }
57
58 virtual int available() override { return DEFAULT_BUFFER_SIZE; };
59
60 virtual int availableForWrite() override { return DEFAULT_BUFFER_SIZE; }
61
62 virtual void flush() override {
63 if (tmp_out.available() > 0) {
64 write((const uint8_t *)tmp_out.address(), tmp_out.available());
65 }
66 }
67
68// Methods which should be suppressed in the documentation
69#ifndef DOXYGEN
70
71 virtual size_t readBytes(char *data, size_t len) STREAM_READCHAR_OVERRIDE {
72 return readBytes((uint8_t *)data, len);
73 }
74
75 virtual int read() override {
76 refillReadBuffer();
77 // if it is empty we need to return an int -1
78 if (tmp_in.isEmpty()) return -1;
79 uint8_t result = 0;
80 if (!tmp_in.read(result)) return -1;
81 return result;
82 }
83
84 virtual int peek() override {
85 refillReadBuffer();
86 // if it is empty we need to return an int -1
87 if (tmp_in.isEmpty()) return -1;
88 uint8_t result = 0;
89 if (!tmp_in.peek(result)) return -1;
90 return result;
91 }
92
93#endif
94
95 void setWriteBufferSize(int size) { write_buffer_size = size;}
96
97 protected:
98 RingBuffer<uint8_t> tmp_in{0};
99 RingBuffer<uint8_t> tmp_out{0};
100 int write_buffer_size = MAX_SINGLE_CHARS;
101
102 void refillReadBuffer() {
103 tmp_in.resize(DEFAULT_BUFFER_SIZE);
104 if (tmp_in.isEmpty()) {
105 TRACED();
106 const int len = tmp_in.size();
107 uint8_t bytes[len];
108 int len_eff = readBytes(bytes, len);
109 // LOGD("tmp_in available: %d / size: %d / to be written %d",
110 // tmp_in.available(), tmp_in.size(), len_eff);
111 tmp_in.writeArray(bytes, len_eff);
112 }
113 }
114};
115
123 public:
124 AudioStream() = default;
125 virtual ~AudioStream() = default;
126 AudioStream(AudioStream &) = default;
127 AudioStream& operator=(AudioStream &) = default;
128
129 // Call from subclass or overwrite to do something useful
130 virtual void setAudioInfo(AudioInfo newInfo) override {
131 TRACED();
132
133 if (info != newInfo){
134 info = newInfo;
135 info.logInfo("in:");
136 }
137 // replicate information
138 AudioInfo out_new = audioInfoOut();
139 if (out_new) {
140 out_new.logInfo("out:");
141 notifyAudioChange(out_new);
142 }
143
144 }
145
146 virtual size_t readBytes(uint8_t *data, size_t len) override { return not_supported(0, "readBytes"); }
147
148 virtual size_t write(const uint8_t *data, size_t len) override{ return not_supported(0,"write"); }
149
150
151 virtual operator bool() { return info && available() > 0; }
152
153 virtual AudioInfo audioInfo() override {
154 return info;
155 }
156
157
159 virtual void writeSilence(size_t len){
160 int16_t zero = 0;
161 for (int j=0;j<len/2;j++){
162 write((uint8_t*)&zero,2);
163 }
164 }
165
167 virtual size_t readSilence(uint8_t *buffer, size_t length) {
168 memset(buffer, 0, length);
169 return length;
170 }
171
172 protected:
173 AudioInfo info;
174
175 virtual int not_supported(int out, const char *msg = "") {
176 LOGE("AudioStream: %s unsupported operation!", msg);
177 // trigger stacktrace
178 assert(false);
179 return out;
180 }
181
182};
183
184
195class CatStream : public BaseStream {
196 public:
197 CatStream() = default;
198
199 void add(Stream *stream) { input_streams.push_back(stream); }
200 void add(Stream &stream) { input_streams.push_back(&stream); }
201
202 bool begin() override {
203 is_active = true;
204 return true;
205 }
206
207 void end() override { is_active = false; }
208
209 int available() override {
210 if (!is_active) return 0;
211 if (!moveToNextStreamOnEnd()) {
212 return 0;
213 }
214 return availableWithTimout();
215 }
216
217 size_t readBytes(uint8_t *data, size_t len) override {
218 if (!is_active) return 0;
219 if (!moveToNextStreamOnEnd()) {
220 return 0;
221 }
222 return p_current_stream->readBytes(data, len);
223 }
224
226 operator bool() { return is_active && available() > 0; }
227
228 void setOnBeginCallback(void (*callback)(Stream *stream)) {
229 begin_callback = callback;
230 }
231 void setOnEndCallback(void (*callback)(Stream *stream)) {
232 end_callback = callback;
233 }
234
236 void setTimeout(size_t t) { _timeout = t; }
237
239 size_t write(const uint8_t *data, size_t size) override { return 0;};
240
241 protected:
242 Vector<Stream *> input_streams;
243 Stream *p_current_stream = nullptr;
244 bool is_active = false;
245 void (*begin_callback)(Stream *stream) = nullptr;
246 void (*end_callback)(Stream *stream) = nullptr;
247 size_t _timeout = 0;
248
252 // keep on running
253 if (p_current_stream != nullptr && p_current_stream->available() > 0)
254 return true;
255 // at end?
256 if ((p_current_stream == nullptr || availableWithTimout() == 0)) {
257 if (end_callback && p_current_stream) end_callback(p_current_stream);
258 if (!input_streams.empty()) {
259 LOGI("using next stream");
260 p_current_stream = input_streams[0];
261 input_streams.pop_front();
262 if (begin_callback && p_current_stream)
263 begin_callback(p_current_stream);
264 } else {
265 p_current_stream = nullptr;
266 }
267 }
268 // returns true if we have a valid stream
269 return p_current_stream != nullptr;
270 }
271
272 int availableWithTimout() {
273 int result = p_current_stream->available();
274 if (result == 0) {
275 for (int j = 0; j < _timeout / 10; j++) {
276 delay(10);
277 result = p_current_stream->available();
278 if (result != 0) break;
279 }
280 }
281 return result;
282 }
283};
284
292class NullStream : public BaseStream {
293 public:
294 size_t write(const uint8_t *data, size_t len) override { return len; }
295
296 size_t readBytes(uint8_t *data, size_t len) override {
297 memset(data, 0, len);
298 return len;
299 }
300};
301
302
310template <class T>
311class QueueStream : public BaseStream {
312 public:
314 QueueStream() = default;
316 QueueStream(int bufferSize, int bufferCount,
317 bool autoRemoveOldestDataIfFull = false) {
318 owns_buffer = true;
319 p_buffer = new NBuffer<T>(bufferSize, bufferCount);
320 remove_oldest_data = autoRemoveOldestDataIfFull;
321 }
324 setBuffer(buffer);
325 }
326
327 virtual ~QueueStream() {
328 if (owns_buffer) {
329 delete p_buffer;
330 }
331 }
332
333 void setBuffer(BaseBuffer<T> &buffer){
334 owns_buffer = false;
335 p_buffer = &buffer;
336 }
337
339 virtual bool begin() override {
340 TRACED();
341 total_written = 0;
342 active = true;
343 return true;
344 }
345
347 virtual bool begin(size_t activeWhenPercentFilled) {
348 total_written = 0;
349 // determine total buffer size in bytes
350 size_t size = p_buffer->size() * sizeof(T);
351 // calculate limit
352 active_limit = size * activeWhenPercentFilled / 100;
353 LOGI("activate after: %u bytes",(unsigned)active_limit);
354 return true;
355 }
356
358 virtual void end() override {
359 TRACED();
360 active = false;
361 };
362
363 int available() override {
364 return active ? p_buffer->available() * sizeof(T) : 0;
365 }
366
367 int availableForWrite() override {
368 if (!active && active_limit > 0) return DEFAULT_BUFFER_SIZE;
369 return p_buffer->availableForWrite() * sizeof(T);
370 }
371
372 virtual size_t write(const uint8_t *data, size_t len) override {
373 if (len == 0) return 0;
374 if (active_limit == 0 && !active) return 0;
375
376 // activate automaticaly when limit has been reached
377 total_written += len;
378 if (!active && active_limit > 0 && total_written >= active_limit) {
379 this->active = true;
380 LOGI("setting active");
381 }
382
383 // make space by deleting oldest entries
384 if (remove_oldest_data) {
385 int available_bytes =
386 p_buffer->availableForWrite() * sizeof(T);
387 if ((int)len > available_bytes) {
388 int gap = len - available_bytes;
389 uint8_t tmp[gap];
390 readBytes(tmp, gap);
391 }
392 }
393
394 return p_buffer->writeArray(data, len / sizeof(T));
395 }
396
397 virtual size_t readBytes(uint8_t *data, size_t len) override {
398 if (!active) return 0;
399 return p_buffer->readArray(data, len / sizeof(T));
400 }
401
402 int read() override {
403 if (!active) return -1;
404 T result;
405 if (!p_buffer->read(result)) {
406 return -1; // no data available
407 }
408 return result;
409 }
410
411 int peek() override {
412 if (!active) return -1;
413 T result;
414 if (p_buffer->peek(result)) {
415 return *(reinterpret_cast<uint8_t *>(&result));
416 }
417 return -1; // no data available
418 }
419
421 void clear() {
422 if (active) {
423 p_buffer->reset();
424 }
425 }
426
428 operator bool() { return active; }
429
431 int levelPercent() {return p_buffer->levelPercent();}
432
434 bool resize(int size) {return p_buffer->resize(size);}
435
436 protected:
437 BaseBuffer<T> *p_buffer;
438 size_t active_limit = 0;
439 size_t total_written = 0;
440 bool active = false;
441 bool remove_oldest_data = false;
442 bool owns_buffer = false;
443};
444
445
446#ifndef SWIG
447
448struct DataNode {
449 size_t len=0;
450 Vector<uint8_t> data{0};
451
452 DataNode() = default;
454 DataNode(void*inData, int len){
455 this->len = len;
456 this->data.resize(len);
457 memcpy(&data[0], inData, len);
458 }
459};
460
469public:
470
471 DynamicMemoryStream() = default;
472
473 DynamicMemoryStream(bool isLoop, int defaultBufferSize=DEFAULT_BUFFER_SIZE, int maxRecords = 0 ) {
474 this->default_buffer_size = defaultBufferSize;
475 is_loop = isLoop;
476 this->max_records = maxRecords;
477 }
478 // Assign values from ref, clearing the original ref
479 void assign(DynamicMemoryStream &ref){
480 audio_list.swap(ref.audio_list);
481 it = ref.it;
482 total_available=ref.total_available;
483 default_buffer_size = ref.default_buffer_size;
484 alloc_failed = ref.alloc_failed;;
485 is_loop = ref.is_loop;
486 ref.clear();
487 }
488
490 virtual bool begin() override {
491 clear();
492 temp_audio.resize(default_buffer_size);
493 return true;
494 }
495
496 virtual void end() override {
497 clear();
498 }
499
501 virtual void setLoop(bool loop){
502 is_loop = loop;
503 }
504
506 void setConsumeOnRead(bool consume){
507 consume_on_read = consume;
508 }
509
510 void clear() {
511 DataNode *p_node;
512 bool ok;
513 do{
514 ok = audio_list.pop_front(p_node);
515 if (ok){
516 delete p_node;
517 }
518 } while (ok);
519
520 temp_audio.reset();
521 total_available = 0;
522 read_pos = 0;
523 alloc_failed = false;
524 rewind();
525 }
526
527 size_t size(){
528 return total_available;
529 }
530
532 void rewind() {
533 it = audio_list.begin();
534 read_pos = 0;
535 }
536
537 virtual size_t write(const uint8_t *data, size_t len) override {
538 int size = audio_list.size();
539 LOGI("write: %d / records: %d (max %d)",(int)len, size ,max_records);
540 DataNode *p_node = new DataNode((void*)data, len);
541 if (p_node->data){
542 alloc_failed = false;
543 total_available += len;
544 audio_list.push_back(p_node);
545
546 // setup interator to point to first record
547 if (it == audio_list.end()){
548 it = audio_list.begin();
549 }
550
551 return len;
552 }
553 alloc_failed = true;
554 return 0;
555 }
556
557 virtual int availableForWrite() override {
558 // check for max_records
559 if (max_records > 0 && audio_list.size() >= max_records) {
560 return 0;
561 }
562 return alloc_failed ? 0 : default_buffer_size;
563 }
564
565 virtual int available() override {
566 if (it == audio_list.end()){
567 if (is_loop) rewind();
568 if (it == audio_list.end()) {
569 return 0;
570 }
571 }
572 return (*it)->len;
573 }
574
575 virtual size_t readBytes(uint8_t *data, size_t len) override {
576 // provide unprocessed data
577 if (temp_audio.available() > 0){
578 size_t result = temp_audio.readArray(data, len);
579 read_pos += result;
580 return result;
581 }
582
583 // We have no more data
584 if (it==audio_list.end()){
585 if (is_loop){
586 rewind();
587 } else {
588 // stop the processing
589 return 0;
590 }
591 }
592
593 // provide data from next node
594 DataNode *p_node = *it;
595 size_t node_len = p_node->len;
596 size_t result_len = node_len < len ? node_len : len;
597 if (result_len > 0) {
598 memcpy(data, &p_node->data[0], result_len);
599 }
600 // save unprocessed remainder to temp buffer (to be returned on next call)
601 if (node_len > result_len) {
602 size_t remainder_len = node_len - result_len;
603 temp_audio.resize((int)remainder_len);
604 uint8_t *start = &p_node->data[result_len];
605 temp_audio.writeArray(start, remainder_len);
606 }
607 // advance and optionally consume the node
608 if (consume_on_read) {
609 DataNode* removed = nullptr;
610 bool ok = audio_list.pop_front(removed);
611 if (ok && removed != nullptr) {
612 if (total_available >= removed->len) {
613 total_available -= removed->len;
614 } else {
615 total_available = 0;
616 }
617 delete removed;
618 }
619 it = audio_list.begin();
620 } else {
621 ++it;
622 }
623 read_pos += result_len;
624 return result_len;
625 }
626
627 List<DataNode*> &list() {
628 return audio_list;
629 }
630
634 template<typename T>
635 void postProcessSmoothTransition(int channels, float factor = 0.01, int remove=0){
636 if (remove>0){
637 for (int j=0;j<remove;j++){
638 DataNode* node = nullptr;
639 audio_list.pop_front(node);
640 if (node!=nullptr) delete node;
641 node = nullptr;
642 audio_list.pop_back(node);
643 if (node!=nullptr) delete node;
644 }
645 }
646
647 // Remove popping noise
648 SmoothTransition<T> clean_start(channels, true, false, factor);
649 auto first = *list().begin();
650 if (first!=nullptr){
651 clean_start.convert(&(first->data[0]),first->len);
652 }
653
654 SmoothTransition<T> clean_end(channels, false, true, factor);
655 auto last = * (--(list().end()));
656 if (last!=nullptr){
657 clean_end.convert(&(last->data[0]),last->len);
658 }
659 }
660
662 size_t getPos() {
663 return read_pos;
664 }
665
666 void setMaxRecords(int max_records){
667 this->max_records = max_records;
668 }
669
670
671protected:
672 List<DataNode*> audio_list;
673 List<DataNode*>::Iterator it = audio_list.end();
674 size_t total_available = 0;
675 size_t read_pos = 0;
676 int default_buffer_size=DEFAULT_BUFFER_SIZE;
677 int max_records = 0;
678 bool alloc_failed = false;
679 RingBuffer<uint8_t> temp_audio{0};
680 bool is_loop = false;
681 bool consume_on_read = false;
682
683};
684
685#endif
686
687} // namespace audio_tools
Supports the subscription to audio change notifications.
Definition AudioTypes.h:150
Supports changes to the sampling rate, bits and channels.
Definition AudioTypes.h:135
virtual AudioInfo audioInfoOut()
Definition AudioTypes.h:143
Base class for all Audio Streams. It support the boolean operator to test if the object is ready with...
Definition BaseStream.h:122
virtual size_t readSilence(uint8_t *buffer, size_t length)
Source to generate silence: just sets the buffer to 0.
Definition BaseStream.h:167
virtual void setAudioInfo(AudioInfo newInfo) override
Defines the input AudioInfo.
Definition BaseStream.h:130
virtual void writeSilence(size_t len)
Writes len bytes of silence (=0).
Definition BaseStream.h:159
virtual AudioInfo audioInfo() override
provides the actual input AudioInfo
Definition BaseStream.h:153
Shared functionality of all buffers.
Definition Buffers.h:22
virtual bool read(T &result)=0
reads a single value
virtual int readArray(T data[], int len)
reads multiple values
Definition Buffers.h:33
virtual void reset()=0
clears the buffer
virtual int writeArray(const T data[], int len)
Fills the buffer data.
Definition Buffers.h:55
virtual int availableForWrite()=0
provides the number of entries that are available to write
virtual bool peek(T &result)=0
peeks the actual entry from the buffer
virtual float levelPercent()
Returns the level of the buffer in %.
Definition Buffers.h:109
virtual int available()=0
provides the number of entries that are available to read
virtual bool resize(int bytes)
Resizes the buffer if supported: returns false if not supported.
Definition Buffers.h:117
Base class for all Streams. It relies on write(const uint8_t *buffer, size_t size) and readBytes(uint...
Definition BaseStream.h:36
Provides data from a concatenation of Streams. Please note that the provided Streams can be played on...
Definition BaseStream.h:195
size_t write(const uint8_t *data, size_t size) override
not supported
Definition BaseStream.h:239
void setTimeout(size_t t)
Defines the timout the system waits for data when moving to the next stream.
Definition BaseStream.h:236
bool moveToNextStreamOnEnd()
Definition BaseStream.h:251
MemoryStream which is written and read using the internal RAM. For each write the data is allocated o...
Definition BaseStream.h:468
size_t getPos()
Returns the current read position.
Definition BaseStream.h:662
void postProcessSmoothTransition(int channels, float factor=0.01, int remove=0)
Post processing after the recording. We add a smooth transition at the beginning and at the end.
Definition BaseStream.h:635
virtual bool begin() override
Intializes the processing.
Definition BaseStream.h:490
virtual void setLoop(bool loop)
Automatically rewinds to the beginning when reaching the end.
Definition BaseStream.h:501
void rewind()
Sets the read position to the beginning.
Definition BaseStream.h:532
void setConsumeOnRead(bool consume)
Enable or disable consuming reads (remove records as they are read)
Definition BaseStream.h:506
A lock free N buffer. If count=2 we create a DoubleBuffer, if count=3 a TripleBuffer etc.
Definition Buffers.h:663
The Arduino Stream which provides silence and simulates a null device when used as audio target or au...
Definition BaseStream.h:292
Stream class which stores the data in a temporary queue buffer. The queue can be consumed e....
Definition BaseStream.h:311
QueueStream(BaseBuffer< T > &buffer)
Create stream from any BaseBuffer subclass.
Definition BaseStream.h:323
virtual bool begin() override
Activates the output.
Definition BaseStream.h:339
virtual bool begin(size_t activeWhenPercentFilled)
Activate only when filled buffer reached %.
Definition BaseStream.h:347
QueueStream()=default
Empty Constructor: call setBuffer() to set the buffer.
bool resize(int size)
Resize the buffer.
Definition BaseStream.h:434
void clear()
Clears the data in the buffer.
Definition BaseStream.h:421
QueueStream(int bufferSize, int bufferCount, bool autoRemoveOldestDataIfFull=false)
Default constructor.
Definition BaseStream.h:316
int levelPercent()
Returns the fill level in percent.
Definition BaseStream.h:431
virtual void end() override
stops the processing
Definition BaseStream.h:358
Implements a typed Ringbuffer.
Definition Buffers.h:341
bool peek(T &result) override
peeks the actual entry from the buffer
Definition Buffers.h:361
bool read(T &result) override
reads a single value
Definition Buffers.h:348
virtual T * address() override
returns the address of the start of the physical read buffer
Definition Buffers.h:416
virtual bool write(T data) override
write add an entry to the buffer
Definition Buffers.h:391
virtual size_t size() override
Returns the maximum capacity of the buffer.
Definition Buffers.h:428
virtual void reset() override
clears the buffer
Definition Buffers.h:403
virtual bool isFull() override
checks if the buffer is full
Definition Buffers.h:386
virtual bool resize(int len)
Resizes the buffer if supported: returns false if not supported.
Definition Buffers.h:418
virtual int available() override
provides the number of entries that are available to read
Definition Buffers.h:410
Changes the samples at the beginning or at the end to slowly ramp up the volume.
Definition BaseConverter.h:1767
Definition NoArduino.h:142
Vector implementation which provides the most important methods as defined by std::vector....
Definition Vector.h:21
Generic Implementation of sound input and output for desktop environments using portaudio.
Definition AudioCodecsBase.h:10
Basic Audio information which drives e.g. I2S.
Definition AudioTypes.h:55
Definition BaseStream.h:448
DataNode(void *inData, int len)
Constructor.
Definition BaseStream.h:454