arduino-audio-tools
StreamCopy.h
1 #pragma once
2 
3 #include "AudioConfig.h"
4 #include "AudioTools/AudioTypes.h"
5 #include "AudioTools/Buffers.h"
6 #include "AudioTools/BaseConverter.h"
7 #include "AudioTools/AudioLogger.h"
8 #include "AudioTools/AudioStreams.h"
9 
10 #define NOT_ENOUGH_MEMORY_MSG "Could not allocate enough memory: %d bytes"
11 
12 namespace audio_tools {
13 
22 template <class T>
23 class StreamCopyT {
24  public:
/// Default constructor: leaves source and target undefined.
/// NOTE(review): the constructor taking `int buffer_size=DEFAULT_BUFFER_SIZE`
/// is also callable without arguments, so a zero-argument construction looks
/// ambiguous between the two overloads - confirm whether this one is needed.
StreamCopyT() = default;
26 
27  StreamCopyT(Print &to, AudioStream &from, int buffer_size=DEFAULT_BUFFER_SIZE){
28  TRACED();
29  begin(to, from);
30  resize(buffer_size);
31  if (!buffer){
32  LOGE(NOT_ENOUGH_MEMORY_MSG, buffer_size);
33  }
34  }
35 
36  StreamCopyT(Print &to, Stream &from, int buffer_size=DEFAULT_BUFFER_SIZE){
37  TRACED();
38  begin(to, from);
39  resize(buffer_size);
40  if (!buffer){
41  LOGE(NOT_ENOUGH_MEMORY_MSG, buffer_size);
42  }
43  }
44 
45  StreamCopyT(int buffer_size=DEFAULT_BUFFER_SIZE){
46  TRACED();
47  resize(buffer_size);
48  if (!buffer){
49  LOGE(NOT_ENOUGH_MEMORY_MSG, buffer_size);
50  }
51  }
52 
54  void begin(){
55  is_first = true;
56  LOGI("buffer_size=%d",buffer_size);
57  }
58 
60  void end() {
61  this->from = nullptr;
62  this->to = nullptr;
63  }
64 
66  void begin(Print &to, Stream &from){
67  this->from = new AudioStreamWrapper(from);
68  this->to = &to;
69  is_first = true;
70  LOGI("buffer_size=%d",buffer_size);
71  }
72 
74  void begin(Print &to, AudioStream &from){
75  this->from = &from;
76  this->to = &to;
77  is_first = true;
78  LOGI("buffer_size=%d",buffer_size);
79  }
80 
83  return from;
84  }
85 
87  Print *getTo() {
88  return to;
89  }
90 
92  inline size_t copy(){
93  LOGD("copy %s", log_name);
94  if (!active) return 0;
95  // if not initialized we do nothing
96  if (from==nullptr && to==nullptr) return 0;
97 
98  // synchronize AudioInfo
99  syncAudioInfo();
100 
101  // E.g. if we try to write to a server we might not have any output destination yet
102  int to_write = to->availableForWrite();
103  if (check_available_for_write && to_write==0){
104  delay(500);
105  return 0;
106  }
107 
108  size_t result = 0;
109  size_t delayCount = 0;
110  size_t len = buffer_size;
111  if (check_available) {
112  len = available();
113  }
114  size_t bytes_to_read = buffer_size;
115  size_t bytes_read = 0;
116 
117  if (len > 0){
118  bytes_to_read = min(len, static_cast<size_t>(buffer_size));
119  // don't overflow buffer
120  if (to_write > 0){
121  bytes_to_read = min((int)bytes_to_read, to_write);
122  }
123 
124  // round to full frames
125  int copy_size = minCopySize();
126  if (copy_size > 0){
127  size_t samples = bytes_to_read / minCopySize();
128  bytes_to_read = samples * minCopySize();
129  }
130 
131  // get the data now
132  bytes_read = 0;
133  if (bytes_to_read>0){
134  bytes_read = from->readBytes((uint8_t*)&buffer[0], bytes_to_read);
135  }
136 
137  // determine mime
138  notifyMime(buffer.data(), bytes_to_read);
139 
140  // write data
141  result = write(bytes_read, delayCount);
142 
143  // callback with unconverted data
144  if (onWrite!=nullptr) onWrite(onWriteObj, &buffer[0], result);
145 
146  #ifndef COPY_LOG_OFF
147  LOGI("StreamCopy::copy %s %u -> %u -> %u bytes - in %u hops",log_name, (unsigned int)bytes_to_read,(unsigned int) bytes_read, (unsigned int)result, (unsigned int)delayCount);
148  #endif
149  //TRACED();
150 
151  if (result == 0){
152  TRACED();
153  // give the processor some time
154  delay(delay_on_no_data);
155  }
156 
157  //TRACED();
158  CHECK_MEMORY();
159  } else {
160  // give the processor some time
161  delay(delay_on_no_data);
162  LOGD("no data %s", log_name);
163  }
164  //TRACED();
165  return result;
166  }
167 
168 
170  int available() {
171  int result = 0;
172  if (from!=nullptr) {
173  if (availableCallback!=nullptr){
174  result = availableCallback((Stream*)from);
175  } else {
176  result = from->available();
177  }
178  } else {
179  LOGW("source not defined");
180  }
181  LOGD("available: %d", result);
182  return result;
183  }
184 
186  void setDelayOnNoData(int delayMs){
187  delay_on_no_data = delayMs;
188  }
189 
191  size_t copyN(size_t pages){
192  if (!active) return 0;
193  size_t total=0;
194  for (size_t j=0;j<pages;j++){
195  total+=copy();
196  }
197  return total;
198  }
199 
201  size_t copyMs(size_t millis, AudioInfo info){
202  if (!active) return 0;
203  size_t pages = AudioTime::toBytes(millis, info) / buffer_size;
204  return copyN(pages);
205  }
206 
208  size_t copyAll(int retryCount=5, int retryWaitMs=200){
209  TRACED();
210  if (!active) return 0;
211  size_t result = 0;
212  int retry = 0;
213 
214  if (from==nullptr || to == nullptr)
215  return result;
216 
217  // copy while source has data available
218  int count=0;
219  while (true){
220  count = copy();
221  result += count;
222  if (count==0){
223  // wait for more data
224  retry++;
225  delay(retryWaitMs);
226  } else {
227  retry = 0; // after we got new data we restart the counting
228  }
229  // stop the processing if we passed the retry limit
230  if (retry>retryCount){
231  break;
232  }
233  }
234  return result;
235  }
236 
238  const char* mime() {
239  return actual_mime;
240  }
241 
243  void setMimeCallback(void (*callback)(const char*)){
244  TRACED();
245  this->notifyMimeCallback = callback;
246  }
247 
249  void setCallbackOnWrite(void (*onWrite)(void*obj, void*buffer, size_t len), void* obj){
250  TRACED();
251  this->onWrite = onWrite;
252  this->onWriteObj = obj;
253  }
254 
256  void setAvailableCallback(int (*callback)(Stream*stream)){
257  availableCallback = callback;
258  }
259 
261  void setRetry(int retry){
262  retryLimit = retry;
263  }
264 
266  int bufferSize() {
267  return buffer_size;
268  }
269 
271  void setCheckAvailableForWrite(bool flag){
272  check_available_for_write = flag;
273  }
274 
277  return check_available_for_write;
278  }
279 
281  void setCheckAvailable(bool flag){
282  check_available = flag;
283  }
284 
287  return check_available;
288  }
289 
291  void resize(int len){
292  buffer_size = len;
293  buffer.resize(buffer_size);
294  }
295 
297  void setActive(bool flag){
298  active = flag;
299  }
300 
302  bool isActive(){
303  return active;
304  }
305 
307  void setLogName(const char* name){
308  log_name = name;
309  }
310 
312  void setRetryDelay(int delay){
313  retry_delay = delay;
314  }
315 
317  int minCopySize() {
318  if (min_copy_size==0){
319  AudioInfo info = from->audioInfoOut();
320  min_copy_size = info.bits_per_sample / 8 * info.channels;
321  }
322  return min_copy_size;
323  }
324 
326  void setMinCopySize(int size){
327  min_copy_size = size;
328  }
329 
331  void setSynchAudioInfo(bool active){
332  is_sync_audio_info = active;
333  }
334 
335  protected:
336  AudioStream *from = nullptr;
337  Print *to = nullptr;
338  Vector<uint8_t> buffer{0};
339  int buffer_size;
340  void (*onWrite)(void*obj, void*buffer, size_t len) = nullptr;
341  void (*notifyMimeCallback)(const char*mime) = nullptr;
342  int (*availableCallback)(Stream*stream)=nullptr;
343  void *onWriteObj = nullptr;
344  bool is_first = false;
345  bool check_available_for_write = false;
346  bool check_available = true;
347  const char* actual_mime = nullptr;
348  int retryLimit = COPY_RETRY_LIMIT;
349  int delay_on_no_data = COPY_DELAY_ON_NODATA;
350  bool active = true;
351  const char* log_name = "";
352  int retry_delay = 10;
353  int channels = 0;
354  int min_copy_size = 1;
355  bool is_sync_audio_info = false;
356  AudioInfoSupport *p_audio_info_support = nullptr;
357 
358  void syncAudioInfo(){
359  // synchronize audio info
360  if (is_sync_audio_info && from != nullptr && p_audio_info_support != nullptr){
361  AudioInfo info_from = from->audioInfoOut();
362  AudioInfo info_to = p_audio_info_support->audioInfo();
363  if (info_from != info_to){
364  LOGI("--> StreamCopy: ");
365  p_audio_info_support->setAudioInfo(info_from);
366  }
367  }
368  }
369 
371  size_t write(size_t len, size_t &delayCount ){
372  if (!buffer || len==0) return 0;
373  LOGD("write: %d", (int)len);
374  size_t total = 0;
375  long open = len;
376  int retry = 0;
377  while(open > 0){
378  size_t written = to->write((const uint8_t*)buffer.data()+total, open);
379  LOGD("write: %d -> %d", (int) open, (int) written);
380  total += written;
381  open -= written;
382  delayCount++;
383 
384  if (open > 0){
385  // if we still have progress we reset the retry counter
386  if (written>0) retry = 0;
387 
388  // abort if we reached the retry limit
389  if (retry++ > retryLimit){
390  LOGE("write %s to target has failed after %d retries! (%ld bytes)", log_name, retry, open);
391  break;
392  }
393 
394  // wait a bit
395  if (retry>1) {
396  delay(retry_delay);
397  LOGI("try write %s - %d (open %ld bytes) ",log_name, retry, open);
398  }
399  }
400 
401  CHECK_MEMORY();
402  }
403  return total;
404  }
405 
407  void notifyMime(void* data, size_t len){
408  if (is_first && len>4) {
409  const uint8_t *start = (const uint8_t *) data;
410  actual_mime = "audio/basic";
411  if (start[0]==0xFF && start[1]==0xF1){
412  actual_mime = "audio/aac";
413  } else if (memcmp(start,"ID3",3) || start[0]==0xFF || start[0]==0xFE ){
414  actual_mime = "audio/mpeg";
415  } else if (memcmp(start,"RIFF",4)){
416  actual_mime = "audio/vnd.wave";
417  }
418  if (notifyMimeCallback!=nullptr){
419  notifyMimeCallback(actual_mime);
420  }
421  }
422  is_first = false;
423  }
424 
425 };
426 
433 class StreamCopy : public StreamCopyT<uint8_t> {
434  public:
435  StreamCopy(int buffer_size=DEFAULT_BUFFER_SIZE): StreamCopyT<uint8_t>(buffer_size) {
436  TRACED();
437  }
438 
439  StreamCopy(AudioStream &to, AudioStream &from, int buffer_size=DEFAULT_BUFFER_SIZE) : StreamCopyT<uint8_t>(to, from, buffer_size){
440  TRACED();
441  p_audio_info_support = &to;
442  }
443 
444  StreamCopy(AudioOutput &to, AudioStream &from, int buffer_size=DEFAULT_BUFFER_SIZE) : StreamCopyT<uint8_t>(to, from, buffer_size){
445  TRACED();
446  p_audio_info_support = &to;
447  }
448 
449  StreamCopy(Print &to, AudioStream &from, int buffer_size=DEFAULT_BUFFER_SIZE) : StreamCopyT<uint8_t>(to, from, buffer_size){
450  TRACED();
451  }
452 
453  StreamCopy(Print &to, Stream &from, int buffer_size=DEFAULT_BUFFER_SIZE) : StreamCopyT<uint8_t>(to, from, buffer_size){
454  TRACED();
455  }
456 
458  size_t copy(BaseConverter &converter) {
459  size_t result = available();
460  size_t delayCount = 0;
461  syncAudioInfo();
462  BaseConverter* coverter_ptr = &converter;
463  if (result>0){
464  size_t bytes_to_read = min(result, static_cast<size_t>(buffer_size) );
465  result = from->readBytes((uint8_t*)&buffer[0], bytes_to_read);
466 
467  // determine mime
468  notifyMime(buffer.data(), bytes_to_read);
469  is_first = false;
470 
471  // callback with unconverted data
472  if (onWrite!=nullptr) onWrite(onWriteObj, buffer.data(), result);
473 
474  // convert data
475  coverter_ptr->convert((uint8_t*)buffer.data(), result );
476  write(result, delayCount);
477  #ifndef COPY_LOG_OFF
478  LOGI("StreamCopy::copy %u bytes - in %u hops", (unsigned int)result,(unsigned int) delayCount);
479  #endif
480  } else {
481  // give the processor some time
482  delay(delay_on_no_data);
483  }
484  return result;
485  }
486 
488  size_t copy() {
490  }
491 
492 };
493 
494 } // Namespace
virtual AudioInfo audioInfoOut()
provides the actual output AudioInfo: this is usually the same as audioInfo() unless we use a transfo...
Definition: AudioTypes.h:143
Abstract Audio Output class.
Definition: AudioOutput.h:22
Base class for all Audio Streams. It support the boolean operator to test if the object is ready with...
Definition: AudioStreams.h:24
To be used to support implementations where the readBytes is not virtual.
Definition: AudioStreams.h:91
static uint32_t toBytes(uint32_t millis, AudioInfo info)
converts milliseconds to bytes
Definition: AudioTypes.h:272
Abstract Base class for Converters A converter is processing the data in the indicated array.
Definition: BaseConverter.h:24
Definition: NoArduino.h:58
We provide the typeless StreamCopy as a subclass of StreamCopyT.
Definition: StreamCopy.h:433
size_t copy(BaseConverter &converter)
copies a buffer length of data and applies the converter
Definition: StreamCopy.h:458
size_t copy()
Copies all bytes from the input to the output.
Definition: StreamCopy.h:488
Typed Stream Copy which supports the conversion from channel to 2 channels. We make sure that we allw...
Definition: StreamCopy.h:23
void setCheckAvailable(bool flag)
Activates the check that we copy only if available returns a value.
Definition: StreamCopy.h:281
size_t copyMs(size_t millis, AudioInfo info)
Copies audio for the indicated number of milliseconds: note that the resolution is determined by the ...
Definition: StreamCopy.h:201
void setMinCopySize(int size)
Defines the minimum frame size that is used to round the copy size: 0 will automatically try to deter...
Definition: StreamCopy.h:326
int minCopySize()
Determine frame size.
Definition: StreamCopy.h:317
const char * mime()
Provides the actual mime type, that was determined from the first available data.
Definition: StreamCopy.h:238
size_t write(size_t len, size_t &delayCount)
blocking write - until everything is processed
Definition: StreamCopy.h:371
int available()
available bytes of the data source
Definition: StreamCopy.h:170
void resize(int len)
resizes the copy buffer
Definition: StreamCopy.h:291
void setRetry(int retry)
Defines the max number of retries.
Definition: StreamCopy.h:261
size_t copyN(size_t pages)
Calls copy() pages times: copies up to pages * buffer size bytes.
Definition: StreamCopy.h:191
void setCallbackOnWrite(void(*onWrite)(void *obj, void *buffer, size_t len), void *obj)
Defines a callback that is notified with the written data.
Definition: StreamCopy.h:249
void setRetryDelay(int delay)
Defines the delay that is added before we retry an incomplete copy.
Definition: StreamCopy.h:312
void setSynchAudioInfo(bool active)
Activates the synchronization of the AudioInfo from the source to the target.
Definition: StreamCopy.h:331
void setDelayOnNoData(int delayMs)
Defines the delay that is used if no data is available.
Definition: StreamCopy.h:186
Stream * getFrom()
Provides a pointer to the copy source. Can be used to check if the source is defined.
Definition: StreamCopy.h:82
size_t copyAll(int retryCount=5, int retryWaitMs=200)
copies all data - returns the total number of bytes copied
Definition: StreamCopy.h:208
void notifyMime(void *data, size_t len)
Update the mime type.
Definition: StreamCopy.h:407
bool isCheckAvailable()
Is Available check activated ?
Definition: StreamCopy.h:286
bool isActive()
Check if copier is active.
Definition: StreamCopy.h:302
void begin(Print &to, Stream &from)
assign a new output and input stream
Definition: StreamCopy.h:66
void end()
Ends the processing.
Definition: StreamCopy.h:60
void begin()
(Re)starts the processing
Definition: StreamCopy.h:54
int bufferSize()
Provides the buffer size.
Definition: StreamCopy.h:266
void setMimeCallback(void(*callback)(const char *))
Define the callback that will notify about mime changes.
Definition: StreamCopy.h:243
void setAvailableCallback(int(*callback)(Stream *stream))
Defines a callback that provides the available bytes at the source.
Definition: StreamCopy.h:256
Print * getTo()
Provides a pointer to the copy target. Can be used to check if the target is defined.
Definition: StreamCopy.h:87
void setActive(bool flag)
deactivate/activate copy - active by default
Definition: StreamCopy.h:297
bool isCheckAvailableForWrite()
Is Available for Write check activated ?
Definition: StreamCopy.h:276
size_t copy()
copies the data from the source to the destination - the result is in bytes
Definition: StreamCopy.h:92
void setCheckAvailableForWrite(bool flag)
Activates the check that we copy only if available for write returns a value.
Definition: StreamCopy.h:271
void begin(Print &to, AudioStream &from)
assign a new output and input stream
Definition: StreamCopy.h:74
void setLogName(const char *name)
Defines a name which will be printed in the log to identify the copier.
Definition: StreamCopy.h:307
Definition: NoArduino.h:125
Generic Implementation of sound input and output for desktop environments using portaudio.
Definition: AnalogAudio.h:10
void delay(uint32_t ms)
Waits for the indicated milliseconds.
Definition: Millis.h:11
uint32_t millis()
Returns the milliseconds since the start.
Definition: Millis.h:18
Basic Audio information which drives e.g. I2S.
Definition: AudioTypes.h:50
uint16_t channels
Number of channels: 2=stereo, 1=mono.
Definition: AudioTypes.h:55
uint8_t bits_per_sample
Number of bits per sample (int16_t = 16 bits)
Definition: AudioTypes.h:57