2#include "AudioTools/CoreAudio/Buffers.h"
3#include "AudioTools/CoreAudio/AudioTypes.h"
4#include "AudioTools/CoreAudio/BaseConverter.h"
10#ifdef USE_STREAM_WRITE_OVERRIDE
11#define STREAM_WRITE_OVERRIDE override
13#define STREAM_WRITE_OVERRIDE
16#ifdef USE_STREAM_READ_OVERRIDE
17#define STREAM_READ_OVERRIDE override
19#define STREAM_READ_OVERRIDE
22#ifdef USE_STREAM_READCHAR_OVERRIDE
23#define STREAM_READCHAR_OVERRIDE override
25#define STREAM_READCHAR_OVERRIDE
43 virtual bool begin(){
return true;}
46 virtual size_t readBytes(uint8_t *data,
47 size_t len) STREAM_READ_OVERRIDE = 0;
48 virtual size_t write(
const uint8_t *data,
size_t len)
override = 0;
50 virtual size_t write(uint8_t ch)
override {
51 tmp_out.
resize(write_buffer_size);
55 return tmp_out.
write(ch);
58 virtual int available()
override {
return DEFAULT_BUFFER_SIZE; };
60 virtual int availableForWrite()
override {
return DEFAULT_BUFFER_SIZE; }
62 virtual void flush()
override {
71 virtual size_t readBytes(
char *data,
size_t len) STREAM_READCHAR_OVERRIDE {
72 return readBytes((uint8_t *)data, len);
75 virtual int read()
override {
78 if (tmp_in.isEmpty())
return -1;
80 if (!tmp_in.
read(result))
return -1;
84 virtual int peek()
override {
87 if (tmp_in.isEmpty())
return -1;
89 if (!tmp_in.
peek(result))
return -1;
95 void setWriteBufferSize(
int size) { write_buffer_size = size;}
100 int write_buffer_size = MAX_SINGLE_CHARS;
102 void refillReadBuffer() {
103 tmp_in.
resize(DEFAULT_BUFFER_SIZE);
104 if (tmp_in.isEmpty()) {
106 const int len = tmp_in.
size();
108 int len_eff = readBytes(bytes, len);
133 if (info != newInfo){
140 out_new.logInfo(
"out:");
141 notifyAudioChange(out_new);
146 virtual size_t readBytes(uint8_t *data,
size_t len)
override {
return not_supported(0,
"readBytes"); }
148 virtual size_t write(
const uint8_t *data,
size_t len)
override{
return not_supported(0,
"write"); }
151 virtual operator bool() {
return info && available() > 0; }
161 for (
int j=0;j<len/2;j++){
162 write((uint8_t*)&zero,2);
168 memset(buffer, 0, length);
175 virtual int not_supported(
int out,
const char *msg =
"") {
176 LOGE(
"AudioStream: %s unsupported operation!", msg);
199 void add(
Stream *stream) { input_streams.push_back(stream); }
200 void add(
Stream &stream) { input_streams.push_back(&stream); }
202 bool begin()
override {
207 void end()
override { is_active =
false; }
209 int available()
override {
210 if (!is_active)
return 0;
214 return availableWithTimout();
217 size_t readBytes(uint8_t *data,
size_t len)
override {
218 if (!is_active)
return 0;
222 return p_current_stream->readBytes(data, len);
226 operator bool() {
return is_active && available() > 0; }
228 void setOnBeginCallback(
void (*callback)(
Stream *stream)) {
229 begin_callback = callback;
231 void setOnEndCallback(
void (*callback)(Stream *stream)) {
232 end_callback = callback;
239 size_t write(
const uint8_t *data,
size_t size)
override {
return 0;};
243 Stream *p_current_stream =
nullptr;
244 bool is_active =
false;
245 void (*begin_callback)(
Stream *stream) =
nullptr;
246 void (*end_callback)(
Stream *stream) =
nullptr;
253 if (p_current_stream !=
nullptr && p_current_stream->available() > 0)
256 if ((p_current_stream ==
nullptr || availableWithTimout() == 0)) {
257 if (end_callback && p_current_stream) end_callback(p_current_stream);
258 if (!input_streams.empty()) {
259 LOGI(
"using next stream");
260 p_current_stream = input_streams[0];
261 input_streams.pop_front();
262 if (begin_callback && p_current_stream)
263 begin_callback(p_current_stream);
265 p_current_stream =
nullptr;
269 return p_current_stream !=
nullptr;
272 int availableWithTimout() {
273 int result = p_current_stream->available();
275 for (
int j = 0; j < _timeout / 10; j++) {
277 result = p_current_stream->available();
278 if (result != 0)
break;
294 size_t write(
const uint8_t *data,
size_t len)
override {
return len; }
296 size_t readBytes(uint8_t *data,
size_t len)
override {
297 memset(data, 0, len);
317 bool autoRemoveOldestDataIfFull =
false) {
319 p_buffer =
new NBuffer<T>(bufferSize, bufferCount);
320 remove_oldest_data = autoRemoveOldestDataIfFull;
333 void setBuffer(BaseBuffer<T> &buffer){
347 virtual bool begin(
size_t activeWhenPercentFilled) {
350 size_t size = p_buffer->size() *
sizeof(T);
352 active_limit = size * activeWhenPercentFilled / 100;
353 LOGI(
"activate after: %u bytes",(
unsigned)active_limit);
347 virtual bool begin(
size_t activeWhenPercentFilled) {
…}
358 virtual void end()
override {
358 virtual void end()
override {
…}
363 int available()
override {
364 return active ? p_buffer->available() *
sizeof(T) : 0;
367 int availableForWrite()
override {
368 if (!active && active_limit > 0)
return DEFAULT_BUFFER_SIZE;
369 return p_buffer->availableForWrite() *
sizeof(T);
372 virtual size_t write(
const uint8_t *data,
size_t len)
override {
373 if (len == 0)
return 0;
374 if (active_limit == 0 && !active)
return 0;
377 total_written += len;
378 if (!active && active_limit > 0 && total_written >= active_limit) {
380 LOGI(
"setting active");
384 if (remove_oldest_data) {
385 int available_bytes =
386 p_buffer->availableForWrite() *
sizeof(T);
387 if ((
int)len > available_bytes) {
388 int gap = len - available_bytes;
394 return p_buffer->writeArray(data, len /
sizeof(T));
397 virtual size_t readBytes(uint8_t *data,
size_t len)
override {
398 if (!active)
return 0;
399 return p_buffer->readArray(data, len /
sizeof(T));
402 int read()
override {
403 if (!active)
return -1;
405 if (!p_buffer->read(result)) {
411 int peek()
override {
412 if (!active)
return -1;
414 if (p_buffer->peek(result)) {
415 return *(
reinterpret_cast<uint8_t *
>(&result));
428 operator bool() {
return active; }
/// Activation threshold in bytes; begin(activeWhenPercentFilled) sets it to
/// that percentage of the buffer size in bytes. 0 means no threshold.
size_t active_limit = 0;
/// Running sum of the `len` values passed to write(); compared against
/// active_limit to decide when to activate.
size_t total_written = 0;
/// Copied from the constructor's autoRemoveOldestDataIfFull argument and
/// checked in write() when the incoming data exceeds the free space.
bool remove_oldest_data = false;
/// NOTE(review): presumably records whether p_buffer was allocated by this
/// object - its use is not visible here, confirm.
bool owns_buffer = false;
453 this->data.resize(len);
454 memcpy(&data[0], inData, len);
471 this->default_buffer_size = defaultBufferSize;
476 audio_list.swap(ref.audio_list);
478 total_available=ref.total_available;
479 default_buffer_size = ref.default_buffer_size;
480 alloc_failed = ref.alloc_failed;;
481 is_loop = ref.is_loop;
488 temp_audio.
resize(default_buffer_size);
492 virtual void end()
override {
505 ok = audio_list.pop_front(p_node);
514 alloc_failed =
false;
519 return total_available;
524 it = audio_list.begin();
528 virtual size_t write(
const uint8_t *data,
size_t len)
override {
531 alloc_failed =
false;
532 total_available += len;
533 audio_list.push_back(p_node);
536 if (it == audio_list.end()){
537 it = audio_list.begin();
546 virtual int availableForWrite()
override {
547 return alloc_failed ? 0 : default_buffer_size;
550 virtual int available()
override {
551 if (it == audio_list.end()){
553 if (it == audio_list.end()) {
560 virtual size_t readBytes(uint8_t *data,
size_t len)
override {
563 size_t result = temp_audio.
readArray(data, len);
569 if (it==audio_list.end()){
579 DataNode *p_node = *it;
580 int result_len = min(len, (
size_t) p_node->len);
581 memcpy(data, &p_node->data[0], result_len);
583 if (p_node->len>len){
584 uint8_t *start = &p_node->data[result_len];
585 int uprocessed_len = p_node->len - len;
590 read_pos += result_len;
594 List<DataNode*> &list() {
604 for (
int j=0;j<remove;j++){
606 audio_list.pop_front(node);
607 if (node!=
nullptr)
delete node;
609 audio_list.pop_back(node);
610 if (node!=
nullptr)
delete node;
616 auto first = *list().begin();
618 clean_start.convert(&(first->data[0]),first->len);
622 auto last = * (--(list().end()));
624 clean_end.convert(&(last->data[0]),last->len);
637 size_t total_available = 0;
639 int default_buffer_size=DEFAULT_BUFFER_SIZE;
640 bool alloc_failed =
false;
642 bool is_loop =
false;