arduino-audio-tools
HLSStream.h
1 #pragma once
2 #include "AudioCodecs/AudioEncoded.h"
3 #include "AudioConfig.h"
4 
5 #ifdef USE_URL_ARDUINO
6 #include "AudioBasic/StrExt.h"
7 #include "AudioHttp/URLStream.h"
8 #include "Concurrency/Concurrency.h"
9 
10 #define MAX_HLS_LINE 512
11 #define START_URLS_LIMIT 4
12 #define HLS_BUFFER_COUNT 10
13 
14 namespace audio_tools {
15 
18  public:
/// Starts the loader; the result tells whether it can be used.
19  virtual bool begin() = 0;
/// Stops the loader.
20  virtual void end() = 0;
/// Hands the url of a segment to the loader.
21  virtual void addUrl(const char *url) = 0;
/// Number of urls reported by the loader (the meaning is defined by the
/// implementation).
22  virtual int urlCount() = 0;
/// Number of bytes that can be read: this default reports 0.
23  virtual int available() { return 0; }
/// Reads up to len bytes into data: this default provides nothing and
/// returns 0.
24  virtual size_t readBytes(uint8_t *data, size_t len) { return 0; }
25  const char *contentType() { return nullptr; }
26  int contentLength() { return 0; }
27 
/// Buffer configuration hook: this default ignores both values.
28  virtual void setBuffer(int size, int count) {}
29 };
30 
33  public:
34  URLLoaderHLSOutput(Print &out, int maxUrls = 20) {
35  max = maxUrls;
36  p_print = &out;
37  }
/// Nothing needs to be prepared: always reports success.
38  virtual bool begin() { return true; };
/// Nothing needs to be released.
39  virtual void end(){};
40  virtual void addUrl(const char *url) {
41  LOGI("saving data for %s", url);
42  url_stream.begin(url);
43  url_stream.waitForData(500);
44  copier.begin(*p_print, url_stream);
45  int bytes_copied = copier.copyAll();
46  LOGI("Copied %d of %d", bytes_copied, url_stream.contentLength());
47  assert(bytes_copied == url_stream.contentLength());
48  url_stream.end();
49  }
/// Always 0: addUrl() processes each url right away, so nothing is queued.
50  virtual int urlCount() { return 0; }
51 
52  protected:
// NOTE(review): count is not used by any method visible in this class
53  int count = 0;
// value of the maxUrls constructor argument; not read by the visible methods
54  int max = 20;
// destination to which the segments are written
55  Print *p_print;
// request used to download a segment
56  URLStream url_stream;
// copies the downloaded data from url_stream to p_print
57  StreamCopy copier;
58 };
59 
60 /***
61  * @brief We feed the URLLoaderHLS with some url strings. The data of the
62  * related segments are provided via the readBytes() method.
63  * @author Phil Schatzmann
64  * @copyright GPLv3
65  */
66 
68  public:
69  // URLLoaderHLS(URLStream &stream) { p_stream = &stream; };
/// Default constructor: the loader uses its own default_stream.
70  URLLoaderHLS() = default;
71 
/// Calls end() when the object is destroyed.
72  ~URLLoaderHLS() { end(); }
73 
74  bool begin() override {
75  TRACED();
76 #if USE_TASK
77  buffer.resize(buffer_size * buffer_count);
78  task.begin(std::bind(&URLLoaderHLS::bufferRefill, this));
79 #else
80  buffer.resize(buffer_size * buffer_count);
81 #endif
82 
83  active = true;
84  return true;
85  }
86 
87  void end() override {
88  TRACED();
89 #if USE_TASK
90  task.end();
91 #endif
92  if (p_stream != nullptr) p_stream->end();
93  p_stream = nullptr;
94  buffer.clear();
95  active = false;
96  }
97 
99  void addUrl(const char *url) override {
100  LOGI("Adding %s", url);
101  Str url_str(url);
102  char *str = new char[url_str.length() + 1];
103  memcpy(str, url_str.c_str(), url_str.length() + 1);
104 #if USE_TASK
105  LockGuard lock_guard{mutex};
106 #endif
107  urls.push_back(str);
108  }
109 
/// Number of urls which are currently waiting in the queue.
112  int urlCount() override { return urls.size(); }
113 
115  int available() override {
116  if (!active) return 0;
117  TRACED();
118 #if !USE_TASK
119  bufferRefill();
120 #endif
121  return buffer.available();
122  }
123 
125  size_t readBytes(uint8_t *data, size_t len) override {
126  if (!active) return 0;
127  TRACED();
128 #if !USE_TASK
129  bufferRefill();
130 #endif
131  if (buffer.available() < len) LOGW("Buffer underflow");
132  return buffer.readArray(data, len);
133  }
134 
135  const char *contentType() {
136  if (p_stream == nullptr) return nullptr;
137  return p_stream->httpRequest().reply().get(CONTENT_TYPE);
138  }
139 
140  int contentLength() {
141  if (p_stream == nullptr) return 0;
142  return p_stream->contentLength();
143  }
144 
/// Defines the buffer size as size * count bytes. The values are only
/// stored here; the buffer is resized in begin().
145  void setBuffer(int size, int count) override {
146  buffer_size = size;
147  buffer_count = count;
148  }
149 
150  protected:
// queue of the urls to be played; the strings are allocated in addUrl()
151  Vector<const char *> urls{10};
152 #if USE_TASK
// buffer which is filled by the task and emptied by readBytes()
153  BufferRTOS<uint8_t> buffer{0};
// task which executes bufferRefill()
154  Task task{"Refill", 1024 * 5, 1, 1};
// protects urls
155  Mutex mutex;
156 #else
// buffer which is filled by bufferRefill() and emptied by readBytes()
157  RingBuffer<uint8_t> buffer{0};
158 #endif
// true between begin() and end()
159  bool active = false;
// the buffer is resized to buffer_size * buffer_count bytes in begin()
160  int buffer_size = DEFAULT_BUFFER_SIZE;
161  int buffer_count = HLS_BUFFER_COUNT;
162  URLStream default_stream;
// stream of the current segment; set to nullptr by end()
163  URLStream *p_stream = &default_stream;
// url of the current segment: taken from urls by bufferRefill()
164  const char *url_to_play = nullptr;
165 
167  void bufferRefill() {
168  TRACED();
169  // we have nothing to do
170  if (urls.empty()) {
171  LOGD("urls empty");
172  delay(10);
173  return;
174  }
175  if (buffer.availableForWrite() == 0) {
176  LOGD("buffer full");
177  delay(10);
178  return;
179  }
180 
181  // switch current stream if we have no more data
182  if (!*p_stream && !urls.empty()) {
183  LOGD("Refilling");
184  if (url_to_play != nullptr) {
185  delete url_to_play;
186  }
187  url_to_play = urls[0];
188  LOGI("playing %s", url_to_play);
189  p_stream->setTimeout(5000);
190  p_stream->begin(url_to_play);
191  p_stream->waitForData(500);
192 #if USE_TASK
193  LockGuard lock_guard{mutex};
194 #endif
195  urls.pop_front();
196  // assert(urls[0]!=url);
197 
198 #ifdef ESP32
199  LOGI("Free heap: %u", (unsigned)ESP.getFreeHeap());
200 #endif
201  LOGI("Playing %s of %d", p_stream->urlStr(), (int)urls.size());
202  }
203 
204  int total = 0;
205  int failed = 0;
206  int to_write = min(buffer.availableForWrite(), DEFAULT_BUFFER_SIZE);
207  // try to keep the buffer filled
208  while (to_write > 0) {
209  uint8_t tmp[to_write] = {0};
210  int read = p_stream->readBytes(tmp, to_write);
211  total += read;
212  if (read > 0) {
213  failed = 0;
214  buffer.writeArray(tmp, read);
215  LOGI("buffer add %d -> %d:", read, buffer.available());
216 
217  to_write = min(buffer.availableForWrite(), DEFAULT_BUFFER_SIZE);
218  } else {
219  delay(10);
220  // this should not really happen
221  failed++;
222  LOGW("No data idx %d: available: %d", failed, p_stream->available());
223  if (failed >= 5) {
224  LOGE("No data idx %d: available: %d", failed, p_stream->available());
225  if (p_stream->available() == 0) p_stream->end();
226  break;
227  }
228  }
229  // After we processed all data we close the stream to get a new url
230  if (p_stream->totalRead() == p_stream->contentLength()) {
231  p_stream->end();
232  break;
233  }
234  LOGD("Refilled with %d now %d available to write", total,
235  buffer.availableForWrite());
236  }
237  }
238 };
239 
244 class URLHistory {
245  public:
246  bool add(const char *url) {
247  bool found = false;
248  Str url_str(url);
249  for (int j = 0; j < history.size(); j++) {
250  if (url_str.equals(history[j])) {
251  found = true;
252  break;
253  }
254  }
255  if (!found) {
256  char *str = new char[url_str.length() + 1];
257  memcpy(str, url, url_str.length() + 1);
258  history.push_back(str);
259  if (history.size() > 20) {
260  delete (history[0]);
261  history.pop_front();
262  }
263  }
264  return !found;
265  }
266 
267  void clear() { history.clear(); }
268 
269  int size() { return history.size(); }
270 
271  protected:
272  Vector<const char *> history;
273 };
274 
280 class HLSParser {
281  public:
282  // loads the index url
/// Stores the pointer to urlStr (the string is not copied) and starts the
/// processing with begin().
283  bool begin(const char *urlStr) {
284  index_url_str = urlStr;
285  return begin();
286  }
287 
288  bool begin() {
289  TRACEI();
290  custom_log_level.set();
291  segments_url_str = "";
292  bandwidth = 0;
293  if (!parseIndex()) {
294  TRACEE();
295  return false;
296  }
297  if (!parseSegments()) {
298  TRACEE();
299  return false;
300  }
301 
302  if (!p_url_loader->begin()) {
303  TRACEE();
304  return false;
305  }
306 
307 #if USE_TASK
308  segment_load_task.begin(std::bind(&HLSParser::reloadSegments, this));
309 #endif
310 
311  custom_log_level.reset();
312  return true;
313  }
314 
315  int available() {
316  TRACED();
317  int result = 0;
318  custom_log_level.set();
319 #if !USE_TASK
320  reloadSegments();
321 #endif
322  if (active) result = p_url_loader->available();
323  custom_log_level.reset();
324  return result;
325  }
326 
327  size_t readBytes(uint8_t *buffer, size_t len) {
328  TRACED();
329  size_t result = 0;
330  custom_log_level.set();
331 #if !USE_TASK
332  reloadSegments();
333 #endif
334  if (active) result = p_url_loader->readBytes(buffer, len);
335  custom_log_level.reset();
336  return result;
337  }
338 
/// The url which was passed to begin(const char*); nullptr if none was set.
339  const char *indexUrl() { return index_url_str; }
340 
/// The url of the segments playlist which was selected by parseIndexLine().
341  const char *segmentsUrl() {
342  if (segments_url_str == nullptr) return nullptr;
343  return segments_url_str.c_str();
344  }
345 
/// Provides the codec
347  const char *getCodec() { return codec.c_str(); }
348 
/// Provides the content type of the audio data (as reported by the url loader)
350  const char *contentType() { return p_url_loader->contentType(); }
351 
/// Provides the content length as reported by the url loader
352  int contentLength() { return p_url_loader->contentLength(); }
353 
/// Closes the processing: stops the reload task (when USE_TASK is active),
/// clears the codec and the segments url, ends the playlist request and the
/// url loader, clears the url history and deactivates the parser.
355  void end() {
356  TRACEI();
357 #if USE_TASK
// the task is stopped first, before the state it works on is cleared
358  segment_load_task.end();
359 #endif
360  codec.clear();
361  segments_url_str.clear();
362  url_stream.end();
363  p_url_loader->end();
364  url_history.clear();
365  active = false;
366  }
367 
/// Stores count in url_count. NOTE(review): url_count is not read by any
/// method visible in this class.
369  void setUrlCount(int count) { url_count = count; }
370 
/// Defines the class specific custom log level
372  void setLogLevel(AudioLogger::LogLevel level) { custom_log_level.set(level); }
373 
/// Forwards the buffer definition to the url loader
374  void setBuffer(int size, int count) { p_url_loader->setBuffer(size, count); }
375 
/// Replaces the default url loader. Only the address is stored.
376  void setUrlLoader(URLLoaderHLSBase &loader) { p_url_loader = &loader; }
377 
378  protected:
// log level which is active while the methods of this class are executed
379  CustomLogLevel custom_log_level;
// smallest BANDWIDTH found in the index so far; 0 = none found yet
380  int bandwidth = 0;
// set by setUrlCount()
381  int url_count = 5;
// true if the last parsed BANDWIDTH was the smallest one so far
382  bool url_active = false;
// true after a line with #EXTM3U was found in the playlist being parsed
383  bool is_extm3u = false;
// codec from the CODECS attribute of the index
384  StrExt codec;
// url of the playlist which contains the segments
385  StrExt segments_url_str;
// work variable used to build the complete url of a segment
386  StrExt url_str;
// url of the index; the string is owned by the caller
387  const char *index_url_str = nullptr;
// request used to load the index and the segments playlist
388  URLStream url_stream;
389  URLLoaderHLS default_url_loader;
// receives the segment urls; can be replaced with setUrlLoader()
390  URLLoaderHLSBase *p_url_loader = &default_url_loader;
// urls which have already been forwarded to the url loader
391  URLHistory url_history;
392 #if USE_TASK
// task which executes reloadSegments()
393  Task segment_load_task{"Refill", 1024 * 5, 1, 1};
394 #endif
// set by parseSegments() when the history holds more than START_URLS_LIMIT
// urls
395  bool active = false;
// true while parseSegments() is executing
396  bool parse_segments_active = false;
// last value of #EXT-X-MEDIA-SEQUENCE
397  int media_sequence = 0;
// value of #EXT-X-TARGETDURATION in milliseconds
398  int tartget_duration_ms = 5000;
// number of segment lines in the playlist which was parsed last
399  int segment_count = 0;
// time in ms (see millis()) before which the playlist is not reloaded
400  uint64_t next_sement_load_time = 0;
401 
402  // trigger the reloading of segments if the limit is underflowing
403  void reloadSegments() {
404  TRACED();
405  // get new urls
406  if (!segments_url_str.isEmpty()) {
407  parseSegments();
408  }
409  }
410 
411  // parse the index file and the segments
412  bool parseIndex() {
413  TRACED();
414  url_stream.setTimeout(5000);
415  // url_stream.setConnectionClose(true);
416 
417  // we only update the content length
418  url_stream.setAutoCreateLines(false);
419  bool rc = url_stream.begin(index_url_str);
420  url_active = true;
421  rc = parseIndexLines();
422  return rc;
423  }
424 
425  // parse the index file
426  bool parseIndexLines() {
427  TRACEI();
428  char tmp[MAX_HLS_LINE];
429  bool result = true;
430  is_extm3u = false;
431 
432  // parse lines
433  memset(tmp, 0, MAX_HLS_LINE);
434  while (true) {
435  memset(tmp, 0, MAX_HLS_LINE);
436  size_t len =
437  url_stream.httpRequest().readBytesUntil('\n', tmp, MAX_HLS_LINE);
438  if (len == 0 && url_stream.available() == 0) break;
439  Str str(tmp);
440 
441  // check header
442  if (str.indexOf("#EXTM3U") >= 0) {
443  is_extm3u = true;
444  }
445 
446  if (is_extm3u) {
447  if (!parseIndexLine(str)) {
448  return false;
449  }
450  }
451  }
452  return result;
453  }
454 
455 
456  // parse the segment url provided by the index
/// Loads the segments playlist and forwards the new segment urls to the url
/// loader (via parseSegmentLines()).
/// @return false if a parse is already in progress, if it is too early for a
/// reload, if no segments url is defined or if the request has failed
457  bool parseSegments() {
458  TRACED();
// prevent that this method is executed again while it is still running
459  if (parse_segments_active) {
460  return false;
461  }
462 
463  // make sure that we load at relevant schedule
// a reload is forced when the url loader has at most 1 url left
464  if (millis() < next_sement_load_time && p_url_loader->urlCount() > 1) {
465  delay(1);
466  return false;
467  }
468  parse_segments_active = true;
469 
470  LOGI("Available urls: %d", p_url_loader->urlCount());
471 
472  if (url_stream) url_stream.clear();
473  LOGI("parsing %s", segments_url_str.c_str());
474 
475  if (segments_url_str.isEmpty()) {
476  TRACEE();
477  parse_segments_active = false;
478  return false;
479  }
480 
481  if (!url_stream.begin(segments_url_str.c_str())) {
482  TRACEE();
483  parse_segments_active = false;
484  return false;
485  }
486 
487  segment_count = 0;
488  if (!parseSegmentLines()) {
489  TRACEE();
490  parse_segments_active = false;
491  // do not display as error
// parseSegmentLine() reports false e.g. when the media sequence was already
// loaded; the next load time is not updated in this case
492  return true;
493  }
494 
// the next reload is due when the reported segments have been played
495  next_sement_load_time = millis() + (segment_count * tartget_duration_ms);
496  // assert(segment_count > 0);
497 
498  // we request a minimum of collected urls to play before we start
499  if (url_history.size() > START_URLS_LIMIT) active = true;
500  parse_segments_active = false;
501 
502  return true;
503  }
504 
505  // parse the segments
506  bool parseSegmentLines() {
507  TRACEI();
508  char tmp[MAX_HLS_LINE];
509  bool result = true;
510  is_extm3u = false;
511 
512  // parse lines
513  memset(tmp, 0, MAX_HLS_LINE);
514  while (true) {
515  memset(tmp, 0, MAX_HLS_LINE);
516  size_t len =
517  url_stream.httpRequest().readBytesUntil('\n', tmp, MAX_HLS_LINE);
518  if (len == 0 && url_stream.available() == 0) break;
519  Str str(tmp);
520 
521  // check header
522  if (str.indexOf("#EXTM3U") >= 0) {
523  is_extm3u = true;
524  }
525 
526  if (is_extm3u) {
527  if (!parseSegmentLine(str)) {
528  return false;
529  }
530  }
531  }
532  return result;
533  }
534 
535  // Add all segments to queue
536  bool parseSegmentLine(Str &str) {
537  TRACED();
538  LOGI("> %s", str.c_str());
539 
540  int pos = str.indexOf("#");
541  if (pos >= 0) {
542  LOGI("-> Segment: %s", str.c_str());
543 
544  pos = str.indexOf("#EXT-X-MEDIA-SEQUENCE:");
545  if (pos >= 0) {
546  int new_media_sequence = atoi(str.c_str() + pos + 22);
547  LOGI("media_sequence: %d", new_media_sequence);
548  if (new_media_sequence == media_sequence) {
549  LOGW("MEDIA-SEQUENCE already loaded: %d", media_sequence);
550  return false;
551  }
552  media_sequence = new_media_sequence;
553  }
554 
555  pos = str.indexOf("#EXT-X-TARGETDURATION:");
556  if (pos >= 0) {
557  const char *duration_str = str.c_str() + pos + 22;
558  tartget_duration_ms = 1000 * atoi(duration_str);
559  LOGI("tartget_duration_ms: %d (%s)", tartget_duration_ms, duration_str);
560  }
561  } else {
562  segment_count++;
563  if (url_history.add(str.c_str())) {
564  // provide audio urls to the url_loader
565  if (str.startsWith("http")) {
566  url_str = str;
567  } else {
568  // we create the complete url
569  url_str = segments_url_str;
570  url_str.add("/");
571  url_str.add(str.c_str());
572  }
573  p_url_loader->addUrl(url_str.c_str());
574  } else {
575  LOGD("Duplicate ignored: %s", str.c_str());
576  }
577  }
578  return true;
579  }
580 
581  // Determine codec for min bandwidth
582  bool parseIndexLine(Str &str) {
583  TRACED();
584  LOGI("> %s", str.c_str());
585  int tmp_bandwidth;
586  if (str.indexOf("EXT-X-STREAM-INF") >= 0) {
587  // determine min bandwidth
588  int pos = str.indexOf("BANDWIDTH=");
589  if (pos > 0) {
590  Str num(str.c_str() + pos + 10);
591  tmp_bandwidth = num.toInt();
592  url_active = (tmp_bandwidth < bandwidth || bandwidth == 0);
593  if (url_active) {
594  bandwidth = tmp_bandwidth;
595  LOGD("-> bandwith: %d", bandwidth);
596  }
597  }
598 
599  pos = str.indexOf("CODECS=");
600  if (pos > 0) {
601  int start = pos + 8;
602  int end = str.indexOf('"', pos + 10);
603  codec.substring(str, start, end);
604  LOGI("-> codec: %s", codec.c_str());
605  }
606  }
607 
608  if (str.startsWith("http")) {
609  // check if we have a valid codec
610  segments_url_str.set(str);
611  LOGD("segments_url_str = %s", str.c_str());
612  }
613 
614  return true;
615  }
616 };
617 
626 class HLSStream : public AudioStream {
627  public:
628  HLSStream() = default;
629 
630  HLSStream(const char *ssid, const char *password) {
631  setSSID(ssid);
632  setPassword(password);
633  }
634 
635  bool begin(const char *urlStr) {
636  TRACEI();
637  login();
638  // parse the url to the HLS
639  bool rc = parser.begin(urlStr);
640  return rc;
641  }
642 
643  bool begin() {
644  TRACEI();
645  login();
646  bool rc = parser.begin();
647  return rc;
648  }
649 
650  // ends the request
651  void end() { parser.end(); }
652 
654  void setSSID(const char *ssid) { this->ssid = ssid; }
655 
657  void setPassword(const char *password) { this->password = password; }
658 
660  const char *codec() { return parser.getCodec(); }
661 
662  const char *contentType() { return parser.contentType(); }
663 
664  int contentLength() { return parser.contentLength(); }
665 
666  int available() override {
667  TRACED();
668  return parser.available();
669  }
670 
671  size_t readBytes(uint8_t *data, size_t len) override {
672  TRACED();
673  return parser.readBytes(data, len);
674  }
675 
677  void setLogLevel(AudioLogger::LogLevel level) { parser.setLogLevel(level); }
678 
680  void setBuffer(int size, int count) { parser.setBuffer(size, count); }
681 
682  protected:
683  HLSParser parser;
684  const char *ssid = nullptr;
685  const char *password = nullptr;
686 
687  void login() {
688 #ifdef USE_WIFI
689  if (ssid != nullptr && password != nullptr &&
690  WiFi.status() != WL_CONNECTED) {
691  TRACED();
692  WiFi.begin(ssid, password);
693  while (WiFi.status() != WL_CONNECTED) {
694  Serial.print(".");
695  delay(500);
696  }
697  }
698 #else
699  LOGW("login not supported");
700 #endif
701  }
702 };
703 
704 } // namespace audio_tools
705 
706 #endif
Base class for all Audio Streams. It supports the boolean operator to test if the object is ready with...
Definition: AudioStreams.h:24
virtual int readArray(T data[], int len)
reads multiple values
Definition: Buffers.h:41
virtual int writeArray(const T data[], int len)
Fills the buffer data.
Definition: Buffers.h:65
void clear()
same as reset
Definition: Buffers.h:134
Simple Parser for HLS data. We select the entry with min bandwidth.
Definition: HLSStream.h:280
void setLogLevel(AudioLogger::LogLevel level)
Defines the class specific custom log level.
Definition: HLSStream.h:372
void setUrlCount(int count)
Defines the number of urls that are preloaded in the URLLoaderHLS.
Definition: HLSStream.h:369
const char * contentType()
Provides the content type of the audio data.
Definition: HLSStream.h:350
void end()
Closes the processing.
Definition: HLSStream.h:355
const char * getCodec()
Provides the codec.
Definition: HLSStream.h:347
HTTP Live Streaming using HLS: The result is a MPEG-TS data stream that must be decoded e....
Definition: HLSStream.h:626
void setLogLevel(AudioLogger::LogLevel level)
Defines the class specific custom log level.
Definition: HLSStream.h:677
void setPassword(const char *password)
Sets the password that will be used for logging in (when calling begin)
Definition: HLSStream.h:657
void setSSID(const char *ssid)
Sets the ssid that will be used for logging in (when calling begin)
Definition: HLSStream.h:654
const char * codec()
Returns the string representation of the codec of the audio stream.
Definition: HLSStream.h:660
void setBuffer(int size, int count)
Defines the buffer size.
Definition: HLSStream.h:680
RAII implementation using a Mutex: Only a few microcontrollers provide lock guards,...
Definition: LockGuard.h:92
Definition: NoArduino.h:58
virtual int availableForWrite()
provides the number of entries that are available to write
Definition: Buffers.h:365
virtual int available()
provides the number of entries that are available to read
Definition: Buffers.h:362
void clear() override
clears the string by setting the terminating 0 at the beginning
Definition: StrExt.h:165
A simple wrapper to provide string functions on char*. If the underlying char* is a const we do not a...
Definition: Str.h:27
virtual bool equals(const char *str)
checks if the string equals indicated parameter string
Definition: Str.h:164
virtual int length()
Definition: Str.h:374
virtual const char * c_str()
provides the string value as const char*
Definition: Str.h:370
We provide the typeless StreamCopy as a subclass of StreamCopyT.
Definition: StreamCopy.h:433
size_t copyAll(int retryCount=5, int retryWaitMs=200)
copies all data - returns true if we copied anything
Definition: StreamCopy.h:208
void begin()
(Re)starts the processing
Definition: StreamCopy.h:54
Definition: HLSStream.h:244
Abstract API for URLLoaderHLS.
Definition: HLSStream.h:17
Definition: HLSStream.h:67
void bufferRefill()
try to keep the buffer filled
Definition: HLSStream.h:167
void addUrl(const char *url) override
Adds the next url to be played in sequence.
Definition: HLSStream.h:99
int urlCount() override
Definition: HLSStream.h:112
size_t readBytes(uint8_t *data, size_t len) override
Provides data from the audio stream.
Definition: HLSStream.h:125
int available() override
Available bytes of the audio stream.
Definition: HLSStream.h:115
URLLoader which saves the HLS segments to the indicated output.
Definition: HLSStream.h:32
Represents the content of a URL as Stream. We use the WiFi.h API.
Definition: URLStream.h:25
virtual HttpRequest & httpRequest() override
provides access to the HttpRequest
Definition: URLStream.h:182
virtual bool begin(const char *urlStr, const char *acceptMime=nullptr, MethodID action=GET, const char *reqMime="", const char *reqData="") override
Execute http request: by default we use a GET request.
Definition: URLStream.h:79
virtual bool waitForData(int timeout)
waits for some data - returns false if the request has failed
Definition: URLStream.h:230
Generic Implementation of sound input and output for desktop environments using portaudio.
Definition: AnalogAudio.h:10
void delay(uint32_t ms)
Waits for the indicated milliseconds.
Definition: Millis.h:11
uint32_t millis()
Returns the milliseconds since the start.
Definition: Millis.h:18