arduino-audio-tools
All Classes Namespaces Files Functions Variables Typedefs Enumerations Friends Macros Modules Pages
BaseStream.h
1#pragma once
2#include "AudioTools/CoreAudio/Buffers.h"
3#include "AudioTools/CoreAudio/AudioTypes.h"
4#include "AudioTools/CoreAudio/BaseConverter.h"
5
6#ifdef ARDUINO
7#include "Stream.h"
8#endif
9
10#ifdef USE_STREAM_WRITE_OVERRIDE
11#define STREAM_WRITE_OVERRIDE override
12#else
13#define STREAM_WRITE_OVERRIDE
14#endif
15
16#ifdef USE_STREAM_READ_OVERRIDE
17#define STREAM_READ_OVERRIDE override
18#else
19#define STREAM_READ_OVERRIDE
20#endif
21
22#ifdef USE_STREAM_READCHAR_OVERRIDE
23#define STREAM_READCHAR_OVERRIDE override
24#else
25#define STREAM_READCHAR_OVERRIDE
26#endif
27
28namespace audio_tools {
29
36class BaseStream : public Stream {
37 public:
38 BaseStream() = default;
39 virtual ~BaseStream() = default;
40 BaseStream(BaseStream const &) = delete;
41 BaseStream &operator=(BaseStream const &) = delete;
42
43 virtual bool begin(){return true;}
44 virtual void end(){}
45
46 virtual size_t readBytes(uint8_t *data,
47 size_t len) STREAM_READ_OVERRIDE = 0;
48 virtual size_t write(const uint8_t *data, size_t len) override = 0;
49
50 virtual size_t write(uint8_t ch) override {
51 tmp_out.resize(write_buffer_size);
52 if (tmp_out.isFull()) {
53 flush();
54 }
55 return tmp_out.write(ch);
56 }
57
58 virtual int available() override { return DEFAULT_BUFFER_SIZE; };
59
60 virtual int availableForWrite() override { return DEFAULT_BUFFER_SIZE; }
61
62 virtual void flush() override {
63 if (tmp_out.available() > 0) {
64 write((const uint8_t *)tmp_out.address(), tmp_out.available());
65 }
66 }
67
68// Methods which should be suppressed in the documentation
69#ifndef DOXYGEN
70
71 virtual size_t readBytes(char *data, size_t len) STREAM_READCHAR_OVERRIDE {
72 return readBytes((uint8_t *)data, len);
73 }
74
75 virtual int read() override {
76 refillReadBuffer();
77 // if it is empty we need to return an int -1
78 if (tmp_in.isEmpty()) return -1;
79 uint8_t result = 0;
80 if (!tmp_in.read(result)) return -1;
81 return result;
82 }
83
84 virtual int peek() override {
85 refillReadBuffer();
86 // if it is empty we need to return an int -1
87 if (tmp_in.isEmpty()) return -1;
88 uint8_t result = 0;
89 if (!tmp_in.peek(result)) return -1;
90 return result;
91 }
92
93#endif
94
95 void setWriteBufferSize(int size) { write_buffer_size = size;}
96
97 protected:
98 RingBuffer<uint8_t> tmp_in{0};
99 RingBuffer<uint8_t> tmp_out{0};
100 int write_buffer_size = MAX_SINGLE_CHARS;
101
102 void refillReadBuffer() {
103 tmp_in.resize(DEFAULT_BUFFER_SIZE);
104 if (tmp_in.isEmpty()) {
105 TRACED();
106 const int len = tmp_in.size();
107 uint8_t bytes[len];
108 int len_eff = readBytes(bytes, len);
109 // LOGD("tmp_in available: %d / size: %d / to be written %d",
110 // tmp_in.available(), tmp_in.size(), len_eff);
111 tmp_in.writeArray(bytes, len_eff);
112 }
113 }
114};
115
123 public:
// Default construction and destruction use the compiler generated versions;
// the destructor is virtual.
124 AudioStream() = default;
125 virtual ~AudioStream() = default;
// Copying is disabled: both the copy constructor and the copy assignment
// operator are deleted.
126 AudioStream(AudioStream const&) = delete;
127 AudioStream& operator=(AudioStream const&) = delete;
128
129 // Call from subclass or overwrite to do something useful
130 virtual void setAudioInfo(AudioInfo newInfo) override {
131 TRACED();
132
133 if (info != newInfo){
134 info = newInfo;
135 info.logInfo("in:");
136 }
137 // replicate information
138 AudioInfo out_new = audioInfoOut();
139 if (out_new) {
140 out_new.logInfo("out:");
141 notifyAudioChange(out_new);
142 }
143
144 }
145
146 virtual size_t readBytes(uint8_t *data, size_t len) override { return not_supported(0, "readBytes"); }
147
148 virtual size_t write(const uint8_t *data, size_t len) override{ return not_supported(0,"write"); }
149
150
151 virtual operator bool() { return info && available() > 0; }
152
153 virtual AudioInfo audioInfo() override {
154 return info;
155 }
156
157
159 virtual void writeSilence(size_t len){
160 int16_t zero = 0;
161 for (int j=0;j<len/2;j++){
162 write((uint8_t*)&zero,2);
163 }
164 }
165
167 virtual size_t readSilence(uint8_t *buffer, size_t length) {
168 memset(buffer, 0, length);
169 return length;
170 }
171
172 protected:
// Input audio format: updated by setAudioInfo() and returned by audioInfo()
173 AudioInfo info;
174
175 virtual int not_supported(int out, const char *msg = "") {
176 LOGE("AudioStream: %s unsupported operation!", msg);
177 // trigger stacktrace
178 assert(false);
179 return out;
180 }
181
182};
183
184
195class CatStream : public BaseStream {
196 public:
197 CatStream() = default;
198
199 void add(Stream *stream) { input_streams.push_back(stream); }
200 void add(Stream &stream) { input_streams.push_back(&stream); }
201
202 bool begin() override {
203 is_active = true;
204 return true;
205 }
206
207 void end() override { is_active = false; }
208
209 int available() override {
210 if (!is_active) return 0;
211 if (!moveToNextStreamOnEnd()) {
212 return 0;
213 }
214 return availableWithTimout();
215 }
216
217 size_t readBytes(uint8_t *data, size_t len) override {
218 if (!is_active) return 0;
219 if (!moveToNextStreamOnEnd()) {
220 return 0;
221 }
222 return p_current_stream->readBytes(data, len);
223 }
224
226 operator bool() { return is_active && available() > 0; }
227
228 void setOnBeginCallback(void (*callback)(Stream *stream)) {
229 begin_callback = callback;
230 }
231 void setOnEndCallback(void (*callback)(Stream *stream)) {
232 end_callback = callback;
233 }
234
236 void setTimeout(size_t t) { _timeout = t; }
237
239 size_t write(const uint8_t *data, size_t size) override { return 0;};
240
241 protected:
242 Vector<Stream *> input_streams;
243 Stream *p_current_stream = nullptr;
244 bool is_active = false;
245 void (*begin_callback)(Stream *stream) = nullptr;
246 void (*end_callback)(Stream *stream) = nullptr;
247 size_t _timeout = 0;
248
252 // keep on running
253 if (p_current_stream != nullptr && p_current_stream->available() > 0)
254 return true;
255 // at end?
256 if ((p_current_stream == nullptr || availableWithTimout() == 0)) {
257 if (end_callback && p_current_stream) end_callback(p_current_stream);
258 if (!input_streams.empty()) {
259 LOGI("using next stream");
260 p_current_stream = input_streams[0];
261 input_streams.pop_front();
262 if (begin_callback && p_current_stream)
263 begin_callback(p_current_stream);
264 } else {
265 p_current_stream = nullptr;
266 }
267 }
268 // returns true if we have a valid stream
269 return p_current_stream != nullptr;
270 }
271
272 int availableWithTimout() {
273 int result = p_current_stream->available();
274 if (result == 0) {
275 for (int j = 0; j < _timeout / 10; j++) {
276 delay(10);
277 result = p_current_stream->available();
278 if (result != 0) break;
279 }
280 }
281 return result;
282 }
283};
284
292class NullStream : public BaseStream {
293 public:
294 size_t write(const uint8_t *data, size_t len) override { return len; }
295
296 size_t readBytes(uint8_t *data, size_t len) override {
297 memset(data, 0, len);
298 return len;
299 }
300};
301
302
310template <class T>
311class QueueStream : public BaseStream {
312 public:
314 QueueStream() = default;
316 QueueStream(int bufferSize, int bufferCount,
317 bool autoRemoveOldestDataIfFull = false) {
318 owns_buffer = true;
319 p_buffer = new NBuffer<T>(bufferSize, bufferCount);
320 remove_oldest_data = autoRemoveOldestDataIfFull;
321 }
324 setBuffer(buffer);
325 }
326
327 virtual ~QueueStream() {
328 if (owns_buffer) {
329 delete p_buffer;
330 }
331 }
332
333 void setBuffer(BaseBuffer<T> &buffer){
334 owns_buffer = false;
335 p_buffer = &buffer;
336 }
337
339 virtual bool begin() override {
340 TRACED();
341 total_written = 0;
342 active = true;
343 return true;
344 }
345
347 virtual bool begin(size_t activeWhenPercentFilled) {
348 total_written = 0;
349 // determine total buffer size in bytes
350 size_t size = p_buffer->size() * sizeof(T);
351 // calculate limit
352 active_limit = size * activeWhenPercentFilled / 100;
353 LOGI("activate after: %u bytes",(unsigned)active_limit);
354 return true;
355 }
356
358 virtual void end() override {
359 TRACED();
360 active = false;
361 };
362
363 int available() override {
364 return active ? p_buffer->available() * sizeof(T) : 0;
365 }
366
367 int availableForWrite() override {
368 if (!active && active_limit > 0) return DEFAULT_BUFFER_SIZE;
369 return p_buffer->availableForWrite() * sizeof(T);
370 }
371
372 virtual size_t write(const uint8_t *data, size_t len) override {
373 if (len == 0) return 0;
374 if (active_limit == 0 && !active) return 0;
375
376 // activate automaticaly when limit has been reached
377 total_written += len;
378 if (!active && active_limit > 0 && total_written >= active_limit) {
379 this->active = true;
380 LOGI("setting active");
381 }
382
383 // make space by deleting oldest entries
384 if (remove_oldest_data) {
385 int available_bytes =
386 p_buffer->availableForWrite() * sizeof(T);
387 if ((int)len > available_bytes) {
388 int gap = len - available_bytes;
389 uint8_t tmp[gap];
390 readBytes(tmp, gap);
391 }
392 }
393
394 return p_buffer->writeArray(data, len / sizeof(T));
395 }
396
397 virtual size_t readBytes(uint8_t *data, size_t len) override {
398 if (!active) return 0;
399 return p_buffer->readArray(data, len / sizeof(T));
400 }
401
402 int read() override {
403 if (!active) return -1;
404 T result;
405 if (!p_buffer->read(result)) {
406 return -1; // no data available
407 }
408 return result;
409 }
410
411 int peek() override {
412 if (!active) return -1;
413 T result;
414 if (p_buffer->peek(result)) {
415 return *(reinterpret_cast<uint8_t *>(&result));
416 }
417 return -1; // no data available
418 }
419
421 void clear() {
422 if (active) {
423 p_buffer->reset();
424 }
425 }
426
428 operator bool() { return active; }
429
431 int levelPercent() {return p_buffer->levelPercent();}
432
433 protected:
434 BaseBuffer<T> *p_buffer;
435 size_t active_limit = 0;
436 size_t total_written = 0;
437 bool active = false;
438 bool remove_oldest_data = false;
439 bool owns_buffer = false;
440};
441
442
443#ifndef SWIG
444
445struct DataNode {
446 size_t len=0;
447 Vector<uint8_t> data{0};
448
449 DataNode() = default;
451 DataNode(void*inData, int len){
452 this->len = len;
453 this->data.resize(len);
454 memcpy(&data[0], inData, len);
455 }
456};
457
466public:
467
// Default constructor: all members keep their in-class default values
468 DynamicMemoryStream() = default;
469
470 DynamicMemoryStream(bool isLoop, int defaultBufferSize=DEFAULT_BUFFER_SIZE ) {
471 this->default_buffer_size = defaultBufferSize;
472 is_loop = isLoop;
473 }
474 // Assign values from ref, clearing the original ref
475 void assign(DynamicMemoryStream &ref){
476 audio_list.swap(ref.audio_list);
477 it = ref.it;
478 total_available=ref.total_available;
479 default_buffer_size = ref.default_buffer_size;
480 alloc_failed = ref.alloc_failed;;
481 is_loop = ref.is_loop;
482 ref.clear();
483 }
484
486 virtual bool begin() override {
487 clear();
488 temp_audio.resize(default_buffer_size);
489 return true;
490 }
491
492 virtual void end() override {
493 clear();
494 }
495
497 virtual void setLoop(bool loop){
498 is_loop = loop;
499 }
500
501 void clear() {
502 DataNode *p_node;
503 bool ok;
504 do{
505 ok = audio_list.pop_front(p_node);
506 if (ok){
507 delete p_node;
508 }
509 } while (ok);
510
511 temp_audio.reset();
512 total_available = 0;
513 read_pos = 0;
514 alloc_failed = false;
515 rewind();
516 }
517
518 size_t size(){
519 return total_available;
520 }
521
523 void rewind() {
524 it = audio_list.begin();
525 read_pos = 0;
526 }
527
528 virtual size_t write(const uint8_t *data, size_t len) override {
529 DataNode *p_node = new DataNode((void*)data, len);
530 if (p_node->data){
531 alloc_failed = false;
532 total_available += len;
533 audio_list.push_back(p_node);
534
535 // setup interator to point to first record
536 if (it == audio_list.end()){
537 it = audio_list.begin();
538 }
539
540 return len;
541 }
542 alloc_failed = true;
543 return 0;
544 }
545
546 virtual int availableForWrite() override {
547 return alloc_failed ? 0 : default_buffer_size;
548 }
549
550 virtual int available() override {
551 if (it == audio_list.end()){
552 if (is_loop) rewind();
553 if (it == audio_list.end()) {
554 return 0;
555 }
556 }
557 return (*it)->len;
558 }
559
560 virtual size_t readBytes(uint8_t *data, size_t len) override {
561 // provide unprocessed data
562 if (temp_audio.available()>0){
563 size_t result = temp_audio.readArray(data, len);
564 read_pos += result;
565 return result;
566 }
567
568 // We have no more data
569 if (it==audio_list.end()){
570 if (is_loop){
571 rewind();
572 } else {
573 // stop the processing
574 return 0;
575 }
576 }
577
578 // provide data from next node
579 DataNode *p_node = *it;
580 int result_len = min(len, (size_t) p_node->len);
581 memcpy(data, &p_node->data[0], result_len);
582 // save unprocessed data to temp buffer
583 if (p_node->len>len){
584 uint8_t *start = &p_node->data[result_len];
585 int uprocessed_len = p_node->len - len;
586 temp_audio.writeArray(start, uprocessed_len);
587 }
588 //move to next pos
589 ++it;
590 read_pos += result_len;
591 return result_len;
592 }
593
594 List<DataNode*> &list() {
595 return audio_list;
596 }
597
601 template<typename T>
602 void postProcessSmoothTransition(int channels, float factor = 0.01, int remove=0){
603 if (remove>0){
604 for (int j=0;j<remove;j++){
605 DataNode* node = nullptr;
606 audio_list.pop_front(node);
607 if (node!=nullptr) delete node;
608 node = nullptr;
609 audio_list.pop_back(node);
610 if (node!=nullptr) delete node;
611 }
612 }
613
614 // Remove popping noise
615 SmoothTransition<T> clean_start(channels, true, false, factor);
616 auto first = *list().begin();
617 if (first!=nullptr){
618 clean_start.convert(&(first->data[0]),first->len);
619 }
620
621 SmoothTransition<T> clean_end(channels, false, true, factor);
622 auto last = * (--(list().end()));
623 if (last!=nullptr){
624 clean_end.convert(&(last->data[0]),last->len);
625 }
626 }
627
629 size_t getPos() {
630 return read_pos;
631 }
632
633
634protected:
// Recorded nodes in the order they were written; the nodes are deleted by clear()
635 List<DataNode*> audio_list;
// Read cursor: node which is provided by the next readBytes() call
636 List<DataNode*>::Iterator it = audio_list.end();
// Sum of the lengths of all successful write() calls (reported by size())
637 size_t total_available = 0;
// Number of bytes provided by readBytes() since the last rewind() (reported by getPos())
638 size_t read_pos = 0;
// Size used for temp_audio in begin() and reported by availableForWrite()
639 int default_buffer_size=DEFAULT_BUFFER_SIZE;
// Set when write() could not store the data; reset by a successful write() or by clear()
640 bool alloc_failed = false;
// Holds the remainder of a node which did not fit into the buffer passed to readBytes()
641 RingBuffer<uint8_t> temp_audio{DEFAULT_BUFFER_SIZE};
// If true we rewind automatically when the end has been reached
642 bool is_loop = false;
643
644};
645
646#endif
647
648} // namespace audio_tools
Supports the subscription to audio change notifications.
Definition AudioTypes.h:148
Supports changes to the sampling rate, bits and channels.
Definition AudioTypes.h:133
virtual AudioInfo audioInfoOut()
Definition AudioTypes.h:141
Base class for all Audio Streams. It supports the boolean operator to test if the object is ready with...
Definition BaseStream.h:122
virtual size_t readSilence(uint8_t *buffer, size_t length)
Source to generate silence: just sets the buffer to 0.
Definition BaseStream.h:167
virtual void setAudioInfo(AudioInfo newInfo) override
Defines the input AudioInfo.
Definition BaseStream.h:130
virtual void writeSilence(size_t len)
Writes len bytes of silence (=0).
Definition BaseStream.h:159
virtual AudioInfo audioInfo() override
provides the actual input AudioInfo
Definition BaseStream.h:153
Shared functionality of all buffers.
Definition Buffers.h:22
virtual int readArray(T data[], int len)
reads multiple values
Definition Buffers.h:33
virtual int writeArray(const T data[], int len)
Fills the buffer data.
Definition Buffers.h:55
Base class for all Streams. It relies on write(const uint8_t *buffer, size_t size) and readBytes(uint...
Definition BaseStream.h:36
Provides data from a concatenation of Streams. Please note that the provided Streams can be played on...
Definition BaseStream.h:195
size_t write(const uint8_t *data, size_t size) override
not supported
Definition BaseStream.h:239
void setTimeout(size_t t)
Defines the timeout the system waits for data when moving to the next stream.
Definition BaseStream.h:236
bool moveToNextStreamOnEnd()
Definition BaseStream.h:251
MemoryStream which is written and read using the internal RAM. For each write the data is allocated o...
Definition BaseStream.h:465
size_t getPos()
Returns the current read position.
Definition BaseStream.h:629
void postProcessSmoothTransition(int channels, float factor=0.01, int remove=0)
Post processing after the recording. We add a smooth transition at the beginning and at the end.
Definition BaseStream.h:602
virtual bool begin() override
Initializes the processing.
Definition BaseStream.h:486
virtual void setLoop(bool loop)
Automatically rewinds to the beginning when reaching the end.
Definition BaseStream.h:497
void rewind()
Sets the read position to the beginning.
Definition BaseStream.h:523
Definition List.h:26
Double linked list.
Definition List.h:18
A lock free N buffer. If count=2 we create a DoubleBuffer, if count=3 a TripleBuffer etc.
Definition Buffers.h:648
The Arduino Stream which provides silence and simulates a null device when used as audio target or au...
Definition BaseStream.h:292
Stream class which stores the data in a temporary queue buffer. The queue can be consumed e....
Definition BaseStream.h:311
QueueStream(BaseBuffer< T > &buffer)
Create stream from any BaseBuffer subclass.
Definition BaseStream.h:323
virtual bool begin() override
Activates the output.
Definition BaseStream.h:339
virtual bool begin(size_t activeWhenPercentFilled)
Activate only when filled buffer reached %.
Definition BaseStream.h:347
QueueStream()=default
Empty Constructor: call setBuffer() to set the buffer.
void clear()
Clears the data in the buffer.
Definition BaseStream.h:421
QueueStream(int bufferSize, int bufferCount, bool autoRemoveOldestDataIfFull=false)
Default constructor.
Definition BaseStream.h:316
int levelPercent()
Returns the fill level in percent.
Definition BaseStream.h:431
virtual void end() override
stops the processing
Definition BaseStream.h:358
Implements a typed Ringbuffer.
Definition Buffers.h:327
bool peek(T &result) override
peeks the actual entry from the buffer
Definition Buffers.h:347
bool read(T &result) override
reads a single value
Definition Buffers.h:334
virtual T * address() override
returns the address of the start of the physical read buffer
Definition Buffers.h:402
virtual bool write(T data) override
write add an entry to the buffer
Definition Buffers.h:377
virtual size_t size() override
Returns the maximum capacity of the buffer.
Definition Buffers.h:414
virtual void reset() override
clears the buffer
Definition Buffers.h:389
virtual bool isFull() override
checks if the buffer is full
Definition Buffers.h:372
virtual bool resize(int len)
Resizes the buffer if supported: returns false if not supported.
Definition Buffers.h:404
virtual int available() override
provides the number of entries that are available to read
Definition Buffers.h:396
Changes the samples at the beginning or at the end to slowly ramp up the volume.
Definition BaseConverter.h:1781
Definition NoArduino.h:142
Vector implementation which provides the most important methods as defined by std::vector....
Definition Vector.h:21
Generic Implementation of sound input and output for desktop environments using portaudio.
Definition AudioCodecsBase.h:10
Basic Audio information which drives e.g. I2S.
Definition AudioTypes.h:53
Definition BaseStream.h:445
DataNode(void *inData, int len)
Constructor.
Definition BaseStream.h:451