2#include "AudioTools/CoreAudio/Buffers.h"
3#include "AudioTools/CoreAudio/AudioTypes.h"
4#include "AudioTools/CoreAudio/BaseConverter.h"
10#ifdef USE_STREAM_WRITE_OVERRIDE
11#define STREAM_WRITE_OVERRIDE override
13#define STREAM_WRITE_OVERRIDE
16#ifdef USE_STREAM_READ_OVERRIDE
17#define STREAM_READ_OVERRIDE override
19#define STREAM_READ_OVERRIDE
22#ifdef USE_STREAM_READCHAR_OVERRIDE
23#define STREAM_READCHAR_OVERRIDE override
25#define STREAM_READCHAR_OVERRIDE
43 virtual bool begin(){
return true;}
46 virtual size_t readBytes(uint8_t *data,
47 size_t len) STREAM_READ_OVERRIDE = 0;
48 virtual size_t write(
const uint8_t *data,
size_t len)
override = 0;
// Single-byte write: the byte is handed to the temporary output buffer tmp_out.
// NOTE(review): some lines of this body are not visible here (e.g. when the
// buffered bytes get forwarded) — confirm against the full file.
50 virtual size_t write(uint8_t ch)
override {
// Size tmp_out according to the configured write_buffer_size before using it.
51 tmp_out.
resize(write_buffer_size);
// The result of the buffer write is what the caller gets back.
55 return tmp_out.
write(ch);
58 virtual int available()
override {
return DEFAULT_BUFFER_SIZE; };
60 virtual int availableForWrite()
override {
return DEFAULT_BUFFER_SIZE; }
62 virtual void flush()
override {
// char* convenience overload: forwards to the uint8_t* variant of readBytes()
// with the same length and returns its result.
71 virtual size_t readBytes(
char *data,
size_t len) STREAM_READCHAR_OVERRIDE {
// Only the pointer type is changed by the cast; the buffer itself is untouched.
72 return readBytes((uint8_t *)data, len);
// Single-byte read served from the temporary input buffer tmp_in.
// NOTE(review): the declaration of `result` and the final return are not
// visible in this view — confirm against the full file.
75 virtual int read()
override {
// Nothing buffered: report -1 instead of a byte value.
78 if (tmp_in.isEmpty())
return -1;
// A failed buffer read is reported as -1 as well.
80 if (!tmp_in.
read(result))
return -1;
// Looks at the next byte in the temporary input buffer tmp_in via its peek().
// NOTE(review): the declaration of `result` and the final return are not
// visible in this view — confirm against the full file.
84 virtual int peek()
override {
// Nothing buffered: report -1 instead of a byte value.
87 if (tmp_in.isEmpty())
return -1;
// A failed buffer peek is reported as -1 as well.
89 if (!tmp_in.
peek(result))
return -1;
95 void setWriteBufferSize(
int size) { write_buffer_size = size;}
100 int write_buffer_size = MAX_SINGLE_CHARS;
102 void refillReadBuffer() {
103 tmp_in.
resize(DEFAULT_BUFFER_SIZE);
104 if (tmp_in.isEmpty()) {
106 const int len = tmp_in.
size();
108 int len_eff = readBytes(bytes, len);
133 if (info != newInfo){
140 out_new.logInfo(
"out:");
141 notifyAudioChange(out_new);
146 virtual size_t readBytes(uint8_t *data,
size_t len)
override {
return not_supported(0,
"readBytes"); }
148 virtual size_t write(
const uint8_t *data,
size_t len)
override{
return not_supported(0,
"write"); }
151 virtual operator bool() {
return info && available() > 0; }
161 for (
int j=0;j<len/2;j++){
162 write((uint8_t*)&zero,2);
168 memset(buffer, 0, length);
// Helper for operations a stream does not implement: logs an error that
// names the operation passed in msg.
// NOTE(review): the rest of the body is outside this view; presumably `out`
// is handed back as the result — confirm.
175 virtual int not_supported(
int out,
const char *msg =
"") {
176 LOGE(
"AudioStream: %s unsupported operation!", msg);
199 void add(
Stream *stream) { input_streams.push_back(stream); }
200 void add(
Stream &stream) { input_streams.push_back(&stream); }
202 bool begin()
override {
207 void end()
override { is_active =
false; }
// Reports the readable bytes; an inactive stream always reports 0.
// NOTE(review): lines between the guard and the final return are not visible
// in this view — confirm against the full file.
209 int available()
override {
210 if (!is_active)
return 0;
// The final answer is delegated to availableWithTimout().
214 return availableWithTimout();
// Reads from the stream that is currently selected (p_current_stream);
// an inactive object returns 0 without touching any stream.
// NOTE(review): lines between the guard and the final return are not visible
// in this view — confirm how p_current_stream is validated before use.
217 size_t readBytes(uint8_t *data,
size_t len)
override {
218 if (!is_active)
return 0;
222 return p_current_stream->readBytes(data, len);
226 operator bool() {
return is_active && available() > 0; }
// Stores the function pointer in begin_callback; elsewhere in this file that
// callback is invoked with the stream that has just become the current one.
228 void setOnBeginCallback(
void (*callback)(
Stream *stream)) {
229 begin_callback = callback;
// Stores the function pointer in end_callback; elsewhere in this file that
// callback is invoked with the current stream before the next one is selected.
231 void setOnEndCallback(
void (*callback)(Stream *stream)) {
232 end_callback = callback;
239 size_t write(
const uint8_t *data,
size_t size)
override {
return 0;};
243 Stream *p_current_stream =
nullptr;
244 bool is_active =
false;
245 void (*begin_callback)(
Stream *stream) =
nullptr;
246 void (*end_callback)(
Stream *stream) =
nullptr;
253 if (p_current_stream !=
nullptr && p_current_stream->available() > 0)
256 if ((p_current_stream ==
nullptr || availableWithTimout() == 0)) {
257 if (end_callback && p_current_stream) end_callback(p_current_stream);
258 if (!input_streams.empty()) {
259 LOGI(
"using next stream");
260 p_current_stream = input_streams[0];
261 input_streams.pop_front();
262 if (begin_callback && p_current_stream)
263 begin_callback(p_current_stream);
265 p_current_stream =
nullptr;
269 return p_current_stream !=
nullptr;
// Asks p_current_stream how many bytes are available and, in a loop of at
// most _timeout / 10 iterations, asks again until a non-zero value shows up.
// NOTE(review): some lines of this body are not visible here (e.g. what
// happens between the retries and the final return) — confirm.
272 int availableWithTimout() {
273 int result = p_current_stream->available();
275 for (
int j = 0; j < _timeout / 10; j++) {
277 result = p_current_stream->available();
// Stop retrying as soon as the stream reports data.
278 if (result != 0)
break;
294 size_t write(
const uint8_t *data,
size_t len)
override {
return len; }
296 size_t readBytes(uint8_t *data,
size_t len)
override {
297 memset(data, 0, len);
317 bool autoRemoveOldestDataIfFull =
false) {
319 p_buffer =
new NBuffer<T>(bufferSize, bufferCount);
320 remove_oldest_data = autoRemoveOldestDataIfFull;
333 void setBuffer(BaseBuffer<T> &buffer){
// Start variant that derives an activation threshold from a fill percentage.
// NOTE(review): parts of this body (including the return) are not visible in
// this view — confirm against the full file.
347 virtual bool begin(
size_t activeWhenPercentFilled) {
// Buffer capacity expressed in bytes (element count times element size).
350 size_t size = p_buffer->size() *
sizeof(T);
// Threshold in bytes: the requested percentage of that capacity.
352 active_limit = size * activeWhenPercentFilled / 100;
353 LOGI(
"activate after: %u bytes",(
unsigned)active_limit);
358 virtual void end()
override {
// Readable amount in bytes: the buffer's available element count multiplied
// by sizeof(T) while active, otherwise 0.
363 int available()
override {
364 return active ? p_buffer->
available() *
sizeof(T) : 0;
367 int availableForWrite()
override {
368 if (!active && active_limit > 0)
return DEFAULT_BUFFER_SIZE;
// Writes bytes into the queue buffer.
// NOTE(review): several lines of this body are not visible in this view
// (e.g. what is done on activation and with `gap`) — confirm against the
// full file before relying on the comments below.
372 virtual size_t write(
const uint8_t *data,
size_t len)
override {
// Empty writes are answered with 0 right away.
373 if (len == 0)
return 0;
// Without an activation threshold, an inactive queue rejects the data.
374 if (active_limit == 0 && !active)
return 0;
// Keep a running total of the bytes offered so far; it is compared against
// active_limit below.
377 total_written += len;
378 if (!active && active_limit > 0 && total_written >= active_limit) {
380 LOGI(
"setting active");
// Optional mode: when the incoming data exceeds the free space, `gap` is the
// number of bytes that do not fit.
384 if (remove_oldest_data) {
385 int available_bytes =
387 if ((
int)len > available_bytes) {
388 int gap = len - available_bytes;
// The byte length is converted to an element count for the buffer call;
// the buffer's result is returned as is.
394 return p_buffer->
writeArray(data, len /
sizeof(T));
// Reads from the queue buffer; while not active nothing is read and 0 is
// returned.
// NOTE(review): len is converted to an element count (len / sizeof(T)) and
// the result of readArray() is returned unchanged — if that result is an
// element count, the value is a byte count only when sizeof(T) == 1; confirm.
397 virtual size_t readBytes(uint8_t *data,
size_t len)
override {
398 if (!active)
return 0;
399 return p_buffer->
readArray(data, len /
sizeof(T));
402 int read()
override {
403 if (!active)
return -1;
405 if (!p_buffer->
read(result)) {
411 int peek()
override {
412 if (!active)
return -1;
414 if (p_buffer->
peek(result)) {
415 return *(
reinterpret_cast<uint8_t *
>(&result));
428 operator bool() {
return active; }
438 size_t active_limit = 0;
439 size_t total_written = 0;
441 bool remove_oldest_data =
false;
442 bool owns_buffer =
false;
456 this->data.resize(len);
457 memcpy(&data[0], inData, len);
473 DynamicMemoryStream(
bool isLoop,
int defaultBufferSize=DEFAULT_BUFFER_SIZE,
int maxRecords = 0 ) {
474 this->default_buffer_size = defaultBufferSize;
476 this->max_records = maxRecords;
480 audio_list.swap(ref.audio_list);
482 total_available=ref.total_available;
483 default_buffer_size = ref.default_buffer_size;
484 alloc_failed = ref.alloc_failed;;
485 is_loop = ref.is_loop;
492 temp_audio.
resize(default_buffer_size);
496 virtual void end()
override {
507 consume_on_read = consume;
514 ok = audio_list.pop_front(p_node);
523 alloc_failed =
false;
528 return total_available;
533 it = audio_list.begin();
537 virtual size_t write(
const uint8_t *data,
size_t len)
override {
538 int size = audio_list.size();
539 LOGI(
"write: %d / records: %d (max %d)",(
int)len, size ,max_records);
542 alloc_failed =
false;
543 total_available += len;
544 audio_list.push_back(p_node);
547 if (it == audio_list.end()){
548 it = audio_list.begin();
557 virtual int availableForWrite()
override {
559 if (max_records > 0 && audio_list.size() >= max_records) {
562 return alloc_failed ? 0 : default_buffer_size;
565 virtual int available()
override {
566 if (it == audio_list.end()){
568 if (it == audio_list.end()) {
575 virtual size_t readBytes(uint8_t *data,
size_t len)
override {
578 size_t result = temp_audio.
readArray(data, len);
584 if (it==audio_list.end()){
594 DataNode *p_node = *it;
595 size_t node_len = p_node->len;
596 size_t result_len = node_len < len ? node_len : len;
597 if (result_len > 0) {
598 memcpy(data, &p_node->data[0], result_len);
601 if (node_len > result_len) {
602 size_t remainder_len = node_len - result_len;
603 temp_audio.
resize((
int)remainder_len);
604 uint8_t *start = &p_node->data[result_len];
608 if (consume_on_read) {
609 DataNode* removed =
nullptr;
610 bool ok = audio_list.pop_front(removed);
611 if (ok && removed !=
nullptr) {
612 if (total_available >= removed->len) {
613 total_available -= removed->len;
619 it = audio_list.begin();
623 read_pos += result_len;
627 List<DataNode*> &list() {
637 for (
int j=0;j<remove;j++){
639 audio_list.pop_front(node);
640 if (node!=
nullptr)
delete node;
642 audio_list.pop_back(node);
643 if (node!=
nullptr)
delete node;
649 auto first = *list().begin();
651 clean_start.convert(&(first->data[0]),first->len);
655 auto last = * (--(list().end()));
657 clean_end.convert(&(last->data[0]),last->len);
// Stores the record limit in the max_records member; elsewhere in this file
// a value > 0 is compared against the size of audio_list when reporting the
// space available for writing.
666 void setMaxRecords(
int max_records){
// `this->` is needed because the parameter shadows the member of the same name.
667 this->max_records = max_records;
672 List<DataNode*> audio_list;
673 List<DataNode*>::Iterator it = audio_list.end();
674 size_t total_available = 0;
676 int default_buffer_size=DEFAULT_BUFFER_SIZE;
678 bool alloc_failed =
false;
679 RingBuffer<uint8_t> temp_audio{0};
680 bool is_loop =
false;
681 bool consume_on_read =
false;