arduino-audio-tools
BaseStream.h
1 #pragma once
2 #include "AudioTools/CoreAudio/Buffers.h"
3 
4 #ifdef ARDUINO
5 #include "Stream.h"
6 #endif
7 
// STREAM_WRITE_OVERRIDE expands to `override` when USE_STREAM_WRITE_OVERRIDE
// is defined and to nothing otherwise, so a method declaration can carry it
// unconditionally.
8 #ifdef USE_STREAM_WRITE_OVERRIDE
9 #define STREAM_WRITE_OVERRIDE override
10 #else
11 #define STREAM_WRITE_OVERRIDE
12 #endif
13 
// STREAM_READ_OVERRIDE expands to `override` when USE_STREAM_READ_OVERRIDE is
// defined and to nothing otherwise. It is applied to
// BaseStream::readBytes(uint8_t*, size_t).
14 #ifdef USE_STREAM_READ_OVERRIDE
15 #define STREAM_READ_OVERRIDE override
16 #else
17 #define STREAM_READ_OVERRIDE
18 #endif
19 
// STREAM_READCHAR_OVERRIDE expands to `override` when
// USE_STREAM_READCHAR_OVERRIDE is defined and to nothing otherwise. It is
// applied to BaseStream::readBytes(char*, size_t).
20 #ifdef USE_STREAM_READCHAR_OVERRIDE
21 #define STREAM_READCHAR_OVERRIDE override
22 #else
23 #define STREAM_READCHAR_OVERRIDE
24 #endif
25 
26 namespace audio_tools {
27 
34 class BaseStream : public Stream {
35  public:
36  BaseStream() = default;
37  virtual ~BaseStream() = default;
38  BaseStream(BaseStream const &) = delete;
39  BaseStream &operator=(BaseStream const &) = delete;
40 
41  virtual bool begin(){return true;}
42  virtual void end(){}
43 
44  virtual size_t readBytes(uint8_t *data,
45  size_t len) STREAM_READ_OVERRIDE = 0;
46  virtual size_t write(const uint8_t *data, size_t len) override = 0;
47 
48  virtual size_t write(uint8_t ch) override {
49  tmp_out.resize(MAX_SINGLE_CHARS);
50  if (tmp_out.isFull()) {
51  flush();
52  }
53  return tmp_out.write(ch);
54  }
55 
56  virtual int available() override { return DEFAULT_BUFFER_SIZE; };
57 
58  virtual int availableForWrite() override { return DEFAULT_BUFFER_SIZE; }
59 
60  virtual void flush() override {
61  if (tmp_out.available() > 0) {
62  write((const uint8_t *)tmp_out.address(), tmp_out.available());
63  }
64  }
65 
66 // Methods which should be suppressed in the documentation
67 #ifndef DOXYGEN
68 
69  virtual size_t readBytes(char *data, size_t len) STREAM_READCHAR_OVERRIDE {
70  return readBytes((uint8_t *)data, len);
71  }
72 
73  virtual int read() override {
74  refillReadBuffer();
75  // if it is empty we need to return an int -1
76  if (tmp_in.isEmpty()) return -1;
77  return tmp_in.read();
78  }
79 
80  virtual int peek() override {
81  refillReadBuffer();
82  // if it is empty we need to return an int -1
83  if (tmp_in.isEmpty()) return -1;
84  return tmp_in.peek();
85  }
86 
87 #endif
88 
89  protected:
90  RingBuffer<uint8_t> tmp_in{0};
91  RingBuffer<uint8_t> tmp_out{0};
92 
93  void refillReadBuffer() {
94  tmp_in.resize(DEFAULT_BUFFER_SIZE);
95  if (tmp_in.isEmpty()) {
96  TRACED();
97  const int len = tmp_in.size();
98  uint8_t bytes[len];
99  int len_eff = readBytes(bytes, len);
100  // LOGD("tmp_in available: %d / size: %d / to be written %d",
101  // tmp_in.available(), tmp_in.size(), len_eff);
102  tmp_in.writeArray(bytes, len_eff);
103  }
104  }
105 };
106 
113 class AudioStream : public BaseStream, public AudioInfoSupport, public AudioInfoSource {
114  public:
115  AudioStream() = default;
116  virtual ~AudioStream() = default;
117  AudioStream(AudioStream const&) = delete;
118  AudioStream& operator=(AudioStream const&) = delete;
119 
120  // Call from subclass or overwrite to do something useful
121  virtual void setAudioInfo(AudioInfo newInfo) override {
122  TRACED();
123 
124  if (info != newInfo){
125  info = newInfo;
126  info.logInfo("in:");
127  }
128  // replicate information
129  AudioInfo out_new = audioInfoOut();
130  if (out_new) {
131  out_new.logInfo("out:");
132  notifyAudioChange(out_new);
133  }
134 
135  }
136 
137  virtual size_t readBytes(uint8_t *data, size_t len) override { return not_supported(0, "readBytes"); }
138 
139  virtual size_t write(const uint8_t *data, size_t len) override{ return not_supported(0,"write"); }
140 
141 
142  virtual operator bool() { return info && available() > 0; }
143 
144  virtual AudioInfo audioInfo() override {
145  return info;
146  }
147 
148 
150  virtual void writeSilence(size_t len){
151  int16_t zero = 0;
152  for (int j=0;j<len/2;j++){
153  write((uint8_t*)&zero,2);
154  }
155  }
156 
158  virtual size_t readSilence(uint8_t *buffer, size_t length) {
159  memset(buffer, 0, length);
160  return length;
161  }
162 
163  protected:
164  AudioInfo info;
165 
166  virtual int not_supported(int out, const char *msg = "") {
167  LOGE("AudioStream: %s unsupported operation!", msg);
168  // trigger stacktrace
169  assert(false);
170  return out;
171  }
172 
173 };
174 
175 
186 class CatStream : public BaseStream {
187  public:
188  CatStream() = default;
189 
190  void add(Stream *stream) { input_streams.push_back(stream); }
191  void add(Stream &stream) { input_streams.push_back(&stream); }
192 
193  bool begin() {
194  is_active = true;
195  return true;
196  }
197 
198  void end() { is_active = false; }
199 
200  int available() override {
201  if (!is_active) return 0;
202  if (!moveToNextStreamOnEnd()) {
203  return 0;
204  }
205  return availableWithTimout();
206  }
207 
208  size_t readBytes(uint8_t *data, size_t len) override {
209  if (!is_active) return 0;
210  if (!moveToNextStreamOnEnd()) {
211  return 0;
212  }
213  return p_current_stream->readBytes(data, len);
214  }
215 
217  operator bool() { return is_active && available() > 0; }
218 
219  void setOnBeginCallback(void (*callback)(Stream *stream)) {
220  begin_callback = callback;
221  }
222  void setOnEndCallback(void (*callback)(Stream *stream)) {
223  end_callback = callback;
224  }
225  void setTimeout(uint32_t t) { _timeout = t; }
226 
228  size_t write(const uint8_t *data, size_t size) override { return 0;};
229 
230  protected:
231  Vector<Stream *> input_streams;
232  Stream *p_current_stream = nullptr;
233  bool is_active = false;
234  void (*begin_callback)(Stream *stream) = nullptr;
235  void (*end_callback)(Stream *stream) = nullptr;
236  uint_fast32_t _timeout = 0;
237 
241  // keep on running
242  if (p_current_stream != nullptr && p_current_stream->available() > 0)
243  return true;
244  // at end?
245  if ((p_current_stream == nullptr || availableWithTimout() == 0)) {
246  if (end_callback && p_current_stream) end_callback(p_current_stream);
247  if (!input_streams.empty()) {
248  LOGI("using next stream");
249  p_current_stream = input_streams[0];
250  input_streams.pop_front();
251  if (begin_callback && p_current_stream)
252  begin_callback(p_current_stream);
253  } else {
254  p_current_stream = nullptr;
255  }
256  }
257  // returns true if we have a valid stream
258  return p_current_stream != nullptr;
259  }
260 
261  int availableWithTimout() {
262  int result = p_current_stream->available();
263  if (result == 0) {
264  for (int j = 0; j < _timeout / 10; j++) {
265  delay(10);
266  result = p_current_stream->available();
267  if (result != 0) break;
268  }
269  }
270  return result;
271  }
272 };
273 
281 class NullStream : public BaseStream {
282  public:
283  size_t write(const uint8_t *data, size_t len) override { return len; }
284 
285  size_t readBytes(uint8_t *data, size_t len) override {
286  memset(data, 0, len);
287  return len;
288  }
289 };
290 
291 
299 template <class T>
300 class QueueStream : public BaseStream {
301  public:
303  QueueStream(int bufferSize, int bufferCount,
304  bool autoRemoveOldestDataIfFull = false) {
305  owns_buffer = true;
306  callback_buffer_ptr = new NBuffer<T>(bufferSize, bufferCount);
307  remove_oldest_data = autoRemoveOldestDataIfFull;
308  }
311  owns_buffer = false;
312  callback_buffer_ptr = &buffer;
313  }
314 
315  virtual ~QueueStream() {
316  if (owns_buffer) {
317  delete callback_buffer_ptr;
318  }
319  }
320 
322  virtual bool begin() {
323  TRACED();
324  active = true;
325  return true;
326  }
327 
329  virtual bool begin(size_t activeWhenPercentFilled) {
330  // determine total buffer size in bytes
331  size_t size = callback_buffer_ptr->size() * sizeof(T);
332  // calculate limit
333  active_limit = size * activeWhenPercentFilled / 100;
334  return true;
335  }
336 
338  virtual void end() {
339  TRACED();
340  active = false;
341  };
342 
343  int available() override {
344  return active ? callback_buffer_ptr->available() * sizeof(T) : 0;
345  }
346 
347  int availableForWrite() override {
348  return callback_buffer_ptr->availableForWrite() * sizeof(T);
349  }
350 
351  virtual size_t write(const uint8_t *data, size_t len) override {
352  if (active_limit == 0 && !active) return 0;
353 
354  // activate automaticaly when limit has been reached
355  if (active_limit > 0 && !active && available() >= active_limit) {
356  this->active = true;
357  }
358 
359  // make space by deleting oldest entries
360  if (remove_oldest_data) {
361  int available_bytes =
362  callback_buffer_ptr->availableForWrite() * sizeof(T);
363  if ((int)len > available_bytes) {
364  int gap = len - available_bytes;
365  uint8_t tmp[gap];
366  readBytes(tmp, gap);
367  }
368  }
369 
370  return callback_buffer_ptr->writeArray(data, len / sizeof(T));
371  }
372 
373  virtual size_t readBytes(uint8_t *data, size_t len) override {
374  if (!active) return 0;
375  return callback_buffer_ptr->readArray(data, len / sizeof(T));
376  }
377 
379  void clear() {
380  if (active) {
381  callback_buffer_ptr->reset();
382  }
383  }
384 
386  operator bool() { return active; }
387 
388  protected:
389  BaseBuffer<T> *callback_buffer_ptr;
390  size_t active_limit = 0;
391  bool active;
392  bool remove_oldest_data;
393  bool owns_buffer;
394 };
395 
396 #if USE_OBSOLETE
397 // support legacy name: CallbackBufferedStream<T> is an alias for QueueStream<T> which is only declared when USE_OBSOLETE is enabled
398 template <typename T>
399 using CallbackBufferedStream = QueueStream<T>;
400 #endif
401 
402 #ifndef SWIG
403 
404 struct DataNode {
405  size_t len=0;
406  Vector<uint8_t> data{0};
407 
408  DataNode() = default;
410  DataNode(void*inData, int len){
411  this->len = len;
412  this->data.resize(len);
413  memcpy(&data[0], inData, len);
414  }
415 };
416 
425 public:
426 
427  DynamicMemoryStream() = default;
428 
429  DynamicMemoryStream(bool isLoop, int defaultBufferSize=DEFAULT_BUFFER_SIZE ) {
430  this->default_buffer_size = defaultBufferSize;
431  is_loop = isLoop;
432  }
433  // Assign values from ref, clearing the original ref
434  void assign(DynamicMemoryStream &ref){
435  audio_list.swap(ref.audio_list);
436  it = ref.it;
437  total_available=ref.total_available;
438  default_buffer_size = ref.default_buffer_size;
439  alloc_failed = ref.alloc_failed;;
440  is_loop = ref.is_loop;
441  ref.clear();
442  }
443 
445  virtual bool begin() {
446  clear();
447  temp_audio.resize(default_buffer_size);
448  return true;
449  }
450 
451  virtual void end() {
452  clear();
453  }
454 
456  virtual void setLoop(bool loop){
457  is_loop = loop;
458  }
459 
460  void clear() {
461  DataNode *p_node;
462  bool ok;
463  do{
464  ok = audio_list.pop_front(p_node);
465  if (ok){
466  delete p_node;
467  }
468  } while (ok);
469 
470  temp_audio.reset();
471  total_available = 0;
472  alloc_failed = false;
473  rewind();
474  }
475 
476  size_t size(){
477  return total_available;
478  }
479 
481  void rewind() {
482  it = audio_list.begin();
483  }
484 
485  virtual size_t write(const uint8_t *data, size_t len) override {
486  DataNode *p_node = new DataNode((void*)data, len);
487  if (p_node->data){
488  alloc_failed = false;
489  total_available += len;
490  audio_list.push_back(p_node);
491 
492  // setup interator to point to first record
493  if (it == audio_list.end()){
494  it = audio_list.begin();
495  }
496 
497  return len;
498  }
499  alloc_failed = true;
500  return 0;
501  }
502 
503  virtual int availableForWrite() override {
504  return alloc_failed ? 0 : default_buffer_size;
505  }
506 
507  virtual int available() override {
508  if (it == audio_list.end()){
509  if (is_loop) rewind();
510  if (it == audio_list.end()) {
511  return 0;
512  }
513  }
514  return (*it)->len;
515  }
516 
517  virtual size_t readBytes(uint8_t *data, size_t len) override {
518  // provide unprocessed data
519  if (temp_audio.available()>0){
520  return temp_audio.readArray(data, len);
521  }
522 
523  // We have no more data
524  if (it==audio_list.end()){
525  if (is_loop){
526  rewind();
527  } else {
528  // stop the processing
529  return 0;
530  }
531  }
532 
533  // provide data from next node
534  DataNode *p_node = *it;
535  int result_len = min(len, (size_t) p_node->len);
536  memcpy(data, &p_node->data[0], result_len);
537  // save unprocessed data to temp buffer
538  if (p_node->len>len){
539  uint8_t *start = &p_node->data[result_len];
540  int uprocessed_len = p_node->len - len;
541  temp_audio.writeArray(start, uprocessed_len);
542  }
543  //move to next pos
544  ++it;
545  return result_len;
546  }
547 
548  List<DataNode*> &list() {
549  return audio_list;
550  }
551 
555  template<typename T>
556  void postProcessSmoothTransition(int channels, float factor = 0.01, int remove=0){
557  if (remove>0){
558  for (int j=0;j<remove;j++){
559  DataNode* node = nullptr;
560  audio_list.pop_front(node);
561  if (node!=nullptr) delete node;
562  node = nullptr;
563  audio_list.pop_back(node);
564  if (node!=nullptr) delete node;
565  }
566  }
567 
568  // Remove popping noise
569  SmoothTransition<T> clean_start(channels, true, false, factor);
570  auto first = *list().begin();
571  if (first!=nullptr){
572  clean_start.convert(&(first->data[0]),first->len);
573  }
574 
575  SmoothTransition<T> clean_end(channels, false, true, factor);
576  auto last = * (--(list().end()));
577  if (last!=nullptr){
578  clean_end.convert(&(last->data[0]),last->len);
579  }
580  }
581 
582 
583 protected:
584  List<DataNode*> audio_list;
585  List<DataNode*>::Iterator it = audio_list.end();
586  size_t total_available=0;
587  int default_buffer_size=DEFAULT_BUFFER_SIZE;
588  bool alloc_failed = false;
589  RingBuffer<uint8_t> temp_audio{DEFAULT_BUFFER_SIZE};
590  bool is_loop = false;
591 
592 };
593 
594 #endif
595 
596 } // namespace audio_tools
Supports the subscription to audio change notifications.
Definition: AudioTypes.h:159
Supports changes to the sampling rate, bits and channels.
Definition: AudioTypes.h:139
virtual AudioInfo audioInfoOut()
provides the actual output AudioInfo: this is usually the same as audioInfo() unless we use a transfo...
Definition: AudioTypes.h:146
Base class for all Audio Streams. It supports the boolean operator to test if the object is ready with...
Definition: BaseStream.h:113
virtual size_t readSilence(uint8_t *buffer, size_t length)
Source to generate silence: just sets the buffer to 0.
Definition: BaseStream.h:158
virtual void setAudioInfo(AudioInfo newInfo) override
Defines the input AudioInfo.
Definition: BaseStream.h:121
virtual void writeSilence(size_t len)
Writes len bytes of silence (=0).
Definition: BaseStream.h:150
virtual AudioInfo audioInfo() override
provides the actual input AudioInfo
Definition: BaseStream.h:144
Shared functionality of all buffers.
Definition: Buffers.h:30
virtual int readArray(T data[], int len)
reads multiple values
Definition: Buffers.h:41
virtual int writeArray(const T data[], int len)
Fills the buffer data.
Definition: Buffers.h:65
Base class for all Streams. It relies on write(const uint8_t *buffer, size_t size) and readBytes(uint...
Definition: BaseStream.h:34
Provides data from a concatenation of Streams. Please note that the provided Streams can be played on...
Definition: BaseStream.h:186
size_t write(const uint8_t *data, size_t size) override
not supported
Definition: BaseStream.h:228
bool moveToNextStreamOnEnd()
Definition: BaseStream.h:240
MemoryStream which is written and read using the internal RAM. For each write the data is allocated o...
Definition: BaseStream.h:424
void postProcessSmoothTransition(int channels, float factor=0.01, int remove=0)
Post processing after the recording. We add a smooth transition at the beginning and at the end.
Definition: BaseStream.h:556
virtual bool begin()
Initializes the processing.
Definition: BaseStream.h:445
virtual void setLoop(bool loop)
Automatically rewinds to the beginning when reaching the end.
Definition: BaseStream.h:456
void rewind()
Sets the read position to the beginning.
Definition: BaseStream.h:481
Definition: List.h:26
Double linked list.
Definition: List.h:18
A lock free N buffer. If count=2 we create a DoubleBuffer, if count=3 a TripleBuffer etc.
Definition: Buffers.h:563
The Arduino Stream which provides silence and simulates a null device when used as audio target or au...
Definition: BaseStream.h:281
Stream class which stores the data in a temporary queue buffer. The queue can be consumed e....
Definition: BaseStream.h:300
QueueStream(BaseBuffer< T > &buffer)
Create stream from any BaseBuffer subclass.
Definition: BaseStream.h:310
virtual bool begin()
Activates the output.
Definition: BaseStream.h:322
virtual bool begin(size_t activeWhenPercentFilled)
Activate only when filled buffer reached %.
Definition: BaseStream.h:329
void clear()
Clears the data in the buffer.
Definition: BaseStream.h:379
QueueStream(int bufferSize, int bufferCount, bool autoRemoveOldestDataIfFull=false)
Default constructor.
Definition: BaseStream.h:303
virtual void end()
stops the processing
Definition: BaseStream.h:338
virtual T read()
reads a single value
Definition: Buffers.h:309
virtual int available()
provides the number of entries that are available to read
Definition: Buffers.h:366
virtual bool write(T data)
write add an entry to the buffer
Definition: Buffers.h:347
virtual void reset()
clears the buffer
Definition: Buffers.h:359
virtual T * address()
returns the address of the start of the physical read buffer
Definition: Buffers.h:372
virtual size_t size()
Returns the maximum capacity of the buffer.
Definition: Buffers.h:383
virtual bool isFull()
checks if the buffer is full
Definition: Buffers.h:342
virtual T peek()
peeks the actual entry from the buffer
Definition: Buffers.h:320
Changes the samples at the beginning or at the end to slowly ramp up the volume.
Definition: BaseConverter.h:1738
Definition: NoArduino.h:125
Vector implementation which provides the most important methods as defined by std::vector....
Definition: Vector.h:21
Generic Implementation of sound input and output for desktop environments using portaudio.
Definition: AudioConfig.h:872
Basic Audio information which drives e.g. I2S.
Definition: AudioTypes.h:52
Definition: BaseStream.h:404
DataNode(void *inData, int len)
Constructor.
Definition: BaseStream.h:410