2 #include "AudioConfig.h"
3 #include "AudioTimer/AudioTimer.h"
4 #include "AudioTools/AudioTypes.h"
5 #include "AudioTools/Buffers.h"
6 #include "AudioTools/AudioLogger.h"
7 #include "AudioTools/BaseConverter.h"
8 #include "AudioEffects/SoundGenerator.h"
14 #ifdef USE_STREAM_WRITE_OVERRIDE
15 # define STREAM_WRITE_OVERRIDE override
17 # define STREAM_WRITE_OVERRIDE
20 #ifdef USE_STREAM_READ_OVERRIDE
21 # define STREAM_READ_OVERRIDE override
23 # define STREAM_READ_OVERRIDE
26 #ifdef USE_STREAM_READCHAR_OVERRIDE
27 # define STREAM_READCHAR_OVERRIDE override
29 # define STREAM_READCHAR_OVERRIDE
47 virtual bool begin(){
return true;}
61 out_new.logInfo(
"out:");
62 notifyAudioChange(out_new);
67 virtual size_t readBytes(uint8_t *buffer,
size_t length) STREAM_READ_OVERRIDE {
return not_supported(0,
"readBytes"); }
69 virtual size_t write(
const uint8_t *buffer,
size_t size)
override{
return not_supported(0,
"write"); }
71 virtual size_t write(uint8_t ch)
override {
72 tmp_out.resize(MAX_SINGLE_CHARS);
76 return tmp_out.
write(ch);
79 virtual int available()
override {
return DEFAULT_BUFFER_SIZE; };
82 operator bool() {
return available() > 0; }
88 virtual int availableForWrite()
override {
return DEFAULT_BUFFER_SIZE; }
90 virtual void flush()
override {
99 for (
int j=0;j<len/2;j++){
100 write((uint8_t*)&zero,2);
106 memset(buffer, 0, length);
114 virtual size_t readBytes(
char *buffer,
size_t length) STREAM_READCHAR_OVERRIDE {
115 return readBytes((uint8_t *)buffer, length);
118 virtual int read()
override {
120 return tmp_in.
read();
123 virtual int peek()
override {
125 return tmp_in.
peek();
133 RingBuffer<uint8_t> tmp_in{0};
134 RingBuffer<uint8_t> tmp_out{0};
137 virtual int not_supported(
int out,
const char* msg=
"") {
138 LOGE(
"AudioStream: %s unsupported operation!", msg);
144 void refillReadBuffer() {
145 tmp_in.resize(MAX_SINGLE_CHARS);
146 if (tmp_in.isEmpty()){
148 const int len = tmp_in.
size();
150 int len_eff = readBytes(bytes, len);
168 p_stream->setTimeout(clientTimeout);
171 virtual bool begin(){
return true;}
174 virtual size_t readBytes(uint8_t *buffer,
size_t length) {
177 return p_stream->readBytes(buffer, length);
180 int read() {
return p_stream->read(); }
182 int peek() {
return p_stream->peek(); }
184 int available() {
return p_stream->available(); }
186 virtual size_t write(uint8_t c) {
return p_stream->write(c); }
188 virtual size_t write(
const uint8_t *buffer,
size_t size) {
189 return p_stream->write(buffer, size);
192 virtual int availableForWrite() {
return p_stream->availableForWrite(); }
194 virtual void flush() { p_stream->flush(); }
198 int32_t clientTimeout = URL_CLIENT_TIMEOUT;
227 LOGD(
"MemoryStream: %d", buffer_size);
228 this->buffer_size = buffer_size;
229 this->memory_type = memoryType;
236 LOGD(
"MemoryStream: %d", buffer_size);
237 setValue(buffer, buffer_size, memoryType);
238 is_active = isActive;
249 setValue(source.buffer, source.buffer_size, source.memory_type);
251 source.setValue(
nullptr, 0, source.memory_type);
256 if (memoryCanChange() && buffer!=
nullptr) free(buffer);
274 write_pos = memoryCanChange() ? 0 : buffer_size;
275 if (this->buffer==
nullptr && memoryCanChange()){
283 virtual size_t write(uint8_t
byte)
override {
284 if (!is_active)
return 0;
285 if (memory_type == FLASH_RAM)
return 0;
286 if (buffer==
nullptr)
return 0;
288 if (write_pos < buffer_size) {
290 buffer[write_pos] = byte;
296 virtual size_t write(
const uint8_t *buffer,
size_t size)
override {
297 if (!is_active)
return 0;
298 if (memory_type == FLASH_RAM)
return 0;
300 for (
size_t j = 0; j < size; j++) {
301 if (!write(buffer[j])) {
309 virtual int available()
override {
310 if (!is_active)
return 0;
311 if (buffer==
nullptr)
return 0;
312 int result = write_pos - read_pos;
313 if (result<=0 && is_loop){
315 read_pos = rewind_pos;
316 result = write_pos - read_pos;
318 if (rewind!=
nullptr) rewind();
320 return is_loop ? DEFAULT_BUFFER_SIZE : result;
323 virtual int availableForWrite()
override {
324 if (!is_active)
return 0;
325 if (memory_type == FLASH_RAM)
return 0;
326 return buffer_size - write_pos;
329 virtual int read()
override {
337 virtual size_t readBytes(uint8_t *buffer,
size_t length)
override {
338 if (!is_active)
return 0;
340 while (count < length) {
349 virtual int peek()
override {
350 if (!is_active)
return -1;
352 if (available() > 0) {
353 result = buffer[read_pos];
358 virtual void flush()
override {}
360 virtual void end()
override {
366 virtual void clear(
bool reset =
false) {
367 if (memoryCanChange()){
370 if (buffer==
nullptr){
375 memset(buffer, 0, buffer_size);
379 LOGW(
"data is read only");
387 if (buffer!=
nullptr && buffer_size > 12){
388 if (memcmp(
"WAVE", buffer+8, 4)==0){
395 virtual void setLoop(
bool loop,
int rewindPos){
397 rewind_pos = rewindPos;
402 if (!memoryCanChange())
return false;
406 #if defined(ESP32) && defined(ARDUINO)
408 buffer = (buffer==
nullptr) ? (uint8_t*)ps_calloc(size,1) : (uint8_t*)ps_realloc(buffer, size);
412 buffer = (buffer==
nullptr) ? (uint8_t*)calloc(size,1) : (uint8_t*)realloc(buffer, size);
415 return buffer !=
nullptr;
418 virtual uint8_t* data(){
429 this->buffer_size = buffer_size;
431 this->write_pos = buffer_size;
432 this->buffer = (uint8_t *)buffer;
433 this->memory_type = memoryType;
441 uint8_t *buffer =
nullptr;
443 bool is_loop =
false;
444 void (*rewind)() =
nullptr;
445 bool is_active =
false;
447 bool memoryCanChange() {
448 return memory_type!=FLASH_RAM;
452 if (
this == &source)
return;
453 if (source.memory_type == FLASH_RAM){
454 setValue(source.buffer, source.buffer_size, source.memory_type);
456 setValue(
nullptr, source.buffer_size, source.memory_type);
458 memcpy(buffer, source.buffer, buffer_size);
474 uint8_t* data=
nullptr;
480 this->data = (uint8_t*) malloc(len);
481 assert(this->data!=
nullptr);
482 memcpy(this->data, inData, len);
493 DynamicMemoryStream() =
default;
495 DynamicMemoryStream(
bool isLoop,
int defaultBufferSize=DEFAULT_BUFFER_SIZE ) {
496 this->default_buffer_size = defaultBufferSize;
500 void assign(DynamicMemoryStream &ref){
501 audio_list.swap(ref.audio_list);
503 total_available=ref.total_available;
504 default_buffer_size = ref.default_buffer_size;
505 alloc_failed = ref.alloc_failed;;
506 is_loop = ref.is_loop;
513 temp_audio.resize(default_buffer_size);
517 virtual void end()
override {
530 ok = audio_list.pop_front(p_node);
538 alloc_failed =
false;
543 return total_available;
548 it = audio_list.begin();
551 virtual size_t write(
const uint8_t *buffer,
size_t size)
override {
552 DataNode *p_node =
new DataNode((
void*)buffer, size);
553 if (p_node->data!=
nullptr){
554 alloc_failed =
false;
555 total_available += size;
556 audio_list.push_back(p_node);
559 if (it == audio_list.end()){
560 it = audio_list.begin();
569 virtual int availableForWrite()
override {
570 return alloc_failed ? 0 : default_buffer_size;
573 virtual int available()
override {
574 if (it == audio_list.end()){
576 if (it == audio_list.end()) {
583 virtual size_t readBytes(uint8_t *buffer,
size_t length)
override {
586 return temp_audio.
readArray(buffer, length);
590 if (it==audio_list.end()){
600 DataNode *p_node = *it;
601 int result_len = min(length, (
size_t) p_node->len);
602 memcpy(buffer, p_node->data, result_len);
604 if (p_node->len>length){
605 uint8_t *start = p_node->data+result_len;
606 int uprocessed_len = p_node->len - length;
614 List<DataNode*> &list() {
624 for (
int j=0;j<remove;j++){
626 audio_list.pop_front(node);
627 if (node!=
nullptr)
delete node;
629 audio_list.pop_back(node);
630 if (node!=
nullptr)
delete node;
636 auto first = *list().begin();
638 clean_start.convert(first->data,first->len);
642 auto last = * (--(list().end()));
644 clean_end.convert(last->data,last->len);
652 size_t total_available=0;
653 int default_buffer_size=DEFAULT_BUFFER_SIZE;
654 bool alloc_failed =
false;
656 bool is_loop =
false;
682 this->generator_ptr = &generator;
685 AudioInfo defaultConfig() {
return this->generator_ptr->defaultConfig(); }
690 if (generator_ptr==
nullptr){
691 LOGE(
"%s",source_not_defined_error);
694 generator_ptr->begin();
695 notifyAudioChange(generator_ptr->audioInfo());
703 if (generator_ptr==
nullptr){
704 LOGE(
"%s",source_not_defined_error);
707 generator_ptr->begin(cfg);
708 notifyAudioChange(generator_ptr->audioInfo());
716 generator_ptr->end();
721 return generator_ptr->audioInfo();
725 virtual int available()
override {
return active ? DEFAULT_BUFFER_SIZE*2 : 0; }
728 size_t readBytes(uint8_t *buffer,
size_t length)
override {
729 if (!active)
return 0;
730 LOGD(
"GeneratedSoundStream::readBytes: %u", (
unsigned int)length);
731 return generator_ptr->readBytes(buffer, length);
734 bool isActive() {
return active && generator_ptr->isActive();}
736 operator bool() {
return isActive(); }
738 void flush()
override {}
742 SoundGenerator<T> *generator_ptr;
743 const char* source_not_defined_error =
"Source not defined";
759 buffer.resize(buffer_size);
765 buffer.resize(buffer_size);
771 buffer.resize(buffer_size);
777 void setStream(
Print &out){
787 if (buffer.isFull()) {
790 return buffer.write(c);
794 size_t write(
const uint8_t *data,
size_t len)
override {
795 LOGD(
"%s: %zu", LOG_METHOD, len);
797 for (
int j=0;j<len;j++){
798 result +=
write(data[j]);
806 if (buffer.available() > 0) {
807 writeExt(buffer.address(), buffer.available());
814 if (buffer.isEmpty()) {
817 return buffer.read();
822 if (buffer.isEmpty()) {
825 return buffer.peek();
829 size_t readBytes(uint8_t *data,
size_t length)
override {
830 if (buffer.isEmpty()) {
831 return readExt(data, length);
834 return buffer.readArray(data, length);
840 if (buffer.isEmpty()) {
843 return buffer.available();
851 Print* p_out =
nullptr;
856 size_t result = readExt(buffer.
address(), buffer.size());
860 virtual size_t writeExt(
const uint8_t *data,
size_t len) {
861 return p_out ==
nullptr ? 0 : p_out->write(data, len);
863 virtual size_t readExt(uint8_t *data,
size_t len) {
864 return p_in ==
nullptr ? 0 : p_in->readBytes(data, len);
888 size_t write(
const uint8_t *buffer,
size_t len)
override{
892 size_t readBytes(uint8_t *buffer,
size_t len)
override{
893 memset(buffer,0, len);
915 virtual int available()
override {
920 virtual int availableForWrite()
override {
924 virtual void flush()
override {}
925 virtual int peek()
override {
return buffer.
peek(); }
926 virtual int read()
override {
return buffer.
read(); }
928 virtual size_t readBytes(uint8_t *data,
size_t length)
override {
932 virtual size_t write(
const uint8_t *data,
size_t len)
override {
937 virtual size_t write(uint8_t c)
override {
return buffer.
write(c); }
939 void resize(
int size){
944 return buffer.size();
964 QueueStream(
int bufferSize,
int bufferCount,
bool autoRemoveOldestDataIfFull=
false)
967 callback_buffer_ptr =
new NBuffer<T>(bufferSize, bufferCount);
968 remove_oldest_data = autoRemoveOldestDataIfFull;
973 callback_buffer_ptr = &buffer;
978 delete callback_buffer_ptr;
990 virtual bool begin(
size_t activeWhenPercentFilled){
992 size_t size = callback_buffer_ptr->size() *
sizeof(T);
994 active_limit = size * activeWhenPercentFilled / 100;
999 virtual void end()
override {
1004 int available()
override {
1005 return active ? callback_buffer_ptr->available()*
sizeof(T) : 0;
1008 int availableForWrite()
override {
1009 return callback_buffer_ptr->availableForWrite()*
sizeof(T);
1012 virtual size_t write(
const uint8_t *data,
size_t len)
override {
1013 if (active_limit==0 && !active)
return 0;
1016 if (active_limit > 0 && !active && available() >= active_limit){
1017 this->active =
true;
1021 if (remove_oldest_data){
1022 int available_bytes = callback_buffer_ptr->availableForWrite()*
sizeof(T);
1023 if ((
int)len>available_bytes){
1024 int gap = len-available_bytes;
1026 readBytes(tmp, gap);
1030 return callback_buffer_ptr->writeArray(data, len /
sizeof(T));
1033 virtual size_t readBytes(uint8_t *data,
size_t len)
override {
1034 if (!active)
return 0;
1035 return callback_buffer_ptr->readArray(data, len /
sizeof(T));
1041 callback_buffer_ptr->reset();
1052 size_t active_limit = 0;
1054 bool remove_oldest_data;
1060 template <
typename T>
1061 using CallbackBufferedStream = QueueStream<T>;
1071 template<
typename T>
1078 setConverter(converter);
1082 setConverter(converter);
1087 setConverter(converter);
1106 virtual int availableForWrite() {
return p_out->availableForWrite(); }
1108 virtual size_t write(
const uint8_t *buffer,
size_t size) {
1109 size_t result = p_converter->convert((uint8_t *)buffer, size);
1111 size_t result_written = p_out->write(buffer, result);
1112 return size * result_written / result;
1117 size_t readBytes(uint8_t *data,
size_t length)
override {
1118 if (p_stream==
nullptr)
return 0;
1119 size_t result = p_stream->readBytes(data, length);
1120 return p_converter->convert(data, result);
1125 if (p_stream==
nullptr)
return 0;
1126 return p_stream->available();
1130 Stream *p_stream =
nullptr;
1131 Print *p_out =
nullptr;
1145 this->count = count;
1146 this->max_count = count;
1154 this->count = count;
1155 this->max_count = count;
1162 this->count = count;
1163 this->max_count = count;
1183 return measure(p_stream->readBytes(data, len));
1186 int available()
override {
1187 return p_stream->available();
1191 virtual size_t write(
const uint8_t *buffer,
size_t size)
override {
1192 return measure(p_print->write(buffer, size));
1197 return p_print->availableForWrite();
1202 return bytes_per_second;
1211 AudioStream::info = info;
1216 return AudioStream::begin();
1219 bool begin(AudioInfo info){
1224 void setFrameSize(
int size){
1231 Stream *p_stream=
nullptr;
1232 Print *p_print=
nullptr;
1233 uint32_t start_time;
1234 int total_bytes = 0;
1235 int bytes_per_second = 0;
1238 Print *p_logout=
nullptr;
1240 size_t measure(
size_t len) {
1245 uint32_t end_time =
millis();
1246 int time_diff = end_time - start_time;
1248 bytes_per_second = total_bytes / time_diff * 1000;
1252 start_time = end_time;
1258 void printResult() {
1261 snprintf(msg, 70,
"==> Bytes per second: %d", bytes_per_second);
1263 snprintf(msg, 70,
"==> Samples per second: %d", bytes_per_second/frame_size);
1265 if (p_logout!=
nullptr){
1266 p_logout->println(msg);
1281 size_t total_size = 0;
1304 p_info_from = &stream;
1308 return progress_info;
1325 void setPrint(Print &print){
1329 bool begin()
override {
1330 if (p_info_from!=
nullptr){
1333 return AudioStream::begin();
1343 progress_info = info;
1350 total_processed = 0;
1351 progress_info.total_size = len;
1356 return progress_info.total_size;
1361 return total_processed;
1366 return total_processed / byteRate();
1371 return progress_info.total_size;
1381 if (progress_info.total_size==0)
return 0;
1382 return 100.0 * total_processed / progress_info.total_size;
1387 if (p_stream==
nullptr)
return 0;
1388 return measure(p_stream->readBytes(data, len));
1391 int available()
override {
1392 if (p_stream==
nullptr)
return 0;
1393 return p_stream->available();
1397 virtual size_t write(
const uint8_t *buffer,
size_t size)
override {
1398 if (p_print==
nullptr)
return 0;
1399 return measure(p_print->write(buffer,
size));
1404 if (p_print==
nullptr)
return 0;
1405 return p_print->availableForWrite();
1410 Stream *p_stream=
nullptr;
1411 Print *p_print=
nullptr;
1413 size_t total_processed = 0;
1415 size_t measure(
size_t len) {
1416 total_processed += len;
1422 int byte_rate = info.sample_rate * info.bits_per_sample * info.channels / 8;
1424 LOGE(
"Audio Info not defined");
1443 int correction_us = 0;
1474 bool begin(ThrottleConfig cfg) {
1475 LOGI(
"begin sample_rate: %d, channels: %d, bits: %d", (
int) info.sample_rate,(
int) info.channels, (
int)info.bits_per_sample);
1481 bool begin(AudioInfo info) {
1482 LOGI(
"begin sample_rate: %d, channels: %d, bits: %d", (
int)info.sample_rate, (
int) info.channels, (
int)info.bits_per_sample);
1484 this->cfg.copyFrom(info);
1489 frame_size = cfg.bits_per_sample / 8 * cfg.channels;
1500 int availableForWrite() {
1502 return p_out->availableForWrite();
1504 return DEFAULT_BUFFER_SIZE;
1507 size_t write(
const uint8_t* data,
size_t len){
1508 size_t result = p_out->write(data, len);
1514 if (p_in==
nullptr)
return 0;
1515 return p_in->available();
1518 size_t readBytes(uint8_t* data,
size_t len){
1519 if (p_in==
nullptr) {
1523 size_t result = p_in->readBytes(data, len);
1529 void delayBytes(
size_t bytes) { delayFrames(bytes / frame_size); }
1532 void delayFrames(
size_t frames) {
1533 sum_frames += frames;
1534 uint64_t durationUsEff =
micros() - start_time;
1535 uint64_t durationUsToBe = getDelayUs(sum_frames);
1536 int64_t waitUs = durationUsToBe - durationUsEff + cfg.correction_us;
1537 LOGD(
"wait us: %ld",
static_cast<long>(waitUs));
1539 int64_t waitMs = waitUs / 1000;
1540 if (waitMs > 0)
delay(waitMs);
1543 LOGD(
"negative delay!")
1547 inline int64_t getDelayUs(uint64_t frames){
1548 return (frames * 1000000) / cfg.sample_rate;
1551 inline int64_t getDelayMs(uint64_t frames){
1552 return getDelayUs(frames) / 1000;
1555 inline int64_t getDelaySec(uint64_t frames){
1556 return getDelayUs(frames) / 1000000l;
1560 uint32_t start_time = 0;
1561 uint32_t sum_frames = 0;
1564 Print *p_out =
nullptr;
1565 Stream *p_in =
nullptr;
1577 template<
typename T>
1584 streams.push_back(&in);
1585 weights.push_back(weight);
1586 total_weights += weight;
1591 if (channel<
size()){
1592 streams[channel] = ∈
1594 LOGE(
"Invalid channel %d - max is %d", channel,
size()-1);
1601 LOGI(
"frame_size: %d",frame_size);
1602 return frame_size>0;
1607 if (channel<
size()){
1608 weights[channel] = weight;
1610 for (
int j=0;j<weights.size();j++){
1611 total += weights[j];
1613 total_weights = total;
1615 LOGE(
"Invalid channel %d - max is %d", channel,
size()-1);
1623 result_vect.clear();
1624 current_vect.clear();
1625 total_weights = 0.0;
1630 return streams.size();
1635 if (total_weights==0 || frame_size==0 || len==0) {
1636 LOGW(
"readBytes: %d",(
int)len);
1640 if (limit_available_data){
1648 result_len = len * frame_size / frame_size;
1658 limit_available_data = flag;
1663 retry_count = retry;
1668 Vector<int> weights{0};
1669 int total_weights = 0;
1671 bool limit_available_data =
false;
1672 int retry_count = 5;
1673 Vector<int> result_vect;
1674 Vector<T> current_vect;
1678 int samples = byteCount /
sizeof(T);
1679 result_vect.resize(samples);
1680 current_vect.resize(samples);
1681 int stream_count =
size();
1683 int samples_eff_max = 0;
1684 for (
int j=0;j<stream_count;j++){
1686 int samples_eff =
readSamples(streams[j],current_vect.data(), samples, retry_count);
1687 if (samples_eff > samples_eff_max)
1688 samples_eff_max = samples_eff;
1690 float factor = total_weights == 0.0f ? 0.0f :
static_cast<float>(weights[j]) / total_weights;
1695 for (
int j=0;j<samples;j++){
1696 p_data[j] = result_vect[j];
1698 return samples_eff_max *
sizeof(T);
1703 int result = DEFAULT_BUFFER_SIZE;
1704 for (
int j=0;j<
size();j++){
1705 result = min(result, streams[j]->available());
1710 void resultAdd(
float fact){
1711 for (
int j=0;j<current_vect.size();j++){
1712 current_vect[j]*=fact;
1713 result_vect[j] += current_vect[j];
1718 memset(result_vect.data(), 0,
sizeof(
int)*result_vect.size());
1732 template<
typename T>
1749 LOGW(
"channels corrected to %d",
size());
1755 virtual bool begin() {
1758 return AudioStream::begin();
1763 LOGD(
"readBytes: %d",(
int)len);
1764 T *p_data = (T*) data;
1766 int sample_count = result_len /
sizeof(T);
1767 int size_value =
size();
1769 for (
int j=0;j<sample_count; j++){
1770 for (
int i=0; i<size_value; i++){
1771 p_data[result_idx++] = weights[i] * readSample<T>(streams[i]);
1774 return result_idx*
sizeof(T);
1779 streams.push_back(&in);
1780 weights.push_back(weight);
1785 if (channel<
size()){
1786 weights[channel] = weight;
1788 LOGE(
"Invalid channel %d - max is %d", channel,
size()-1);
1800 return streams.size();
1805 int result = streams[0]->available();
1806 for (
int j=1;j<
size();j++){
1807 int tmp = streams[j]->available();
1817 Vector<float> weights{10};
1836 setUpdateCallback(cb_update);
1842 setUpdateCallback(cb_update);
1845 CallbackStream(
size_t (*cb_read)(uint8_t* data,
size_t len),
size_t (*cb_write)(
const uint8_t* data,
size_t len)) {
1846 setWriteCallback(cb_write);
1847 setReadCallback(cb_read);
1850 void setWriteCallback(
size_t (*cb_write)(
const uint8_t* data,
size_t len)){
1851 this->cb_write = cb_write;
1854 void setReadCallback(
size_t (*cb_read)(uint8_t* data,
size_t len)){
1855 this->cb_read = cb_read;
1858 void setUpdateCallback(
size_t (*cb_update)(uint8_t* data,
size_t len)){
1859 this->cb_update = cb_update;
1863 void setAvailableCallback(
int (*cb)()){
1864 this->cb_available = cb;
1867 virtual bool begin(AudioInfo info) {
1872 virtual bool begin()
override {
1876 void end()
override { active =
false;}
1878 int available()
override {
1879 int result = AudioStream::available();
1881 if (available_bytes>=0)
1882 return available_bytes;
1884 if (cb_available==
nullptr)
1887 int tmp_available = cb_available();
1888 if (tmp_available < 0)
1891 return tmp_available;
1894 size_t readBytes(uint8_t* data,
size_t len)
override {
1895 if (!active)
return 0;
1898 return cb_read(data, len);
1903 result = p_stream->readBytes(data , len);
1906 result = cb_update(data, result);
1911 size_t write(
const uint8_t* data,
size_t len)
override {
1912 if (!active)
return 0;
1915 return cb_write(data, len);
1919 size_t result = len;
1921 result = cb_update((uint8_t*)data, len);
1923 return p_out->write(data, result);
1953 available_bytes = val;
1960 size_t (*cb_write)(
const uint8_t* data,
size_t len) =
nullptr;
1961 size_t (*cb_read)(uint8_t* data,
size_t len) =
nullptr;
1962 size_t (*cb_update)(uint8_t* data,
size_t len) =
nullptr;
1963 int (*cb_available)() =
nullptr;
1964 Stream *p_stream =
nullptr;
1965 Print *p_out =
nullptr;
1966 int available_bytes = -1;
1983 void add(
Stream *stream){
1984 input_streams.push_back(stream);
1986 void add(
Stream &stream){
1987 input_streams.push_back(&stream);
1990 bool begin()
override {
1992 return AudioStream::begin();
1995 void end()
override {
1997 return AudioStream::end();
2000 int available()
override {
2001 if (!is_active)
return 0;
2005 return availableWithTimout();
2008 size_t readBytes(uint8_t* data,
size_t len)
override {
2009 if (!is_active)
return 0;
2013 return p_current_stream->readBytes(data, len);
2018 return is_active && available()>0;
2021 void setOnBeginCallback(
void (*callback)(
Stream* stream) ){
2022 begin_callback = callback;
2024 void setOnEndCallback(
void (*callback)(Stream* stream) ){
2025 end_callback = callback;
2029 Vector<Stream*> input_streams;
2030 Stream *p_current_stream =
nullptr;
2031 bool is_active =
false;
2032 void (*begin_callback)(Stream* stream) =
nullptr;
2033 void (*end_callback)(Stream* stream) =
nullptr;
2038 if (p_current_stream!=
nullptr && p_current_stream->available()>0)
return true;
2040 if ((p_current_stream==
nullptr || availableWithTimout()==0)){
2041 if (end_callback && p_current_stream) end_callback(p_current_stream);
2042 if (!input_streams.empty()) {
2043 LOGI(
"using next stream");
2044 p_current_stream = input_streams[0];
2045 input_streams.pop_front();
2046 if (begin_callback && p_current_stream) begin_callback(p_current_stream);
2048 p_current_stream =
nullptr;
2052 return p_current_stream!=
nullptr;
2055 int availableWithTimout(){
2056 int result = p_current_stream->available();
2058 for (
int j=0; j <_timeout/10;j++){
2060 result = p_current_stream->available();
2061 if (result!=0)
break;
2076 template<
typename T,
class TF>
2084 this->channels = channels;
2092 this->channels = channels;
2109 if (p_converter !=
nullptr && info.
channels!=channels){
2110 LOGE(
"Inconsistent number of channels");
2116 bool begin()
override {
2118 LOGE(
"channels must not be 0");
2121 if (p_converter==
nullptr){
2122 p_converter =
new ConverterNChannels<T,TF>(channels);
2124 return AudioStream::begin();
2127 virtual size_t write(
const uint8_t *buffer,
size_t size)
override {
2128 if (p_converter==
nullptr)
return 0;
2129 size_t result = p_converter->convert((uint8_t *)buffer, size);
2130 return p_print->write(buffer, result);
2133 size_t readBytes(uint8_t *data,
size_t length)
override {
2134 if (p_converter==
nullptr)
return 0;
2135 if (p_stream==
nullptr)
return 0;
2136 size_t result = p_stream->readBytes(data, length);
2137 result = p_converter->convert(data, result);
2141 virtual int available()
override {
2142 if (p_stream==
nullptr)
return 0;
2143 return p_stream->available();
2146 virtual int availableForWrite()
override {
2147 return p_print->availableForWrite();
2153 if (p_converter!=
nullptr){
2154 p_converter->
setFilter(channel, filter);
2156 LOGE(
"p_converter is null");
2168 Stream *p_stream =
nullptr;
2169 Print *p_print =
nullptr;
2182 uint16_t buffer_size = DEFAULT_BUFFER_SIZE;
2183 bool use_timer =
true;
2185 TimerFunction timer_function = DirectTimerCallback;
2186 bool adapt_sample_rate =
false;
2187 uint16_t (*callback)(uint8_t *data, uint16_t len) =
nullptr;
2191 static void timerCallback(
void *obj);
2202 friend void timerCallback(
void *obj);
2209 if (timer !=
nullptr)
delete timer;
2210 if (buffer !=
nullptr)
delete buffer;
2211 if (frame !=
nullptr)
delete[] frame;
2225 bool do_restart = active;
2226 if (do_restart)
end();
2230 if (do_restart)
begin(cfg);
2239 LOGD(
"%s: %s", LOG_METHOD,
2240 config.rx_tx_mode == RX_MODE ?
"RX_MODE" :
"TX_MODE");
2242 this->frameCallback = config.callback;
2243 if (cfg.use_timer) {
2244 frameSize = cfg.bits_per_sample * cfg.channels / 8;
2245 frame =
new uint8_t[frameSize];
2248 timer->setTimerFunction(cfg.timer_function);
2249 if (cfg.timer_id>=0){
2250 timer->setTimer(cfg.timer_id);
2253 LOGI(
"sample_rate: %u -> time: %u milliseconds", (
unsigned int)cfg.sample_rate, (
unsigned int)time);
2254 timer->setCallbackParameter(
this);
2255 timer->begin(timerCallback, time, TimeUnit::US);
2258 notifyAudioChange(cfg);
2265 if (this->frameCallback !=
nullptr) {
2266 if (cfg.use_timer) {
2267 timer->begin(timerCallback, time, TimeUnit::US);
2277 if (cfg.use_timer) {
2288 bool active =
false;
2289 uint16_t (*frameCallback)(uint8_t *data, uint16_t len);
2293 uint8_t *frame =
nullptr;
2294 uint16_t frameSize = 0;
2296 unsigned long lastTimestamp = 0u;
2297 uint32_t currentRateValue = 0;
2298 uint32_t printCount = 0;
2301 virtual size_t writeExt(
const uint8_t *data,
size_t len)
override {
2302 if (!active)
return 0;
2305 if (!cfg.use_timer) {
2306 result = frameCallback((uint8_t *)data, len);
2308 result = buffer->
writeArray((uint8_t *)data, len);
2315 virtual size_t readExt(uint8_t *data,
size_t len)
override {
2316 if (!active)
return 0;
2320 if (!cfg.use_timer) {
2321 result = frameCallback(data, len);
2331 unsigned long ms =
millis();
2332 if (lastTimestamp > 0u) {
2333 uint32_t diff = ms - lastTimestamp;
2335 uint16_t rate = 1 * 1000 / diff;
2337 if (currentRateValue == 0) {
2338 currentRateValue = rate;
2340 currentRateValue = (currentRateValue + rate) / 2;
2349 LOGI(
"effective sample rate: %u", (
unsigned int)currentRateValue);
2350 if (cfg.adapt_sample_rate &&
2351 abs((
int)currentRateValue - (
int)cfg.
sample_rate) > 200) {
2353 notifyAudioChange(cfg);
2359 void IRAM_ATTR timerCallback(
void *obj) {
2360 TimerCallbackAudioStream *src = (TimerCallbackAudioStream *)obj;
2361 if (src !=
nullptr) {
2364 if (src->cfg.rx_tx_mode == RX_MODE) {
2366 uint16_t available_bytes = src->frameCallback(src->frame, src->frameSize);
2367 uint16_t buffer_available = src->buffer->availableForWrite();
2368 if (buffer_available < available_bytes) {
2370 uint16_t to_clear = available_bytes - buffer_available;
2371 uint8_t tmp[to_clear];
2372 src->buffer->readArray(tmp, to_clear);
2374 if (src->buffer->writeArray(src->frame, available_bytes) !=
2380 if (src->buffer !=
nullptr && src->frame !=
nullptr &&
2381 src->frameSize > 0) {
2382 uint16_t available_bytes =
2383 src->buffer->readArray(src->frame, src->frameSize);
2384 if (available_bytes !=
2385 src->frameCallback(src->frame, available_bytes)) {
2386 LOGE(
"data underflow");
2390 src->measureSampleRate();
MemoryType
Memory types.
Definition: AudioTypes.h:33
RxTxMode
The Microcontroller is the Audio Source (TX_MODE) or Audio Sink (RX_MODE). RXTX_MODE is Source and Si...
Definition: AudioTypes.h:26