Skip to content

Instantly share code, notes, and snippets.

@gwanryo

gwanryo/twitch-recorder.py

Last active Jun 20, 2020
Embed
What would you like to do?
twitch-recorder.py
#!/usr/bin/python3
# This code is based on tutorial by slicktechies modified as needed to use oauth token from Twitch.
# You can read more details at: https://www.junian.net/2017/01/how-to-record-twitch-streams.html
# original code is from https://slicktechies.com/how-to-watchrecord-twitch-streams-using-livestreamer/
#-*- coding: utf-8 -*-
import requests
import os
import time
import json
import sys
import subprocess
import datetime
import getopt
import logging
import telegram
class TwitchRecorder:
def __init__(self):
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s [%(levelname)s] %(name)s: %(message)s')
# global configuration
self.client_id = "kimne78kx3ncx6brgo4mv6wki5h1ko" # Don't change this
self.accept = "application/vnd.twitchtv.v5+json" # Don't edit this
self.headers = {"Client-ID": self.client_id, "Accept": self.accept}
# record configuration
self.ffmpeg_path = '<ffmpeg path>'
self.refresh = <refresh value> # min value is 15.0
self.root_path = "<save path>"
# user configuration
self.username = "<twitch username>"
self.quality = "<target quality>"
self.user_id = ""
# telegram alarm bot
self.telegram_token = "<telegram token>"
self.channel_token = <telegram channel token>
self.bot = telegram.Bot(token = self.telegram_token)
def sendMessage(self, msg):
return self.bot.sendMessage(chat_id=self.channel_token, text=msg)
def run(self):
# path to recorded stream
self.recorded_path = os.path.join(self.root_path, "recorded", self.username)
# path to finished video, errors removed
self.processed_path = os.path.join(self.root_path, "processed", self.username)
# create directory for recordedPath and processedPath if not exist
if(os.path.isdir(self.recorded_path) is False):
os.makedirs(self.recorded_path)
if(os.path.isdir(self.processed_path) is False):
os.makedirs(self.processed_path)
# make sure the interval to check user availability is not less than 15 seconds
if(self.refresh < 15):
logging.warn("Check interval should not be lower than 15 seconds.")
self.refresh = 15
logging.debug("System set check interval to 15 seconds.")
# fix videos from previous recording session
try:
video_list = [f for f in os.listdir(self.recorded_path) if os.path.isfile(os.path.join(self.recorded_path, f))]
if(len(video_list) > 0):
logging.info('# Fixing previously recorded files.')
for f in video_list:
recorded_filename = os.path.join(self.recorded_path, f)
logging.info('- Fixing ' + recorded_filename + '.')
try:
# https://stackoverflow.com/a/39722419
p = subprocess.Popen("{self.ffmpeg_path} -err_detect ignore_err -i {recorded_filename} -c copy {os.path.join(self.processed_path, f)}\n;rm -f {recorded_filename}\n", shell=True)
except Exception as e:
logging.error(e)
self.sendMessage(f"Error while fixing error in {recorded_filename} with ffmpeg.")
except Exception as e:
logging.error(e)
self.sendMessage(f"Checking for {self.username} every {self.refresh} second. Record with {self.quality} quality.")
logging.info("Checking for {} every {} second. Record with {} quality.".format(self.username, self.refresh, self.quality))
self.loopcheck()
def get_user_id(self, username):
url = 'https://api.twitch.tv/kraken/users?login=' + self.username
info = None
try:
r = requests.get(url, headers = self.headers, timeout = 15)
r.raise_for_status()
info = r.json()
if not info['_total'] == 0:
self.sendMessage(f"Get user id for {self.username} returns {info}")
self.user_id = info['users'][0]['_id']
else:
self.sendMessage(f"Get user id for {self.username} is failed")
except requests.exceptions.RequestException as e:
try:
self.sendMessage(f"RequestException {e.response} {e.response.reason} : {e.response.text}")
logging.error(f"RequestException {e.response} {e.response.reason} : {e.response.text}")
except:
self.sendMessage("RequestException with none response")
logging.error("RequestException with none response")
self.sendMessage(f"{self.username} is now {self.user_id}")
def check_user(self):
# 0: online,
# 1: offline,
# 2: not found,
# 3: error
if self.user_id == "":
self.get_user_id(self.username)
if self.user_id == "":
return
url = 'https://api.twitch.tv/kraken/streams/' + self.user_id
info = None
status = 3
try:
r = requests.get(url, headers = self.headers, timeout = 15)
r.raise_for_status()
info = r.json()
if not info['stream']:
status = 1
else:
status = 0
except requests.exceptions.RequestException as e:
try:
self.sendMessage(f"RequestException {e.response} {e.response.reason} : {e.response.text}")
logging.error(f"RequestException {e.response} {e.response.reason} : {e.response.text}")
except:
self.sendMessage("RequestException with none response")
logging.error("RequestException with none response")
if e.response:
if e.response.reason == 'Not Found' or e.response.reason == 'Unprocessable Entity':
status = 2
return status, info
def loopcheck(self):
while True:
status, info = self.check_user()
if status == 2:
logging.error(f"Username {self.username} not found. Invalid username or typo.")
self.sendMessage(f"Username {self.username} not found. Invalid username or typo.")
time.sleep(self.refresh)
elif status == 3:
self.sendMessage(f"{datetime.datetime.now().strftime('%Y%m%d_%H%M%S')} unexpected error. will try again in 0.5 minutes.")
logging.error(f"{datetime.datetime.now().strftime('%Y%m%d_%H%M%S')} unexpected error. will try again in 0.5 minutes.")
time.sleep(30)
elif status == 1:
logging.info(f"{self.username} currently offline, checking again in {self.refresh} seconds.")
time.sleep(self.refresh)
elif status == 0:
self.sendMessage(f"{self.username} online. Stream recording in session.")
logging.info(f"{self.username} online. Stream recording in session.")
filename = self.username + "-" + datetime.datetime.now().strftime("%Y%m%d_%H%M%S") + ".mp4"
# clean filename from unecessary characters
filename = "".join(x for x in filename if x.isalnum() or x in [" ", "-", "_", "."])
recorded_filename = os.path.join(self.recorded_path, filename).encode('utf-8')
# start streamlink process
subprocess.call(["streamlink", "twitch.tv/" + self.username, self.quality, "--twitch-disable-hosting", "-o", recorded_filename])
self.sendMessage("Recording stream is done. Fixing video file.")
logging.info("# Recording stream is done. Fixing video file.")
if(os.path.exists(recorded_filename) is True):
try:
subprocess.call([self.ffmpeg_path, '-err_detect', 'ignore_err', '-i', recorded_filename, '-c', 'copy', os.path.join(self.processed_path, filename)])
os.remove(recorded_filename)
except Exception as e:
logging.error(e)
else:
self.sendMessage(f"Skip fixing {recorded_filename}. File not found.")
logging.info(f"- Skip fixing {recorded_filename}. File not found.")
self.sendMessage(f"Fixing {recorded_filename} is done. Going back to checking...")
logging.info(f"- Fixing {recorded_filename} is done. Going back to checking...")
time.sleep(self.refresh)
def main(argv):
twitch_recorder = TwitchRecorder()
usage_message = 'twitch-recorder.py -u <username> -q <quality>'
try:
opts, args = getopt.getopt(argv,"hu:q:",["username=","quality="])
except getopt.GetoptError:
print(usage_message)
sys.exit(2)
for opt, arg in opts:
if opt == '-h':
print(usage_message)
sys.exit()
elif opt in ("-u", "--username"):
twitch_recorder.username = arg
elif opt in ("-q", "--quality"):
twitch_recorder.quality = arg
twitch_recorder.run()
if __name__ == "__main__":
main(sys.argv[1:])
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment