Skip to content

Instantly share code, notes, and snippets.

@fernandoc1
Created September 2, 2020 18:08
Show Gist options
  • Save fernandoc1/364291f7335c83ac5a5d2b807ac3059e to your computer and use it in GitHub Desktop.
Save fernandoc1/364291f7335c83ac5a5d2b807ac3059e to your computer and use it in GitHub Desktop.
Kinesis Common.c for alternate TURN servers.
#define LOG_CLASS "WebRtcSamples"
#include "Samples.h"
/* File-scope handle to the sample configuration; consulted by sigintHandler(). */
PSampleConfiguration gSampleConfiguration = NULL;

/**
 * Signal handler: flags the global sample configuration as interrupted and wakes
 * every waiter on its condition variable. Does nothing while no configuration is set.
 *
 * @param sigNum - signal number (ignored)
 */
VOID sigintHandler(INT32 sigNum)
{
    UNUSED_PARAM(sigNum);

    if (gSampleConfiguration == NULL) {
        return;
    }

    ATOMIC_STORE_BOOL(&gSampleConfiguration->interrupted, TRUE);
    CVAR_BROADCAST(gSampleConfiguration->cvar);
}
/**
 * Data channel message callback: logs text messages verbatim and only notes the
 * arrival of binary ones.
 *
 * @param customData   - ignored
 * @param pDataChannel - ignored
 * @param isBinary     - TRUE when the payload is binary
 * @param pMessage     - message bytes (not necessarily NUL-terminated, hence the %.*s)
 * @param pMessageLen  - number of bytes in pMessage
 */
VOID onDataChannelMessage(UINT64 customData, PRtcDataChannel pDataChannel, BOOL isBinary, PBYTE pMessage, UINT32 pMessageLen)
{
    UNUSED_PARAM(customData);
    UNUSED_PARAM(pDataChannel);

    if (!isBinary) {
        // NOTE(review): the precision argument of %.*s is specified as int; pMessageLen is UINT32.
        DLOGI("DataChannel String Message: %.*s\n", pMessageLen, pMessage);
        return;
    }

    DLOGI("DataChannel Binary Message");
}
/**
 * Logs the name of the given data channel and registers onDataChannelMessage as its
 * message callback, forwarding customData unchanged.
 *
 * @param customData      - opaque value passed through to the message callback
 * @param pRtcDataChannel - data channel to attach the message callback to (dereferenced without a NULL check)
 */
VOID onDataChannel(UINT64 customData, PRtcDataChannel pRtcDataChannel)
{
DLOGI("New DataChannel has been opened %s \n", pRtcDataChannel->name);
// NOTE(review): any status returned by dataChannelOnMessage is not inspected here.
dataChannelOnMessage(pRtcDataChannel, customData, onDataChannelMessage);
}
/**
 * Peer connection state callback.
 *
 * FAILED / CLOSED / DISCONNECTED mark the session for termination and wake waiters on the
 * configuration's condition variable. CONNECTED logs the selected ICE candidates (a failure
 * there is only warned about). Every other state is just logged.
 *
 * @param customData - the PSampleStreamingSession the callback was registered with
 * @param newState   - state the peer connection moved to
 */
VOID onConnectionStateChange(UINT64 customData, RTC_PEER_CONNECTION_STATE newState)
{
    PSampleStreamingSession pSession = (PSampleStreamingSession) customData;
    STATUS logStatus = STATUS_SUCCESS;

    DLOGI("New connection state %u", newState);

    switch (newState) {
        case RTC_PEER_CONNECTION_STATE_FAILED:
        case RTC_PEER_CONNECTION_STATE_CLOSED:
        case RTC_PEER_CONNECTION_STATE_DISCONNECTED:
            ATOMIC_STORE_BOOL(&pSession->terminateFlag, TRUE);
            CVAR_BROADCAST(pSession->pSampleConfiguration->cvar);
            break;

        case RTC_PEER_CONNECTION_STATE_CONNECTED:
            logStatus = logSelectedIceCandidatesInformation(pSession);
            if (STATUS_FAILED(logStatus)) {
                DLOGW("Failed to get information about selected Ice candidates: 0x%08x", logStatus);
            }
            break;

        default:
            break;
    }
}
/**
 * Signaling client state callback: logs the new state at verbose level.
 *
 * @param customData - ignored
 * @param state      - new signaling client state
 * @return always STATUS_SUCCESS so the signaling client continues
 */
STATUS signalingClientStateChanged(UINT64 customData, SIGNALING_CLIENT_STATE state)
{
    UNUSED_PARAM(customData);
    // Start from NULL so an unset pointer is never handed to the %s conversion
    // when the lookup below does not fill it in.
    PCHAR pStateStr = NULL;

    signalingClientGetStateString(state, &pStateStr);
    DLOGV("Signaling client state changed to %d - '%s'", state, pStateStr != NULL ? pStateStr : "(unknown)");

    // Return success to continue
    return STATUS_SUCCESS;
}
/**
 * Signaling client error callback: logs the error and, for errors that require a fresh
 * client, requests re-creation of the signaling client and wakes the waiters.
 *
 * @param customData - the PSampleConfiguration the callbacks were registered with
 * @param status     - error status reported by the signaling client
 * @param msg        - error text (printed with an explicit length)
 * @param msgLen     - length of msg
 * @return always STATUS_SUCCESS
 */
STATUS signalingClientError(UINT64 customData, STATUS status, PCHAR msg, UINT32 msgLen)
{
    PSampleConfiguration pConfig = (PSampleConfiguration) customData;
    BOOL mustRecreateClient;

    DLOGW("Signaling client generated an error 0x%08x - '%.*s'", status, msgLen, msg);

    // Only these two errors force the signaling client to be rebuilt.
    mustRecreateClient = (status == STATUS_SIGNALING_ICE_CONFIG_REFRESH_FAILED) || (status == STATUS_SIGNALING_RECONNECT_FAILED);
    if (mustRecreateClient) {
        ATOMIC_STORE_BOOL(&pConfig->recreateSignalingClient, TRUE);
        CVAR_BROADCAST(pConfig->cvar);
    }

    return STATUS_SUCCESS;
}
/**
 * Logs (debug level) the local and remote ICE candidate statistics reported by the
 * session's peer connection.
 *
 * @param pSampleStreamingSession - session whose peer connection is queried
 * @return STATUS_NULL_ARG when the session is NULL, the failing status of
 *         rtcPeerConnectionGetMetrics, or STATUS_SUCCESS
 */
STATUS logSelectedIceCandidatesInformation(PSampleStreamingSession pSampleStreamingSession)
{
    ENTERS();
    STATUS retStatus = STATUS_SUCCESS;
    RtcStats rtcMetrics;

    CHK(pSampleStreamingSession != NULL, STATUS_NULL_ARG);

    // Local candidate
    rtcMetrics.requestedTypeOfStats = RTC_STATS_TYPE_LOCAL_CANDIDATE;
    CHK_STATUS(rtcPeerConnectionGetMetrics(pSampleStreamingSession->pPeerConnection, NULL, &rtcMetrics));
    DLOGD("Local Candidate IP Address: %s", rtcMetrics.rtcStatsObject.localIceCandidateStats.address);
    DLOGD("Local Candidate type: %s", rtcMetrics.rtcStatsObject.localIceCandidateStats.candidateType);
    DLOGD("Local Candidate port: %d", rtcMetrics.rtcStatsObject.localIceCandidateStats.port);
    DLOGD("Local Candidate priority: %d", rtcMetrics.rtcStatsObject.localIceCandidateStats.priority);
    DLOGD("Local Candidate transport protocol: %s", rtcMetrics.rtcStatsObject.localIceCandidateStats.protocol);
    DLOGD("Local Candidate relay protocol: %s", rtcMetrics.rtcStatsObject.localIceCandidateStats.relayProtocol);
    DLOGD("Local Candidate Ice server source: %s", rtcMetrics.rtcStatsObject.localIceCandidateStats.url);

    // Remote candidate: every field is read from remoteIceCandidateStats, including the
    // candidate type (previously read from the local stats member by mistake).
    rtcMetrics.requestedTypeOfStats = RTC_STATS_TYPE_REMOTE_CANDIDATE;
    CHK_STATUS(rtcPeerConnectionGetMetrics(pSampleStreamingSession->pPeerConnection, NULL, &rtcMetrics));
    DLOGD("Remote Candidate IP Address: %s", rtcMetrics.rtcStatsObject.remoteIceCandidateStats.address);
    DLOGD("Remote Candidate type: %s", rtcMetrics.rtcStatsObject.remoteIceCandidateStats.candidateType);
    DLOGD("Remote Candidate port: %d", rtcMetrics.rtcStatsObject.remoteIceCandidateStats.port);
    DLOGD("Remote Candidate priority: %d", rtcMetrics.rtcStatsObject.remoteIceCandidateStats.priority);
    DLOGD("Remote Candidate transport protocol: %s", rtcMetrics.rtcStatsObject.remoteIceCandidateStats.protocol);

CleanUp:
    LEAVES();
    return retStatus;
}
/**
 * Applies an SDP answer received over signaling as the remote description of the
 * session's peer connection.
 *
 * @param pSampleConfiguration    - ignored
 * @param pSampleStreamingSession - session owning the peer connection
 * @param pSignalingMessage       - message whose payload holds the serialized answer
 * @return status of the deserialization / setRemoteDescription calls
 */
STATUS handleAnswer(PSampleConfiguration pSampleConfiguration, PSampleStreamingSession pSampleStreamingSession, PSignalingMessage pSignalingMessage)
{
    UNUSED_PARAM(pSampleConfiguration);
    STATUS retStatus = STATUS_SUCCESS;
    RtcSessionDescriptionInit remoteAnswer;

    MEMSET(&remoteAnswer, 0x00, SIZEOF(RtcSessionDescriptionInit));

    // Parse the payload first, then hand the result to the peer connection.
    CHK_STATUS(deserializeSessionDescriptionInit(pSignalingMessage->payload, pSignalingMessage->payloadLen, &remoteAnswer));
    CHK_STATUS(setRemoteDescription(pSampleStreamingSession->pPeerConnection, &remoteAnswer));

CleanUp:

    CHK_LOG_ERR(retStatus);
    return retStatus;
}
/**
 * Handles an SDP offer: sets it as the remote description, creates and applies the local
 * answer, sends the answer immediately when the remote peer supports trickle ICE, and
 * starts the media sender threads / periodic stats timer the first time an offer arrives.
 *
 * @param pSampleConfiguration    - sample configuration (media sources, timer queue)
 * @param pSampleStreamingSession - session the offer belongs to
 * @param pSignalingMessage       - message whose payload holds the serialized offer
 * @return STATUS_NULL_ARG when any argument is NULL, the first failing SDK status, or
 *         STATUS_SUCCESS. Failure to schedule the stats timer is only logged.
 */
STATUS handleOffer(PSampleConfiguration pSampleConfiguration, PSampleStreamingSession pSampleStreamingSession, PSignalingMessage pSignalingMessage)
{
    STATUS retStatus = STATUS_SUCCESS;
    RtcSessionDescriptionInit offerSessionDescriptionInit;
    NullableBool canTrickle;

    // The session is dereferenced below, so it has to be validated together with the other arguments.
    CHK(pSampleConfiguration != NULL && pSampleStreamingSession != NULL && pSignalingMessage != NULL, STATUS_NULL_ARG);

    MEMSET(&offerSessionDescriptionInit, 0x00, SIZEOF(RtcSessionDescriptionInit));
    MEMSET(&pSampleStreamingSession->answerSessionDescriptionInit, 0x00, SIZEOF(RtcSessionDescriptionInit));

    CHK_STATUS(deserializeSessionDescriptionInit(pSignalingMessage->payload, pSignalingMessage->payloadLen, &offerSessionDescriptionInit));
    CHK_STATUS(setRemoteDescription(pSampleStreamingSession->pPeerConnection, &offerSessionDescriptionInit));

    canTrickle = canTrickleIceCandidates(pSampleStreamingSession->pPeerConnection);
    /* cannot be null after setRemoteDescription */
    CHECK(!NULLABLE_CHECK_EMPTY(canTrickle));
    pSampleStreamingSession->remoteCanTrickleIce = canTrickle.value;

    CHK_STATUS(createAnswer(pSampleStreamingSession->pPeerConnection, &pSampleStreamingSession->answerSessionDescriptionInit));
    CHK_STATUS(setLocalDescription(pSampleStreamingSession->pPeerConnection, &pSampleStreamingSession->answerSessionDescriptionInit));

    /*
     * If remote support trickle ice, send answer now. Otherwise answer will be sent once ice candidate gathering is complete.
     */
    if (pSampleStreamingSession->remoteCanTrickleIce) {
        CHK_STATUS(respondWithAnswer(pSampleStreamingSession));
        DLOGD("time taken to send answer %" PRIu64 " ms",
              (GETTIME() - pSampleStreamingSession->offerReceiveTime) / HUNDREDS_OF_NANOS_IN_A_MILLISECOND);
    }

    // Media sender threads and the stats timer are shared by all sessions; start them once.
    if (!ATOMIC_LOAD_BOOL(&pSampleConfiguration->mediaThreadStarted)) {
        ATOMIC_STORE_BOOL(&pSampleConfiguration->mediaThreadStarted, TRUE);
        if (pSampleConfiguration->videoSource != NULL) {
            THREAD_CREATE(&pSampleConfiguration->videoSenderTid, pSampleConfiguration->videoSource, (PVOID) pSampleConfiguration);
        }

        if (pSampleConfiguration->audioSource != NULL) {
            THREAD_CREATE(&pSampleConfiguration->audioSenderTid, pSampleConfiguration->audioSource, (PVOID) pSampleConfiguration);
        }

        if ((retStatus = timerQueueAddTimer(pSampleConfiguration->timerQueueHandle, SAMPLE_STATS_DURATION, SAMPLE_STATS_DURATION,
                                            getIceCandidatePairStatsCallback, (UINT64) pSampleConfiguration,
                                            &pSampleConfiguration->iceCandidatePairStatsTimerId)) != STATUS_SUCCESS) {
            DLOGW("Failed to add getIceCandidatePairStatsCallback to add to timer queue (code 0x%08x). Cannot pull ice candidate pair metrics "
                  "periodically",
                  retStatus);
            // Missing periodic stats is not fatal: do not let this status fail the whole offer.
            retStatus = STATUS_SUCCESS;
        }
    }

    // The audio video receive routine should be per streaming session
    if (pSampleConfiguration->receiveAudioVideoSource != NULL) {
        THREAD_CREATE(&pSampleStreamingSession->receiveAudioVideoSenderTid, pSampleConfiguration->receiveAudioVideoSource,
                      (PVOID) pSampleStreamingSession);
    }

CleanUp:

    CHK_LOG_ERR(retStatus);
    return retStatus;
}
/**
 * Serializes the session's stored SDP answer and sends it to the session's peer through
 * the signaling client.
 *
 * @param pSampleStreamingSession - session holding the answer and the peer id
 * @return status of the serialization or of signalingClientSendMessageSync
 */
STATUS respondWithAnswer(PSampleStreamingSession pSampleStreamingSession)
{
    STATUS retStatus = STATUS_SUCCESS;
    SignalingMessage answerMessage;
    UINT32 serializedLen = 0;

    // First pass (NULL buffer) obtains the length, second pass writes the payload.
    CHK_STATUS(serializeSessionDescriptionInit(&pSampleStreamingSession->answerSessionDescriptionInit, NULL, &serializedLen));
    CHK_STATUS(serializeSessionDescriptionInit(&pSampleStreamingSession->answerSessionDescriptionInit, answerMessage.payload, &serializedLen));

    answerMessage.version = SIGNALING_MESSAGE_CURRENT_VERSION;
    answerMessage.messageType = SIGNALING_MESSAGE_TYPE_ANSWER;
    answerMessage.correlationId[0] = '\0';
    answerMessage.payloadLen = (UINT32) STRLEN(answerMessage.payload);
    STRCPY(answerMessage.peerClientId, pSampleStreamingSession->peerId);

    CHK_STATUS(signalingClientSendMessageSync(pSampleStreamingSession->pSampleConfiguration->signalingClientHandle, &answerMessage));

CleanUp:

    CHK_LOG_ERR(retStatus);
    return retStatus;
}
/**
 * ICE candidate callback.
 *
 * A non-NULL candidate is forwarded to the peer over signaling, but only when the remote
 * side can trickle ICE and the peer id has been received. A NULL candidate marks the end
 * of gathering: a master talking to a non-trickle peer sends its answer at that point, and
 * the session is flagged as done gathering.
 *
 * @param customData    - the PSampleStreamingSession the callback was registered with
 * @param candidateJson - serialized candidate, or NULL when gathering has finished
 */
VOID onIceCandidateHandler(UINT64 customData, PCHAR candidateJson)
{
    STATUS retStatus = STATUS_SUCCESS;
    PSampleStreamingSession pSession = (PSampleStreamingSession) customData;
    SignalingMessage candidateMessage;

    CHK(pSession != NULL, STATUS_NULL_ARG);

    if (candidateJson != NULL) {
        if (pSession->remoteCanTrickleIce && ATOMIC_LOAD_BOOL(&pSession->peerIdReceived)) {
            candidateMessage.version = SIGNALING_MESSAGE_CURRENT_VERSION;
            candidateMessage.messageType = SIGNALING_MESSAGE_TYPE_ICE_CANDIDATE;
            candidateMessage.correlationId[0] = '\0';
            STRCPY(candidateMessage.peerClientId, pSession->peerId);
            candidateMessage.payloadLen = (UINT32) STRLEN(candidateJson);
            STRCPY(candidateMessage.payload, candidateJson);
            CHK_STATUS(signalingClientSendMessageSync(pSession->pSampleConfiguration->signalingClientHandle, &candidateMessage));
        }
    } else {
        DLOGD("ice candidate gathering finished");

        // if application is master and non-trickle ice, send answer now.
        if (pSession->pSampleConfiguration->channelInfo.channelRoleType == SIGNALING_CHANNEL_ROLE_TYPE_MASTER &&
            !pSession->remoteCanTrickleIce) {
            CHK_STATUS(respondWithAnswer(pSession));
            DLOGD("time taken to send answer %" PRIu64 " ms", (GETTIME() - pSession->offerReceiveTime) / HUNDREDS_OF_NANOS_IN_A_MILLISECOND);
        }

        ATOMIC_STORE_BOOL(&pSession->candidateGatheringDone, TRUE);
    }

CleanUp:

    CHK_LOG_ERR(retStatus);
}
/**
 * Creates a peer connection whose ICE servers come from environment variables instead of
 * the ICE configuration served by the signaling channel:
 *   - STUN_URL                       -> iceServers[0] (always required)
 *   - TURN_URL, USERNAME, CREDENTIAL -> iceServers[1] (required only when useTurn is set)
 *
 * TURN URL forms, as described by the SDK sample this code derives from:
 * "turn:ip:port?transport=udp" tries TURN over UDP, "turn:ip:port?transport=tcp" tries TURN
 * over TCP/TLS, and "turn:ip:port" tries both. Passing many TURN servers slows down ICE
 * gathering in non-trickle mode.
 *
 * @param pSampleConfiguration - configuration; its iceUriCount is updated to the number of ICE servers set
 * @param ppRtcPeerConnection  - receives the created peer connection
 * @return STATUS_NULL_ARG on NULL arguments, STATUS_INVALID_OPERATION when a required
 *         environment variable is missing, or the status of createPeerConnection
 */
STATUS initializePeerConnection(PSampleConfiguration pSampleConfiguration, PRtcPeerConnection* ppRtcPeerConnection)
{
    STATUS retStatus = STATUS_SUCCESS;
    RtcConfiguration configuration;
    UINT32 uriCount = 0;
    PCHAR pStunUrl = NULL, pTurnUrl = NULL, pTurnUserName = NULL, pTurnCredential = NULL;

    CHK(pSampleConfiguration != NULL && ppRtcPeerConnection != NULL, STATUS_NULL_ARG);

    MEMSET(&configuration, 0x00, SIZEOF(RtcConfiguration));

    // Set this to custom callback to enable filtering of interfaces
    configuration.kvsRtcConfiguration.iceSetInterfaceFilterFunc = NULL;

    // Set the STUN server. getenv() returns NULL for an unset variable, which must never reach STRNCPY.
    CHK_ERR((pStunUrl = getenv("STUN_URL")) != NULL, STATUS_INVALID_OPERATION, "STUN_URL must be set");
    // NOTE(review): configuration was zeroed above, so the copy stays NUL-terminated provided the
    // destination arrays have room for a terminator beyond the MAX_* length - confirm against the SDK headers.
    STRNCPY(configuration.iceServers[0].urls, pStunUrl, MAX_ICE_CONFIG_URI_LEN);

    if (pSampleConfiguration->useTurn) {
        CHK_ERR((pTurnUrl = getenv("TURN_URL")) != NULL, STATUS_INVALID_OPERATION, "TURN_URL must be set when TURN is enabled");
        CHK_ERR((pTurnUserName = getenv("USERNAME")) != NULL, STATUS_INVALID_OPERATION, "USERNAME must be set when TURN is enabled");
        CHK_ERR((pTurnCredential = getenv("CREDENTIAL")) != NULL, STATUS_INVALID_OPERATION, "CREDENTIAL must be set when TURN is enabled");

        STRNCPY(configuration.iceServers[1].urls, pTurnUrl, MAX_ICE_CONFIG_URI_LEN);
        STRNCPY(configuration.iceServers[1].username, pTurnUserName, MAX_ICE_CONFIG_USER_NAME_LEN);
        STRNCPY(configuration.iceServers[1].credential, pTurnCredential, MAX_ICE_CONFIG_CREDENTIAL_LEN);
        uriCount++;
    }

    // One STUN server plus the optional TURN server.
    pSampleConfiguration->iceUriCount = uriCount + 1;
    CHK_STATUS(createPeerConnection(&configuration, ppRtcPeerConnection));

CleanUp:

    return retStatus;
}
/**
 * Logs (debug level) the ICE server statistics of a streaming session, one server index at
 * a time, for every index below the configuration's iceUriCount.
 *
 * @param pSampleStreamingSession - session whose peer connection is queried (not NULL-checked)
 * @return the first failing status of rtcPeerConnectionGetMetrics, or STATUS_SUCCESS
 */
STATUS gatherIceServerStats(PSampleStreamingSession pSampleStreamingSession)
{
    ENTERS();
    STATUS retStatus = STATUS_SUCCESS;
    RtcStats serverStats;
    UINT32 serverIndex;

    serverStats.requestedTypeOfStats = RTC_STATS_TYPE_ICE_SERVER;

    serverIndex = 0;
    while (serverIndex < pSampleStreamingSession->pSampleConfiguration->iceUriCount) {
        // The index selects which configured ICE server the metrics call reports on.
        serverStats.rtcStatsObject.iceServerStats.iceServerIndex = serverIndex;
        CHK_STATUS(rtcPeerConnectionGetMetrics(pSampleStreamingSession->pPeerConnection, NULL, &serverStats));

        DLOGD("ICE Server URL: %s", serverStats.rtcStatsObject.iceServerStats.url);
        DLOGD("ICE Server port: %d", serverStats.rtcStatsObject.iceServerStats.port);
        DLOGD("ICE Server protocol: %s", serverStats.rtcStatsObject.iceServerStats.protocol);
        DLOGD("Total requests sent:%" PRIu64, serverStats.rtcStatsObject.iceServerStats.totalRequestsSent);
        DLOGD("Total responses received: %" PRIu64, serverStats.rtcStatsObject.iceServerStats.totalResponsesReceived);
        DLOGD("Total round trip time: %" PRIu64 "ms",
              serverStats.rtcStatsObject.iceServerStats.totalRoundTripTime / HUNDREDS_OF_NANOS_IN_A_MILLISECOND);

        serverIndex++;
    }

CleanUp:

    LEAVES();
    return retStatus;
}
/**
 * Polls the signaling client until it reports at least one ICE configuration or the wait
 * budget is exhausted.
 *
 * @param signalingClientHandle - signaling client to poll
 * @param pIceConfigInfoCount   - receives the number of ICE configurations
 * @return STATUS_NULL_ARG on an invalid handle or NULL output pointer,
 *         STATUS_OPERATION_TIMED_OUT when nothing arrived in time, a failing status from
 *         signalingClientGetIceConfigInfoCount, or STATUS_SUCCESS
 */
STATUS awaitGetIceConfigInfoCount(SIGNALING_CLIENT_HANDLE signalingClientHandle, PUINT32 pIceConfigInfoCount)
{
    STATUS retStatus = STATUS_SUCCESS;
    UINT64 waited;

    CHK(IS_VALID_SIGNALING_CLIENT_HANDLE(signalingClientHandle) && pIceConfigInfoCount != NULL, STATUS_NULL_ARG);

    for (waited = 0;; waited += ICE_CONFIG_INFO_POLL_PERIOD) {
        // Get the configuration count
        CHK_STATUS(signalingClientGetIceConfigInfoCount(signalingClientHandle, pIceConfigInfoCount));

        // Done (with success) as soon as some ice configs are available
        if (*pIceConfigInfoCount != 0) {
            break;
        }

        // Check for timeout
        CHK_ERR(waited <= ASYNC_ICE_CONFIG_INFO_WAIT_TIMEOUT, STATUS_OPERATION_TIMED_OUT, "Couldn't retrieve ICE configurations in alotted time.");

        THREAD_SLEEP(ICE_CONFIG_INFO_POLL_PERIOD);
    }

CleanUp:

    return retStatus;
}
/**
 * Allocates a streaming session, creates its peer connection, registers the ICE candidate /
 * connection state / optional data channel callbacks, declares H264 and Opus support and
 * adds one video and one audio transceiver.
 *
 * @param pSampleConfiguration     - owning configuration
 * @param peerId                   - remote peer id; required for a master, ignored for a viewer
 *                                   (the viewer session uses SAMPLE_VIEWER_CLIENT_ID)
 * @param isMaster                 - TRUE when the session is created in the master role
 * @param ppSampleStreamingSession - receives the new session, or NULL on failure
 * @return STATUS_NULL_ARG, STATUS_INVALID_ARG, STATUS_NOT_ENOUGH_MEMORY, the first failing
 *         SDK status, or STATUS_SUCCESS
 */
STATUS createSampleStreamingSession(PSampleConfiguration pSampleConfiguration, PCHAR peerId, BOOL isMaster,
                                    PSampleStreamingSession* ppSampleStreamingSession)
{
    STATUS retStatus = STATUS_SUCCESS;
    RtcMediaStreamTrack videoTrack, audioTrack;
    PSampleStreamingSession pSession = NULL;

    MEMSET(&videoTrack, 0x00, SIZEOF(RtcMediaStreamTrack));
    MEMSET(&audioTrack, 0x00, SIZEOF(RtcMediaStreamTrack));

    CHK(pSampleConfiguration != NULL && ppSampleStreamingSession != NULL, STATUS_NULL_ARG);
    // A master must be told who its peer is; a viewer does not need a peer id.
    CHK(!isMaster || peerId != NULL, STATUS_INVALID_ARG);

    pSession = (PSampleStreamingSession) MEMCALLOC(1, SIZEOF(SampleStreamingSession));
    CHK(pSession != NULL, STATUS_NOT_ENOUGH_MEMORY);

    STRCPY(pSession->peerId, isMaster ? peerId : SAMPLE_VIEWER_CLIENT_ID);
    ATOMIC_STORE_BOOL(&pSession->peerIdReceived, TRUE);

    pSession->pSampleConfiguration = pSampleConfiguration;
    pSession->rtcMetricsHistory.prevTs = GETTIME();
    pSession->remoteCanTrickleIce = FALSE;
    ATOMIC_STORE_BOOL(&pSession->terminateFlag, FALSE);
    ATOMIC_STORE_BOOL(&pSession->candidateGatheringDone, FALSE);

    // Peer connection and its callbacks
    CHK_STATUS(initializePeerConnection(pSampleConfiguration, &pSession->pPeerConnection));
    CHK_STATUS(peerConnectionOnIceCandidate(pSession->pPeerConnection, (UINT64) pSession, onIceCandidateHandler));
    CHK_STATUS(peerConnectionOnConnectionStateChange(pSession->pPeerConnection, (UINT64) pSession, onConnectionStateChange));
    if (pSampleConfiguration->onDataChannel != NULL) {
        CHK_STATUS(peerConnectionOnDataChannel(pSession->pPeerConnection, (UINT64) pSession, pSampleConfiguration->onDataChannel));
    }

    // Declare that we support H264,Profile=42E01F,level-asymmetry-allowed=1,packetization-mode=1 and Opus
    CHK_STATUS(addSupportedCodec(pSession->pPeerConnection, RTC_CODEC_H264_PROFILE_42E01F_LEVEL_ASYMMETRY_ALLOWED_PACKETIZATION_MODE));
    CHK_STATUS(addSupportedCodec(pSession->pPeerConnection, RTC_CODEC_OPUS));

    // Video transceiver
    videoTrack.kind = MEDIA_STREAM_TRACK_KIND_VIDEO;
    videoTrack.codec = RTC_CODEC_H264_PROFILE_42E01F_LEVEL_ASYMMETRY_ALLOWED_PACKETIZATION_MODE;
    STRCPY(videoTrack.streamId, "myKvsVideoStream");
    STRCPY(videoTrack.trackId, "myVideoTrack");
    CHK_STATUS(addTransceiver(pSession->pPeerConnection, &videoTrack, NULL, &pSession->pVideoRtcRtpTransceiver));
    CHK_STATUS(transceiverOnBandwidthEstimation(pSession->pVideoRtcRtpTransceiver, (UINT64) pSession, sampleBandwidthEstimationHandler));

    // Audio transceiver (shares the stream id with the video track)
    audioTrack.kind = MEDIA_STREAM_TRACK_KIND_AUDIO;
    audioTrack.codec = RTC_CODEC_OPUS;
    STRCPY(audioTrack.streamId, "myKvsVideoStream");
    STRCPY(audioTrack.trackId, "myAudioTrack");
    CHK_STATUS(addTransceiver(pSession->pPeerConnection, &audioTrack, NULL, &pSession->pAudioRtcRtpTransceiver));
    CHK_STATUS(transceiverOnBandwidthEstimation(pSession->pAudioRtcRtpTransceiver, (UINT64) pSession, sampleBandwidthEstimationHandler));

    // Start-up latency is measured when the first frame arrives (see sampleFrameHandler).
    pSession->firstFrame = TRUE;
    pSession->startUpLatency = 0;

CleanUp:

    // A partially built session is torn down so the caller only ever sees NULL or a complete session.
    if (STATUS_FAILED(retStatus) && pSession != NULL) {
        freeSampleStreamingSession(&pSession);
        pSession = NULL;
    }

    if (ppSampleStreamingSession != NULL) {
        *ppSampleStreamingSession = pSession;
    }

    return retStatus;
}
/**
 * Tears down a streaming session: raises its terminate flag, runs the shutdown callback if
 * one is registered, joins the receive thread if its id is valid, closes and frees the peer
 * connection, and releases the session memory.
 *
 * The caller's pointer is reset to NULL so it cannot be used (or freed) again after this
 * call. Calling with *ppSampleStreamingSession == NULL is a successful no-op.
 *
 * @param ppSampleStreamingSession - address of the session pointer to free
 * @return STATUS_NULL_ARG when ppSampleStreamingSession is NULL, otherwise STATUS_SUCCESS
 *         (peer connection close/free failures are only logged)
 */
STATUS freeSampleStreamingSession(PSampleStreamingSession* ppSampleStreamingSession)
{
    STATUS retStatus = STATUS_SUCCESS;
    PSampleStreamingSession pSampleStreamingSession = NULL;

    CHK(ppSampleStreamingSession != NULL, STATUS_NULL_ARG);
    pSampleStreamingSession = *ppSampleStreamingSession;
    CHK(pSampleStreamingSession != NULL, retStatus);

    DLOGD("Freeing streaming session with peer id: %s ", pSampleStreamingSession->peerId);

    // Raise the flag before joining so the receive routine has a signal to stop on.
    ATOMIC_STORE_BOOL(&pSampleStreamingSession->terminateFlag, TRUE);

    if (pSampleStreamingSession->shutdownCallback != NULL) {
        pSampleStreamingSession->shutdownCallback(pSampleStreamingSession->shutdownCallbackCustomData, pSampleStreamingSession);
    }

    if (IS_VALID_TID_VALUE(pSampleStreamingSession->receiveAudioVideoSenderTid)) {
        THREAD_JOIN(pSampleStreamingSession->receiveAudioVideoSenderTid, NULL);
    }

    CHK_LOG_ERR(closePeerConnection(pSampleStreamingSession->pPeerConnection));
    CHK_LOG_ERR(freePeerConnection(&pSampleStreamingSession->pPeerConnection));
    SAFE_MEMFREE(pSampleStreamingSession);

    // Do not leave the caller holding a dangling pointer to the freed session.
    *ppSampleStreamingSession = NULL;

CleanUp:

    CHK_LOG_ERR(retStatus);
    return retStatus;
}
/**
 * Registers the callback (and its custom data) that freeSampleStreamingSession invokes
 * while tearing the session down.
 *
 * @param pSampleStreamingSession       - session to attach the callback to
 * @param customData                    - value handed back to the callback
 * @param streamSessionShutdownCallback - callback to run on shutdown
 * @return STATUS_NULL_ARG when the session or the callback is NULL, else STATUS_SUCCESS
 */
STATUS streamingSessionOnShutdown(PSampleStreamingSession pSampleStreamingSession, UINT64 customData,
                                  StreamSessionShutdownCallback streamSessionShutdownCallback)
{
    STATUS retStatus = STATUS_SUCCESS;

    CHK(pSampleStreamingSession != NULL, STATUS_NULL_ARG);
    CHK(streamSessionShutdownCallback != NULL, STATUS_NULL_ARG);

    pSampleStreamingSession->shutdownCallback = streamSessionShutdownCallback;
    pSampleStreamingSession->shutdownCallbackCustomData = customData;

CleanUp:

    return retStatus;
}
/**
 * Frame callback: logs every received frame at verbose level and, for the first frame of a
 * session only, records and prints the latency between offer receipt and that frame.
 *
 * @param customData - the PSampleStreamingSession the frame belongs to (not NULL-checked)
 * @param pFrame     - received frame (not NULL-checked)
 */
VOID sampleFrameHandler(UINT64 customData, PFrame pFrame)
{
    PSampleStreamingSession pSession = (PSampleStreamingSession) customData;

    DLOGV("Frame received. TrackId: %" PRIu64 ", Size: %u, Flags %u", pFrame->trackId, pFrame->size, pFrame->flags);

    if (!pSession->firstFrame) {
        return;
    }

    // One-shot measurement: clear the flag so later frames skip this path.
    pSession->firstFrame = FALSE;
    pSession->startUpLatency = (GETTIME() - pSession->offerReceiveTime) / HUNDREDS_OF_NANOS_IN_A_MILLISECOND;
    printf("Start up latency from offer to first frame: %" PRIu64 "ms\n", pSession->startUpLatency);
}
/**
 * Bandwidth estimation callback (registered on both transceivers in
 * createSampleStreamingSession): only logs the suggested bitrate at verbose level.
 *
 * @param customData    - ignored
 * @param maxiumBitrate - suggested bitrate; the unit is not established by this file
 */
VOID sampleBandwidthEstimationHandler(UINT64 customData, DOUBLE maxiumBitrate)
{
UNUSED_PARAM(customData);
DLOGV("received bitrate suggestion: %f", maxiumBitrate);
}
/**
 * Parses a remote ICE candidate out of a signaling message and adds it to the session's
 * peer connection.
 *
 * @param pSampleStreamingSession - session owning the peer connection (not NULL-checked)
 * @param pSignalingMessage       - message whose payload holds the serialized candidate (not NULL-checked)
 * @return status of deserializeRtcIceCandidateInit / addIceCandidate
 */
STATUS handleRemoteCandidate(PSampleStreamingSession pSampleStreamingSession, PSignalingMessage pSignalingMessage)
{
    STATUS retStatus = STATUS_SUCCESS;
    RtcIceCandidateInit remoteCandidate;

    // Deserialize first; only a successfully parsed candidate is handed to the peer connection.
    CHK_STATUS(deserializeRtcIceCandidateInit(pSignalingMessage->payload, pSignalingMessage->payloadLen, &remoteCandidate));
    CHK_STATUS(addIceCandidate(pSampleStreamingSession->pPeerConnection, remoteCandidate.candidate));

CleanUp:

    CHK_LOG_ERR(retStatus);
    return retStatus;
}
/**
 * Directory traversal callback: when fileName ends (case-insensitively) with
 * CA_CERT_PEM_FILE_EXTENSION, writes FPATHSEPARATOR followed by fileName into the buffer
 * passed through customData. A later match overwrites an earlier one.
 *
 * @param customData - destination character buffer, passed as an integer
 * @param entryType  - ignored
 * @param fullPath   - ignored
 * @param fileName   - name of the directory entry being visited
 * @return always STATUS_SUCCESS
 */
STATUS traverseDirectoryPEMFileScan(UINT64 customData, DIR_ENTRY_TYPES entryType, PCHAR fullPath, PCHAR fileName)
{
    UNUSED_PARAM(entryType);
    UNUSED_PARAM(fullPath);

    PCHAR pDest = (PCHAR) customData;
    UINT32 nameLen = STRLEN(fileName);
    PCHAR pSuffix;

    // Names too short to carry the extension are skipped outright.
    if (nameLen <= ARRAY_SIZE(CA_CERT_PEM_FILE_EXTENSION) + 1) {
        return STATUS_SUCCESS;
    }

    // Presumably the extension macro is a string literal, so ARRAY_SIZE counts its NUL and
    // the "+ 1" lands on the first character of the would-be extension - TODO confirm.
    pSuffix = &fileName[nameLen - ARRAY_SIZE(CA_CERT_PEM_FILE_EXTENSION) + 1];
    if (STRCMPI(CA_CERT_PEM_FILE_EXTENSION, pSuffix) != 0) {
        return STATUS_SUCCESS;
    }

    pDest[0] = FPATHSEPARATOR;
    STRCPY(pDest + 1, fileName);

    return STATUS_SUCCESS;
}
/**
 * Resolves the CA certificate path and stores it in pCaCertPath of the configuration.
 *
 * Resolution order:
 *   1. CACERT_PATH_ENV_VAR names a file           -> used as is.
 *   2. CACERT_PATH_ENV_VAR names a directory      -> the directory is scanned for a file with
 *      the PEM extension and "<dir><separator><file>" is used; if none is found the
 *      build-time DEFAULT_KVS_CACERT_PATH is used.
 *   3. CACERT_PATH_ENV_VAR is not set             -> DEFAULT_KVS_CACERT_PATH.
 *
 * The joined directory/file path is built in function-static storage because pCaCertPath
 * only stores a pointer, and the string returned by getenv() must not be appended to (it
 * has no spare capacity). Consequence: the joined path is shared by every call; a later
 * call that takes the directory branch overwrites it.
 *
 * @param ppSampleConfiguration - address of the configuration to update
 * @return STATUS_NULL_ARG, STATUS_DIRECTORY_ENTRY_STAT_ERROR, STATUS_INVALID_OPERATION
 *         (no usable path, or joined path too long), a traverseDirectory failure, or
 *         STATUS_SUCCESS
 */
STATUS lookForSslCert(PSampleConfiguration* ppSampleConfiguration)
{
    STATUS retStatus = STATUS_SUCCESS;
    struct stat pathStat;
    CHAR certName[MAX_PATH_LEN];
    static CHAR resolvedCertPath[MAX_PATH_LEN + 1];
    PSampleConfiguration pSampleConfiguration = NULL;

    CHK(ppSampleConfiguration != NULL && *ppSampleConfiguration != NULL, STATUS_NULL_ARG);
    pSampleConfiguration = *ppSampleConfiguration;

    MEMSET(certName, 0x0, ARRAY_SIZE(certName));
    pSampleConfiguration->pCaCertPath = getenv(CACERT_PATH_ENV_VAR);

    // if ca cert path is not set from the environment, try to use the one that cmake detected
    if (pSampleConfiguration->pCaCertPath == NULL) {
        CHK_ERR(STRNLEN(DEFAULT_KVS_CACERT_PATH, MAX_PATH_LEN) > 0, STATUS_INVALID_OPERATION, "No ca cert path given (error:%s)", strerror(errno));
        pSampleConfiguration->pCaCertPath = DEFAULT_KVS_CACERT_PATH;
    } else {
        // Check if the environment variable is a path
        CHK(0 == FSTAT(pSampleConfiguration->pCaCertPath, &pathStat), STATUS_DIRECTORY_ENTRY_STAT_ERROR);

        if (S_ISDIR(pathStat.st_mode)) {
            CHK_STATUS(traverseDirectory(pSampleConfiguration->pCaCertPath, (UINT64) &certName, /* iterate */ FALSE, traverseDirectoryPEMFileScan));

            if (certName[0] != 0x0) {
                // Join directory and file name in our own buffer; bail out instead of truncating.
                CHK_ERR(STRNLEN(pSampleConfiguration->pCaCertPath, MAX_PATH_LEN) + STRNLEN(certName, ARRAY_SIZE(certName)) <
                            ARRAY_SIZE(resolvedCertPath),
                        STATUS_INVALID_OPERATION, "CA cert path is too long");
                resolvedCertPath[0] = '\0';
                STRCAT(resolvedCertPath, pSampleConfiguration->pCaCertPath);
                STRCAT(resolvedCertPath, certName);
                pSampleConfiguration->pCaCertPath = resolvedCertPath;
            } else {
                DLOGW("Cert not found in path set...checking if CMake detected a path\n");
                CHK_ERR(STRNLEN(DEFAULT_KVS_CACERT_PATH, MAX_PATH_LEN) > 0, STATUS_INVALID_OPERATION, "No ca cert path given (error:%s)",
                        strerror(errno));
                DLOGI("CMake detected cert path\n");
                pSampleConfiguration->pCaCertPath = DEFAULT_KVS_CACERT_PATH;
            }
        }
    }

CleanUp:

    CHK_LOG_ERR(retStatus);
    return retStatus;
}
/**
 * Allocates and populates a SampleConfiguration for the given signaling channel.
 *
 * Reads from the environment: the access key and secret key (both required), the optional
 * session token, the file-logging switch, the region (DEFAULT_AWS_REGION when unset) and
 * the log level (LOG_LEVEL_WARN when unset or unparsable). Also resolves the CA cert path,
 * creates a static credential provider, the locks, the condition variable, the timer
 * queue and the two hash tables.
 *
 * @param channelName           - signaling channel name; the pointer is stored, not copied
 * @param roleType              - master or viewer role for the channel
 * @param trickleIce            - stored in the configuration (see the comment at the assignment)
 * @param useTurn               - stored in the configuration; read by initializePeerConnection
 * @param ppSampleConfiguration - receives the configuration pointer
 * @return STATUS_NULL_ARG, STATUS_NOT_ENOUGH_MEMORY, STATUS_INVALID_OPERATION (missing
 *         credentials), the first failing status of a called function, or STATUS_SUCCESS
 */
STATUS createSampleConfiguration(PCHAR channelName, SIGNALING_CHANNEL_ROLE_TYPE roleType, BOOL trickleIce, BOOL useTurn,
PSampleConfiguration* ppSampleConfiguration)
{
STATUS retStatus = STATUS_SUCCESS;
PCHAR pAccessKey, pSecretKey, pSessionToken, pLogLevel;
PSampleConfiguration pSampleConfiguration = NULL;
UINT32 logLevel;
CHK(ppSampleConfiguration != NULL, STATUS_NULL_ARG);
// Zero-initialized allocation: every field not assigned below starts out as 0/NULL.
CHK(NULL != (pSampleConfiguration = (PSampleConfiguration) MEMCALLOC(1, SIZEOF(SampleConfiguration))), STATUS_NOT_ENOUGH_MEMORY);
// Credentials: key id and secret are mandatory, the session token is optional.
CHK_ERR((pAccessKey = getenv(ACCESS_KEY_ENV_VAR)) != NULL, STATUS_INVALID_OPERATION, "AWS_ACCESS_KEY_ID must be set");
CHK_ERR((pSecretKey = getenv(SECRET_KEY_ENV_VAR)) != NULL, STATUS_INVALID_OPERATION, "AWS_SECRET_ACCESS_KEY must be set");
pSessionToken = getenv(SESSION_TOKEN_ENV_VAR);
// File logging is enabled by the mere presence of the variable, whatever its value.
pSampleConfiguration->enableFileLogging = FALSE;
if (NULL != getenv(ENABLE_FILE_LOGGING)) {
pSampleConfiguration->enableFileLogging = TRUE;
}
if ((pSampleConfiguration->channelInfo.pRegion = getenv(DEFAULT_REGION_ENV_VAR)) == NULL) {
pSampleConfiguration->channelInfo.pRegion = DEFAULT_AWS_REGION;
}
// Fills in pCaCertPath, which is copied into channelInfo.pCertPath further down.
CHK_STATUS(lookForSslCert(&pSampleConfiguration));
// Set the logger log level
if (NULL == (pLogLevel = getenv(DEBUG_LOG_LEVEL_ENV_VAR)) || (STATUS_SUCCESS != STRTOUI32(pLogLevel, NULL, 10, &logLevel))) {
logLevel = LOG_LEVEL_WARN;
}
SET_LOGGER_LOG_LEVEL(logLevel);
CHK_STATUS(
createStaticCredentialProvider(pAccessKey, 0, pSecretKey, 0, pSessionToken, 0, MAX_UINT64, &pSampleConfiguration->pCredentialProvider));
// Handles start out invalid so that code inspecting them can tell they were never created.
pSampleConfiguration->audioSenderTid = INVALID_TID_VALUE;
pSampleConfiguration->videoSenderTid = INVALID_TID_VALUE;
pSampleConfiguration->signalingClientHandle = INVALID_SIGNALING_CLIENT_HANDLE_VALUE;
// NOTE(review): the MUTEX_CREATE argument presumably selects a reentrant lock - confirm against the platform layer.
pSampleConfiguration->sampleConfigurationObjLock = MUTEX_CREATE(TRUE);
pSampleConfiguration->cvar = CVAR_CREATE();
pSampleConfiguration->streamingSessionListReadLock = MUTEX_CREATE(FALSE);
/* This is ignored for master. Master can extract the info from offer. Viewer has to know if peer can trickle or
* not ahead of time. */
pSampleConfiguration->trickleIce = trickleIce;
pSampleConfiguration->useTurn = useTurn;
// Signaling channel description
pSampleConfiguration->channelInfo.version = CHANNEL_INFO_CURRENT_VERSION;
pSampleConfiguration->channelInfo.pChannelName = channelName;
pSampleConfiguration->channelInfo.pKmsKeyId = NULL;
pSampleConfiguration->channelInfo.tagCount = 0;
pSampleConfiguration->channelInfo.pTags = NULL;
pSampleConfiguration->channelInfo.channelType = SIGNALING_CHANNEL_TYPE_SINGLE_MASTER;
pSampleConfiguration->channelInfo.channelRoleType = roleType;
pSampleConfiguration->channelInfo.cachingPolicy = SIGNALING_API_CALL_CACHE_TYPE_FILE;
pSampleConfiguration->channelInfo.cachingPeriod = SIGNALING_API_CALL_CACHE_TTL_SENTINEL_VALUE;
pSampleConfiguration->channelInfo.asyncIceServerConfig = TRUE;
pSampleConfiguration->channelInfo.retry = TRUE;
pSampleConfiguration->channelInfo.reconnect = TRUE;
pSampleConfiguration->channelInfo.pCertPath = pSampleConfiguration->pCaCertPath;
pSampleConfiguration->channelInfo.messageTtl = 0; // Default is 60 seconds
// Signaling callbacks receive this configuration back as their customData.
pSampleConfiguration->signalingClientCallbacks.version = SIGNALING_CLIENT_CALLBACKS_CURRENT_VERSION;
pSampleConfiguration->signalingClientCallbacks.errorReportFn = signalingClientError;
pSampleConfiguration->signalingClientCallbacks.stateChangeFn = signalingClientStateChanged;
pSampleConfiguration->signalingClientCallbacks.customData = (UINT64) pSampleConfiguration;
pSampleConfiguration->clientInfo.version = SIGNALING_CLIENT_INFO_CURRENT_VERSION;
pSampleConfiguration->clientInfo.loggingLevel = logLevel;
pSampleConfiguration->iceCandidatePairStatsTimerId = MAX_UINT32;
ATOMIC_STORE_BOOL(&pSampleConfiguration->interrupted, FALSE);
ATOMIC_STORE_BOOL(&pSampleConfiguration->mediaThreadStarted, FALSE);
ATOMIC_STORE_BOOL(&pSampleConfiguration->appTerminateFlag, FALSE);
ATOMIC_STORE_BOOL(&pSampleConfiguration->recreateSignalingClient, FALSE);
CHK_STATUS(timerQueueCreate(&pSampleConfiguration->timerQueueHandle));
pSampleConfiguration->iceUriCount = 0;
CHK_STATUS(hashTableCreateWithParams(SAMPLE_HASH_TABLE_BUCKET_COUNT, SAMPLE_HASH_TABLE_BUCKET_LENGTH,
&pSampleConfiguration->pPendingSignalingMessageForRemoteClient));
CHK_STATUS(hashTableCreateWithParams(SAMPLE_HASH_TABLE_BUCKET_COUNT, SAMPLE_HASH_TABLE_BUCKET_LENGTH,
&pSampleConfiguration->pRtcPeerConnectionForRemoteClient));
CleanUp:
// On failure the partially initialized configuration is handed to freeSampleConfiguration.
// NOTE(review): the value stored to the output below is then whatever freeSampleConfiguration
// leaves in pSampleConfiguration - presumably NULL; confirm it resets the pointer.
if (STATUS_FAILED(retStatus)) {
freeSampleConfiguration(&pSampleConfiguration);
}
if (ppSampleConfiguration != NULL) {
*ppSampleConfiguration = pSampleConfiguration;
}
return retStatus;
}
/**
 * Logs (debug level) the signaling client statistics: connection duration, error counts
 * and API call latencies, with durations converted to milliseconds.
 *
 * @param pSignalingClientMetrics - metrics to print
 * @return STATUS_NULL_ARG when pSignalingClientMetrics is NULL, else STATUS_SUCCESS
 */
STATUS logSignalingClientStats(PSignalingClientMetrics pSignalingClientMetrics)
{
    ENTERS();
    STATUS retStatus = STATUS_SUCCESS;

    if (pSignalingClientMetrics == NULL) {
        retStatus = STATUS_NULL_ARG;
        goto CleanUp;
    }

    DLOGD("Signaling client connection duration: %" PRIu64 " ms",
          (pSignalingClientMetrics->signalingClientStats.connectionDuration / HUNDREDS_OF_NANOS_IN_A_MILLISECOND));

    DLOGD("Number of signaling client API errors: %d", pSignalingClientMetrics->signalingClientStats.numberOfErrors);
    DLOGD("Number of runtime errors in the session: %d", pSignalingClientMetrics->signalingClientStats.numberOfRuntimeErrors);

    // NOTE(review): the "uptime" line prints connectionDuration again; a dedicated uptime
    // field may have been intended - confirm against the SignalingClientStats definition.
    DLOGD("Signaling client uptime: %" PRIu64 " ms",
          (pSignalingClientMetrics->signalingClientStats.connectionDuration / HUNDREDS_OF_NANOS_IN_A_MILLISECOND));

    // This gives the EMA of the createChannel, describeChannel, getChannelEndpoint and deleteChannel calls
    DLOGD("Control Plane API call latency: %" PRIu64 " ms",
          (pSignalingClientMetrics->signalingClientStats.cpApiCallLatency / HUNDREDS_OF_NANOS_IN_A_MILLISECOND));

    // This gives the EMA of the getIceConfig() call.
    DLOGD("Data Plane API call latency: %" PRIu64 " ms",
          (pSignalingClientMetrics->signalingClientStats.dpApiCallLatency / HUNDREDS_OF_NANOS_IN_A_MILLISECOND));

CleanUp:

    LEAVES();
    return retStatus;
}
/*
 * Timer callback that logs ICE candidate pair statistics for every streaming session.
 *
 * For each session in sampleStreamingSessionList the candidate pair metrics are fetched with
 * rtcPeerConnectionGetMetrics() into pSampleConfiguration->rtcIceCandidatePairMetrics. When at
 * least one whole second has elapsed since the session's previous sample (rtcMetricsHistory.prevTs),
 * per-second rates are derived from the counter deltas, logged at debug level, and the history
 * is updated with the new counters.
 *
 * @param timerId     - unused
 * @param currentTime - unused
 * @param customData  - the PSampleConfiguration, passed as UINT64
 *
 * @return always STATUS_SUCCESS; a NULL customData is only reported with a warning log and
 *         sessions whose metrics cannot be fetched are silently skipped.
 */
STATUS getIceCandidatePairStatsCallback(UINT32 timerId, UINT64 currentTime, UINT64 customData)
{
    UNUSED_PARAM(timerId);
    UNUSED_PARAM(currentTime);
    STATUS retStatus = STATUS_SUCCESS;
    PSampleConfiguration pSampleConfiguration = (PSampleConfiguration) customData;
    UINT32 i;
    UINT64 currentMeasureDuration = 0;
    DOUBLE averagePacketsDiscardedOnSend = 0.0;
    DOUBLE averageNumberOfPacketsSentPerSecond = 0.0;
    DOUBLE averageNumberOfPacketsReceivedPerSecond = 0.0;
    DOUBLE outgoingBitrate = 0.0;
    DOUBLE incomingBitrate = 0.0;

    if (pSampleConfiguration == NULL) {
        DLOGW("[KVS Master] getPeriodicStats(): operation returned status code: 0x%08x \n", STATUS_NULL_ARG);
        goto CleanUp;
    }

    // The configuration may only be touched after the NULL check above.
    pSampleConfiguration->rtcIceCandidatePairMetrics.requestedTypeOfStats = RTC_STATS_TYPE_CANDIDATE_PAIR;

    for (i = 0; i < pSampleConfiguration->streamingSessionCount; ++i) {
        if (STATUS_SUCCEEDED(rtcPeerConnectionGetMetrics(pSampleConfiguration->sampleStreamingSessionList[i]->pPeerConnection, NULL,
                                                         &pSampleConfiguration->rtcIceCandidatePairMetrics))) {
            // Elapsed time since the previous sample of this session, truncated to whole seconds.
            currentMeasureDuration = (pSampleConfiguration->rtcIceCandidatePairMetrics.timestamp -
                                      pSampleConfiguration->sampleStreamingSessionList[i]->rtcMetricsHistory.prevTs) /
                HUNDREDS_OF_NANOS_IN_A_SECOND;
            DLOGD("Current duration: %" PRIu64 " seconds", currentMeasureDuration);

            // A zero duration would divide by zero below; in that case the history is left
            // untouched so the deltas accumulate until the next invocation.
            if (currentMeasureDuration > 0) {
                DLOGD("Selected local candidate ID: %s",
                      pSampleConfiguration->rtcIceCandidatePairMetrics.rtcStatsObject.iceCandidatePairStats.localCandidateId);
                DLOGD("Selected remote candidate ID: %s",
                      pSampleConfiguration->rtcIceCandidatePairMetrics.rtcStatsObject.iceCandidatePairStats.remoteCandidateId);
                // TODO: Display state as a string for readability
                DLOGD("Ice Candidate Pair state: %d", pSampleConfiguration->rtcIceCandidatePairMetrics.rtcStatsObject.iceCandidatePairStats.state);
                DLOGD("Nomination state: %s",
                      pSampleConfiguration->rtcIceCandidatePairMetrics.rtcStatsObject.iceCandidatePairStats.nominated ? "nominated"
                                                                                                                      : "not nominated");

                averageNumberOfPacketsSentPerSecond =
                    (DOUBLE)(pSampleConfiguration->rtcIceCandidatePairMetrics.rtcStatsObject.iceCandidatePairStats.packetsSent -
                             pSampleConfiguration->sampleStreamingSessionList[i]->rtcMetricsHistory.prevNumberOfPacketsSent) /
                    (DOUBLE) currentMeasureDuration;
                DLOGD("Packet send rate: %lf pkts/sec", averageNumberOfPacketsSentPerSecond);

                averageNumberOfPacketsReceivedPerSecond =
                    (DOUBLE)(pSampleConfiguration->rtcIceCandidatePairMetrics.rtcStatsObject.iceCandidatePairStats.packetsReceived -
                             pSampleConfiguration->sampleStreamingSessionList[i]->rtcMetricsHistory.prevNumberOfPacketsReceived) /
                    (DOUBLE) currentMeasureDuration;
                DLOGD("Packet receive rate: %lf pkts/sec", averageNumberOfPacketsReceivedPerSecond);

                // Byte deltas are converted to bits (* 8.0) before dividing by the duration.
                outgoingBitrate = (DOUBLE)((pSampleConfiguration->rtcIceCandidatePairMetrics.rtcStatsObject.iceCandidatePairStats.bytesSent -
                                            pSampleConfiguration->sampleStreamingSessionList[i]->rtcMetricsHistory.prevNumberOfBytesSent) *
                                           8.0) /
                    currentMeasureDuration;
                DLOGD("Outgoing bit rate: %lf bps", outgoingBitrate);

                incomingBitrate = (DOUBLE)((pSampleConfiguration->rtcIceCandidatePairMetrics.rtcStatsObject.iceCandidatePairStats.bytesReceived -
                                            pSampleConfiguration->sampleStreamingSessionList[i]->rtcMetricsHistory.prevNumberOfBytesReceived) *
                                           8.0) /
                    currentMeasureDuration;
                DLOGD("Incoming bit rate: %lf bps", incomingBitrate);

                averagePacketsDiscardedOnSend =
                    (DOUBLE)(pSampleConfiguration->rtcIceCandidatePairMetrics.rtcStatsObject.iceCandidatePairStats.packetsDiscardedOnSend -
                             pSampleConfiguration->sampleStreamingSessionList[i]->rtcMetricsHistory.prevPacketsDiscardedOnSend) /
                    (DOUBLE) currentMeasureDuration;
                DLOGD("Packet discard rate: %lf pkts/sec", averagePacketsDiscardedOnSend);

                DLOGD("Current STUN request round trip time: %lf sec",
                      pSampleConfiguration->rtcIceCandidatePairMetrics.rtcStatsObject.iceCandidatePairStats.currentRoundTripTime);
                // PRIu64 matches the other UINT64 log statements in this file.
                DLOGD("Number of STUN responses received: %" PRIu64,
                      pSampleConfiguration->rtcIceCandidatePairMetrics.rtcStatsObject.iceCandidatePairStats.responsesReceived);

                // Remember the current counters as the baseline for the next invocation.
                pSampleConfiguration->sampleStreamingSessionList[i]->rtcMetricsHistory.prevTs =
                    pSampleConfiguration->rtcIceCandidatePairMetrics.timestamp;
                pSampleConfiguration->sampleStreamingSessionList[i]->rtcMetricsHistory.prevNumberOfPacketsSent =
                    pSampleConfiguration->rtcIceCandidatePairMetrics.rtcStatsObject.iceCandidatePairStats.packetsSent;
                pSampleConfiguration->sampleStreamingSessionList[i]->rtcMetricsHistory.prevNumberOfPacketsReceived =
                    pSampleConfiguration->rtcIceCandidatePairMetrics.rtcStatsObject.iceCandidatePairStats.packetsReceived;
                pSampleConfiguration->sampleStreamingSessionList[i]->rtcMetricsHistory.prevNumberOfBytesSent =
                    pSampleConfiguration->rtcIceCandidatePairMetrics.rtcStatsObject.iceCandidatePairStats.bytesSent;
                pSampleConfiguration->sampleStreamingSessionList[i]->rtcMetricsHistory.prevNumberOfBytesReceived =
                    pSampleConfiguration->rtcIceCandidatePairMetrics.rtcStatsObject.iceCandidatePairStats.bytesReceived;
                pSampleConfiguration->sampleStreamingSessionList[i]->rtcMetricsHistory.prevPacketsDiscardedOnSend =
                    pSampleConfiguration->rtcIceCandidatePairMetrics.rtcStatsObject.iceCandidatePairStats.packetsDiscardedOnSend;
            }
        }
    }

CleanUp:
    return retStatus;
}
/*
 * Hash table iteration callback that releases the stack queue stored as the value of
 * the given entry: the queue is cleared first and then freed.
 *
 * @param customData - unused
 * @param pHashEntry - entry whose value is a PStackQueue
 *
 * @return always STATUS_SUCCESS; the statuses of the queue calls are not inspected
 */
STATUS freePendingSignalingMessageQueue(UINT64 customData, PHashEntry pHashEntry)
{
    PStackQueue pQueue;

    UNUSED_PARAM(customData);

    pQueue = (PStackQueue) pHashEntry->value;

    // NOTE(review): TRUE presumably asks stackQueueClear to also free the queued items - confirm
    stackQueueClear(pQueue, TRUE);
    stackQueueFree(pQueue);

    return STATUS_SUCCESS;
}
/*
 * Tears down a sample configuration and everything it owns, then sets *ppSampleConfiguration
 * to NULL.
 *
 * Order of teardown:
 *   1. the ICE candidate pair stats timer and the timer queue,
 *   2. the pending-message and peer-connection hash tables,
 *   3. every streaming session (ICE server stats are gathered before each one is freed),
 *   4. the WebRTC library state, frame buffers, locks, condition variable and credential provider,
 *   5. the configuration structure itself.
 *
 * Teardown is best effort: a failing step is logged and the remaining resources are still
 * released.
 *
 * @param ppSampleConfiguration - address of the configuration pointer; *ppSampleConfiguration
 *                                may be NULL, in which case nothing is done
 *
 * @return STATUS_NULL_ARG if ppSampleConfiguration is NULL, otherwise STATUS_SUCCESS or the
 *         first non-fatal failure encountered while tearing down
 */
STATUS freeSampleConfiguration(PSampleConfiguration* ppSampleConfiguration)
{
    ENTERS();
    STATUS retStatus = STATUS_SUCCESS;
    STATUS stepStatus = STATUS_SUCCESS;
    PSampleConfiguration pSampleConfiguration;
    UINT32 i;
    BOOL locked = FALSE;

    CHK(ppSampleConfiguration != NULL, STATUS_NULL_ARG);
    pSampleConfiguration = *ppSampleConfiguration;

    // Nothing to free. This has to be checked before the first dereference below.
    CHK(pSampleConfiguration != NULL, retStatus);

    // Stop the timer machinery before anything it might look at is released.
    // NOTE(review): assumes no timer callback runs once timerQueueFree has returned - confirm
    if (IS_VALID_TIMER_QUEUE_HANDLE(pSampleConfiguration->timerQueueHandle)) {
        if (pSampleConfiguration->iceCandidatePairStatsTimerId != MAX_UINT32) {
            stepStatus = timerQueueCancelTimer(pSampleConfiguration->timerQueueHandle, pSampleConfiguration->iceCandidatePairStatsTimerId,
                                               (UINT64) pSampleConfiguration);
            if (STATUS_FAILED(stepStatus)) {
                // Do not abort: bailing out here would leak the whole configuration.
                DLOGW("Failed to cancel the ICE candidate pair stats timer: 0x%08x", stepStatus);
                retStatus = stepStatus;
            }
            pSampleConfiguration->iceCandidatePairStatsTimerId = MAX_UINT32;
        }
        timerQueueFree(&pSampleConfiguration->timerQueueHandle);
    }

    // Release the queues held as values of the pending-message table, then the tables themselves.
    hashTableIterateEntries(pSampleConfiguration->pPendingSignalingMessageForRemoteClient, (UINT64) NULL, freePendingSignalingMessageQueue);
    hashTableClear(pSampleConfiguration->pPendingSignalingMessageForRemoteClient);
    hashTableFree(pSampleConfiguration->pPendingSignalingMessageForRemoteClient);
    hashTableClear(pSampleConfiguration->pRtcPeerConnectionForRemoteClient);
    hashTableFree(pSampleConfiguration->pRtcPeerConnectionForRemoteClient);

    if (IS_VALID_MUTEX_VALUE(pSampleConfiguration->sampleConfigurationObjLock)) {
        MUTEX_LOCK(pSampleConfiguration->sampleConfigurationObjLock);
        locked = TRUE;
    }

    for (i = 0; i < pSampleConfiguration->streamingSessionCount; ++i) {
        stepStatus = gatherIceServerStats(pSampleConfiguration->sampleStreamingSessionList[i]);
        if (STATUS_FAILED(stepStatus)) {
            DLOGW("Failed to gather ICE server stats for streaming session %u: 0x%08x", i, stepStatus);
            if (STATUS_SUCCEEDED(retStatus)) {
                retStatus = stepStatus;
            }
        }
        freeSampleStreamingSession(&pSampleConfiguration->sampleStreamingSessionList[i]);
    }

    if (locked) {
        MUTEX_UNLOCK(pSampleConfiguration->sampleConfigurationObjLock);
    }

    deinitKvsWebRtc();

    SAFE_MEMFREE(pSampleConfiguration->pVideoFrameBuffer);
    SAFE_MEMFREE(pSampleConfiguration->pAudioFrameBuffer);

    // NOTE(review): presumably wakes any thread blocked on the condition variable and then
    // takes/releases the lock once so such a thread is out of the lock before it is freed - confirm
    if (IS_VALID_CVAR_VALUE(pSampleConfiguration->cvar) && IS_VALID_MUTEX_VALUE(pSampleConfiguration->sampleConfigurationObjLock)) {
        CVAR_BROADCAST(pSampleConfiguration->cvar);
        MUTEX_LOCK(pSampleConfiguration->sampleConfigurationObjLock);
        MUTEX_UNLOCK(pSampleConfiguration->sampleConfigurationObjLock);
    }

    if (IS_VALID_MUTEX_VALUE(pSampleConfiguration->sampleConfigurationObjLock)) {
        MUTEX_FREE(pSampleConfiguration->sampleConfigurationObjLock);
    }

    if (IS_VALID_MUTEX_VALUE(pSampleConfiguration->streamingSessionListReadLock)) {
        MUTEX_FREE(pSampleConfiguration->streamingSessionListReadLock);
    }

    if (IS_VALID_CVAR_VALUE(pSampleConfiguration->cvar)) {
        CVAR_FREE(pSampleConfiguration->cvar);
    }

    freeStaticCredentialProvider(&pSampleConfiguration->pCredentialProvider);

    MEMFREE(*ppSampleConfiguration);
    *ppSampleConfiguration = NULL;

CleanUp:
    LEAVES();
    return retStatus;
}
/*
 * Housekeeping loop that runs until pSampleConfiguration->interrupted is set.
 *
 * Each pass, under sampleConfigurationObjLock:
 *   - frees every streaming session whose terminateFlag is set, removing it from
 *     sampleStreamingSessionList by swapping in the last element,
 *   - waits on the condition variable for up to 5 seconds,
 *   - re-creates the signaling client when recreateSignalingClient is set,
 *   - calls signalingClientConnectSync when the signaling client reports
 *     SIGNALING_CLIENT_STATE_READY (its status is deliberately ignored).
 *
 * NOTE(review): signalingMessageReceived stores session pointers in
 * pRtcPeerConnectionForRemoteClient; nothing here removes the entry of a freed session, so
 * unless freeSampleStreamingSession does it the table may keep a stale pointer - confirm.
 *
 * @param pSampleConfiguration - configuration to service
 *
 * @return STATUS_NULL_ARG for a NULL argument, the failing status if freeing a session or
 *         querying the signaling client state fails, otherwise STATUS_SUCCESS once interrupted
 */
STATUS sessionCleanupWait(PSampleConfiguration pSampleConfiguration)
{
    ENTERS();
    STATUS retStatus = STATUS_SUCCESS;
    PSampleStreamingSession pSampleStreamingSession = NULL;
    UINT32 i;
    BOOL locked = FALSE;
    SIGNALING_CLIENT_STATE signalingClientState;

    CHK(pSampleConfiguration != NULL, STATUS_NULL_ARG);

    MUTEX_LOCK(pSampleConfiguration->sampleConfigurationObjLock);
    locked = TRUE;

    while (!ATOMIC_LOAD_BOOL(&pSampleConfiguration->interrupted)) {
        // scan and cleanup terminated streaming session
        i = 0;
        while (i < pSampleConfiguration->streamingSessionCount) {
            pSampleStreamingSession = pSampleConfiguration->sampleStreamingSessionList[i];

            if (!ATOMIC_LOAD_BOOL(&pSampleStreamingSession->terminateFlag)) {
                ++i;
                continue;
            }

            MUTEX_LOCK(pSampleConfiguration->streamingSessionListReadLock);
            // swap with last element and decrement count
            pSampleConfiguration->streamingSessionCount--;
            pSampleConfiguration->sampleStreamingSessionList[i] =
                pSampleConfiguration->sampleStreamingSessionList[pSampleConfiguration->streamingSessionCount];
            MUTEX_UNLOCK(pSampleConfiguration->streamingSessionListReadLock);

            CHK_STATUS(freeSampleStreamingSession(&pSampleStreamingSession));

            // i is intentionally not advanced: slot i now holds the former last element,
            // which has not been examined yet in this pass.
        }

        // periodically wake up and clean up terminated streaming session
        CVAR_WAIT(pSampleConfiguration->cvar, pSampleConfiguration->sampleConfigurationObjLock, 5 * HUNDREDS_OF_NANOS_IN_A_SECOND);

        // Check if we need to re-create the signaling client on-the-fly.
        // The flag is only reset when both the free and the create succeed, so a failed
        // attempt is retried on the next pass.
        if (ATOMIC_LOAD_BOOL(&pSampleConfiguration->recreateSignalingClient) &&
            STATUS_SUCCEEDED(freeSignalingClient(&pSampleConfiguration->signalingClientHandle)) &&
            STATUS_SUCCEEDED(createSignalingClientSync(&pSampleConfiguration->clientInfo, &pSampleConfiguration->channelInfo,
                                                       &pSampleConfiguration->signalingClientCallbacks, pSampleConfiguration->pCredentialProvider,
                                                       &pSampleConfiguration->signalingClientHandle))) {
            // Re-set the variable again
            ATOMIC_STORE_BOOL(&pSampleConfiguration->recreateSignalingClient, FALSE);
        }

        // Check the signaling client state and connect if needed
        if (IS_VALID_SIGNALING_CLIENT_HANDLE(pSampleConfiguration->signalingClientHandle)) {
            CHK_STATUS(signalingClientGetCurrentState(pSampleConfiguration->signalingClientHandle, &signalingClientState));
            if (signalingClientState == SIGNALING_CLIENT_STATE_READY) {
                UNUSED_PARAM(signalingClientConnectSync(pSampleConfiguration->signalingClientHandle));
            }
        }
    }

CleanUp:
    CHK_LOG_ERR(retStatus);

    if (locked) {
        MUTEX_UNLOCK(pSampleConfiguration->sampleConfigurationObjLock);
    }

    LEAVES();
    return retStatus;
}
/*
 * Drains a queue of pending signaling messages into a streaming session.
 *
 * Every message is dequeued and freed; messages of type SIGNALING_MESSAGE_TYPE_ICE_CANDIDATE
 * are passed to handleRemoteCandidate() first, all other types are dropped.
 *
 * Ownership: on success the queue itself is freed with stackQueueFree(). On failure the
 * message being processed is freed, but the queue and any messages still in it are left
 * untouched and remain the caller's responsibility.
 *
 * @param pPendingMessageQueue    - queue whose items are PReceivedSignalingMessage pointers
 * @param pSampleStreamingSession - session that receives the ICE candidates
 *
 * @return STATUS_SUCCESS, or the first failing status of the queue operations or of
 *         handleRemoteCandidate()
 */
STATUS submitPendingIceCandidate(PStackQueue pPendingMessageQueue, PSampleStreamingSession pSampleStreamingSession)
{
    STATUS retStatus = STATUS_SUCCESS;
    BOOL noPendingSignalingMessageForClient = FALSE;
    PReceivedSignalingMessage pReceivedSignalingMessage = NULL;
    UINT64 item = 0;

    do {
        CHK_STATUS(stackQueueIsEmpty(pPendingMessageQueue, &noPendingSignalingMessageForClient));
        if (!noPendingSignalingMessageForClient) {
            // Dequeue into a real UINT64: writing 64 bits through a cast pointer-to-pointer
            // overruns the variable where pointers are narrower than UINT64.
            item = 0;
            CHK_STATUS(stackQueueDequeue(pPendingMessageQueue, &item));
            pReceivedSignalingMessage = (PReceivedSignalingMessage) item;

            if (pReceivedSignalingMessage != NULL &&
                pReceivedSignalingMessage->signalingMessage.messageType == SIGNALING_MESSAGE_TYPE_ICE_CANDIDATE) {
                CHK_STATUS(handleRemoteCandidate(pSampleStreamingSession, &pReceivedSignalingMessage->signalingMessage));
            }

            if (pReceivedSignalingMessage != NULL) {
                MEMFREE(pReceivedSignalingMessage);
                pReceivedSignalingMessage = NULL;
            }
        }
    } while (!noPendingSignalingMessageForClient);

    CHK_STATUS(stackQueueFree(pPendingMessageQueue));

CleanUp:
    // Non-NULL only when handleRemoteCandidate failed for the message that was just dequeued.
    SAFE_MEMFREE(pReceivedSignalingMessage);

    CHK_LOG_ERR(retStatus);
    return retStatus;
}
/*
 * Signaling client callback that dispatches an incoming signaling message.
 *
 * The peer is identified by the CRC32 of signalingMessage.peerClientId. Under
 * sampleConfigurationObjLock:
 *   - OFFER:  creates a streaming session, appends it to sampleStreamingSessionList, handles
 *             the offer, registers the session in pRtcPeerConnectionForRemoteClient and
 *             submits any ICE candidates that were queued for this peer.
 *   - ANSWER: uses sampleStreamingSessionList[0], handles the answer, registers the session
 *             and submits any queued ICE candidates.
 *   - ICE_CANDIDATE: forwarded to the peer's session if one is registered; otherwise a copy
 *             of the message is queued in pPendingSignalingMessageForRemoteClient.
 *   - anything else is logged and ignored.
 *
 * @param customData                - the PSampleConfiguration, passed as UINT64
 * @param pReceivedSignalingMessage - the received message; it is not retained, a copy is
 *                                    made when the message has to be queued
 *
 * @return STATUS_NULL_ARG for a NULL argument, STATUS_NOT_ENOUGH_MEMORY if the message copy
 *         cannot be allocated, the failing status of any step, otherwise STATUS_SUCCESS.
 *         An offer received while the session list is full is dropped with a warning and
 *         STATUS_SUCCESS.
 */
STATUS signalingMessageReceived(UINT64 customData, PReceivedSignalingMessage pReceivedSignalingMessage)
{
    STATUS retStatus = STATUS_SUCCESS;
    STATUS lookupStatus = STATUS_SUCCESS;
    PSampleConfiguration pSampleConfiguration = (PSampleConfiguration) customData;
    BOOL peerConnectionFound = FALSE;
    // Must start out FALSE: the NULL-argument path reaches CleanUp without holding the lock.
    BOOL locked = FALSE;
    UINT32 clientIdHash;
    UINT64 hashValue = 0;
    PStackQueue pPendingMessageQueue = NULL;
    PSampleStreamingSession pSampleStreamingSession = NULL;
    PReceivedSignalingMessage pReceivedSignalingMessageCopy = NULL;

    CHK(pSampleConfiguration != NULL && pReceivedSignalingMessage != NULL, STATUS_NULL_ARG);

    MUTEX_LOCK(pSampleConfiguration->sampleConfigurationObjLock);
    locked = TRUE;

    clientIdHash = COMPUTE_CRC32((PBYTE) pReceivedSignalingMessage->signalingMessage.peerClientId,
                                 (UINT32) STRLEN(pReceivedSignalingMessage->signalingMessage.peerClientId));
    CHK_STATUS(hashTableContains(pSampleConfiguration->pRtcPeerConnectionForRemoteClient, clientIdHash, &peerConnectionFound));
    if (peerConnectionFound) {
        // Table values are read into a UINT64 and then converted, rather than writing
        // 64 bits through a cast pointer-to-pointer.
        CHK_STATUS(hashTableGet(pSampleConfiguration->pRtcPeerConnectionForRemoteClient, clientIdHash, &hashValue));
        pSampleStreamingSession = (PSampleStreamingSession) hashValue;
    }

    switch (pReceivedSignalingMessage->signalingMessage.messageType) {
        case SIGNALING_MESSAGE_TYPE_OFFER:
            /*
             * Create new streaming session for each offer, then insert the client id and streaming session into
             * pRtcPeerConnectionForRemoteClient for subsequent ice candidate messages. Lastly check if there is
             * any ice candidate messages queued in pPendingSignalingMessageForRemoteClient. If so then submit
             * all of them.
             */
            // Compare against the number of ELEMENTS in the list, not its size in bytes.
            if (pSampleConfiguration->streamingSessionCount >=
                SIZEOF(pSampleConfiguration->sampleStreamingSessionList) / SIZEOF(pSampleConfiguration->sampleStreamingSessionList[0])) {
                DLOGW("Max simultaneous streaming session count reached.");
                CHK(FALSE, retStatus);
            }

            CHK_STATUS(createSampleStreamingSession(pSampleConfiguration, pReceivedSignalingMessage->signalingMessage.peerClientId, TRUE,
                                                    &pSampleStreamingSession));
            pSampleStreamingSession->offerReceiveTime = GETTIME();

            MUTEX_LOCK(pSampleConfiguration->streamingSessionListReadLock);
            pSampleConfiguration->sampleStreamingSessionList[pSampleConfiguration->streamingSessionCount++] = pSampleStreamingSession;
            MUTEX_UNLOCK(pSampleConfiguration->streamingSessionListReadLock);

            CHK_STATUS(handleOffer(pSampleConfiguration, pSampleStreamingSession, &pReceivedSignalingMessage->signalingMessage));
            CHK_STATUS(hashTablePut(pSampleConfiguration->pRtcPeerConnectionForRemoteClient, clientIdHash, (UINT64) pSampleStreamingSession));

            // If there are any ice candidate messages in the queue for this client id, submit them now.
            if (STATUS_SUCCEEDED(hashTableGet(pSampleConfiguration->pPendingSignalingMessageForRemoteClient, clientIdHash, &hashValue))) {
                pPendingMessageQueue = (PStackQueue) hashValue;
                CHK_STATUS(submitPendingIceCandidate(pPendingMessageQueue, pSampleStreamingSession));
                CHK_STATUS(hashTableRemove(pSampleConfiguration->pPendingSignalingMessageForRemoteClient, clientIdHash));
            }
            break;

        case SIGNALING_MESSAGE_TYPE_ANSWER:
            /*
             * for viewer, pSampleStreamingSession should've already been created. insert the client id and
             * streaming session into pRtcPeerConnectionForRemoteClient for subsequent ice candidate messages.
             * Lastly check if there is any ice candidate messages queued in pPendingSignalingMessageForRemoteClient.
             * If so then submit all of them.
             */
            pSampleStreamingSession = pSampleConfiguration->sampleStreamingSessionList[0];
            CHK_STATUS(handleAnswer(pSampleConfiguration, pSampleStreamingSession, &pReceivedSignalingMessage->signalingMessage));
            CHK_STATUS(hashTablePut(pSampleConfiguration->pRtcPeerConnectionForRemoteClient, clientIdHash, (UINT64) pSampleStreamingSession));

            // If there are any ice candidate messages in the queue for this client id, submit them now.
            if (STATUS_SUCCEEDED(hashTableGet(pSampleConfiguration->pPendingSignalingMessageForRemoteClient, clientIdHash, &hashValue))) {
                pPendingMessageQueue = (PStackQueue) hashValue;
                CHK_STATUS(submitPendingIceCandidate(pPendingMessageQueue, pSampleStreamingSession));
                CHK_STATUS(hashTableRemove(pSampleConfiguration->pPendingSignalingMessageForRemoteClient, clientIdHash));
            }
            break;

        case SIGNALING_MESSAGE_TYPE_ICE_CANDIDATE:
            /*
             * if peer connection hasn't been created, create an queue to store the ice candidate message. Otherwise
             * submit the signaling message into the corresponding streaming session.
             */
            if (peerConnectionFound) {
                CHK_STATUS(handleRemoteCandidate(pSampleStreamingSession, &pReceivedSignalingMessage->signalingMessage));
                break;
            }

            lookupStatus = hashTableGet(pSampleConfiguration->pPendingSignalingMessageForRemoteClient, clientIdHash, &hashValue);
            if (lookupStatus == STATUS_HASH_KEY_NOT_PRESENT) {
                // First candidate for this peer: create its queue and hand it to the table.
                CHK_STATUS(stackQueueCreate(&pPendingMessageQueue));
                retStatus =
                    hashTablePut(pSampleConfiguration->pPendingSignalingMessageForRemoteClient, clientIdHash, (UINT64) pPendingMessageQueue);
                if (STATUS_FAILED(retStatus)) {
                    // The table did not take the queue, so it has to be released here.
                    stackQueueFree(pPendingMessageQueue);
                    pPendingMessageQueue = NULL;
                    CHK(FALSE, retStatus);
                }
            } else {
                CHK_STATUS(lookupStatus);
                pPendingMessageQueue = (PStackQueue) hashValue;
            }

            pReceivedSignalingMessageCopy = (PReceivedSignalingMessage) MEMCALLOC(1, SIZEOF(ReceivedSignalingMessage));
            CHK(pReceivedSignalingMessageCopy != NULL, STATUS_NOT_ENOUGH_MEMORY);
            *pReceivedSignalingMessageCopy = *pReceivedSignalingMessage;

            CHK_STATUS(stackQueueEnqueue(pPendingMessageQueue, (UINT64) pReceivedSignalingMessageCopy));
            // The queue owns the copy from here on; keep CleanUp from freeing it.
            pReceivedSignalingMessageCopy = NULL;
            break;

        default:
            DLOGD("Unhandled signaling message type %u", pReceivedSignalingMessage->signalingMessage.messageType);
            break;
    }

CleanUp:
    // Non-NULL only when the copy was allocated but could not be enqueued.
    SAFE_MEMFREE(pReceivedSignalingMessageCopy);

    if (locked) {
        MUTEX_UNLOCK(pSampleConfiguration->sampleConfigurationObjLock);
    }

    CHK_LOG_ERR(retStatus);
    return retStatus;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment