arduino-audio-tools
BaseStream.h
1 #pragma once
2 #include "AudioTools/CoreAudio/Buffers.h"
3 
4 #ifdef ARDUINO
5 #include "Stream.h"
6 #endif
7 
8 #ifdef USE_STREAM_WRITE_OVERRIDE  // STREAM_WRITE_OVERRIDE expands to `override` only when this flag is defined
9 #define STREAM_WRITE_OVERRIDE override
10 #else  // otherwise the macro expands to nothing
11 #define STREAM_WRITE_OVERRIDE
12 #endif  // USE_STREAM_WRITE_OVERRIDE
13 
14 #ifdef USE_STREAM_READ_OVERRIDE  // STREAM_READ_OVERRIDE is appended to BaseStream::readBytes(uint8_t*, size_t)
15 #define STREAM_READ_OVERRIDE override
16 #else  // expands to nothing; presumably for platforms whose Stream::readBytes is not virtual - TODO confirm
17 #define STREAM_READ_OVERRIDE
18 #endif  // USE_STREAM_READ_OVERRIDE
19 
20 #ifdef USE_STREAM_READCHAR_OVERRIDE  // STREAM_READCHAR_OVERRIDE is appended to BaseStream::readBytes(char*, size_t)
21 #define STREAM_READCHAR_OVERRIDE override
22 #else  // expands to nothing when the flag is not defined
23 #define STREAM_READCHAR_OVERRIDE
24 #endif  // USE_STREAM_READCHAR_OVERRIDE
25 
26 namespace audio_tools {
27 
34 class BaseStream : public Stream {
35  public:
36  BaseStream() = default;
37  virtual ~BaseStream() = default;
38  BaseStream(BaseStream const &) = delete;
39  BaseStream &operator=(BaseStream const &) = delete;
40 
41  virtual bool begin(){return true;}
42  virtual void end(){}
43 
44  virtual size_t readBytes(uint8_t *data,
45  size_t len) STREAM_READ_OVERRIDE = 0;
46  virtual size_t write(const uint8_t *data, size_t len) override = 0;
47 
48  virtual size_t write(uint8_t ch) override {
49  tmp_out.resize(MAX_SINGLE_CHARS);
50  if (tmp_out.isFull()) {
51  flush();
52  }
53  return tmp_out.write(ch);
54  }
55 
56  virtual int available() override { return DEFAULT_BUFFER_SIZE; };
57 
58  virtual int availableForWrite() override { return DEFAULT_BUFFER_SIZE; }
59 
60  virtual void flush() override {
61  if (tmp_out.available() > 0) {
62  write((const uint8_t *)tmp_out.address(), tmp_out.available());
63  }
64  }
65 
66 // Methods which should be suppressed in the documentation
67 #ifndef DOXYGEN
68 
69  virtual size_t readBytes(char *data, size_t len) STREAM_READCHAR_OVERRIDE {
70  return readBytes((uint8_t *)data, len);
71  }
72 
73  virtual int read() override {
74  refillReadBuffer();
75  return tmp_in.read();
76  }
77 
78  virtual int peek() override {
79  refillReadBuffer();
80  return tmp_in.peek();
81  }
82 
83 #endif
84 
85  protected:
86  RingBuffer<uint8_t> tmp_in{0};
87  RingBuffer<uint8_t> tmp_out{0};
88 
89  void refillReadBuffer() {
90  tmp_in.resize(MAX_SINGLE_CHARS);
91  if (tmp_in.isEmpty()) {
92  TRACED();
93  const int len = tmp_in.size();
94  uint8_t bytes[len];
95  int len_eff = readBytes(bytes, len);
96  // LOGD("tmp_in available: %d / size: %d / to be written %d",
97  // tmp_in.available(), tmp_in.size(), len_eff);
98  tmp_in.writeArray(bytes, len_eff);
99  }
100  }
101 };
102 
109 class AudioStream : public BaseStream, public AudioInfoSupport, public AudioInfoSource {
110  public:
111  AudioStream() = default;
112  virtual ~AudioStream() = default;
113  AudioStream(AudioStream const&) = delete;
114  AudioStream& operator=(AudioStream const&) = delete;
115 
116  // Call from subclass or overwrite to do something useful
117  virtual void setAudioInfo(AudioInfo newInfo) override {
118  TRACED();
119 
120  if (info != newInfo){
121  info = newInfo;
122  info.logInfo("in:");
123  }
124  // replicate information
125  AudioInfo out_new = audioInfoOut();
126  if (out_new) {
127  out_new.logInfo("out:");
128  notifyAudioChange(out_new);
129  }
130 
131  }
132 
133  virtual size_t readBytes(uint8_t *data, size_t len) override { return not_supported(0, "readBytes"); }
134 
135  virtual size_t write(const uint8_t *data, size_t len) override{ return not_supported(0,"write"); }
136 
137 
138  virtual operator bool() { return info && available() > 0; }
139 
140  virtual AudioInfo audioInfo() override {
141  return info;
142  }
143 
144 
146  virtual void writeSilence(size_t len){
147  int16_t zero = 0;
148  for (int j=0;j<len/2;j++){
149  write((uint8_t*)&zero,2);
150  }
151  }
152 
154  virtual size_t readSilence(uint8_t *buffer, size_t length) {
155  memset(buffer, 0, length);
156  return length;
157  }
158 
159  protected:
160  AudioInfo info;
161 
162  virtual int not_supported(int out, const char *msg = "") {
163  LOGE("AudioStream: %s unsupported operation!", msg);
164  // trigger stacktrace
165  assert(false);
166  return out;
167  }
168 
169 };
170 
171 
182 class CatStream : public BaseStream {
183  public:
184  CatStream() = default;
185 
186  void add(Stream *stream) { input_streams.push_back(stream); }
187  void add(Stream &stream) { input_streams.push_back(&stream); }
188 
189  bool begin() {
190  is_active = true;
191  return true;
192  }
193 
194  void end() { is_active = false; }
195 
196  int available() override {
197  if (!is_active) return 0;
198  if (!moveToNextStreamOnEnd()) {
199  return 0;
200  }
201  return availableWithTimout();
202  }
203 
204  size_t readBytes(uint8_t *data, size_t len) override {
205  if (!is_active) return 0;
206  if (!moveToNextStreamOnEnd()) {
207  return 0;
208  }
209  return p_current_stream->readBytes(data, len);
210  }
211 
213  operator bool() { return is_active && available() > 0; }
214 
215  void setOnBeginCallback(void (*callback)(Stream *stream)) {
216  begin_callback = callback;
217  }
218  void setOnEndCallback(void (*callback)(Stream *stream)) {
219  end_callback = callback;
220  }
221  void setTimeout(uint32_t t) { _timeout = t; }
222 
224  size_t write(const uint8_t *data, size_t size) override { return 0;};
225 
226  protected:
227  Vector<Stream *> input_streams;
228  Stream *p_current_stream = nullptr;
229  bool is_active = false;
230  void (*begin_callback)(Stream *stream) = nullptr;
231  void (*end_callback)(Stream *stream) = nullptr;
232  uint_fast32_t _timeout = 0;
233 
237  // keep on running
238  if (p_current_stream != nullptr && p_current_stream->available() > 0)
239  return true;
240  // at end?
241  if ((p_current_stream == nullptr || availableWithTimout() == 0)) {
242  if (end_callback && p_current_stream) end_callback(p_current_stream);
243  if (!input_streams.empty()) {
244  LOGI("using next stream");
245  p_current_stream = input_streams[0];
246  input_streams.pop_front();
247  if (begin_callback && p_current_stream)
248  begin_callback(p_current_stream);
249  } else {
250  p_current_stream = nullptr;
251  }
252  }
253  // returns true if we have a valid stream
254  return p_current_stream != nullptr;
255  }
256 
257  int availableWithTimout() {
258  int result = p_current_stream->available();
259  if (result == 0) {
260  for (int j = 0; j < _timeout / 10; j++) {
261  delay(10);
262  result = p_current_stream->available();
263  if (result != 0) break;
264  }
265  }
266  return result;
267  }
268 };
269 
277 class NullStream : public BaseStream {
278  public:
279  size_t write(const uint8_t *data, size_t len) override { return len; }
280 
281  size_t readBytes(uint8_t *data, size_t len) override {
282  memset(data, 0, len);
283  return len;
284  }
285 };
286 
287 
295 template <class T>
296 class QueueStream : public BaseStream {
297  public:
299  QueueStream(int bufferSize, int bufferCount,
300  bool autoRemoveOldestDataIfFull = false) {
301  owns_buffer = true;
302  callback_buffer_ptr = new NBuffer<T>(bufferSize, bufferCount);
303  remove_oldest_data = autoRemoveOldestDataIfFull;
304  }
307  owns_buffer = false;
308  callback_buffer_ptr = &buffer;
309  }
310 
311  virtual ~QueueStream() {
312  if (owns_buffer) {
313  delete callback_buffer_ptr;
314  }
315  }
316 
318  virtual bool begin() {
319  TRACED();
320  active = true;
321  return true;
322  }
323 
325  virtual bool begin(size_t activeWhenPercentFilled) {
326  // determine total buffer size in bytes
327  size_t size = callback_buffer_ptr->size() * sizeof(T);
328  // calculate limit
329  active_limit = size * activeWhenPercentFilled / 100;
330  return true;
331  }
332 
334  virtual void end() {
335  TRACED();
336  active = false;
337  };
338 
339  int available() override {
340  return active ? callback_buffer_ptr->available() * sizeof(T) : 0;
341  }
342 
343  int availableForWrite() override {
344  return callback_buffer_ptr->availableForWrite() * sizeof(T);
345  }
346 
347  virtual size_t write(const uint8_t *data, size_t len) override {
348  if (active_limit == 0 && !active) return 0;
349 
350  // activate automaticaly when limit has been reached
351  if (active_limit > 0 && !active && available() >= active_limit) {
352  this->active = true;
353  }
354 
355  // make space by deleting oldest entries
356  if (remove_oldest_data) {
357  int available_bytes =
358  callback_buffer_ptr->availableForWrite() * sizeof(T);
359  if ((int)len > available_bytes) {
360  int gap = len - available_bytes;
361  uint8_t tmp[gap];
362  readBytes(tmp, gap);
363  }
364  }
365 
366  return callback_buffer_ptr->writeArray(data, len / sizeof(T));
367  }
368 
369  virtual size_t readBytes(uint8_t *data, size_t len) override {
370  if (!active) return 0;
371  return callback_buffer_ptr->readArray(data, len / sizeof(T));
372  }
373 
375  void clear() {
376  if (active) {
377  callback_buffer_ptr->reset();
378  }
379  }
380 
382  operator bool() { return active; }
383 
384  protected:
385  BaseBuffer<T> *callback_buffer_ptr;
386  size_t active_limit = 0;
387  bool active;
388  bool remove_oldest_data;
389  bool owns_buffer;
390 };
391 
392 #if USE_OBSOLETE
393 // Legacy name: CallbackBufferedStream<T> is an alias for QueueStream<T>, only available when USE_OBSOLETE is enabled
394 template <typename T>
395 using CallbackBufferedStream = QueueStream<T>;
396 #endif
397 
398 #ifndef SWIG
399 
400 struct DataNode {
401  size_t len=0;
402  Vector<uint8_t> data{0};
403 
404  DataNode() = default;
406  DataNode(void*inData, int len){
407  this->len = len;
408  this->data.resize(len);
409  memcpy(&data[0], inData, len);
410  }
411 };
412 
421 public:
422 
423  DynamicMemoryStream() = default;
424 
425  DynamicMemoryStream(bool isLoop, int defaultBufferSize=DEFAULT_BUFFER_SIZE ) {
426  this->default_buffer_size = defaultBufferSize;
427  is_loop = isLoop;
428  }
429  // Assign values from ref, clearing the original ref
430  void assign(DynamicMemoryStream &ref){
431  audio_list.swap(ref.audio_list);
432  it = ref.it;
433  total_available=ref.total_available;
434  default_buffer_size = ref.default_buffer_size;
435  alloc_failed = ref.alloc_failed;;
436  is_loop = ref.is_loop;
437  ref.clear();
438  }
439 
441  virtual bool begin() {
442  clear();
443  temp_audio.resize(default_buffer_size);
444  return true;
445  }
446 
447  virtual void end() {
448  clear();
449  }
450 
452  virtual void setLoop(bool loop){
453  is_loop = loop;
454  }
455 
456  void clear() {
457  DataNode *p_node;
458  bool ok;
459  do{
460  ok = audio_list.pop_front(p_node);
461  if (ok){
462  delete p_node;
463  }
464  } while (ok);
465 
466  temp_audio.reset();
467  total_available = 0;
468  alloc_failed = false;
469  rewind();
470  }
471 
472  size_t size(){
473  return total_available;
474  }
475 
477  void rewind() {
478  it = audio_list.begin();
479  }
480 
481  virtual size_t write(const uint8_t *data, size_t len) override {
482  DataNode *p_node = new DataNode((void*)data, len);
483  if (p_node->data){
484  alloc_failed = false;
485  total_available += len;
486  audio_list.push_back(p_node);
487 
488  // setup interator to point to first record
489  if (it == audio_list.end()){
490  it = audio_list.begin();
491  }
492 
493  return len;
494  }
495  alloc_failed = true;
496  return 0;
497  }
498 
499  virtual int availableForWrite() override {
500  return alloc_failed ? 0 : default_buffer_size;
501  }
502 
503  virtual int available() override {
504  if (it == audio_list.end()){
505  if (is_loop) rewind();
506  if (it == audio_list.end()) {
507  return 0;
508  }
509  }
510  return (*it)->len;
511  }
512 
513  virtual size_t readBytes(uint8_t *data, size_t len) override {
514  // provide unprocessed data
515  if (temp_audio.available()>0){
516  return temp_audio.readArray(data, len);
517  }
518 
519  // We have no more data
520  if (it==audio_list.end()){
521  if (is_loop){
522  rewind();
523  } else {
524  // stop the processing
525  return 0;
526  }
527  }
528 
529  // provide data from next node
530  DataNode *p_node = *it;
531  int result_len = min(len, (size_t) p_node->len);
532  memcpy(data, &p_node->data[0], result_len);
533  // save unprocessed data to temp buffer
534  if (p_node->len>len){
535  uint8_t *start = &p_node->data[result_len];
536  int uprocessed_len = p_node->len - len;
537  temp_audio.writeArray(start, uprocessed_len);
538  }
539  //move to next pos
540  ++it;
541  return result_len;
542  }
543 
544  List<DataNode*> &list() {
545  return audio_list;
546  }
547 
551  template<typename T>
552  void postProcessSmoothTransition(int channels, float factor = 0.01, int remove=0){
553  if (remove>0){
554  for (int j=0;j<remove;j++){
555  DataNode* node = nullptr;
556  audio_list.pop_front(node);
557  if (node!=nullptr) delete node;
558  node = nullptr;
559  audio_list.pop_back(node);
560  if (node!=nullptr) delete node;
561  }
562  }
563 
564  // Remove popping noise
565  SmoothTransition<T> clean_start(channels, true, false, factor);
566  auto first = *list().begin();
567  if (first!=nullptr){
568  clean_start.convert(&(first->data[0]),first->len);
569  }
570 
571  SmoothTransition<T> clean_end(channels, false, true, factor);
572  auto last = * (--(list().end()));
573  if (last!=nullptr){
574  clean_end.convert(&(last->data[0]),last->len);
575  }
576  }
577 
578 
579 protected:
580  List<DataNode*> audio_list;
581  List<DataNode*>::Iterator it = audio_list.end();
582  size_t total_available=0;
583  int default_buffer_size=DEFAULT_BUFFER_SIZE;
584  bool alloc_failed = false;
585  RingBuffer<uint8_t> temp_audio{DEFAULT_BUFFER_SIZE};
586  bool is_loop = false;
587 
588 };
589 
590 #endif
591 
592 } // namespace audio_tools
Supports the subscription to audio change notifications.
Definition: AudioTypes.h:159
Supports changes to the sampling rate, bits and channels.
Definition: AudioTypes.h:139
virtual AudioInfo audioInfoOut()
provides the actual output AudioInfo: this is usually the same as audioInfo() unless we use a transfo...
Definition: AudioTypes.h:146
Base class for all Audio Streams. It supports the boolean operator to test if the object is ready with...
Definition: BaseStream.h:109
virtual size_t readSilence(uint8_t *buffer, size_t length)
Source to generate silence: just sets the buffer to 0.
Definition: BaseStream.h:154
virtual void setAudioInfo(AudioInfo newInfo) override
Defines the input AudioInfo.
Definition: BaseStream.h:117
virtual void writeSilence(size_t len)
Writes len bytes of silence (=0).
Definition: BaseStream.h:146
virtual AudioInfo audioInfo() override
provides the actual input AudioInfo
Definition: BaseStream.h:140
Shared functionality of all buffers.
Definition: Buffers.h:30
virtual int readArray(T data[], int len)
reads multiple values
Definition: Buffers.h:41
virtual int writeArray(const T data[], int len)
Fills the buffer data.
Definition: Buffers.h:65
Base class for all Streams. It relies on write(const uint8_t *buffer, size_t size) and readBytes(uint...
Definition: BaseStream.h:34
Provides data from a concatenation of Streams. Please note that the provided Streams can be played on...
Definition: BaseStream.h:182
size_t write(const uint8_t *data, size_t size) override
not supported
Definition: BaseStream.h:224
bool moveToNextStreamOnEnd()
Definition: BaseStream.h:236
MemoryStream which is written and read using the internal RAM. For each write the data is allocated o...
Definition: BaseStream.h:420
void postProcessSmoothTransition(int channels, float factor=0.01, int remove=0)
Post processing after the recording. We add a smooth transition at the beginning and at the end.
Definition: BaseStream.h:552
virtual bool begin()
Initializes the processing.
Definition: BaseStream.h:441
virtual void setLoop(bool loop)
Automatically rewinds to the beginning when reaching the end.
Definition: BaseStream.h:452
void rewind()
Sets the read position to the beginning.
Definition: BaseStream.h:477
Definition: List.h:26
Double linked list.
Definition: List.h:18
A lock free N buffer. If count=2 we create a DoubleBuffer, if count=3 a TripleBuffer etc.
Definition: Buffers.h:563
The Arduino Stream which provides silence and simulates a null device when used as audio target or au...
Definition: BaseStream.h:277
Stream class which stores the data in a temporary queue buffer. The queue can be consumed e....
Definition: BaseStream.h:296
QueueStream(BaseBuffer< T > &buffer)
Create stream from any BaseBuffer subclass.
Definition: BaseStream.h:306
virtual bool begin()
Activates the output.
Definition: BaseStream.h:318
virtual bool begin(size_t activeWhenPercentFilled)
Activate only when filled buffer reached %.
Definition: BaseStream.h:325
void clear()
Clears the data in the buffer.
Definition: BaseStream.h:375
QueueStream(int bufferSize, int bufferCount, bool autoRemoveOldestDataIfFull=false)
Default constructor.
Definition: BaseStream.h:299
virtual void end()
stops the processing
Definition: BaseStream.h:334
virtual T read()
reads a single value
Definition: Buffers.h:309
virtual int available()
provides the number of entries that are available to read
Definition: Buffers.h:366
virtual bool write(T data)
write add an entry to the buffer
Definition: Buffers.h:347
virtual void reset()
clears the buffer
Definition: Buffers.h:359
virtual T * address()
returns the address of the start of the physical read buffer
Definition: Buffers.h:372
virtual size_t size()
Returns the maximum capacity of the buffer.
Definition: Buffers.h:383
virtual bool isFull()
checks if the buffer is full
Definition: Buffers.h:342
virtual T peek()
peeks the actual entry from the buffer
Definition: Buffers.h:320
Changes the samples at the beginning or at the end to slowly ramp up the volume.
Definition: BaseConverter.h:1738
Definition: NoArduino.h:125
Vector implementation which provides the most important methods as defined by std::vector....
Definition: Vector.h:21
Generic Implementation of sound input and output for desktop environments using portaudio.
Definition: AudioConfig.h:823
Basic Audio information which drives e.g. I2S.
Definition: AudioTypes.h:52
Definition: BaseStream.h:400
DataNode(void *inData, int len)
Constructor.
Definition: BaseStream.h:406