2 #include "AudioTools/CoreAudio/Buffers.h"
8 #ifdef USE_STREAM_WRITE_OVERRIDE
9 #define STREAM_WRITE_OVERRIDE override
11 #define STREAM_WRITE_OVERRIDE
14 #ifdef USE_STREAM_READ_OVERRIDE
15 #define STREAM_READ_OVERRIDE override
17 #define STREAM_READ_OVERRIDE
20 #ifdef USE_STREAM_READCHAR_OVERRIDE
21 #define STREAM_READCHAR_OVERRIDE override
23 #define STREAM_READCHAR_OVERRIDE
41 virtual bool begin(){
return true;}
44 virtual size_t readBytes(uint8_t *data,
45 size_t len) STREAM_READ_OVERRIDE = 0;
46 virtual size_t write(
const uint8_t *data,
size_t len)
override = 0;
48 virtual size_t write(uint8_t ch)
override {
49 tmp_out.resize(MAX_SINGLE_CHARS);
53 return tmp_out.
write(ch);
56 virtual int available()
override {
return DEFAULT_BUFFER_SIZE; };
58 virtual int availableForWrite()
override {
return DEFAULT_BUFFER_SIZE; }
60 virtual void flush()
override {
69 virtual size_t readBytes(
char *data,
size_t len) STREAM_READCHAR_OVERRIDE {
70 return readBytes((uint8_t *)data, len);
73 virtual int read()
override {
78 virtual int peek()
override {
89 void refillReadBuffer() {
90 tmp_in.resize(MAX_SINGLE_CHARS);
91 if (tmp_in.isEmpty()) {
93 const int len = tmp_in.
size();
95 int len_eff = readBytes(bytes, len);
120 if (info != newInfo){
127 out_new.logInfo(
"out:");
128 notifyAudioChange(out_new);
133 virtual size_t readBytes(uint8_t *data,
size_t len)
override {
return not_supported(0,
"readBytes"); }
135 virtual size_t write(
const uint8_t *data,
size_t len)
override{
return not_supported(0,
"write"); }
138 virtual operator bool() {
return info && available() > 0; }
148 for (
int j=0;j<len/2;j++){
149 write((uint8_t*)&zero,2);
155 memset(buffer, 0, length);
162 virtual int not_supported(
int out,
const char *msg =
"") {
163 LOGE(
"AudioStream: %s unsupported operation!", msg);
186 void add(
Stream *stream) { input_streams.push_back(stream); }
187 void add(
Stream &stream) { input_streams.push_back(&stream); }
194 void end() { is_active =
false; }
196 int available()
override {
197 if (!is_active)
return 0;
201 return availableWithTimout();
204 size_t readBytes(uint8_t *data,
size_t len)
override {
205 if (!is_active)
return 0;
209 return p_current_stream->readBytes(data, len);
213 operator bool() {
return is_active && available() > 0; }
215 void setOnBeginCallback(
void (*callback)(
Stream *stream)) {
216 begin_callback = callback;
218 void setOnEndCallback(
void (*callback)(Stream *stream)) {
219 end_callback = callback;
221 void setTimeout(uint32_t t) { _timeout = t; }
224 size_t write(
const uint8_t *data,
size_t size)
override {
return 0;};
228 Stream *p_current_stream =
nullptr;
229 bool is_active =
false;
230 void (*begin_callback)(
Stream *stream) =
nullptr;
231 void (*end_callback)(
Stream *stream) =
nullptr;
232 uint_fast32_t _timeout = 0;
238 if (p_current_stream !=
nullptr && p_current_stream->available() > 0)
241 if ((p_current_stream ==
nullptr || availableWithTimout() == 0)) {
242 if (end_callback && p_current_stream) end_callback(p_current_stream);
243 if (!input_streams.empty()) {
244 LOGI(
"using next stream");
245 p_current_stream = input_streams[0];
246 input_streams.pop_front();
247 if (begin_callback && p_current_stream)
248 begin_callback(p_current_stream);
250 p_current_stream =
nullptr;
254 return p_current_stream !=
nullptr;
257 int availableWithTimout() {
258 int result = p_current_stream->available();
260 for (
int j = 0; j < _timeout / 10; j++) {
262 result = p_current_stream->available();
263 if (result != 0)
break;
279 size_t write(
const uint8_t *data,
size_t len)
override {
return len; }
281 size_t readBytes(uint8_t *data,
size_t len)
override {
282 memset(data, 0, len);
300 bool autoRemoveOldestDataIfFull =
false) {
302 callback_buffer_ptr =
new NBuffer<T>(bufferSize, bufferCount);
303 remove_oldest_data = autoRemoveOldestDataIfFull;
308 callback_buffer_ptr = &buffer;
313 delete callback_buffer_ptr;
325 virtual bool begin(
size_t activeWhenPercentFilled) {
327 size_t size = callback_buffer_ptr->size() *
sizeof(T);
329 active_limit = size * activeWhenPercentFilled / 100;
339 int available()
override {
340 return active ? callback_buffer_ptr->available() *
sizeof(T) : 0;
343 int availableForWrite()
override {
344 return callback_buffer_ptr->availableForWrite() *
sizeof(T);
347 virtual size_t write(
const uint8_t *data,
size_t len)
override {
348 if (active_limit == 0 && !active)
return 0;
351 if (active_limit > 0 && !active && available() >= active_limit) {
356 if (remove_oldest_data) {
357 int available_bytes =
358 callback_buffer_ptr->availableForWrite() *
sizeof(T);
359 if ((
int)len > available_bytes) {
360 int gap = len - available_bytes;
366 return callback_buffer_ptr->writeArray(data, len /
sizeof(T));
369 virtual size_t readBytes(uint8_t *data,
size_t len)
override {
370 if (!active)
return 0;
371 return callback_buffer_ptr->readArray(data, len /
sizeof(T));
377 callback_buffer_ptr->reset();
382 operator bool() {
return active; }
/// Fill level in bytes used by write() for automatic activation;
/// 0 means no automatic activation. Computed in begin(activeWhenPercentFilled).
size_t active_limit = 0;
/// When true, write() checks the free space and handles the overflow
/// before storing new data. Defaults to false so that construction paths
/// which do not assign it never leave write() reading an indeterminate value.
bool remove_oldest_data = false;
394 template <
typename T>
395 using CallbackBufferedStream = QueueStream<T>;
408 this->data.resize(len);
409 memcpy(&data[0], inData, len);
426 this->default_buffer_size = defaultBufferSize;
431 audio_list.swap(ref.audio_list);
433 total_available=ref.total_available;
434 default_buffer_size = ref.default_buffer_size;
435 alloc_failed = ref.alloc_failed;;
436 is_loop = ref.is_loop;
443 temp_audio.resize(default_buffer_size);
460 ok = audio_list.pop_front(p_node);
468 alloc_failed =
false;
473 return total_available;
478 it = audio_list.begin();
481 virtual size_t write(
const uint8_t *data,
size_t len)
override {
484 alloc_failed =
false;
485 total_available += len;
486 audio_list.push_back(p_node);
489 if (it == audio_list.end()){
490 it = audio_list.begin();
499 virtual int availableForWrite()
override {
500 return alloc_failed ? 0 : default_buffer_size;
503 virtual int available()
override {
504 if (it == audio_list.end()){
506 if (it == audio_list.end()) {
513 virtual size_t readBytes(uint8_t *data,
size_t len)
override {
520 if (it==audio_list.end()){
530 DataNode *p_node = *it;
531 int result_len = min(len, (
size_t) p_node->len);
532 memcpy(data, &p_node->data[0], result_len);
534 if (p_node->len>len){
535 uint8_t *start = &p_node->data[result_len];
536 int uprocessed_len = p_node->len - len;
544 List<DataNode*> &list() {
554 for (
int j=0;j<remove;j++){
556 audio_list.pop_front(node);
557 if (node!=
nullptr)
delete node;
559 audio_list.pop_back(node);
560 if (node!=
nullptr)
delete node;
566 auto first = *list().begin();
568 clean_start.convert(&(first->data[0]),first->len);
572 auto last = * (--(list().end()));
574 clean_end.convert(&(last->data[0]),last->len);
582 size_t total_available=0;
583 int default_buffer_size=DEFAULT_BUFFER_SIZE;
584 bool alloc_failed =
false;
586 bool is_loop =
false;