arduino-audio-tools
StreamCopy.h
1 #pragma once
2 
3 #include "AudioConfig.h"
4 #include "AudioTools/CoreAudio/AudioTypes.h"
5 #include "AudioTools/CoreAudio/Buffers.h"
6 #include "AudioTools/CoreAudio/BaseConverter.h"
7 #include "AudioTools/CoreAudio/AudioLogger.h"
8 #include "AudioTools/CoreAudio/AudioStreams.h"
9 
10 #define NOT_ENOUGH_MEMORY_MSG "Could not allocate enough memory: %d bytes"
11 
12 namespace audio_tools {
13 
22 template <class T>
23 class StreamCopyT {
24  public:
25 
26  StreamCopyT(Print &to, AudioStream &from, int bufferSize=DEFAULT_BUFFER_SIZE){
27  TRACED();
28  this->buffer_size = bufferSize;
29  begin(to, from);
30  }
31 
32  StreamCopyT(Print &to, Stream &from, int bufferSize=DEFAULT_BUFFER_SIZE){
33  TRACED();
34  this->buffer_size = bufferSize;
35  begin(to, from);
36  }
37 
38  StreamCopyT(int bufferSize=DEFAULT_BUFFER_SIZE){
39  TRACED();
40  this->buffer_size = bufferSize;
41  begin();
42  }
43 
45  void begin(){
46  TRACED();
47  is_first = true;
48  resize(buffer_size);
49  if (buffer){
50  LOGI("buffer_size=%d",buffer_size);
51  } else {
52  LOGE(NOT_ENOUGH_MEMORY_MSG, buffer_size);
53  }
54  }
55 
57  void end() {
58  this->from = nullptr;
59  this->to = nullptr;
60  }
61 
63  void begin(Print &to, Stream &from){
64  this->from = new AudioStreamWrapper(from);
65  this->to = &to;
66  begin();
67  }
68 
70  void begin(Print &to, AudioStream &from){
71  this->from = &from;
72  this->to = &to;
73  begin();
74  }
75 
78  return from;
79  }
80 
82  Print *getTo() {
83  return to;
84  }
85 
87  inline size_t copy() {
88  p_converter = nullptr;
89  return copyBytes(buffer_size);
90  }
91 
93  inline size_t copy(BaseConverter &converter) {
94  p_converter = &converter;
95  return copyBytes(buffer_size);
96  }
97 
99  inline size_t copyBytes(size_t bytes){
100  LOGD("copy %d bytes %s", (int) bytes, log_name);
101  if (!active) return 0;
102  // if not initialized we do nothing
103  if (from==nullptr && to==nullptr) return 0;
104 
105  // synchronize AudioInfo
106  syncAudioInfo();
107 
108  // E.g. if we try to write to a server we might not have any output destination yet
109  int to_write = to->availableForWrite();
110  if (check_available_for_write && to_write==0){
111  delay(500);
112  return 0;
113  }
114 
115  // resize copy buffer if necessary
116  if (buffer.size() < bytes){
117  LOGI("Resize to %d", (int) bytes);
118  buffer.resize(bytes);
119  }
120 
121  size_t result = 0;
122  size_t delayCount = 0;
123  size_t len = bytes;
124  if (check_available) {
125  len = available();
126  }
127  size_t bytes_to_read = bytes;
128  size_t bytes_read = 0;
129 
130  if (len > 0){
131  bytes_to_read = min(len, static_cast<size_t>(buffer_size));
132  // don't overflow buffer
133  if (to_write > 0){
134  bytes_to_read = min((int)bytes_to_read, to_write);
135  }
136 
137  // round to full frames
138  int copy_size = minCopySize();
139  if (copy_size > 0){
140  size_t samples = bytes_to_read / minCopySize();
141  bytes_to_read = samples * minCopySize();
142  }
143 
144  // get the data now
145  bytes_read = 0;
146  if (bytes_to_read>0){
147  bytes_read = from->readBytes((uint8_t*)&buffer[0], bytes_to_read);
148  }
149 
150  // determine mime
151  notifyMime(buffer.data(), bytes_to_read);
152 
153  // convert data
154  if (p_converter!=nullptr) p_converter->convert((uint8_t*)buffer.data(), result );
155 
156  // write data
157  result = write(bytes_read, delayCount);
158 
159  // callback with unconverted data
160  if (onWrite!=nullptr) onWrite(onWriteObj, &buffer[0], result);
161 
162  #ifndef COPY_LOG_OFF
163  LOGI("StreamCopy::copy %s %u -> %u -> %u bytes - in %u hops",log_name, (unsigned int)bytes_to_read,(unsigned int) bytes_read, (unsigned int)result, (unsigned int)delayCount);
164  #endif
165  //TRACED();
166 
167  if (result == 0){
168  TRACED();
169  // give the processor some time
170  delay(delay_on_no_data);
171  }
172 
173  //TRACED();
174  CHECK_MEMORY();
175  } else {
176  // give the processor some time
177  delay(delay_on_no_data);
178  LOGD("no data %s", log_name);
179  }
180  //TRACED();
181  return result;
182  }
183 
185  size_t copyN(size_t pages){
186  if (!active) return 0;
187  size_t total=0;
188  for (size_t j=0;j<pages;j++){
189  total+=copy();
190  }
191  return total;
192  }
193 
195  size_t copyMs(size_t millis, AudioInfo info){
196  if (!active) return 0;
197  size_t pages = AudioTime::toBytes(millis, info) / buffer_size;
198  return copyN(pages);
199  }
200 
202  size_t copyAll(int retryCount=5, int retryWaitMs=200){
203  TRACED();
204  if (!active) return 0;
205  size_t result = 0;
206  int retry = 0;
207 
208  if (from==nullptr || to == nullptr)
209  return result;
210 
211  // copy while source has data available
212  int count=0;
213  while (true){
214  count = copy();
215  result += count;
216  if (count==0){
217  // wait for more data
218  retry++;
219  delay(retryWaitMs);
220  } else {
221  retry = 0; // after we got new data we restart the counting
222  }
223  // stop the processing if we passed the retry limit
224  if (retry>retryCount){
225  break;
226  }
227  }
228  return result;
229  }
230 
232  int available() {
233  int result = 0;
234  if (from!=nullptr) {
235  if (availableCallback!=nullptr){
236  result = availableCallback((Stream*)from);
237  } else {
238  result = from->available();
239  }
240  } else {
241  LOGW("source not defined");
242  }
243  LOGD("available: %d", result);
244  return result;
245  }
246 
248  void setDelayOnNoData(int delayMs){
249  delay_on_no_data = delayMs;
250  }
251 
253  const char* mime() {
254  return actual_mime;
255  }
256 
258  void setMimeCallback(void (*callback)(const char*)){
259  TRACED();
260  this->notifyMimeCallback = callback;
261  }
262 
264  void setCallbackOnWrite(void (*onWrite)(void*obj, void*buffer, size_t len), void* obj){
265  TRACED();
266  this->onWrite = onWrite;
267  this->onWriteObj = obj;
268  }
269 
271  void setAvailableCallback(int (*callback)(Stream*stream)){
272  availableCallback = callback;
273  }
274 
276  void setRetry(int retry){
277  retryLimit = retry;
278  }
279 
281  int bufferSize() {
282  return buffer_size;
283  }
284 
286  void setCheckAvailableForWrite(bool flag){
287  check_available_for_write = flag;
288  }
289 
292  return check_available_for_write;
293  }
294 
296  void setCheckAvailable(bool flag){
297  check_available = flag;
298  }
299 
302  return check_available;
303  }
304 
306  void resize(int len){
307  buffer_size = len;
308  buffer.resize(buffer_size);
309  }
310 
312  void setActive(bool flag){
313  active = flag;
314  }
315 
317  bool isActive(){
318  return active;
319  }
320 
322  void setLogName(const char* name){
323  log_name = name;
324  }
325 
327  void setRetryDelay(int delay){
328  retry_delay = delay;
329  }
330 
332  int minCopySize() {
333  if (min_copy_size==0){
334  AudioInfo info = from->audioInfoOut();
335  min_copy_size = info.bits_per_sample / 8 * info.channels;
336  }
337  return min_copy_size;
338  }
339 
341  void setMinCopySize(int size){
342  min_copy_size = size;
343  }
344 
346  void setSynchAudioInfo(bool active){
347  is_sync_audio_info = active;
348  }
349 
350  protected:
351  AudioStream *from = nullptr;
352  Print *to = nullptr;
353  Vector<uint8_t> buffer{0};
354  int buffer_size = DEFAULT_BUFFER_SIZE;
355  void (*onWrite)(void*obj, void*buffer, size_t len) = nullptr;
356  void (*notifyMimeCallback)(const char*mime) = nullptr;
357  int (*availableCallback)(Stream*stream)=nullptr;
358  void *onWriteObj = nullptr;
359  bool is_first = false;
360  bool check_available_for_write = false;
361  bool check_available = true;
362  const char* actual_mime = nullptr;
363  int retryLimit = COPY_RETRY_LIMIT;
364  int delay_on_no_data = COPY_DELAY_ON_NODATA;
365  bool active = true;
366  const char* log_name = "";
367  int retry_delay = 10;
368  int channels = 0;
369  int min_copy_size = 1;
370  bool is_sync_audio_info = false;
371  AudioInfoSupport *p_audio_info_support = nullptr;
372  BaseConverter* p_converter = nullptr;
373 
374 
375  void syncAudioInfo(){
376  // synchronize audio info
377  if (is_sync_audio_info && from != nullptr && p_audio_info_support != nullptr){
378  AudioInfo info_from = from->audioInfoOut();
379  AudioInfo info_to = p_audio_info_support->audioInfo();
380  if (info_from != info_to){
381  LOGI("--> StreamCopy: ");
382  p_audio_info_support->setAudioInfo(info_from);
383  }
384  }
385  }
386 
388  size_t write(size_t len, size_t &delayCount ){
389  if (!buffer || len==0) return 0;
390  LOGD("write: %d", (int)len);
391  size_t total = 0;
392  long open = len;
393  int retry = 0;
394  while(open > 0){
395  size_t written = to->write((const uint8_t*)buffer.data()+total, open);
396  LOGD("write: %d -> %d", (int) open, (int) written);
397  total += written;
398  open -= written;
399  delayCount++;
400 
401  if (open > 0){
402  // if we still have progress we reset the retry counter
403  if (written>0) retry = 0;
404 
405  // abort if we reached the retry limit
406  if (retry++ > retryLimit){
407  LOGE("write %s to target has failed after %d retries! (%ld bytes)", log_name, retry, open);
408  break;
409  }
410 
411  // wait a bit
412  if (retry>1) {
413  delay(retry_delay);
414  LOGI("try write %s - %d (open %ld bytes) ",log_name, retry, open);
415  }
416  }
417 
418  CHECK_MEMORY();
419  }
420  return total;
421  }
422 
424  void notifyMime(void* data, size_t len){
425  if (is_first && len>4) {
426  const uint8_t *start = (const uint8_t *) data;
427  actual_mime = "audio/basic";
428  if (start[0]==0xFF && start[1]==0xF1){
429  actual_mime = "audio/aac";
430  } else if (memcmp(start,"ID3",3) || start[0]==0xFF || start[0]==0xFE ){
431  actual_mime = "audio/mpeg";
432  } else if (memcmp(start,"RIFF",4)){
433  actual_mime = "audio/vnd.wave";
434  }
435  if (notifyMimeCallback!=nullptr){
436  notifyMimeCallback(actual_mime);
437  }
438  }
439  is_first = false;
440  }
441 
442 };
443 
451 
452 
453 } // Namespace
virtual AudioInfo audioInfoOut()
provides the actual output AudioInfo: this is usually the same as audioInfo() unless we use a transfo...
Definition: AudioTypes.h:146
Base class for all Audio Streams. It support the boolean operator to test if the object is ready with...
Definition: BaseStream.h:109
To be used to support implementations where the readBytes is not virtual.
Definition: AudioStreams.h:24
static uint32_t toBytes(uint32_t millis, AudioInfo info)
converts milliseconds to bytes
Definition: AudioTypes.h:269
Abstract Base class for Converters A converter is processing the data in the indicated array.
Definition: BaseConverter.h:24
Definition: NoArduino.h:58
Typed Stream Copy which supports the conversion from channel to 2 channels. We make sure that we allw...
Definition: StreamCopy.h:23
void setCheckAvailable(bool flag)
Activates the check that we copy only if available returns a value.
Definition: StreamCopy.h:296
size_t copyMs(size_t millis, AudioInfo info)
Copies audio for the indicated number of milliseconds: note that the resolution is determined by the ...
Definition: StreamCopy.h:195
void setMinCopySize(int size)
Defines the minimum frame size that is used to round the copy size: 0 will automatically try to deter...
Definition: StreamCopy.h:341
int minCopySize()
Determine frame size.
Definition: StreamCopy.h:332
size_t copy(BaseConverter &converter)
copies the data from the source to the destination and applies the converter - the result is the proc...
Definition: StreamCopy.h:93
const char * mime()
Provides the actual mime type, that was determined from the first available data.
Definition: StreamCopy.h:253
size_t copyBytes(size_t bytes)
copies the indicated number of bytes from the source to the destination and returns the processed numb...
Definition: StreamCopy.h:99
size_t write(size_t len, size_t &delayCount)
blocking write - until everything is processed
Definition: StreamCopy.h:388
int available()
available bytes of the data source
Definition: StreamCopy.h:232
void resize(int len)
resizes the copy buffer
Definition: StreamCopy.h:306
void setRetry(int retry)
Defines the max number of retries.
Definition: StreamCopy.h:276
size_t copyN(size_t pages)
Copies pages * buffersize samples: returns the processed number of bytes.
Definition: StreamCopy.h:185
void setCallbackOnWrite(void(*onWrite)(void *obj, void *buffer, size_t len), void *obj)
Defines a callback that is notified with the written data.
Definition: StreamCopy.h:264
void setRetryDelay(int delay)
Defines the delay that is added before we retry an incomplete copy.
Definition: StreamCopy.h:327
void setSynchAudioInfo(bool active)
Activates the synchronization of the AudioInfo from the source to the target.
Definition: StreamCopy.h:346
void setDelayOnNoData(int delayMs)
Defines the delay that is used if no data is available.
Definition: StreamCopy.h:248
Stream * getFrom()
Provides a pointer to the copy source. Can be used to check if the source is defined.
Definition: StreamCopy.h:77
size_t copyAll(int retryCount=5, int retryWaitMs=200)
copies all data - returns the number of processed bytes
Definition: StreamCopy.h:202
void notifyMime(void *data, size_t len)
Update the mime type.
Definition: StreamCopy.h:424
bool isCheckAvailable()
Is Available check activated ?
Definition: StreamCopy.h:301
bool isActive()
Check if copier is active.
Definition: StreamCopy.h:317
void begin(Print &to, Stream &from)
assign a new output and input stream
Definition: StreamCopy.h:63
void end()
Ends the processing.
Definition: StreamCopy.h:57
void begin()
(Re)starts the processing
Definition: StreamCopy.h:45
int bufferSize()
Provides the buffer size.
Definition: StreamCopy.h:281
void setMimeCallback(void(*callback)(const char *))
Define the callback that will notify about mime changes.
Definition: StreamCopy.h:258
void setAvailableCallback(int(*callback)(Stream *stream))
Defines a callback that provides the available bytes at the source.
Definition: StreamCopy.h:271
Print * getTo()
Provides a pointer to the copy target. Can be used to check if the target is defined.
Definition: StreamCopy.h:82
void setActive(bool flag)
deactivate/activate copy - active by default
Definition: StreamCopy.h:312
bool isCheckAvailableForWrite()
Is Available for Write check activated ?
Definition: StreamCopy.h:291
size_t copy()
copies the data from the source to the destination and returns the processed number of bytes
Definition: StreamCopy.h:87
void setCheckAvailableForWrite(bool flag)
Activates the check that we copy only if available for write returns a value.
Definition: StreamCopy.h:286
void begin(Print &to, AudioStream &from)
assign a new output and input stream
Definition: StreamCopy.h:70
void setLogName(const char *name)
Defines a name which will be printed in the log to identify the copier.
Definition: StreamCopy.h:322
Definition: NoArduino.h:125
Generic Implementation of sound input and output for desktop environments using portaudio.
Definition: AudioConfig.h:823
uint32_t millis()
Returns the milliseconds since the start.
Definition: Time.h:12
Basic Audio information which drives e.g. I2S.
Definition: AudioTypes.h:52
uint16_t channels
Number of channels: 2=stereo, 1=mono.
Definition: AudioTypes.h:57
uint8_t bits_per_sample
Number of bits per sample (int16_t = 16 bits)
Definition: AudioTypes.h:59