arduino-audio-tools
Loading...
Searching...
No Matches
StreamCopy.h
1#pragma once
2
3#include "AudioTools/CoreAudio/AudioLogger.h"
4#include "AudioTools/CoreAudio/AudioMetaData/MimeDetector.h"
5#include "AudioTools/CoreAudio/AudioStreams.h"
6#include "AudioTools/CoreAudio/AudioTypes.h"
7#include "AudioTools/CoreAudio/BaseConverter.h"
8#include "AudioTools/CoreAudio/Buffers.h"
9#include "AudioToolsConfig.h"
10
11#define NOT_ENOUGH_MEMORY_MSG "Could not allocate enough memory: %d bytes"
12
13namespace audio_tools {
14
23template <class T>
25 public:
26 StreamCopyT(Allocator &allocator, int bufferSize = DEFAULT_BUFFER_SIZE)
27 : _allocator(allocator), buffer_size(bufferSize) {
28 TRACED();
29 begin();
30 }
31 StreamCopyT(Print &to, AudioStream &from,
32 int bufferSize = DEFAULT_BUFFER_SIZE,
33 Allocator &allocator = DefaultAllocator)
34 : _allocator(allocator), buffer_size(bufferSize) {
35 TRACED();
36 begin(to, from);
37 }
38
39 StreamCopyT(Print &to, Stream &from, int bufferSize = DEFAULT_BUFFER_SIZE,
40 Allocator &allocator = DefaultAllocator)
41 : _allocator(allocator), buffer_size(bufferSize) {
42 TRACED();
43 begin(to, from);
44 }
45
46 StreamCopyT(int bufferSize = DEFAULT_BUFFER_SIZE,
47 Allocator &allocator = DefaultAllocator)
48 : _allocator(allocator), buffer_size(bufferSize) {
49 TRACED();
50 begin();
51 }
52
53 ~StreamCopyT() { end(); }
54
56 void begin() {
57 TRACED();
58 if (p_mime_detector != nullptr) {
59 p_mime_detector->begin();
60 };
61 resize(buffer_size);
62 if (buffer) {
63 LOGI("buffer_size=%d", buffer_size);
64 } else {
65 LOGE(NOT_ENOUGH_MEMORY_MSG, buffer_size);
66 }
67 }
68
70 void end() {
71 this->from = nullptr;
72 this->to = nullptr;
73 }
74
76 void begin(Print &to, Stream &from) {
77 this->from = &from;
78 this->to = &to;
79 begin();
80 }
81
83 void begin(Print &to, AudioStream &from) {
84 this->from = &from;
85 this->from_audio = &from;
86 this->to = &to;
87 begin();
88 }
89
92 Stream *getFrom() { return from; }
93
96 Print *getTo() { return to; }
97
100 inline size_t copy() {
101 p_converter = nullptr;
102 return copyBytes(buffer_size);
103 }
104
107 inline size_t copy(BaseConverter &converter) {
108 p_converter = &converter;
109 return copyBytes(buffer_size);
110 }
111
114 inline size_t copyBytes(size_t bytes) {
115 LOGD("copy %d bytes %s", (int)bytes, log_name);
116 if (!active) return 0;
117 // if not initialized we do nothing
118 if (from == nullptr && to == nullptr) return 0;
119
120 // if no bytes are requested, we do nothing
121 if (bytes == 0) return 0;
122
123 // synchronize AudioInfo
124 syncAudioInfo();
125
126 // E.g. if we try to write to a server we might not have any output
127 // destination yet
128 int to_write = to->availableForWrite();
129 if (check_available_for_write && to_write == 0) {
130 delay(500);
131 return 0;
132 }
133
134 // resize copy buffer if necessary
135 if (buffer.size() < bytes) {
136 LOGI("Resize to %d", (int)bytes);
137 buffer.resize(bytes);
138 }
139
140 size_t result = 0;
141 size_t delayCount = 0;
142 size_t len = bytes;
143 if (check_available) {
144 len = available();
145 }
146 size_t bytes_to_read = len;
147 size_t bytes_read = 0;
148
149 if (len > 0) {
150 bytes_to_read = min(len, static_cast<size_t>(buffer_size));
151 // don't overflow buffer
152 if (check_available_for_write && to_write > 0) {
153 bytes_to_read = min((int)bytes_to_read, to_write);
154 }
155
156 // round to full frames
157 int copy_size = minCopySize();
158 if (copy_size > 0) {
159 size_t samples = bytes_to_read / minCopySize();
160 bytes_to_read = samples * minCopySize();
161 }
162
163 // get the data now
164 bytes_read = 0;
165 if (bytes_to_read > 0) {
166 bytes_read = from->readBytes((uint8_t *)&buffer[0], bytes_to_read);
167 }
168
169 // determine mime
170 if (p_mime_detector != nullptr) {
171 p_mime_detector->write(buffer.data(), bytes_to_read);
172 }
173
174 // convert data
175 if (p_converter != nullptr)
176 p_converter->convert((uint8_t *)buffer.data(), bytes_read);
177
178 // write data
179 result = write(bytes_read, delayCount);
180
181 // callback with unconverted data
182 if (onWrite != nullptr) onWrite(onWriteObj, &buffer[0], result);
183
184#ifndef COPY_LOG_OFF
185 LOGI("StreamCopy::copy %s %u -> %u -> %u bytes - in %u hops", log_name,
186 (unsigned int)bytes_to_read, (unsigned int)bytes_read,
187 (unsigned int)result, (unsigned int)delayCount);
188#endif
189 // TRACED();
190
191 if (result == 0) {
192 TRACED();
193 // give the processor some time
194 delay(delay_on_no_data);
195 }
196
197 // TRACED();
198 CHECK_MEMORY();
199 } else {
200 // give the processor some time
201 delay(delay_on_no_data);
202 LOGD("no data %s", log_name);
203 }
204 // TRACED();
205 return result;
206 }
207
209 size_t copyN(size_t pages) {
210 if (!active) return 0;
211 size_t total = 0;
212 for (size_t j = 0; j < pages; j++) {
213 total += copy();
214 }
215 return total;
216 }
217
220 size_t copyMs(size_t millis, AudioInfo info) {
221 if (!active) return 0;
222 size_t pages = AudioTime::toBytes(millis, info) / buffer_size;
223 return copyN(pages);
224 }
225
227 size_t copyAll(int retryCount = 5, int retryWaitMs = 200) {
228 TRACED();
229 if (!active) return 0;
230 size_t result = 0;
231 int retry = 0;
232
233 if (from == nullptr || to == nullptr) return result;
234
235 // copy while source has data available
236 int count = 0;
237 while (true) {
238 count = copy();
239 result += count;
240 if (count == 0) {
241 // wait for more data
242 retry++;
243 delay(retryWaitMs);
244 } else {
245 retry = 0; // after we got new data we restart the counting
246 }
247 // stop the processing if we passed the retry limit
248 if (retry > retryCount) {
249 break;
250 }
251 }
252 return result;
253 }
254
256 int available() {
257 int result = 0;
258 if (from != nullptr) {
259 if (availableCallback != nullptr) {
260 result = availableCallback((Stream *)from);
261 } else {
262 result = from->available();
263 }
264 } else {
265 LOGW("source not defined");
266 }
267 LOGD("available: %d", result);
268 return result;
269 }
270
272 void setDelayOnNoData(int delayMs) { delay_on_no_data = delayMs; }
273
275 void setCallbackOnWrite(void (*onWrite)(void *obj, void *buffer, size_t len),
276 void *obj) {
277 TRACED();
278 this->onWrite = onWrite;
279 this->onWriteObj = obj;
280 }
281
283 void setAvailableCallback(int (*callback)(Stream *stream)) {
284 availableCallback = callback;
285 }
286
288 void setRetry(int retry) { retryLimit = retry; }
289
291 int bufferSize() { return buffer_size; }
292
296 check_available_for_write = flag;
297 }
298
300 bool isCheckAvailableForWrite() { return check_available_for_write; }
301
303 void setCheckAvailable(bool flag) { check_available = flag; }
304
306 bool isCheckAvailable() { return check_available; }
307
309 void resize(int len) {
310 buffer_size = len;
311 buffer.resize(len);
312 }
313
315 void setActive(bool flag) { active = flag; }
316
318 bool isActive() { return active; }
319
321 void setLogName(const char *name) { log_name = name; }
322
324 void setRetryDelay(int delay) { retry_delay = delay; }
325
328 if (min_copy_size == 0 && from_audio != nullptr) {
329 AudioInfo info = from_audio->audioInfoOut();
330 min_copy_size = info.bits_per_sample / 8 * info.channels;
331 }
332 return min_copy_size;
333 }
334
337 void setMinCopySize(int size) { min_copy_size = size; }
338
341 void setSynchAudioInfo(bool active) { is_sync_audio_info = active; }
342
344 void setMimeDetector(MimeDetector &mime) { p_mime_detector = &mime; }
345
346 protected:
347 Allocator _allocator;
348 Stream *from = nullptr;
349 AudioStream *from_audio = nullptr;
350 Print *to = nullptr;
351 Vector<uint8_t> buffer{0, _allocator};
352 int buffer_size = DEFAULT_BUFFER_SIZE;
353 void (*onWrite)(void *obj, void *buffer, size_t len) = nullptr;
354 int (*availableCallback)(Stream *stream) = nullptr;
355 void *onWriteObj = nullptr;
356 bool check_available_for_write = false;
357 bool check_available = true;
358 int retryLimit = COPY_RETRY_LIMIT;
359 int delay_on_no_data = COPY_DELAY_ON_NODATA;
360 bool active = true;
361 const char *log_name = "";
362 int retry_delay = 10;
363 int channels = 0;
364 int min_copy_size = 1;
365 bool is_sync_audio_info = false;
366 AudioInfoSupport *p_audio_info_support = nullptr;
367 BaseConverter *p_converter = nullptr;
368 MimeDetector *p_mime_detector = nullptr;
369
370 void syncAudioInfo() {
371 // synchronize audio info
372 if (is_sync_audio_info && from_audio != nullptr &&
373 p_audio_info_support != nullptr) {
374 AudioInfo info_from = from_audio->audioInfoOut();
375 AudioInfo info_to = p_audio_info_support->audioInfo();
376 if (info_from != info_to) {
377 LOGI("--> StreamCopy: ");
378 p_audio_info_support->setAudioInfo(info_from);
379 }
380 }
381 }
382
384 size_t write(size_t len, size_t &delayCount) {
385 if (!buffer || len == 0) return 0;
386 LOGD("write: %d", (int)len);
387 size_t total = 0;
388 long open = len;
389 int retry = 0;
390 while (open > 0) {
391 size_t written = to->write((const uint8_t *)buffer.data() + total, open);
392 LOGD("write: %d -> %d", (int)open, (int)written);
393 total += written;
394 open -= written;
395 delayCount++;
396
397 if (open > 0) {
398 // if we still have progress we reset the retry counter
399 if (written > 0) retry = 0;
400
401 // abort if we reached the retry limit
402 if (retry++ > retryLimit) {
403 LOGE("write %s to target has failed after %d retries! (%ld bytes)",
404 log_name, retry, open);
405 break;
406 }
407
408 // wait a bit
409 if (retry > 1) {
410 delay(retry_delay);
411 LOGI("try write %s - %d (open %ld bytes) ", log_name, retry, open);
412 }
413 }
414 }
415 return total;
416 }
417};
418
426
427} // namespace audio_tools
Memory allocator which uses malloc.
Definition Allocator.h:23
virtual AudioInfo audioInfoOut()
Definition AudioTypes.h:143
Base class for all Audio Streams. It support the boolean operator to test if the object is ready with...
Definition BaseStream.h:122
static uint32_t toBytes(uint32_t millis, AudioInfo info)
converts milliseconds to bytes
Definition AudioTypes.h:257
Abstract Base class for Converters A converter is processing the data in the indicated array.
Definition BaseConverter.h:24
Logic to determine the mime type from the content. By default the following mime types are supported (...
Definition MimeDetector.h:61
bool begin()
Sets is_first to true.
Definition MimeDetector.h:80
size_t write(uint8_t *data, size_t len)
write the header to determine the mime
Definition MimeDetector.h:93
Definition NoArduino.h:62
Typed Stream Copy which supports the conversion from channel to 2 channels. We make sure that we allw...
Definition StreamCopy.h:24
void setCheckAvailable(bool flag)
Activates the check that we copy only if available returns a value.
Definition StreamCopy.h:303
size_t copyMs(size_t millis, AudioInfo info)
Definition StreamCopy.h:220
void setMinCopySize(int size)
Definition StreamCopy.h:337
int minCopySize()
Determine frame size.
Definition StreamCopy.h:327
size_t copy(BaseConverter &converter)
Definition StreamCopy.h:107
size_t copyBytes(size_t bytes)
Definition StreamCopy.h:114
size_t write(size_t len, size_t &delayCount)
blocking write - until everything is processed
Definition StreamCopy.h:384
int available()
available bytes of the data source
Definition StreamCopy.h:256
Stream * getFrom()
Definition StreamCopy.h:92
void resize(int len)
resizes the copy buffer
Definition StreamCopy.h:309
void setRetry(int retry)
Defines the max number of retries.
Definition StreamCopy.h:288
size_t copyN(size_t pages)
Copies pages * buffersize samples: returns the processed number of bytes.
Definition StreamCopy.h:209
void setCallbackOnWrite(void(*onWrite)(void *obj, void *buffer, size_t len), void *obj)
Defines a callback that is notified with the written data.
Definition StreamCopy.h:275
void setRetryDelay(int delay)
Defines the delay that is added before we retry an incomplete copy.
Definition StreamCopy.h:324
void setSynchAudioInfo(bool active)
Definition StreamCopy.h:341
void setDelayOnNoData(int delayMs)
Defines the delay that is used if no data is available.
Definition StreamCopy.h:272
size_t copyAll(int retryCount=5, int retryWaitMs=200)
copies all data - returns the number of processed bytes
Definition StreamCopy.h:227
bool isCheckAvailable()
Is Available check activated ?
Definition StreamCopy.h:306
bool isActive()
Check if copier is active.
Definition StreamCopy.h:318
void begin(Print &to, Stream &from)
assign a new output and input stream
Definition StreamCopy.h:76
void end()
Ends the processing.
Definition StreamCopy.h:70
void begin()
(Re)starts the processing
Definition StreamCopy.h:56
int bufferSize()
Provides the buffer size.
Definition StreamCopy.h:291
void setAvailableCallback(int(*callback)(Stream *stream))
Defines a callback that provides the available bytes at the source.
Definition StreamCopy.h:283
void setMimeDetector(MimeDetector &mime)
Define a mime detector.
Definition StreamCopy.h:344
void setActive(bool flag)
deactivate/activate copy - active by default
Definition StreamCopy.h:315
Print * getTo()
Definition StreamCopy.h:96
bool isCheckAvailableForWrite()
Is Available for Write check activated ?
Definition StreamCopy.h:300
size_t copy()
Definition StreamCopy.h:100
void setCheckAvailableForWrite(bool flag)
Definition StreamCopy.h:295
void begin(Print &to, AudioStream &from)
assign a new output and input stream
Definition StreamCopy.h:83
void setLogName(const char *name)
Defines a name which will be printed in the log to identify the copier.
Definition StreamCopy.h:321
Definition NoArduino.h:142
Vector implementation which provides the most important methods as defined by std::vector....
Definition Vector.h:21
Generic Implementation of sound input and output for desktop environments using portaudio.
Definition AudioCodecsBase.h:10
uint32_t millis()
Returns the milliseconds since the start.
Definition Time.h:12
Basic Audio information which drives e.g. I2S.
Definition AudioTypes.h:55
uint16_t channels
Number of channels: 2=stereo, 1=mono.
Definition AudioTypes.h:59
uint8_t bits_per_sample
Number of bits per sample (int16_t = 16 bits)
Definition AudioTypes.h:61