Skip to content

Instantly share code, notes, and snippets.

@mattronix
Created April 20, 2020 15:15
Show Gist options
  • Save mattronix/9be56dc88d0a0f69c582149b812f8c8a to your computer and use it in GitHub Desktop.
Save mattronix/9be56dc88d0a0f69c582149b812f8c8a to your computer and use it in GitHub Desktop.
import traceback
import sys
import gi
gi.require_version('Gst', '1.0')
from gi.repository import Gst, GObject # noqa:F401,F402
# Initialize the GStreamer library (its variables and paths). This must run
# before any other Gst call below; sys.argv is handed over so GStreamer can
# read its own command-line options.
Gst.init(sys.argv)
def on_message(bus: Gst.Bus, message: Gst.Message, loop: GObject.MainLoop) -> bool:
    """Handle one message delivered by the pipeline bus.

    End-of-stream and error messages stop the main loop so the script can
    shut the pipeline down; warnings are printed and streaming continues.
    Every other message type is ignored.

    GStreamer message types and how to parse them:
    https://lazka.github.io/pgi-docs/Gst-1.0/flags.html#Gst.MessageType

    Args:
        bus: The bus that emitted the message (not used by this handler).
        message: The message to inspect.
        loop: The main loop to quit on end-of-stream or error.

    Returns:
        Always True.
    """
    mtype = message.type
    if mtype == Gst.MessageType.EOS:
        print("End of stream")
        loop.quit()
    elif mtype == Gst.MessageType.ERROR:
        # parse_error() yields the error object and a debug string.
        err, debug = message.parse_error()
        print(err, debug)
        loop.quit()
    elif mtype == Gst.MessageType.WARNING:
        # Warnings are reported but do not stop the loop.
        err, debug = message.parse_warning()
        print(err, debug)
    return True
# Build and run the pipeline:
#   videotestsrc -> x264enc -> mpegtsmux -> tcpserversink (localhost:8888)


def _make_element(factory_name: str, element_name=None):
    """Create a GStreamer element, raising if its factory is unavailable.

    Gst.ElementFactory.make returns None instead of raising when the element
    cannot be created (e.g. the plugin is not installed); checking here gives
    a clear error instead of a later AttributeError on None.
    https://lazka.github.io/pgi-docs/Gst-1.0/classes/ElementFactory.html#Gst.ElementFactory.make
    """
    element = Gst.ElementFactory.make(factory_name, element_name)
    if element is None:
        raise RuntimeError(
            "Could not create GStreamer element '{}'".format(factory_name))
    return element


# Gst.Pipeline https://lazka.github.io/pgi-docs/Gst-1.0/classes/Pipeline.html
pipeline = Gst.Pipeline()

# Test video source. Boolean properties are set with real booleans: a string
# such as "true" is only accepted through truthiness, so "false" would also
# enable the property.
src_name = "my_video_test_src"
src = _make_element("videotestsrc", src_name)
src.set_property("do-timestamp", True)
src.set_property("pattern", "ball")

# H.264 encoder.
encode = _make_element("x264enc")
encode.set_property("key-int-max", 12)
encode.set_property("byte-stream", True)

# MPEG-TS muxer.
mux = _make_element("mpegtsmux")

# TCP server sink that clients connect to.
serve = _make_element("tcpserversink")
serve.set_property("port", 8888)
serve.set_property("host", "localhost")

# Elements must be added to the pipeline before they can be linked.
elements = (src, encode, mux, serve)
for element in elements:
    pipeline.add(element)

# Link each element to its downstream neighbour; link() returns False on
# failure rather than raising, so check it explicitly.
for upstream, downstream in zip(elements, elements[1:]):
    if not upstream.link(downstream):
        raise RuntimeError("Could not link '{}' to '{}'".format(
            upstream.get_name(), downstream.get_name()))

# https://lazka.github.io/pgi-docs/Gst-1.0/classes/Bus.html
bus = pipeline.get_bus()
# Allow the bus to emit messages to the main thread.
bus.add_signal_watch()

# Main loop that dispatches the GStreamer bus events.
# NOTE(review): recent PyGObject releases deprecate GObject.MainLoop in favour
# of GLib.MainLoop; switch once GLib is imported at the top of the file.
loop = GObject.MainLoop()

# Add handler to specific signal.
# https://lazka.github.io/pgi-docs/GObject-2.0/classes/Object.html#GObject.Object.connect
bus.connect("message", on_message, loop)

try:
    # Start the pipeline; an immediate FAILURE means it can never play.
    if pipeline.set_state(Gst.State.PLAYING) == Gst.StateChangeReturn.FAILURE:
        raise RuntimeError("Unable to set the pipeline to the PLAYING state")
    loop.run()
except Exception:
    traceback.print_exc()
    loop.quit()
finally:
    # Always release the pipeline, including on KeyboardInterrupt, which
    # `except Exception` does not catch.
    bus.remove_signal_watch()
    pipeline.set_state(Gst.State.NULL)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment