arduino-audio-tools
All Classes Namespaces Files Functions Variables Typedefs Enumerations Friends Modules Pages
BaseStream.h
1#pragma once
2#include "AudioTools/CoreAudio/Buffers.h"
3#include "AudioTools/CoreAudio/AudioTypes.h"
4#include "AudioTools/CoreAudio/BaseConverter.h"
5
6#ifdef ARDUINO
7#include "Stream.h"
8#endif
9
10#ifdef USE_STREAM_WRITE_OVERRIDE
11#define STREAM_WRITE_OVERRIDE override
12#else
13#define STREAM_WRITE_OVERRIDE
14#endif
15
16#ifdef USE_STREAM_READ_OVERRIDE
17#define STREAM_READ_OVERRIDE override
18#else
19#define STREAM_READ_OVERRIDE
20#endif
21
22#ifdef USE_STREAM_READCHAR_OVERRIDE
23#define STREAM_READCHAR_OVERRIDE override
24#else
25#define STREAM_READCHAR_OVERRIDE
26#endif
27
28namespace audio_tools {
29
36class BaseStream : public Stream {
37 public:
38 BaseStream() = default;
39 virtual ~BaseStream() = default;
40 BaseStream(BaseStream const &) = delete;
41 BaseStream &operator=(BaseStream const &) = delete;
42
43 virtual bool begin(){return true;}
44 virtual void end(){}
45
46 virtual size_t readBytes(uint8_t *data,
47 size_t len) STREAM_READ_OVERRIDE = 0;
48 virtual size_t write(const uint8_t *data, size_t len) override = 0;
49
50 virtual size_t write(uint8_t ch) override {
51 tmp_out.resize(MAX_SINGLE_CHARS);
52 if (tmp_out.isFull()) {
53 flush();
54 }
55 return tmp_out.write(ch);
56 }
57
58 virtual int available() override { return DEFAULT_BUFFER_SIZE; };
59
60 virtual int availableForWrite() override { return DEFAULT_BUFFER_SIZE; }
61
62 virtual void flush() override {
63 if (tmp_out.available() > 0) {
64 write((const uint8_t *)tmp_out.address(), tmp_out.available());
65 }
66 }
67
68// Methods which should be suppressed in the documentation
69#ifndef DOXYGEN
70
71 virtual size_t readBytes(char *data, size_t len) STREAM_READCHAR_OVERRIDE {
72 return readBytes((uint8_t *)data, len);
73 }
74
75 virtual int read() override {
76 refillReadBuffer();
77 // if it is empty we need to return an int -1
78 if (tmp_in.isEmpty()) return -1;
79 uint8_t result = 0;
80 if (!tmp_in.read(result)) return -1;
81 return result;
82 }
83
84 virtual int peek() override {
85 refillReadBuffer();
86 // if it is empty we need to return an int -1
87 if (tmp_in.isEmpty()) return -1;
88 uint8_t result = 0;
89 if (!tmp_in.peek(result)) return -1;
90 return result;
91 }
92
93#endif
94
95 protected:
96 RingBuffer<uint8_t> tmp_in{0};
97 RingBuffer<uint8_t> tmp_out{0};
98
99 void refillReadBuffer() {
100 tmp_in.resize(DEFAULT_BUFFER_SIZE);
101 if (tmp_in.isEmpty()) {
102 TRACED();
103 const int len = tmp_in.size();
104 uint8_t bytes[len];
105 int len_eff = readBytes(bytes, len);
106 // LOGD("tmp_in available: %d / size: %d / to be written %d",
107 // tmp_in.available(), tmp_in.size(), len_eff);
108 tmp_in.writeArray(bytes, len_eff);
109 }
110 }
111};
112
120 public:
121 AudioStream() = default;
122 virtual ~AudioStream() = default;
123 AudioStream(AudioStream const&) = delete;
124 AudioStream& operator=(AudioStream const&) = delete;
125
126 // Call from subclass or overwrite to do something useful
127 virtual void setAudioInfo(AudioInfo newInfo) override {
128 TRACED();
129
130 if (info != newInfo){
131 info = newInfo;
132 info.logInfo("in:");
133 }
134 // replicate information
135 AudioInfo out_new = audioInfoOut();
136 if (out_new) {
137 out_new.logInfo("out:");
138 notifyAudioChange(out_new);
139 }
140
141 }
142
143 virtual size_t readBytes(uint8_t *data, size_t len) override { return not_supported(0, "readBytes"); }
144
145 virtual size_t write(const uint8_t *data, size_t len) override{ return not_supported(0,"write"); }
146
147
148 virtual operator bool() { return info && available() > 0; }
149
150 virtual AudioInfo audioInfo() override {
151 return info;
152 }
153
154
156 virtual void writeSilence(size_t len){
157 int16_t zero = 0;
158 for (int j=0;j<len/2;j++){
159 write((uint8_t*)&zero,2);
160 }
161 }
162
164 virtual size_t readSilence(uint8_t *buffer, size_t length) {
165 memset(buffer, 0, length);
166 return length;
167 }
168
169 protected:
170 AudioInfo info;
171
172 virtual int not_supported(int out, const char *msg = "") {
173 LOGE("AudioStream: %s unsupported operation!", msg);
174 // trigger stacktrace
175 assert(false);
176 return out;
177 }
178
179};
180
181
192class CatStream : public BaseStream {
193 public:
194 CatStream() = default;
195
196 void add(Stream *stream) { input_streams.push_back(stream); }
197 void add(Stream &stream) { input_streams.push_back(&stream); }
198
199 bool begin() {
200 is_active = true;
201 return true;
202 }
203
204 void end() { is_active = false; }
205
206 int available() override {
207 if (!is_active) return 0;
208 if (!moveToNextStreamOnEnd()) {
209 return 0;
210 }
211 return availableWithTimout();
212 }
213
214 size_t readBytes(uint8_t *data, size_t len) override {
215 if (!is_active) return 0;
216 if (!moveToNextStreamOnEnd()) {
217 return 0;
218 }
219 return p_current_stream->readBytes(data, len);
220 }
221
223 operator bool() { return is_active && available() > 0; }
224
225 void setOnBeginCallback(void (*callback)(Stream *stream)) {
226 begin_callback = callback;
227 }
228 void setOnEndCallback(void (*callback)(Stream *stream)) {
229 end_callback = callback;
230 }
231
233 void setTimeout(uint32_t t) { _timeout = t; }
234
236 size_t write(const uint8_t *data, size_t size) override { return 0;};
237
238 protected:
239 Vector<Stream *> input_streams;
240 Stream *p_current_stream = nullptr;
241 bool is_active = false;
242 void (*begin_callback)(Stream *stream) = nullptr;
243 void (*end_callback)(Stream *stream) = nullptr;
244 uint_fast32_t _timeout = 0;
245
249 // keep on running
250 if (p_current_stream != nullptr && p_current_stream->available() > 0)
251 return true;
252 // at end?
253 if ((p_current_stream == nullptr || availableWithTimout() == 0)) {
254 if (end_callback && p_current_stream) end_callback(p_current_stream);
255 if (!input_streams.empty()) {
256 LOGI("using next stream");
257 p_current_stream = input_streams[0];
258 input_streams.pop_front();
259 if (begin_callback && p_current_stream)
260 begin_callback(p_current_stream);
261 } else {
262 p_current_stream = nullptr;
263 }
264 }
265 // returns true if we have a valid stream
266 return p_current_stream != nullptr;
267 }
268
269 int availableWithTimout() {
270 int result = p_current_stream->available();
271 if (result == 0) {
272 for (int j = 0; j < _timeout / 10; j++) {
273 delay(10);
274 result = p_current_stream->available();
275 if (result != 0) break;
276 }
277 }
278 return result;
279 }
280};
281
289class NullStream : public BaseStream {
290 public:
291 size_t write(const uint8_t *data, size_t len) override { return len; }
292
293 size_t readBytes(uint8_t *data, size_t len) override {
294 memset(data, 0, len);
295 return len;
296 }
297};
298
299
307template <class T>
308class QueueStream : public BaseStream {
309 public:
311 QueueStream(int bufferSize, int bufferCount,
312 bool autoRemoveOldestDataIfFull = false) {
313 owns_buffer = true;
314 callback_buffer_ptr = new NBuffer<T>(bufferSize, bufferCount);
315 remove_oldest_data = autoRemoveOldestDataIfFull;
316 }
319 owns_buffer = false;
320 callback_buffer_ptr = &buffer;
321 }
322
323 virtual ~QueueStream() {
324 if (owns_buffer) {
325 delete callback_buffer_ptr;
326 }
327 }
328
330 virtual bool begin() {
331 TRACED();
332 total_written = 0;
333 active = true;
334 return true;
335 }
336
338 virtual bool begin(size_t activeWhenPercentFilled) {
339 total_written = 0;
340 // determine total buffer size in bytes
341 size_t size = callback_buffer_ptr->size() * sizeof(T);
342 // calculate limit
343 active_limit = size * activeWhenPercentFilled / 100;
344 LOGI("activate after: %u bytes",(unsigned)active_limit);
345 return true;
346 }
347
349 virtual void end() {
350 TRACED();
351 active = false;
352 };
353
354 int available() override {
355 return active ? callback_buffer_ptr->available() * sizeof(T) : 0;
356 }
357
358 int availableForWrite() override {
359 if (!active && active_limit > 0) return DEFAULT_BUFFER_SIZE;
360 return callback_buffer_ptr->availableForWrite() * sizeof(T);
361 }
362
363 virtual size_t write(const uint8_t *data, size_t len) override {
364 if (len == 0) return 0;
365 if (active_limit == 0 && !active) return 0;
366
367 // activate automaticaly when limit has been reached
368 total_written += len;
369 if (!active && active_limit > 0 && total_written >= active_limit) {
370 this->active = true;
371 LOGI("setting active");
372 }
373
374 // make space by deleting oldest entries
375 if (remove_oldest_data) {
376 int available_bytes =
377 callback_buffer_ptr->availableForWrite() * sizeof(T);
378 if ((int)len > available_bytes) {
379 int gap = len - available_bytes;
380 uint8_t tmp[gap];
381 readBytes(tmp, gap);
382 }
383 }
384
385 return callback_buffer_ptr->writeArray(data, len / sizeof(T));
386 }
387
388 virtual size_t readBytes(uint8_t *data, size_t len) override {
389 if (!active) return 0;
390 return callback_buffer_ptr->readArray(data, len / sizeof(T));
391 }
392
394 void clear() {
395 if (active) {
396 callback_buffer_ptr->reset();
397 }
398 }
399
401 operator bool() { return active; }
402
403 protected:
404 BaseBuffer<T> *callback_buffer_ptr;
405 size_t active_limit = 0;
406 size_t total_written = 0;
407 bool active = false;
408 bool remove_oldest_data = false;
409 bool owns_buffer = false;
410};
411
412
413#ifndef SWIG
414
415struct DataNode {
416 size_t len=0;
417 Vector<uint8_t> data{0};
418
419 DataNode() = default;
421 DataNode(void*inData, int len){
422 this->len = len;
423 this->data.resize(len);
424 memcpy(&data[0], inData, len);
425 }
426};
427
436public:
437
438 DynamicMemoryStream() = default;
439
440 DynamicMemoryStream(bool isLoop, int defaultBufferSize=DEFAULT_BUFFER_SIZE ) {
441 this->default_buffer_size = defaultBufferSize;
442 is_loop = isLoop;
443 }
444 // Assign values from ref, clearing the original ref
445 void assign(DynamicMemoryStream &ref){
446 audio_list.swap(ref.audio_list);
447 it = ref.it;
448 total_available=ref.total_available;
449 default_buffer_size = ref.default_buffer_size;
450 alloc_failed = ref.alloc_failed;;
451 is_loop = ref.is_loop;
452 ref.clear();
453 }
454
456 virtual bool begin() {
457 clear();
458 temp_audio.resize(default_buffer_size);
459 return true;
460 }
461
462 virtual void end() {
463 clear();
464 }
465
467 virtual void setLoop(bool loop){
468 is_loop = loop;
469 }
470
471 void clear() {
472 DataNode *p_node;
473 bool ok;
474 do{
475 ok = audio_list.pop_front(p_node);
476 if (ok){
477 delete p_node;
478 }
479 } while (ok);
480
481 temp_audio.reset();
482 total_available = 0;
483 alloc_failed = false;
484 rewind();
485 }
486
487 size_t size(){
488 return total_available;
489 }
490
492 void rewind() {
493 it = audio_list.begin();
494 }
495
496 virtual size_t write(const uint8_t *data, size_t len) override {
497 DataNode *p_node = new DataNode((void*)data, len);
498 if (p_node->data){
499 alloc_failed = false;
500 total_available += len;
501 audio_list.push_back(p_node);
502
503 // setup interator to point to first record
504 if (it == audio_list.end()){
505 it = audio_list.begin();
506 }
507
508 return len;
509 }
510 alloc_failed = true;
511 return 0;
512 }
513
514 virtual int availableForWrite() override {
515 return alloc_failed ? 0 : default_buffer_size;
516 }
517
518 virtual int available() override {
519 if (it == audio_list.end()){
520 if (is_loop) rewind();
521 if (it == audio_list.end()) {
522 return 0;
523 }
524 }
525 return (*it)->len;
526 }
527
528 virtual size_t readBytes(uint8_t *data, size_t len) override {
529 // provide unprocessed data
530 if (temp_audio.available()>0){
531 return temp_audio.readArray(data, len);
532 }
533
534 // We have no more data
535 if (it==audio_list.end()){
536 if (is_loop){
537 rewind();
538 } else {
539 // stop the processing
540 return 0;
541 }
542 }
543
544 // provide data from next node
545 DataNode *p_node = *it;
546 int result_len = min(len, (size_t) p_node->len);
547 memcpy(data, &p_node->data[0], result_len);
548 // save unprocessed data to temp buffer
549 if (p_node->len>len){
550 uint8_t *start = &p_node->data[result_len];
551 int uprocessed_len = p_node->len - len;
552 temp_audio.writeArray(start, uprocessed_len);
553 }
554 //move to next pos
555 ++it;
556 return result_len;
557 }
558
559 List<DataNode*> &list() {
560 return audio_list;
561 }
562
566 template<typename T>
567 void postProcessSmoothTransition(int channels, float factor = 0.01, int remove=0){
568 if (remove>0){
569 for (int j=0;j<remove;j++){
570 DataNode* node = nullptr;
571 audio_list.pop_front(node);
572 if (node!=nullptr) delete node;
573 node = nullptr;
574 audio_list.pop_back(node);
575 if (node!=nullptr) delete node;
576 }
577 }
578
579 // Remove popping noise
580 SmoothTransition<T> clean_start(channels, true, false, factor);
581 auto first = *list().begin();
582 if (first!=nullptr){
583 clean_start.convert(&(first->data[0]),first->len);
584 }
585
586 SmoothTransition<T> clean_end(channels, false, true, factor);
587 auto last = * (--(list().end()));
588 if (last!=nullptr){
589 clean_end.convert(&(last->data[0]),last->len);
590 }
591 }
592
593
594protected:
595 List<DataNode*> audio_list;
596 List<DataNode*>::Iterator it = audio_list.end();
597 size_t total_available=0;
598 int default_buffer_size=DEFAULT_BUFFER_SIZE;
599 bool alloc_failed = false;
600 RingBuffer<uint8_t> temp_audio{DEFAULT_BUFFER_SIZE};
601 bool is_loop = false;
602
603};
604
605#endif
606
607} // namespace audio_tools
Supports the subscription to audio change notifications.
Definition AudioTypes.h:148
Supports changes to the sampling rate, bits and channels.
Definition AudioTypes.h:133
virtual AudioInfo audioInfoOut()
Definition AudioTypes.h:141
Base class for all Audio Streams. It support the boolean operator to test if the object is ready with...
Definition BaseStream.h:119
virtual size_t readSilence(uint8_t *buffer, size_t length)
Source to generate silence: just sets the buffer to 0.
Definition BaseStream.h:164
virtual void setAudioInfo(AudioInfo newInfo) override
Defines the input AudioInfo.
Definition BaseStream.h:127
virtual void writeSilence(size_t len)
Writes len bytes of silence (=0).
Definition BaseStream.h:156
virtual AudioInfo audioInfo() override
provides the actual input AudioInfo
Definition BaseStream.h:150
Shared functionality of all buffers.
Definition Buffers.h:26
virtual int readArray(T data[], int len)
reads multiple values
Definition Buffers.h:37
virtual int writeArray(const T data[], int len)
Fills the buffer data.
Definition Buffers.h:59
Base class for all Streams. It relies on write(const uint8_t *buffer, size_t size) and readBytes(uint...
Definition BaseStream.h:36
Provides data from a concatenation of Streams. Please note that the provided Streams can be played on...
Definition BaseStream.h:192
size_t write(const uint8_t *data, size_t size) override
not supported
Definition BaseStream.h:236
void setTimeout(uint32_t t)
Defines the timout the system waits for data when moving to the next stream.
Definition BaseStream.h:233
bool moveToNextStreamOnEnd()
Definition BaseStream.h:248
MemoryStream which is written and read using the internal RAM. For each write the data is allocated o...
Definition BaseStream.h:435
void postProcessSmoothTransition(int channels, float factor=0.01, int remove=0)
Post processing after the recording. We add a smooth transition at the beginning and at the end.
Definition BaseStream.h:567
virtual bool begin()
Intializes the processing.
Definition BaseStream.h:456
virtual void setLoop(bool loop)
Automatically rewinds to the beginning when reaching the end.
Definition BaseStream.h:467
void rewind()
Sets the read position to the beginning.
Definition BaseStream.h:492
Definition List.h:26
Double linked list.
Definition List.h:18
A lock free N buffer. If count=2 we create a DoubleBuffer, if count=3 a TripleBuffer etc.
Definition Buffers.h:617
The Arduino Stream which provides silence and simulates a null device when used as audio target or au...
Definition BaseStream.h:289
Stream class which stores the data in a temporary queue buffer. The queue can be consumed e....
Definition BaseStream.h:308
QueueStream(BaseBuffer< T > &buffer)
Create stream from any BaseBuffer subclass.
Definition BaseStream.h:318
virtual bool begin()
Activates the output.
Definition BaseStream.h:330
virtual bool begin(size_t activeWhenPercentFilled)
Activate only when filled buffer reached %.
Definition BaseStream.h:338
void clear()
Clears the data in the buffer.
Definition BaseStream.h:394
QueueStream(int bufferSize, int bufferCount, bool autoRemoveOldestDataIfFull=false)
Default constructor.
Definition BaseStream.h:311
virtual void end()
stops the processing
Definition BaseStream.h:349
Implements a typed Ringbuffer.
Definition Buffers.h:308
virtual T * address()
returns the address of the start of the physical read buffer
Definition Buffers.h:383
bool peek(T &result) override
peeks the actual entry from the buffer
Definition Buffers.h:328
virtual int available()
provides the number of entries that are available to read
Definition Buffers.h:377
bool read(T &result) override
reads a single value
Definition Buffers.h:315
virtual bool write(T data)
write add an entry to the buffer
Definition Buffers.h:358
virtual void reset()
clears the buffer
Definition Buffers.h:370
virtual size_t size()
Returns the maximum capacity of the buffer.
Definition Buffers.h:394
virtual bool isFull()
checks if the buffer is full
Definition Buffers.h:353
Changes the samples at the beginning or at the end to slowly ramp up the volume.
Definition BaseConverter.h:1781
Definition NoArduino.h:142
Vector implementation which provides the most important methods as defined by std::vector....
Definition Vector.h:21
Generic Implementation of sound input and output for desktop environments using portaudio.
Definition AudioCodecsBase.h:10
Basic Audio information which drives e.g. I2S.
Definition AudioTypes.h:53
Definition BaseStream.h:415
DataNode(void *inData, int len)
Constructor.
Definition BaseStream.h:421