arduino-audio-tools
All Classes Namespaces Files Functions Variables Typedefs Enumerations Friends Modules Pages
StreamCopy.h
1#pragma once
2
3#include "AudioConfig.h"
4#include "AudioTools/CoreAudio/AudioTypes.h"
5#include "AudioTools/CoreAudio/Buffers.h"
6#include "AudioTools/CoreAudio/BaseConverter.h"
7#include "AudioTools/CoreAudio/AudioLogger.h"
8#include "AudioTools/CoreAudio/AudioStreams.h"
9#include "AudioTools/CoreAudio/AudioMetaData/MimeDetector.h"
10
11#define NOT_ENOUGH_MEMORY_MSG "Could not allocate enough memory: %d bytes"
12
13namespace audio_tools {
14
23template <class T>
25 public:
26
27 StreamCopyT(Print &to, AudioStream &from, int bufferSize=DEFAULT_BUFFER_SIZE){
28 TRACED();
29 this->buffer_size = bufferSize;
30 begin(to, from);
31 }
32
33 StreamCopyT(Print &to, Stream &from, int bufferSize=DEFAULT_BUFFER_SIZE){
34 TRACED();
35 this->buffer_size = bufferSize;
36 begin(to, from);
37 }
38
39 StreamCopyT(int bufferSize=DEFAULT_BUFFER_SIZE){
40 TRACED();
41 this->buffer_size = bufferSize;
42 begin();
43 }
44
45 ~StreamCopyT() {
46 end();
47 }
48
50 void begin(){
51 TRACED();
52 if (p_mime_detector!=nullptr) {
53 p_mime_detector->begin();
54 };
55 resize(buffer_size);
56 if (buffer){
57 LOGI("buffer_size=%d",buffer_size);
58 } else {
59 LOGE(NOT_ENOUGH_MEMORY_MSG, buffer_size);
60 }
61 }
62
64 void end() {
65 this->from = nullptr;
66 this->to = nullptr;
67 }
68
70 void begin(Print &to, Stream &from){
71 this->from = &from;
72 this->to = &to;
73 begin();
74 }
75
77 void begin(Print &to, AudioStream &from){
78 this->from = &from;
79 this->from_audio = &from;
80 this->to = &to;
81 begin();
82 }
83
86 return from;
87 }
88
91 return to;
92 }
93
95 inline size_t copy() {
96 p_converter = nullptr;
97 return copyBytes(buffer_size);
98 }
99
101 inline size_t copy(BaseConverter &converter) {
102 p_converter = &converter;
103 return copyBytes(buffer_size);
104 }
105
107 inline size_t copyBytes(size_t bytes){
108 LOGD("copy %d bytes %s", (int) bytes, log_name);
109 if (!active) return 0;
110 // if not initialized we do nothing
111 if (from==nullptr && to==nullptr) return 0;
112
113 // synchronize AudioInfo
114 syncAudioInfo();
115
116 // E.g. if we try to write to a server we might not have any output destination yet
117 int to_write = to->availableForWrite();
118 if (check_available_for_write && to_write==0){
119 delay(500);
120 return 0;
121 }
122
123 // resize copy buffer if necessary
124 if (buffer.size() < bytes){
125 LOGI("Resize to %d", (int) bytes);
126 buffer.resize(bytes);
127 }
128
129 size_t result = 0;
130 size_t delayCount = 0;
131 size_t len = bytes;
132 if (check_available) {
133 len = available();
134 }
135 size_t bytes_to_read = bytes;
136 size_t bytes_read = 0;
137
138 if (len > 0){
139 bytes_to_read = min(len, static_cast<size_t>(buffer_size));
140 // don't overflow buffer
141 if (to_write > 0){
143 }
144
145 // round to full frames
146 int copy_size = minCopySize();
147 if (copy_size > 0){
148 size_t samples = bytes_to_read / minCopySize();
149 bytes_to_read = samples * minCopySize();
150 }
151
152 // get the data now
153 bytes_read = 0;
154 if (bytes_to_read>0){
155 bytes_read = from->readBytes((uint8_t*)&buffer[0], bytes_to_read);
156 }
157
158 // determine mime
159 if (p_mime_detector != nullptr){
160 p_mime_detector->write(buffer.data(), bytes_to_read);
161 }
162
163 // convert data
164 if (p_converter!=nullptr) p_converter->convert((uint8_t*)buffer.data(), bytes_read );
165
166 // write data
167 result = write(bytes_read, delayCount);
168
169 // callback with unconverted data
170 if (onWrite!=nullptr) onWrite(onWriteObj, &buffer[0], result);
171
172 #ifndef COPY_LOG_OFF
173 LOGI("StreamCopy::copy %s %u -> %u -> %u bytes - in %u hops",log_name, (unsigned int)bytes_to_read,(unsigned int) bytes_read, (unsigned int)result, (unsigned int)delayCount);
174 #endif
175 //TRACED();
176
177 if (result == 0){
178 TRACED();
179 // give the processor some time
180 delay(delay_on_no_data);
181 }
182
183 //TRACED();
184 CHECK_MEMORY();
185 } else {
186 // give the processor some time
187 delay(delay_on_no_data);
188 LOGD("no data %s", log_name);
189 }
190 //TRACED();
191 return result;
192 }
193
195 size_t copyN(size_t pages){
196 if (!active) return 0;
197 size_t total=0;
198 for (size_t j=0;j<pages;j++){
199 total+=copy();
200 }
201 return total;
202 }
203
205 size_t copyMs(size_t millis, AudioInfo info){
206 if (!active) return 0;
207 size_t pages = AudioTime::toBytes(millis, info) / buffer_size;
208 return copyN(pages);
209 }
210
212 size_t copyAll(int retryCount=5, int retryWaitMs=200){
213 TRACED();
214 if (!active) return 0;
215 size_t result = 0;
216 int retry = 0;
217
218 if (from==nullptr || to == nullptr)
219 return result;
220
221 // copy while source has data available
222 int count=0;
223 while (true){
224 count = copy();
225 result += count;
226 if (count==0){
227 // wait for more data
228 retry++;
229 delay(retryWaitMs);
230 } else {
231 retry = 0; // after we got new data we restart the counting
232 }
233 // stop the processing if we passed the retry limit
234 if (retry>retryCount){
235 break;
236 }
237 }
238 return result;
239 }
240
242 int available() {
243 int result = 0;
244 if (from!=nullptr) {
245 if (availableCallback!=nullptr){
246 result = availableCallback((Stream*)from);
247 } else {
248 result = from->available();
249 }
250 } else {
251 LOGW("source not defined");
252 }
253 LOGD("available: %d", result);
254 return result;
255 }
256
259 delay_on_no_data = delayMs;
260 }
261
263 void setCallbackOnWrite(void (*onWrite)(void*obj, void*buffer, size_t len), void* obj){
264 TRACED();
265 this->onWrite = onWrite;
266 this->onWriteObj = obj;
267 }
268
270 void setAvailableCallback(int (*callback)(Stream*stream)){
271 availableCallback = callback;
272 }
273
275 void setRetry(int retry){
276 retryLimit = retry;
277 }
278
281 return buffer_size;
282 }
283
286 check_available_for_write = flag;
287 }
288
291 return check_available_for_write;
292 }
293
296 check_available = flag;
297 }
298
301 return check_available;
302 }
303
305 void resize(int len){
306 buffer_size = len;
307 buffer.resize(buffer_size);
308 }
309
311 void setActive(bool flag){
312 active = flag;
313 }
314
316 bool isActive(){
317 return active;
318 }
319
321 void setLogName(const char* name){
322 log_name = name;
323 }
324
326 void setRetryDelay(int delay){
327 retry_delay = delay;
328 }
329
332 if (min_copy_size==0 && from_audio != nullptr){
333 AudioInfo info = from_audio->audioInfoOut();
334 min_copy_size = info.bits_per_sample / 8 * info.channels;
335 }
336 return min_copy_size;
337 }
338
340 void setMinCopySize(int size){
341 min_copy_size = size;
342 }
343
345 void setSynchAudioInfo(bool active){
346 is_sync_audio_info = active;
347 }
348
351 p_mime_detector = &mime;
352 }
353
354 protected:
355 Stream *from = nullptr;
356 AudioStream *from_audio = nullptr;
357 Print *to = nullptr;
358 Vector<uint8_t> buffer{0};
359 int buffer_size = DEFAULT_BUFFER_SIZE;
360 void (*onWrite)(void*obj, void*buffer, size_t len) = nullptr;
361 int (*availableCallback)(Stream*stream)=nullptr;
362 void *onWriteObj = nullptr;
363 bool check_available_for_write = false;
364 bool check_available = true;
365 int retryLimit = COPY_RETRY_LIMIT;
366 int delay_on_no_data = COPY_DELAY_ON_NODATA;
367 bool active = true;
368 const char* log_name = "";
369 int retry_delay = 10;
370 int channels = 0;
371 int min_copy_size = 1;
372 bool is_sync_audio_info = false;
373 AudioInfoSupport *p_audio_info_support = nullptr;
374 BaseConverter* p_converter = nullptr;
375 MimeDetector* p_mime_detector = nullptr;
376
377 void syncAudioInfo(){
378 // synchronize audio info
379 if (is_sync_audio_info && from_audio != nullptr && p_audio_info_support != nullptr){
380 AudioInfo info_from = from_audio->audioInfoOut();
381 AudioInfo info_to = p_audio_info_support->audioInfo();
382 if (info_from != info_to){
383 LOGI("--> StreamCopy: ");
384 p_audio_info_support->setAudioInfo(info_from);
385 }
386 }
387 }
388
390 size_t write(size_t len, size_t &delayCount ){
391 if (!buffer || len==0) return 0;
392 LOGD("write: %d", (int)len);
393 size_t total = 0;
394 long open = len;
395 int retry = 0;
396 while(open > 0){
397 size_t written = to->write((const uint8_t*)buffer.data()+total, open);
398 LOGD("write: %d -> %d", (int) open, (int) written);
399 total += written;
400 open -= written;
401 delayCount++;
402
403 if (open > 0){
404 // if we still have progress we reset the retry counter
405 if (written>0) retry = 0;
406
407 // abort if we reached the retry limit
408 if (retry++ > retryLimit){
409 LOGE("write %s to target has failed after %d retries! (%ld bytes)", log_name, retry, open);
410 break;
411 }
412
413 // wait a bit
414 if (retry>1) {
415 delay(retry_delay);
416 LOGI("try write %s - %d (open %ld bytes) ",log_name, retry, open);
417 }
418 }
419 }
420 return total;
421 }
422};
423
431
432
433} // Namespace
virtual AudioInfo audioInfoOut()
provides the actual output AudioInfo: this is usually the same as audioInfo() unless we use a transfo...
Definition AudioTypes.h:146
Base class for all Audio Streams. It support the boolean operator to test if the object is ready with...
Definition BaseStream.h:115
static uint32_t toBytes(uint32_t millis, AudioInfo info)
converts milliseconds to bytes
Definition AudioTypes.h:263
Abstract Base class for Converters A converter is processing the data in the indicated array.
Definition BaseConverter.h:24
Logic to detemine the mime type from the content. By default the following mime types are supported (...
Definition MimeDetector.h:22
size_t write(uint8_t *data, size_t len)
write the header to determine the mime
Definition MimeDetector.h:37
Definition NoArduino.h:62
Typed Stream Copy which supports the conversion from channel to 2 channels. We make sure that we allw...
Definition StreamCopy.h:24
void setCheckAvailable(bool flag)
Activates the check that we copy only if available returns a value.
Definition StreamCopy.h:295
size_t copyMs(size_t millis, AudioInfo info)
Copies audio for the indicated number of milliseconds: note that the resolution is determined by the ...
Definition StreamCopy.h:205
void setMinCopySize(int size)
Defines the minimum frame size that is used to round the copy size: 0 will automatically try to deter...
Definition StreamCopy.h:340
int minCopySize()
Determine frame size.
Definition StreamCopy.h:331
size_t copy(BaseConverter &converter)
copies the data from the source to the destination and applies the converter - the result is the proc...
Definition StreamCopy.h:101
size_t copyBytes(size_t bytes)
copies the inicated number of bytes from the source to the destination and returns the processed numb...
Definition StreamCopy.h:107
size_t write(size_t len, size_t &delayCount)
blocking write - until everything is processed
Definition StreamCopy.h:390
int available()
available bytes of the data source
Definition StreamCopy.h:242
Stream * getFrom()
Provides a pointer to the copy source. Can be used to check if the source is defined.
Definition StreamCopy.h:85
void resize(int len)
resizes the copy buffer
Definition StreamCopy.h:305
void setRetry(int retry)
Defines the max number of retries.
Definition StreamCopy.h:275
size_t copyN(size_t pages)
Copies pages * buffersize samples: returns the processed number of bytes.
Definition StreamCopy.h:195
void setCallbackOnWrite(void(*onWrite)(void *obj, void *buffer, size_t len), void *obj)
Defines a callback that is notified with the wirtten data.
Definition StreamCopy.h:263
void setRetryDelay(int delay)
Defines the delay that is added before we retry an incomplete copy.
Definition StreamCopy.h:326
void setSynchAudioInfo(bool active)
Activate the synchronization from the AudioInfo form the source to the target.
Definition StreamCopy.h:345
void setDelayOnNoData(int delayMs)
Defines the dealy that is used if no data is available.
Definition StreamCopy.h:258
size_t copyAll(int retryCount=5, int retryWaitMs=200)
copies all data - returns the number of processed bytes
Definition StreamCopy.h:212
bool isCheckAvailable()
Is Available check activated ?
Definition StreamCopy.h:300
bool isActive()
Check if copier is active.
Definition StreamCopy.h:316
void begin(Print &to, Stream &from)
assign a new output and input stream
Definition StreamCopy.h:70
void end()
Ends the processing.
Definition StreamCopy.h:64
void begin()
(Re)starts the processing
Definition StreamCopy.h:50
int bufferSize()
Provides the buffer size.
Definition StreamCopy.h:280
void setAvailableCallback(int(*callback)(Stream *stream))
Defines a callback that provides the available bytes at the source.
Definition StreamCopy.h:270
void setMimeDetector(MimeDetector &mime)
Define a mime detector.
Definition StreamCopy.h:350
void setActive(bool flag)
deactivate/activate copy - active by default
Definition StreamCopy.h:311
Print * getTo()
Provides a pointer to the copy target. Can be used to check if the target is defined.
Definition StreamCopy.h:90
bool isCheckAvailableForWrite()
Is Available for Write check activated ?
Definition StreamCopy.h:290
size_t copy()
copies the data from the source to the destination and returns the processed number of bytes
Definition StreamCopy.h:95
void setCheckAvailableForWrite(bool flag)
Activates the check that we copy only if available for write returns a value.
Definition StreamCopy.h:285
void begin(Print &to, AudioStream &from)
assign a new output and input stream
Definition StreamCopy.h:77
void setLogName(const char *name)
Defines a name which will be printed in the log to identify the copier.
Definition StreamCopy.h:321
Definition NoArduino.h:142
Vector implementation which provides the most important methods as defined by std::vector....
Definition Vector.h:21
Generic Implementation of sound input and output for desktop environments using portaudio.
Definition AudioConfig.h:885
uint32_t millis()
Returns the milliseconds since the start.
Definition Time.h:12
Basic Audio information which drives e.g. I2S.
Definition AudioTypes.h:52
uint16_t channels
Number of channels: 2=stereo, 1=mono.
Definition AudioTypes.h:57
uint8_t bits_per_sample
Number of bits per sample (int16_t = 16 bits)
Definition AudioTypes.h:59