Skip to content

Instantly share code, notes, and snippets.

@yutsuku
Forked from tung/twitch-vod-chat.py
Last active January 15, 2022 01:48
Show Gist options
  • Save yutsuku/86e0d70793aa80d31e13e6f69d388c99 to your computer and use it in GitHub Desktop.
Save yutsuku/86e0d70793aa80d31e13e6f69d388c99 to your computer and use it in GitHub Desktop.
Download chat from a Twitch VOD and print it to a terminal.
#!/usr/bin/env python3
#
# A script to download chat from a Twitch VOD in SubRip (*.srt) format
# Chat will be downloaded all the way until it ends.
#
# Usage: TWITCH_CLIENT_ID=0123456789abcdef0123456789abcde twitch-vod-chat.py [video_id] [start]
#
# This script could break at any time, because Twitch's chat API is
# undocumented and likes to change at any time; in fact, this script was
# created because Twitch got rid of rechat.twitch.tv.
#
import json
import os
import requests
import sys
import time
import types
import math
def time_to_hhmmss(t):
hours = int(t // 3600)
minutes = int((t - hours * 3600) // 60)
seconds = int(t - hours * 3600 - minutes * 60)
milliseconds = int((t - hours * 3600 - minutes * 60 - seconds) * 1000)
return "{0}:{1:02}:{2:02}.{3:<03}".format(hours, minutes, seconds, milliseconds)
def time_to_srt(t):
hours = int(t // 3600)
minutes = int((t - hours * 3600) // 60)
seconds = int(t - hours * 3600 - minutes * 60)
milliseconds = int((t - hours * 3600 - minutes * 60 - seconds) * 1000)
return "{0:02}:{1:02}:{2:02},{3:<03}".format(hours, minutes, seconds, milliseconds)
def lighten_color(color):
r = color.r * 3 // 4 + 63
g = color.g * 3 // 4 + 63
b = color.b * 3 // 4 + 63
return types.SimpleNamespace(r=r, g=g, b=b)
def clamp(x):
return max(0, min(x, 255))
def rgb_to_hex(color):
return "#{0:02x}{1:02x}{2:02x}".format(clamp(color.r), clamp(color.g), clamp(color.b))
def message_color(comment):
color_by_name = {
'white': (255, 255, 255),
'black': (0, 0, 0),
'red': (255, 0, 0),
'green': (0, 255, 0),
'blue': (0, 0, 255),
'yellow': (255, 255, 0),
'gray': (128, 128, 128),
'magenta': (255, 0, 255),
'cyan': (0, 255, 255)
}
r = 128
g = 128
b = 128
if 'user_color' in comment['message']:
user_color = comment['message']['user_color']
if len(user_color) == 7 and user_color[0] == '#':
r = int(user_color[1:3], 16)
g = int(user_color[3:5], 16)
b = int(user_color[5:7], 16)
elif user_color in color_by_name:
c = color_by_name[user_color]
r = c[0]
g = c[1]
b = c[2]
return types.SimpleNamespace(r=r, g=g, b=b)
def badge_icons(message):
b = ''
if 'user_badges' in message:
for badge in message['user_badges']:
if badge['_id'] == 'broadcaster':
b += '🎥'
elif badge['_id'] == 'moderator':
b += '⚔'
elif badge['_id'] == 'subscriber':
b += '★'
elif badge['_id'] == 'staff':
b += '⛨'
return b
def simple_name(commenter):
name = commenter['name']
display_name = commenter['display_name']
if display_name:
c = display_name[0].lower()
if (c >= 'a' and c <= 'z') or (c >= '0' and c <= '9') or c == '_':
name = display_name
return name
def format_message(comment):
time = comment['content_offset_seconds']
time_to = comment['msg_time_to']
badges = badge_icons(comment['message'])
name = simple_name(comment['commenter'])
color = lighten_color(message_color(comment))
counter = comment['seq_counter']
message = comment['message']['body']
if comment['message']['is_action']:
#message = '\033[38;2;' + str(color.r) + ';' + str(color.g) + ';' + str(color.b) + 'm' + message + '\033[0m'
message = '<font color="{color_hex}">{message}</font>'.format(message=message, color_hex=rgb_to_hex(color))
#nick = '\033[38;2;{c.r};{c.g};{c.b}m<{badges}{name}>\033[0m'.format(badges=badges, name=name, c=color)
#if 'user_badges' in comment['message']:
# is_broadcaster = False
# for badge in comment['message']['user_badges']:
# if badge['_id'] == 'broadcaster':
# is_broadcaster = True
# break
# if is_broadcaster:
# nick = '\033[7m' + nick
nick = '<font color="{color_hex}">{name}</font>'.format(name=name, color_hex=rgb_to_hex(color))
#return "\033[94m{counter} {time} -> {time_to}\033[0m {nick} {message}".format(counter=counter, time=time_to_srt(time), time_to=time_to_srt(time_to), nick=nick, message=message)
return """{counter}
{time} --> {time_to}
{nick}: {message}
""".format(counter=counter, time=time_to_srt(time), time_to=time_to_srt(time_to), nick=nick, message=message)
def print_response_messages(data, start):
for comment in data['comments']:
if comment['content_offset_seconds'] < start:
continue
if comment['source'] != 'chat':
continue
print(format_message(comment))
def process_response_section(data, start, duration_multiplier, max_duration):
comments = data['comments']
global seq_counter
last_msg_offset = comments[-1]['content_offset_seconds']
segment_duration = last_msg_offset - comments[0]['content_offset_seconds']
per_msg_duration = min(segment_duration * duration_multiplier / len(comments), max_duration)
for comment in comments:
if comment['content_offset_seconds'] < start:
continue
if comment['source'] != 'chat':
continue
comment['msg_time_from'] = comment['content_offset_seconds']
msg_time_from = comment['content_offset_seconds']
msg_time_from_ms = math.floor(msg_time_from * 1000) % 1000
msg_time_from_sec = math.floor(msg_time_from) % 60
msg_time_from_min = math.floor(msg_time_from / 60) % 60
msg_time_from_hour = math.floor(msg_time_from / 3600)
comment['msg_time_to'] = msg_time_from + per_msg_duration
comment['seq_counter'] = seq_counter
msg_time_to = msg_time_from + per_msg_duration
msg_time_to_ms = math.floor(msg_time_to * 1000) % 1000
msg_time_to_sec = math.floor(msg_time_to) % 60
msg_time_to_min = math.floor(msg_time_to / 60) % 60
msg_time_to_hour = math.floor(msg_time_to / 3600)
print(format_message(comment))
seq_counter = seq_counter + 1
################################################################################
if len(sys.argv) < 3 or 'TWITCH_CLIENT_ID' not in os.environ:
print('Usage: TWITCH_CLIENT_ID=[client_id] {0} <video_id> <start>'.format(sys.argv[0]), file=sys.stderr)
sys.exit(1)
video_id = sys.argv[1]
start = int(sys.argv[2])
duration_multiplier = 10
max_duration = 10
seq_counter = 1
session = requests.Session()
session.headers = { 'Client-ID': os.environ['TWITCH_CLIENT_ID'], 'Accept': 'application/vnd.twitchtv.v5+json' }
response = session.get('https://api.twitch.tv/v5/videos/' + video_id + '/comments?content_offset_seconds=' + str(start), timeout=10)
response.raise_for_status()
data = response.json()
process_response_section(data, start, duration_multiplier, max_duration)
cursor = None
if '_next' in data:
cursor = data['_next']
time.sleep(0.1)
while cursor:
response = session.get('https://api.twitch.tv/v5/videos/' + video_id + '/comments?cursor=' + cursor, timeout=10)
response.raise_for_status()
data = response.json()
process_response_section(data, start, duration_multiplier, max_duration)
if '_next' in data:
cursor = data['_next']
time.sleep(0.1)
else:
cursor = None
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment