Skip to content

Instantly share code, notes, and snippets.

@matthock
Created May 13, 2024 11:39
Show Gist options
  • Save matthock/eda09f08cfe2a9ffaea20da46c745e70 to your computer and use it in GitHub Desktop.
Save matthock/eda09f08cfe2a9ffaea20da46c745e70 to your computer and use it in GitHub Desktop.
gstreamer ipcpipeline segmentation fault
#include <unistd.h>
#include <fcntl.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <gst/gst.h>
#include <iostream>
static GMainLoop *loop = NULL;
GstElement *make_element(const char *type, const char *name);
static gboolean
master_bus_msg (GstBus * bus, GstMessage * msg, gpointer data)
{
GstPipeline *pipeline = (GstPipeline *)data;
std::cout << getpid() << " Bus message " << std::endl << std::flush;
std::cout << getpid() << " Bus message: " << GST_MESSAGE_TYPE(msg) << std::endl << std::flush;
switch (GST_MESSAGE_TYPE (msg)) {
case GST_MESSAGE_ERROR:{
GError *err;
gchar *dbg;
gst_message_parse_error (msg, &err, &dbg);
if (!strstr(err->message, "Output window was closed"))
{
g_printerr ("ERROR: %s\n", err->message);
if (dbg != NULL)
g_printerr ("ERROR debug information: %s\n", dbg);
g_error_free (err);
g_free (dbg);
GST_DEBUG_BIN_TO_DOT_FILE_WITH_TS (GST_BIN (pipeline),
GST_DEBUG_GRAPH_SHOW_ALL, "ipc.error");
}
g_main_loop_quit (loop);
break;
}
case GST_MESSAGE_WARNING:{
GError *err;
gchar *dbg;
gst_message_parse_warning (msg, &err, &dbg);
g_printerr ("WARNING: %s\n", err->message);
if (dbg != NULL)
g_printerr ("WARNING debug information: %s\n", dbg);
g_error_free (err);
g_free (dbg);
GST_DEBUG_BIN_TO_DOT_FILE_WITH_TS (GST_BIN (pipeline),
GST_DEBUG_GRAPH_SHOW_ALL, "ipc.warning");
break;
}
case GST_MESSAGE_ASYNC_DONE:
GST_DEBUG_BIN_TO_DOT_FILE_WITH_TS (GST_BIN (pipeline),
GST_DEBUG_GRAPH_SHOW_ALL, "ipc.async-done");
break;
case GST_MESSAGE_EOS:
gst_element_set_state (GST_ELEMENT (pipeline), GST_STATE_NULL);
g_main_loop_quit (loop);
break;
default:
break;
}
std::cout << getpid() << " Bus message handled " << GST_MESSAGE_TYPE(msg) << std::endl << std::flush;
return TRUE;
}
static gboolean
slave_bus_msg (GstBus * bus, GstMessage * msg, gpointer data)
{
GstPipeline *pipeline = (GstPipeline *)data;
std::cout << getpid() << " Slave Bus message " << std::endl << std::flush;
std::cout << getpid() << " Slave Bus message: " << GST_MESSAGE_TYPE(msg) << std::endl << std::flush;
switch (GST_MESSAGE_TYPE (msg)) {
case GST_MESSAGE_ERROR:{
GError *err;
gchar *dbg;
gst_message_parse_error (msg, &err, &dbg);
g_printerr ("SLAVE: ERROR: %s\n", err->message);
if (dbg != NULL)
g_printerr ("SLAVE: ERROR debug information: %s\n", dbg);
g_error_free (err);
g_free (dbg);
GST_DEBUG_BIN_TO_DOT_FILE_WITH_TS (GST_BIN (pipeline),
GST_DEBUG_GRAPH_SHOW_ALL, "ipc.slave.error");
break;
}
case GST_MESSAGE_WARNING:{
GError *err;
gchar *dbg;
gst_message_parse_warning (msg, &err, &dbg);
g_printerr ("SLAVE: WARNING: %s\n", err->message);
if (dbg != NULL)
g_printerr ("SLAVE: WARNING debug information: %s\n", dbg);
g_error_free (err);
g_free (dbg);
GST_DEBUG_BIN_TO_DOT_FILE_WITH_TS (GST_BIN (pipeline),
GST_DEBUG_GRAPH_SHOW_ALL, "ipc.slave.warning");
break;
}
case GST_MESSAGE_ASYNC_START:
GST_DEBUG_BIN_TO_DOT_FILE_WITH_TS (GST_BIN (pipeline),
GST_DEBUG_GRAPH_SHOW_VERBOSE, "ipc.slave.async-start");
break;
case GST_MESSAGE_ASYNC_DONE:
GST_DEBUG_BIN_TO_DOT_FILE_WITH_TS (GST_BIN (pipeline),
GST_DEBUG_GRAPH_SHOW_ALL, "ipc.slave.async-done");
break;
default:
break;
}
return TRUE;
}
static void start_source (int fdin1, int fdout1, int fdin2, int fdout2)
{
GstElement *pipeline, *slave_pipeline, *source, *filter, *ipcpipelinesink1, *ipcpipelinesrc1, *xvimage, *convert, *queue2_1, *queue2_2;
//GstBus *bus;
GstMessage *msg;
GstCaps *caps;
GstStateChangeReturn ret;
/* Initialize GStreamer */
//gst_init (&argc, &argv);
/* Create the elements */
queue2_1 = make_element("queue2", "master_queue2_1");
queue2_2 = make_element("queue2", "master_queue2_2");
source = gst_element_factory_make ("videotestsrc", "source");
filter = gst_element_factory_make ("capsfilter", "convert_filter");
convert = gst_element_factory_make ("videoconvert", "convert");
caps = gst_caps_new_simple("video/x-raw",
"format", G_TYPE_STRING, "GRAY8",
"width", G_TYPE_INT, 640,
"height", G_TYPE_INT, 514,
"framerate", GST_TYPE_FRACTION, 30, 1,
NULL);
g_object_set(filter, "caps", caps, NULL);
ipcpipelinesink1 = gst_element_factory_make ("ipcpipelinesink", "ipc_sink_1");
g_object_set(ipcpipelinesink1, "fdin", fdin1, "fdout", fdout1, NULL);
ipcpipelinesrc1 = gst_element_factory_make ("ipcpipelinesrc", "ipc_src_1");
g_object_set(ipcpipelinesrc1, "fdin", fdin2, "fdout", fdout2, NULL);
xvimage = gst_element_factory_make ("xvimagesink", "xvimagesink");
/* Create the empty pipeline */
pipeline = gst_pipeline_new ("test-pipeline");
gst_bus_add_watch (gst_pipeline_get_bus(GST_PIPELINE(pipeline)), master_bus_msg, pipeline);
slave_pipeline = make_element("ipcslavepipeline", "master_ipcslavepipeline");
gst_bus_add_watch (gst_pipeline_get_bus(GST_PIPELINE(slave_pipeline)), slave_bus_msg, slave_pipeline);
if (!pipeline || !slave_pipeline || !source || !filter || !convert || !ipcpipelinesink1 || !ipcpipelinesrc1 || !xvimage || !queue2_1 || !queue2_2) {
g_printerr ("Not all elements could be created.\n");
return;
}
/* Build the pipeline */
gst_bin_add_many (GST_BIN (pipeline), source, filter, convert, ipcpipelinesink1, queue2_1, NULL);
gst_bin_add_many (GST_BIN (slave_pipeline), ipcpipelinesrc1, xvimage, queue2_2, NULL);
if (gst_element_link_many(source, filter, convert, queue2_1, ipcpipelinesink1, NULL) != TRUE) {
g_printerr ("Elements 1 could not be linked.\n");
gst_object_unref (pipeline);
return;
}
if (gst_element_link_many(ipcpipelinesrc1, queue2_2, xvimage, NULL) != TRUE) {
g_printerr ("Elements 2 could not be linked.\n");
gst_object_unref (pipeline);
return;
}
/* Modify the source's properties */
g_object_set (source, "pattern", 0, NULL);
std::cout << "Created master pipeline, starting playback..." << std::endl;
/* Start playing */
ret = gst_element_set_state (pipeline, GST_STATE_PLAYING);
if (ret == GST_STATE_CHANGE_FAILURE) {
g_printerr ("Unable to set the pipeline to the playing state.\n");
gst_object_unref (pipeline);
return;
}
gst_caps_unref(caps);
std::cout << "Started Master" << std::endl;
}
GstElement *make_element(const char *type, const char *name)
{
GstElement *result = gst_element_factory_make(type, name);
if (!result) {
std::cout << "Unable to make element " << (name ? name : "null") << std::endl;
}
return result;
}
static void start_sink(int fdin, int fdout, int fdin2, int fdout2)
{
GstElement *pipeline, *ipcpipelinesrc, *ipcpipelinesink, *queue2;
pipeline = make_element("ipcslavepipeline", "ipcslavepipeline");
gst_bus_add_watch (GST_ELEMENT_BUS (pipeline), slave_bus_msg, pipeline);
ipcpipelinesrc = make_element("ipcpipelinesrc", "slave_ipc_src");
g_object_set (ipcpipelinesrc, "fdin", fdin, "fdout", fdout, NULL);
ipcpipelinesink = make_element("ipcpipelinesink", "slave_ipc_sink");
g_object_set (ipcpipelinesink, "fdin", fdin2, "fdout", fdout2, NULL);
queue2 = make_element("queue2", "slave_queue2");
if (!pipeline || !ipcpipelinesrc || !ipcpipelinesink || !queue2) {
g_printerr("Not all slave elements could be created");
return;
}
gst_bin_add_many(GST_BIN(pipeline), ipcpipelinesrc, ipcpipelinesink, queue2, NULL);
if (!gst_element_link_many(ipcpipelinesrc, queue2, ipcpipelinesink, NULL)) {
g_printerr("Slave elements could not be linked.\n");
g_object_unref(pipeline);
return;
}
std::cout << "Started slave" << std::endl;
}
static void
run (pid_t pid)
{
loop = g_main_loop_new (NULL, FALSE);
g_main_loop_run (loop);
if (pid > 0)
kill (pid, SIGTERM);
}
int
main (int argc, char **argv)
{
int sockets[2][2];
pid_t pid;
for (int i = 0; i < 2; i++) {
if (socketpair (AF_UNIX, SOCK_STREAM, 0, sockets[i])) {
fprintf (stderr, "Error creating sockets: %s\n", strerror (errno));
return 1;
}
if (fcntl (sockets[i][0], F_SETFL, O_NONBLOCK) < 0 ||
fcntl (sockets[i][1], F_SETFL, O_NONBLOCK) < 0) {
fprintf (stderr, "Error setting O_NONBLOCK on sockets: %s\n",
strerror (errno));
return 1;
}
}
pid = fork ();
if (pid < 0) {
fprintf (stderr, "Error forking: %s\n", strerror (errno));
return 1;
} else if (pid > 0) {
std::cout << "Parent PID: " << getpid() << std::endl;
setenv ("GST_DEBUG_FILE", "gstsrc.log", 1);
setenv ("GST_DEBUG", "7", 1);
gst_init (&argc, &argv);
start_source (sockets[0][0], sockets[0][0], sockets[1][0], sockets[1][0]);
} else {
std::cout << "Child PID: " << getpid() << std::endl;
setenv ("GST_DEBUG_FILE", "gstsink.log", 1);
setenv ("GST_DEBUG", "7", 1);
gst_init (&argc, &argv);
start_sink (sockets[0][1], sockets[0][1], sockets[1][1], sockets[1][1]);
}
run (pid);
return 0;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment