Last active
December 10, 2022 00:36
-
-
Save gwanryo/51fdee11c5a8e3313671267fc2cd0fd4 to your computer and use it in GitHub Desktop.
twitch-recorder.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python3
# -*- coding: utf-8 -*-
# https://github.com/ancalentari/twitch-stream-recorder
import datetime | |
import enum | |
import getopt | |
import logging | |
import os | |
import subprocess | |
import sys | |
import shutil | |
import time | |
import telegram | |
import requests | |
class Config:
    """Static defaults that TwitchRecorder copies into each new instance."""

    # Base directory; recordings go under "<root_path>/recorded/<username>"
    # and finished files under "<root_path>/processed/<username>".
    root_path: str = ""
    # Twitch login name of the channel to watch.
    username: str = ""

    # Twitch application credentials used to request an app access token.
    client_id: str = ""
    client_secret: str = ""

    # telegram alarm bot
    telegram_token: str = ""
    # Passed as ``chat_id`` when the bot sends a message.
    channel_token: int = 0
@enum.unique
class TwitchResponseStatus(enum.Enum):
    """Outcome of one poll of the Twitch streams endpoint."""

    # The response carried a non-empty "data" list: the channel is live.
    ONLINE = 0
    # The request succeeded but "data" was empty: the channel is not live.
    OFFLINE = 1
    # The API answered HTTP 404.
    NOT_FOUND = 2
    # The API answered HTTP 401; the access token must be renewed.
    UNAUTHORIZED = 3
    # Any other failure (the default until a poll proves otherwise).
    ERROR = 4
class TwitchRecorder:
    """Watch one Twitch channel and record its streams with streamlink.

    The recorder polls the Helix ``streams`` endpoint every ``refresh``
    seconds.  While the channel is live it runs ``streamlink`` to dump the
    stream under ``<root_path>/recorded/<username>``, then either moves the
    file or remuxes it with ffmpeg into ``<root_path>/processed/<username>``.
    Progress and failures are logged and mirrored to a Telegram chat.
    """

    def __init__(self):
        """Load defaults from ``Config``, create the bot and fetch a token."""
        # global configuration
        self.ffmpeg_path = "/usr/bin/ffmpeg"
        self.disable_ffmpeg = False
        self.refresh = 15
        self.root_path = Config.root_path

        # user configuration
        self.username = Config.username
        self.quality = "best"

        # telegram alarm bot
        # Created BEFORE the token request: fetch_access_token() reports
        # failures through sendMessage(), which needs self.bot to exist.
        self.telegram_token = Config.telegram_token
        self.channel_token = Config.channel_token
        self.bot = telegram.Bot(token=self.telegram_token)

        # twitch configuration
        self.client_id = Config.client_id
        self.client_secret = Config.client_secret
        self.token_url = (
            "https://id.twitch.tv/oauth2/token?client_id=" + self.client_id
            + "&client_secret=" + self.client_secret
            + "&grant_type=client_credentials"
        )
        self.url = "https://api.twitch.tv/helix/streams"
        # May be None when the request fails; loop_check() retries later.
        self.access_token = self.fetch_access_token()

    def sendMessage(self, msg):
        """Send ``msg`` to the configured Telegram chat.

        ``msg`` is converted with ``str()`` so callers may pass an exception
        object directly (the error handlers in this class do).

        Returns whatever ``bot.sendMessage`` returns.
        """
        return self.bot.sendMessage(chat_id=self.channel_token, text=str(msg))

    def fetch_access_token(self):
        """Request an app access token from Twitch.

        Returns:
            The ``access_token`` string, or ``None`` when the request fails
            or the response body is not the expected JSON object.  Failures
            are logged and reported to Telegram instead of being raised.
        """
        try:
            token_response = requests.post(self.token_url, timeout=15)
            token_response.raise_for_status()
            return token_response.json()["access_token"]
        except requests.exceptions.RequestException as e:
            # Compare against None explicitly: a Response for a 4xx/5xx
            # status is falsy, so a plain truth test would hide it.
            response = getattr(e, "response", None)
            if response is not None:
                failure = (
                    f"Request token fail with RequestException {response} "
                    f"{response.reason} : {response.text}"
                )
            else:
                failure = "Request token fail with RequestException, no response"
        except (ValueError, KeyError, TypeError) as e:
            # Body was not JSON, or lacked the "access_token" field.
            failure = f"Request token fail, unexpected response body: {e!r}"

        # Log first so the failure is recorded even if Telegram is unreachable.
        logging.error(failure)
        self.sendMessage(failure)
        return None

    def run(self):
        """Prepare directories, process leftover files, then poll forever."""
        # path to recorded stream
        recorded_path = os.path.join(self.root_path, "recorded", self.username)
        # path to finished video, errors removed
        processed_path = os.path.join(self.root_path, "processed", self.username)

        # create directory for recordedPath and processedPath if not exist
        os.makedirs(recorded_path, exist_ok=True)
        os.makedirs(processed_path, exist_ok=True)

        # make sure the interval to check user availability is not less than 15 seconds
        if self.refresh < 15:
            logging.warning("check interval should not be lower than 15 seconds")
            self.refresh = 15
            logging.info("system set check interval to 15 seconds")

        # fix videos from previous recording session
        try:
            video_list = [
                f for f in os.listdir(recorded_path)
                if os.path.isfile(os.path.join(recorded_path, f))
            ]
            if video_list:
                logging.info("processing previously recorded files")
            for f in video_list:
                recorded_filename = os.path.join(recorded_path, f)
                processed_filename = os.path.join(processed_path, f)
                self.process_recorded_file(recorded_filename, processed_filename)
        except Exception as e:
            logging.error(e)
            self.sendMessage(
                "Error while fix videos from previous recording session")
            self.sendMessage(e)

        logging.info(f"checking for {self.username} every {self.refresh} seconds, recording with {self.quality} quality")
        self.sendMessage(f"checking for {self.username} every {self.refresh} seconds, recording with {self.quality} quality")
        self.loop_check(recorded_path, processed_path)

    def process_recorded_file(self, recorded_filename, processed_filename):
        """Move the recording, or remux it with ffmpeg unless that is disabled."""
        if self.disable_ffmpeg:
            logging.info(f"Moving: {recorded_filename}")
            self.sendMessage(f"Moving: {recorded_filename}")
            shutil.move(recorded_filename, processed_filename)
        else:
            logging.info(f"Fixing: {recorded_filename}")
            self.sendMessage(f"Fixing: {recorded_filename}")
            self.ffmpeg_copy_and_fix_errors(recorded_filename, processed_filename)

    def ffmpeg_copy_and_fix_errors(self, recorded_filename, processed_filename):
        """Stream-copy the recording through ffmpeg, then delete the original.

        The original file is removed only when ffmpeg exits with status 0, so
        a failed run never destroys the only copy of the recording.  Any
        failure is logged and reported to Telegram; nothing is raised.
        """
        try:
            # check_call raises CalledProcessError on a non-zero exit status,
            # which skips the os.remove() below.
            subprocess.check_call([
                self.ffmpeg_path, "-err_detect", "ignore_err",
                "-i", recorded_filename, "-c", "copy", processed_filename,
            ])
            os.remove(recorded_filename)
        except Exception as e:
            logging.error(e)
            self.sendMessage(f"Error while fixing error in {recorded_filename} with ffmpeg.")
            self.sendMessage(e)

    def check_user(self):
        """Ask the Twitch API whether the channel is live.

        Returns:
            A ``(status, info)`` tuple: ``status`` is a
            ``TwitchResponseStatus`` and ``info`` is the decoded JSON body,
            or ``None`` when no body was decoded.
        """
        info = None
        status = TwitchResponseStatus.ERROR

        # No token (an earlier fetch failed): let the caller renew it rather
        # than failing on the header concatenation below.
        if not self.access_token:
            return TwitchResponseStatus.UNAUTHORIZED, info

        try:
            headers = {"Client-ID": self.client_id, "Authorization": "Bearer " + self.access_token}
            # params= lets requests URL-encode the login name.
            r = requests.get(self.url, params={"user_login": self.username}, headers=headers, timeout=15)
            r.raise_for_status()
            info = r.json()
            streams = info.get("data") if isinstance(info, dict) else None
            if streams:
                status = TwitchResponseStatus.ONLINE
            else:
                status = TwitchResponseStatus.OFFLINE
        except requests.exceptions.RequestException as e:
            logging.debug("twitch status request failed: %s", e)
            # A Response with a 4xx/5xx status is falsy, so it must be
            # compared against None instead of being truth-tested.
            response = getattr(e, "response", None)
            if response is not None:
                if response.status_code == 401:
                    status = TwitchResponseStatus.UNAUTHORIZED
                elif response.status_code == 404:
                    status = TwitchResponseStatus.NOT_FOUND
        except ValueError as e:
            # Body was not valid JSON; status stays ERROR.
            logging.debug("twitch status response was not JSON: %s", e)
        return status, info

    def loop_check(self, recorded_path, processed_path):
        """Poll the channel forever, recording whenever it is live.

        Args:
            recorded_path: directory that receives raw streamlink output.
            processed_path: directory that receives the finished files.
        """
        while True:
            status, _info = self.check_user()
            if status == TwitchResponseStatus.NOT_FOUND:
                logging.error("username not found, invalid username or typo")
                self.sendMessage("username not found, invalid username or typo")
                time.sleep(self.refresh)
            elif status == TwitchResponseStatus.ERROR:
                # The message reports the delay that is actually slept.
                moment = datetime.datetime.now().strftime('%Hh%Mm%Ss')
                failure = f"{moment} unexpected error. will try again in {self.refresh} seconds"
                logging.error(failure)
                self.sendMessage(failure)
                time.sleep(self.refresh)
            elif status == TwitchResponseStatus.OFFLINE:
                logging.info(f"{self.username} currently offline, checking again in {self.refresh} seconds")
                time.sleep(self.refresh)
            elif status == TwitchResponseStatus.UNAUTHORIZED:
                logging.info("Unauthorized, will attempt to log back in immediately")
                self.sendMessage("Unauthorized, will attempt to log back in immediately")
                self.access_token = self.fetch_access_token()
                if not self.access_token:
                    # Renewal failed: wait instead of retrying in a tight loop.
                    time.sleep(self.refresh)
            elif status == TwitchResponseStatus.ONLINE:
                self._record_stream(recorded_path, processed_path)
                time.sleep(self.refresh)

    def _record_stream(self, recorded_path, processed_path):
        """Record the live stream with streamlink, then post-process the file."""
        logging.info(f"{self.username} online, stream recording in session")
        self.sendMessage(f"{self.username} online, stream recording in session")

        filename = self._build_filename(self.username, datetime.datetime.now())
        recorded_filename = os.path.join(recorded_path, filename)
        processed_filename = os.path.join(processed_path, filename)

        # start streamlink process; this call returns when streamlink exits
        subprocess.call(
            ["streamlink", "--twitch-disable-ads", "--twitch-disable-hosting", "twitch.tv/" + self.username, self.quality, "-o", recorded_filename])

        logging.info("Recording stream is done, processing video file")
        self.sendMessage(
            "Recording stream is done, processing video file")
        if os.path.exists(recorded_filename):
            self.process_recorded_file(recorded_filename, processed_filename)
        else:
            logging.info("Skip fixing, file not found")
            self.sendMessage("Skip fixing, file not found")

        logging.info(f"Processing {recorded_filename} is done, going back to checking...")
        self.sendMessage(f"Processing {recorded_filename} is done, going back to checking...")

    @staticmethod
    def _build_filename(username, moment):
        """Return ``<username>-<YYYYmmdd_HHMMSS>.mp4`` with unsafe characters removed.

        Only alphanumerics, space, ``-``, ``_`` and ``.`` are kept.
        """
        raw = username + "-" + moment.strftime("%Y%m%d_%H%M%S") + ".mp4"
        # clean filename from unnecessary characters
        return "".join(ch for ch in raw if ch.isalnum() or ch in " -_.")
def main(argv):
    """Parse command-line options, build a ``TwitchRecorder`` and run it.

    Args:
        argv: command-line arguments without the program name.

    Options: ``-u/--username``, ``-q/--quality``, ``-l/--log/--logging``
    (a ``logging`` level name), ``--disable-ffmpeg`` and ``-h``.

    Raises:
        ValueError: the requested log level is not a ``logging`` level name.
        SystemExit: after ``-h`` (status ``None``) or on an unknown option
            (status 2).
    """
    usage_message = "twitch-recorder.py -u <username> -q <quality>"
    logging.basicConfig(level=logging.INFO, format='%(asctime)s [%(levelname)s] %(name)s: %(message)s')
    # logging.basicConfig(filename="twitch-recorder.log", level=logging.INFO)
    # logging.getLogger().addHandler(logging.StreamHandler())

    try:
        opts, _args = getopt.getopt(argv, "hu:q:l:", ["username=", "quality=", "log=", "logging=", "disable-ffmpeg"])
    except getopt.GetoptError:
        print(usage_message)
        sys.exit(2)

    # Collect overrides first. The recorder is only built once the options
    # are known to be valid, because its constructor performs network I/O.
    overrides = {}
    for opt, arg in opts:
        if opt == "-h":
            print(usage_message)
            sys.exit()
        elif opt in ("-u", "--username"):
            overrides["username"] = arg
        elif opt in ("-q", "--quality"):
            overrides["quality"] = arg
        elif opt in ("-l", "--log", "--logging"):
            logging_level = getattr(logging, arg.upper(), None)
            if not isinstance(logging_level, int):
                raise ValueError("invalid log level: %s" % arg)
            # A second basicConfig() call is a no-op once the root logger has
            # handlers, so the level is set on the root logger directly.
            logging.getLogger().setLevel(logging_level)
            logging.info("logging configured to %s", arg.upper())
        elif opt == "--disable-ffmpeg":
            overrides["disable_ffmpeg"] = True
            logging.info("ffmpeg disabled")

    twitch_recorder = TwitchRecorder()
    for attribute, value in overrides.items():
        setattr(twitch_recorder, attribute, value)
    twitch_recorder.run()
if __name__ == "__main__":
    # Script entry point: pass everything after the program name to main().
    cli_arguments = sys.argv[1:]
    main(cli_arguments)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment