Skip to content

Instantly share code, notes, and snippets.

@Coldblackice
Forked from junian/twitch-recorder.py
Created March 31, 2023 10:17
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Coldblackice/2156c946b6f7fe5e36a12721bfd300d1 to your computer and use it in GitHub Desktop.
Save Coldblackice/2156c946b6f7fe5e36a12721bfd300d1 to your computer and use it in GitHub Desktop.
Record Twitch Streams Automatically in Python
# This code is based on tutorial by slicktechies modified as needed to use oauth token from Twitch.
# You can read more details at: https://www.junian.net/2017/01/how-to-record-twitch-streams.html
# original code is from https://slicktechies.com/how-to-watchrecord-twitch-streams-using-livestreamer/
import requests
import os
import time
import json
import sys
import subprocess
import datetime
import getopt
class TwitchRecorder:
def __init__(self):
# global configuration
self.client_id = "jzkbprff40iqj646a697cyrvl0zt2m6" # don't change this
# get oauth token value by typing `streamlink --twitch-oauth-authenticate` in terminal
self.oauth_token = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
self.ffmpeg_path = 'ffmpeg'
self.refresh = 30.0
self.root_path = "/Users/junian/Documents/twitch"
# user configuration
self.username = "juniantr"
self.quality = "best"
def run(self):
# path to recorded stream
self.recorded_path = os.path.join(self.root_path, "recorded", self.username)
# path to finished video, errors removed
self.processed_path = os.path.join(self.root_path, "processed", self.username)
# create directory for recordedPath and processedPath if not exist
if(os.path.isdir(self.recorded_path) is False):
os.makedirs(self.recorded_path)
if(os.path.isdir(self.processed_path) is False):
os.makedirs(self.processed_path)
# make sure the interval to check user availability is not less than 15 seconds
if(self.refresh < 15):
print("Check interval should not be lower than 15 seconds.")
self.refresh = 15
print("System set check interval to 15 seconds.")
# fix videos from previous recording session
try:
video_list = [f for f in os.listdir(self.recorded_path) if os.path.isfile(os.path.join(self.recorded_path, f))]
if(len(video_list) > 0):
print('Fixing previously recorded files.')
for f in video_list:
recorded_filename = os.path.join(self.recorded_path, f)
print('Fixing ' + recorded_filename + '.')
try:
subprocess.call([self.ffmpeg_path, '-err_detect', 'ignore_err', '-i', recorded_filename, '-c', 'copy', os.path.join(self.processed_path,f)])
os.remove(recorded_filename)
except Exception as e:
print(e)
except Exception as e:
print(e)
print("Checking for", self.username, "every", self.refresh, "seconds. Record with", self.quality, "quality.")
self.loopcheck()
def check_user(self):
# 0: online,
# 1: offline,
# 2: not found,
# 3: error
url = 'https://api.twitch.tv/kraken/streams/' + self.username
info = None
status = 3
try:
r = requests.get(url, headers = {"Client-ID" : self.client_id}, timeout = 15)
r.raise_for_status()
info = r.json()
if info['stream'] == None:
status = 1
else:
status = 0
except requests.exceptions.RequestException as e:
if e.response:
if e.response.reason == 'Not Found' or e.response.reason == 'Unprocessable Entity':
status = 2
return status, info
def loopcheck(self):
while True:
status, info = self.check_user()
if status == 2:
print("Username not found. Invalid username or typo.")
time.sleep(self.refresh)
elif status == 3:
print(datetime.datetime.now().strftime("%Hh%Mm%Ss")," ","unexpected error. will try again in 5 minutes.")
time.sleep(300)
elif status == 1:
print(self.username, "currently offline, checking again in", self.refresh, "seconds.")
time.sleep(self.refresh)
elif status == 0:
print(self.username, "online. Stream recording in session.")
filename = self.username + " - " + datetime.datetime.now().strftime("%Y-%m-%d %Hh%Mm%Ss") + " - " + (info['stream']).get("channel").get("status") + ".mp4"
# clean filename from unecessary characters
filename = "".join(x for x in filename if x.isalnum() or x in [" ", "-", "_", "."])
recorded_filename = os.path.join(self.recorded_path, filename)
# start streamlink process
subprocess.call(["streamlink", "--twitch-oauth-token", self.oauth_token, "twitch.tv/" + self.username, self.quality, "-o", recorded_filename])
print("Recording stream is done. Fixing video file.")
if(os.path.exists(recorded_filename) is True):
try:
subprocess.call([self.ffmpeg_path, '-err_detect', 'ignore_err', '-i', recorded_filename, '-c', 'copy', os.path.join(self.processed_path, filename)])
os.remove(recorded_filename)
except Exception as e:
print(e)
else:
print("Skip fixing. File not found.")
print("Fixing is done. Going back to checking..")
time.sleep(self.refresh)
def main(argv):
twitch_recorder = TwitchRecorder()
usage_message = 'twitch-recorder.py -u <username> -q <quality>'
try:
opts, args = getopt.getopt(argv,"hu:q:",["username=","quality="])
except getopt.GetoptError:
print (usage_message)
sys.exit(2)
for opt, arg in opts:
if opt == '-h':
print(usage_message)
sys.exit()
elif opt in ("-u", "--username"):
twitch_recorder.username = arg
elif opt in ("-q", "--quality"):
twitch_recorder.quality = arg
twitch_recorder.run()
if __name__ == "__main__":
main(sys.argv[1:])
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment