2#include "AudioTools/CoreAudio/Buffers.h"
3#include "AudioTools/CoreAudio/AudioTypes.h"
4#include "AudioTools/CoreAudio/BaseConverter.h"
10#ifdef USE_STREAM_WRITE_OVERRIDE
11#define STREAM_WRITE_OVERRIDE override
13#define STREAM_WRITE_OVERRIDE
16#ifdef USE_STREAM_READ_OVERRIDE
17#define STREAM_READ_OVERRIDE override
19#define STREAM_READ_OVERRIDE
22#ifdef USE_STREAM_READCHAR_OVERRIDE
23#define STREAM_READCHAR_OVERRIDE override
25#define STREAM_READCHAR_OVERRIDE
43 virtual bool begin(){
return true;}
46 virtual size_t readBytes(uint8_t *data,
47 size_t len) STREAM_READ_OVERRIDE = 0;
48 virtual size_t write(
const uint8_t *data,
size_t len)
override = 0;
50 virtual size_t write(uint8_t ch)
override {
51 tmp_out.resize(MAX_SINGLE_CHARS);
55 return tmp_out.
write(ch);
58 virtual int available()
override {
return DEFAULT_BUFFER_SIZE; };
60 virtual int availableForWrite()
override {
return DEFAULT_BUFFER_SIZE; }
62 virtual void flush()
override {
71 virtual size_t readBytes(
char *data,
size_t len) STREAM_READCHAR_OVERRIDE {
72 return readBytes((uint8_t *)data, len);
75 virtual int read()
override {
78 if (tmp_in.isEmpty())
return -1;
80 if (!tmp_in.
read(result))
return -1;
84 virtual int peek()
override {
87 if (tmp_in.isEmpty())
return -1;
89 if (!tmp_in.
peek(result))
return -1;
99 void refillReadBuffer() {
100 tmp_in.resize(DEFAULT_BUFFER_SIZE);
101 if (tmp_in.isEmpty()) {
103 const int len = tmp_in.
size();
105 int len_eff = readBytes(bytes, len);
130 if (info != newInfo){
137 out_new.logInfo(
"out:");
138 notifyAudioChange(out_new);
143 virtual size_t readBytes(uint8_t *data,
size_t len)
override {
return not_supported(0,
"readBytes"); }
145 virtual size_t write(
const uint8_t *data,
size_t len)
override{
return not_supported(0,
"write"); }
148 virtual operator bool() {
return info && available() > 0; }
158 for (
int j=0;j<len/2;j++){
159 write((uint8_t*)&zero,2);
165 memset(buffer, 0, length);
172 virtual int not_supported(
int out,
const char *msg =
"") {
173 LOGE(
"AudioStream: %s unsupported operation!", msg);
196 void add(
Stream *stream) { input_streams.push_back(stream); }
197 void add(
Stream &stream) { input_streams.push_back(&stream); }
204 void end() { is_active =
false; }
206 int available()
override {
207 if (!is_active)
return 0;
211 return availableWithTimout();
214 size_t readBytes(uint8_t *data,
size_t len)
override {
215 if (!is_active)
return 0;
219 return p_current_stream->readBytes(data, len);
223 operator bool() {
return is_active && available() > 0; }
225 void setOnBeginCallback(
void (*callback)(
Stream *stream)) {
226 begin_callback = callback;
228 void setOnEndCallback(
void (*callback)(Stream *stream)) {
229 end_callback = callback;
236 size_t write(
const uint8_t *data,
size_t size)
override {
return 0;};
240 Stream *p_current_stream =
nullptr;
241 bool is_active =
false;
242 void (*begin_callback)(
Stream *stream) =
nullptr;
243 void (*end_callback)(
Stream *stream) =
nullptr;
244 uint_fast32_t _timeout = 0;
250 if (p_current_stream !=
nullptr && p_current_stream->available() > 0)
253 if ((p_current_stream ==
nullptr || availableWithTimout() == 0)) {
254 if (end_callback && p_current_stream) end_callback(p_current_stream);
255 if (!input_streams.empty()) {
256 LOGI(
"using next stream");
257 p_current_stream = input_streams[0];
258 input_streams.pop_front();
259 if (begin_callback && p_current_stream)
260 begin_callback(p_current_stream);
262 p_current_stream =
nullptr;
266 return p_current_stream !=
nullptr;
269 int availableWithTimout() {
270 int result = p_current_stream->available();
272 for (
int j = 0; j < _timeout / 10; j++) {
274 result = p_current_stream->available();
275 if (result != 0)
break;
291 size_t write(
const uint8_t *data,
size_t len)
override {
return len; }
293 size_t readBytes(uint8_t *data,
size_t len)
override {
294 memset(data, 0, len);
312 bool autoRemoveOldestDataIfFull =
false) {
314 callback_buffer_ptr =
new NBuffer<T>(bufferSize, bufferCount);
315 remove_oldest_data = autoRemoveOldestDataIfFull;
320 callback_buffer_ptr = &buffer;
325 delete callback_buffer_ptr;
338 virtual bool begin(
size_t activeWhenPercentFilled) {
341 size_t size = callback_buffer_ptr->size() *
sizeof(T);
343 active_limit = size * activeWhenPercentFilled / 100;
344 LOGI(
"activate after: %u bytes",(
unsigned)active_limit);
338 virtual bool begin(
size_t activeWhenPercentFilled) {
…}
354 int available()
override {
355 return active ? callback_buffer_ptr->available() *
sizeof(T) : 0;
358 int availableForWrite()
override {
359 if (!active && active_limit > 0)
return DEFAULT_BUFFER_SIZE;
360 return callback_buffer_ptr->availableForWrite() *
sizeof(T);
363 virtual size_t write(
const uint8_t *data,
size_t len)
override {
364 if (len == 0)
return 0;
365 if (active_limit == 0 && !active)
return 0;
368 total_written += len;
369 if (!active && active_limit > 0 && total_written >= active_limit) {
371 LOGI(
"setting active");
375 if (remove_oldest_data) {
376 int available_bytes =
377 callback_buffer_ptr->availableForWrite() *
sizeof(T);
378 if ((
int)len > available_bytes) {
379 int gap = len - available_bytes;
385 return callback_buffer_ptr->writeArray(data, len /
sizeof(T));
388 virtual size_t readBytes(uint8_t *data,
size_t len)
override {
389 if (!active)
return 0;
390 return callback_buffer_ptr->readArray(data, len /
sizeof(T));
396 callback_buffer_ptr->reset();
401 operator bool() {
return active; }
405 size_t active_limit = 0;
406 size_t total_written = 0;
408 bool remove_oldest_data =
false;
409 bool owns_buffer =
false;
423 this->data.resize(len);
424 memcpy(&data[0], inData, len);
441 this->default_buffer_size = defaultBufferSize;
446 audio_list.swap(ref.audio_list);
448 total_available=ref.total_available;
449 default_buffer_size = ref.default_buffer_size;
450 alloc_failed = ref.alloc_failed;;
451 is_loop = ref.is_loop;
458 temp_audio.resize(default_buffer_size);
475 ok = audio_list.pop_front(p_node);
483 alloc_failed =
false;
488 return total_available;
493 it = audio_list.begin();
496 virtual size_t write(
const uint8_t *data,
size_t len)
override {
499 alloc_failed =
false;
500 total_available += len;
501 audio_list.push_back(p_node);
504 if (it == audio_list.end()){
505 it = audio_list.begin();
514 virtual int availableForWrite()
override {
515 return alloc_failed ? 0 : default_buffer_size;
518 virtual int available()
override {
519 if (it == audio_list.end()){
521 if (it == audio_list.end()) {
528 virtual size_t readBytes(uint8_t *data,
size_t len)
override {
535 if (it==audio_list.end()){
545 DataNode *p_node = *it;
546 int result_len = min(len, (
size_t) p_node->len);
547 memcpy(data, &p_node->data[0], result_len);
549 if (p_node->len>len){
550 uint8_t *start = &p_node->data[result_len];
551 int uprocessed_len = p_node->len - len;
559 List<DataNode*> &list() {
569 for (
int j=0;j<remove;j++){
571 audio_list.pop_front(node);
572 if (node!=
nullptr)
delete node;
574 audio_list.pop_back(node);
575 if (node!=
nullptr)
delete node;
581 auto first = *list().begin();
583 clean_start.convert(&(first->data[0]),first->len);
587 auto last = * (--(list().end()));
589 clean_end.convert(&(last->data[0]),last->len);
597 size_t total_available=0;
598 int default_buffer_size=DEFAULT_BUFFER_SIZE;
599 bool alloc_failed =
false;
601 bool is_loop =
false;