arduino-audio-tools
Loading...
Searching...
No Matches
StreamCopy.h
1#pragma once
2
3#include "AudioToolsConfig.h"
4#include "AudioTools/CoreAudio/AudioTypes.h"
5#include "AudioTools/CoreAudio/Buffers.h"
6#include "AudioTools/CoreAudio/BaseConverter.h"
7#include "AudioTools/CoreAudio/AudioLogger.h"
8#include "AudioTools/CoreAudio/AudioStreams.h"
9#include "AudioTools/CoreAudio/AudioMetaData/MimeDetector.h"
10
11#define NOT_ENOUGH_MEMORY_MSG "Could not allocate enough memory: %d bytes"
12
13namespace audio_tools {
14
23template <class T>
25 public:
26
27 StreamCopyT(Print &to, AudioStream &from, int bufferSize=DEFAULT_BUFFER_SIZE){
28 TRACED();
29 this->buffer_size = bufferSize;
30 begin(to, from);
31 }
32
33 StreamCopyT(Print &to, Stream &from, int bufferSize=DEFAULT_BUFFER_SIZE){
34 TRACED();
35 this->buffer_size = bufferSize;
36 begin(to, from);
37 }
38
39 StreamCopyT(int bufferSize=DEFAULT_BUFFER_SIZE){
40 TRACED();
41 this->buffer_size = bufferSize;
42 begin();
43 }
44
45 ~StreamCopyT() {
46 end();
47 }
48
50 void begin(){
51 TRACED();
52 if (p_mime_detector!=nullptr) {
53 p_mime_detector->begin();
54 };
55 resize(buffer_size);
56 if (buffer){
57 LOGI("buffer_size=%d",buffer_size);
58 } else {
59 LOGE(NOT_ENOUGH_MEMORY_MSG, buffer_size);
60 }
61 }
62
64 void end() {
65 this->from = nullptr;
66 this->to = nullptr;
67 }
68
70 void begin(Print &to, Stream &from){
71 this->from = &from;
72 this->to = &to;
73 begin();
74 }
75
77 void begin(Print &to, AudioStream &from){
78 this->from = &from;
79 this->from_audio = &from;
80 this->to = &to;
81 begin();
82 }
83
86 return from;
87 }
88
91 return to;
92 }
93
95 inline size_t copy() {
96 p_converter = nullptr;
97 return copyBytes(buffer_size);
98 }
99
101 inline size_t copy(BaseConverter &converter) {
102 p_converter = &converter;
103 return copyBytes(buffer_size);
104 }
105
107 inline size_t copyBytes(size_t bytes){
108 LOGD("copy %d bytes %s", (int) bytes, log_name);
109 if (!active) return 0;
110 // if not initialized we do nothing
111 if (from==nullptr && to==nullptr) return 0;
112
113 // if no bytes are requested, we do nothing
114 if (bytes == 0) return 0;
115
116 // synchronize AudioInfo
117 syncAudioInfo();
118
119 // E.g. if we try to write to a server we might not have any output destination yet
120 int to_write = to->availableForWrite();
121 if (check_available_for_write && to_write==0){
122 delay(500);
123 return 0;
124 }
125
126 // resize copy buffer if necessary
127 if (buffer.size() < bytes){
128 LOGI("Resize to %d", (int) bytes);
129 buffer.resize(bytes);
130 }
131
132 size_t result = 0;
133 size_t delayCount = 0;
134 size_t len = bytes;
135 if (check_available) {
136 len = available();
137 }
138 size_t bytes_to_read = bytes;
139 size_t bytes_read = 0;
140
141 if (len > 0){
142 bytes_to_read = min(len, static_cast<size_t>(buffer_size));
143 // don't overflow buffer
144 if (to_write > 0){
145 bytes_to_read = min((int)bytes_to_read, to_write);
146 }
147
148 // round to full frames
149 int copy_size = minCopySize();
150 if (copy_size > 0){
151 size_t samples = bytes_to_read / minCopySize();
152 bytes_to_read = samples * minCopySize();
153 }
154
155 // get the data now
156 bytes_read = 0;
157 if (bytes_to_read>0){
158 bytes_read = from->readBytes((uint8_t*)&buffer[0], bytes_to_read);
159 }
160
161 // determine mime
162 if (p_mime_detector != nullptr){
163 p_mime_detector->write(buffer.data(), bytes_to_read);
164 }
165
166 // convert data
167 if (p_converter!=nullptr) p_converter->convert((uint8_t*)buffer.data(), bytes_read );
168
169 // write data
170 result = write(bytes_read, delayCount);
171
172 // callback with unconverted data
173 if (onWrite!=nullptr) onWrite(onWriteObj, &buffer[0], result);
174
175 #ifndef COPY_LOG_OFF
176 LOGI("StreamCopy::copy %s %u -> %u -> %u bytes - in %u hops",log_name, (unsigned int)bytes_to_read,(unsigned int) bytes_read, (unsigned int)result, (unsigned int)delayCount);
177 #endif
178 //TRACED();
179
180 if (result == 0){
181 TRACED();
182 // give the processor some time
183 delay(delay_on_no_data);
184 }
185
186 //TRACED();
187 CHECK_MEMORY();
188 } else {
189 // give the processor some time
190 delay(delay_on_no_data);
191 LOGD("no data %s", log_name);
192 }
193 //TRACED();
194 return result;
195 }
196
198 size_t copyN(size_t pages){
199 if (!active) return 0;
200 size_t total=0;
201 for (size_t j=0;j<pages;j++){
202 total+=copy();
203 }
204 return total;
205 }
206
208 size_t copyMs(size_t millis, AudioInfo info){
209 if (!active) return 0;
210 size_t pages = AudioTime::toBytes(millis, info) / buffer_size;
211 return copyN(pages);
212 }
213
215 size_t copyAll(int retryCount=5, int retryWaitMs=200){
216 TRACED();
217 if (!active) return 0;
218 size_t result = 0;
219 int retry = 0;
220
221 if (from==nullptr || to == nullptr)
222 return result;
223
224 // copy while source has data available
225 int count=0;
226 while (true){
227 count = copy();
228 result += count;
229 if (count==0){
230 // wait for more data
231 retry++;
232 delay(retryWaitMs);
233 } else {
234 retry = 0; // after we got new data we restart the counting
235 }
236 // stop the processing if we passed the retry limit
237 if (retry>retryCount){
238 break;
239 }
240 }
241 return result;
242 }
243
245 int available() {
246 int result = 0;
247 if (from!=nullptr) {
248 if (availableCallback!=nullptr){
249 result = availableCallback((Stream*)from);
250 } else {
251 result = from->available();
252 }
253 } else {
254 LOGW("source not defined");
255 }
256 LOGD("available: %d", result);
257 return result;
258 }
259
261 void setDelayOnNoData(int delayMs){
262 delay_on_no_data = delayMs;
263 }
264
266 void setCallbackOnWrite(void (*onWrite)(void*obj, void*buffer, size_t len), void* obj){
267 TRACED();
268 this->onWrite = onWrite;
269 this->onWriteObj = obj;
270 }
271
273 void setAvailableCallback(int (*callback)(Stream*stream)){
274 availableCallback = callback;
275 }
276
278 void setRetry(int retry){
279 retryLimit = retry;
280 }
281
284 return buffer_size;
285 }
286
289 check_available_for_write = flag;
290 }
291
294 return check_available_for_write;
295 }
296
298 void setCheckAvailable(bool flag){
299 check_available = flag;
300 }
301
304 return check_available;
305 }
306
308 void resize(int len){
309 buffer_size = len;
310 buffer.resize(buffer_size);
311 }
312
314 void setActive(bool flag){
315 active = flag;
316 }
317
319 bool isActive(){
320 return active;
321 }
322
324 void setLogName(const char* name){
325 log_name = name;
326 }
327
329 void setRetryDelay(int delay){
330 retry_delay = delay;
331 }
332
335 if (min_copy_size==0 && from_audio != nullptr){
336 AudioInfo info = from_audio->audioInfoOut();
337 min_copy_size = info.bits_per_sample / 8 * info.channels;
338 }
339 return min_copy_size;
340 }
341
343 void setMinCopySize(int size){
344 min_copy_size = size;
345 }
346
348 void setSynchAudioInfo(bool active){
349 is_sync_audio_info = active;
350 }
351
354 p_mime_detector = &mime;
355 }
356
357 protected:
358 Stream *from = nullptr;
359 AudioStream *from_audio = nullptr;
360 Print *to = nullptr;
361 Vector<uint8_t> buffer{0};
362 int buffer_size = DEFAULT_BUFFER_SIZE;
363 void (*onWrite)(void*obj, void*buffer, size_t len) = nullptr;
364 int (*availableCallback)(Stream*stream)=nullptr;
365 void *onWriteObj = nullptr;
366 bool check_available_for_write = false;
367 bool check_available = true;
368 int retryLimit = COPY_RETRY_LIMIT;
369 int delay_on_no_data = COPY_DELAY_ON_NODATA;
370 bool active = true;
371 const char* log_name = "";
372 int retry_delay = 10;
373 int channels = 0;
374 int min_copy_size = 1;
375 bool is_sync_audio_info = false;
376 AudioInfoSupport *p_audio_info_support = nullptr;
377 BaseConverter* p_converter = nullptr;
378 MimeDetector* p_mime_detector = nullptr;
379
380 void syncAudioInfo(){
381 // synchronize audio info
382 if (is_sync_audio_info && from_audio != nullptr && p_audio_info_support != nullptr){
383 AudioInfo info_from = from_audio->audioInfoOut();
384 AudioInfo info_to = p_audio_info_support->audioInfo();
385 if (info_from != info_to){
386 LOGI("--> StreamCopy: ");
387 p_audio_info_support->setAudioInfo(info_from);
388 }
389 }
390 }
391
393 size_t write(size_t len, size_t &delayCount ){
394 if (!buffer || len==0) return 0;
395 LOGD("write: %d", (int)len);
396 size_t total = 0;
397 long open = len;
398 int retry = 0;
399 while(open > 0){
400 size_t written = to->write((const uint8_t*)buffer.data()+total, open);
401 LOGD("write: %d -> %d", (int) open, (int) written);
402 total += written;
403 open -= written;
404 delayCount++;
405
406 if (open > 0){
407 // if we still have progress we reset the retry counter
408 if (written>0) retry = 0;
409
410 // abort if we reached the retry limit
411 if (retry++ > retryLimit){
412 LOGE("write %s to target has failed after %d retries! (%ld bytes)", log_name, retry, open);
413 break;
414 }
415
416 // wait a bit
417 if (retry>1) {
418 delay(retry_delay);
419 LOGI("try write %s - %d (open %ld bytes) ",log_name, retry, open);
420 }
421 }
422 }
423 return total;
424 }
425};
426
434
435
436} // Namespace
virtual AudioInfo audioInfoOut()
Definition AudioTypes.h:141
Base class for all Audio Streams. It support the boolean operator to test if the object is ready with...
Definition BaseStream.h:122
static uint32_t toBytes(uint32_t millis, AudioInfo info)
converts milliseconds to bytes
Definition AudioTypes.h:255
Abstract Base class for Converters A converter is processing the data in the indicated array.
Definition BaseConverter.h:24
Logic to determine the mime type from the content. By default the following mime types are supported (...
Definition MimeDetector.h:61
bool begin()
Sets is_first to true.
Definition MimeDetector.h:80
size_t write(uint8_t *data, size_t len)
write the header to determine the mime
Definition MimeDetector.h:93
Definition NoArduino.h:62
Typed Stream Copy which supports the conversion from channel to 2 channels. We make sure that we allw...
Definition StreamCopy.h:24
void setCheckAvailable(bool flag)
Activates the check that we copy only if available returns a value.
Definition StreamCopy.h:298
size_t copyMs(size_t millis, AudioInfo info)
Copies audio for the indicated number of milliseconds: note that the resolution is determined by the ...
Definition StreamCopy.h:208
void setMinCopySize(int size)
Defines the minimum frame size that is used to round the copy size: 0 will automatically try to deter...
Definition StreamCopy.h:343
int minCopySize()
Determine frame size.
Definition StreamCopy.h:334
size_t copy(BaseConverter &converter)
copies the data from the source to the destination and applies the converter - the result is the proc...
Definition StreamCopy.h:101
size_t copyBytes(size_t bytes)
copies the indicated number of bytes from the source to the destination and returns the processed numb...
Definition StreamCopy.h:107
size_t write(size_t len, size_t &delayCount)
blocking write - until everything is processed
Definition StreamCopy.h:393
int available()
available bytes of the data source
Definition StreamCopy.h:245
Stream * getFrom()
Provides a pointer to the copy source. Can be used to check if the source is defined.
Definition StreamCopy.h:85
void resize(int len)
resizes the copy buffer
Definition StreamCopy.h:308
void setRetry(int retry)
Defines the max number of retries.
Definition StreamCopy.h:278
size_t copyN(size_t pages)
Copies pages * buffersize samples: returns the processed number of bytes.
Definition StreamCopy.h:198
void setCallbackOnWrite(void(*onWrite)(void *obj, void *buffer, size_t len), void *obj)
Defines a callback that is notified with the written data.
Definition StreamCopy.h:266
void setRetryDelay(int delay)
Defines the delay that is added before we retry an incomplete copy.
Definition StreamCopy.h:329
void setSynchAudioInfo(bool active)
Activates the synchronization of the AudioInfo from the source to the target.
Definition StreamCopy.h:348
void setDelayOnNoData(int delayMs)
Defines the delay that is used if no data is available.
Definition StreamCopy.h:261
size_t copyAll(int retryCount=5, int retryWaitMs=200)
copies all data - returns the number of processed bytes
Definition StreamCopy.h:215
bool isCheckAvailable()
Is Available check activated ?
Definition StreamCopy.h:303
bool isActive()
Check if copier is active.
Definition StreamCopy.h:319
void begin(Print &to, Stream &from)
assign a new output and input stream
Definition StreamCopy.h:70
void end()
Ends the processing.
Definition StreamCopy.h:64
void begin()
(Re)starts the processing
Definition StreamCopy.h:50
int bufferSize()
Provides the buffer size.
Definition StreamCopy.h:283
void setAvailableCallback(int(*callback)(Stream *stream))
Defines a callback that provides the available bytes at the source.
Definition StreamCopy.h:273
void setMimeDetector(MimeDetector &mime)
Define a mime detector.
Definition StreamCopy.h:353
void setActive(bool flag)
deactivate/activate copy - active by default
Definition StreamCopy.h:314
Print * getTo()
Provides a pointer to the copy target. Can be used to check if the target is defined.
Definition StreamCopy.h:90
bool isCheckAvailableForWrite()
Is Available for Write check activated ?
Definition StreamCopy.h:293
size_t copy()
copies the data from the source to the destination and returns the processed number of bytes
Definition StreamCopy.h:95
void setCheckAvailableForWrite(bool flag)
Activates the check that we copy only if available for write returns a value.
Definition StreamCopy.h:288
void begin(Print &to, AudioStream &from)
assign a new output and input stream
Definition StreamCopy.h:77
void setLogName(const char *name)
Defines a name which will be printed in the log to identify the copier.
Definition StreamCopy.h:324
Definition NoArduino.h:142
Vector implementation which provides the most important methods as defined by std::vector....
Definition Vector.h:21
Generic Implementation of sound input and output for desktop environments using portaudio.
Definition AudioCodecsBase.h:10
uint32_t millis()
Returns the milliseconds since the start.
Definition Time.h:12
Basic Audio information which drives e.g. I2S.
Definition AudioTypes.h:53
uint16_t channels
Number of channels: 2=stereo, 1=mono.
Definition AudioTypes.h:57
uint8_t bits_per_sample
Number of bits per sample (int16_t = 16 bits)
Definition AudioTypes.h:59