arduino-audio-tools
Loading...
Searching...
No Matches
RedisBuffer.h
Go to the documentation of this file.
1#pragma once
3#include <Client.h>
4
5#define REDIS_RESULT_BUFFER_SIZE (10 * 1024)
6
7namespace audio_tools {
8
33template <typename T>
34class RedisBuffer : public BaseBuffer<T> {
35 public:
55
62
70 bool write(T data) override {
71 has_written = true;
72 write_buf.write(data);
73 if (write_buf.isFull()) {
74 flushWrite(); // flush any pending writes first
75 }
76 return true;
77 }
78
86 int writeArray(const T data[], int len) override {
87 LOGI("RedisBuffer:writeArray: %d", len);
88 has_written = true;
89 int written = 0;
90 for (int i = 0; i < len; ++i) {
91 write(data[i]);
92 ++written;
93 }
94 return written;
95 }
96
103 bool read(T& result) override {
104 flushWrite(); // flush any pending writes before reading
105 if (read_buf.isEmpty()) {
106 read_buf.reset();
107 fillReadBuffer(); // fill local buffer from Redis
108 if (read_buf.isEmpty()) {
109 LOGI("RedisBuffer:read: no data available");
110 return false; // nothing left in Redis
111 }
112 }
113 T val;
114 bool rc = read_buf.read(val); // read from local buffer
115 LOGI("Redis LPOP: %d", val);
116 result = val;
117 return rc;
118 }
119
127 int readArray(T data[], int len) override {
128 flushWrite(); // flush any pending writes before reading
129 int read_count = 0;
130 while (read_count < len) {
131 if (read_buf.isEmpty()) {
133 if (read_buf.isEmpty()) break; // nothing left in Redis
134 }
135 read_buf.read(data[read_count++]);
136 }
137 return read_count;
138 }
139
146 bool peek(T& result) override {
147 flushWrite(); // flush any pending writes before peeking
148
149 // Use LINDEX to peek at the first value in Redis without removing it
150 String cmd = redisCommand("LINDEX", key, "0");
152 if (!resp.ok) return false;
153 result = (T)resp.intValue;
154 return true;
155 }
156
161 void reset() override {
162 flushWrite();
163 String cmd = redisCommand("DEL", key);
164 auto rc = sendCommand(cmd);
165 LOGI("Redis DEL: %d", rc.intValue);
166 read_buf.reset();
168 }
169
175 int available() override {
176 flushWrite();
177 String cmd = redisCommand("LLEN", key);
179 LOGI("LLEN: %d (ok=%d)", resp.intValue, resp.ok);
180 return resp.intValue + read_buf.available();
181 }
182
189 int availableForWrite() override { return max_size - available(); }
190
196 T* address() override { return nullptr; }
197
202 size_t size() override { return max_size; }
203
210 bool resize(size_t size) override {
211 if (has_written) return false;
212 LOGI("RedisBuffer::resize: %d", size);
213 max_size = size;
214 return true;
215 }
216
217 protected:
218 struct RedisResult {
219 int intValue = 0;
222 bool ok = false;
223 operator bool() const { return ok; }
224 // std::vector<T> values;
225 };
226
228 const char* key;
229 size_t max_size;
234 bool has_written = false;
235
237 while (client.available()) {
238 client.read(); // clear any remaining data in the buffer
239 }
240 }
241
250 String redisCommand(const String& cmd, const String& arg1 = "",
251 const String& arg2 = "", const String& arg3 = "") {
252 String out = "*" +
253 String(1 + (arg1.length() > 0) + (arg2.length() > 0) +
254 (arg3.length() > 0)) +
255 "\r\n";
256 out += "$" + String(cmd.length()) + "\r\n" + cmd + "\r\n";
257 if (arg1.length())
258 out += "$" + String(arg1.length()) + "\r\n" + arg1 + "\r\n";
259 if (arg2.length())
260 out += "$" + String(arg2.length()) + "\r\n" + arg2 + "\r\n";
261 if (arg3.length())
262 out += "$" + String(arg3.length()) + "\r\n" + arg3 + "\r\n";
263 return out;
264 }
265
272 RedisResult result;
273 if (!client.connected()) {
274 LOGE("Redis not connected");
275 result.ok = false;
276 return result;
277 }
278 client.print(cmd);
279 client.flush();
280 result = readResponse();
281 return result;
282 }
283
290 RedisResult result;
291 result.ok = true;
293 int n = 0;
294 while (n <= 0) {
295 n = client.read(buffer, sizeof(buffer));
296 }
297 buffer[n] = 0;
298
299 // build vector of strings
300 result.strValues.clear();
301 String tail = (char*)buffer;
302 int nl_pos = tail.indexOf("\r\n");
303 while (nl_pos >= 0) {
304 String head = tail.substring(0, nl_pos);
305 if (!head.startsWith("$")) result.strValues.push_back(head);
306 tail = tail.substring(nl_pos + 2);
307 tail.trim();
308 nl_pos = tail.indexOf("\r\n");
309 }
310
311 if (!tail.startsWith("$") && tail.length() > 0) {
312 result.strValues.push_back(tail);
313 }
314
315 // if we have more then 2 lines this is a success
316 if (result.strValues.size() > 2) {
317 result.intValue = result.strValues.size();
318 } else {
319 // Else try to determine an int value
320 StrView line((char*)buffer, sizeof(buffer), n);
321
323 if (line.startsWith("$")) {
324 int end = line.indexOf("\n");
325 line.substring(line.c_str(), end, line.length());
326 }
327
329 if (line.startsWith(":")) {
330 line.replace(":", "");
331 }
332
333 if (line.startsWith("-")) {
334 result.ok = false;
335 }
336
337 if (line.isEmpty()) {
338 result.intValue = -1; // no data available
339 result.ok = false;
340 } else {
341 result.intValue = line.toInt();
342 }
343 }
344
345 return result;
346 }
347
352 void flushWrite() {
353 if (write_buf.isEmpty()) return;
355 // Use RPUSH with multiple arguments
356 String cmd = "*" + String(2 + write_size) + "\r\n";
357 cmd += "$5\r\nRPUSH\r\n";
358 cmd += "$" + String(strlen(key)) + "\r\n" + key + "\r\n";
359 T value;
360 while (!write_buf.isEmpty()) {
361 write_buf.read(value);
362 String sval = String(value);
363 cmd += "$" + String(sval.length()) + "\r\n" + sval + "\r\n";
364 }
367 LOGI("Redis RPUSH %d entries: %d (ok=%d)", write_size, resp.intValue,
368 resp.ok);
369
370 if (expire_seconds > 0) {
373 LOGI("Redis EXPIRE: %d (ok=%d)", resp.intValue, resp.ok);
374 }
375 }
376
382 read_buf.reset();
383 int size = read_buf.size();
384 LOGI("Redis LPOP: %d", size)
385 // Read up to local_buf_size items from Redis
386 String cmd = redisCommand("LPOP", key, String(size));
387 auto rc = sendCommand(cmd);
388 if (!rc.ok) {
389 LOGE("Redis LPOP failed: %s", cmd.c_str());
390 return; // no data available
391 }
392 for (auto& str : rc.strValues) {
393 if (str.startsWith("*")) continue;
394 if (str.startsWith("$")) continue;
395 if (str.length() == 0) continue;
396 LOGI("- %s", str.c_str());
397 T value = (T)str.toInt();
398 read_buf.write(value);
399 }
400
401 LOGI("RedisBuffer: %d of %d items", (int)read_buf.available(),
402 (int)read_buf.size());
403 // if this fails the
404 if (!read_buf.isFull()) {
405 LOGW("RedisBuffer:fillReadBuffer: not enough data read from Redis");
406 }
407 }
408};
409
410} // namespace audio_tools
#define LOGW(...)
Definition AudioLoggerIDF.h:29
#define LOGI(...)
Definition AudioLoggerIDF.h:28
#define LOGE(...)
Definition AudioLoggerIDF.h:30
#define REDIS_RESULT_BUFFER_SIZE
Definition RedisBuffer.h:5
Definition Arduino.h:162
bool connected()
Definition Arduino.h:167
virtual int read(uint8_t *buffer, size_t len)
Definition Arduino.h:165
virtual void flush()
Definition Arduino.h:130
virtual int available()
Definition Arduino.h:139
Shared functionality of all buffers.
Definition Buffers.h:23
void clear()
same as reset
Definition Buffers.h:96
bool isEmpty()
Definition Buffers.h:87
Buffer implementation that stores and retrieves data from a Redis server using the Arduino Client.
Definition RedisBuffer.h:34
SingleBuffer< T > write_buf
Local buffer for pending writes.
Definition RedisBuffer.h:232
size_t size() override
Returns the maximum capacity of the buffer.
Definition RedisBuffer.h:202
Client & client
Reference to the Arduino Client for Redis communication.
Definition RedisBuffer.h:227
RedisBuffer(Client &client, const char *key, size_t max_size, size_t local_buf_size=512, int expire_seconds=60 *60)
Constructs a RedisBuffer.
Definition RedisBuffer.h:46
void flushWrite()
Flushes buffered writes to Redis using RPUSH and sets expiration if configured.
Definition RedisBuffer.h:352
String redisCommand(const String &cmd, const String &arg1="", const String &arg2="", const String &arg3="")
Constructs a Redis command in RESP format.
Definition RedisBuffer.h:250
bool peek(T &result) override
Peeks at the next value in Redis directly (no local buffer). Flushes any pending writes before peekin...
Definition RedisBuffer.h:146
RedisResult sendCommand(const String &cmd)
Sends a command to the Redis server and returns the parsed result.
Definition RedisBuffer.h:271
bool read(T &result) override
Reads a single value from Redis directly (no local buffer). Flushes any pending writes before reading...
Definition RedisBuffer.h:103
void setExpire(int seconds)
Sets the expiration time (in seconds) for the Redis key. The expiration will be refreshed on every wr...
Definition RedisBuffer.h:61
bool write(T data) override
Buffers a single value for writing to Redis. Data is only sent to Redis when the local buffer is full...
Definition RedisBuffer.h:70
int available() override
Returns the number of elements available to read (local + Redis). Flushes any pending writes before c...
Definition RedisBuffer.h:175
void fillReadBuffer()
Fills the local read buffer from Redis using LRANGE. After reading, removes the items from Redis usin...
Definition RedisBuffer.h:381
int availableForWrite() override
Returns the number of elements that can be written before reaching max_size. There are are no checks ...
Definition RedisBuffer.h:189
SingleBuffer< T > read_buf
Local buffer for pending reads.
Definition RedisBuffer.h:233
T * address() override
Returns the address of the start of the physical read buffer (not supported).
Definition RedisBuffer.h:196
int expire_seconds
Expiration time in seconds (0 = no expiration).
Definition RedisBuffer.h:231
size_t local_buf_size
Local buffer size for batching.
Definition RedisBuffer.h:230
void clearResponse()
Definition RedisBuffer.h:236
bool has_written
True if any write operation has occurred.
Definition RedisBuffer.h:234
int writeArray(const T data[], int len) override
Writes multiple values to Redis in one batch. Flushes any pending writes before sending the new data.
Definition RedisBuffer.h:86
bool resize(size_t size) override
Resizes the maximum buffer size. This operation is only allowed before any write has occurred.
Definition RedisBuffer.h:210
const char * key
Redis key for the buffer.
Definition RedisBuffer.h:228
RedisResult readResponse()
Reads a single line response from the Redis server and parses it into a RedisResult.
Definition RedisBuffer.h:289
void reset() override
Clears the buffer both locally and on the Redis server. Flushes any pending writes before clearing.
Definition RedisBuffer.h:161
int readArray(T data[], int len) override
Reads multiple values from the buffer in one batch. Flushes any pending writes before reading.
Definition RedisBuffer.h:127
size_t max_size
Maximum number of elements in the buffer.
Definition RedisBuffer.h:229
A simple Buffer implementation which just uses a (dynamically sized) array.
Definition Buffers.h:184
size_t size() override
Definition Buffers.h:315
bool write(T sample) override
write add an entry to the buffer
Definition Buffers.h:218
bool read(T &result) override
reads a single value
Definition Buffers.h:227
int available() override
provides the number of entries that are available to read
Definition Buffers.h:245
bool isFull() override
checks if the buffer is full
Definition Buffers.h:252
void reset() override
clears the buffer
Definition Buffers.h:298
A simple wrapper to provide string functions on existing allocated char*. If the underlying char* is ...
Definition StrView.h:28
Vector implementation which provides the most important methods as defined by std::vector....
Definition Vector.h:21
void push_back(T &&value)
Definition Vector.h:182
void clear()
Definition Vector.h:176
int size()
Definition Vector.h:178
Generic Implementation of sound input and output for desktop environments using portaudio.
Definition AudioCodecsBase.h:10
size_t writeData(Print *p_out, T *data, int samples, int maxSamples=512)
Definition AudioTypes.h:508
Definition RedisBuffer.h:218
Vector< String > strValues
String value parsed from the response (if any)
Definition RedisBuffer.h:221
bool ok
True if the response was valid and not an error.
Definition RedisBuffer.h:222
int intValue
Integer value parsed from the response (if any)
Definition RedisBuffer.h:219