Created
August 4, 2016 18:04
-
-
Save joncle/bd32b9dbfbeb2054663054737a2d6381 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
from bs4 import BeautifulSoup | |
from dispatcher import default_dispatcher | |
import json | |
import requests | |
import websockets | |
class Session(requests.Session):
    """
    requests session that tracks the chat ``fkey`` token.

    Every GET response is scanned for an element whose id is ``fkey``;
    when one is found its ``value`` attribute is remembered.  Every POST
    whose payload is a dict gets that value added under the ``'fkey'``
    key, unless the caller already supplied one.
    """

    # Most recent fkey scraped from a GET response.  Defined at class level
    # so that post() can run safely before any page containing one was seen.
    fkey = None

    def __init__(self, url, chat, email, password, *args, **kwargs):
        """
        Post the credentials to ``url``, then fetch ``chat``.

        :param url: address the email/password form is posted to
        :param chat: base URL of the chat site; stored as ``self.chat``
        :param email: account email sent in the credentials post
        :param password: account password sent in the credentials post
        :param args: forwarded to ``requests.Session.__init__``
        :param kwargs: forwarded to ``requests.Session.__init__``
        """
        super().__init__(*args, **kwargs)
        # Go through the parent's post() so the credentials payload is sent
        # exactly as given, without any fkey injection.
        super().post(url, {'email': email, 'password': password})
        # Fetching the chat page lets get() pick up an fkey if it has one.
        self.get(chat)
        self.chat = chat

    def get(self, *args, **kwargs):
        """
        Perform a GET and remember the page's fkey, if it has one.

        :return: the unmodified response from ``requests.Session.get``
        """
        res = super().get(*args, **kwargs)
        soup = BeautifulSoup(res.content, 'html.parser')
        fkey_input = soup.find(id='fkey')
        if fkey_input is not None:
            self.fkey = fkey_input['value']
        return res

    def post(self, url, data=None, *args, **kwargs):
        """
        Perform a POST, adding the stored fkey to dict payloads.

        :param url: target address
        :param data: request payload; only dicts receive the fkey
        :return: the response from ``requests.Session.post``
        """
        if isinstance(data, dict):
            # Work on a copy so the caller's dict is never modified.
            data = dict(data)
            # Skip injection while no fkey is known instead of failing
            # with an AttributeError.
            if self.fkey is not None:
                data.setdefault('fkey', self.fkey)
        return super().post(url, data, *args, **kwargs)
class ChatRoomBase:
    """
    One chat room reached through an authenticated session.

    Holds the room id, the session used for HTTP calls and the dispatcher
    that maps incoming event types to handlers.
    """

    def __init__(self, room_id, session, dispatcher=default_dispatcher):
        """
        :param room_id: identifier of the room
        :param session: object providing ``post()`` and a ``chat`` base URL
        :param dispatcher: callable invoked as ``dispatcher(room, event_type)``
            that returns the handler for that event type
        """
        self.room_id = room_id
        self.session = session
        self.dispatcher = dispatcher

    def get_ws_url(self):
        """Return the ``url`` field of the JSON reply from ``/ws-auth``."""
        endpoint = '{}/ws-auth'.format(self.session.chat)
        reply = self.session.post(endpoint, {'roomid': self.room_id})
        return reply.json()['url']

    def say(self, text):
        """Post ``text`` as a new message in this room."""
        endpoint = '{}/chats/{}/messages/new'.format(
            self.session.chat, self.room_id
        )
        self.session.post(endpoint, {'text': text})

    #
    # add more here... (kick mute etc that are specific to a room)
    #

    # NOTE: asyncio.coroutine was removed in Python 3.11; this decorator
    # requires an older interpreter.
    @asyncio.coroutine
    def process_events(self, message, *args, **kwargs):
        """
        Decode a JSON message and dispatch the events meant for this room.

        Only the channel named ``r<room_id>`` is considered.  Each entry in
        its ``e`` list is passed, along with any extra arguments, to the
        handler the dispatcher returns for the entry's ``event_type``.
        """
        payload = json.loads(message)
        own_channel = 'r{}'.format(self.room_id)
        for channel, info in payload.items():
            if channel == own_channel:
                for event in info.get('e', []):
                    handler = self.dispatcher(self, event['event_type'])
                    handler(event, *args, **kwargs)
# Session shared by the rooms below; replace the placeholder credentials
# before running.
rabbit = Session(
    url='https://stackoverflow.com/users/login',
    chat='https://chat.stackoverflow.com',
    email='XXX',
    password='YYY',
)
class ChatLogger:
    """Mixin whose message handler simply prints what it was given."""

    def on_message(*received, **options):
        """
        Print every positional and keyword argument.

        There is deliberately no explicit ``self``: when called on an
        instance, the instance is the first item of the printed tuple.
        """
        print(received, options)
class SandBoxRoom(ChatRoomBase, ChatLogger):
    """Chat room combining ChatRoomBase with the ChatLogger handlers."""
# Room 1, accessed through the shared session.
sandbox_room = SandBoxRoom(room_id=1, session=rabbit)
@asyncio.coroutine
def chat_handler(room=sandbox_room):
    """
    Connect to a room's websocket and process its messages forever.

    :param room: room object providing ``get_ws_url()``, ``session.chat``
        and the ``process_events()`` coroutine; defaults to the module's
        sandbox room
    """
    # Build the URL from the room that was passed in, not from the
    # module-level default, so the handler works for any room.
    # NOTE(review): the '?l=99999999' query suffix is sent as-is; its
    # meaning is not visible here - confirm against the chat API.
    ws = yield from websockets.connect(
        room.get_ws_url() + '?l=99999999',
        origin=room.session.chat
    )
    while True:
        msg = yield from ws.recv()
        # process_events() dispatches to handlers; its result is not needed.
        yield from room.process_events(msg)
# Run the sandbox room's handler on the event loop; the handler loops
# forever, so this call only returns if it raises.
asyncio.get_event_loop().run_until_complete(
    chat_handler(room=sandbox_room)
)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment