2 #include "AudioTools/CoreAudio/Buffers.h"
8 #ifdef USE_STREAM_WRITE_OVERRIDE
9 #define STREAM_WRITE_OVERRIDE override
11 #define STREAM_WRITE_OVERRIDE
14 #ifdef USE_STREAM_READ_OVERRIDE
15 #define STREAM_READ_OVERRIDE override
17 #define STREAM_READ_OVERRIDE
20 #ifdef USE_STREAM_READCHAR_OVERRIDE
21 #define STREAM_READCHAR_OVERRIDE override
23 #define STREAM_READCHAR_OVERRIDE
41 virtual bool begin(){
return true;}
44 virtual size_t readBytes(uint8_t *data,
45 size_t len) STREAM_READ_OVERRIDE = 0;
46 virtual size_t write(
const uint8_t *data,
size_t len)
override = 0;
48 virtual size_t write(uint8_t ch)
override {
49 tmp_out.resize(MAX_SINGLE_CHARS);
53 return tmp_out.
write(ch);
56 virtual int available()
override {
return DEFAULT_BUFFER_SIZE; };
58 virtual int availableForWrite()
override {
return DEFAULT_BUFFER_SIZE; }
60 virtual void flush()
override {
69 virtual size_t readBytes(
char *data,
size_t len) STREAM_READCHAR_OVERRIDE {
70 return readBytes((uint8_t *)data, len);
73 virtual int read()
override {
76 if (tmp_in.isEmpty())
return -1;
80 virtual int peek()
override {
83 if (tmp_in.isEmpty())
return -1;
93 void refillReadBuffer() {
94 tmp_in.resize(DEFAULT_BUFFER_SIZE);
95 if (tmp_in.isEmpty()) {
97 const int len = tmp_in.
size();
99 int len_eff = readBytes(bytes, len);
124 if (info != newInfo){
131 out_new.logInfo(
"out:");
132 notifyAudioChange(out_new);
137 virtual size_t readBytes(uint8_t *data,
size_t len)
override {
return not_supported(0,
"readBytes"); }
139 virtual size_t write(
const uint8_t *data,
size_t len)
override{
return not_supported(0,
"write"); }
142 virtual operator bool() {
return info && available() > 0; }
152 for (
int j=0;j<len/2;j++){
153 write((uint8_t*)&zero,2);
159 memset(buffer, 0, length);
166 virtual int not_supported(
int out,
const char *msg =
"") {
167 LOGE(
"AudioStream: %s unsupported operation!", msg);
190 void add(
Stream *stream) { input_streams.push_back(stream); }
191 void add(
Stream &stream) { input_streams.push_back(&stream); }
198 void end() { is_active =
false; }
200 int available()
override {
201 if (!is_active)
return 0;
205 return availableWithTimout();
208 size_t readBytes(uint8_t *data,
size_t len)
override {
209 if (!is_active)
return 0;
213 return p_current_stream->readBytes(data, len);
217 operator bool() {
return is_active && available() > 0; }
219 void setOnBeginCallback(
void (*callback)(
Stream *stream)) {
220 begin_callback = callback;
222 void setOnEndCallback(
void (*callback)(Stream *stream)) {
223 end_callback = callback;
225 void setTimeout(uint32_t t) { _timeout = t; }
228 size_t write(
const uint8_t *data,
size_t size)
override {
return 0;};
232 Stream *p_current_stream =
nullptr;
233 bool is_active =
false;
234 void (*begin_callback)(
Stream *stream) =
nullptr;
235 void (*end_callback)(
Stream *stream) =
nullptr;
236 uint_fast32_t _timeout = 0;
242 if (p_current_stream !=
nullptr && p_current_stream->available() > 0)
245 if ((p_current_stream ==
nullptr || availableWithTimout() == 0)) {
246 if (end_callback && p_current_stream) end_callback(p_current_stream);
247 if (!input_streams.empty()) {
248 LOGI(
"using next stream");
249 p_current_stream = input_streams[0];
250 input_streams.pop_front();
251 if (begin_callback && p_current_stream)
252 begin_callback(p_current_stream);
254 p_current_stream =
nullptr;
258 return p_current_stream !=
nullptr;
261 int availableWithTimout() {
262 int result = p_current_stream->available();
264 for (
int j = 0; j < _timeout / 10; j++) {
266 result = p_current_stream->available();
267 if (result != 0)
break;
283 size_t write(
const uint8_t *data,
size_t len)
override {
return len; }
285 size_t readBytes(uint8_t *data,
size_t len)
override {
286 memset(data, 0, len);
304 bool autoRemoveOldestDataIfFull =
false) {
306 callback_buffer_ptr =
new NBuffer<T>(bufferSize, bufferCount);
307 remove_oldest_data = autoRemoveOldestDataIfFull;
312 callback_buffer_ptr = &buffer;
317 delete callback_buffer_ptr;
329 virtual bool begin(
size_t activeWhenPercentFilled) {
331 size_t size = callback_buffer_ptr->size() *
sizeof(T);
333 active_limit = size * activeWhenPercentFilled / 100;
343 int available()
override {
344 return active ? callback_buffer_ptr->available() *
sizeof(T) : 0;
347 int availableForWrite()
override {
348 return callback_buffer_ptr->availableForWrite() *
sizeof(T);
351 virtual size_t write(
const uint8_t *data,
size_t len)
override {
352 if (active_limit == 0 && !active)
return 0;
355 if (active_limit > 0 && !active && available() >= active_limit) {
360 if (remove_oldest_data) {
361 int available_bytes =
362 callback_buffer_ptr->availableForWrite() *
sizeof(T);
363 if ((
int)len > available_bytes) {
364 int gap = len - available_bytes;
370 return callback_buffer_ptr->writeArray(data, len /
sizeof(T));
373 virtual size_t readBytes(uint8_t *data,
size_t len)
override {
374 if (!active)
return 0;
375 return callback_buffer_ptr->readArray(data, len /
sizeof(T));
381 callback_buffer_ptr->reset();
386 operator bool() {
return active; }
390 size_t active_limit = 0;
/// When true, write() takes the branch that compares len against the free
/// space in the buffer. Defaults to false so the flag is never read
/// uninitialized on construction paths that do not assign it.
bool remove_oldest_data = false;
398 template <
typename T>
399 using CallbackBufferedStream = QueueStream<T>;
412 this->data.resize(len);
413 memcpy(&data[0], inData, len);
430 this->default_buffer_size = defaultBufferSize;
435 audio_list.swap(ref.audio_list);
437 total_available=ref.total_available;
438 default_buffer_size = ref.default_buffer_size;
439 alloc_failed = ref.alloc_failed;;
440 is_loop = ref.is_loop;
447 temp_audio.resize(default_buffer_size);
464 ok = audio_list.pop_front(p_node);
472 alloc_failed =
false;
477 return total_available;
482 it = audio_list.begin();
485 virtual size_t write(
const uint8_t *data,
size_t len)
override {
488 alloc_failed =
false;
489 total_available += len;
490 audio_list.push_back(p_node);
493 if (it == audio_list.end()){
494 it = audio_list.begin();
503 virtual int availableForWrite()
override {
504 return alloc_failed ? 0 : default_buffer_size;
507 virtual int available()
override {
508 if (it == audio_list.end()){
510 if (it == audio_list.end()) {
517 virtual size_t readBytes(uint8_t *data,
size_t len)
override {
524 if (it==audio_list.end()){
534 DataNode *p_node = *it;
535 int result_len = min(len, (
size_t) p_node->len);
536 memcpy(data, &p_node->data[0], result_len);
538 if (p_node->len>len){
539 uint8_t *start = &p_node->data[result_len];
540 int uprocessed_len = p_node->len - len;
548 List<DataNode*> &list() {
558 for (
int j=0;j<remove;j++){
560 audio_list.pop_front(node);
561 if (node!=
nullptr)
delete node;
563 audio_list.pop_back(node);
564 if (node!=
nullptr)
delete node;
570 auto first = *list().begin();
572 clean_start.convert(&(first->data[0]),first->len);
576 auto last = * (--(list().end()));
578 clean_end.convert(&(last->data[0]),last->len);
586 size_t total_available=0;
587 int default_buffer_size=DEFAULT_BUFFER_SIZE;
588 bool alloc_failed =
false;
590 bool is_loop =
false;