Last active
May 4, 2020 22:39
-
-
Save tyrust/5eb83b05d0204370ada319237d01a045 to your computer and use it in GitHub Desktop.
Given an app client ID and an OAuth token, print the followed channels that are currently live.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python3 | |
import datetime
import json
import sys
import urllib.error
import urllib.parse
import urllib.request

# printing tables with the standard lib is annoying
from tabulate import tabulate
class TwitchSession:
    """Small Twitch Helix API client built on urllib.

    Every request is sent with the app's ``Client-ID`` header and an
    ``Authorization: Bearer <token>`` header.
    """

    # The API only allows requesting 100 things (ids) at a time, so id
    # lookups are split into batches of this size.
    MAX_IDS_PER_REQUEST = 100

    def __init__(self, client_id, token):
        """Store the app client id and the OAuth token used for all requests."""
        self.client_id = client_id
        self.token = token

    @classmethod
    def _chunks(cls, ids):
        """Return `ids` de-duplicated (order kept), without empty ids, in
        batches of at most MAX_IDS_PER_REQUEST."""
        unique = [i for i in dict.fromkeys(ids) if i not in ('', None)]
        size = cls.MAX_IDS_PER_REQUEST
        return [unique[i:i + size] for i in range(0, len(unique), size)]

    def get_resource_data(self, resource, param_list=None):
        """Fetch all pages of a Helix resource and return the merged items.

        Args:
            resource: path below ``https://api.twitch.tv/helix/``,
                e.g. ``'users'`` or ``'users/follows'``.
            param_list: optional list of ``(key, value)`` query parameters.
                The list is not modified.

        Returns:
            The concatenation of the ``data`` lists of every page.

        Raises:
            ValueError: the API answered 401 (the token needs renewing).
            urllib.error.HTTPError: any other HTTP error status.
        """
        # Copy so that inserting the pagination cursor never leaks into the
        # caller's list.
        params = list(param_list) if param_list is not None else []

        # The limit applies to a repeated key (the ids), not to the total
        # parameter count, so count occurrences per key.
        key_counts = {}
        for key, _ in params:
            key_counts[key] = key_counts.get(key, 0) + 1
        if key_counts and max(key_counts.values()) > self.MAX_IDS_PER_REQUEST:
            print('Sorry tyrus, time to refactor for chunking, twitch only allows '
                  'requesting 100 things at a time')

        data = []
        while True:
            query = urllib.parse.urlencode(params)
            request = urllib.request.Request(
                'https://api.twitch.tv/helix/%s?%s' % (resource, query))
            request.add_header('Client-ID', self.client_id)
            request.add_header('Authorization', 'Bearer ' + self.token)
            try:
                # The context manager closes the HTTP response promptly.
                with urllib.request.urlopen(request) as reply:
                    response = json.loads(reply.read())
            except urllib.error.HTTPError as e:
                if e.code == 401:
                    print(e.reason)
                    raise ValueError('try getting a new token') from e
                raise
            data.extend(response['data'])
            cursor = response.get('pagination', {}).get('cursor')
            # No cursor means this was the last page.  The cursor 'IA' is
            # treated as end-of-data as well; it appears to be a bogus value
            # the API can return instead of omitting the cursor.
            if not cursor or cursor == 'IA':
                break
            # Replace any previous cursor with the new one for the next page.
            params = [('after', cursor)] + [p for p in params if p[0] != 'after']
        return data

    def get_user(self):
        """Return the first item of the ``users`` resource for this token."""
        return self.get_resource_data('users')[0]

    def get_follows_from(self, user):
        """Return all ``users/follows`` items whose ``from_id`` is ``user['id']``."""
        params = [('first', 100), ('from_id', user['id'])]
        return self.get_resource_data('users/follows', params)

    def get_streams(self, user_ids):
        """Return the ``streams`` items for the given user ids.

        Ids are queried in batches of at most MAX_IDS_PER_REQUEST.  An empty
        `user_ids` returns ``[]`` without any request, instead of sending a
        query that is not restricted to any user.
        """
        streams = []
        for chunk in self._chunks(user_ids):
            params = [('first', 100)] + [('user_id', uid) for uid in chunk]
            streams.extend(self.get_resource_data('streams', params))
        return streams

    def get_id_to_game_map(self, game_ids):
        """Return ``{game id: game item}`` for the given game ids.

        Duplicate and empty ids are skipped and the rest are queried in
        batches; an empty `game_ids` returns ``{}`` without any request.
        """
        games = {}
        for chunk in self._chunks(game_ids):
            params = [('id', gid) for gid in chunk]
            for game in self.get_resource_data('games', params):
                games[game['id']] = game
        return games
def print_table(headers, data):
    """Print `data` as a table with one column per entry of `headers`.

    Each item of `data` is indexed by every header in turn, so the columns
    appear in the order given by `headers`.
    """
    rows = [[record[column] for column in headers] for record in data]
    print(tabulate(rows, headers=headers))
def duration_string(timedelta):
    """Format a ``datetime.timedelta`` compactly, e.g. ``'1d 2h 3m'``.

    The day and hour parts are left out when they are zero; the minute part
    is always present and seconds are truncated.
    """
    hours, leftover_seconds = divmod(timedelta.seconds, 3600)
    pieces = []
    if timedelta.days > 0:
        pieces.append('{}d '.format(timedelta.days))
    if hours > 0:
        pieces.append('{}h '.format(hours))
    pieces.append('{}m'.format(leftover_seconds // 60))
    return ''.join(pieces)
def _print_usage(argv):
    """Print usage help, tailored to how many arguments were supplied."""
    print('usage: {} $client_id [$oauth_token]'.format(argv[0]))
    if len(argv) == 1:
        print('1. create an app at https://dev.twitch.tv/console/apps/create\n' +
              ' (set OAuth Redirect URLs to "http://localhost")')
        print('2. run')
        print('$ {} $client_id'.format(argv[0]))
        print('to get an oauth_token')
    if len(argv) == 2:
        # https://dev.twitch.tv/docs/authentication/getting-tokens-oauth#oauth-implicit-code-flow
        print('get your oauth token at')
        print('https://id.twitch.tv/oauth2/authorize' +
              '?client_id={}'.format(argv[1]) +
              '&redirect_uri=http://localhost' +
              '&response_type=token')


def _stream_row(stream, game_id_to_game, now):
    """Build the table row dict for one stream item; `now` is an aware UTC datetime."""
    game = game_id_to_game.get(stream['game_id'])
    # The last character of the timestamp is dropped before parsing
    # (presumably the 'Z' UTC designator) and the value is treated as UTC.
    # Explicit %H:%M:%S is used because %X depends on the current locale.
    started_at = datetime.datetime.strptime(
        stream['started_at'][:-1], '%Y-%m-%dT%H:%M:%S'
    ).replace(tzinfo=datetime.timezone.utc)
    uptime = now - started_at
    return {
        'user': stream['user_name'],
        'game': game['name'] if game else 'None',
        'info': '{} ({} watching, {} uptime)'.format(stream['title'],
                                                     stream['viewer_count'],
                                                     duration_string(uptime)),
        'viewers': stream['viewer_count'],
    }


def main():
    """Print a table of the live streams followed by the token's user.

    Expects exactly two command-line arguments: the app client id and an
    OAuth token.  With any other argument count, usage help is printed and
    the process exits with status 1.
    """
    if len(sys.argv) != 3:
        _print_usage(sys.argv)
        sys.exit(1)

    session = TwitchSession(sys.argv[1], sys.argv[2])
    user = session.get_user()
    following_ids = [f['to_id'] for f in session.get_follows_from(user)]
    # Without any user id the streams query would not be restricted to the
    # followed channels, so skip it when nothing is followed.
    streams = session.get_streams(following_ids) if following_ids else []
    # Several streams usually share a game and a stream may have an empty
    # game id; look each real game up only once.
    game_ids = sorted({s['game_id'] for s in streams if s['game_id']})
    game_id_to_game = session.get_id_to_game_map(game_ids) if game_ids else {}

    # One timezone-aware "now" so every uptime is measured from the same
    # instant (datetime.utcnow() is naive and deprecated).
    now = datetime.datetime.now(datetime.timezone.utc)
    stream_rows = [_stream_row(s, game_id_to_game, now) for s in streams]
    print_table(['user', 'game', 'info'], stream_rows)
# Run the command-line tool only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment