16#include "AudioTools/CoreAudio/AudioBasic/Collections/Vector.h"
17#include "RTSPAudioStreamer.h"
18#include "RTSPPlatform.h"
21#define RTSP_BUFFER_SIZE 10000
23#define RTSP_PARAM_STRING_MAX 100
25#define MAX_HOSTNAME_LEN 256
27#define RTSP_RESPONSE_BUFFER_SIZE 2251
29#define RTSP_SDP_BUFFER_SIZE 1024
31#define RTSP_URL_BUFFER_SIZE 1024
33#define RTSP_SMALL_BUFFER_SIZE 256
76template <
typename Platform>
99 : m_Client(aClient), m_Streamer(&aStreamer) {
100 m_RtspClient = &m_Client;
101 m_RtspSessionID = random(65536);
102 LOGI(
"RTSP session created");
114 LOGI(
"RTSP session destructor");
117 if (m_streaming && m_Streamer) {
118 LOGI(
"Final cleanup: stopping streamer in destructor");
120 m_Streamer->releaseUdpTransport();
125 if (m_RtspClient && m_RtspClient->connected()) {
126 Platform::closeSocket(m_RtspClient);
129 LOGI(
"RTSP session cleanup completed");
149 LOGD(
"handleRequests");
153 if (!m_sessionOpen) {
158 memset(mRecvBuf.data(), 0x00, RTSP_BUFFER_SIZE);
159 int res =
readSocket(m_RtspClient, mRecvBuf.data(), RTSP_BUFFER_SIZE,
164 if ((mRecvBuf[0] ==
'O') || (mRecvBuf[0] ==
'D') ||
165 (mRecvBuf[0] ==
'S') || (mRecvBuf[0] ==
'P') ||
166 (mRecvBuf[0] ==
'T')) {
168 if (!m_sessionOpen) {
173 if (C == RTSP_PLAY) {
175 }
else if (C == RTSP_PAUSE) {
177 }
else if (C == RTSP_TEARDOWN) {
178 m_sessionOpen =
false;
181 if (m_streaming && m_Streamer) {
182 LOGI(
"Stopping streamer due to TEARDOWN");
184 m_Streamer->releaseUdpTransport();
190 }
else if (res == 0) {
191 LOGW(
"client closed socket, exiting");
192 m_sessionOpen =
false;
195 if (m_streaming && m_Streamer) {
196 LOGI(
"Stopping streamer due to client disconnect");
198 m_Streamer->releaseUdpTransport();
210 bool isSessionOpen() {
return m_sessionOpen; }
212 bool isStreaming() {
return m_streaming; }
226 m_onSessionPath = cb;
227 m_onSessionPathRef = ref;
238 const char* STD_URL_PRE_SUFFIX =
"trackID";
242 typename Platform::TcpClientType m_Client;
243 typename Platform::TcpClientType* m_RtspClient =
246 uint16_t m_ClientRTPPort;
247 uint16_t m_ClientRTCPPort;
258 unsigned m_ContentLength;
259 uint16_t m_RtpClientPort =
261 uint16_t m_RtcpClientPort =
264 bool m_TransportIsTcp =
false;
265 int m_InterleavedRtp = -1;
266 int m_InterleavedRtcp = -1;
277 bool m_is_init =
false;
278 bool m_streaming =
false;
279 volatile bool m_sessionOpen =
true;
280 bool m_pathNotified =
false;
281 bool (*m_onSessionPath)(
const char* path,
void* ref) =
nullptr;
282 void* m_onSessionPathRef =
nullptr;
292 if (m_is_init)
return;
297 m_sessionOpen =
true;
300 if (mRecvBuf.size() == 0) {
301 mRecvBuf.resize(RTSP_BUFFER_SIZE);
303 if (mCurRequest.size() == 0) {
304 mCurRequest.resize(RTSP_BUFFER_SIZE);
306 if (m_URLPreSuffix.size() == 0) {
307 m_URLPreSuffix.resize(RTSP_PARAM_STRING_MAX);
309 if (m_URLSuffix.size() == 0) {
310 m_URLSuffix.resize(RTSP_PARAM_STRING_MAX);
312 if (m_CSeq.size() == 0) {
313 m_CSeq.resize(RTSP_PARAM_STRING_MAX);
315 if (m_URLHostPort.size() == 0) {
316 m_URLHostPort.resize(MAX_HOSTNAME_LEN);
318 if (m_URLPath.size() == 0) {
319 m_URLPath.resize(RTSP_URL_BUFFER_SIZE);
321 if (m_Response.size() == 0) {
322 m_Response.resize(RTSP_RESPONSE_BUFFER_SIZE);
324 if (m_SDPBuf.size() == 0) {
325 m_SDPBuf.resize(RTSP_SDP_BUFFER_SIZE);
327 if (m_URLBuf.size() == 0) {
328 m_URLBuf.resize(RTSP_URL_BUFFER_SIZE);
330 if (m_Buf1.size() == 0) {
331 m_Buf1.resize(RTSP_SMALL_BUFFER_SIZE);
333 if (m_Buf2.size() == 0) {
334 m_Buf2.resize(RTSP_SMALL_BUFFER_SIZE);
336 if (m_CmdName.size() == 0) {
337 m_CmdName.resize(RTSP_PARAM_STRING_MAX);
340 m_RtspCmdType = RTSP_UNKNOWN;
341 memset(m_URLPreSuffix.data(), 0x00, m_URLPreSuffix.size());
342 memset(m_URLSuffix.data(), 0x00, m_URLSuffix.size());
343 memset(m_CSeq.data(), 0x00, m_CSeq.size());
344 memset(m_URLHostPort.data(), 0x00, m_URLHostPort.size());
345 if (m_URLPath.size() > 0) memset(m_URLPath.data(), 0x00, m_URLPath.size());
347 m_TransportIsTcp =
false;
348 m_InterleavedRtp = -1;
349 m_InterleavedRtcp = -1;
351 m_pathNotified =
false;
362 unsigned aRequestSize) {
364 switch (m_RtspCmdType) {
369 case RTSP_DESCRIBE: {
385 case RTSP_TEARDOWN: {
393 return m_RtspCmdType;
404 LOGI(
"aRequest: ------------------------\n%s\n-------------------------",
407 const unsigned CurRequestSize = aRequestSize;
408 memcpy(mCurRequest.data(), aRequest, aRequestSize);
411 parseClientPorts(mCurRequest.data());
412 parseTransportHeader(mCurRequest.data());
415 unsigned idxAfterCmd = 0;
416 if (!parseCommandName(mCurRequest.data(), CurRequestSize, idxAfterCmd))
418 determineCommandType();
419 parseUrlHostPortAndSuffix(mCurRequest.data(), CurRequestSize, idxAfterCmd);
420 if (!m_sessionOpen) {
426 if (!parseCSeq(mCurRequest.data(), CurRequestSize, idxAfterCmd))
428 parseContentLength(mCurRequest.data(), CurRequestSize, idxAfterCmd);
431 detectClientHeaderPreference(mCurRequest.data());
// Looks for a "client_port" token in the request text and derives the
// client's RTP/RTCP ports from it: m_ClientRTPPort is parsed with atoi() and
// m_ClientRTCPPort is set to the RTP port + 1.
// Returns without changing anything when the token, or a "\r\n" after it,
// is not found.
// @param req request text; searched with strstr(), so it must be
//            NUL-terminated.
// NOTE(review): m_Response is reused here as a scratch buffer and is zeroed
// first, so any previous response content is lost.
437 void parseClientPorts(
char* req) {
438 char* ClientPortPtr = strstr(req,
"client_port");
439 if (!ClientPortPtr)
return;
// The token's line must be CRLF-terminated, otherwise parsing is abandoned.
440 char* lineEnd = strstr(ClientPortPtr,
"\r\n");
441 if (!lineEnd)
return;
442 char* CP = m_Response.data();
443 memset(CP, 0, m_Response.size());
// The character at the line end is remembered, presumably so the line can be
// terminated temporarily and restored afterwards - TODO confirm.
444 char saved = lineEnd[0];
// NOTE(review): this strcpy() is not bounded by m_Response.size(); a very
// long header line could overflow the scratch buffer. parseTransportHeader()
// uses a bounded strncpy() for the same step - consider doing the same here.
446 strcpy(CP, ClientPortPtr);
// Locate the '=' and '-' separators of a "client_port=<rtp>-<rtcp>" value.
447 char* eq = strstr(CP,
"=");
451 char* dash = strstr(CP,
"-");
// Only the first port is parsed; the RTCP port is assumed to be RTP + 1.
454 m_ClientRTPPort = atoi(CP);
455 m_ClientRTCPPort = m_ClientRTPPort + 1;
// Inspects the "Transport:" header of the request. Sets m_TransportIsTcp
// when the header mentions a TCP transport, and fills m_InterleavedRtp /
// m_InterleavedRtcp from an "interleaved=" parameter.
// Returns without changing anything when the header, or a "\r\n" after it,
// is not found.
// @param req request text; searched with strstr(), so it must be
//            NUL-terminated.
// NOTE(review): m_Response is reused here as a scratch buffer and is zeroed
// first, so any previous response content is lost.
461 void parseTransportHeader(
char* req) {
462 char* TransportPtr = strstr(req,
"Transport:");
463 if (!TransportPtr)
return;
464 char* lineEnd = strstr(TransportPtr,
"\r\n");
465 if (!lineEnd)
return;
466 char* CP = m_Response.data();
467 memset(CP, 0, m_Response.size());
// The character at the line end is remembered, presumably so the line can be
// terminated temporarily and restored afterwards - TODO confirm.
468 char saved = lineEnd[0];
// Bounded copy into the scratch buffer, explicitly NUL-terminated below.
470 strncpy(CP, TransportPtr, m_Response.size() - 1);
471 CP[m_Response.size() - 1] =
'\0';
// NOTE(review): the "RTP/AVP/TCP" test is redundant - any string containing
// it also contains "/TCP".
472 if (strstr(CP,
"RTP/AVP/TCP") || strstr(CP,
"/TCP")) m_TransportIsTcp =
true;
473 char* inter = strstr(CP,
"interleaved=");
// Skip past the parameter name so that inter points at the channel numbers.
475 inter += strlen(
"interleaved=");
// Accepted forms, tried in order: "a-b", "a,b", or a single "a" (in which
// case the RTCP channel is taken as a + 1).
477 if (sscanf(inter,
"%d-%d", &a, &b) == 2) {
478 m_InterleavedRtp = a;
479 m_InterleavedRtcp = b;
480 }
else if (sscanf(inter,
"%d,%d", &a, &b) == 2) {
481 m_InterleavedRtp = a;
482 m_InterleavedRtcp = b;
483 }
else if (sscanf(inter,
"%d", &a) == 1) {
484 m_InterleavedRtp = a;
485 m_InterleavedRtcp = a + 1;
// Extracts the RTSP command name (the leading token of the request) into
// m_CmdName and logs it.
// @param req     request text
// @param reqSize number of valid bytes in req
// @param outIdx  output index; presumably the position just after the
//                command name - TODO confirm, the assignment is not visible
//                here
// @return presumably true on success and false when no terminating blank is
//         found (see the "failed to parse RTSP" log) - TODO confirm
491 bool parseCommandName(
char* req,
unsigned reqSize,
unsigned& outIdx) {
// Scan is limited both by the request length and by the capacity of
// m_CmdName (one byte is left free, presumably for the terminator).
494 for (i = 0; i < m_CmdName.size() - 1 && i < reqSize; ++i) {
// A space or tab marks the end of the command name.
496 if (c ==
' ' || c ==
'\t') {
504 LOGE(
"failed to parse RTSP");
507 LOGI(
"RTSP received %s", m_CmdName.data());
// Maps the text in m_CmdName to an RTSP command enumerator and stores it in
// m_RtspCmdType. Recognised names: OPTIONS, DESCRIBE, SETUP, PLAY, PAUSE,
// TEARDOWN; anything else is logged as unsupported.
// NOTE(review): matching uses strstr(), i.e. a substring test rather than an
// exact comparison, so a name that merely contains one of these words also
// matches; the first hit in the order below wins.
512 void determineCommandType() {
513 if (strstr(m_CmdName.data(),
"OPTIONS"))
514 m_RtspCmdType = RTSP_OPTIONS;
515 else if (strstr(m_CmdName.data(),
"DESCRIBE"))
516 m_RtspCmdType = RTSP_DESCRIBE;
517 else if (strstr(m_CmdName.data(),
"SETUP"))
518 m_RtspCmdType = RTSP_SETUP;
519 else if (strstr(m_CmdName.data(),
"PLAY"))
520 m_RtspCmdType = RTSP_PLAY;
521 else if (strstr(m_CmdName.data(),
"PAUSE"))
522 m_RtspCmdType = RTSP_PAUSE;
523 else if (strstr(m_CmdName.data(),
"TEARDOWN"))
524 m_RtspCmdType = RTSP_TEARDOWN;
// Fallback for names that matched none of the commands above.
526 LOGE(
"Error: Unsupported Command received (%s)!", m_CmdName.data());
// Parses the request URL that follows the command name. From the visible
// code it fills:
//   - m_URLHostPort : the text after the "rtsp:/" scheme up to the next '/'
//                     or ' '
//   - m_URLPath     : the text from a '/' up to the next ' '
//   - m_URLSuffix / m_URLPreSuffix : the parts after / before the last '='
//                     that precedes the "RTSP/" version token
// It also invokes the optional m_onSessionPath callback with the parsed
// path; the visible code can then log a rejection and clear m_sessionOpen.
// @param req     request text
// @param reqSize number of valid bytes in req
// @param i       in/out index into req; read below as the position where the
//                path starts - how it is updated is not visible here
529 void parseUrlHostPortAndSuffix(
char* req,
unsigned reqSize,
unsigned& i) {
// Skip blanks between the command name and the URL.
531 while (j < reqSize && (req[j] ==
' ' || req[j] ==
'\t')) ++j;
// The bound is compared as int, so a request shorter than 8 bytes (where the
// unsigned subtraction wraps) presumably yields a negative bound and skips
// the loop - relies on the unsigned->int conversion.
// The test inside matches the scheme "rtsp:/" case-insensitively.
532 for (; (int)j < (
int)(reqSize - 8); ++j) {
533 if ((req[j] ==
'r' || req[j] ==
'R') && (req[j + 1] ==
't' || req[j + 1] ==
'T') &&
534 (req[j + 2] ==
's' || req[j + 2] ==
'S') && (req[j + 3] ==
'p' || req[j + 3] ==
'P') &&
535 req[j + 4] ==
':' && req[j + 5] ==
'/') {
// Copy host[:port] until the next '/' or ' ', leaving one byte of
// m_URLHostPort unused (presumably for the terminator).
540 while (j < reqSize && req[j] !=
'/' && req[j] !=
' ' && uidx < m_URLHostPort.size() - 1) {
541 m_URLHostPort[uidx++] = req[j++];
550 LOGD(
"m_URLHostPort: %s", m_URLHostPort.data());
// Path part: everything from the '/' at index i up to the next space.
553 if (i < reqSize && req[i] ==
'/') {
556 while (k < reqSize && req[k] !=
' ' && p < m_URLPath.size() - 1) {
557 m_URLPath[p++] = req[k++];
560 LOGD(
"m_URLPath: %s", m_URLPath.data());
// The path callback is invoked only while m_pathNotified is false, and the
// flag is set right after the call.
561 if (!m_pathNotified && m_onSessionPath) {
562 bool ok = m_onSessionPath(m_URLPath.data(), m_onSessionPathRef);
563 m_pathNotified =
true;
// Presumably reached only when the callback returned false - TODO confirm,
// the guarding condition is not visible here.
565 LOGW(
"Session rejected by onSessionPath callback");
566 m_sessionOpen =
false;
// Find the "RTSP/" version token (upper case only); the URL ends before it.
573 for (
unsigned k = i + 1; (int)k < (
int)(reqSize - 5); ++k) {
574 if (req[k] ==
'R' && req[k + 1] ==
'T' && req[k + 2] ==
'S' && req[k + 3] ==
'P' && req[k + 4] ==
'/') {
// Walk back over the blanks in front of "RTSP/".
// NOTE(review): k and i are unsigned, so "--k >= i" is always true when
// i == 0 and k would wrap around - confirm i > 0 is guaranteed here.
575 while (--k >= i && req[k] ==
' ') {}
// Walk back to the last '=' in the URL; it separates pre-suffix and suffix.
577 while (k1 > i && req[k1] !=
'=') --k1;
// Copy only if the suffix plus its terminator fits into m_URLSuffix.
578 if (k - k1 + 1 <= m_URLSuffix.size()) {
579 unsigned n = 0, k2 = k1 + 1;
580 while (k2 <= k) m_URLSuffix[n++] = req[k2++];
581 m_URLSuffix[n] =
'\0';
// Same capacity check for the pre-suffix (text between i and the '=').
582 if (k1 - i <= m_URLPreSuffix.size()) ok =
true;
585 while (k2 <= k1 - 1) m_URLPreSuffix[n++] = req[k2++];
586 m_URLPreSuffix[n] =
'\0';
592 LOGD(
"m_URLSuffix: %s", m_URLSuffix.data());
593 LOGD(
"m_URLPreSuffix: %s", m_URLPreSuffix.data());
594 LOGD(
"URL Suffix parse succeeded: %i", ok);
// Searches the request for the "CSeq:" header (exact case) starting at
// startIdx and copies its value into m_CSeq.
// @param req      request text
// @param reqSize  number of valid bytes in req
// @param startIdx index at which the search begins
// @return presumably whether a CSeq value was found (the flag logged at the
//         end) - TODO confirm, the return statement is not visible here
597 bool parseCSeq(
char* req,
unsigned reqSize,
unsigned startIdx) {
// The bound is compared as int so that a request shorter than 5 bytes
// presumably skips the loop instead of wrapping around.
599 for (
unsigned j = startIdx; (int)j < (
int)(reqSize - 5); ++j) {
600 if (req[j] ==
'C' && req[j + 1] ==
'S' && req[j + 2] ==
'e' && req[j + 3] ==
'q' && req[j + 4] ==
':') {
// Skip blanks in front of the value.
602 while (j < reqSize && (req[j] ==
' ' || req[j] ==
'\t')) ++j;
// Copy is limited by the request length and by the capacity of m_CSeq
// (one byte is left free, presumably for the terminator).
604 for (n = 0; n < m_CSeq.size() - 1 && j < reqSize; ++n, ++j) {
// CR or LF ends the header value.
606 if (c ==
'\r' || c ==
'\n') {
616 LOGD(
"Look for CSeq success: %i", ok);
// Searches the request for a "Content-Length:" header starting at startIdx
// and, when a number follows it, stores that number in m_ContentLength.
// The header name is matched exactly except for the 'L', which may be upper
// or lower case ("Content-Length:" or "Content-length:").
// m_ContentLength is assigned only when sscanf() parses a number.
// @param req      request text; the value is read with sscanf(), so it must
//                 be NUL-terminated
// @param reqSize  number of valid bytes in req
// @param startIdx index at which the search begins
620 void parseContentLength(
char* req,
unsigned reqSize,
unsigned startIdx) {
// The bound is compared as int so that a request shorter than 15 bytes
// presumably skips the loop instead of wrapping around.
621 for (
unsigned j = startIdx; (int)j < (
int)(reqSize - 15); ++j) {
622 if (req[j] ==
'C' && req[j + 1] ==
'o' && req[j + 2] ==
'n' && req[j + 3] ==
't' &&
623 req[j + 4] ==
'e' && req[j + 5] ==
'n' && req[j + 6] ==
't' && req[j + 7] ==
'-' &&
624 (req[j + 8] ==
'L' || req[j + 8] ==
'l') && req[j + 9] ==
'e' && req[j + 10] ==
'n' &&
625 req[j + 11] ==
'g' && req[j + 12] ==
't' && req[j + 13] ==
'h' && req[j + 14] ==
':') {
// Skip blanks in front of the value.
627 while (j < reqSize && (req[j] ==
' ' || req[j] ==
'\t')) ++j;
629 if (sscanf(&req[j],
"%u", &num) == 1) m_ContentLength = num;
// Derives a boolean preference (want_rfc2250) from the request: it looks at
// the "User-Agent:" header for known client names and at the URL query
// string for an explicit "mpa_hdr=0/1" override. What is done with the
// resulting flag is not visible here.
// @param req request text; searched with strstr()/strchr(), so it must be
//            NUL-terminated
// NOTE(review): strcasestr() is a non-standard (GNU/BSD) extension - confirm
// it is available on every supported platform.
// NOTE(review): ua and qm may be nullptr when the header or '?' is absent;
// the guards are not visible here - confirm they exist before the
// strcasestr()/strstr() calls.
634 void detectClientHeaderPreference(
char* req) {
635 char* ua = strstr(req,
"User-Agent:");
636 bool want_rfc2250 =
false;
// FFmpeg-family clients (case-insensitive match). The search starts at the
// header and runs to the end of the string, not just to the end of the line.
638 if (strcasestr(ua,
"ffmpeg") || strcasestr(ua,
"ffplay") || strcasestr(ua,
"libavformat") ||
639 strcasestr(ua,
"Lavf")) {
// VLC explicitly gets the flag cleared.
642 if (strcasestr(ua,
"vlc")) want_rfc2250 =
false;
// Query-string override: evaluated after the User-Agent heuristics, so it
// takes precedence; "mpa_hdr=0" is tested last and wins if both are present.
644 char* qm = strchr(req,
'?');
646 if (strstr(qm,
"mpa_hdr=1")) want_rfc2250 =
true;
647 if (strstr(qm,
"mpa_hdr=0")) want_rfc2250 =
false;
659 snprintf(m_Response.data(), m_Response.size(),
660 "RTSP/1.0 200 OK\r\nCSeq: %s\r\n"
661 "Public: DESCRIBE, SETUP, TEARDOWN, PLAY, PAUSE\r\n\r\n",
664 sendSocket(m_RtspClient, m_Response.data(), strlen(m_Response.data()));
674 if (strcmp(m_URLPreSuffix.data(), STD_URL_PRE_SUFFIX) == 0) {
676 m_StreamID = strtol(m_URLSuffix.data(), &end, 10);
677 if (*end !=
'\0') m_StreamID = -1;
682 strncpy(m_Buf1.data(), m_URLHostPort.data(), 256);
683 ColonPtr = strstr(m_Buf1.data(),
":");
684 if (ColonPtr !=
nullptr) ColonPtr[0] = 0x00;
687 m_SDPBuf.data(), m_SDPBuf.size(),
689 "o=- %d 0 IN IP4 %s\r\n"
692 rand() & 0xFF, m_Buf1.data(),
696 snprintf(m_URLBuf.data(), m_URLBuf.size(),
"rtsp://%s",
697 m_URLHostPort.data());
699 snprintf(m_Response.data(), m_Response.size(),
700 "RTSP/1.0 200 OK\r\nCSeq: %s\r\n"
702 "Content-Base: %s/\r\n"
703 "Content-Type: application/sdp\r\n"
704 "Content-Length: %d\r\n\r\n"
707 (
int)strlen(m_SDPBuf.data()), m_SDPBuf.data());
710 Serial.println(
"------------------------------");
711 Serial.println((
const char*)m_Response.data());
712 Serial.println(
"------------------------------");
713 sendSocket(m_RtspClient, m_Response.data(), strlen(m_Response.data()));
721 if (m_TransportIsTcp) {
723 int ch0 = (m_InterleavedRtp >= 0) ? m_InterleavedRtp : 0;
724 int ch1 = (m_InterleavedRtcp >= 0) ? m_InterleavedRtcp : (ch0 + 1);
728 snprintf(m_Buf1.data(), m_Buf1.size(),
729 "RTP/AVP/TCP;unicast;interleaved=%d-%d", ch0, ch1);
734 snprintf(m_Buf1.data(), m_Buf1.size(),
735 "RTP/AVP;unicast;client_port=%i-%i;server_port=%i-%i;ssrc=%08X",
736 m_ClientRTPPort, m_ClientRTCPPort,
740 snprintf(m_Response.data(), m_Response.size(),
741 "RTSP/1.0 200 OK\r\n"
747 m_CSeq.data(),
dateHeader(), m_RtspSessionID, m_Buf1.data());
749 Serial.println(
"------------------------------");
750 Serial.println((
char*)m_Response.data());
751 Serial.println(
"------------------------------");
753 sendSocket(m_RtspClient, m_Response.data(), strlen(m_Response.data()));
762 snprintf(m_URLBuf.data(), m_URLBuf.size(),
"rtsp://%s/%s=0",
763 m_URLHostPort.data(), STD_URL_PRE_SUFFIX);
766 uint16_t seq = m_Streamer ? m_Streamer->
currentSeq() : 0;
770 snprintf(m_Response.data(), m_Response.size(),
771 "RTSP/1.0 200 OK\r\n"
773 "Range: npt=0.000-\r\n"
775 "RTP-Info: url=%s;seq=%u;rtptime=%u\r\n\r\n",
776 m_CSeq.data(), m_RtspSessionID, m_URLBuf.data(), (
unsigned)seq,
779 Serial.println(
"------------------------------");
780 Serial.println((
char*)m_Response.data());
781 Serial.println(
"------------------------------");
783 sendSocket(m_RtspClient, m_Response.data(), strlen(m_Response.data()));
793 if (m_streaming && m_Streamer) {
796 snprintf(m_Response.data(), m_Response.size(),
797 "RTSP/1.0 200 OK\r\n"
799 "Session: %i\r\n\r\n",
800 m_CSeq.data(), m_RtspSessionID);
801 sendSocket(m_RtspClient, m_Response.data(), strlen(m_Response.data()));
811 snprintf(m_Response.data(), m_Response.size(),
812 "RTSP/1.0 200 OK\r\n"
816 sendSocket(m_RtspClient, m_Response.data(), strlen(m_Response.data()));
818 m_sessionOpen =
false;
833 m_RtpClientPort = aRtpPort;
834 m_RtcpClientPort = aRtcpPort;
838 Platform::getSocketPeerAddr(m_RtspClient, &clientIP, &clientPort);
840 LOGI(
"SETUP peer resolved: %s:%u (RTP client_port=%u)",
841 clientIP.toString().c_str(), (
unsigned)clientPort,
842 (
unsigned)m_RtpClientPort);
847 typename Platform::TcpClientType*& getClient() {
return m_RtspClient; }
849 uint16_t getRtpClientPort() {
return m_RtpClientPort; }
// Thin forwarding wrapper: passes all arguments unchanged to
// Platform::readSocket() and returns its result.
// @param sock        client socket to read from
// @param buf         destination buffer
// @param buflen      capacity of buf in bytes
// @param timeoutmsec timeout handed to the platform layer (milliseconds,
//                    judging by the name)
// @return whatever Platform::readSocket() returns
859 inline int readSocket(
typename Platform::TcpClientType* sock,
char* buf,
860 size_t buflen,
int timeoutmsec) {
861 return Platform::readSocket(sock, buf, buflen, timeoutmsec);
// Thin forwarding wrapper: passes all arguments unchanged to
// Platform::sendSocket() and returns its result.
// @param sock client socket to write to
// @param buf  data to send
// @param len  number of bytes to send from buf
// @return whatever Platform::sendSocket() returns
871 inline ssize_t
sendSocket(
typename Platform::TcpClientType* sock,
872 const void* buf,
size_t len) {
873 return Platform::sendSocket(sock, buf, len);
881 static char buf[200];
882 time_t tt = time(NULL);
883 strftime(buf,
sizeof(buf),
"Date: %a, %b %d %Y %H:%M:%S GMT", gmtime(&tt));