arduino-audio-tools
StreamCopy.h
1 #pragma once
2 
3 #include "AudioConfig.h"
4 #include "AudioTools/CoreAudio/AudioTypes.h"
5 #include "AudioTools/CoreAudio/Buffers.h"
6 #include "AudioTools/CoreAudio/BaseConverter.h"
7 #include "AudioTools/CoreAudio/AudioLogger.h"
8 #include "AudioTools/CoreAudio/AudioStreams.h"
9 
10 #define NOT_ENOUGH_MEMORY_MSG "Could not allocate enough memory: %d bytes"
11 
12 namespace audio_tools {
13 
22 template <class T>
23 class StreamCopyT {
24  public:
25 
26  StreamCopyT(Print &to, AudioStream &from, int bufferSize=DEFAULT_BUFFER_SIZE){
27  TRACED();
28  this->buffer_size = bufferSize;
29  begin(to, from);
30  }
31 
32  StreamCopyT(Print &to, Stream &from, int bufferSize=DEFAULT_BUFFER_SIZE){
33  TRACED();
34  this->buffer_size = bufferSize;
35  begin(to, from);
36  }
37 
38  StreamCopyT(int bufferSize=DEFAULT_BUFFER_SIZE){
39  TRACED();
40  this->buffer_size = bufferSize;
41  begin();
42  }
43 
44  ~StreamCopyT() {
45  end();
46  }
47 
49  void begin(){
50  TRACED();
51  is_first = true;
52  resize(buffer_size);
53  if (buffer){
54  LOGI("buffer_size=%d",buffer_size);
55  } else {
56  LOGE(NOT_ENOUGH_MEMORY_MSG, buffer_size);
57  }
58  }
59 
61  void end() {
62  if (is_cleanup_from) {
63  delete this->from;
64  is_cleanup_from = false;
65  }
66  this->from = nullptr;
67  this->to = nullptr;
68  }
69 
71  void begin(Print &to, Stream &from){
72  is_cleanup_from = true;
73  this->from = new AudioStreamWrapper(from);
74  this->to = &to;
75  begin();
76  }
77 
79  void begin(Print &to, AudioStream &from){
80  this->from = &from;
81  this->to = &to;
82  begin();
83  }
84 
87  return from;
88  }
89 
91  Print *getTo() {
92  return to;
93  }
94 
96  inline size_t copy() {
97  p_converter = nullptr;
98  return copyBytes(buffer_size);
99  }
100 
102  inline size_t copy(BaseConverter &converter) {
103  p_converter = &converter;
104  return copyBytes(buffer_size);
105  }
106 
108  inline size_t copyBytes(size_t bytes){
109  LOGD("copy %d bytes %s", (int) bytes, log_name);
110  if (!active) return 0;
111  // if not initialized we do nothing
112  if (from==nullptr && to==nullptr) return 0;
113 
114  // synchronize AudioInfo
115  syncAudioInfo();
116 
117  // E.g. if we try to write to a server we might not have any output destination yet
118  int to_write = to->availableForWrite();
119  if (check_available_for_write && to_write==0){
120  delay(500);
121  return 0;
122  }
123 
124  // resize copy buffer if necessary
125  if (buffer.size() < bytes){
126  LOGI("Resize to %d", (int) bytes);
127  buffer.resize(bytes);
128  }
129 
130  size_t result = 0;
131  size_t delayCount = 0;
132  size_t len = bytes;
133  if (check_available) {
134  len = available();
135  }
136  size_t bytes_to_read = bytes;
137  size_t bytes_read = 0;
138 
139  if (len > 0){
140  bytes_to_read = min(len, static_cast<size_t>(buffer_size));
141  // don't overflow buffer
142  if (to_write > 0){
143  bytes_to_read = min((int)bytes_to_read, to_write);
144  }
145 
146  // round to full frames
147  int copy_size = minCopySize();
148  if (copy_size > 0){
149  size_t samples = bytes_to_read / minCopySize();
150  bytes_to_read = samples * minCopySize();
151  }
152 
153  // get the data now
154  bytes_read = 0;
155  if (bytes_to_read>0){
156  bytes_read = from->readBytes((uint8_t*)&buffer[0], bytes_to_read);
157  }
158 
159  // determine mime
160  notifyMime(buffer.data(), bytes_to_read);
161 
162  // convert data
163  if (p_converter!=nullptr) p_converter->convert((uint8_t*)buffer.data(), result );
164 
165  // write data
166  result = write(bytes_read, delayCount);
167 
168  // callback with unconverted data
169  if (onWrite!=nullptr) onWrite(onWriteObj, &buffer[0], result);
170 
171  #ifndef COPY_LOG_OFF
172  LOGI("StreamCopy::copy %s %u -> %u -> %u bytes - in %u hops",log_name, (unsigned int)bytes_to_read,(unsigned int) bytes_read, (unsigned int)result, (unsigned int)delayCount);
173  #endif
174  //TRACED();
175 
176  if (result == 0){
177  TRACED();
178  // give the processor some time
179  delay(delay_on_no_data);
180  }
181 
182  //TRACED();
183  CHECK_MEMORY();
184  } else {
185  // give the processor some time
186  delay(delay_on_no_data);
187  LOGD("no data %s", log_name);
188  }
189  //TRACED();
190  return result;
191  }
192 
194  size_t copyN(size_t pages){
195  if (!active) return 0;
196  size_t total=0;
197  for (size_t j=0;j<pages;j++){
198  total+=copy();
199  }
200  return total;
201  }
202 
204  size_t copyMs(size_t millis, AudioInfo info){
205  if (!active) return 0;
206  size_t pages = AudioTime::toBytes(millis, info) / buffer_size;
207  return copyN(pages);
208  }
209 
211  size_t copyAll(int retryCount=5, int retryWaitMs=200){
212  TRACED();
213  if (!active) return 0;
214  size_t result = 0;
215  int retry = 0;
216 
217  if (from==nullptr || to == nullptr)
218  return result;
219 
220  // copy while source has data available
221  int count=0;
222  while (true){
223  count = copy();
224  result += count;
225  if (count==0){
226  // wait for more data
227  retry++;
228  delay(retryWaitMs);
229  } else {
230  retry = 0; // after we got new data we restart the counting
231  }
232  // stop the processing if we passed the retry limit
233  if (retry>retryCount){
234  break;
235  }
236  }
237  return result;
238  }
239 
241  int available() {
242  int result = 0;
243  if (from!=nullptr) {
244  if (availableCallback!=nullptr){
245  result = availableCallback((Stream*)from);
246  } else {
247  result = from->available();
248  }
249  } else {
250  LOGW("source not defined");
251  }
252  LOGD("available: %d", result);
253  return result;
254  }
255 
257  void setDelayOnNoData(int delayMs){
258  delay_on_no_data = delayMs;
259  }
260 
262  const char* mime() {
263  return actual_mime;
264  }
265 
267  void setMimeCallback(void (*callback)(const char*)){
268  TRACED();
269  this->notifyMimeCallback = callback;
270  }
271 
273  void setCallbackOnWrite(void (*onWrite)(void*obj, void*buffer, size_t len), void* obj){
274  TRACED();
275  this->onWrite = onWrite;
276  this->onWriteObj = obj;
277  }
278 
280  void setAvailableCallback(int (*callback)(Stream*stream)){
281  availableCallback = callback;
282  }
283 
285  void setRetry(int retry){
286  retryLimit = retry;
287  }
288 
290  int bufferSize() {
291  return buffer_size;
292  }
293 
295  void setCheckAvailableForWrite(bool flag){
296  check_available_for_write = flag;
297  }
298 
301  return check_available_for_write;
302  }
303 
305  void setCheckAvailable(bool flag){
306  check_available = flag;
307  }
308 
311  return check_available;
312  }
313 
315  void resize(int len){
316  buffer_size = len;
317  buffer.resize(buffer_size);
318  }
319 
321  void setActive(bool flag){
322  active = flag;
323  }
324 
326  bool isActive(){
327  return active;
328  }
329 
331  void setLogName(const char* name){
332  log_name = name;
333  }
334 
336  void setRetryDelay(int delay){
337  retry_delay = delay;
338  }
339 
341  int minCopySize() {
342  if (min_copy_size==0){
343  AudioInfo info = from->audioInfoOut();
344  min_copy_size = info.bits_per_sample / 8 * info.channels;
345  }
346  return min_copy_size;
347  }
348 
350  void setMinCopySize(int size){
351  min_copy_size = size;
352  }
353 
355  void setSynchAudioInfo(bool active){
356  is_sync_audio_info = active;
357  }
358 
359  protected:
360  AudioStream *from = nullptr;
361  Print *to = nullptr;
362  Vector<uint8_t> buffer{0};
363  int buffer_size = DEFAULT_BUFFER_SIZE;
364  void (*onWrite)(void*obj, void*buffer, size_t len) = nullptr;
365  void (*notifyMimeCallback)(const char*mime) = nullptr;
366  int (*availableCallback)(Stream*stream)=nullptr;
367  void *onWriteObj = nullptr;
368  bool is_first = false;
369  bool is_cleanup_from = false;
370  bool check_available_for_write = false;
371  bool check_available = true;
372  const char* actual_mime = nullptr;
373  int retryLimit = COPY_RETRY_LIMIT;
374  int delay_on_no_data = COPY_DELAY_ON_NODATA;
375  bool active = true;
376  const char* log_name = "";
377  int retry_delay = 10;
378  int channels = 0;
379  int min_copy_size = 1;
380  bool is_sync_audio_info = false;
381  AudioInfoSupport *p_audio_info_support = nullptr;
382  BaseConverter* p_converter = nullptr;
383 
384 
385  void syncAudioInfo(){
386  // synchronize audio info
387  if (is_sync_audio_info && from != nullptr && p_audio_info_support != nullptr){
388  AudioInfo info_from = from->audioInfoOut();
389  AudioInfo info_to = p_audio_info_support->audioInfo();
390  if (info_from != info_to){
391  LOGI("--> StreamCopy: ");
392  p_audio_info_support->setAudioInfo(info_from);
393  }
394  }
395  }
396 
398  size_t write(size_t len, size_t &delayCount ){
399  if (!buffer || len==0) return 0;
400  LOGD("write: %d", (int)len);
401  size_t total = 0;
402  long open = len;
403  int retry = 0;
404  while(open > 0){
405  size_t written = to->write((const uint8_t*)buffer.data()+total, open);
406  LOGD("write: %d -> %d", (int) open, (int) written);
407  total += written;
408  open -= written;
409  delayCount++;
410 
411  if (open > 0){
412  // if we still have progress we reset the retry counter
413  if (written>0) retry = 0;
414 
415  // abort if we reached the retry limit
416  if (retry++ > retryLimit){
417  LOGE("write %s to target has failed after %d retries! (%ld bytes)", log_name, retry, open);
418  break;
419  }
420 
421  // wait a bit
422  if (retry>1) {
423  delay(retry_delay);
424  LOGI("try write %s - %d (open %ld bytes) ",log_name, retry, open);
425  }
426  }
427 
428  CHECK_MEMORY();
429  }
430  return total;
431  }
432 
434  void notifyMime(void* data, size_t len){
435  if (is_first && len>4) {
436  const uint8_t *start = (const uint8_t *) data;
437  actual_mime = "audio/basic";
438  if (start[0]==0xFF && start[1]==0xF1){
439  actual_mime = "audio/aac";
440  } else if (memcmp(start,"ID3",3) || start[0]==0xFF || start[0]==0xFE ){
441  actual_mime = "audio/mpeg";
442  } else if (memcmp(start,"RIFF",4)){
443  actual_mime = "audio/vnd.wave";
444  }
445  if (notifyMimeCallback!=nullptr){
446  notifyMimeCallback(actual_mime);
447  }
448  }
449  is_first = false;
450  }
451 
452 };
453 
461 
462 
463 } // Namespace
virtual AudioInfo audioInfoOut()
provides the actual output AudioInfo: this is usually the same as audioInfo() unless we use a transfo...
Definition: AudioTypes.h:146
Base class for all Audio Streams. It support the boolean operator to test if the object is ready with...
Definition: BaseStream.h:109
To be used to support implementations where the readBytes is not virtual.
Definition: AudioStreams.h:24
static uint32_t toBytes(uint32_t millis, AudioInfo info)
converts milliseconds to bytes
Definition: AudioTypes.h:269
Abstract Base class for Converters A converter is processing the data in the indicated array.
Definition: BaseConverter.h:24
Definition: NoArduino.h:58
Typed Stream Copy which supports the conversion from channel to 2 channels. We make sure that we allw...
Definition: StreamCopy.h:23
void setCheckAvailable(bool flag)
Activates the check that we copy only if available returns a value.
Definition: StreamCopy.h:305
size_t copyMs(size_t millis, AudioInfo info)
Copies audio for the indicated number of milliseconds: note that the resolution is determined by the ...
Definition: StreamCopy.h:204
void setMinCopySize(int size)
Defines the minimum frame size that is used to round the copy size: 0 will automatically try to deter...
Definition: StreamCopy.h:350
int minCopySize()
Determine frame size.
Definition: StreamCopy.h:341
size_t copy(BaseConverter &converter)
copies the data from the source to the destination and applies the converter - the result is the proc...
Definition: StreamCopy.h:102
const char * mime()
Provides the actual mime type, that was determined from the first available data.
Definition: StreamCopy.h:262
size_t copyBytes(size_t bytes)
copies the inicated number of bytes from the source to the destination and returns the processed numb...
Definition: StreamCopy.h:108
size_t write(size_t len, size_t &delayCount)
blocking write - until everything is processed
Definition: StreamCopy.h:398
int available()
available bytes of the data source
Definition: StreamCopy.h:241
void resize(int len)
resizes the copy buffer
Definition: StreamCopy.h:315
void setRetry(int retry)
Defines the max number of retries.
Definition: StreamCopy.h:285
size_t copyN(size_t pages)
Copies pages * buffersize samples: returns the processed number of bytes.
Definition: StreamCopy.h:194
void setCallbackOnWrite(void(*onWrite)(void *obj, void *buffer, size_t len), void *obj)
Defines a callback that is notified with the wirtten data.
Definition: StreamCopy.h:273
void setRetryDelay(int delay)
Defines the delay that is added before we retry an incomplete copy.
Definition: StreamCopy.h:336
void setSynchAudioInfo(bool active)
Activate the synchronization from the AudioInfo form the source to the target.
Definition: StreamCopy.h:355
void setDelayOnNoData(int delayMs)
Defines the dealy that is used if no data is available.
Definition: StreamCopy.h:257
Stream * getFrom()
Provides a pointer to the copy source. Can be used to check if the source is defined.
Definition: StreamCopy.h:86
size_t copyAll(int retryCount=5, int retryWaitMs=200)
copies all data - returns the number of processed bytes
Definition: StreamCopy.h:211
void notifyMime(void *data, size_t len)
Update the mime type.
Definition: StreamCopy.h:434
bool isCheckAvailable()
Is Available check activated ?
Definition: StreamCopy.h:310
bool isActive()
Check if copier is active.
Definition: StreamCopy.h:326
void begin(Print &to, Stream &from)
assign a new output and input stream
Definition: StreamCopy.h:71
void end()
Ends the processing.
Definition: StreamCopy.h:61
void begin()
(Re)starts the processing
Definition: StreamCopy.h:49
int bufferSize()
Provides the buffer size.
Definition: StreamCopy.h:290
void setMimeCallback(void(*callback)(const char *))
Define the callback that will notify about mime changes.
Definition: StreamCopy.h:267
void setAvailableCallback(int(*callback)(Stream *stream))
Defines a callback that provides the available bytes at the source.
Definition: StreamCopy.h:280
Print * getTo()
Provides a pointer to the copy target. Can be used to check if the target is defined.
Definition: StreamCopy.h:91
void setActive(bool flag)
deactivate/activate copy - active by default
Definition: StreamCopy.h:321
bool isCheckAvailableForWrite()
Is Available for Write check activated ?
Definition: StreamCopy.h:300
size_t copy()
copies the data from the source to the destination and returns the processed number of bytes
Definition: StreamCopy.h:96
void setCheckAvailableForWrite(bool flag)
Activates the check that we copy only if available for write returns a value.
Definition: StreamCopy.h:295
void begin(Print &to, AudioStream &from)
assign a new output and input stream
Definition: StreamCopy.h:79
void setLogName(const char *name)
Defines a name which will be printed in the log to identify the copier.
Definition: StreamCopy.h:331
Definition: NoArduino.h:125
Generic Implementation of sound input and output for desktop environments using portaudio.
Definition: AudioConfig.h:868
uint32_t millis()
Returns the milliseconds since the start.
Definition: Time.h:12
Basic Audio information which drives e.g. I2S.
Definition: AudioTypes.h:52
uint16_t channels
Number of channels: 2=stereo, 1=mono.
Definition: AudioTypes.h:57
uint8_t bits_per_sample
Number of bits per sample (int16_t = 16 bits)
Definition: AudioTypes.h:59