15#include "AudioTools/Concurrency/Desktop.h"
17#include "AudioTools/Concurrency/RTOS.h"
19#include "AudioTools/CoreAudio/AudioBasic/Collections/Vector.h"
20#include "AudioTools/CoreAudio/AudioTimer.h"
21#include "IAudioSource.h"
22#include "RTSPPlatform.h"
49template <
typename Platform>
64 LOGD(
"Creating RTSP Audio streamer base");
72 m_RtpSocket = Platform::NULL_UDP_SOCKET;
73 m_RtcpSocket = Platform::NULL_UDP_SOCKET;
79 m_useTcpInterleaved =
false;
80 m_RtspTcpSocket = Platform::NULL_TCP_SOCKET;
84 m_lastSamplesSent = 0;
131 m_audioSource = source;
133 LOGI(
"RTSP Audio streamer created. Fragment size: %i bytes",
151 LOGI(
"initAudioSource");
153 LOGE(
"audio_source is null");
160 LOGI(
"m_fragmentSize (bytes): %d", m_fragmentSize);
183 m_ClientIP = aClientIP;
184 m_ClientPort = aClientPort;
186 m_SequenceNumber = random(65536);
188 if (m_udpRefCount != 0) {
193 for (u_short P = 6970; P < 0xFFFE; P += 2) {
194 m_RtpSocket = Platform::createUdpSocket(P);
197 m_RtcpSocket = Platform::createUdpSocket(P + 1);
200 m_RtcpServerPort = P + 1;
203 Platform::closeUdpSocket(m_RtpSocket);
204 Platform::closeUdpSocket(m_RtcpSocket);
210 LOGD(
"RTP Streamer set up with client IP %s and client Port %i",
211 m_ClientIP.toString().c_str(), m_ClientPort);
228 int rtpChannel,
int rtcpChannel) {
229 m_RtspTcpSocket = tcpSock;
230 m_TcpRtpChannel = rtpChannel;
231 m_TcpRtcpChannel = rtcpChannel;
232 m_useTcpInterleaved =
true;
233 m_SequenceNumber = random(65536);
234 LOGI(
"Using RTP over RTSP TCP interleaved: ch=%d/%d", rtpChannel,
251 if (m_udpRefCount == 0) {
253 m_RtcpServerPort = 0;
254 Platform::closeUdpSocket(m_RtpSocket);
255 Platform::closeUdpSocket(m_RtcpSocket);
257 m_RtpSocket = Platform::NULL_UDP_SOCKET;
258 m_RtcpSocket = Platform::NULL_UDP_SOCKET;
284 if (mRtpBuf.size() == 0) {
285 LOGE(
"mRtpBuf is empty");
290 if (m_audioSource ==
nullptr) {
291 LOGE(
"No audio source provided");
296 if (m_fragmentSize + HEADER_SIZE >= STREAMING_BUFFER_SIZE) {
298 "STREAMIN_BUFFER_SIZE too small for the sampling rate: increase to "
300 m_fragmentSize + HEADER_SIZE);
305 memset(mRtpBuf.data(), 0, STREAMING_BUFFER_SIZE);
308 unsigned char *dataBuf = &mRtpBuf[HEADER_SIZE];
310 int maxPayload = STREAMING_BUFFER_SIZE - HEADER_SIZE - header_len;
311 dataBuf += header_len;
313 int toRead = m_fragmentSize;
314 if (toRead > maxPayload) {
315 LOGW(
"Fragment exceeds payload capacity (%d > %d); clamping", toRead, maxPayload);
318 int bytesRead = m_audioSource->
readBytes((
void *)dataBuf, toRead);
319 LOGI(
"Read %d bytes from audio source", bytesRead);
320 int bytesNet = m_audioSource->
getFormat().convert(dataBuf, bytesRead);
325 sendOut(HEADER_SIZE + bytesNet + header_len);
342 LOGI(
"Starting audio source (base)");
344 if (mRtpBuf.size() == 0) {
345 mRtpBuf.resize(STREAMING_BUFFER_SIZE + 1);
348 if (m_audioSource !=
nullptr) {
350 m_audioSource->
start();
351 LOGI(
"Audio source started - ready for manual streaming");
353 LOGE(
"No streaming source");
370 LOGI(
"Stopping audio source (base)");
372 if (m_audioSource !=
nullptr) {
373 m_audioSource->
stop();
376 LOGI(
"Audio source stopped");
443 if (m_audioSource ==
nullptr) {
448 if (newPeriod != m_timer_period_us && newPeriod > 0) {
449 LOGI(
"Timer period changed from %u us to %u us",
450 (
unsigned)m_timer_period_us, (
unsigned)newPeriod);
451 m_timer_period_us = newPeriod;
475 LOGD(
"timerCallback");
476 if (audioStreamerObj ==
nullptr) {
477 LOGE(
"audioStreamerObj is null");
489 LOGW(
"Direct sending of RTP stream failed");
490 }
else if (bytes > 0) {
492 streamer->m_Timestamp += inc;
493 LOGD(
"%i samples (ts inc) sent; timestamp: %u", inc,
494 (
unsigned)streamer->m_Timestamp);
498 if (
stop -
start > streamer->m_timer_period_us) {
499 LOGW(
"RTP Stream can't keep up (took %lu us, %d is max)!",
stop -
start,
500 streamer->m_timer_period_us);
505 const int STREAMING_BUFFER_SIZE = 1024 * 3;
509 int m_fragmentSize = 0;
510 int m_timer_period_us = 20000;
511 const int HEADER_SIZE = 12;
512 volatile bool m_timer_restart_needed =
515 typename Platform::UdpSocketType
517 typename Platform::UdpSocketType
520 uint16_t m_RtpServerPort;
521 uint16_t m_RtcpServerPort;
523 u_short m_SequenceNumber;
524 uint32_t m_Timestamp;
527 IPAddress m_ClientIP;
528 uint16_t m_ClientPort;
534 bool m_useTcpInterleaved;
535 typename Platform::TcpClientType *m_RtspTcpSocket;
537 int m_TcpRtcpChannel;
539 int m_payloadType = 96;
540 int m_lastSamplesSent = 0;
541 uint32_t m_Ssrc = 0x13F97E67;
544 int mMp3CarryLen = 0;
561 if (m_lastSamplesSent > 0) {
562 samples = m_lastSamplesSent;
563 }
else if (m_audioSource) {
566 samples = bytesSent / 2;
568 return (uint32_t)samples;
571 inline void buildRtpHeader() {
573 mRtpBuf[1] = (uint8_t)(m_payloadType & 0x7F);
574 if (m_payloadType == 14) {
578 mRtpBuf[2] = (uint8_t)((m_SequenceNumber >> 8) & 0xFF);
579 mRtpBuf[3] = (uint8_t)(m_SequenceNumber & 0xFF);
580 mRtpBuf[4] = (uint8_t)((m_Timestamp >> 24) & 0xFF);
581 mRtpBuf[5] = (uint8_t)((m_Timestamp >> 16) & 0xFF);
582 mRtpBuf[6] = (uint8_t)((m_Timestamp >> 8) & 0xFF);
583 mRtpBuf[7] = (uint8_t)(m_Timestamp & 0xFF);
585 mRtpBuf[8] = (uint8_t)((m_Ssrc >> 24) & 0xFF);
586 mRtpBuf[9] = (uint8_t)((m_Ssrc >> 16) & 0xFF);
587 mRtpBuf[10] = (uint8_t)((m_Ssrc >> 8) & 0xFF);
588 mRtpBuf[11] = (uint8_t)(m_Ssrc & 0xFF);
591 inline void sendOut(uint16_t totalLen) {
592 if (m_useTcpInterleaved && m_RtspTcpSocket != Platform::NULL_TCP_SOCKET) {
595 hdr[1] = (uint8_t)m_TcpRtpChannel;
596 hdr[2] = (uint8_t)((totalLen >> 8) & 0xFF);
597 hdr[3] = (uint8_t)(totalLen & 0xFF);
598 Platform::sendSocket(m_RtspTcpSocket, hdr,
sizeof(hdr));
599 Platform::sendSocket(m_RtspTcpSocket, mRtpBuf.data(), totalLen);
601 Platform::sendUdpSocket(m_RtpSocket, mRtpBuf.data(), totalLen, m_ClientIP,
623template <
typename Platform>
636 LOGD(
"Creating RTSP Audio streamer with timer");
639 rtpTimer.setCallbackParameter(
this);
647 LOGI(
"RTSPAudioStreamer: Timer set to safe task mode (ESP_TIMER_TASK)");
678 LOGI(
"Starting RTP Stream with timer");
683 if (this->m_audioSource !=
nullptr) {
686 this->m_timer_period_us, audio_tools::US)) {
687 LOGE(
"Could not start timer");
689 LOGI(
"timer: %u us", (
unsigned)this->m_timer_period_us);
691 LOGI(
"Free heap size: %i KB", esp_get_free_heap_size() / 1000);
706 LOGI(
"Updating timer period to %u us", (
unsigned)this->m_timer_period_us);
707 rtpTimer.begin(this->m_timer_period_us, audio_tools::US);
724 LOGI(
"Stopping RTP Stream with timer");
735 LOGI(
"RTP Stream stopped - ready for restart");
776template <
typename Platform>
798 LOGD(
"Creating RTSP Audio streamer with task");
857 LOGI(
"Task parameters set: stack=%d bytes, priority=%d, core=%d",
858 stackSize, priority, core);
860 LOGW(
"Cannot change task parameters while streaming is active");
878 LOGI(
"Starting RTP Stream with task");
889 LOGE(
"Failed to create streaming task");
903 LOGI(
"Streaming task started successfully");
904 LOGI(
"Task: stack=%d bytes, priority=%d, core=%d, period=%d us",
906 this->m_timer_period_us);
908 LOGI(
"Free heap size: %i KB", esp_get_free_heap_size() / 1000);
911 LOGE(
"Failed to start streaming task");
931 LOGI(
"Stopping RTP Stream with task");
947 LOGI(
"RTP Stream with task stopped - ready for restart");
1050 LOGD(
"Streaming task loop iteration");
1052 auto iterationStartUs = micros();
1083 LOGI(
"Timer period updated; resetting throttle window to %u us",
1084 (
unsigned)this->m_timer_period_us);
1092 uint64_t expectedUs =
1094 unsigned long nowUs = micros();
1097 if (actualUs < expectedUs) {
1098 uint32_t remainingUs = (uint32_t)(expectedUs - actualUs);
1100 if (remainingUs >= 1000) {
1101 delay(remainingUs / 1000);
1103 uint32_t remUs = remainingUs % 1000;
1105 delayMicroseconds(remUs);
1107 }
else if (actualUs > expectedUs + 1000) {
1108 LOGW(
"Throttling behind by %llu us over %u sends",
1109 (
unsigned long long)(actualUs - expectedUs),