Skip to content

Instantly share code, notes, and snippets.

@segfault87
Last active October 8, 2019 13:07
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save segfault87/47b8721108cce2c02de310661b666a65 to your computer and use it in GitHub Desktop.
Save segfault87/47b8721108cce2c02de310661b666a65 to your computer and use it in GitHub Desktop.
dong ozum bunga
# -*- coding: utf-8 -*-
#
# OpenRCT2 Twitch API Server
# (c) 2018 Fun-boong-e <https://tgd.kr/funzinnu>
#
# Complies BSD license.
#
# How to run:
#
# $ pip install flask gevent requests websocket-client
# $ export TS_OAUTH_KEY=oauth:xxx -- obtain one at https://twitchapps.com/tmi/
# $ export TS_CLIENT_ID=xxxxx -- make one at Twitch developer console
# $ python openrct2_twitch_server.py -H 0.0.0.0 -p 8000
#
# Alternatively, you can run via standalone WSGI server such as gunicorn.
# $ gunicorn -b 0.0.0.0:8000 openrct2_twitch_server:app
from gevent.monkey import patch_all; patch_all() # noqa
import argparse
import collections
import enum
import logging
import os
import re
import sys
import typing
from flask import Flask, abort, jsonify
from gevent import sleep, spawn, spawn_later
from requests import get
from websocket import WebSocketApp
USER_NAME = 'openrct2_bot'
OAUTH_KEY = os.getenv('TS_OAUTH_KEY')
CLIENT_ID = os.getenv('TS_CLIENT_ID')
if not OAUTH_KEY:
raise RuntimeError('No environment variable: TS_OAUTH_KEY')
if not CLIENT_ID:
raise RuntimeError('No environment variable: TS_CLIENT_ID')
MAX_USER_COUNT = 100
MAX_NEWS_ENTRY_COUNT = 10
MAX_FOLLOWERS_COUNT = 1500
TMI_WS_ENDPOINT = 'ws://irc-ws.chat.twitch.tv'
logger = logging.getLogger('openrct2_twitch_api_server')
IrcLine = collections.namedtuple('IrcLine', ['ident', 'command', 'parts'])
class RateLimitException(Exception):
pass
class ParseError(Exception):
pass
def extract_irc_line(raw: str) -> IrcLine:
parts = raw.split(' ', 2)
if len(parts) < 1:
raise ParseError(f'Unrecognized line: {raw}')
if parts[0].startswith(':'):
ident = parts[0][1:]
base = 1
else:
ident = None
base = 0
if len(parts) < base + 2:
return IrcLine(ident, parts[base], None)
def parse(part):
buf = ''
whitespace = False
fill_remaining = False
for c in part:
if c == ' ' and not fill_remaining:
if buf:
yield buf
buf = ''
whitespace = True
elif c == ':' and whitespace and not fill_remaining:
fill_remaining = True
whitespace = False
else:
buf += c
whitespace = False
if buf:
yield buf
return IrcLine(ident, parts[base], [x for x in parse(parts[base+1])])
def normalize_channel_name(name: str) -> str:
if name.startswith('#'):
return name.lower()
else:
return f'#{name.lower()}'
def extract_username(ident: str) -> str:
match = re.match(r':?(\w+)!\w+@[\w\.]+', ident)
if match:
return match.group(1)
# A very, very stripped down IRC client.
class IrcClient:
EXPONENTIAL_BACKOFF_UPPER_BOUND = 30
def on_message(self, ws, message):
messages = message.split('\r\n')
for message in messages:
message = message.strip()
if not message:
continue
try:
line = extract_irc_line(message)
except ParseError as e:
logger.warning(e)
if line.command in self.handlers:
if line.parts is None:
self.handlers[line.command](self, line.ident)
else:
self.handlers[line.command](self, line.ident, *line.parts)
else:
self.logger.info(f'Unhandled line: {message}')
def on_error(self, ws, error):
self.logger.error(f'Error occurred: {error}')
self.retries += 1
def on_close(self, ws):
self.set_connected(False)
def on_open(self, ws):
self.retries = 0
logger.info('Connection established.')
self.ws.send('CAP REQ :twitch.tv/commands twitch.tv/membership')
self.ws.send(f'PASS {OAUTH_KEY}')
self.ws.send(f'NICK {USER_NAME}')
self.ws.send(f'USER {USER_NAME} 8 * :{USER_NAME}')
def send(self, command):
if self.connected:
self.ws.send(command)
else:
self.backlog.append(command)
def _connect(self):
while True:
self.ws.on_open = self.on_open
self.ws.run_forever()
secs = min(self.EXPONENTIAL_BACKOFF_UPPER_BOUND, 2 ** self.retries)
self.logger.info(f'Reestablishing connection in {secs} secs...')
sleep(secs)
def set_connected(self, connected):
if not self.connected and connected:
for command in self.backlog:
self.ws.send(command)
self.backlog.clear()
self.connected = connected
def connect(self):
spawn(self._connect)
def on(self, type: str):
def decorator(f):
self.handlers[type] = f
return f
return decorator
def join(self, channel):
self.send(f'JOIN {normalize_channel_name(channel)}')
def part(self, channel):
self.send(f'PART {normalize_channel_name(channel)}')
def pong(self, server_ident):
self.send(f'PONG :{server_ident}')
def privmsg(self, channel, msg):
self.send(f'PRIVMSG {channel} :{msg}')
def __init__(self):
self.logger = logging.getLogger('irc_client')
self.retries = 0
self.handlers = {}
self.connected = False
self.backlog = []
self.ws = WebSocketApp(
TMI_WS_ENDPOINT,
on_message=self.on_message,
on_error=self.on_error,
on_close=self.on_close
)
irc = IrcClient()
channels = {}
@irc.on('001')
def handle_001(client: IrcClient, ident, username, msg):
client.username = username
client.logger.info(f'Signed in as {username}')
@irc.on('PING')
def handle_ping(client: IrcClient, ident, server_ident):
client.pong(server_ident)
@irc.on('GLOBALUSERSTATE')
def handle_globaluserstate(client: IrcClient, ident):
# Rejoin all the channels if connection is lost
for channel in channels:
client.join(channel)
client.set_connected(True)
@irc.on('JOIN')
def handle_join(client: IrcClient, ident, channel_name):
channel_name = normalize_channel_name(channel_name)
username = extract_username(ident)
if username == client.username:
if channel_name not in channels:
channel = Channel(channel_name)
channels[channel_name] = channel
logger.info(f'Joined channel {channel_name}')
else:
channel = channels[channel_name]
channel.status = ChannelStatus.connecting
channel.clear_audiences()
else:
if channel_name in channels:
channel = channels[channel_name]
channel.append_audiences([username])
if len(channel.join_window) == 0:
spawn_later(0.1, lookup_user_batch, channel, 'login')
channel.add_join_window(username)
@irc.on('PART')
def handle_part(client: IrcClient, ident, channel_name):
channel_name = normalize_channel_name(channel_name)
username = extract_username(ident)
if username == client.username:
if channel_name in channels:
del channels[channel_name]
else:
if channel_name in channels:
channels[channel_name].remove_audience(username)
@irc.on('PRIVMSG')
def handle_privmsg(client: IrcClient, ident, channel_name, msg):
channel_name = normalize_channel_name(channel_name)
if channel_name in channels and msg.lower().startswith('!news '):
channel = channels[channel_name]
while len(channel.newses) > MAX_NEWS_ENTRY_COUNT:
channel.newses.pop()
username = extract_username(ident)
channel.newses.appendleft(f'!news {username}: {msg[6:]}')
client.privmsg(channel_name, f'@{username} OK')
@irc.on('353')
def handle_names(client: IrcClient, ident, username, _, channel, names):
channels[channel].append_audiences(names.split())
@irc.on('366')
def handle_end_names(client: IrcClient, ident, username, channel_name, msg):
channel = channels[channel_name]
channel.status = ChannelStatus.connected
spawn(lookup_user_cached, channel.audiences, 'login')
user_info = lookup_user([channel_name.replace('#', '')], 'login')[0]
# Start fetching followers simultaneously
spawn(fetch_followers_cached, channel, user_info)
irc.connect()
# Caching data in memory is usually a bad idea (from scale-out standpoint)
# Should use independent key-value store e.g., redis.
ids = {}
users = {}
class ChannelStatus(enum.Enum):
connecting = 'connecting'
connected = 'connected'
class Channel:
def __init__(self, name: str):
self.status = ChannelStatus.connecting
self.name = name
self.followers = []
self.followers_set = set()
self.audiences = []
self.audiences_set = set()
self.join_window = []
self.newses = collections.deque()
def clear_audiences(self):
self.audiences.clear()
self.audiences_set.clear()
def append_followers(self, followers: typing.Sequence[str]):
for i in followers:
if i not in self.followers_set:
self.followers.append(i)
self.followers_set.add(i)
def append_audiences(self, audiences: typing.Sequence[str]):
for i in audiences:
if i not in self.audiences_set:
self.audiences.append(i)
self.audiences_set.add(i)
def remove_follower(self, follower: str):
if follower in self.followers_set:
self.followers.remove(follower)
self.followers_set.remove(follower)
def remove_audience(self, audience: str):
if audience in self.audiences_set:
self.audiences.remove(audience)
self.audiences_set.remove(audience)
def add_join_window(self, username: str):
if username not in self.join_window:
self.join_window.append(username)
def consume_join_window(self):
t = self.join_window
self.join_window = []
return t
def chunks(l: typing.Sequence[str], n: int):
for i in range(0, len(l), n):
yield l[i:i + n]
def lookup_user(u: list, parameter: str):
assert len(u) <= MAX_USER_COUNT
logger.info(f'looking up users: {u}')
qs = '&'.join([f'{parameter}={i}' for i in u])
url = f'https://api.twitch.tv/helix/users?{qs}'
r = get(url, headers={'Client-ID': CLIENT_ID})
if r.status_code == 429:
raise RateLimitException()
elif r.status_code != 200:
logger.error(f'Twitch API error: {r.status_code} {r.text}')
return None
json = r.json()
return json['data']
def lookup_user_cached(u: list, parameter: str):
if parameter not in ('login', 'id'):
raise ValueError('parameter should be login or id.')
if parameter == 'login':
store = users
else:
store = ids
c = chunks([i for i in u if i not in store], MAX_USER_COUNT)
for n, i in enumerate(c):
if n > 0:
sleep(0.5) # a crude way to mitigate Twitch API rate limit
try:
result = lookup_user(i, parameter)
for i in result:
ids[i['id']] = i
users[i['login']] = i
except RateLimitException:
logger.error('Twitch API rate limit exceeded. Will try later.')
spawn_later(10, lookup_user_cached, i, parameter)
def lookup_user_batch(channel: Channel, parameter: str):
c = channel.consume_join_window()
if len(c) > 0:
lookup_user_cached(c, parameter)
def fetch_followers(id: str, after: str=''):
url = f'https://api.twitch.tv/helix/users/follows?to_id={id}' \
f'&after={after}&first={MAX_USER_COUNT}'
r = get(url, headers={'Client-ID': CLIENT_ID})
if r.status_code == 429:
raise RateLimitException()
elif r.status_code != 200:
logger.error(f'Twitch API error: {r.status_code} {r.text}')
return None
json = r.json()
return json
def fetch_followers_cached(channel: Channel, user_data: dict, after: str=''):
has_more = True
while has_more:
try:
result = fetch_followers(user_data['id'], after)
except RateLimitException:
spawn_later(10, fetch_followers_cached, channel, user_data, after)
break
if result is None:
break
after = result.get('pagination') and result['pagination'].get('cursor')
has_more = bool(after)
ids = [i['from_id'] for i in result['data']]
channel.append_followers(ids)
sleep(0.5)
lookup_user_cached(ids, 'id')
if len(channel.followers) > MAX_FOLLOWERS_COUNT:
break
elif has_more:
sleep(0.5)
app = Flask(__name__)
@app.route('/join/<channel>')
def join_channel(channel: str):
irc.join(channel)
return jsonify(status=200)
@app.route('/leave/<channel>')
def leave_channel(channel: str):
irc.part(channel)
return jsonify(status=200)
@app.route('/channel/<channel_name>/audience')
def audiences(channel_name: str):
channel_name = normalize_channel_name(channel_name)
if channel_name not in channels:
irc.join(channel_name)
return jsonify([])
channel = channels[channel_name]
while (channel.status is ChannelStatus.connecting and
len(channel.audiences) + len(channel.followers) == 0):
sleep(5) # Wait until audience/followers list is sufficient
result = []
audience_ids = set()
for i in channel.audiences:
if i in users:
u = users[i]
audience_ids.add(u['id'])
result.append({
'name': u['display_name'],
'inChat': True,
# Needs extra API call. So let's estimate.
'isFollower': u['id'] in channel.followers_set,
'isMod': False
})
for i in channel.followers:
if i in ids and i not in audience_ids:
u = ids[i]
result.append({
'name': u['display_name'],
'inChat': False,
'isFollower': True,
'isMod': False
})
return jsonify(result)
@app.route('/channel/<channel_name>/messages')
def messages(channel_name: str):
channel_name = normalize_channel_name(channel_name)
if channel_name not in channels:
irc.join(channel_name)
return jsonify([])
channel = channels[channel_name]
result = []
for news in channel.newses:
result.append({'message': news})
channel.newses.clear()
return jsonify(result)
def main():
logging.basicConfig(stream=sys.stderr, level=logging.DEBUG)
parser = argparse.ArgumentParser()
parser.add_argument('-H', '--host', type=str, default='127.0.0.1')
parser.add_argument('-p', '--port', type=int, default=8000)
parser.add_argument('-d', '--debug', action='store_true')
args = parser.parse_args()
app.run(debug=args.debug, host=args.host, port=args.port,
use_reloader=args.debug)
if __name__ == '__main__':
main()
@Lastorder-DC
Copy link

Lastorder-DC commented May 30, 2018

다 좋은데 구현이 틀린부분이 있습니다

  1. !news가 들어가 있지 않은 채팅은 뽑아낼 필요 없음.
  2. !news 사용시 채팅으로 피드백이 있어야됨
  3. !news가 들어간 메세지를 그대로 넣으면 안되고 !news 말한시청자명: 메세지처럼 가공해서 넣어야됨
  4. /messages가 호출되고나서는 리스트를 비워줘야 됨. 안그러면 계속 똑같은 메세지가 보내짐. 채팅 메세지를 그대로 가져오기보다는 채팅을 받았을때 !news가 있으면 리스트에 넣고, /messages가 호출되었을때 비워주는 식으로 구현하면됨

@segfault87
Copy link
Author

@Lastorder-DC 고쳤습니다. 캄사요!

@Lastorder-DC
Copy link

방송보다 발견한 문제점

트위치 닉네임이 길면 트위치에서 닉네임을 가져와 적용->잘려서 적용됨->다음에 닉네임 가져올때 같은 닉네임이 없으니까 또 적용->계속 반복 루트를 타서 같은 닉네임이 계속 증식합니다. 오픈롤코에서 수정해주는게 제일 깔끔하긴한데 안그래도 한글닉네임 맛간거 못고치고 있는데 이걸 빠른시일내 고칠수있을것같질 않아서 서버쪽에서 고치는게 빠를것같네요

닉네임 가져올때 오픈롤코 최대손님이름 길이로 닉네임 일괄 자르기하면 될것같습니다

@segfault87
Copy link
Author

네. 저도 방금 방송 보고 확인했는데 중복체크가 제대로 되고 있지 않더군요. 일단 OpenRCT2에서 중복체크를 어떻게 하는지 확인해보고 조치하도록 하겠습니다.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment