Skip to content

Instantly share code, notes, and snippets.

@keivanmoazami
Last active May 18, 2024 11:42
Show Gist options
  • Save keivanmoazami/52b0ba73bd01d857a0d43bc96f4f15aa to your computer and use it in GitHub Desktop.
Save keivanmoazami/52b0ba73bd01d857a0d43bc96f4f15aa to your computer and use it in GitHub Desktop.
Restream an H.264 RTSP stream without re-encoding, using GStreamer.
#include <chrono>
#include <cstring>
#include <iostream>
#include <memory>
#include <mutex>
#include <string>
#include <thread>

#include <gst/gst.h>
#include <gst/rtsp-server/rtsp-server.h>
// RTSP URL of the upstream camera whose H.264 stream is restreamed (credentials embedded).
std::string cameraUrl = "rtsp://root:pass@192.168.46.122:554/ufirststream";
// Asynchronously dumps the pipeline graph to a Graphviz .dot file after a delay.
// NOTE(review): GST_DEBUG_BIN_TO_DOT_FILE only writes when the
// GST_DEBUG_DUMP_DOT_DIR environment variable is set — confirm in deployment.
// The worker thread is detached, so `pipeline` must outlive the delay.
static void save_dot_file(GstElement* pipeline, std::string file_name, int wait_milliSec)
{
    std::thread worker([pipeline, file_name, wait_milliSec]()
    {
        std::this_thread::sleep_for(std::chrono::milliseconds(wait_milliSec));
        GST_DEBUG_BIN_TO_DOT_FILE(GST_BIN(pipeline), GST_DEBUG_GRAPH_SHOW_ALL, file_name.c_str());
        g_print("DOT File Saved!\n");
    });
    worker.detach();
}
// Serializes the copy-out of each raw frame from the appsink callback.
std::mutex rawBufferMutex;
// One depayloaded H.264 frame, copied out of the GStreamer buffer.
struct RawFrame
{
// Heap copy of the encoded bytes; array deleter is attached at allocation.
std::shared_ptr<unsigned char> data;
// Number of valid bytes in `data`.
unsigned long long size;
};
// Shared state of the receive (grab) pipeline. Written by startGrabPipeline()
// and its callbacks, read by the RTSP-server side.
// NOTE(review): fields are accessed from several threads with no single lock
// guarding this struct — verify the intended synchronization.
struct ReceiverContext
{
// The "main-pipeline" created in startGrabPipeline().
GstElement* pipeline;
// The rtspsrc element; source of the dynamic video pad.
GstElement* source;
// Set true once the pipeline has been started.
bool is_setup;
// Caps of the first received sample; forwarded to the server's appsrc.
GstCaps* caps;
// Integer frames-per-second parsed from the pad caps (used for buffer duration).
int fps;
// Count of frames pushed to the server (incremented in sendVideoBuffer).
unsigned long long frame_counter;
};
ReceiverContext receiver_ctx;
// Serializes pushes into the RTSP server's appsrc.
std::mutex serverMutex;
// Shared state of the RTSP-server side, populated in media_configure().
struct ServerContext
{
// The appsrc element ("video_src") inside the server media pipeline.
GstElement* appSrc;
// Set true once a client connected and media_configure() ran.
bool is_setup;
};
ServerContext server_ctx;
//***********************************************************
//********************* Receiver ****************************
//***********************************************************
// Forward declaration — defined in the RTSP-server section below.
static void sendVideoBuffer(RawFrame& raw_frame);
// appsink "new-sample" callback: pulls the depayloaded H.264 buffer, copies it
// into a RawFrame and forwards it to the RTSP-server side via sendVideoBuffer().
// Returns GST_FLOW_OK when a sample was pulled, GST_FLOW_ERROR otherwise.
static GstFlowReturn newRawFrameReceived(GstElement* sink, gpointer data)
{
    GstSample* sample = nullptr;
    g_signal_emit_by_name(sink, "pull-sample", &sample);
    if (!sample)
        return GST_FLOW_ERROR;

    GstBuffer* buffer = gst_sample_get_buffer(sample);
    if (receiver_ctx.caps == nullptr)
    {
        // BUGFIX: gst_sample_get_caps() is transfer-none — the caps belong to
        // the sample. The original stored the raw pointer, which dangles after
        // gst_sample_unref() below. Take our own reference instead.
        GstCaps* caps = gst_sample_get_caps(sample);
        if (caps != nullptr)
            receiver_ctx.caps = gst_caps_ref(caps);
    }

    GstMapInfo map;
    if (gst_buffer_map(buffer, &map, GST_MAP_READ))
    {
        std::lock_guard<std::mutex> lock(rawBufferMutex);
        const auto size = map.size;
        // Copy the encoded bytes out so the frame outlives the GstBuffer mapping.
        std::shared_ptr<unsigned char> data_ptr(new unsigned char[size],
                                                std::default_delete<unsigned char[]>());
        memcpy(data_ptr.get(), map.data, size);
        RawFrame raw_frame;
        raw_frame.data = std::move(data_ptr);
        raw_frame.size = size;
        sendVideoBuffer(raw_frame);
        gst_buffer_unmap(buffer, &map);
    }
    gst_sample_unref(sample);
    return GST_FLOW_OK;
}
// rtspsrc "pad-added" handler: when the RTP *video* pad appears, builds and
// links rtph264depay ! appsink behind it and records the stream framerate.
static void source_padAdded_handler(GstElement* src, GstPad* new_pad, gpointer data)
{
    GstCaps* new_pad_caps = gst_pad_get_current_caps(new_pad);
    GstStructure* new_pad_struct = gst_caps_get_structure(new_pad_caps, 0);
    const gchar* new_pad_type = gst_structure_get_name(new_pad_struct);
    const gchar* media_subtype = gst_structure_get_string(new_pad_struct, "media");
    const bool isVideo = g_str_has_prefix(new_pad_type, "application/x-rtp") &&
                         g_strcmp0(media_subtype, "video") == 0;

    // BUGFIX: read the framerate while the caps are still alive, with defaults,
    // so a missing "framerate" field no longer leaves fps_n/fps_d uninitialized
    // (the original divided by an indeterminate value — UB).
    gint fps_n = 0, fps_d = 1;
    gst_structure_get_fraction(new_pad_struct, "framerate", &fps_n, &fps_d);
    // BUGFIX: gst_pad_get_current_caps() is transfer-full; the original leaked it.
    gst_caps_unref(new_pad_caps);

    if (!isVideo || gst_pad_is_linked(new_pad))
        return;

    GstElement* depayElement = gst_element_factory_make("rtph264depay", "depay");
    GstElement* appSinkElement = gst_element_factory_make("appsink", "sink");
    if (!depayElement || !appSinkElement)
    {
        g_printerr("Not all elements could be created 2.\n");
        // BUGFIX: release whichever element was created (floating ref) on failure.
        if (depayElement)
            gst_object_unref(depayElement);
        if (appSinkElement)
            gst_object_unref(appSinkElement);
        return;
    }
    gst_bin_add_many(GST_BIN(receiver_ctx.pipeline), depayElement, appSinkElement, NULL);
    g_object_set(appSinkElement, "emit-signals", TRUE, "drop", FALSE, "sync", TRUE, NULL);
    g_signal_connect(appSinkElement, "new-sample", G_CALLBACK(newRawFrameReceived), NULL);
    if (!gst_element_link_many(receiver_ctx.source, depayElement, appSinkElement, NULL))
    {
        g_printerr("Elements could not be linked.\n");
        return;
    }
    // Elements added to an already-PLAYING pipeline must be brought up manually.
    gst_element_set_state(depayElement, GST_STATE_PLAYING);
    gst_element_set_state(appSinkElement, GST_STATE_PLAYING);
    receiver_ctx.fps = (fps_d != 0) ? fps_n / fps_d : 0;
    save_dot_file(receiver_ctx.pipeline, "receiver", 2000);
}
void startGrabPipeline()
{
auto pipeline = gst_pipeline_new("main-pipeline");
auto sourceElement = gst_element_factory_make("rtspsrc", "source");
if (!pipeline || !sourceElement)
{
g_printerr("Not all elements could be created 1.\n");
return;
}
gst_bin_add_many(GST_BIN(pipeline), sourceElement, NULL);
g_object_set(sourceElement, "location", cameraUrl.c_str(), NULL);
g_signal_connect(sourceElement, "pad-added", G_CALLBACK(source_padAdded_handler), NULL);
auto ret = gst_element_set_state(pipeline, GST_STATE_PLAYING);
if (ret == GST_STATE_CHANGE_FAILURE)
{
g_printerr("Unable to set the pipeline to the playing state.\n");
return;
}
auto bus = gst_element_get_bus(pipeline);
auto mGstLoop = g_main_loop_new(NULL, FALSE);
gst_object_unref(bus);
receiver_ctx.pipeline = std::move(pipeline);
receiver_ctx.source = std::move(sourceElement);
receiver_ctx.is_setup = true;
g_main_loop_run(mGstLoop);
}
//***********************************************************
//********************** Rtsp Server ************************
//***********************************************************
// Wraps one RawFrame in a GstBuffer, stamps it with the server element's
// running time, and pushes it into the RTSP server's appsrc. Drops the frame
// silently until both sides are set up or while the appsrc has no clock.
static void sendVideoBuffer(RawFrame& raw_frame)
{
    if (!receiver_ctx.is_setup || !server_ctx.is_setup)
        return;
    std::lock_guard<std::mutex> lock(serverMutex);

    // BUGFIX: gst_element_get_clock() returns NULL until the element is
    // PLAYING; the original passed that NULL straight into gst_clock_get_time().
    GstClock* clock = gst_element_get_clock(server_ctx.appSrc);
    if (clock == nullptr)
        return;
    GstClockTime current_time =
        gst_clock_get_time(clock) - gst_element_get_base_time(server_ctx.appSrc);
    gst_object_unref(clock);

    GstBuffer* buffer = gst_buffer_new_allocate(NULL, raw_frame.size, NULL);
    gst_buffer_fill(buffer, 0, raw_frame.data.get(), raw_frame.size);
    /* Timestamp the buffer */
    GST_BUFFER_PTS(buffer) = current_time;
    GST_BUFFER_DTS(buffer) = current_time;
    // BUGFIX: guard the division — fps is 0 when the caps carried no framerate.
    if (receiver_ctx.fps > 0)
        GST_BUFFER_DURATION(buffer) = gst_util_uint64_scale_int(1, GST_SECOND, receiver_ctx.fps);

    GstFlowReturn ret;
    g_signal_emit_by_name(server_ctx.appSrc, "push-buffer", buffer, &ret);
    if (ret != GST_FLOW_OK) {
        g_warning("Failed to push buffer to appsrc");
    }
    receiver_ctx.frame_counter++;
    gst_buffer_unref(buffer);   // push-buffer took its own reference
}
// "media-configure" callback: fired when a client connects and the factory
// instantiates the launch pipeline. Grabs the appsrc, applies the caps learned
// from the camera, and marks the server side ready.
static void media_configure(GstRTSPMediaFactory* factory, GstRTSPMedia* media, gpointer user_data)
{
    GstElement* serverPipeline = gst_rtsp_media_get_element(media);
    GstElement* video_src = gst_bin_get_by_name_recurse_up(GST_BIN(serverPipeline), "video_src");
    // BUGFIX: g_object_set() is varargs and MUST be NULL-terminated; the
    // original omitted the sentinel, which is undefined behavior.
    g_object_set(G_OBJECT(video_src), "caps", receiver_ctx.caps, NULL);
    server_ctx.appSrc = video_src;   // keep the ref returned by get_by_name
    server_ctx.is_setup = true;
    save_dot_file(serverPipeline, "server", 3000);
    // serverPipeline is deliberately NOT unreffed here: the detached
    // save_dot_file thread dereferences it ~3 s from now.
    // gst_object_unref(serverPipeline);
}
void startRtspServerPipeline()
{
std::cout << "Starting Server ... \n";
std::this_thread::sleep_for(std::chrono::milliseconds(2000));
while (!receiver_ctx.is_setup)
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
GMainLoop* loop;
GstRTSPServer* server;
GstRTSPMountPoints* mounts;
GstRTSPMediaFactory* factory;
loop = g_main_loop_new(NULL, FALSE);
server = gst_rtsp_server_new();
mounts = gst_rtsp_server_get_mount_points(server);
factory = gst_rtsp_media_factory_new();
gst_rtsp_media_factory_set_shared(factory, TRUE);
gst_rtsp_media_factory_set_launch(factory, "( "
"appsrc name=video_src ! parsebin ! rtph264pay name=pay0 pt=96 "
")");
g_signal_connect(factory, "media-configure", (GCallback)media_configure, loop);
gst_rtsp_mount_points_add_factory(mounts, "/test", factory);
g_object_unref(mounts);
gst_rtsp_server_attach(server, NULL);
g_print("stream ready at rtsp://127.0.0.1:8554/test\n");
g_main_loop_run(loop);
}
// Entry point: starts the receiver and the RTSP server on their own threads
// and blocks forever (both threads run a GMainLoop that never quits).
int main(int argc, char* argv[])
{
    gst_init(&argc, &argv);
    std::thread receiverThread(startGrabPipeline);
    std::thread serverThread(startRtspServerPipeline);
    // BUGFIX: the original detached both threads and then spun a 1 ms
    // sleep loop forever, needlessly burning CPU. Joining blocks main()
    // just the same (the threads never return in normal operation).
    receiverThread.join();
    serverThread.join();
    return 0;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment