Skip to content

Instantly share code, notes, and snippets.

@nocarryr
Created June 27, 2024 23:09
Show Gist options
  • Save nocarryr/c2c6ef1abaec9cafd1d416db27c9c2b3 to your computer and use it in GitHub Desktop.
Save nocarryr/c2c6ef1abaec9cafd1d416db27c9c2b3 to your computer and use it in GitHub Desktop.
cyndilib sender example
from __future__ import annotations
from typing import NamedTuple, cast
from typing_extensions import Self
import enum
import io
import subprocess
import shlex
from fractions import Fraction
from contextlib import contextmanager
import click
from cyndilib.wrapper.ndi_structs import FourCC
from cyndilib.video_frame import VideoSendFrame
from cyndilib.sender import Sender
# Template for the ffmpeg command that produces the test video.
# The placeholders ({ffmpeg}, {xres}, {yres}, {fps}, {pix_fmt.name}) are filled
# from the fields of an ``Options`` tuple via ``FF_CMD.format(**opts._asdict())``
# in ``ffmpeg_proc``.  The ``testsrc2`` lavfi source generates the frames and
# ``-f rawvideo pipe:`` writes them, unencoded, to the process's stdout.
FF_CMD = '{ffmpeg} -f lavfi -i testsrc2=size={xres}x{yres}:rate={fps} -pix_fmt {pix_fmt.name} -f rawvideo pipe: '
"""ffmpeg command line format to generate frames and send raw data through a pipe"""
class PixFmt(enum.Enum):
    """Pixel formats that can be selected for the sender.

    Each member *name* is substituted into the ffmpeg command line as the
    ``-pix_fmt`` argument, and each member *value* is the ``FourCC`` that is
    set on the video frame.
    """

    uyvy422 = FourCC.UYVY
    nv12 = FourCC.NV12
    rgba = FourCC.RGBA
    bgra = FourCC.BGRA

    @classmethod
    def from_str(cls, name: str) -> Self:
        """Return the member called *name*.

        Raises:
            KeyError: if no member has that name.
        """
        # Indexing an Enum class by name is the same lookup as ``__members__``
        return cls[name]
class Options(NamedTuple):
    """Settings for the sender script, built from the command line in ``main``.

    The tuple is also expanded with ``_asdict()`` to fill the placeholders of
    ``FF_CMD``, so the field names must match the placeholder names used there.
    """
    # Pixel format; ``.name`` goes to ffmpeg, ``.value`` is the frame's FourCC
    pix_fmt: PixFmt
    # Horizontal resolution in pixels
    xres: int
    # Vertical resolution in pixels
    yres: int
    # Frame rate as text; passed verbatim to ffmpeg and parsed by
    # ``parse_frame_rate`` for the NDI video frame
    fps: str
    # Name given to the ``Sender``
    sender_name: str = 'ffmpeg_sender'
    # Name or path of the ffmpeg executable
    ffmpeg: str = 'ffmpeg'
def parse_frame_rate(fr: str) -> Fraction:
    """Convert a frame-rate string into an exact :class:`~fractions.Fraction`.

    Three notations are understood:

    * ``'N/D'`` -- an explicit rational such as ``'30000/1001'``
    * a decimal such as ``'29.97'``, ``'59.94'`` or ``'23.976'``.  When the
      value lies next to the NTSC rate ``round(value) * 1000 / 1001`` it is
      snapped to that exact rational; any other decimal (``'25.0'``,
      ``'12.5'``) is kept as the exact value that was written.
    * a plain integer such as ``'30'``

    Raises:
        ValueError: if the text is not a number in one of these notations.
        ZeroDivisionError: if an explicit denominator of zero is given.
    """
    if '/' in fr:
        n, d = (int(s) for s in fr.split('/'))
        return Fraction(n, d)
    if '.' in fr:
        exact = Fraction(fr)
        nominal = round(exact)
        ntsc = Fraction(nominal * 1000, 1001)
        # Only treat the value as NTSC shorthand when it sits within half of
        # the gap that separates the NTSC rate from the whole-number rate.
        # Without this check '25.0' would wrongly turn into 25000/1001.
        if abs(exact - ntsc) * 2 < abs(nominal - ntsc):
            return ntsc
        return exact
    return Fraction(int(fr))
def build_video_frame(opts: Options) -> VideoSendFrame:
    """Create the :class:`VideoSendFrame <cyndilib.video_frame.VideoSendFrame>`
    used for sending.

    The resolution, frame rate (parsed from ``opts.fps``) and FourCC pixel
    format are taken from *opts* and applied in that order.
    """
    frame = VideoSendFrame()
    frame.set_resolution(opts.xres, opts.yres)
    frame.set_frame_rate(parse_frame_rate(opts.fps))
    frame.set_fourcc(opts.pix_fmt.value)
    return frame
@contextmanager
def ffmpeg_proc(opts: Options):
    """Context manager for the ffmpeg subprocess generating frames

    The command line is built from ``FF_CMD`` and the fields of *opts*; the
    process is started with its stdout connected to a pipe and is yielded to
    the ``with`` body.  On exit the process is killed, its stdout pipe is
    closed and the process is waited for so it does not linger as a zombie.

    Raises:
        RuntimeError: if ffmpeg has already exited by the time it is checked
            right after start-up.
    """
    ff_cmd = FF_CMD.format(**opts._asdict())
    ff_proc = subprocess.Popen(shlex.split(ff_cmd), stdout=subprocess.PIPE)
    try:
        if ff_proc.poll() is not None:
            # Not yielding at all would make contextlib raise an opaque
            # "generator didn't yield" error, so report the real cause.
            raise RuntimeError(
                f'ffmpeg exited immediately with code {ff_proc.returncode}: {ff_cmd}'
            )
        yield ff_proc
    finally:
        ff_proc.kill()
        if ff_proc.stdout is not None:
            ff_proc.stdout.close()
        # Reap the child so its exit status is collected
        ff_proc.wait()
def send(opts: Options):
    """Generate frames with ffmpeg and send them over NDI until ffmpeg exits.

    A ``Sender`` named ``opts.sender_name`` is created and given the video
    frame built from *opts*.  Raw frames are then read from the stdout of the
    ffmpeg subprocess into one reusable buffer and handed to the sender.
    """
    sender = Sender(opts.sender_name)
    vf = build_video_frame(opts)
    sender.set_video_frame(vf)
    frame_size_bytes = vf.get_data_size()
    click.echo(f'{frame_size_bytes=}')

    # Pre-allocate a bytearray to hold frame data and create a view of it
    # So we can buffer into it from ffmpeg then pass directly to the sender
    ba = bytearray(frame_size_bytes)
    mv = memoryview(ba)

    i = 0
    with sender:
        with ffmpeg_proc(opts) as ff_proc:
            stdout = cast(io.BytesIO, ff_proc.stdout)
            while ff_proc.returncode is None:
                # Read from the ffmpeg process into a view of the bytearray
                num_read = stdout.readinto(mv)
                if num_read is None or num_read < frame_size_bytes:
                    # Empty or short read: there is no complete frame to send.
                    # Refresh the exit status here, otherwise an ffmpeg that
                    # has exited (reads return 0 forever) would keep this
                    # loop spinning without ever reaching the poll below.
                    ff_proc.poll()
                    continue

                # Pass the memoryview directly to the sender
                # (using the buffer protocol)
                sender.write_video_async(mv)
                i += 1
                if i % 10 == 0:
                    ff_proc.poll()
@click.command()
@click.option(
    '--pix-fmt',
    type=click.Choice(choices=[member.name for member in PixFmt]),
    default=PixFmt.uyvy422.name,
    show_default=True,
    show_choices=True,
)
@click.option('-x', '--x-res', type=int, default=1920, show_default=True)
@click.option('-y', '--y-res', type=int, default=1080, show_default=True)
@click.option('--fps', type=str, default='30', show_default=True)
@click.option(
    '-n', '--sender-name',
    type=str, default='ffmpeg_sender', show_default=True,
    help='NDI name for the sender',
)
@click.option(
    '--ffmpeg',
    type=str, default='ffmpeg', show_default=True,
    help='Name/Path of the "ffmpeg" executable',
)
def main(pix_fmt: str, x_res: int, y_res: int, fps: str, sender_name: str, ffmpeg: str):
    # Command-line entry point: gather the parsed options and start sending.
    # (Deliberately a comment rather than a docstring, since click uses a
    # command's docstring as its help text.)
    send(Options(
        pix_fmt=PixFmt.from_str(pix_fmt),
        xres=x_res,
        yres=y_res,
        fps=fps,
        sender_name=sender_name,
        ffmpeg=ffmpeg,
    ))


if __name__ == '__main__':
    main()
from __future__ import annotations
from typing import NamedTuple
from typing_extensions import Self
import enum
import time
import subprocess
import shlex
import socket
from fractions import Fraction
from contextlib import contextmanager
import click
from cyndilib.wrapper.ndi_structs import FourCC
from cyndilib.wrapper.ndi_recv import RecvColorFormat, RecvBandwidth
from cyndilib.video_frame import VideoFrameSync
from cyndilib.receiver import Receiver
from cyndilib.finder import Finder, Source
# Template for the ffplay command used to display received frames.
# The placeholders ({ffplay}, {xres}, {yres}, {pix_fmt}) are filled in by
# ``play`` once the first frame has been received; ``-f rawvideo -i pipe:``
# makes ffplay read unencoded frames from its stdin.
FF_PLAY = '{ffplay} -video_size {xres}x{yres} -pixel_format {pix_fmt} -f rawvideo -i pipe:'
"""ffplay command line format"""

# Lookup used by ``play`` to turn the FourCC of a received frame into the
# ``-pixel_format`` argument.  Note that the RGBX / BGRX entries reuse the
# 'rgba' / 'bgra' names of their RGBA / BGRA counterparts.
pix_fmts = {
    FourCC.UYVY: 'uyvy422',
    FourCC.NV12: 'nv12',
    FourCC.RGBA: 'rgba',
    FourCC.BGRA: 'bgra',
    FourCC.RGBX: 'rgba',
    FourCC.BGRX: 'bgra',
}
"""Mapping of :class:`FourCC <cyndilib.wrapper.ndi_structs.FourCC>` types to
ffmpeg's ``pix_fmt`` definitions
"""
class RecvFmt(enum.Enum):
    """Color formats selectable for the receiver.

    Member names are the choices offered on the command line; member values
    are the ``RecvColorFormat`` passed to the ``Receiver``.
    """

    uyvy = RecvColorFormat.UYVY_RGBA
    rgb = RecvColorFormat.RGBX_RGBA
    bgr = RecvColorFormat.BGRX_BGRA

    @classmethod
    def from_str(cls, name: str) -> Self:
        """Return the member called *name*.

        Raises:
            KeyError: if no member has that name.
        """
        # Indexing an Enum class by name is the same lookup as ``__members__``
        return cls[name]
class Bandwidth(enum.Enum):
    """Bandwidth settings selectable for the receiver.

    Member names are the choices offered on the command line; member values
    are the ``RecvBandwidth`` passed to the ``Receiver``.
    """

    lowest = RecvBandwidth.lowest
    highest = RecvBandwidth.highest

    @classmethod
    def from_str(cls, name: str) -> Self:
        """Return the member called *name*.

        Raises:
            KeyError: if no member has that name.
        """
        # Indexing an Enum class by name is the same lookup as ``__members__``
        return cls[name]
class Options(NamedTuple):
    """Settings for the receiver script, built from the command line in ``main``."""
    # NDI source name to look for (see ``get_source`` for the host-qualified
    # fallback that is tried when this exact name is not found)
    sender_name: str = 'ffmpeg_sender'
    # Color format; ``.value`` is passed to the ``Receiver`` as ``color_format``
    recv_fmt: RecvFmt = RecvFmt.uyvy
    # Bandwidth; ``.value`` is passed to the ``Receiver`` as ``bandwidth``
    recv_bandwidth: Bandwidth = Bandwidth.highest
    # Name or path of the ffplay executable, substituted into ``FF_PLAY``
    ffplay: str = 'ffplay'
def build_receiver(options: Options, source: Source):
    """Set up a receiver for *source* and block until it reports a connection.

    The receiver is created with the color format and bandwidth from
    *options*, a fresh ``VideoFrameSync`` is attached to its frame sync, and
    the source is assigned.  The connection state is then checked every half
    second.

    Raises:
        Exception: if the receiver is still not connected after the polling
            budget is used up.
    """
    receiver = Receiver(
        color_format=options.recv_fmt.value,
        bandwidth=options.recv_bandwidth.value,
    )
    receiver.frame_sync.set_video_frame(VideoFrameSync())
    receiver.set_source(source)
    click.echo(f'connecting to "{source.name}"...')

    polls = 0
    while True:
        if receiver.is_connected():
            break
        if polls > 30:
            raise Exception('timeout waiting for connection')
        time.sleep(.5)
        polls += 1

    click.echo('connected')
    return receiver
@contextmanager
def get_source(name: str):
    """Find an NDI source by name using the Finder and yield it

    This is a context manager (use it in a :keyword:`with` block) and it has
    to stay open for as long as the source is in use:
    :class:`Finder <cyndilib.finder.Finder>` and its
    :class:`Source <cyndilib.finder.Source>` objects hold references to
    C pointers (``NDIlib_source_t*``) owned by the ``NDIlib_find_instance_t``
    struct.

    Raises:
        Exception: if neither *name* nor its host-qualified form is among
            the discovered sources.
    """
    finder = Finder()
    finder.open()
    try:
        click.echo('waiting for ndi sources...')
        finder.wait_for_sources(10)
        sources: list[str] = finder.get_source_names()
        click.echo(f'{sources=}')

        if name not in sources:
            # A source on the local machine may only be listed under its
            # full NDI name: '<HOSTNAME> (<name>)'
            qualified = f'{socket.gethostname().upper()} ({name})'
            click.echo(f'"{name}" not found. searching for "{qualified}"')
            if qualified not in sources:
                raise Exception('source not found')
            click.echo(f'found source name: "{qualified}"')
            name = qualified

        found: Source = finder.get_source(name)
        yield found
    finally:
        finder.close()
def wait_for_first_frame(receiver: Receiver):
    """Capture frames until one actually carries data

    The first few captured frames are empty, so keep capturing -- pausing one
    frame interval between attempts -- until the frame reports a non-zero
    resolution and a non-zero data size.  The loop also ends, without a
    frame, as soon as the receiver is no longer connected.
    """
    frame = receiver.frame_sync.video_frame
    rate: Fraction = frame.get_frame_rate()
    frame_interval = float(1 / rate)
    click.echo('waiting for frame...')
    while receiver.is_connected():
        receiver.frame_sync.capture_video()
        has_size = min(frame.get_resolution()) > 0
        if has_size and frame.get_data_size() > 0:
            click.echo('have frame')
            return
        time.sleep(frame_interval)
def play(options: Options):
    """Receive video from the NDI source and display it with ffplay.

    The source named by ``options.sender_name`` is looked up, a receiver is
    connected to it and, once a first frame with data has arrived, ffplay is
    started with the resolution and pixel format of that frame.  Frames are
    then written to ffplay's stdin until the receiver disconnects or ffplay
    exits.  On the way out ffplay is killed, its stdin pipe is closed and the
    process is waited for.
    """
    # Get the NDI source and keep the Finder open until exit
    with get_source(options.sender_name) as source:
        receiver = build_receiver(options, source)
        vf = receiver.frame_sync.video_frame
        proc: subprocess.Popen|None = None
        try:
            wait_for_first_frame(receiver)

            # At this point we should have received a frame, so the pixel format,
            # resolution and frame rate should be populated.
            fourcc = vf.get_fourcc()
            frame_rate: Fraction = vf.get_frame_rate()
            wait_time = float(1 / frame_rate)
            xres, yres = vf.get_resolution()
            cmd_str = FF_PLAY.format(
                xres=xres,
                yres=yres,
                pix_fmt=pix_fmts[fourcc],
                ffplay=options.ffplay,
            )
            click.echo(f'{cmd_str=}')
            proc = subprocess.Popen(shlex.split(cmd_str), stdin=subprocess.PIPE)
            assert proc.stdin is not None

            try:
                # Since we already have a frame with data, write it to ffplay
                # Note that the frame object itself is directly used as the data source
                # (since `VideoFrameSync` supports the buffer protocol)
                proc.stdin.write(vf)
                # stdin is a buffered writer; flush so no part of the frame
                # is held back until the next write
                proc.stdin.flush()

                while receiver.is_connected():
                    # Not the best timing method, but we're using `FrameSync` to
                    # capture frames, so it'll correct things for us.
                    time.sleep(wait_time)
                    receiver.frame_sync.capture_video()
                    if proc.poll() is not None:
                        break
                    proc.stdin.write(vf)
                    proc.stdin.flush()
            except BrokenPipeError:
                # ffplay can go away between the poll() above and the write;
                # that is a normal way for playback to end, not an error.
                click.echo('ffplay closed its input, stopping')
        finally:
            if proc is not None:
                proc.kill()
                if proc.stdin is not None:
                    try:
                        proc.stdin.close()
                    except OSError:
                        # Closing flushes buffered data, which fails once the
                        # reader is gone; nothing left to do during teardown.
                        pass
                # Reap the child so its exit status is collected
                proc.wait()
@click.command()
@click.option(
    '-s', '--sender-name',
    type=str, default='ffmpeg_sender', show_default=True,
    help='The NDI source name to connect to',
)
@click.option(
    '-f', '--recv-fmt',
    type=click.Choice(choices=[member.name for member in RecvFmt]),
    default='uyvy',
    show_default=True,
    show_choices=True,
    help='Pixel format'
)
@click.option(
    '-b', '--recv-bandwidth',
    type=click.Choice(choices=[member.name for member in Bandwidth]),
    default='highest',
    show_default=True,
    show_choices=True,
)
@click.option(
    '--ffplay',
    type=str, default='ffplay', show_default=True,
    help='Name/Path of the "ffplay" executable',
)
def main(sender_name: str, recv_fmt: str, recv_bandwidth: str, ffplay: str):
    # Command-line entry point: gather the parsed options and start playback.
    # (Deliberately a comment rather than a docstring, since click uses a
    # command's docstring as its help text.)
    play(Options(
        sender_name=sender_name,
        recv_fmt=RecvFmt.from_str(recv_fmt),
        recv_bandwidth=Bandwidth.from_str(recv_bandwidth),
        ffplay=ffplay,
    ))


if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment