Skip to content

Instantly share code, notes, and snippets.

@nosoop
Last active September 27, 2024 10:05
Show Gist options
  • Save nosoop/9252b22bc0bfc6b0adcf54709e0d6d1a to your computer and use it in GitHub Desktop.
Save nosoop/9252b22bc0bfc6b0adcf54709e0d6d1a to your computer and use it in GitHub Desktop.
#!/usr/bin/python3
"""
This is a test script that programmatically streams a test source video to YouTube in varying
resolutions and framerates to try to force YouTube to issue new stream manifests.
Run this script and pass your YouTube stream key as the only parameter:
python test_stream.py xxxx-xxxx-xxxx-xxxx-xxxx
The stream should be configured as low-latency. Ultra low-latency may be fine as well.
You may need to run it multiple times.
"""
import argparse
import asyncio
# https://stackoverflow.com/a/44807231
# https://stackoverflow.com/a/64033741
def testsrc_params(
size: tuple[int, int], duration: int, framerate: int, bitrate: int, interval: int = 1
) -> tuple[str, ...]:
width, height = size
return (
"-f",
"lavfi",
"-i",
f"testsrc=duration={duration}:size={width}x{height}:rate={framerate}:decimals=2",
"-f",
"lavfi",
"-i",
"anullsrc",
"-shortest",
"-c:v",
"libx264",
"-b:v",
f"{bitrate}k",
"-profile:v",
"baseline",
"-g",
f"{duration * interval}",
"-c:a",
"aac",
"-vf",
"format=yuv420p,realtime",
"-f",
"flv",
)
async def main() -> None:
    """Push a fixed sequence of test-pattern encodes to a YouTube RTMP ingest.

    The stream key is read from the single positional command-line argument.
    Four ffmpeg runs are started one after another against the same ingest
    URL, each awaited until it exits. There is a 45 second pause after each
    of the first two runs; the last two follow one another with no pause.
    """
    cli = argparse.ArgumentParser()
    cli.add_argument("streamkey")
    options = cli.parse_args()

    ingest_url = f"rtmp://a.rtmp.youtube.com/live2/{options.streamkey}"

    # One row per ffmpeg run:
    # (size, duration, framerate, bitrate, seconds to idle afterwards or None)
    schedule = (
        ((1920, 1080), 120, 30, 8000, 45),
        ((1280, 720), 120, 60, 12000, 45),
        ((1920, 1080), 90, 60, 8000, None),
        ((1080, 1920), 90, 30, 8000, None),
    )

    for frame_size, seconds, fps, kbps, pause in schedule:
        encoder = await asyncio.create_subprocess_exec(
            "ffmpeg",
            *testsrc_params(size=frame_size, duration=seconds, framerate=fps, bitrate=kbps),
            ingest_url,
        )
        await encoder.wait()
        if pause is not None:
            await asyncio.sleep(pause)
# Only start streaming when executed as a script, not when imported.
if __name__ == "__main__":
    asyncio.run(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment