15#include "AudioTools/Concurrency/Desktop.h"
17#include "AudioTools/Concurrency/RTOS.h"
19#include "AudioTools/CoreAudio/AudioBasic/Collections/Vector.h"
20#include "AudioTools/CoreAudio/AudioTimer.h"
21#include "IAudioSource.h"
22#include "RTSPPlatform.h"
49template <
typename Platform>
64 LOGD(
"Creating RTSP Audio streamer base");
72 m_RtpSocket = Platform::NULL_UDP_SOCKET;
73 m_RtcpSocket = Platform::NULL_UDP_SOCKET;
79 m_useTcpInterleaved =
false;
80 m_RtspTcpSocket = Platform::NULL_TCP_SOCKET;
84 m_lastSamplesSent = 0;
131 m_audioSource = source;
133 LOGI(
"RTSP Audio streamer created. Fragment size: %i bytes",
151 LOGI(
"initAudioSource");
153 LOGE(
"audio_source is null");
160 LOGI(
"m_fragmentSize (bytes): %d", m_fragmentSize);
183 m_ClientIP = aClientIP;
184 m_ClientPort = aClientPort;
186 m_SequenceNumber = random(65536);
188 if (m_udpRefCount != 0) {
193 for (u_short P = 6970; P < 0xFFFE; P += 2) {
194 m_RtpSocket = Platform::createUdpSocket(P);
197 m_RtcpSocket = Platform::createUdpSocket(P + 1);
200 m_RtcpServerPort = P + 1;
203 Platform::closeUdpSocket(m_RtpSocket);
204 Platform::closeUdpSocket(m_RtcpSocket);
210 LOGI(
"RTP Streamer set up with client IP %s and client Port %i",
211 m_ClientIP.toString().c_str(), m_ClientPort);
214 tryLearnClientFromUdp(
true);
231 int rtpChannel,
int rtcpChannel) {
232 m_RtspTcpSocket = tcpSock;
233 m_TcpRtpChannel = rtpChannel;
234 m_TcpRtcpChannel = rtcpChannel;
235 m_useTcpInterleaved =
true;
236 m_SequenceNumber = random(65536);
237 LOGI(
"Using RTP over RTSP TCP interleaved: ch=%d/%d", rtpChannel,
254 if (m_udpRefCount == 0) {
256 m_RtcpServerPort = 0;
257 Platform::closeUdpSocket(m_RtpSocket);
258 Platform::closeUdpSocket(m_RtcpSocket);
260 m_RtpSocket = Platform::NULL_UDP_SOCKET;
261 m_RtcpSocket = Platform::NULL_UDP_SOCKET;
287 if (mRtpBuf.size() == 0) {
288 LOGE(
"mRtpBuf is empty");
293 if (m_audioSource ==
nullptr) {
294 LOGE(
"No audio source provided");
299 if (m_fragmentSize + HEADER_SIZE >= STREAMING_BUFFER_SIZE) {
301 "STREAMIN_BUFFER_SIZE too small for the sampling rate: increase to "
303 m_fragmentSize + HEADER_SIZE);
308 memset(mRtpBuf.data(), 0, STREAMING_BUFFER_SIZE);
311 unsigned char *dataBuf = &mRtpBuf[HEADER_SIZE];
313 int maxPayload = STREAMING_BUFFER_SIZE - HEADER_SIZE - header_len;
314 dataBuf += header_len;
316 int toRead = m_fragmentSize;
317 if (toRead > maxPayload) {
318 LOGW(
"Fragment exceeds payload capacity (%d > %d); clamping", toRead, maxPayload);
321 int bytesRead = m_audioSource->
readBytes((
void *)dataBuf, toRead);
322 LOGI(
"Read %d bytes from audio source", bytesRead);
323 int bytesNet = m_audioSource->
getFormat().convert(dataBuf, bytesRead);
328 sendOut(HEADER_SIZE + bytesNet + header_len);
345 LOGI(
"Starting audio source (base)");
347 if (mRtpBuf.size() == 0) {
348 mRtpBuf.resize(STREAMING_BUFFER_SIZE + 1);
351 if (m_audioSource !=
nullptr) {
353 m_audioSource->
start();
354 LOGI(
"Audio source started - ready for manual streaming");
356 LOGE(
"No streaming source");
373 LOGI(
"Stopping audio source (base)");
375 if (m_audioSource !=
nullptr) {
376 m_audioSource->
stop();
379 LOGI(
"Audio source stopped");
446 if (m_audioSource ==
nullptr) {
451 if (newPeriod != m_timer_period_us && newPeriod > 0) {
452 LOGI(
"Timer period changed from %u us to %u us",
453 (
unsigned)m_timer_period_us, (
unsigned)newPeriod);
454 m_timer_period_us = newPeriod;
478 LOGD(
"timerCallback");
479 if (audioStreamerObj ==
nullptr) {
480 LOGE(
"audioStreamerObj is null");
492 LOGW(
"Direct sending of RTP stream failed");
493 }
else if (bytes > 0) {
495 streamer->m_Timestamp += inc;
496 LOGD(
"%i samples (ts inc) sent; timestamp: %u", inc,
497 (
unsigned)streamer->m_Timestamp);
501 if (
stop -
start > streamer->m_timer_period_us) {
502 LOGW(
"RTP Stream can't keep up (took %lu us, %d is max)!",
stop -
start,
503 streamer->m_timer_period_us);
// ---- Streamer state -------------------------------------------------------
// Size in bytes (3072) that the packet buffer mRtpBuf is zeroed to before each
// packet is assembled; header + payload must stay below this value.
// NOTE(review): a non-static const data member makes the class
// non-copy-assignable; `static constexpr` may be intended — confirm.
508 const int STREAMING_BUFFER_SIZE = 1024 * 3;
// Number of bytes requested from the audio source for one packet.
512 int m_fragmentSize = 0;
// Send interval in microseconds (default 20000 us); passed to the timer with
// the audio_tools::US unit and used as the "keep up" threshold in the callback.
513 int m_timer_period_us = 20000;
// Offset in mRtpBuf at which payload data begins (bytes below it hold the
// header that buildRtpHeader() fills).
514 const int HEADER_SIZE = 12;
// NOTE(review): `volatile` does not provide inter-thread synchronization; if
// this flag is shared between a timer/task and another context, an atomic
// type may be required — confirm against the code that reads/writes it.
515 volatile bool m_timer_restart_needed =
// UDP socket handles; their declarators are on lines not visible here. In this
// file the sockets m_RtpSocket / m_RtcpSocket are created via
// Platform::createUdpSocket and reset to Platform::NULL_UDP_SOCKET.
518 typename Platform::UdpSocketType
520 typename Platform::UdpSocketType
// Local server-side ports. m_RtcpServerPort is set to the odd port (P + 1) of
// the pair found by the port scan and cleared to 0 when UDP is released.
// presumably m_RtpServerPort holds the even port P — TODO confirm.
523 uint16_t m_RtpServerPort;
524 uint16_t m_RtcpServerPort;
// Sequence number written big-endian into header bytes 2-3; seeded with
// random(65536) when a transport is initialised.
526 u_short m_SequenceNumber;
// Timestamp written big-endian into header bytes 4-7; the timer callback
// advances it after a successful send.
527 uint32_t m_Timestamp;
// Destination of UDP packets. While m_ClientIP is 0.0.0.0 it can be learned
// from the sender of an inbound datagram (see tryLearnClientFromUdp).
530 IPAddress m_ClientIP;
531 uint16_t m_ClientPort;
// When true (and m_RtspTcpSocket is not NULL_TCP_SOCKET) sendOut() writes the
// packet to the RTSP TCP connection instead of UDP.
537 bool m_useTcpInterleaved;
538 typename Platform::TcpClientType *m_RtspTcpSocket;
// Interleaved channel id for RTCP, as supplied by the caller that enabled
// TCP-interleaved mode.
540 int m_TcpRtcpChannel;
// Payload type; its low 7 bits go into header byte 1. The value 14 is
// special-cased in buildRtpHeader().
542 int m_payloadType = 96;
// When > 0, used as the sample count for the timestamp increment instead of a
// value derived from the byte count.
543 int m_lastSamplesSent = 0;
// Synchronization-source id written big-endian into header bytes 8-11.
544 uint32_t m_Ssrc = 0x13F97E67;
// Not referenced by any code visible here.
// presumably the length of leftover MP3 data carried between packets — TODO confirm.
547 int mMp3CarryLen = 0;
564 if (m_lastSamplesSent > 0) {
565 samples = m_lastSamplesSent;
566 }
else if (m_audioSource) {
569 samples = bytesSent / 2;
571 return (uint32_t)samples;
// Serialises the current packet state (payload type, sequence number,
// timestamp, SSRC) into the leading bytes of mRtpBuf. All multi-byte fields
// are stored most-significant byte first (network byte order). Takes no
// arguments and returns nothing; it only reads members and writes mRtpBuf.
574 inline void buildRtpHeader() {
// Byte 1: payload type limited to 7 bits; the mask leaves the top bit clear.
576 mRtpBuf[1] = (uint8_t)(m_payloadType & 0x7F);
// Payload type 14 gets dedicated handling.
// NOTE(review): 14 is the static RTP payload type for MPEG audio (RFC 3551),
// presumably why it is singled out — confirm.
577 if (m_payloadType == 14) {
// Bytes 2-3: 16-bit sequence number, high byte first.
581 mRtpBuf[2] = (uint8_t)((m_SequenceNumber >> 8) & 0xFF);
582 mRtpBuf[3] = (uint8_t)(m_SequenceNumber & 0xFF);
// Bytes 4-7: 32-bit timestamp, high byte first.
583 mRtpBuf[4] = (uint8_t)((m_Timestamp >> 24) & 0xFF);
584 mRtpBuf[5] = (uint8_t)((m_Timestamp >> 16) & 0xFF);
585 mRtpBuf[6] = (uint8_t)((m_Timestamp >> 8) & 0xFF);
586 mRtpBuf[7] = (uint8_t)(m_Timestamp & 0xFF);
// Bytes 8-11: 32-bit SSRC identifier, high byte first.
588 mRtpBuf[8] = (uint8_t)((m_Ssrc >> 24) & 0xFF);
589 mRtpBuf[9] = (uint8_t)((m_Ssrc >> 16) & 0xFF);
590 mRtpBuf[10] = (uint8_t)((m_Ssrc >> 8) & 0xFF);
591 mRtpBuf[11] = (uint8_t)(m_Ssrc & 0xFF);
// Transmits the first `totalLen` bytes of mRtpBuf to the client.
// @param totalLen number of bytes of mRtpBuf to send (header + payload).
// Two delivery paths exist: the RTSP TCP connection (with a short framing
// prefix) when TCP-interleaved mode is active, or a UDP datagram to
// m_ClientIP.
594 inline void sendOut(uint16_t totalLen) {
// TCP path: requires both the mode flag and a usable TCP socket.
595 if (m_useTcpInterleaved && m_RtspTcpSocket != Platform::NULL_TCP_SOCKET) {
596 LOGD(
"Sending TCP: %d", totalLen);
// Framing prefix: hdr[1] carries the RTP channel id, hdr[2..3] the packet
// length in big-endian order.
// presumably hdr[0] is the '$' marker of RTSP interleaved framing — TODO confirm.
599 hdr[1] = (uint8_t)m_TcpRtpChannel;
600 hdr[2] = (uint8_t)((totalLen >> 8) & 0xFF);
601 hdr[3] = (uint8_t)(totalLen & 0xFF);
// The prefix and the packet are written with two separate socket calls.
602 Platform::sendSocket(m_RtspTcpSocket, hdr,
sizeof(hdr));
603 Platform::sendSocket(m_RtspTcpSocket, mRtpBuf.data(), totalLen);
// UDP delivery: first try (without warning) to learn the client address from
// an inbound datagram in case m_ClientIP is still 0.0.0.0.
606 tryLearnClientFromUdp(
false);
// NOTE(review): this logs at info level for every UDP packet, whereas the TCP
// path above logs at debug level — confirm the difference is intended.
607 LOGI(
"Sending UDP: %d bytes (to %s:%d)", totalLen,
608 m_ClientIP.toString().c_str(), m_ClientPort);
609 Platform::sendUdpSocket(m_RtpSocket, mRtpBuf.data(), totalLen, m_ClientIP,
// Attempts to discover the client's address from a datagram received on the
// RTP socket, for the case where no client IP has been configured.
// @param warnIfNone when true, a warning is logged on the branch where no
//        client address could be determined.
614 inline void tryLearnClientFromUdp(
bool warnIfNone) {
// Only relevant while the client IP is still 0.0.0.0 and an RTP socket exists.
// NOTE(review): the socket is used directly via `->` here, while other code in
// this file goes through Platform:: static helpers — confirm every Platform's
// UdpSocketType is pointer-like and offers parsePacket/remoteIP/remotePort.
615 if (m_ClientIP == IPAddress(0, 0, 0, 0) && m_RtpSocket) {
// Poll the socket for an inbound datagram.
616 int avail = m_RtpSocket->parsePacket();
// Address and port reported by the socket for the remote peer.
618 IPAddress learnedIp = m_RtpSocket->remoteIP();
619 uint16_t learnedPort = m_RtpSocket->remotePort();
// Ignore an unset (0.0.0.0) sender address.
620 if (learnedIp != IPAddress(0, 0, 0, 0)) {
621 m_ClientIP = learnedIp;
// An already configured client port is kept; the sender's port is adopted
// only when none was set.
622 if (m_ClientPort == 0) m_ClientPort = learnedPort;
623 LOGI(
"RTP learned client via UDP: %s:%u",
624 m_ClientIP.toString().c_str(), (
unsigned)m_ClientPort);
626 }
// Fallback branch: warn only when the caller asked for it.
else if (warnIfNone) {
627 LOGW(
"Client IP unknown (0.0.0.0) and no inbound UDP yet");
649template <
typename Platform>
662 LOGD(
"Creating RTSP Audio streamer with timer");
665 rtpTimer.setCallbackParameter(
this);
673 LOGI(
"RTSPAudioStreamer: Timer set to safe task mode (ESP_TIMER_TASK)");
704 LOGI(
"Starting RTP Stream with timer");
709 if (this->m_audioSource !=
nullptr) {
712 this->m_timer_period_us, audio_tools::US)) {
713 LOGE(
"Could not start timer");
715 LOGI(
"timer: %u us", (
unsigned)this->m_timer_period_us);
717 LOGI(
"Free heap size: %i KB", esp_get_free_heap_size() / 1000);
732 LOGI(
"Updating timer period to %u us", (
unsigned)this->m_timer_period_us);
733 rtpTimer.begin(this->m_timer_period_us, audio_tools::US);
750 LOGI(
"Stopping RTP Stream with timer");
761 LOGI(
"RTP Stream stopped - ready for restart");
802template <
typename Platform>
824 LOGD(
"Creating RTSP Audio streamer with task");
883 LOGI(
"Task parameters set: stack=%d bytes, priority=%d, core=%d",
884 stackSize, priority, core);
886 LOGW(
"Cannot change task parameters while streaming is active");
904 LOGI(
"Starting RTP Stream with task");
915 LOGE(
"Failed to create streaming task");
929 LOGI(
"Streaming task started successfully");
930 LOGI(
"Task: stack=%d bytes, priority=%d, core=%d, period=%d us",
932 this->m_timer_period_us);
934 LOGI(
"Free heap size: %i KB", esp_get_free_heap_size() / 1000);
937 LOGE(
"Failed to start streaming task");
957 LOGI(
"Stopping RTP Stream with task");
973 LOGI(
"RTP Stream with task stopped - ready for restart");
1076 LOGD(
"Streaming task loop iteration");
1078 auto iterationStartUs = micros();
1109 LOGI(
"Timer period updated; resetting throttle window to %u us",
1110 (
unsigned)this->m_timer_period_us);
1118 uint64_t expectedUs =
1120 unsigned long nowUs = micros();
1123 if (actualUs < expectedUs) {
1124 uint32_t remainingUs = (uint32_t)(expectedUs - actualUs);
1126 if (remainingUs >= 1000) {
1127 delay(remainingUs / 1000);
1129 uint32_t remUs = remainingUs % 1000;
1131 delayMicroseconds(remUs);
1133 }
else if (actualUs > expectedUs + 1000) {
1134 LOGW(
"Throttling behind by %llu us over %u sends",
1135 (
unsigned long long)(actualUs - expectedUs),