Skip to content

Instantly share code, notes, and snippets.

@SCG82
Forked from mysteryjeans/Conductor.cc
Created February 24, 2020 23:56
Show Gist options
  • Save SCG82/fd9a9398310b3bcacd641d3a47d88911 to your computer and use it in GitHub Desktop.
Save SCG82/fd9a9398310b3bcacd641d3a47d88911 to your computer and use it in GitHub Desktop.
WebRTC Conductor using custom Audio & Video source
#include "audiorenderer.h"
#include "videorenderer.h"
#include "audiocapturemodule.h"
#include "yuvframecapture.h"
#include "conductor.h"
#include "webrtc/api/test/fakeconstraints.h"
#include "webrtc/video_encoder.h"
#include "webrtc/modules/video_coding/codecs/vp8/simulcast_encoder_adapter.h"
#include "webrtc/modules/video_coding/codecs/vp8/include/vp8.h"
#include "webrtc/modules/video_capture/video_capture_factory.h"
#include "webrtc/media/engine/webrtcvideocapturerfactory.h"
// for servers
#include "webrtc/p2p/base/relayserver.h"
#include "webrtc/p2p/base/stunserver.h"
#include "webrtc/p2p/base/basicpacketsocketfactory.h"
#include "webrtc/p2p/base/turnserver.h"
#include "webrtc/base/asyncudpsocket.h"
#include "webrtc/base/optionsfile.h"
#include "webrtc/base/stringencode.h"
#include "webrtc/base/thread.h"
namespace nr_webrtc
{
const char kAudioLabel[] = "audio_label";
const char kVideoLabel[] = "video_label";
const char kStreamLabel[] = "stream_label";
const char kSoftware[] = "libjingle TurnServer";
class TurnFileAuth : public cricket::TurnAuthInterface
{
public:
explicit TurnFileAuth(const std::string& path) : file_(path)
{
}
bool Load()
{
return file_.Load();
}
virtual bool GetKey(const std::string& username, const std::string& realm, std::string* key)
{
// File is stored as lines of <username>=<HA1>.
// Generate HA1 via "echo -n "<username>:<realm>:<password>" | md5sum"
std::string hex;
bool ret = file_.GetStringValue(username, &hex);
if (ret)
{
char buf[32];
size_t len = rtc::hex_decode(buf, sizeof(buf), hex);
*key = std::string(buf, len);
}
return ret;
}
private:
rtc::OptionsFile file_;
};
Conductor::Conductor()
{
this->OnErrorHook = nullptr;
this->OnSuccessHook = nullptr;
this->OnFailureHook = nullptr;
this->OnIceCandidateHook = nullptr;
this->OnDataMessageHook = nullptr;
this->OnDataBinaryMessageHook = nullptr;
this->width = 640;
this->height = 360;
this->caputureFps = 5;
this->turn_server = nullptr;
this->stun_server = nullptr;
this->data_channel = nullptr;
this->video_capture = nullptr;
this->audio_capture = nullptr;
this->worker_thread = nullptr;
this->signaling_thread = nullptr;
}
Conductor::~Conductor()
{
this->DeInitialize();
ASSERT(peer_connection == nullptr);
this->signaling_thread = nullptr;
if (this->worker_thread)
{
this->worker_thread->Quit();
delete this->worker_thread;
}
if (turn_server)
turn_server->disconnect_all();
if (stun_server)
stun_server->disconnect_all();
if (turn_server || stun_server)
rtc::Thread::Current()->Quit();
}
bool Conductor::Initialize(bool audio_stream, bool video_stream) {
ASSERT(pc_factory == nullptr);
ASSERT(peer_connection == nullptr);
if (audio_stream)
this->audio_capture = new AudioCaptureModule();
this->signaling_thread = new rtc::Thread();
bool wrap = this->signaling_thread->WrapCurrent();
ASSERT(wrap);
/* this->worker_thread = new rtc::Thread();
bool start = this->worker_thread->Start();
ASSERT(start);*/
this->pc_factory = webrtc::CreatePeerConnectionFactory(
this->signaling_thread,
this->signaling_thread,
this->audio_capture,
nullptr,
nullptr);
if (!this->pc_factory)
return false;
webrtc::PeerConnectionFactoryInterface::Options opt;
{
//opt.disable_encryption = true;
//opt.disable_network_monitor = true;
//opt.disable_sctp_data_channels = true;
this->pc_factory->SetOptions(opt);
}
if (!this->CreatePeerConnection(true))
return false;
return this->AddStreams(audio_stream, video_stream) && this->peer_connection != nullptr;
}
void Conductor::DeInitialize()
{
if (this->data_channel)
{
this->data_channel->UnregisterObserver();
this->data_channel = nullptr;
}
this->audio_capture = nullptr;
this->video_capture = nullptr;
this->local_video.reset(nullptr);
if (this->peer_connection.get())
{
for (auto it = this->active_streams.begin(); it != this->active_streams.end(); ++it) {
this->peer_connection->RemoveStream(it->second);
}
this->active_streams.clear();
this->peer_connection->Close();
this->peer_connection = nullptr;
}
this->serverConfigs.clear();
this->pc_factory = nullptr;
}
bool Conductor::CreatePeerConnection(bool dtls)
{
ASSERT(pc_factory != nullptr);
ASSERT(peer_connection == nullptr);
webrtc::PeerConnectionInterface::RTCConfiguration config;
config.tcp_candidate_policy = webrtc::PeerConnectionInterface::kTcpCandidatePolicyDisabled;
config.disable_ipv6 = true;
config.enable_dtls_srtp = rtc::Optional<bool>(dtls);
config.rtcp_mux_policy = webrtc::PeerConnectionInterface::kRtcpMuxPolicyRequire;
for each (auto server in this->serverConfigs) {
config.servers.push_back(server);
}
webrtc::FakeConstraints constraints;
constraints.SetAllowDtlsSctpDataChannels();
constraints.SetMandatoryReceiveVideo(false);
constraints.SetMandatoryReceiveAudio(false);
constraints.SetMandatoryIceRestart(true);
constraints.SetMandatoryUseRtpMux(true);
constraints.AddMandatory(webrtc::MediaConstraintsInterface::kVoiceActivityDetection, "false");
constraints.AddMandatory(webrtc::MediaConstraintsInterface::kEnableIPv6, "false");
this->peer_connection = this->pc_factory->CreatePeerConnection(config, &constraints, NULL, NULL, this);
return this->peer_connection != nullptr;
}
bool Conductor::AddStreams(bool audio_stream, bool video_stream)
{
if (this->active_streams.find(kStreamLabel) != this->active_streams.end())
return false; // Already added.
auto stream = this->pc_factory->CreateLocalMediaStream(kStreamLabel);
if (audio_stream) {
auto a = this->pc_factory->CreateAudioSource(NULL);
auto audio_track = this->pc_factory->CreateAudioTrack(kAudioLabel, a);
stream->AddTrack(audio_track);
}
if (video_stream) {
this->video_capture = new nr_webrtc::YuvFrameCapture(*this);
auto v = this->pc_factory->CreateVideoSource(this->video_capture);
auto video_track = pc_factory->CreateVideoTrack(kVideoLabel, v);
stream->AddTrack(video_track);
this->local_video.reset(new VideoRenderer(*this, false, video_track));
}
if (!this->peer_connection->AddStream(stream))
{
stream = nullptr;
return false;
}
typedef std::pair<std::string, rtc::scoped_refptr<webrtc::MediaStreamInterface>> MediaStreamPair;
this->active_streams.insert(MediaStreamPair(stream->label(), stream));
return true;
}
bool Conductor::ProcessMessages(int delay) {
return rtc::Thread::Current()->ProcessMessages(delay);
}
uint8_t * Conductor::VideoCapturerI420Buffer() {
if (this->video_capture)
return (uint8_t*)this->video_capture->video_buffer->DataY();
return nullptr;
}
void Conductor::PushVideoFrame(uint8_t * rgbBuffer, int bits) {
auto yuv = this->VideoCapturerI420Buffer();
if (yuv)
{
Conductor::RGBToYUVI420(this->width, this->height, bits, rgbBuffer, yuv);
this->video_capture->PushFrame();
}
}
void Conductor::PushAudioFrame(const void* audio_data, int bits_per_sample, int sample_rate, int number_of_channels, int number_of_frames) {
if (this->audio_capture)
this->audio_capture->PushFrame(audio_data, bits_per_sample, sample_rate, number_of_channels, number_of_frames);
}
bool Conductor::IsRecordingAudio() {
return this->audio_capture && this->audio_capture->Recording();
}
bool Conductor::IsRecordingVideo() {
return this->video_capture && this->video_capture->IsRunning();
}
void Conductor::AddServerConfig(std::string uri, std::string username, std::string password)
{
webrtc::PeerConnectionInterface::IceServer server;
server.uri = uri;
server.username = username;
server.password = password;
serverConfigs.push_back(server);
}
void Conductor::CreateOffer()
{
peer_connection->CreateOffer(this, nullptr);
}
void Conductor::OnOfferReply(std::string type, std::string sdp)
{
webrtc::SdpParseError error;
webrtc::SessionDescriptionInterface* session_description(webrtc::CreateSessionDescription(type, sdp, &error));
if (!session_description)
{
LOG(WARNING) << "Can't parse received session description message. " << "SdpParseError was: " << error.description;
return;
}
peer_connection->SetRemoteDescription(this, session_description);
}
void Conductor::OnOfferRequest(std::string sdp)
{
webrtc::SdpParseError error;
webrtc::SessionDescriptionInterface* session_description(webrtc::CreateSessionDescription("offer", sdp, &error));
if (!session_description)
{
LOG(WARNING) << "Can't parse received session description message. " << "SdpParseError was: " << error.description;
return;
}
peer_connection->SetRemoteDescription(this, session_description);
webrtc::PeerConnectionInterface::RTCOfferAnswerOptions o;
{
o.voice_activity_detection = false;
o.offer_to_receive_audio = false;
o.offer_to_receive_video = webrtc::PeerConnectionInterface::RTCOfferAnswerOptions::kOfferToReceiveMediaTrue;
}
peer_connection->CreateAnswer(this, o);
}
bool Conductor::AddIceCandidate(std::string sdp_mid, int sdp_mlineindex, std::string sdp)
{
webrtc::SdpParseError error;
webrtc::IceCandidateInterface * candidate = webrtc::CreateIceCandidate(sdp_mid, sdp_mlineindex, sdp, &error);
if (!candidate)
{
LOG(WARNING) << "Can't parse received candidate message. " << "SdpParseError was: " << error.description;
return false;
}
if (!peer_connection)
return false;
if (!peer_connection->AddIceCandidate(candidate))
{
LOG(WARNING) << "Failed to apply the received candidate";
return false;
}
return true;
}
std::vector<std::string> Conductor::GetVideoDevices()
{
std::vector<std::string> device_names;
{
std::unique_ptr<webrtc::VideoCaptureModule::DeviceInfo> info(webrtc::VideoCaptureFactory::CreateDeviceInfo());
if (info)
{
int num_devices = info->NumberOfDevices();
for (int i = 0; i < num_devices; ++i)
{
const uint32_t kSize = 256;
char name[kSize] = { 0 };
char id[kSize] = { 0 };
if (info->GetDeviceName(i, name, kSize, id, kSize) != -1)
{
device_names.push_back(name);
}
}
}
}
return device_names;
}
// Called when a remote stream is added
void Conductor::OnAddStream(rtc::scoped_refptr<webrtc::MediaStreamInterface> stream)
{
LOG(INFO) << __FUNCTION__ << " " << stream->label();
if (this->OnRenderVideoHook)
{
webrtc::VideoTrackVector vtracks = stream->GetVideoTracks();
if (!vtracks.empty())
{
webrtc::VideoTrackInterface* track = vtracks[0];
remote_video.reset(new nr_webrtc::VideoRenderer(*this, true, track));
}
}
if (this->OnRenderAudioHook)
{
webrtc::AudioTrackVector atracks = stream->GetAudioTracks();
if (!atracks.empty())
{
webrtc::AudioTrackInterface* track = atracks[0];
remote_audio.reset(new nr_webrtc::AudioRenderer(*this, true, track));
}
}
}
void Conductor::OnRemoveStream(rtc::scoped_refptr<webrtc::MediaStreamInterface> stream)
{
LOG(INFO) << __FUNCTION__ << " " << stream->label();
remote_video.reset(nullptr);
remote_audio.reset(nullptr);
// lost ownership, do not delete
/*capturer = nullptr;
capturer_internal = nullptr;*/
}
void Conductor::OnIceCandidate(const webrtc::IceCandidateInterface* candidate)
{
LOG(INFO) << __FUNCTION__ << " " << candidate->sdp_mline_index();
std::string sdp;
if (!candidate->ToString(&sdp))
{
LOG(LS_ERROR) << "Failed to serialize candidate";
return;
}
if (this->OnIceCandidateHook != nullptr)
this->OnIceCandidateHook(candidate->sdp_mid().c_str(), candidate->sdp_mline_index(), sdp.c_str());
}
void Conductor::OnSuccess(webrtc::SessionDescriptionInterface* desc)
{
peer_connection->SetLocalDescription(this, desc);
std::string sdp;
desc->ToString(&sdp);
if (this->OnSuccessHook != nullptr)
this->OnSuccessHook(desc->type().c_str(), sdp.c_str());
}
void Conductor::OnFailure(const std::string& error)
{
LOG(LERROR) << error;
if (this->OnFailureHook != nullptr)
this->OnFailureHook(error.c_str());
}
void Conductor::OnError()
{
if (this->OnErrorHook != nullptr)
this->OnErrorHook();
}
void Conductor::CreateDataChannel(const std::string & label)
{
if (!this->peer_connection)
return;
webrtc::DataChannelInit dc_options;
//dc_options.id = 1;
dc_options.maxRetransmits = 1;
dc_options.negotiated = false;
dc_options.ordered = false;
this->data_channel = this->peer_connection->CreateDataChannel(label, &dc_options);
this->data_channel->RegisterObserver(this);
}
void Conductor::OnDataChannel(rtc::scoped_refptr<webrtc::DataChannelInterface> channel)
{
LOG(INFO) << __FUNCTION__ << " " << channel->label();
this->data_channel = channel.get();
this->data_channel->RegisterObserver(this);
}
bool Conductor::DataChannelSendText(const std::string & text)
{
return this->data_channel && this->data_channel->Send(webrtc::DataBuffer(text));
}
bool Conductor::DataChannelSendData(const webrtc::DataBuffer & data)
{
return this->data_channel && this->data_channel->Send(data);
}
bool Conductor::IsDataChannelConnected() {
return this->data_channel != nullptr;
}
// A data buffer was successfully received.
void Conductor::OnMessage(const webrtc::DataBuffer& buffer)
{
LOG(INFO) << __FUNCTION__;
if (buffer.binary)
{
if (this->OnDataBinaryMessageHook != nullptr)
{
auto * data = buffer.data.data();
this->OnDataBinaryMessageHook(data, buffer.size());
}
}
else
{
if (this->OnDataMessageHook != nullptr)
{
std::string msg(buffer.data.data<char>(), buffer.size());
this->OnDataMessageHook(msg.c_str());
}
}
}
bool Conductor::RunStunServer(const std::string & bindIp)
{
rtc::SocketAddress server_addr;
if (!server_addr.FromString(bindIp))
{
LOG(LERROR) << "Unable to parse IP address: " << bindIp;
return false;
}
rtc::Thread * main = rtc::Thread::Current();
rtc::AsyncUDPSocket* server_socket = rtc::AsyncUDPSocket::Create(main->socketserver(), server_addr);
if (!server_socket)
{
LOG(LERROR) << "Failed to create a UDP socket" << std::endl;
return false;
}
stun_server.reset(new cricket::StunServer(server_socket));
LOG(INFO) << "Listening at " << server_addr.ToString() << std::endl;
return true;
}
bool Conductor::RunTurnServer(const std::string & bindIp, const std::string & ip,
const std::string & realm, const std::string & authFile)
{
rtc::SocketAddress int_addr;
if (!int_addr.FromString(bindIp))
{
LOG(LERROR) << "Unable to parse IP address: " << bindIp << std::endl;
return false;
}
rtc::IPAddress ext_addr;
if (!IPFromString(ip, &ext_addr))
{
LOG(LERROR) << "Unable to parse IP address: " << ip << std::endl;
return false;
}
rtc::Thread* main = rtc::Thread::Current();
rtc::AsyncUDPSocket * int_socket = rtc::AsyncUDPSocket::Create(main->socketserver(), int_addr);
if (!int_socket)
{
LOG(LERROR) << "Failed to create a UDP socket bound at" << int_addr.ToString() << std::endl;
return false;
}
TurnFileAuth * auth = new TurnFileAuth(authFile);
if (!auth->Load())
{
LOG(LERROR) << "Failed to load auth file " << authFile << std::endl;
return false;
}
auto t = new cricket::TurnServer(main);
turn_server.reset(t);
t->set_realm(realm);
t->set_software(kSoftware);
t->set_auth_hook(auth);
t->AddInternalSocket(int_socket, cricket::PROTO_UDP);
t->SetExternalSocketFactory(new rtc::BasicPacketSocketFactory(),
rtc::SocketAddress(ext_addr, 0));
LOG(INFO) << "Listening internally at " << int_addr.ToString() << std::endl;
return true;
}
void Conductor::RGBToYUVI420(int width, int height, int bits, uint8_t * image, uint8_t * yuv)
{
int pitch = bits / 8;
int stride = width * pitch;
int strideY = width;
int strideU = width / 2;
int strideV = width / 2;
uint8_t * pimageY = yuv;
uint8_t * pimageU = yuv + strideY * height;
uint8_t * pimageV = yuv + strideY * height + (strideU * ((height + 1) / 2));
int i = 0;
for (int y = 0; y < height; y += 2)
for (int x = 0; x < width; x += 2)
{
int xi = pitch * x;
int yi = height - y - 1;
int xyi = yi * stride + xi;
int yxi = (yi - 1) * stride + xi;
uint8_t r00 = image[xyi + 0];
uint8_t g00 = image[xyi + 1];
uint8_t b00 = image[xyi + 2];
uint8_t r01 = image[xyi + 4];
uint8_t g01 = image[xyi + 5];
uint8_t b01 = image[xyi + 6];
uint8_t r10 = image[yxi + 0];
uint8_t g10 = image[yxi + 1];
uint8_t b10 = image[yxi + 2];
uint8_t r11 = image[yxi + 4];
uint8_t g11 = image[yxi + 5];
uint8_t b11 = image[yxi + 6];
pimageY[y * width + x] = (((66 * r00 + 129 * g00 + 25 * b00 + 128) >> 8) + 16);
pimageY[y * width + x + 1] = (((66 * r01 + 129 * g01 + 25 * b01 + 128) >> 8) + 16);
pimageY[(y + 1) * width + x] = (((66 * r10 + 129 * g10 + 25 * b10 + 128) >> 8) + 16);
pimageY[(y + 1) * width + x + 1] = (((66 * r11 + 129 * g11 + 25 * b11 + 128) >> 8) + 16);
uint8_t u00 = (((112 * r00 - 94 * g00 - 18 * b00 + 128) >> 8) + 128);
uint8_t u01 = (((112 * r01 - 94 * g01 - 18 * b01 + 128) >> 8) + 128);
uint8_t u10 = (((112 * r10 - 94 * g10 - 18 * b10 + 128) >> 8) + 128);
uint8_t u11 = (((112 * r11 - 94 * g11 - 18 * b11 + 128) >> 8) + 128);
uint8_t v00 = (((-38 * r00 - 74 * g00 + 112 * b00 + 128) >> 8) + 128);
uint8_t v01 = (((-38 * r01 - 74 * g01 + 112 * b01 + 128) >> 8) + 128);
uint8_t v10 = (((-38 * r10 - 74 * g10 + 112 * b10 + 128) >> 8) + 128);
uint8_t v11 = (((-38 * r11 - 74 * g11 + 112 * b11 + 128) >> 8) + 128);
pimageU[i] = ((u00 + u01 + u10 + u11) / 4);
pimageV[i++] = ((v00 + v01 + v10 + v11) / 4);
}
}
void Conductor::YUVI420ToRGB(int width, int height, int bits, uint8_t * image, uint8_t * yuv)
{
int i = 0;
int pitch = bits / 8;
int pixels = width * height;
int stride = width * pitch;
int strideY = width;
int strideU = width / 2;
int strideV = width / 2;
uint8_t * imageY = yuv;
uint8_t * imageU = yuv + strideY * height;
uint8_t * imageV = yuv + strideY * height + (strideU * ((height + 1) / 2));
for (int yCord = 0; yCord < height; yCord++)
{
for (int xCord = 0; xCord < width; xCord += 2)
{
int c1 = imageY[yCord * strideY + xCord] - 16;
int c2 = imageY[yCord * strideY + xCord + 1] - 16;
int d = imageU[yCord / 2 * strideU + xCord / 2] - 128;
int e = imageV[yCord / 2 * strideV + xCord / 2] - 128;
image[i++] = std::min(255, std::max(0, (298 * c1 + 409 * e + 128) >> 8));//r
image[i++] = std::min(255, std::max(0, (298 * c1 - 100 * d - 208 * e + 128) >> 8));//g
image[i++] = std::min(255, std::max(0, (298 * c1 + 516 * d + 128) >> 8));//b
image[i++] = std::min(255, std::max(0, (298 * c2 + 409 * e + 128) >> 8));//r
image[i++] = std::min(255, std::max(0, (298 * c2 - 100 * d - 208 * e + 128) >> 8));//g
image[i++] = std::min(255, std::max(0, (298 * c2 + 516 * d + 128) >> 8));//b
}
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment