3#include "AudioTools/CoreAudio/AudioBasic/Collections.h"
4#include "AudioTools/CoreAudio/AudioBasic/Str.h"
5#include "AudioTools/CoreAudio/AudioLogger.h"
30 virtual bool read(T &result) = 0;
34 if (data ==
nullptr) {
39 for (
int j = 0; j < lenResult; j++) {
42 LOGD(
"readArray %d -> %d", len, lenResult);
60 for (
int j = 0; j < len; j++) {
61 if (!
write(data[j])) {
67 LOGD(
"writeArray %d -> %d", len, result);
81 virtual bool peek(T &result) = 0;
86 bool isEmpty() {
return available() == 0; }
106 virtual size_t size() = 0;
111 if (size() == 0)
return 0.0f;
112 return 100.0f *
static_cast<float>(
available()) /
113 static_cast<float>(size());
126 LOGD(
"%s: %d", LOG_METHOD, len);
128 int result = min(len, p_buffer->available());
129 for (
int j = 0; j < result; j++) {
131 p_buffer->read(sample);
139 template <
int rows,
int channels>
141 int lenResult = min(rows, p_buffer->available());
142 for (
int j = 0; j < lenResult; j++) {
144 p_buffer->read(sample);
145 for (
int i = 0; i < channels; i++) {
154 BaseBuffer<T> *p_buffer =
nullptr;
185 this->owns_buffer =
false;
186 this->buffer = (uint8_t *)
data;
187 this->current_read_pos = 0;
188 this->current_write_pos = len;
192 if (size() == 0) resize(len);
198 if (current_write_pos < buffer.size()) {
199 buffer[current_write_pos++] = sample;
205 bool read(T &result)
override {
206 bool success =
false;
207 if (current_read_pos < current_write_pos) {
208 result = buffer[current_read_pos++];
214 bool peek(T &result)
override {
215 bool success =
false;
216 if (current_read_pos < current_write_pos) {
217 result = buffer[current_read_pos];
224 int result = current_write_pos - current_read_pos;
225 return max(result, 0);
237 return len_available;
239 current_read_pos += len;
240 len_available -= len;
241 memmove(buffer.data(), buffer.data() + current_read_pos, len_available);
242 current_read_pos = 0;
243 current_write_pos = len_available;
245 if (is_clear_with_zero) {
246 memset(buffer.data() + current_write_pos, 0,
247 buffer.size() - current_write_pos);
254 T *
address()
override {
return buffer.data(); }
257 T *
data() {
return buffer.data() + current_read_pos; }
260 current_read_pos = 0;
261 current_write_pos = 0;
262 if (is_clear_with_zero) {
263 memset(buffer.data(), 0, buffer.size());
270 size_t result = min(available_size, (
size_t)buffer.size());
271 current_read_pos = 0;
272 current_write_pos = result;
276 size_t size()
override {
return buffer.size(); }
278 void resize(
int size) {
279 if (buffer.size() != size) {
299 int current_read_pos = 0;
300 int current_write_pos = 0;
301 bool owns_buffer =
true;
302 bool is_clear_with_zero =
false;
319 bool read(T &result)
override {
324 result = _aucBuffer[_iTail];
325 _iTail = nextIndex(_iTail);
332 bool peek(T &result)
override {
337 result = _aucBuffer[_iTail];
341 virtual int peekArray(T *data,
int n) {
342 if (isEmpty())
return -1;
344 int count = _numElems;
346 for (
int j = 0; j < n; j++) {
347 data[j] = _aucBuffer[tail];
348 tail = nextIndex(tail);
351 if (count == 0)
break;
359 bool isEmpty() {
return available() == 0; }
362 virtual bool write(T data)
override {
365 _aucBuffer[_iHead] = data;
366 _iHead = nextIndex(_iHead);
387 virtual T *
address()
override {
return _aucBuffer.data(); }
389 virtual void resize(
int len) {
390 if (max_size != len) {
391 LOGI(
"resize: %d", len);
392 _aucBuffer.resize(len);
398 virtual size_t size()
override {
return max_size; }
407 int nextIndex(
int index) {
return (uint32_t)(index + 1) % max_size; }
418template <
class File,
typename T>
427 if (p_file) p_file->close();
433 p_file = &bufferFile;
435 LOGE(
"file is not valid");
445 if (p_file ==
nullptr)
return 0;
446 int read_count = min(count,
available());
449 if (!
file_seek(offset.pos))
return false;
451 if (offset.len1 > 0) {
453 n +=
file_read(data + offset.len, offset.len1);
454 read_pos = offset.len1;
456 read_pos += read_count;
458 assert(n == read_count);
459 element_count -= read_count;
464 bool peek(T &result)
override {
465 if (p_file ==
nullptr || isEmpty()) {
476 if (p_file ==
nullptr)
return 0;
477 int read_count = min(count,
available());
480 if (!
file_seek(offset.pos))
return false;
482 if (offset.len1 > 0) {
484 n +=
file_read(data + offset.len, offset.len1);
486 assert(n == read_count);
495 if (p_file ==
nullptr)
return 0;
500 if (!
file_seek(offset.pos))
return false;
502 if (offset.len1 > 0) {
504 n +=
file_write(data + offset.len, offset.len1);
505 write_pos = offset.len1;
507 write_pos += write_count;
509 assert(n == write_count);
510 element_count += write_count;
517 bool isEmpty() {
return available() == 0; }
534 size_t size()
override {
return max_size; }
543 File *p_file =
nullptr;
546 int element_count = 0;
560 int overflow = (pos + len) - max_size;
567 result.len = len - overflow;
568 result.len1 = overflow;
575 int file_pos = pos *
sizeof(T);
576 if (p_file->position() != file_pos) {
577 LOGD(
"file_seek: %d", pos);
578 if (!p_file->seek(file_pos)) {
579 LOGE(
"seek %d", file_pos);
588 LOGD(
"file_write: %d", count);
589 if (p_file ==
nullptr)
return 0;
590 int to_write =
sizeof(T) * count;
591 int bytes_written = p_file->write((
const uint8_t *)data, to_write);
593 int elements_written = bytes_written /
sizeof(T);
594 if (bytes_written != to_write) {
595 LOGE(
"write: %d -> %d", to_write, bytes_written);
597 return elements_written;
602 LOGD(
"file_read: %d", count);
603 int read_bytes = count *
sizeof(T);
604 int result_bytes = p_file->readBytes((
char *)result, read_bytes);
605 int result_count = result_bytes /
sizeof(T);
606 if (result_count != count) {
607 LOGE(
"readBytes: %d -> %d", read_bytes, result_bytes);
625 virtual ~NBuffer() { freeMemory(); }
628 bool read(T &result)
override {
630 return actual_read_buffer->read(result);
634 bool peek(T &result)
override {
636 return actual_read_buffer->peek(result);
645 if (actual_write_buffer ==
nullptr) {
646 actual_write_buffer = getNextAvailableBuffer();
648 if (actual_write_buffer !=
nullptr) {
649 result = actual_write_buffer->write(data);
651 if (actual_write_buffer->isFull()) {
652 addFilledBuffer(actual_write_buffer);
653 actual_write_buffer = getNextAvailableBuffer();
657 if (start_time == 0l) {
660 if (result) sample_count++;
667 if (actual_read_buffer ==
nullptr) {
668 actual_read_buffer = getNextFilledBuffer();
670 if (actual_read_buffer ==
nullptr) {
673 int result = actual_read_buffer->available();
678 (actual_read_buffer ==
nullptr) ? 0 : actual_read_buffer->available();
685 if (actual_write_buffer ==
nullptr) {
686 actual_write_buffer = getNextAvailableBuffer();
689 if (actual_write_buffer ==
nullptr) {
693 if (actual_write_buffer->isFull()) {
696 addFilledBuffer(actual_write_buffer);
697 actual_write_buffer = getNextAvailableBuffer();
699 return actual_write_buffer->availableForWrite();
705 while (actual_read_buffer !=
nullptr) {
706 actual_read_buffer->reset();
707 addAvailableBuffer(actual_read_buffer);
709 actual_read_buffer = getNextFilledBuffer();
715 unsigned long run_time = (
millis() - start_time);
716 return run_time == 0 ? 0 : sample_count * 1000 / run_time;
721 return actual_read_buffer ==
nullptr ? nullptr
722 : actual_read_buffer->address();
733 if (buffer_size ==
size && buffer_count == count)
return;
735 filled_buffers.resize(count);
736 available_buffers.resize(count);
740 buffer_count = count;
742 for (
int j = 0; j < count; j++) {
744 LOGD(
"new buffer %p", buffer);
745 available_buffers.enqueue(buffer);
750 size_t size() {
return buffer_size * buffer_count; }
754 uint16_t buffer_count = 0;
758 QueueFromVector<BaseBuffer<T> *> filled_buffers{0,
nullptr};
759 unsigned long start_time = 0;
760 unsigned long sample_count = 0;
766 if (actual_write_buffer) {
767 LOGD(
"deleting %p", actual_write_buffer);
768 delete actual_write_buffer;
769 actual_write_buffer =
nullptr;
771 if (actual_read_buffer) {
772 LOGD(
"deleting %p", actual_read_buffer);
773 delete actual_read_buffer;
774 actual_read_buffer =
nullptr;
778 while (ptr !=
nullptr) {
779 LOGD(
"deleting %p", ptr);
781 ptr = getNextAvailableBuffer();
784 ptr = getNextFilledBuffer();
785 while (ptr !=
nullptr) {
786 LOGD(
"deleting %p", ptr);
788 ptr = getNextFilledBuffer();
792 void resetCurrent() {
793 if (actual_read_buffer !=
nullptr) {
794 actual_read_buffer->
reset();
795 addAvailableBuffer(actual_read_buffer);
798 actual_read_buffer = getNextFilledBuffer();
801 virtual BaseBuffer<T> *getNextAvailableBuffer() {
802 if (available_buffers.empty())
return nullptr;
803 BaseBuffer<T> *result =
nullptr;
804 available_buffers.dequeue(result);
808 virtual bool addAvailableBuffer(BaseBuffer<T> *buffer) {
809 return available_buffers.enqueue(buffer);
812 virtual BaseBuffer<T> *getNextFilledBuffer() {
813 if (filled_buffers.empty())
return nullptr;
814 BaseBuffer<T> *result =
nullptr;
815 filled_buffers.dequeue(result);
819 virtual bool addFilledBuffer(BaseBuffer<T> *buffer) {
820 return filled_buffers.enqueue(buffer);
838 actual_write_buffer = getNextAvailableBuffer();
839 if (actual_write_buffer !=
nullptr) {
840 addFilledBuffer(actual_write_buffer);
854 for (
auto &buffer : this->filled_buffers.toVector()) {
856 if (sbuffer->
id ==
id) {
866 using NBuffer<T>::resetCurrent;
867 using NBuffer<T>::addFilledBuffer;
868 using NBuffer<T>::getNextAvailableBuffer;
869 using NBuffer<T>::actual_write_buffer;
870 using NBuffer<T>::actual_read_buffer;
883template <
class File,
typename T>
887 NBufferFile(
int fileSize) { number_of_objects_per_file = fileSize; }
893 next_file_name.
set(
"buffer-");
895 snprintf(number, 40,
"%d", file_count);
896 next_file_name.
add(number);
897 next_file_name.
add(
".tmp");
898 return next_file_name.
c_str();
904 if (!file)
return false;
905 empty_files.enqueue(file);
915 if (!filled_files.dequeue(read_file)) {
922 int result = read_file.readBytes((
char *)data, len *
sizeof(T)) /
sizeof(T);
927 empty_files.enqueue(read_file);
934 size_t pos = read_file.position();
935 bool result =
read(data);
943 if (!write_file || write_file.size() + len > number_of_objects_per_file) {
947 filled_files.enqueue(write_file);
950 if (!empty_files.dequeue(write_file))
return false;
952 int result = write_file.write((uint8_t *)data, len *
sizeof(T));
953 return result /
sizeof(T);
957 return filled_files.size() * number_of_objects_per_file +
958 (read_file.available() /
sizeof(T));
964 number_of_objects_per_file - (write_file.available() /
sizeof(T));
965 return empty_files.size() * number_of_objects_per_file +
966 write_file.available() + open_current;
969 size_t size()
override {
return number_of_objects_per_file * file_count; }
973 cleanupFile(read_file);
974 cleanupFile(write_file);
976 while (empty_files.dequeue(file)) cleanupFile(file);
977 while (filled_files.dequeue(file)) cleanupFile(file);
982 file_delete_callback = cb;
988 empty_files.enqueue(read_file);
993 empty_files.enqueue(write_file);
997 while (filled_files.dequeue(file)) {
999 empty_files.enqueue(file);
1011 int number_of_objects_per_file = 0;
1013 const uint16_t max_file_name = 256;
1015 void (*file_delete_callback)(
const char *filename);
1017 void cleanupFile(
File &file) {
1020 int len = strlen(file.name());
1021 char file_name[len + 1];
1022 strncpy(file_name, file.name(), len);
1024 file_delete_callback(file_name);
1037template <
typename T>
1041 LOGI(
"BufferedArray(%d)", len);
1046 int16_t *getValues(
size_t offset,
size_t length) {
1047 LOGD(
"getValues(%d,%d) - max %d", offset, length, array.size());
1051 actual_end = length;
1054 last_end = actual_end >= 0 ? actual_end : offset;
1056 actual_end = offset + length > actual_end ? offset + length : actual_end;
1058 int size = actual_end - last_end;
1060 LOGD(
"readBytes(%d,%d)", last_end, size);
1061 assert(last_end + size <= array.size());
1062 p_stream->readBytes((uint8_t *)(&array[last_end]), size * 2);
1064 assert(offset < actual_end);
1065 return &array[offset];
1069 int actual_end = -1;
1072 Stream *p_stream =
nullptr;