3#include "AudioTools/CoreAudio/AudioBasic/Collections.h"
4#include "AudioTools/CoreAudio/AudioBasic/Str.h"
5#include "AudioTools/CoreAudio/AudioLogger.h"
21template <
typename T =
int16_t>
30 virtual bool read(T &result) = 0;
34 if (data ==
nullptr) {
39 for (
int j = 0; j < lenResult; j++) {
42 LOGD(
"readArray %d -> %d", len, lenResult);
60 for (
int j = 0; j < len; j++) {
61 if (!
write(data[j])) {
67 LOGD(
"writeArray %d -> %d", len, result);
81 virtual bool peek(T &result) = 0;
86 bool isEmpty() {
return available() == 0; }
106 virtual size_t size() = 0;
111 if (size() == 0)
return 0.0f;
112 return 100.0f *
static_cast<float>(
available()) /
113 static_cast<float>(size());
118 LOGE(
"resize not implemented for this buffer");
126template <
typename T =
int16_t>
132 LOGD(
"%s: %d", LOG_METHOD, len);
134 int result = min(len, p_buffer->
available());
135 for (
int j = 0; j < result; j++) {
137 p_buffer->
read(sample);
145 template <
int rows,
int channels>
147 int lenResult = min(rows, p_buffer->
available());
148 for (
int j = 0; j < lenResult; j++) {
150 p_buffer->
read(sample);
151 for (
int i = 0; i < channels; i++) {
160 BaseBuffer<T> *p_buffer =
nullptr;
171template <
typename T =
int16_t>
180 : _allocator(allocator) {
195 this->owns_buffer =
false;
196 this->buffer = (uint8_t *)
data;
197 this->current_read_pos = 0;
198 this->current_write_pos = len;
202 if (size() == 0)
resize(len);
208 if (current_write_pos < buffer.size()) {
209 buffer[current_write_pos++] = sample;
215 bool read(T &result)
override {
216 bool success =
false;
217 if (current_read_pos < current_write_pos) {
218 result = buffer[current_read_pos++];
224 bool peek(T &result)
override {
225 bool success =
false;
226 if (current_read_pos < current_write_pos) {
227 result = buffer[current_read_pos];
234 int result = current_write_pos - current_read_pos;
235 return max(result, 0);
242 int peekArray(uint8_t *
data,
int len) {
244 if (len > len_available) {
247 memcpy(
data, buffer.data() + current_read_pos, len);
256 return len_available;
258 current_read_pos += len;
259 len_available -= len;
260 memmove(buffer.data(), buffer.data() + current_read_pos, len_available *
sizeof(T));
261 current_read_pos = 0;
262 current_write_pos = len_available;
264 if (is_clear_with_zero) {
265 memset(buffer.data() + current_write_pos, 0,
266 buffer.size() - current_write_pos);
275 memmove(buffer.data(), buffer.data() + current_read_pos, av *
sizeof(T));
276 current_write_pos = av;
277 current_read_pos = 0;
281 T *
address()
override {
return buffer.data(); }
284 T *
data() {
return buffer.data() + current_read_pos; }
287 current_read_pos = 0;
288 current_write_pos = 0;
289 if (is_clear_with_zero) {
290 memset(buffer.data(), 0, buffer.size());
297 size_t result = min(available_size, (
size_t)buffer.size());
298 current_read_pos = 0;
299 current_write_pos = result;
303 size_t size()
override {
return buffer.size(); }
306 if (buffer.size() < size) {
328 int current_read_pos = 0;
329 int current_write_pos = 0;
330 bool owns_buffer =
true;
331 bool is_clear_with_zero =
false;
340template <
typename T =
int16_t>
348 bool read(T &result)
override {
353 result = _aucBuffer[_iTail];
354 _iTail = nextIndex(_iTail);
361 bool peek(T &result)
override {
366 result = _aucBuffer[_iTail];
370 virtual int peekArray(T *data,
int n) {
371 if (isEmpty())
return -1;
373 int count = _numElems;
375 for (
int j = 0; j < n; j++) {
376 data[j] = _aucBuffer[tail];
377 tail = nextIndex(tail);
380 if (count == 0)
break;
388 bool isEmpty() {
return available() == 0; }
391 virtual bool write(T data)
override {
394 _aucBuffer[_iHead] = data;
395 _iHead = nextIndex(_iHead);
416 virtual T *
address()
override {
return _aucBuffer.data(); }
419 if (max_size != len && len > 0) {
420 LOGI(
"resize: %d", len);
421 _aucBuffer.resize(len);
428 virtual size_t size()
override {
return max_size; }
438 int nextIndex(
int index) {
439 if (max_size == 0)
return 0;
440 return (uint32_t)(index + 1) % max_size; }
451template <
class File,
typename T>
460 if (p_file) p_file->close();
466 p_file = &bufferFile;
468 LOGE(
"file is not valid");
478 if (p_file ==
nullptr)
return 0;
479 int read_count = min(count,
available());
482 if (!
file_seek(offset.pos))
return false;
484 if (offset.len1 > 0) {
486 n +=
file_read(data + offset.len, offset.len1);
487 read_pos = offset.len1;
489 read_pos += read_count;
492 for (
int i = 0; i < count; i++) {
493 LOGI(
"read #%d value %d", offset.pos, (
int)data[i]);
496 element_count -= read_count;
501 bool peek(T &result)
override {
502 if (p_file ==
nullptr || isEmpty()) {
513 if (p_file ==
nullptr)
return 0;
514 int read_count = min(count,
available());
517 if (!
file_seek(offset.pos))
return false;
519 if (offset.len1 > 0) {
521 n +=
file_read(data + offset.len, offset.len1);
523 assert(n == read_count);
532 if (p_file ==
nullptr)
return 0;
533 for (
int i = 0; i < len; i++) {
534 LOGI(
"write #%d value %d", write_pos, (
int)data[i]);
540 if (!
file_seek(offset.pos))
return false;
542 if (offset.len1 > 0) {
544 n +=
file_write(data + offset.len, offset.len1);
545 write_pos = offset.len1;
547 write_pos += write_count;
549 element_count += write_count;
556 bool isEmpty() {
return available() == 0; }
573 size_t size()
override {
return max_size; }
585 File *p_file =
nullptr;
588 int element_count = 0;
602 int overflow = (pos + len) - max_size;
609 result.len = len - overflow;
610 result.len1 = overflow;
617 int file_pos = pos *
sizeof(T);
618 if (p_file->position() != file_pos) {
619 LOGD(
"file_seek: %d", pos);
620 if (!p_file->seek(file_pos)) {
621 LOGE(
"seek %d", file_pos);
630 LOGD(
"file_write: %d", count);
631 if (p_file ==
nullptr)
return 0;
632 int to_write_bytes =
sizeof(T) * count;
633 int bytes_written = p_file->write((
const uint8_t *)data, to_write_bytes);
635 int elements_written = bytes_written /
sizeof(T);
636 if (bytes_written != to_write_bytes) {
637 LOGE(
"write: %d -> %d bytes", to_write_bytes, bytes_written);
639 return elements_written;
644 LOGD(
"file_read: %d", count);
645 int read_bytes = count *
sizeof(T);
646 int result_bytes = p_file->readBytes((
char *)result, read_bytes);
647 int result_count = result_bytes /
sizeof(T);
648 if (result_count != count) {
649 LOGE(
"readBytes: %d -> %d", read_bytes, result_bytes);
662template <
typename T =
int16_t>
667 virtual ~NBuffer() { freeMemory(); }
670 bool read(T &result)
override {
672 return actual_read_buffer->
read(result);
676 bool peek(T &result)
override {
678 return actual_read_buffer->
peek(result);
687 if (actual_write_buffer ==
nullptr) {
688 actual_write_buffer = getNextAvailableBuffer();
690 if (actual_write_buffer !=
nullptr) {
691 result = actual_write_buffer->
write(data);
693 if (actual_write_buffer->
isFull()) {
694 addFilledBuffer(actual_write_buffer);
695 actual_write_buffer = getNextAvailableBuffer();
699 if (start_time == 0l) {
702 if (result) sample_count++;
709 if (actual_read_buffer ==
nullptr) {
710 actual_read_buffer = getNextFilledBuffer();
712 if (actual_read_buffer ==
nullptr) {
715 int result = actual_read_buffer->
available();
720 (actual_read_buffer ==
nullptr) ? 0 : actual_read_buffer->
available();
727 if (actual_write_buffer ==
nullptr) {
728 actual_write_buffer = getNextAvailableBuffer();
731 if (actual_write_buffer ==
nullptr) {
735 if (actual_write_buffer->
isFull()) {
738 addFilledBuffer(actual_write_buffer);
739 actual_write_buffer = getNextAvailableBuffer();
747 while (actual_read_buffer !=
nullptr) {
748 actual_read_buffer->
reset();
749 addAvailableBuffer(actual_read_buffer);
751 actual_read_buffer = getNextFilledBuffer();
757 unsigned long run_time = (
millis() - start_time);
758 return run_time == 0 ? 0 : sample_count * 1000 / run_time;
763 return actual_read_buffer ==
nullptr ? nullptr
764 : actual_read_buffer->
address();
774 int count = bytes / buffer_size;
775 return resize(buffer_size, count);
780 if (buffer_size ==
size && buffer_count == count)
return true;
782 filled_buffers.resize(count);
783 available_buffers.resize(count);
787 buffer_count = count;
789 for (
int j = 0; j < count; j++) {
791 LOGD(
"new buffer %p", buffer);
792 available_buffers.enqueue(buffer);
798 size_t size() {
return buffer_size * buffer_count; }
801 int buffer_size = 1024;
802 uint16_t buffer_count = 0;
806 QueueFromVector<BaseBuffer<T> *> filled_buffers{0,
nullptr};
807 unsigned long start_time = 0;
808 unsigned long sample_count = 0;
814 if (actual_write_buffer) {
815 LOGD(
"deleting %p", actual_write_buffer);
816 delete actual_write_buffer;
817 actual_write_buffer =
nullptr;
819 if (actual_read_buffer) {
820 LOGD(
"deleting %p", actual_read_buffer);
821 delete actual_read_buffer;
822 actual_read_buffer =
nullptr;
826 while (ptr !=
nullptr) {
827 LOGD(
"deleting %p", ptr);
829 ptr = getNextAvailableBuffer();
832 ptr = getNextFilledBuffer();
833 while (ptr !=
nullptr) {
834 LOGD(
"deleting %p", ptr);
836 ptr = getNextFilledBuffer();
840 void resetCurrent() {
841 if (actual_read_buffer !=
nullptr) {
842 actual_read_buffer->
reset();
843 addAvailableBuffer(actual_read_buffer);
846 actual_read_buffer = getNextFilledBuffer();
849 virtual BaseBuffer<T> *getNextAvailableBuffer() {
850 if (available_buffers.empty())
return nullptr;
851 BaseBuffer<T> *result =
nullptr;
852 available_buffers.dequeue(result);
856 virtual bool addAvailableBuffer(BaseBuffer<T> *buffer) {
857 return available_buffers.enqueue(buffer);
860 virtual BaseBuffer<T> *getNextFilledBuffer() {
861 if (filled_buffers.empty())
return nullptr;
862 BaseBuffer<T> *result =
nullptr;
863 filled_buffers.dequeue(result);
867 virtual bool addFilledBuffer(BaseBuffer<T> *buffer) {
868 return filled_buffers.enqueue(buffer);
878template <
typename T =
int16_t>
886 actual_write_buffer = getNextAvailableBuffer();
887 if (actual_write_buffer !=
nullptr) {
888 addFilledBuffer(actual_write_buffer);
903 for (
auto &buffer : this->filled_buffers.toVector()) {
905 if (sbuffer->
id ==
id) {
915 using NBuffer<T>::resetCurrent;
916 using NBuffer<T>::addFilledBuffer;
917 using NBuffer<T>::getNextAvailableBuffer;
918 using NBuffer<T>::actual_write_buffer;
919 using NBuffer<T>::actual_read_buffer;
932template <
class File,
typename T>
936 NBufferFile(
int fileSize) { number_of_objects_per_file = fileSize; }
942 next_file_name.
set(
"buffer-");
944 snprintf(number, 40,
"%d", file_count);
945 next_file_name.
add(number);
946 next_file_name.
add(
".tmp");
947 return next_file_name.
c_str();
953 if (!file)
return false;
954 empty_files.enqueue(file);
964 if (!filled_files.dequeue(read_file)) {
971 int result = read_file.readBytes((
char *)data, len *
sizeof(T)) /
sizeof(T);
976 empty_files.enqueue(read_file);
983 size_t pos = read_file.position();
984 bool result =
read(data);
992 if (!write_file || write_file.size() + len > number_of_objects_per_file) {
996 filled_files.enqueue(write_file);
999 if (!empty_files.dequeue(write_file))
return false;
1001 int result = write_file.write((uint8_t *)data, len *
sizeof(T));
1002 return result /
sizeof(T);
1006 return filled_files.size() * number_of_objects_per_file +
1007 (read_file.available() /
sizeof(T));
1013 number_of_objects_per_file - (write_file.available() /
sizeof(T));
1014 return empty_files.size() * number_of_objects_per_file +
1015 write_file.available() + open_current;
1018 size_t size()
override {
return number_of_objects_per_file * file_count; }
1022 cleanupFile(read_file);
1023 cleanupFile(write_file);
1025 while (empty_files.dequeue(file)) cleanupFile(file);
1026 while (filled_files.dequeue(file)) cleanupFile(file);
1031 file_delete_callback = cb;
1037 empty_files.enqueue(read_file);
1042 empty_files.enqueue(write_file);
1046 while (filled_files.dequeue(file)) {
1048 empty_files.enqueue(file);
1060 int number_of_objects_per_file = 0;
1062 const uint16_t max_file_name = 256;
1064 void (*file_delete_callback)(
const char *filename);
1066 void cleanupFile(
File &file) {
1069 int len = strlen(file.name());
1070 char file_name[len + 1];
1071 strncpy(file_name, file.name(), len);
1073 file_delete_callback(file_name);
1086template <
typename T =
int16_t>
1090 LOGI(
"BufferedArray(%d)", len);
1095 int16_t *getValues(
size_t offset,
size_t length) {
1096 LOGD(
"getValues(%d,%d) - max %d", offset, length, array.size());
1100 actual_end = length;
1103 last_end = actual_end >= 0 ? actual_end : offset;
1105 actual_end = offset + length > actual_end ? offset + length : actual_end;
1107 int size = actual_end - last_end;
1109 LOGD(
"readBytes(%d,%d)", last_end, size);
1110 assert(last_end + size <= array.size());
1111 p_stream->readBytes((uint8_t *)(&array[last_end]), size * 2);
1113 assert(offset < actual_end);
1114 return &array[offset];
1118 int actual_end = -1;
1121 Stream *p_stream =
nullptr;