arduino-audio-tools
Loading...
Searching...
No Matches
RedisBuffer.h
1#pragma once
2#include "AudioTools/CoreAudio/Buffers.h"
3
4#define REDIS_RESULT_BUFFER_SIZE (10 * 1024)
5
6
7namespace audio_tools {
8
30template <typename T>
31class RedisBuffer : public BaseBuffer<T> {
32 public:
52
58 void setExpire(int seconds) { expire_seconds = seconds; }
59
67 bool write(T data) override {
68 has_written = true;
69 write_buf.write(data);
70 if (write_buf.isFull()) {
71 flushWrite(); // flush any pending writes first
72 }
73 return true;
74 }
75
83 int writeArray(const T data[], int len) override {
84 LOGI("RedisBuffer:writeArray: %d", len);
85 has_written = true;
86 int written = 0;
87 for (int i = 0; i < len; ++i) {
88 write(data[i]);
89 ++written;
90 }
91 return written;
92 }
93
100 bool read(T& result) override {
101 flushWrite(); // flush any pending writes before reading
102 if (read_buf.isEmpty()) {
103 read_buf.reset();
104 fillReadBuffer(); // fill local buffer from Redis
105 if (read_buf.isEmpty()) {
106 LOGI("RedisBuffer:read: no data available");
107 return false; // nothing left in Redis
108 }
109 }
110 T val;
111 bool rc = read_buf.read(val); // read from local buffer
112 LOGI("Redis LPOP: %d", val);
113 result = val;
114 return rc;
115 }
116
124 int readArray(T data[], int len) override {
125 flushWrite(); // flush any pending writes before reading
126 int read_count = 0;
127 while (read_count < len) {
128 if (read_buf.isEmpty()) {
130 if (read_buf.isEmpty()) break; // nothing left in Redis
131 }
132 read_buf.read(data[read_count++]);
133 }
134 return read_count;
135 }
136
143 bool peek(T& result) override {
144 flushWrite(); // flush any pending writes before peeking
145
146 // Use LINDEX to peek at the first value in Redis without removing it
147 String cmd = redisCommand("LINDEX", key, "0");
148 RedisResult resp = sendCommand(cmd);
149 if (!resp.ok) return false;
150 result = (T)resp.intValue;
151 return true;
152 }
153
158 void reset() override {
159 flushWrite();
160 String cmd = redisCommand("DEL", key);
161 auto rc = sendCommand(cmd);
162 LOGI("Redis DEL: %d", rc.intValue);
163 read_buf.reset();
165 }
166
172 int available() override {
173 flushWrite();
174 String cmd = redisCommand("LLEN", key);
175 RedisResult resp = sendCommand(cmd);
176 LOGI("LLEN: %d (ok=%d)", resp.intValue, resp.ok);
177 return resp.intValue + read_buf.available();
178 }
179
186 int availableForWrite() override { return max_size - available(); }
187
193 T* address() override { return nullptr; }
194
199 size_t size() override { return max_size; }
200
207 bool resize(int size) override {
208 if (has_written) return false;
209 LOGI("RedisBuffer::resize: %d", size);
210 max_size = size;
211 return true;
212 }
213
214 protected:
215 struct RedisResult {
216 int intValue = 0;
219 bool ok = false;
220 operator bool() const { return ok; }
221 // std::vector<T> values;
222 };
223
225 const char* key;
226 size_t max_size;
231 bool has_written = false;
232
233 void clearResponse() {
234 while (client.available()) {
235 client.read(); // clear any remaining data in the buffer
236 }
237 }
238
247 String redisCommand(const String& cmd, const String& arg1 = "",
248 const String& arg2 = "", const String& arg3 = "") {
249 String out = "*" +
250 String(1 + (arg1.length() > 0) + (arg2.length() > 0) +
251 (arg3.length() > 0)) +
252 "\r\n";
253 out += "$" + String(cmd.length()) + "\r\n" + cmd + "\r\n";
254 if (arg1.length())
255 out += "$" + String(arg1.length()) + "\r\n" + arg1 + "\r\n";
256 if (arg2.length())
257 out += "$" + String(arg2.length()) + "\r\n" + arg2 + "\r\n";
258 if (arg3.length())
259 out += "$" + String(arg3.length()) + "\r\n" + arg3 + "\r\n";
260 return out;
261 }
262
268 RedisResult sendCommand(const String& cmd) {
269 RedisResult result;
270 if (!client.connected()) {
271 LOGE("Redis not connected");
272 result.ok = false;
273 return result;
274 }
275 client.print(cmd);
276 client.flush();
277 result = readResponse();
278 return result;
279 }
280
287 RedisResult result;
288 result.ok = true;
289 uint8_t buffer[REDIS_RESULT_BUFFER_SIZE] = {};
290 int n = 0;
291 while (n <= 0) {
292 n = client.read(buffer, sizeof(buffer));
293 }
294 buffer[n] = 0;
295
296 // build vector of strings
297 result.strValues.clear();
298 String tail = (char*)buffer;
299 int nl_pos = tail.indexOf("\r\n");
300 while (nl_pos >= 0) {
301 String head = tail.substring(0, nl_pos);
302 if (!head.startsWith("$"))
303 result.strValues.push_back(head);
304 tail = tail.substring(nl_pos + 2);
305 tail.trim();
306 nl_pos = tail.indexOf("\r\n");
307 }
308
309 if (!tail.startsWith("$") && tail.length() > 0) {
310 result.strValues.push_back(tail);
311 }
312
313 // if we have more then 2 lines this is a success
314 if (result.strValues.size() > 2) {
315 result.intValue = result.strValues.size();
316 } else {
317 // Else try to determine an int value
318 StrView line((char*)buffer, sizeof(buffer), n);
319
321 if (line.startsWith("$")) {
322 int end = line.indexOf("\n");
323 line.substring(line.c_str(), end, line.length());
324 }
325
327 if (line.startsWith(":")) {
328 line.replace(":", "");
329 }
330
331 if (line.startsWith("-")){
332 result.ok = false;
333 }
334
335 if (line.isEmpty()){
336 result.intValue = -1; // no data available
337 result.ok = false;
338 }
339 else {
340 result.intValue = line.toInt();
341 }
342 }
343
344 return result;
345 }
346
351 void flushWrite() {
352 if (write_buf.isEmpty()) return;
353 int write_size = write_buf.available();
354 // Use RPUSH with multiple arguments
355 String cmd = "*" + String(2 + write_size) + "\r\n";
356 cmd += "$5\r\nRPUSH\r\n";
357 cmd += "$" + String(strlen(key)) + "\r\n" + key + "\r\n";
358 T value;
359 while (!write_buf.isEmpty()) {
360 write_buf.read(value);
361 String sval = String(value);
362 cmd += "$" + String(sval.length()) + "\r\n" + sval + "\r\n";
363 }
365 RedisResult resp = sendCommand(cmd);
366 LOGI("Redis RPUSH %d entries: %d (ok=%d)", write_size, resp.intValue,
367 resp.ok);
368
369 if (expire_seconds > 0) {
370 String expireCmd = redisCommand("EXPIRE", key, String(expire_seconds));
371 RedisResult resp = sendCommand(expireCmd);
372 LOGI("Redis EXPIRE: %d (ok=%d)", resp.intValue, resp.ok);
373 }
374 }
375
381 read_buf.reset();
382 int size = read_buf.size();
383 LOGI("Redis LPOP: %d", size)
384 // Read up to local_buf_size items from Redis
385 String cmd = redisCommand("LPOP", key, String(size));
386 auto rc = sendCommand(cmd);
387 if (!rc.ok) {
388 LOGE("Redis LPOP failed: %s", cmd.c_str());
389 return; // no data available
390 }
391 for (auto& str : rc.strValues) {
392 if (str.startsWith("*")) continue;
393 if (str.startsWith("$")) continue;
394 if (str.length() == 0) continue;
395 LOGI("- %s", str.c_str());
396 T value = (T)str.toInt();
397 read_buf.write(value);
398 }
399
400 LOGI("RedisBuffer: %d of %d items",(int) read_buf.available(),(int) read_buf.size() );
401 // if this fails the
402 if(!read_buf.isFull() ){
403 LOGW("RedisBuffer:fillReadBuffer: not enough data read from Redis");
404 }
405 }
406};
407
408} // namespace audio_tools
Shared functionality of all buffers.
Definition Buffers.h:22
void clear()
same as reset
Definition Buffers.h:95
Definition NoArduino.h:169
Buffer implementation that stores and retrieves data from a Redis server using the Arduino Client.
Definition RedisBuffer.h:31
SingleBuffer< T > write_buf
Local buffer for pending writes.
Definition RedisBuffer.h:229
size_t size() override
Returns the maximum capacity of the buffer.
Definition RedisBuffer.h:199
Client & client
Reference to the Arduino Client for Redis communication.
Definition RedisBuffer.h:224
RedisBuffer(Client &client, const char *key, size_t max_size, size_t local_buf_size=512, int expire_seconds=60 *60)
Constructs a RedisBuffer.
Definition RedisBuffer.h:43
void flushWrite()
Flushes buffered writes to Redis using RPUSH and sets expiration if configured.
Definition RedisBuffer.h:351
String redisCommand(const String &cmd, const String &arg1="", const String &arg2="", const String &arg3="")
Constructs a Redis command in RESP format.
Definition RedisBuffer.h:247
bool peek(T &result) override
Peeks at the next value in Redis directly (no local buffer). Flushes any pending writes before peekin...
Definition RedisBuffer.h:143
RedisResult sendCommand(const String &cmd)
Sends a command to the Redis server and returns the parsed result.
Definition RedisBuffer.h:268
bool read(T &result) override
Reads a single value from Redis directly (no local buffer). Flushes any pending writes before reading...
Definition RedisBuffer.h:100
void setExpire(int seconds)
Sets the expiration time (in seconds) for the Redis key. The expiration will be refreshed on every wr...
Definition RedisBuffer.h:58
bool write(T data) override
Buffers a single value for writing to Redis. Data is only sent to Redis when the local buffer is full...
Definition RedisBuffer.h:67
int available() override
Returns the number of elements available to read (local + Redis). Flushes any pending writes before c...
Definition RedisBuffer.h:172
void fillReadBuffer()
Fills the local read buffer from Redis using LRANGE. After reading, removes the items from Redis usin...
Definition RedisBuffer.h:380
bool resize(int size) override
Resizes the maximum buffer size. This operation is only allowed before any write has occurred.
Definition RedisBuffer.h:207
int availableForWrite() override
Returns the number of elements that can be written before reaching max_size. There are are no checks ...
Definition RedisBuffer.h:186
SingleBuffer< T > read_buf
Local buffer for pending reads.
Definition RedisBuffer.h:230
T * address() override
Returns the address of the start of the physical read buffer (not supported).
Definition RedisBuffer.h:193
int expire_seconds
Expiration time in seconds (0 = no expiration).
Definition RedisBuffer.h:228
size_t local_buf_size
Local buffer size for batching.
Definition RedisBuffer.h:227
bool has_written
True if any write operation has occurred.
Definition RedisBuffer.h:231
int writeArray(const T data[], int len) override
Writes multiple values to Redis in one batch. Flushes any pending writes before sending the new data.
Definition RedisBuffer.h:83
const char * key
Redis key for the buffer.
Definition RedisBuffer.h:225
RedisResult readResponse()
Reads a single line response from the Redis server and parses it into a RedisResult.
Definition RedisBuffer.h:286
void reset() override
Clears the buffer both locally and on the Redis server. Flushes any pending writes before clearing.
Definition RedisBuffer.h:158
int readArray(T data[], int len) override
Reads multiple values from the buffer in one batch. Flushes any pending writes before reading.
Definition RedisBuffer.h:124
size_t max_size
Maximum number of elements in the buffer.
Definition RedisBuffer.h:226
A simple Buffer implementation which just uses a (dynamically sized) array.
Definition Buffers.h:172
bool write(T sample) override
write add an entry to the buffer
Definition Buffers.h:206
bool read(T &result) override
reads a single value
Definition Buffers.h:215
int available() override
provides the number of entries that are available to read
Definition Buffers.h:233
bool isFull() override
checks if the buffer is full
Definition Buffers.h:240
void reset() override
clears the buffer
Definition Buffers.h:286
A simple wrapper to provide string functions on existing allocated char*. If the underlying char* is ...
Definition StrView.h:28
virtual int length()
Definition StrView.h:383
virtual bool startsWith(const char *str)
checks if the string starts with the indicated substring
Definition StrView.h:171
virtual bool isEmpty()
checks if the string is empty
Definition StrView.h:386
virtual bool replace(const char *toReplace, const char *replaced)
Replaces the first instance of toReplace with replaced.
Definition StrView.h:392
virtual void substring(StrView &from, int start, int end)
copies a substring into the current string
Definition StrView.h:477
virtual const char * c_str()
provides the string value as const char*
Definition StrView.h:379
int toInt()
Converts the string to an int.
Definition StrView.h:575
virtual int indexOf(const char c, int start=0)
Definition StrView.h:260
Vector implementation which provides the most important methods as defined by std::vector....
Definition Vector.h:21
Generic Implementation of sound input and output for desktop environments using portaudio.
Definition AudioCodecsBase.h:10
Definition RedisBuffer.h:215
Vector< String > strValues
String value parsed from the response (if any)
Definition RedisBuffer.h:218
bool ok
True if the response was valid and not an error.
Definition RedisBuffer.h:219
int intValue
Integer value parsed from the response (if any)
Definition RedisBuffer.h:216