arduino-audio-tools
All Classes Namespaces Files Functions Variables Typedefs Enumerations Friends Macros Modules Pages
RedisBuffer.h
1#pragma once
2#include "AudioTools/CoreAudio/Buffers.h"
3
/// Size in bytes of the temporary buffer that receives a Redis response.
/// May be defined before including this header to use a different size
/// (e.g. a smaller one on targets with little RAM).
#ifndef REDIS_RESULT_BUFFER_SIZE
#define REDIS_RESULT_BUFFER_SIZE (10 * 1024)
#endif
5
6
7namespace audio_tools {
8
29template <typename T>
30class RedisBuffer : public BaseBuffer<T> {
31 public:
51
57 void setExpire(int seconds) { expire_seconds = seconds; }
58
66 bool write(T data) override {
67 has_written = true;
68 write_buf.write(data);
69 if (write_buf.isFull()) {
70 flushWrite(); // flush any pending writes first
71 }
72 return true;
73 }
74
82 int writeArray(const T data[], int len) override {
83 LOGI("RedisBuffer:writeArray: %d", len);
84 has_written = true;
85 int written = 0;
86 for (int i = 0; i < len; ++i) {
87 write(data[i]);
88 ++written;
89 }
90 return written;
91 }
92
99 bool read(T& result) override {
100 flushWrite(); // flush any pending writes before reading
101 if (read_buf.isEmpty()) {
102 read_buf.reset();
103 fillReadBuffer(); // fill local buffer from Redis
104 if (read_buf.isEmpty()) {
105 LOGI("RedisBuffer:read: no data available");
106 return false; // nothing left in Redis
107 }
108 }
109 T val;
110 bool rc = read_buf.read(val); // read from local buffer
111 LOGI("Redis LPOP: %d", val);
112 result = val;
113 return rc;
114 }
115
123 int readArray(T data[], int len) override {
124 flushWrite(); // flush any pending writes before reading
125 int read_count = 0;
126 while (read_count < len) {
127 if (read_buf.isEmpty()) {
129 if (read_buf.isEmpty()) break; // nothing left in Redis
130 }
131 read_buf.read(data[read_count++]);
132 }
133 return read_count;
134 }
135
142 bool peek(T& result) override {
143 flushWrite(); // flush any pending writes before peeking
144
145 // Use LINDEX to peek at the first value in Redis without removing it
146 String cmd = redisCommand("LINDEX", key, "0");
147 RedisResult resp = sendCommand(cmd);
148 if (!resp.ok) return false;
149 result = (T)resp.intValue;
150 return true;
151 }
152
157 void reset() override {
158 flushWrite();
159 String cmd = redisCommand("DEL", key);
160 auto rc = sendCommand(cmd);
161 LOGI("Redis DEL: %d", rc.intValue);
162 read_buf.reset();
163 write_buf.reset();
164 }
165
171 int available() override {
172 flushWrite();
173 String cmd = redisCommand("LLEN", key);
174 RedisResult resp = sendCommand(cmd);
175 LOGI("LLEN: %d (ok=%d)", resp.intValue, resp.ok);
176 return resp.intValue + read_buf.available();
177 }
178
185 int availableForWrite() override { return max_size - available(); }
186
192 T* address() override { return nullptr; }
193
/// Returns the configured maximum number of elements (max_size).
size_t size() override {
  return max_size;
}
199
206 bool resize(int size) override {
207 if (has_written) return false;
208 LOGI("RedisBuffer::resize: %d", size);
209 max_size = size;
210 return true;
211 }
212
213 protected:
214 struct RedisResult {
215 int intValue = 0;
218 bool ok = false;
219 operator bool() const { return ok; }
220 // std::vector<T> values;
221 };
222
224 const char* key;
225 size_t max_size;
230 bool has_written = false;
231
232 void clearResponse() {
233 while (client.available()) {
234 client.read(); // clear any remaining data in the buffer
235 }
236 }
237
246 String redisCommand(const String& cmd, const String& arg1 = "",
247 const String& arg2 = "", const String& arg3 = "") {
248 String out = "*" +
249 String(1 + (arg1.length() > 0) + (arg2.length() > 0) +
250 (arg3.length() > 0)) +
251 "\r\n";
252 out += "$" + String(cmd.length()) + "\r\n" + cmd + "\r\n";
253 if (arg1.length())
254 out += "$" + String(arg1.length()) + "\r\n" + arg1 + "\r\n";
255 if (arg2.length())
256 out += "$" + String(arg2.length()) + "\r\n" + arg2 + "\r\n";
257 if (arg3.length())
258 out += "$" + String(arg3.length()) + "\r\n" + arg3 + "\r\n";
259 return out;
260 }
261
267 RedisResult sendCommand(const String& cmd) {
268 RedisResult result;
269 if (!client.connected()) {
270 LOGE("Redis not connected");
271 result.ok = false;
272 return result;
273 }
274 client.print(cmd);
275 client.flush();
276 result = readResponse();
277 return result;
278 }
279
286 RedisResult result;
287 result.ok = true;
288 uint8_t buffer[REDIS_RESULT_BUFFER_SIZE] = {};
289 int n = 0;
290 while (n <= 0) {
291 n = client.read(buffer, sizeof(buffer));
292 }
293 buffer[n] = 0;
294
295 // build vector of strings
296 result.strValues.clear();
297 String tail = (char*)buffer;
298 int nl_pos = tail.indexOf("\r\n");
299 while (nl_pos >= 0) {
300 String head = tail.substring(0, nl_pos);
301 if (!head.startsWith("$"))
302 result.strValues.push_back(head);
303 tail = tail.substring(nl_pos + 2);
304 tail.trim();
305 nl_pos = tail.indexOf("\r\n");
306 }
307
308 if (!tail.startsWith("$") && tail.length() > 0) {
309 result.strValues.push_back(tail);
310 }
311
312 // if we have more then 2 lines this is a success
313 if (result.strValues.size() > 2) {
314 result.intValue = result.strValues.size();
315 } else {
316 // Else try to determine an int value
317 StrView line((char*)buffer, sizeof(buffer), n);
318
320 if (line.startsWith("$")) {
321 int end = line.indexOf("\n");
322 line.substring(line.c_str(), end, line.length());
323 }
324
326 if (line.startsWith(":")) {
327 line.replace(":", "");
328 }
329
330 if (line.startsWith("-")){
331 result.ok = false;
332 }
333
334 if (line.isEmpty()){
335 result.intValue = -1; // no data available
336 result.ok = false;
337 }
338 else {
339 result.intValue = line.toInt();
340 }
341 }
342
343 return result;
344 }
345
350 void flushWrite() {
351 if (write_buf.isEmpty()) return;
352 int write_size = write_buf.available();
353 // Use RPUSH with multiple arguments
354 String cmd = "*" + String(2 + write_size) + "\r\n";
355 cmd += "$5\r\nRPUSH\r\n";
356 cmd += "$" + String(strlen(key)) + "\r\n" + key + "\r\n";
357 T value;
358 while (!write_buf.isEmpty()) {
359 write_buf.read(value);
360 String sval = String(value);
361 cmd += "$" + String(sval.length()) + "\r\n" + sval + "\r\n";
362 }
363 write_buf.clear();
364 RedisResult resp = sendCommand(cmd);
365 LOGI("Redis RPUSH %d entries: %d (ok=%d)", write_size, resp.intValue,
366 resp.ok);
367
368 if (expire_seconds > 0) {
369 String expireCmd = redisCommand("EXPIRE", key, String(expire_seconds));
370 RedisResult resp = sendCommand(expireCmd);
371 LOGI("Redis EXPIRE: %d (ok=%d)", resp.intValue, resp.ok);
372 }
373 }
374
380 read_buf.reset();
381 int size = read_buf.size();
382 LOGI("Redis LPOP: %d", size)
383 // Read up to local_buf_size items from Redis
384 String cmd = redisCommand("LPOP", key, String(size));
385 auto rc = sendCommand(cmd);
386 if (!rc.ok) {
387 LOGE("Redis LPOP failed: %s", cmd.c_str());
388 return; // no data available
389 }
390 for (auto& str : rc.strValues) {
391 if (str.startsWith("*")) continue;
392 if (str.startsWith("$")) continue;
393 if (str.length() == 0) continue;
394 LOGI("- %s", str.c_str());
395 T value = (T)str.toInt();
396 read_buf.write(value);
397 }
398
399 LOGI("RedisBuffer: %d of %d items",(int) read_buf.available(),(int) read_buf.size() );
400 // if this fails the
401 if(!read_buf.isFull() ){
402 LOGW("RedisBuffer:fillReadBuffer: not enough data read from Redis");
403 }
404 }
405};
406
407} // namespace audio_tools
Shared functionality of all buffers.
Definition Buffers.h:22
Definition NoArduino.h:169
Buffer implementation that stores and retrieves data from a Redis server using the Arduino Client.
Definition RedisBuffer.h:30
SingleBuffer< T > write_buf
Local buffer for pending writes.
Definition RedisBuffer.h:228
size_t size() override
Returns the maximum capacity of the buffer.
Definition RedisBuffer.h:198
Client & client
Reference to the Arduino Client for Redis communication.
Definition RedisBuffer.h:223
RedisBuffer(Client &client, const char *key, size_t max_size, size_t local_buf_size=512, int expire_seconds=60 *60)
Constructs a RedisBuffer.
Definition RedisBuffer.h:42
void flushWrite()
Flushes buffered writes to Redis using RPUSH and sets expiration if configured.
Definition RedisBuffer.h:350
String redisCommand(const String &cmd, const String &arg1="", const String &arg2="", const String &arg3="")
Constructs a Redis command in RESP format.
Definition RedisBuffer.h:246
bool peek(T &result) override
Peeks at the next value in Redis directly (no local buffer). Flushes any pending writes before peekin...
Definition RedisBuffer.h:142
RedisResult sendCommand(const String &cmd)
Sends a command to the Redis server and returns the parsed result.
Definition RedisBuffer.h:267
bool read(T &result) override
Reads a single value, refilling the local read buffer from Redis when it is empty. Flushes any pending writes before reading...
Definition RedisBuffer.h:99
void setExpire(int seconds)
Sets the expiration time (in seconds) for the Redis key. The expiration will be refreshed on every wr...
Definition RedisBuffer.h:57
bool write(T data) override
Buffers a single value for writing to Redis. Data is only sent to Redis when the local buffer is full...
Definition RedisBuffer.h:66
int available() override
Returns the number of elements available to read (local + Redis). Flushes any pending writes before c...
Definition RedisBuffer.h:171
void fillReadBuffer()
Fills the local read buffer from Redis using LPOP with a count, which reads and removes the items from Redis in a single command.
Definition RedisBuffer.h:379
bool resize(int size) override
Resizes the maximum buffer size. This operation is only allowed before any write has occurred.
Definition RedisBuffer.h:206
int availableForWrite() override
Returns the number of elements that can be written before reaching max_size. There are no checks ...
Definition RedisBuffer.h:185
SingleBuffer< T > read_buf
Local buffer for pending reads.
Definition RedisBuffer.h:229
T * address() override
Returns the address of the start of the physical read buffer (not supported).
Definition RedisBuffer.h:192
int expire_seconds
Expiration time in seconds (0 = no expiration).
Definition RedisBuffer.h:227
size_t local_buf_size
Local buffer size for batching.
Definition RedisBuffer.h:226
bool has_written
True if any write operation has occurred.
Definition RedisBuffer.h:230
int writeArray(const T data[], int len) override
Writes multiple values by passing each one to write(): the data is buffered locally and sent to Redis whenever the local write buffer becomes full.
Definition RedisBuffer.h:82
const char * key
Redis key for the buffer.
Definition RedisBuffer.h:224
RedisResult readResponse()
Reads the response from the Redis server (up to REDIS_RESULT_BUFFER_SIZE bytes) and parses it into a RedisResult.
Definition RedisBuffer.h:285
void reset() override
Clears the buffer both locally and on the Redis server. Flushes any pending writes before clearing.
Definition RedisBuffer.h:157
int readArray(T data[], int len) override
Reads multiple values from the buffer in one batch. Flushes any pending writes before reading.
Definition RedisBuffer.h:123
size_t max_size
Maximum number of elements in the buffer.
Definition RedisBuffer.h:225
A simple Buffer implementation which just uses a (dynamically sized) array.
Definition Buffers.h:172
A simple wrapper to provide string functions on existing allocated char*. If the underlying char* is ...
Definition StrView.h:28
virtual int length()
Definition StrView.h:383
virtual bool startsWith(const char *str)
checks if the string starts with the indicated substring
Definition StrView.h:171
virtual bool isEmpty()
checks if the string is empty
Definition StrView.h:386
virtual bool replace(const char *toReplace, const char *replaced)
Replaces the first instance of toReplace with replaced.
Definition StrView.h:392
virtual void substring(StrView &from, int start, int end)
copies a substring into the current string
Definition StrView.h:477
virtual const char * c_str()
provides the string value as const char*
Definition StrView.h:379
int toInt()
Converts the string to an int.
Definition StrView.h:575
virtual int indexOf(const char c, int start=0)
Definition StrView.h:260
Vector implementation which provides the most important methods as defined by std::vector....
Definition Vector.h:21
Generic Implementation of sound input and output for desktop environments using portaudio.
Definition AudioCodecsBase.h:10
Definition RedisBuffer.h:214
Vector< String > strValues
String value parsed from the response (if any)
Definition RedisBuffer.h:217
bool ok
True if the response was valid and not an error.
Definition RedisBuffer.h:218
int intValue
Integer value parsed from the response (if any)
Definition RedisBuffer.h:215