Skip to content

Instantly share code, notes, and snippets.

@ekaitz-zarraga
Forked from ZiTAL/db.py
Last active March 31, 2024 19:38
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ekaitz-zarraga/e7102449fef9243dc93c655a0990ae6f to your computer and use it in GitHub Desktop.
Save ekaitz-zarraga/e7102449fef9243dc93c655a0990ae6f to your computer and use it in GitHub Desktop.
Dragoi Bola: Twitch plataforman 24/7 martxan!
import os
import sys
import re
import tempfile
from ffmpeg import FFmpeg, Progress # use `python-ffmpeg`
from datetime import datetime, timedelta
from collections import OrderedDict
sys.path.append('../../')
from src.TwitchFfmpeg import TwitchFfmpeg, TwitchConfig
def str2ms(time):
    """Convert an ``HH:MM:SS.ffffff`` timestamp string to whole milliseconds.

    Args:
        time: Clock-style string parsed with the ``"%H:%M:%S.%f"`` format.

    Returns:
        The timestamp expressed as an integer number of milliseconds;
        digits below one millisecond are truncated.

    Raises:
        ValueError: If ``time`` does not match the expected format.
    """
    parsed = datetime.strptime(time, "%H:%M:%S.%f")
    whole_seconds = parsed.hour * 3600 + parsed.minute * 60 + parsed.second
    return whole_seconds * 1000 + parsed.microsecond // 1000
def getVideoInfo(file):
    """Probe a media file with ffprobe and return the parsed JSON report.

    Args:
        file: Path of the media file handed to ffprobe as its input.

    Returns:
        The object decoded from ffprobe's JSON output (stream information is
        requested via ``show_streams``).
    """
    # The module's import block never imports json, so the original call to
    # json.loads raised NameError; import it locally to keep this block
    # self-contained.
    import json

    ffprobe = FFmpeg(executable="ffprobe").input(
        file,
        print_format="json",  # ffprobe will output the results in JSON format
        show_streams=None,
    )
    return json.loads(ffprobe.execute())
def getDuration(video_info):
    """Return the duration of the first stream of a probe report.

    Args:
        video_info: Mapping with a ``"streams"`` list whose first entry has a
            ``"duration"`` string (the shape returned by ``getVideoInfo``).

    Returns:
        A ``timedelta`` with the stream duration.

    Raises:
        KeyError / IndexError: If the report has no first stream or duration.
        ValueError: If the duration is neither decimal seconds nor
            ``HH:MM:SS.ffffff``.
    """
    raw = video_info["streams"][0]["duration"]
    try:
        # ffprobe reports durations as decimal seconds ("123.456000") unless
        # it is asked for sexagesimal output.
        seconds = float(raw)
    except ValueError:
        # Sexagesimal form ("HH:MM:SS.ffffff"). Note that timedelta has no
        # ``ms`` keyword; the argument is spelled ``milliseconds``.
        return timedelta(milliseconds=str2ms(raw))
    return timedelta(seconds=seconds)
def getTitle(video_info):
    """Return the whitespace-stripped title of the first stream.

    The title is read from the stream entry itself when present; otherwise
    it is looked up under the stream's ``"tags"`` mapping, which is where
    ffprobe places stream metadata.

    Args:
        video_info: Mapping with a ``"streams"`` list (the shape returned by
            ``getVideoInfo``).

    Returns:
        The title with leading and trailing whitespace removed.

    Raises:
        KeyError: If the first stream carries no title in either location.
    """
    stream = video_info["streams"][0]
    title = stream.get("title")
    if title is None:
        title = stream["tags"]["title"]
    return title.strip()
def getVideos(folder, ext):
    """List the files in ``folder`` that end with ``.ext``, sorted by path.

    Args:
        folder: Directory to scan (not recursive).
        ext: File extension without the leading dot.

    Returns:
        A sorted list of paths, each built by joining the real path of
        ``folder`` with a matching file name.
    """
    suffix = f".{ext}"
    base = os.path.realpath(folder)
    return sorted(
        os.path.join(base, name)
        for name in os.listdir(folder)
        if name.endswith(suffix)
    )
def getVideoInfoFromVideoList(ms, videos_list):
    """Find the first entry whose key is strictly greater than ``ms``.

    Args:
        ms: Position to look up; compared against the mapping keys with ``<``.
        videos_list: Mapping iterated in its own order, from boundary key to
            an info dict.

    Returns:
        The matching info dict, mutated in place so that its ``'ms'`` item
        holds the matching key, or ``None`` when no key exceeds ``ms``.
    """
    match = next(
        ((boundary, info) for boundary, info in videos_list.items() if ms < boundary),
        None,
    )
    if match is None:
        return None
    boundary, info = match
    info['ms'] = boundary
    return info
def setTempFile(data):
    """Write ``data`` to a new temporary file that is kept after closing.

    Args:
        data: Text to store; it is encoded as UTF-8 before writing.

    Returns:
        The path of the temporary file. The caller is responsible for
        deleting it, since the file is created with ``delete=False``.
    """
    handle = tempfile.NamedTemporaryFile(delete=False)
    try:
        handle.write(data.encode('utf-8'))
        handle.flush()
        return handle.name
    finally:
        handle.close()
def setCurrentVideo(i):
    """Overwrite the ``./current`` marker file with ``i``.

    Args:
        i: String written verbatim as the whole content of the marker file.

    Returns:
        ``True`` once the content has been written and flushed.
    """
    marker = open("./current", 'w')
    try:
        marker.write(i)
        marker.flush()
    finally:
        marker.close()
    return True
def getCurrentVideo():
    """Read the first line of the ``./current`` marker file.

    Returns:
        The first line with surrounding whitespace stripped, or ``None`` when
        the file is missing, unreadable, undecodable or empty.
    """
    try:
        with open("./current", 'r') as file:
            first_line = file.readline()
    except (OSError, UnicodeError):
        # Only I/O and decoding problems mean "no marker". A bare ``except``
        # would also swallow KeyboardInterrupt and SystemExit.
        return None
    # readline() returns "" only at end of file, i.e. for an empty marker.
    return first_line.strip() if first_line else None
def removeCurrentVideo():
    """Delete the ``./current`` marker file if it exists.

    A missing marker is not an error: the marker is only written once a
    video change has been detected, so it may legitimately never have been
    created.
    """
    try:
        os.unlink("./current")
    except FileNotFoundError:
        pass
def removeConcatFile(concat_file):
    """Delete the file at ``concat_file``.

    Args:
        concat_file: Path of the file to delete.

    Raises:
        FileNotFoundError: If the path does not exist.
    """
    os.remove(concat_file)
def getTwitchUrl():
    """Build the streaming URL from the channel settings.

    The channel mapping is obtained from ``TwitchConfig("../../config")``;
    its ``'server'`` and ``'key'`` items are concatenated in that order.

    Returns:
        The concatenated URL string.
    """
    channel = TwitchConfig("../../config").getChannel()
    return "{}{}".format(channel['server'], channel['key'])
def setStreamTitle(title):
    """Forward ``title`` to the ``Api.setStreamTitle`` call of ``TwitchFfmpeg``.

    Args:
        title: Title passed through unchanged.
    """
    client = TwitchFfmpeg("../../config")
    api = client.Api
    api.setStreamTitle(title)
def main():
    """Stream every ``.mp4`` file of the working directory once, in order.

    The playlist starts at the video recorded in ``./current`` (if that file
    names a video that still exists) so a restarted process resumes where it
    left off. While streaming, the ``./current`` marker and the stream title
    are updated whenever playback moves on to another video.

    Raises:
        RuntimeError: If the working directory contains no ``.mp4`` file.
    """
    videos = getVideos("./", "mp4")
    if not videos:
        raise RuntimeError("no .mp4 videos found in the working directory")

    # Resume from the recorded video. A stale marker (the video is no longer
    # in the folder) falls back to the full playlist instead of producing an
    # empty one.
    current_video = getCurrentVideo()
    start = videos.index(current_video) if current_video in videos else 0

    # Maps the cumulative end time of each video to its title and path.
    videos_list = OrderedDict()
    concat_lines = []
    elapsed = timedelta(0)
    for video in videos[start:]:
        video_info = getVideoInfo(video)
        elapsed += getDuration(video_info)
        videos_list[elapsed] = {"title": getTitle(video_info), "file": video}
        # Inside the concat list a single quote must be written as '\''.
        quoted = video.replace("'", "'\\''")
        concat_lines.append(f"file '{quoted}'\n")
    concat_file = setTempFile("".join(concat_lines))

    ffmpeg = (
        FFmpeg()
        # Demuxer settings belong to the input and the container format to
        # the output; setting "f" twice as a global option let the second
        # value replace the first, leaving the output without "-f flv".
        .input(concat_file, re=None, safe="0", f="concat")
        .output(
            getTwitchUrl(),
            {"codec": "copy"},
            f="flv",
            preset="ultrafast",
            tune="zerolatency",
        )
    )

    # Handler state: what was last written to ./current and sent as title.
    current_file = None
    current_title = None

    @ffmpeg.on("start")
    def on_start(arguments: list[str]):
        pass

    @ffmpeg.on("progress")
    def on_progress(progress: Progress):
        # Without nonlocal the assignments below would make these names
        # local to the handler and the comparisons would raise
        # UnboundLocalError.
        nonlocal current_file, current_title
        delta_t = progress.time  # Timedelta!
        print(f"ms: {delta_t}")
        video_tmp = getVideoInfoFromVideoList(delta_t, videos_list)
        if video_tmp is None:
            # Position is at or past the end of the last video.
            return
        if video_tmp['file'] != current_file:
            current_file = video_tmp['file']
            setCurrentVideo(current_file)
        if video_tmp['title'] != current_title:
            current_title = video_tmp['title']
            setStreamTitle(current_title)
        print(video_tmp)

    @ffmpeg.on("completed")
    def on_completed():
        # The whole playlist was streamed: forget the resume point so the
        # next run starts from the first video.
        try:
            removeCurrentVideo()
        except FileNotFoundError:
            pass

    try:
        ffmpeg.execute()
    finally:
        # Remove the temporary playlist even when ffmpeg fails.
        removeConcatFile(concat_file)
if __name__ == "__main__":
    # Run only when executed as a script: call main() again every time it
    # returns, so the loop ends only when main() raises.
    while True:
        main()
# /etc/systemd/system/db.service
# systemd unit that runs the db.py streaming script as a service.
[Unit]
Description=Dragoi Bola Zuzenean
# Order this unit after network.target.
After=network.target
[Service]
# db.py uses paths relative to its working directory ("./", "./current",
# "../../config"), so it must be started from the folder holding the videos.
WorkingDirectory=/home/projects/twitch-ffmpeg/multimedia/db
User=pi
Group=pi
# Restart the script 5 seconds after it exits with a failure.
Restart=on-failure
RestartSec=5
ExecStart=/usr/bin/python3 /home/projects/twitch-ffmpeg/multimedia/db/db.py
[Install]
WantedBy=multi-user.target
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment