Skip to content

Instantly share code, notes, and snippets.

@mysteryjeans
Created June 1, 2017 16:57
Show Gist options
  • Star 10 You must be signed in to star a gist
  • Fork 3 You must be signed in to fork a gist
  • Save mysteryjeans/0508f5ca87652a830b0c3f281a77ee0e to your computer and use it in GitHub Desktop.
Save mysteryjeans/0508f5ca87652a830b0c3f281a77ee0e to your computer and use it in GitHub Desktop.
WebRTC Conductor using custom Audio & Video source
#include "audiorenderer.h"
#include "videorenderer.h"
#include "audiocapturemodule.h"
#include "yuvframecapture.h"
#include "conductor.h"
#include "webrtc/api/test/fakeconstraints.h"
#include "webrtc/video_encoder.h"
#include "webrtc/modules/video_coding/codecs/vp8/simulcast_encoder_adapter.h"
#include "webrtc/modules/video_coding/codecs/vp8/include/vp8.h"
#include "webrtc/modules/video_capture/video_capture_factory.h"
#include "webrtc/media/engine/webrtcvideocapturerfactory.h"
// for servers
#include "webrtc/p2p/base/relayserver.h"
#include "webrtc/p2p/base/stunserver.h"
#include "webrtc/p2p/base/basicpacketsocketfactory.h"
#include "webrtc/p2p/base/turnserver.h"
#include "webrtc/base/asyncudpsocket.h"
#include "webrtc/base/optionsfile.h"
#include "webrtc/base/stringencode.h"
#include "webrtc/base/thread.h"
// C++ standard library
#include <algorithm>
#include <memory>
#include <string>
#include <utility>
#include <vector>
namespace nr_webrtc
{
// Identifiers used for the local audio/video tracks and their media stream.
const char kAudioLabel[] = "audio_label";
const char kVideoLabel[] = "video_label";
const char kStreamLabel[] = "stream_label";
// Software name advertised by the embedded TURN server (see RunTurnServer).
const char kSoftware[] = "libjingle TurnServer";
// TURN authentication backend that resolves credentials from an options
// file. The file stores one "<username>=<HA1>" entry per line, where HA1 is
// generated via: echo -n "<username>:<realm>:<password>" | md5sum
class TurnFileAuth : public cricket::TurnAuthInterface
{
public:
	explicit TurnFileAuth(const std::string& path) : file_(path)
	{
	}

	// Reads the credential file from disk; returns false on failure.
	bool Load()
	{
		return file_.Load();
	}

	// cricket::TurnAuthInterface: looks up the HA1 key for |username|.
	virtual bool GetKey(const std::string& username, const std::string& realm, std::string* key)
	{
		std::string encoded;
		if (!file_.GetStringValue(username, &encoded))
			return false;
		// Decode the hex-encoded HA1 digest (16 raw bytes for MD5) into |key|.
		char decoded[32];
		size_t decoded_len = rtc::hex_decode(decoded, sizeof(decoded), encoded);
		*key = std::string(decoded, decoded_len);
		return true;
	}

private:
	rtc::OptionsFile file_;
};
// Constructs a conductor with every callback hook disarmed and default
// capture geometry (640x360 at 5 fps). No WebRTC objects are created here;
// that happens in Initialize().
Conductor::Conductor()
{
	// Callback hooks start cleared; the embedder installs them later.
	OnErrorHook = nullptr;
	OnSuccessHook = nullptr;
	OnFailureHook = nullptr;
	OnIceCandidateHook = nullptr;
	OnDataMessageHook = nullptr;
	OnDataBinaryMessageHook = nullptr;

	// Default capture parameters.
	width = 640;
	height = 360;
	caputureFps = 5;

	// Servers, channels, capture modules and threads are created lazily.
	turn_server = nullptr;
	stun_server = nullptr;
	data_channel = nullptr;
	video_capture = nullptr;
	audio_capture = nullptr;
	worker_thread = nullptr;
	signaling_thread = nullptr;
}
// Tears down the conductor. Order matters: DeInitialize() must close the
// peer connection before the threads and helper servers below are stopped.
Conductor::~Conductor()
{
this->DeInitialize();
ASSERT(peer_connection == nullptr);
// NOTE(review): signaling_thread wraps the caller's thread (WrapCurrent in
// Initialize) and is only dropped here, never deleted -- presumably on
// purpose, since deleting a still-wrapped thread is unsafe; confirm this
// is an accepted leak.
this->signaling_thread = nullptr;
if (this->worker_thread)
{
this->worker_thread->Quit();
delete this->worker_thread;
}
// Drop sigslot subscriptions of the embedded STUN/TURN servers, if any.
if (turn_server)
turn_server->disconnect_all();
if (stun_server)
stun_server->disconnect_all();
// The embedded servers pump the current thread; stop it once they are gone.
if (turn_server || stun_server)
rtc::Thread::Current()->Quit();
}
// Creates the PeerConnectionFactory and the local peer connection,
// optionally wiring the custom audio capture module and local media tracks.
// Must be called on the application thread, which is wrapped to serve as
// the WebRTC signaling/worker thread. Returns false on any failure.
bool Conductor::Initialize(bool audio_stream, bool video_stream) {
ASSERT(pc_factory == nullptr);
ASSERT(peer_connection == nullptr);
// Custom ADM that is fed PCM data through PushAudioFrame().
if (audio_stream)
this->audio_capture = new AudioCaptureModule();
// Wrap the calling thread so it doubles as the signaling thread.
this->signaling_thread = new rtc::Thread();
bool wrap = this->signaling_thread->WrapCurrent();
ASSERT(wrap);
/* this->worker_thread = new rtc::Thread();
bool start = this->worker_thread->Start();
ASSERT(start);*/
// Same thread is passed as both network/worker and signaling thread.
this->pc_factory = webrtc::CreatePeerConnectionFactory(
this->signaling_thread,
this->signaling_thread,
this->audio_capture,
nullptr, // default video encoder factory
nullptr); // default video decoder factory
if (!this->pc_factory)
return false;
webrtc::PeerConnectionFactoryInterface::Options opt;
{
//opt.disable_encryption = true;
//opt.disable_network_monitor = true;
//opt.disable_sctp_data_channels = true;
this->pc_factory->SetOptions(opt);
}
if (!this->CreatePeerConnection(true))
return false;
return this->AddStreams(audio_stream, video_stream) && this->peer_connection != nullptr;
}
// Releases the data channel, capture modules, local renderer, streams and
// the peer connection, returning the conductor to its pre-Initialize state.
void Conductor::DeInitialize()
{
	// Stop observing the data channel before dropping our reference.
	if (this->data_channel)
	{
		this->data_channel->UnregisterObserver();
		this->data_channel = nullptr;
	}
	this->audio_capture = nullptr;
	this->video_capture = nullptr;
	this->local_video.reset(nullptr);
	if (this->peer_connection.get())
	{
		// Detach every local stream before closing the connection.
		for (auto& entry : this->active_streams)
			this->peer_connection->RemoveStream(entry.second);
		this->active_streams.clear();
		this->peer_connection->Close();
		this->peer_connection = nullptr;
	}
	this->serverConfigs.clear();
	this->pc_factory = nullptr;
}
// Builds the RTCConfiguration and media constraints, then asks the factory
// for a peer connection. |dtls| toggles DTLS-SRTP key agreement.
// Returns false if the factory refuses to create a connection.
bool Conductor::CreatePeerConnection(bool dtls)
{
	ASSERT(pc_factory != nullptr);
	ASSERT(peer_connection == nullptr);
	webrtc::PeerConnectionInterface::RTCConfiguration config;
	config.tcp_candidate_policy = webrtc::PeerConnectionInterface::kTcpCandidatePolicyDisabled;
	config.disable_ipv6 = true;
	config.enable_dtls_srtp = rtc::Optional<bool>(dtls);
	config.rtcp_mux_policy = webrtc::PeerConnectionInterface::kRtcpMuxPolicyRequire;
	// FIX: the original used MSVC's non-standard "for each (... in ...)"
	// extension; a standard C++11 range-based for is portable.
	for (const auto& server : this->serverConfigs)
		config.servers.push_back(server);
	webrtc::FakeConstraints constraints;
	constraints.SetAllowDtlsSctpDataChannels();
	// This endpoint only sends; do not offer to receive media by default.
	constraints.SetMandatoryReceiveVideo(false);
	constraints.SetMandatoryReceiveAudio(false);
	constraints.SetMandatoryIceRestart(true);
	constraints.SetMandatoryUseRtpMux(true);
	constraints.AddMandatory(webrtc::MediaConstraintsInterface::kVoiceActivityDetection, "false");
	constraints.AddMandatory(webrtc::MediaConstraintsInterface::kEnableIPv6, "false");
	this->peer_connection = this->pc_factory->CreatePeerConnection(config, &constraints, NULL, NULL, this);
	return this->peer_connection != nullptr;
}
// Creates the local media stream with the requested tracks and attaches it
// to the peer connection. Returns false when the stream was already added
// or the connection rejects it.
bool Conductor::AddStreams(bool audio_stream, bool video_stream)
{
	// Only a single local stream is supported.
	if (this->active_streams.find(kStreamLabel) != this->active_streams.end())
		return false; // Already added.
	auto stream = this->pc_factory->CreateLocalMediaStream(kStreamLabel);
	if (audio_stream) {
		auto audio_source = this->pc_factory->CreateAudioSource(NULL);
		auto audio_track = this->pc_factory->CreateAudioTrack(kAudioLabel, audio_source);
		stream->AddTrack(audio_track);
	}
	if (video_stream) {
		// Frames are supplied by the embedder through PushVideoFrame().
		this->video_capture = new nr_webrtc::YuvFrameCapture(*this);
		auto video_source = this->pc_factory->CreateVideoSource(this->video_capture);
		auto video_track = pc_factory->CreateVideoTrack(kVideoLabel, video_source);
		stream->AddTrack(video_track);
		// Local preview renderer for the outgoing track.
		this->local_video.reset(new VideoRenderer(*this, false, video_track));
	}
	if (!this->peer_connection->AddStream(stream))
	{
		stream = nullptr;
		return false;
	}
	this->active_streams.insert(std::make_pair(stream->label(), stream));
	return true;
}
bool Conductor::ProcessMessages(int delay) {
return rtc::Thread::Current()->ProcessMessages(delay);
}
// Returns a writable pointer to the capturer's I420 buffer (Y plane first,
// with U and V planes packed behind it), or nullptr when video capture is
// not active.
uint8_t * Conductor::VideoCapturerI420Buffer() {
	if (!this->video_capture)
		return nullptr;
	// FIX: use an explicit const_cast instead of a C-style cast -- DataY()
	// returns a const pointer, and the intent (shedding const so the
	// embedder can fill the buffer in place) should be greppable.
	return const_cast<uint8_t*>(this->video_capture->video_buffer->DataY());
}
// Converts an RGB frame supplied by the embedder into the capturer's I420
// buffer and signals the capturer that a new frame is available.
// |bits| is the source frame's bit depth per pixel. No-op when video
// capture is not active.
void Conductor::PushVideoFrame(uint8_t * rgbBuffer, int bits) {
	uint8_t* i420 = this->VideoCapturerI420Buffer();
	if (!i420)
		return;
	Conductor::RGBToYUVI420(this->width, this->height, bits, rgbBuffer, i420);
	this->video_capture->PushFrame();
}
// Forwards a PCM audio frame from the embedder into the custom audio
// capture module; a no-op when audio capture was not initialized.
void Conductor::PushAudioFrame(const void* audio_data, int bits_per_sample, int sample_rate, int number_of_channels, int number_of_frames) {
	if (!this->audio_capture)
		return;
	this->audio_capture->PushFrame(audio_data, bits_per_sample, sample_rate, number_of_channels, number_of_frames);
}
// True while the audio capture module exists and WebRTC is recording.
bool Conductor::IsRecordingAudio() {
	if (!this->audio_capture)
		return false;
	return this->audio_capture->Recording();
}
// True while the video capturer exists and has been started by WebRTC.
bool Conductor::IsRecordingVideo() {
	if (!this->video_capture)
		return false;
	return this->video_capture->IsRunning();
}
// Registers an ICE (STUN/TURN) server for subsequently created peer
// connections. Call before Initialize()/CreatePeerConnection().
// |uri| e.g. "stun:host:port" or "turn:host:port".
void Conductor::AddServerConfig(std::string uri, std::string username, std::string password)
{
	webrtc::PeerConnectionInterface::IceServer server;
	// The parameters are taken by value, so move them into place instead of
	// copying each string a second time.
	server.uri = std::move(uri);
	server.username = std::move(username);
	server.password = std::move(password);
	serverConfigs.push_back(std::move(server));
}
// Kicks off SDP offer creation; the result arrives asynchronously via
// OnSuccess()/OnFailure().
void Conductor::CreateOffer()
{
	// FIX: guard against a missing connection, consistent with
	// CreateDataChannel()/AddIceCandidate() -- previously this dereferenced
	// a null pointer if called before Initialize().
	if (!peer_connection)
		return;
	peer_connection->CreateOffer(this, nullptr);
}
void Conductor::OnOfferReply(std::string type, std::string sdp)
{
webrtc::SdpParseError error;
webrtc::SessionDescriptionInterface* session_description(webrtc::CreateSessionDescription(type, sdp, &error));
if (!session_description)
{
LOG(WARNING) << "Can't parse received session description message. " << "SdpParseError was: " << error.description;
return;
}
peer_connection->SetRemoteDescription(this, session_description);
}
void Conductor::OnOfferRequest(std::string sdp)
{
webrtc::SdpParseError error;
webrtc::SessionDescriptionInterface* session_description(webrtc::CreateSessionDescription("offer", sdp, &error));
if (!session_description)
{
LOG(WARNING) << "Can't parse received session description message. " << "SdpParseError was: " << error.description;
return;
}
peer_connection->SetRemoteDescription(this, session_description);
webrtc::PeerConnectionInterface::RTCOfferAnswerOptions o;
{
o.voice_activity_detection = false;
o.offer_to_receive_audio = false;
o.offer_to_receive_video = webrtc::PeerConnectionInterface::RTCOfferAnswerOptions::kOfferToReceiveMediaTrue;
}
peer_connection->CreateAnswer(this, o);
}
bool Conductor::AddIceCandidate(std::string sdp_mid, int sdp_mlineindex, std::string sdp)
{
webrtc::SdpParseError error;
webrtc::IceCandidateInterface * candidate = webrtc::CreateIceCandidate(sdp_mid, sdp_mlineindex, sdp, &error);
if (!candidate)
{
LOG(WARNING) << "Can't parse received candidate message. " << "SdpParseError was: " << error.description;
return false;
}
if (!peer_connection)
return false;
if (!peer_connection->AddIceCandidate(candidate))
{
LOG(WARNING) << "Failed to apply the received candidate";
return false;
}
return true;
}
// Enumerates the names of all physical video capture devices known to
// WebRTC. Returns an empty vector when device enumeration is unavailable.
std::vector<std::string> Conductor::GetVideoDevices()
{
	std::vector<std::string> device_names;
	std::unique_ptr<webrtc::VideoCaptureModule::DeviceInfo> info(webrtc::VideoCaptureFactory::CreateDeviceInfo());
	if (!info)
		return device_names;
	const int num_devices = info->NumberOfDevices();
	for (int index = 0; index < num_devices; ++index)
	{
		const uint32_t kSize = 256;
		char name[kSize] = { 0 };
		char id[kSize] = { 0 };
		// A -1 result means this slot could not be queried; skip it.
		if (info->GetDeviceName(index, name, kSize, id, kSize) != -1)
			device_names.push_back(name);
	}
	return device_names;
}
// Called when a remote stream is added
void Conductor::OnAddStream(rtc::scoped_refptr<webrtc::MediaStreamInterface> stream)
{
LOG(INFO) << __FUNCTION__ << " " << stream->label();
if (this->OnRenderVideoHook)
{
webrtc::VideoTrackVector vtracks = stream->GetVideoTracks();
if (!vtracks.empty())
{
webrtc::VideoTrackInterface* track = vtracks[0];
remote_video.reset(new nr_webrtc::VideoRenderer(*this, true, track));
}
}
if (this->OnRenderAudioHook)
{
webrtc::AudioTrackVector atracks = stream->GetAudioTracks();
if (!atracks.empty())
{
webrtc::AudioTrackInterface* track = atracks[0];
remote_audio.reset(new nr_webrtc::AudioRenderer(*this, true, track));
}
}
}
// PeerConnectionObserver: a remote stream went away; stop rendering it by
// dropping the renderers attached to its tracks.
void Conductor::OnRemoveStream(rtc::scoped_refptr<webrtc::MediaStreamInterface> stream)
{
	LOG(INFO) << __FUNCTION__ << " " << stream->label();
	remote_audio.reset(nullptr);
	remote_video.reset(nullptr);
	// lost ownership, do not delete
	/*capturer = nullptr;
	capturer_internal = nullptr;*/
}
// PeerConnectionObserver: a local ICE candidate is ready. Serializes it and
// hands it to the embedder for delivery over the signaling channel.
void Conductor::OnIceCandidate(const webrtc::IceCandidateInterface* candidate)
{
	LOG(INFO) << __FUNCTION__ << " " << candidate->sdp_mline_index();
	std::string sdp;
	if (!candidate->ToString(&sdp))
	{
		LOG(LS_ERROR) << "Failed to serialize candidate";
		return;
	}
	if (this->OnIceCandidateHook == nullptr)
		return;
	this->OnIceCandidateHook(candidate->sdp_mid().c_str(), candidate->sdp_mline_index(), sdp.c_str());
}
// CreateSessionDescriptionObserver: local SDP (offer or answer) was
// created. Applies it locally, then forwards the type and SDP text to the
// embedder for signaling.
void Conductor::OnSuccess(webrtc::SessionDescriptionInterface* desc)
{
	// SetLocalDescription takes ownership of |desc|.
	peer_connection->SetLocalDescription(this, desc);
	std::string sdp;
	desc->ToString(&sdp);
	if (this->OnSuccessHook == nullptr)
		return;
	this->OnSuccessHook(desc->type().c_str(), sdp.c_str());
}
// CreateSessionDescriptionObserver: SDP negotiation failed; log the error
// and notify the embedder if a hook is installed.
void Conductor::OnFailure(const std::string& error)
{
	LOG(LERROR) << error;
	if (this->OnFailureHook == nullptr)
		return;
	this->OnFailureHook(error.c_str());
}
// PeerConnectionObserver: unspecified internal error; forward to embedder.
void Conductor::OnError()
{
	if (this->OnErrorHook == nullptr)
		return;
	this->OnErrorHook();
}
// Opens an unreliable, unordered SCTP data channel named |label| and starts
// observing it. No-op when there is no peer connection or creation fails.
void Conductor::CreateDataChannel(const std::string & label)
{
	if (!this->peer_connection)
		return;
	webrtc::DataChannelInit dc_options;
	//dc_options.id = 1;
	// Partial reliability: at most one retransmission, no ordering.
	dc_options.maxRetransmits = 1;
	dc_options.negotiated = false;
	dc_options.ordered = false;
	this->data_channel = this->peer_connection->CreateDataChannel(label, &dc_options);
	// FIX: CreateDataChannel() can return null (e.g. data channels
	// disabled); the original dereferenced the result unconditionally.
	if (this->data_channel)
		this->data_channel->RegisterObserver(this);
}
// PeerConnectionObserver: the remote peer opened a data channel; adopt it
// and start observing its messages.
void Conductor::OnDataChannel(rtc::scoped_refptr<webrtc::DataChannelInterface> channel)
{
	LOG(INFO) << __FUNCTION__ << " " << channel->label();
	webrtc::DataChannelInterface* incoming = channel.get();
	incoming->RegisterObserver(this);
	this->data_channel = incoming;
}
// Sends |text| over the data channel. Returns false when no channel exists
// or the send is rejected.
bool Conductor::DataChannelSendText(const std::string & text)
{
	if (!this->data_channel)
		return false;
	return this->data_channel->Send(webrtc::DataBuffer(text));
}
// Sends a raw (possibly binary) buffer over the data channel. Returns false
// when no channel exists or the send is rejected.
bool Conductor::DataChannelSendData(const webrtc::DataBuffer & data)
{
	if (!this->data_channel)
		return false;
	return this->data_channel->Send(data);
}
// Reports whether a data channel object exists.
// NOTE(review): despite the name, this only checks existence -- a channel
// that is still connecting or already closed also returns true. Checking
// state() == kOpen would match the name better; confirm callers' intent
// before changing.
bool Conductor::IsDataChannelConnected() {
return this->data_channel != nullptr;
}
// DataChannelObserver: a message arrived on the data channel. Binary
// payloads are routed to OnDataBinaryMessageHook, text payloads to
// OnDataMessageHook; messages are dropped when no hook is installed.
void Conductor::OnMessage(const webrtc::DataBuffer& buffer)
{
	LOG(INFO) << __FUNCTION__;
	if (buffer.binary)
	{
		if (this->OnDataBinaryMessageHook == nullptr)
			return;
		this->OnDataBinaryMessageHook(buffer.data.data(), buffer.size());
	}
	else
	{
		if (this->OnDataMessageHook == nullptr)
			return;
		// Copy into a string to guarantee NUL termination for the C callback.
		std::string msg(buffer.data.data<char>(), buffer.size());
		this->OnDataMessageHook(msg.c_str());
	}
}
// Starts an embedded STUN server bound to |bindIp| ("ip:port") on the
// current thread's socket server. Returns false on a bad address or socket
// creation failure.
bool Conductor::RunStunServer(const std::string & bindIp)
{
	rtc::SocketAddress server_addr;
	if (!server_addr.FromString(bindIp))
	{
		LOG(LERROR) << "Unable to parse IP address: " << bindIp;
		return false;
	}
	rtc::Thread* main = rtc::Thread::Current();
	rtc::AsyncUDPSocket* server_socket = rtc::AsyncUDPSocket::Create(main->socketserver(), server_addr);
	if (server_socket == nullptr)
	{
		LOG(LERROR) << "Failed to create a UDP socket" << std::endl;
		return false;
	}
	// The StunServer takes ownership of the socket.
	stun_server.reset(new cricket::StunServer(server_socket));
	LOG(INFO) << "Listening at " << server_addr.ToString() << std::endl;
	return true;
}
bool Conductor::RunTurnServer(const std::string & bindIp, const std::string & ip,
const std::string & realm, const std::string & authFile)
{
rtc::SocketAddress int_addr;
if (!int_addr.FromString(bindIp))
{
LOG(LERROR) << "Unable to parse IP address: " << bindIp << std::endl;
return false;
}
rtc::IPAddress ext_addr;
if (!IPFromString(ip, &ext_addr))
{
LOG(LERROR) << "Unable to parse IP address: " << ip << std::endl;
return false;
}
rtc::Thread* main = rtc::Thread::Current();
rtc::AsyncUDPSocket * int_socket = rtc::AsyncUDPSocket::Create(main->socketserver(), int_addr);
if (!int_socket)
{
LOG(LERROR) << "Failed to create a UDP socket bound at" << int_addr.ToString() << std::endl;
return false;
}
TurnFileAuth * auth = new TurnFileAuth(authFile);
if (!auth->Load())
{
LOG(LERROR) << "Failed to load auth file " << authFile << std::endl;
return false;
}
auto t = new cricket::TurnServer(main);
turn_server.reset(t);
t->set_realm(realm);
t->set_software(kSoftware);
t->set_auth_hook(auth);
t->AddInternalSocket(int_socket, cricket::PROTO_UDP);
t->SetExternalSocketFactory(new rtc::BasicPacketSocketFactory(),
rtc::SocketAddress(ext_addr, 0));
LOG(INFO) << "Listening internally at " << int_addr.ToString() << std::endl;
return true;
}
// Converts a bottom-up packed RGB(A) frame into planar I420 (YUV 4:2:0),
// writing the Y, U and V planes consecutively into |yuv| using a
// fixed-point (>>8) approximation of the luma/chroma transform.
// NOTE(review): although |bits| parameterizes the row pitch, the second
// pixel of each pair is read at fixed offsets +4..+6, i.e. the loop assumes
// 32 bits per pixel -- confirm callers never pass 24-bit frames.
// NOTE(review): the bottom-up row indexing (yi, yi - 1) assumes |width| and
// |height| are even; an odd height would drive yxi negative on the last
// iteration -- confirm.
// NOTE(review): the coefficient rows written to the U and V planes are
// swapped relative to textbook Cb/Cr for RGB input; this is consistent if
// the source bytes are actually BGR(A) (typical for bottom-up Windows
// DIBs) -- confirm channel order against the producer and YUVI420ToRGB.
void Conductor::RGBToYUVI420(int width, int height, int bits, uint8_t * image, uint8_t * yuv)
{
int pitch = bits / 8; // bytes per source pixel
int stride = width * pitch; // bytes per source row
int strideY = width;
int strideU = width / 2;
int strideV = width / 2;
// Plane layout: [Y: strideY*height][U: strideU*ceil(h/2)][V: same as U].
uint8_t * pimageY = yuv;
uint8_t * pimageU = yuv + strideY * height;
uint8_t * pimageV = yuv + strideY * height + (strideU * ((height + 1) / 2));
int i = 0;
// Process 2x2 pixel quads: four luma samples, one chroma sample pair.
for (int y = 0; y < height; y += 2)
for (int x = 0; x < width; x += 2)
{
int xi = pitch * x;
int yi = height - y - 1; // source row index (frame is stored bottom-up)
int xyi = yi * stride + xi; // byte offset of the quad's first row
int yxi = (yi - 1) * stride + xi; // byte offset of the row above it
uint8_t r00 = image[xyi + 0];
uint8_t g00 = image[xyi + 1];
uint8_t b00 = image[xyi + 2];
uint8_t r01 = image[xyi + 4];
uint8_t g01 = image[xyi + 5];
uint8_t b01 = image[xyi + 6];
uint8_t r10 = image[yxi + 0];
uint8_t g10 = image[yxi + 1];
uint8_t b10 = image[yxi + 2];
uint8_t r11 = image[yxi + 4];
uint8_t g11 = image[yxi + 5];
uint8_t b11 = image[yxi + 6];
// Luma for all four pixels of the quad (studio swing: +16 offset).
pimageY[y * width + x] = (((66 * r00 + 129 * g00 + 25 * b00 + 128) >> 8) + 16);
pimageY[y * width + x + 1] = (((66 * r01 + 129 * g01 + 25 * b01 + 128) >> 8) + 16);
pimageY[(y + 1) * width + x] = (((66 * r10 + 129 * g10 + 25 * b10 + 128) >> 8) + 16);
pimageY[(y + 1) * width + x + 1] = (((66 * r11 + 129 * g11 + 25 * b11 + 128) >> 8) + 16);
uint8_t u00 = (((112 * r00 - 94 * g00 - 18 * b00 + 128) >> 8) + 128);
uint8_t u01 = (((112 * r01 - 94 * g01 - 18 * b01 + 128) >> 8) + 128);
uint8_t u10 = (((112 * r10 - 94 * g10 - 18 * b10 + 128) >> 8) + 128);
uint8_t u11 = (((112 * r11 - 94 * g11 - 18 * b11 + 128) >> 8) + 128);
uint8_t v00 = (((-38 * r00 - 74 * g00 + 112 * b00 + 128) >> 8) + 128);
uint8_t v01 = (((-38 * r01 - 74 * g01 + 112 * b01 + 128) >> 8) + 128);
uint8_t v10 = (((-38 * r10 - 74 * g10 + 112 * b10 + 128) >> 8) + 128);
uint8_t v11 = (((-38 * r11 - 74 * g11 + 112 * b11 + 128) >> 8) + 128);
// 4:2:0 chroma subsampling: average the quad's four chroma samples.
pimageU[i] = ((u00 + u01 + u10 + u11) / 4);
pimageV[i++] = ((v00 + v01 + v10 + v11) / 4);
}
}
// Converts a planar I420 (YUV 4:2:0) frame to packed 24-bit RGB using a
// fixed-point (>>8) approximation of the inverse transform, clamping each
// channel to [0, 255]. The |yuv| plane layout matches RGBToYUVI420:
// Y plane, then U, then V. |width| must be even.
// NOTE(review): the output is always written at 3 bytes per pixel; the
// |bits| parameter is accepted for interface symmetry but never used --
// confirm callers only expect 24-bit output.
void Conductor::YUVI420ToRGB(int width, int height, int bits, uint8_t * image, uint8_t * yuv)
{
	(void)bits; // see note above: output pitch is fixed at 3 bytes/pixel
	// Cleanup: removed unused locals (pitch, pixels, stride) present in the
	// original -- none of them affected the conversion.
	int strideY = width;
	int strideU = width / 2;
	int strideV = width / 2;
	uint8_t * imageY = yuv;
	uint8_t * imageU = yuv + strideY * height;
	uint8_t * imageV = yuv + strideY * height + (strideU * ((height + 1) / 2));
	int i = 0;
	for (int yCord = 0; yCord < height; yCord++)
	{
		// Two horizontal pixels share one U/V sample pair (4:2:0).
		for (int xCord = 0; xCord < width; xCord += 2)
		{
			int c1 = imageY[yCord * strideY + xCord] - 16;
			int c2 = imageY[yCord * strideY + xCord + 1] - 16;
			int d = imageU[yCord / 2 * strideU + xCord / 2] - 128;
			int e = imageV[yCord / 2 * strideV + xCord / 2] - 128;
			image[i++] = std::min(255, std::max(0, (298 * c1 + 409 * e + 128) >> 8));//r
			image[i++] = std::min(255, std::max(0, (298 * c1 - 100 * d - 208 * e + 128) >> 8));//g
			image[i++] = std::min(255, std::max(0, (298 * c1 + 516 * d + 128) >> 8));//b
			image[i++] = std::min(255, std::max(0, (298 * c2 + 409 * e + 128) >> 8));//r
			image[i++] = std::min(255, std::max(0, (298 * c2 - 100 * d - 208 * e + 128) >> 8));//g
			image[i++] = std::min(255, std::max(0, (298 * c2 + 516 * d + 128) >> 8));//b
		}
	}
}
}
@harvestsure
Copy link

Can you provide all the source code? tks sooooooooo much!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment