arduino-audio-tools
Loading...
Searching...
No Matches
ESPNowStream.h
1#pragma once
2#include <WiFi.h>
3#include <esp_now.h>
4#include <esp_wifi.h>
5
6#include "AudioTools/Concurrency/RTOS.h"
7#include "AudioTools/CoreAudio/AudioBasic/StrView.h"
8#include "AudioTools/CoreAudio/BaseStream.h"
9
10namespace audio_tools {
11
// Forward declaration: the singleton pointer below is needed before the class
// itself is defined.
class ESPNowStream;
/// Instance used by the static ESP-NOW callbacks (set in the constructor).
static ESPNowStream* ESPNowStreamSelf = nullptr;
/// Broadcast address in textual form.
static const char* BROADCAST_MAC_STR = "FF:FF:FF:FF:FF:FF";
/// Broadcast address as raw bytes.
static const uint8_t BROADCAST_MAC[6] = {0xFF, 0xFF, 0xFF,
                                         0xFF, 0xFF, 0xFF};
17
24 wifi_mode_t wifi_mode = WIFI_STA;
25 const char* mac_address = nullptr;
26 uint16_t buffer_size = ESP_NOW_MAX_DATA_LEN;
27 uint16_t buffer_count = 400;
28 int channel = 0;
29 const char* ssid = nullptr;
30 const char* password = nullptr;
31 bool use_send_ack = true; // we wait for
32 uint16_t delay_after_failed_write_ms = 2000;
33 int write_retry_count = 1; // -1 endless
34#if ESP_IDF_VERSION >= ESP_IDF_VERSION_VAL(5, 0, 0)
35 void (*recveive_cb)(const esp_now_recv_info* info, const uint8_t* data,
36 int data_len) = nullptr;
37#else
38 void (*recveive_cb)(const uint8_t* mac_addr, const uint8_t* data,
39 int data_len) = nullptr;
40#endif
42 const char* primary_master_key = nullptr;
43 const char* local_master_key = nullptr;
45 wifi_phy_rate_t rate = WIFI_PHY_RATE_2M_S;
48 uint32_t ack_semaphore_timeout_ms = portMAX_DELAY;
49};
50
60class ESPNowStream : public BaseStream {
61 public:
62 ESPNowStream() { ESPNowStreamSelf = this; };
63
65 if (xSemaphore != nullptr) vSemaphoreDelete(xSemaphore);
66 }
67
68 ESPNowStreamConfig defaultConfig() {
69 ESPNowStreamConfig result;
70 return result;
71 }
72
74 const char* macAddress() {
75 static String mac_str = WiFi.macAddress();
76 return mac_str.c_str();
77 }
78
80 void setSendCallback(esp_now_send_cb_t cb) { send = cb; }
81
84 void setReceiveCallback(esp_now_recv_cb_t cb) { receive = cb; }
85
87 bool begin() { return begin(cfg); }
88
91 this->cfg = cfg;
92 WiFi.mode(cfg.wifi_mode);
93 // set mac address
94 if (cfg.mac_address != nullptr) {
95 LOGI("setting mac %s", cfg.mac_address);
96 byte mac[ESP_NOW_KEY_LEN];
97 str2mac(cfg.mac_address, mac);
98 if (esp_wifi_set_mac((wifi_interface_t)getInterface(), mac) != ESP_OK) {
99 LOGE("Could not set mac address");
100 return false;
101 }
102 delay(500); // On some boards calling macAddress to early leads to a race
103 // condition.
104 // checking if address has been updated
105 const char* addr = macAddress();
106 if (strcmp(addr, cfg.mac_address) != 0) {
107 LOGE("Wrong mac address: %s", addr);
108 return false;
109 }
110 }
111
112 if (WiFi.status() != WL_CONNECTED && cfg.ssid != nullptr &&
113 cfg.password != nullptr) {
114 WiFi.begin(cfg.ssid, cfg.password);
115 while (WiFi.status() != WL_CONNECTED) {
116 Serial.print('.');
117 delay(1000);
118 }
119 }
120
121#if ESP_IDF_VERSION < ESP_IDF_VERSION_VAL(5, 0, 0)
122 LOGI("Setting ESP-NEW rate");
123 if (esp_wifi_config_espnow_rate(getInterface(), cfg.rate) != ESP_OK) {
124 LOGW("Could not set rate");
125 }
126#endif
127
128 Serial.println();
129 Serial.print("mac: ");
130 Serial.println(WiFi.macAddress());
131 return setup();
132 }
133
135 void end() {
136 if (is_init) {
137 if (esp_now_deinit() != ESP_OK) {
138 LOGE("esp_now_deinit");
139 }
140 if (buffer.size() > 0) buffer.resize(0);
141 is_init = false;
142 has_peers = false;
143 read_ready = false;
144 is_broadcast = false;
145 }
146 }
147
149 bool addPeer(esp_now_peer_info_t& peer) {
150 if (!is_init) {
151 LOGE("addPeer before begin");
152 return false;
153 }
154 if (memcmp(BROADCAST_MAC, peer.peer_addr, 6) == 0) {
155 LOGI("Using broadcast");
156 is_broadcast = true;
157 }
158 esp_err_t result = esp_now_add_peer(&peer);
159 if (result == ESP_OK) {
160 LOGI("addPeer: %s", mac2str(peer.peer_addr));
161 has_peers = true;
162 } else {
163 LOGE("addPeer: %d", result);
164 }
165 return result == ESP_OK;
166 }
167
169 template <size_t size>
170 bool addPeers(const char* (&array)[size]) {
171 bool result = true;
172 for (int j = 0; j < size; j++) {
173 const char* peer = array[j];
174 if (peer != nullptr) {
175 if (!addPeer(peer)) {
176 result = false;
177 }
178 }
179 }
180 return result;
181 }
182
184 bool addPeer(const char* address) {
185 esp_now_peer_info_t peer;
186 peer.channel = cfg.channel;
187
188 peer.ifidx = getInterface();
189 peer.encrypt = false;
190
191 if (StrView(address).equals(cfg.mac_address)) {
192 LOGW("Did not add own address as peer");
193 return true;
194 }
195
196 if (isEncrypted()) {
197 peer.encrypt = true;
198 strncpy((char*)peer.lmk, cfg.local_master_key, 16);
199 }
200
201 if (!str2mac(address, peer.peer_addr)) {
202 LOGE("addPeer - Invalid address: %s", address);
203 return false;
204 }
205 return addPeer(peer);
206 }
207
211 if (cfg.use_send_ack) {
212 LOGW("Broadcast peer does not support use_send_ack");
213 cfg.use_send_ack = false;
214 }
215 return addPeer(BROADCAST_MAC_STR);
216 }
217
219 size_t write(const uint8_t* data, size_t len) override {
220 // nullptr means send to all registered peers
221 return write((const uint8_t*)nullptr, data, len);
222 }
223
225 size_t write(const char* peer, const uint8_t* data, size_t len) {
226 uint8_t mac[6];
227 if (!str2mac(peer, mac)) {
228 LOGE("write: invalid mac address %s", peer);
229 return 0;
230 }
231 return write(mac, data, len);
232 }
233
235 size_t write(const uint8_t* peer, const uint8_t* data, size_t len) {
236 // initialization: setup semaphore
237 setupSemaphore();
238
239 // initialization: if no peers registered and peer is nullptr, add broadcast
240 if (!has_peers && peer == nullptr) {
242 }
243
244 size_t total_sent = 0;
245 size_t remaining = len;
246
247 while (remaining > 0) {
248 size_t chunk_size = min(remaining, (size_t)ESP_NOW_MAX_DATA_LEN);
249 int retry_count = 0;
250
251 bool success =
252 sendPacket(data + total_sent, chunk_size, retry_count, peer);
253
254 if (success) {
255 // Chunk sent successfully
256 total_sent += chunk_size;
257 remaining -= chunk_size;
258 } else {
259 // Max retries exceeded for this chunk
260 LOGE(
261 "write: failed to send chunk after %d attempts (sent %zu/%zu "
262 "bytes)",
263 retry_count, total_sent, len);
264 // Return bytes successfully sent so far (may be 0 if first chunk
265 // failed)
266 return total_sent;
267 }
268 }
269 return total_sent;
270 }
271
273 size_t readBytes(uint8_t* data, size_t len) override {
274 if (!read_ready) return 0;
275 if (buffer.size() == 0) return 0;
276 return buffer.readArray(data, len);
277 }
278
279 int available() override {
280 if (!buffer) return 0;
281 if (!read_ready) return 0;
282 return buffer.size() == 0 ? 0 : buffer.available();
283 }
284
285 int availableForWrite() override {
286 if (!buffer) return 0;
287 return cfg.use_send_ack ? available_to_write : cfg.buffer_size;
288 }
289
292 int size = buffer.size();
293 // prevent div by 0
294 if (size == 0) return 0.0;
295 // calculate percent
296 return 100.0 * buffer.available() / size;
297 }
298
300 BufferRTOS<uint8_t>& getBuffer() { return buffer; }
301
303 uint32_t getLastIoSuccessTime() const { return last_io_success_time; }
304
305 protected:
307 BufferRTOS<uint8_t> buffer{0};
308 esp_now_recv_cb_t receive = default_recv_cb;
309 esp_now_send_cb_t send = default_send_cb;
310 volatile size_t available_to_write = 0;
311 volatile bool last_send_success = true;
312 bool is_init = false;
313 SemaphoreHandle_t xSemaphore = nullptr;
314 bool has_peers = false;
315 bool read_ready = false;
316 bool is_broadcast = false;
317 uint32_t last_io_success_time = 0;
318
319 inline void setupSemaphore() {
320 // use semaphore for confirmations
321 if (cfg.use_send_ack && xSemaphore == nullptr) {
322 xSemaphore = xSemaphoreCreateBinary();
323 xSemaphoreGive(xSemaphore);
324 }
325 }
326
327 inline void setupReceiveBuffer() {
328 // setup receive buffer
329 if (!buffer) {
330 LOGI("setupReceiveBuffer: %d", cfg.buffer_size * cfg.buffer_count);
331 buffer.resize(cfg.buffer_size * cfg.buffer_count);
332 }
333 }
334
335 inline void resetAvailableToWrite() {
336 if (cfg.use_send_ack) {
337 available_to_write = 0;
338 }
339 }
340
342 bool sendPacket(const uint8_t* data, size_t len, int& retry_count,
343 const uint8_t* destination = nullptr) {
344 TRACED();
345 const uint8_t* target = destination;
346 if (target == nullptr && is_broadcast) {
347 target = BROADCAST_MAC;
348 }
349
350 while (true) {
351 resetAvailableToWrite();
352
353 // Wait for previous send to complete (if using ACKs)
354 if (cfg.use_send_ack) {
355 TickType_t ticks = (cfg.ack_semaphore_timeout_ms == portMAX_DELAY)
356 ? portMAX_DELAY
357 : pdMS_TO_TICKS(cfg.ack_semaphore_timeout_ms);
358 if (xSemaphoreTake(xSemaphore, ticks) != pdTRUE) {
359 // Timeout waiting for previous send - check retry limit BEFORE
360 // incrementing
361 if (cfg.write_retry_count >= 0 &&
362 retry_count >= cfg.write_retry_count) {
363 LOGE("Timeout waiting for ACK semaphore after %d retries",
364 retry_count);
365 return false;
366 }
367 retry_count++;
368 LOGW("ACK semaphore timeout (attempt %d)", retry_count);
369 delay(cfg.delay_after_failed_write_ms);
370 continue;
371 }
372 }
373
374 // Try to queue the packet
375 esp_err_t rc = esp_now_send(target, data, len);
376
377 if (rc == ESP_OK) {
378 // Packet queued - wait for transmission result
379 if (handleTransmissionResult(retry_count)) {
380 return true; // Success
381 }
382 // Transmission failed - check if we've exceeded the limit
383 // handleTransmissionResult returns false both when limit is reached
384 // and when we should retry, so check the limit here
385 if (cfg.write_retry_count >= 0 &&
386 retry_count >= cfg.write_retry_count) {
387 return false; // Give up - limit reached
388 }
389 // Continue to retry
390 } else {
391 // Failed to queue - callback will NOT be called
392 if (cfg.use_send_ack) {
393 xSemaphoreGive(xSemaphore); // Give back semaphore
394 }
395
396 // Check limit BEFORE incrementing
397 if (cfg.write_retry_count >= 0 &&
398 retry_count >= cfg.write_retry_count) {
399 LOGE("esp_now_send queue error (rc=%d/0x%04X) after %d retries", rc,
400 rc, retry_count);
401 return false;
402 }
403
404 retry_count++;
405 LOGW("esp_now_send failed (rc=%d/0x%04X) - retrying (attempt %d)", rc,
406 rc, retry_count);
407 delay(cfg.delay_after_failed_write_ms);
408 }
409 }
410 }
411
413 bool handleTransmissionResult(int& retry_count) {
414 TRACED();
415 if (cfg.use_send_ack) {
416 // Wait for callback to signal result
417 TickType_t ticks = (cfg.ack_semaphore_timeout_ms == portMAX_DELAY)
418 ? portMAX_DELAY
419 : pdMS_TO_TICKS(cfg.ack_semaphore_timeout_ms);
420 if (xSemaphoreTake(xSemaphore, ticks) != pdTRUE) {
421 // Callback never came - check limit BEFORE incrementing
422 if (cfg.write_retry_count >= 0 &&
423 retry_count >= cfg.write_retry_count) {
424 LOGE("Transmission callback timeout after %d retries", retry_count);
425 return false;
426 }
427 retry_count++;
428 LOGW("Transmission callback timeout (attempt %d)", retry_count);
429 delay(cfg.delay_after_failed_write_ms);
430 return false; // Retry
431 }
432
433 // Got callback - check result
434 if (last_send_success) {
435 xSemaphoreGive(xSemaphore); // Release for next send
436 return true;
437 } else {
438 xSemaphoreGive(xSemaphore); // Release for retry
439
440 // Check limit BEFORE incrementing
441 if (cfg.write_retry_count >= 0 &&
442 retry_count >= cfg.write_retry_count) {
443 LOGE("Transmission failed after %d retries", retry_count);
444 return false;
445 }
446
447 retry_count++;
448 LOGI("Transmission failed - retrying (attempt %d)", retry_count);
449 delay(cfg.delay_after_failed_write_ms);
450 return false; // Retry
451 }
452 }
453 return true; // No ACK mode - assume success
454 }
455
457 bool handleQueueError(esp_err_t rc, int& retry_count) {
458 TRACED();
459 // esp_now_send failed to queue - callback will NOT be called
460 // Give back the semaphore we took earlier
461 if (cfg.use_send_ack) {
462 xSemaphoreGive(xSemaphore);
463 }
464
465 retry_count++;
466 LOGW("esp_now_send failed to queue (rc=%d/0x%04X) - retrying (attempt %d)",
467 rc, rc, retry_count);
468
469 if (cfg.write_retry_count >= 0 && retry_count > cfg.write_retry_count) {
470 LOGE("Send queue error after %d retries", retry_count);
471 return false;
472 }
473
474 delay(cfg.delay_after_failed_write_ms);
475 return true; // Continue retrying
476 }
477
478 bool isEncrypted() {
479 return cfg.primary_master_key != nullptr && cfg.local_master_key != nullptr;
480 }
481
482 wifi_interface_t getInterface() {
483 // define wifi_interface_t
484 wifi_interface_t result;
485 switch (cfg.wifi_mode) {
486 case WIFI_STA:
487 result = (wifi_interface_t)ESP_IF_WIFI_STA;
488 break;
489 case WIFI_AP:
490 result = (wifi_interface_t)ESP_IF_WIFI_AP;
491 break;
492 default:
493 result = (wifi_interface_t)0;
494 break;
495 }
496 return result;
497 }
498
500 bool setup() {
501 esp_err_t result = esp_now_init();
502 if (result == ESP_OK) {
503 LOGI("esp_now_init: %s", macAddress());
504 } else {
505 LOGE("esp_now_init: %d", result);
506 }
507
508 // encryption is optional
509 if (isEncrypted()) {
510 esp_now_set_pmk((uint8_t*)cfg.primary_master_key);
511 }
512
513 if (cfg.recveive_cb != nullptr) {
514 esp_now_register_recv_cb(cfg.recveive_cb);
515 } else {
516 esp_now_register_recv_cb(receive);
517 }
518 if (cfg.use_send_ack) {
519 esp_now_register_send_cb(send);
520 }
521 available_to_write = cfg.buffer_size;
522 is_init = result == ESP_OK;
523 return is_init;
524 }
525
/// Parses a mac address string of the form "AA:BB:CC:DD:EE:FF".
/// @param mac     text to parse (may be nullptr)
/// @param values  receives the 6 address bytes; bytes parsed before a
///                failure may already have been written
/// @return true only if the text is exactly 17 characters long and all 6 hex
///         fields could be parsed
bool str2mac(const char* mac, uint8_t* values) {
  if (mac == nullptr || values == nullptr) return false;
  // sscanf reports how many fields were converted: all 6 must be present
  const int parsed =
      sscanf(mac, "%hhx:%hhx:%hhx:%hhx:%hhx:%hhx", &values[0], &values[1],
             &values[2], &values[3], &values[4], &values[5]);
  return parsed == 6 && strlen(mac) == 17;
}
531
/// Converts 6 mac address bytes to lower case text ("aa:bb:cc:dd:ee:ff").
/// The result lives in a static buffer that is overwritten by the next call.
const char* mac2str(const uint8_t* array) {
  static char text[18];
  for (char& ch : text) ch = 0;
  snprintf(text, sizeof(text), "%02x:%02x:%02x:%02x:%02x:%02x", array[0],
           array[1], array[2], array[3], array[4], array[5]);
  return text;
}
539
540 static int bufferAvailableForWrite() {
541 return ESPNowStreamSelf->buffer.availableForWrite();
542 }
543
544#if ESP_IDF_VERSION >= ESP_IDF_VERSION_VAL(5, 0, 0)
545 static void default_recv_cb(const esp_now_recv_info* info,
546 const uint8_t* data, int data_len)
547#else
548 static void default_recv_cb(const uint8_t* mac_addr, const uint8_t* data,
549 int data_len)
550#endif
551 {
552 LOGD("rec_cb: %d", data_len);
553 // make sure that the receive buffer is available - moved from begin to make
554 // sure that it is only allocated when needed
555 ESPNowStreamSelf->setupReceiveBuffer();
556
557 // update last io time
558 ESPNowStreamSelf->last_io_success_time = millis();
559
560 // blocking write
561 size_t result = ESPNowStreamSelf->buffer.writeArray(data, data_len);
562 if (result != data_len) {
563 LOGE("writeArray %d -> %d", data_len, result);
564 }
565 // manage ready state
566 if (ESPNowStreamSelf->read_ready == false) {
567 if (ESPNowStreamSelf->cfg.start_read_threshold_percent == 0) {
568 ESPNowStreamSelf->read_ready = true;
569 } else {
570 float percent = ESPNowStreamSelf->getBufferPercent();
571 ESPNowStreamSelf->read_ready =
572 percent >= ESPNowStreamSelf->cfg.start_read_threshold_percent;
573 }
574 }
575 }
576
577#if ESP_IDF_VERSION >= ESP_IDF_VERSION_VAL(5, 5, 0)
578
579 static void default_send_cb(const wifi_tx_info_t* tx_info,
580 esp_now_send_status_t status) {
581 const uint8_t* mac_addr = tx_info->des_addr;
582#else
583 static void default_send_cb(const uint8_t* mac_addr,
584 esp_now_send_status_t status) {
585#endif
586 static uint8_t first_mac[ESP_NOW_KEY_LEN] = {0};
587 // we use the first confirming mac_addr for further confirmations and
588 // ignore others
589 if (first_mac[0] == 0) {
590 strncpy((char*)first_mac, (char*)mac_addr, ESP_NOW_KEY_LEN);
591 }
592 LOGD("default_send_cb - %s -> %s", ESPNowStreamSelf->mac2str(mac_addr),
593 status == ESP_NOW_SEND_SUCCESS ? "+" : "-");
594
595 // ignore others
596 if (strncmp((char*)mac_addr, (char*)first_mac, ESP_NOW_KEY_LEN) == 0) {
597 ESPNowStreamSelf->available_to_write = ESPNowStreamSelf->cfg.buffer_size;
598
599 // Track send success/failure
600 ESPNowStreamSelf->last_send_success = (status == ESP_NOW_SEND_SUCCESS);
601
602 if (status == ESP_NOW_SEND_SUCCESS) {
603 ESPNowStreamSelf->last_io_success_time = millis();
604 } else {
605 LOGI(
606 "Send Error to %s! Status: %d (Possible causes: out of range, "
607 "receiver busy/offline, channel mismatch, or buffer full)",
608 ESPNowStreamSelf->mac2str(mac_addr), status);
609 }
610
611 // Release semaphore to allow write to check status and retry if needed
612 xSemaphoreGive(ESPNowStreamSelf->xSemaphore);
613 }
614 }
615};
616
617} // namespace audio_tools
Base class for all Streams. It relies on write(const uint8_t *buffer, size_t size) and readBytes(uint...
Definition BaseStream.h:36
Buffer implementation which is using a FreeRTOS StreamBuffer. The default allocator uses psram is ava...
Definition BufferRTOS.h:30
int available() override
provides the number of entries that are available to read
Definition BufferRTOS.h:137
int availableForWrite() override
provides the number of entries that are available to write
Definition BufferRTOS.h:142
int writeArray(const T data[], int len)
Fills the buffer data.
Definition BufferRTOS.h:95
bool resize(size_t size)
Re-allocates the memory and the queue.
Definition BufferRTOS.h:51
int readArray(T data[], int len)
reads multiple values
Definition BufferRTOS.h:77
ESPNow as Arduino Stream. When use_send_ack is true we prevent any buffer overflows by blocking write...
Definition ESPNowStream.h:60
BufferRTOS< uint8_t > & getBuffer()
provides access to the receive buffer
Definition ESPNowStream.h:300
bool sendPacket(const uint8_t *data, size_t len, int &retry_count, const uint8_t *destination=nullptr)
Sends a single packet with retry logic.
Definition ESPNowStream.h:342
bool handleTransmissionResult(int &retry_count)
Handles the result of packet transmission (after queuing)
Definition ESPNowStream.h:413
bool handleQueueError(esp_err_t rc, int &retry_count)
Handles errors when queuing packets.
Definition ESPNowStream.h:457
void setSendCallback(esp_now_send_cb_t cb)
Defines an alternative send callback.
Definition ESPNowStream.h:80
size_t readBytes(uint8_t *data, size_t len) override
Reads the data from the peers.
Definition ESPNowStream.h:273
bool begin()
Initialization of ESPNow.
Definition ESPNowStream.h:87
uint32_t getLastIoSuccessTime() const
time when we were able to send or receive the last packet successfully
Definition ESPNowStream.h:303
bool addPeer(const char *address)
Adds a peer to which we can send info or from which we can receive info.
Definition ESPNowStream.h:184
size_t write(const uint8_t *data, size_t len) override
Writes the data - sends it to all registered peers.
Definition ESPNowStream.h:219
bool addPeers(const char *(&array)[size])
Adds an array of peers (mac address strings).
Definition ESPNowStream.h:170
void setReceiveCallback(esp_now_recv_cb_t cb)
Definition ESPNowStream.h:84
const char * macAddress()
Returns the mac address of the current ESP32.
Definition ESPNowStream.h:74
bool addPeer(esp_now_peer_info_t &peer)
Adds a peer to which we can send info or from which we can receive info.
Definition ESPNowStream.h:149
void end()
DeInitialization.
Definition ESPNowStream.h:135
bool addBroadcastPeer()
Definition ESPNowStream.h:210
size_t write(const char *peer, const uint8_t *data, size_t len)
Writes the data - sends it to the peer identified by the indicated mac address string.
Definition ESPNowStream.h:225
float getBufferPercent()
provides how much the receive buffer is filled (in percent)
Definition ESPNowStream.h:291
size_t write(const uint8_t *peer, const uint8_t *data, size_t len)
Writes the data - sends it to all the peers.
Definition ESPNowStream.h:235
bool begin(ESPNowStreamConfig cfg)
Initialization of ESPNow incl WIFI.
Definition ESPNowStream.h:90
bool setup()
Initialization.
Definition ESPNowStream.h:500
A simple wrapper to provide string functions on existing allocated char*. If the underlying char* is ...
Definition StrView.h:28
Generic Implementation of sound input and output for desktop environments using portaudio.
Definition AudioCodecsBase.h:10
uint32_t millis()
Returns the milliseconds since the start.
Definition Time.h:12
Configuration for the ESP-NOW protocol.
Definition ESPNowStream.h:23
const char * primary_master_key
to encrypt set primary_master_key and local_master_key to 16 byte strings
Definition ESPNowStream.h:42
uint8_t start_read_threshold_percent
threshold in percent to start reading from buffer
Definition ESPNowStream.h:47
wifi_phy_rate_t rate
esp-now bit rate
Definition ESPNowStream.h:45