Skip to content

Instantly share code, notes, and snippets.

@kfur
Last active October 25, 2021 01:54
Show Gist options
  • Save kfur/68d58b9c69b80eeae3968c0065e6f07e to your computer and use it in GitHub Desktop.
Save kfur/68d58b9c69b80eeae3968c0065e6f07e to your computer and use it in GitHub Desktop.
YouTube live chat replay to SRT subtitles
import pysrt
import json
import sys
import functools
import emoji
from urllib import request
import re
class LiveChat():
    """Fetch a YouTube live chat replay chunk by chunk and turn it into SRT subtitles.

    The replay is requested from ``replay_url_format`` using a continuation
    token and a player offset in milliseconds; each response is parsed with
    ``parse_comments`` and the collected comments are rendered with
    ``comments_to_subs``.
    """

    replay_url_format = 'https://www.youtube.com/live_chat_replay/get_live_chat_replay?commandMetadata=%5Bobject%20Object%5D&continuation={}%253D%253D&playerOffsetMs={}&hidden=false&pbj=1'
    headers = {
        'authority': 'www.youtube.com',
        'pragma': 'no-cache',
        'cache-control': 'no-cache',
        'x-youtube-device': 'cbr=Chrome&cosver=10.0&cos=Windows&cbrver=71.0.3578.80',
        'x-youtube-page-label': 'youtube.ytfe.desktop_20200116_5_RC0',
        'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/71.0.3578.80 Safari/537.36',
        'x-youtube-client-name': '1',
        'accept': '*/*',
        'sec-fetch-site': 'same-origin',
        'sec-fetch-mode': 'cors',
        'accept-language': 'en-US,en;q=0.9'
    }
    # Number of consecutive failed fetches tolerated before the error is re-raised.
    max_retries = 5
    # Pause between retries, in seconds, so a failing endpoint is not hammered.
    retry_delay_sec = 1

    def __init__(self, continuation, max_comments_view_len=160, max_comments_per_view=4):
        """Remember the continuation token and the subtitle layout limits.

        Args:
            continuation: token substituted into ``replay_url_format``.
            max_comments_view_len: forwarded to ``comments_to_subs``.
            max_comments_per_view: forwarded to ``comments_to_subs``.
        """
        self.continuation = continuation
        self.max_comments_view_len = max_comments_view_len
        self.max_comments_per_view = max_comments_per_view

    def _get_chat_chunk(self, playerOffsetMs):
        """Return the raw response body for the chat chunk at ``playerOffsetMs``."""
        url = self.replay_url_format.format(self.continuation, playerOffsetMs)
        r = request.Request(url, headers=self.headers)
        with request.urlopen(r) as resp:
            return resp.read()

    def live_chat_to_srt(self):
        """Download the whole replay and return it as a ``pysrt.SubRipFile``.

        Chunks are requested starting at offset 1 ms; the offset of the last
        action in each chunk becomes the next request offset. The loop stops
        when a chunk carries no actions or the offset no longer advances.

        Raises:
            Exception: the last fetch/decode error, once ``max_retries``
                consecutive attempts for the same offset have failed.
        """
        import time  # local import: only needed for the retry back-off

        last_offset = 1
        comments = []
        failures = 0
        while True:
            try:
                chat_chunk = json.loads(self._get_chat_chunk(last_offset))
            except Exception as e:
                print(e)
                failures += 1
                if failures > self.max_retries:
                    # Give up instead of spinning forever on a permanent error.
                    raise
                time.sleep(self.retry_delay_sec)
                continue
            failures = 0

            try:
                actions = chat_chunk['response']['continuationContents']['liveChatContinuation']['actions']
            except (KeyError, TypeError):
                actions = []
            if not actions:
                # Nothing left to read at this offset.
                break

            comments += parse_comments(chat_chunk)
            new_offset = int(actions[-1]['replayChatItemAction']['videoOffsetTimeMsec'])
            if new_offset == last_offset:
                break
            last_offset = new_offset

        # Consecutive chunks overlap at their boundary offset, so drop repeats.
        comments = functools.reduce(uniq_comments, comments, [])
        subs = comments_to_subs(comments,
                                self.max_comments_view_len,
                                self.max_comments_per_view)
        return pysrt.SubRipFile(subs)
def json_file_key(name):
    """Return the integer that precedes the first '.' in ``name``.

    Usable as a sort key for file names such as ``"12.json"``. Raises
    ``ValueError`` when ``name`` has no '.' or the prefix is not an integer.
    """
    return int(name[:name.index('.')])
@functools.total_ordering
class Date():
    """Chat timestamp held as separate hour / minute / second / millisecond fields.

    Note the constructor order: ``sec`` first, then ``minute``, ``hour``, ``msec``.
    Instances compare field by field in the order (hour, minute, sec, msec).
    Fields are not normalised, so a ``msec`` value above 999 does not carry
    over into ``sec`` when comparing.
    """

    def __init__(self, sec, minute, hour=0, msec=0):
        self.hour = hour
        self.minute = minute
        self.sec = sec
        # Honour the argument (it used to be discarded in favour of a constant 0).
        self.msec = msec

    def _key(self):
        """Return the fields as a tuple, most significant first."""
        return (self.hour, self.minute, self.sec, self.msec)

    def __eq__(self, other):
        if not isinstance(other, Date):
            return NotImplemented
        return self._key() == other._key()

    # Defining __eq__ already makes instances unhashable; spelled out on purpose
    # because the fields are mutable.
    __hash__ = None

    def __gt__(self, other):
        # Lexicographic tuple comparison: a larger hour wins even when the
        # minute/second of ``self`` is smaller (1:00:00 > 0:59:59).
        if not isinstance(other, Date):
            return NotImplemented
        return self._key() > other._key()

    def __repr__(self):
        return 'Date(sec={!r}, minute={!r}, hour={!r}, msec={!r})'.format(
            self.sec, self.minute, self.hour, self.msec)
class Comment:
    """One chat message: who wrote it, when, what it says, and a moderator flag."""

    def __init__(self, author, date, text, isModer=False):
        # Plain record: store the four values unchanged.
        self.author, self.date = author, date
        self.text, self.isModer = text, isModer
def parse_comments(js_com):
    """Extract text chat messages from one decoded replay chunk.

    Args:
        js_com: decoded JSON of a chunk; actions are read from
            ``response.continuationContents.liveChatContinuation.actions``.

    Returns:
        A list of ``Comment`` objects, in the order the actions appear.
        Actions that are not ``liveChatTextMessageRenderer`` items, or whose
        message has no text run, are skipped. A chunk without an ``actions``
        list yields an empty list.

    Raises:
        ValueError: if a timestamp is not ``M:S`` or ``H:M:S`` or contains a
            non-integer part.
    """
    try:
        actions = js_com['response']['continuationContents']['liveChatContinuation']['actions']
    except KeyError:
        return []

    comments = []
    for act in actions:
        try:
            renderer = act['replayChatItemAction']['actions'][0]['addChatItemAction']['item']['liveChatTextMessageRenderer']
            runs = renderer['message']['runs']
        except (KeyError, IndexError, TypeError):
            # Not a plain text chat message.
            continue

        # A message may be split into several runs; keep every text run
        # instead of only the first one.
        texts = [run['text'] for run in runs if 'text' in run]
        if not texts:
            continue
        text = emoji.demojize(''.join(texts))

        # A missing author name should not abort the whole chunk.
        author = renderer.get('authorName', {}).get('simpleText', '')

        # The moderator badge is not necessarily the first badge in the list.
        isModer = any(
            badge.get('liveChatAuthorBadgeRenderer', {}).get('icon', {}).get('iconType') == "MODERATOR"
            for badge in renderer.get('authorBadges', [])
        )

        date_raw = renderer['timestampText']['simpleText']
        hms = date_raw.split(':')
        if len(hms) == 2:
            m, s = hms
            date = Date(int(s), int(m))
        elif len(hms) == 3:
            h, m, s = hms
            date = Date(int(s), int(m), int(h))
        else:
            raise ValueError("date array wrong")
        comments.append(Comment(author, date, text, isModer))
    return comments
def uniq_comments(old, new):
    """Reducer step: append ``new`` to ``old`` unless an equal comment is present.

    Two comments count as equal when author, date and text all match.
    ``old`` is modified in place and returned, which makes the function
    usable with ``functools.reduce(uniq_comments, comments, [])``.
    """
    already_seen = any(
        seen.author == new.author and seen.date == new.date and seen.text == new.text
        for seen in old
    )
    if not already_seen:
        old.append(new)
    return old
def comments_to_subs(comments, max_comments_view_len=160, max_comments_per_view=4, reverse=False):
    """Turn a chronological list of comments into ``pysrt.SubRipItem`` objects.

    Every comment produces one subtitle that shows a rolling window of the
    most recent comments. A subtitle starts at its comment's time and ends at
    the next comment's time (the last one ends 5 seconds after it starts).

    Args:
        comments: list of objects with ``author``, ``text``, ``isModer`` and a
            ``date`` exposing ``hour``/``minute``/``sec``/``msec``. The
            ``date.msec`` values are adjusted in place to separate comments
            that share a timestamp.
        max_comments_view_len: once the joined window text reaches this many
            characters, the window is cut to one comment fewer than
            ``max_comments_per_view`` (but at least one).
        max_comments_per_view: maximum number of comments in the window.
        reverse: when true, the newest comment is placed first, not last.

    Returns:
        A list of ``pysrt.SubRipItem`` indexed from 0; empty for no comments.

    Raises:
        Exception: ``('wrong time range ', i)`` if comment ``i`` is later than
            comment ``i + 1`` after the timestamp adjustment.
    """
    subrip_items = []
    item_comments = []
    total = len(comments)
    # Window sizes follow the parameter (previously hard-coded as 4 and 3).
    per_view = max(1, max_comments_per_view)
    shortened_view = max(1, per_view - 1)

    # Comments can share a timestamp with their successor; nudge the later
    # one by 500 ms so that subtitles do not start at the same instant.
    for z in range(total - 1):
        if comments[z].date == comments[z + 1].date:
            comments[z + 1].date.msec += 500
        if z + 2 < total and comments[z + 1].date > comments[z + 2].date:
            comments[z + 2].date.msec += 500

    for i, com in enumerate(comments):
        line = "[" + com.author + "]" + ("*" if com.isModer else "") + ": " + com.text
        if reverse:
            item_comments.insert(0, line)
        else:
            item_comments.append(line)

        # In reverse mode the newest entries are at the front, otherwise at the back.
        if len(item_comments) > per_view:
            item_comments = item_comments[:per_view] if reverse else item_comments[-per_view:]
        if len('\n'.join(item_comments)) >= max_comments_view_len:
            item_comments = item_comments[:shortened_view] if reverse else item_comments[-shortened_view:]

        following = comments[i + 1] if i + 1 < total else None
        if following is not None and com.date > following.date:
            raise Exception('wrong time range ', i)

        start_time = pysrt.SubRipTime(com.date.hour,
                                      com.date.minute,
                                      com.date.sec,
                                      com.date.msec)
        if following is not None:
            end_time = pysrt.SubRipTime(following.date.hour,
                                        following.date.minute,
                                        following.date.sec,
                                        following.date.msec)
        else:
            # Last comment: keep it on screen for five seconds.
            end_time = pysrt.SubRipTime(com.date.hour,
                                        com.date.minute,
                                        com.date.sec + 5,
                                        com.date.msec)
        subrip_items.append(pysrt.SubRipItem(i,
                                             start_time,
                                             end_time,
                                             '\n'.join(item_comments)))
    return subrip_items
def main(youtube_link, path=None):
    """Download the live chat replay of ``youtube_link`` and save it as SRT.

    The video page is fetched, the first ``continuation=<token>`` occurrence
    is taken as the chat replay token, and the resulting subtitles are
    written with ``SubRipFile.save(path)``.

    Args:
        youtube_link: URL of the video page.
        path: destination passed unchanged to ``save``.

    Raises:
        IndexError: if the page contains no continuation token.
    """
    # Close the HTTP response deterministically.
    with request.urlopen(youtube_link) as resp:
        # The token is plain ASCII, so undecodable bytes elsewhere on the page
        # must not abort the run.
        vid_html = resp.read().decode('utf-8', errors='replace')

    tokens = re.findall('continuation=([a-zA-Z0-9]+)', vid_html)
    if not tokens:
        raise IndexError('no live chat replay continuation token found at ' + youtube_link)

    lc = LiveChat(tokens[0])
    lc.live_chat_to_srt().save(path)
def print_usage():
    """Print the command-line usage example to standard output."""
    usage_lines = (
        'Usage:',
        'yreplay2srt.py https://youtu.be/qy_qonT38DY subtitles.srt',
        '',  # the message itself ends with a newline; print() adds one more
    )
    print('\n'.join(usage_lines))
if __name__ == '__main__':
    # Exactly two arguments are expected: the video URL and the output .srt path.
    cli_args = sys.argv[1:]
    if len(cli_args) != 2:
        print_usage()
        sys.exit(-1)
    main(*cli_args)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment