Skip to content

Instantly share code, notes, and snippets.

@cphrmky
Forked from raunakdoesdev/principessa.py
Created January 31, 2019 22:59
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save cphrmky/65802859d118ba445bc5bbf614155c50 to your computer and use it in GitHub Desktop.
Save cphrmky/65802859d118ba445bc5bbf614155c50 to your computer and use it in GitHub Desktop.
from sseclient import SSEClient
import requests
from queue import Queue
import json
import threading
import socket
class ClosableSSEClient(SSEClient):
    """SSEClient subclass whose event stream can be shut down on demand.

    ``close()`` flips ``should_connect`` off so that any later
    ``_connect()`` call raises ``StopIteration`` instead of reconnecting,
    and then tears down the socket underneath the current response.
    """

    def __init__(self, *args, **kwargs):
        # Set the flag before delegating to the parent constructor, which
        # presumably triggers the first _connect() -- TODO confirm against
        # the installed sseclient version.
        self.should_connect = True
        super().__init__(*args, **kwargs)

    def _connect(self):
        """Connect through the parent class unless the client was closed."""
        if not self.should_connect:
            raise StopIteration()
        super()._connect()

    def close(self):
        """Forbid further reconnects and shut the underlying socket down."""
        self.should_connect = False
        self.retry = 0
        try:
            # Private attribute chain down to the raw socket; any missing
            # link (e.g. no response yet) surfaces as AttributeError and is
            # treated as "nothing to close".
            raw_socket = self.resp.raw._fp.fp._sock
            raw_socket.shutdown(socket.SHUT_RDWR)
            raw_socket.close()
        except AttributeError:
            pass
class RemoteThread(threading.Thread):
    """Thread that reads a server-sent-event stream and forwards messages.

    Args:
        parent: Object that owns this thread; stored as ``self.parent``.
        URL: Stream URL passed to ``ClosableSSEClient``.
        function: Callable invoked with an ``(event, data)`` tuple for every
            message whose JSON payload is not ``null``.
    """

    def __init__(self, parent, URL, function):
        self.function = function
        self.URL = URL
        self.parent = parent
        # The stream is only opened in run(); define the attribute up front
        # so close() is safe to call before the thread has connected.
        self.sse = None
        super(RemoteThread, self).__init__()

    def run(self):
        """Open the stream and dispatch each message until it is closed."""
        try:
            self.sse = ClosableSSEClient(self.URL)
            for msg in self.sse:
                msg_data = json.loads(msg.data)
                if msg_data is None:  # keep-alives
                    continue
                # TODO: update parent cache here
                self.function((msg.event, msg_data))
        except socket.error:
            pass  # this can happen when we close the stream
        except KeyboardInterrupt:
            self.close()

    def close(self):
        """Close the event stream if one has been opened; otherwise no-op."""
        if self.sse is not None:
            self.sse.close()
def firebaseURL(URL):
    """Normalize a Firebase reference into a full HTTPS ``.json`` URL.

    Two input forms are accepted:

    * Shorthand ``"<app>"`` or ``"<app>/<path>"`` (no ``.firebaseio.com``):
      expanded to ``https://<app>.firebaseio.com/<path>.json``. One trailing
      ``.json`` and one trailing ``/`` are dropped first.
    * A URL already containing ``.firebaseio.com``: ``http://`` is upgraded
      to ``https://``, a missing scheme is added, and ``.json`` is appended
      when the URL does not already contain it.

    Args:
        URL: Shorthand reference or full Firebase URL.

    Returns:
        The normalized URL string.
    """
    if '.firebaseio.com' not in URL.lower():
        if URL.endswith('.json'):
            URL = URL[:-5]
        if URL.endswith('/'):
            URL = URL[:-1]
        # partition() yields an empty path when no '/' remains, so input
        # such as "myapp/" maps to the database root instead of raising
        # IndexError the way split('/', 1)[1] did.
        app_name, _, path = URL.partition('/')
        return 'https://' + app_name + '.firebaseio.com/' + path + '.json'

    if 'http://' in URL:
        URL = URL.replace('http://', 'https://')
    if 'https://' not in URL:
        URL = 'https://' + URL
    if '.json' not in URL.lower():
        URL += '.json' if URL.endswith('/') else '/.json'
    return URL
class subscriber:
    """Handle for a background stream that feeds messages to a callback.

    Args:
        URL: Firebase reference; normalized with ``firebaseURL``.
        function: Callable handed to the ``RemoteThread`` as its message
            handler.
    """

    def __init__(self, URL, function):
        self.cache = {}
        stream_url = firebaseURL(URL)
        self.remote_thread = RemoteThread(self, stream_url, function)

    def start(self):
        """Start the background thread."""
        self.remote_thread.start()

    def stop(self):
        """Close the stream, then block until the thread has finished."""
        worker = self.remote_thread
        worker.close()
        worker.join()

    def wait(self):
        """Block until the background thread finishes."""
        self.remote_thread.join()
class FirebaseException(Exception):
    """Raised when a Firebase request returns a status code other than 200."""
def put(URL, msg):
    """Send ``msg`` as JSON in an HTTP PUT to the Firebase location ``URL``.

    Raises:
        FirebaseException: If the response status code is not 200; the
            message is the response body text.
    """
    payload = json.dumps(msg)
    endpoint = firebaseURL(URL)
    reply = requests.put(endpoint, data=payload)
    if reply.status_code == 200:
        return
    raise FirebaseException(reply.text)
def patch(URL, msg):
    """Send ``msg`` as JSON in an HTTP PATCH to the Firebase location ``URL``.

    Raises:
        FirebaseException: If the response status code is not 200; the
            message is the response body text.
    """
    payload = json.dumps(msg)
    endpoint = firebaseURL(URL)
    reply = requests.patch(endpoint, data=payload)
    if reply.status_code == 200:
        return
    raise FirebaseException(reply.text)
def get(URL):
    """Fetch the Firebase location ``URL`` and return the decoded JSON body.

    Returns:
        Whatever ``json.loads`` produces from the response text.

    Raises:
        FirebaseException: If the response status code is not 200; the
            message is the response body text.
    """
    reply = requests.get(firebaseURL(URL))
    if reply.status_code == 200:
        return json.loads(reply.text)
    raise FirebaseException(reply.text)
def push(URL, msg):
    """Send ``msg`` as JSON in an HTTP POST to the Firebase location ``URL``.

    Raises:
        FirebaseException: If the response status code is not 200; the
            message is the response body text. This matches ``put``,
            ``patch`` and ``get``. ``FirebaseException`` derives from
            ``Exception``, so callers that caught the previous bare
            ``Exception`` keep working.
    """
    to_post = json.dumps(msg)
    response = requests.post(firebaseURL(URL), data=to_post)
    if response.status_code != 200:
        raise FirebaseException(response.text)
import os
from flask import abort, Flask, jsonify, request
import pytz
from datetime import datetime
# Flask application object; the @app.route handler below registers on it.
app = Flask(__name__)
def is_request_valid(request):
    """Return True when the request carries the expected token and team id.

    Args:
        request: Object exposing a ``form`` mapping with ``token`` and
            ``team_id`` entries (the Flask request in this file).

    Returns:
        ``True`` only if both fields are present and match; ``False``
        otherwise, including when either field is missing.
    """
    import hmac  # local import: hmac is not in the file's import block

    # NOTE(review): these credentials are hard-coded in source. Consider
    # loading them from configuration and rotating the token.
    expected_token = 'nsui4GlruL6QTRfuwO5nmobR'
    expected_team_id = 'T8CBT81JP'

    form = request.form
    token = form.get('token') or ''
    team_id = form.get('team_id') or ''

    # Constant-time comparison avoids leaking how much of the token
    # matched. Compare bytes, because compare_digest rejects non-ASCII str.
    is_token_valid = hmac.compare_digest(
        token.encode('utf-8'), expected_token.encode('utf-8'))
    is_team_id_valid = team_id == expected_team_id
    return is_token_valid and is_team_id_valid
@app.route('/glass', methods=['POST'])
def glass():
    """Record glasses of water for the calling user and report the standings.

    Reads ``text`` (number of glasses, default 1) and ``user_name`` from the
    posted form, adds the amount to the user's counter for the current
    US/Pacific day in Firebase, and returns a JSON reply with
    ``response_type='in_channel'`` describing the leaderboard. The form
    fields look like a Slack slash-command payload -- TODO confirm.

    Aborts with HTTP 400 when ``is_request_valid`` rejects the request or
    when ``user_name`` is missing.
    """
    if not is_request_valid(request):
        abort(400)

    # A missing 'text' makes int(None) raise TypeError; non-numeric text
    # raises ValueError. Both fall back to a single glass.
    try:
        amount = int(request.form.get('text'))
    except (TypeError, ValueError):
        amount = 1

    translator = {'spottedfeather.02' : 'Principessa', 'sauhaarda' : 'Raunak', 'sauhaardammorpg' : 'hagu'}
    account_name = request.form.get('user_name')
    if not account_name:
        abort(400)
    # Unmapped accounts are shown under their own name rather than raising
    # KeyError (which would become HTTP 500).
    username = translator.get(account_name, account_name)

    URL = 'retirementclub-2e565'
    day = datetime.now(pytz.timezone('US/Pacific')).strftime('%Y-%m-%d')
    day_path = URL + '/' + day

    response = ''
    current = get(day_path)
    if current is None:
        current = {}
    if username not in current:
        response += 'Wow! It\'s your first drinker today! :clap: :clap: for your :potable_water:!\n'
        # Only seed this user's counter. Replacing the whole dict here would
        # hide everyone else's totals from the ranking below.
        current[username] = 0
    current[username] += amount

    # Highest count first; ties keep the order the data was read in.
    ranking = sorted(current.items(), key=lambda entry: entry[1], reverse=True)
    max_user, max_glasses = ranking[0]
    if max_user == username:
        response += "Nice! You're in the lead with {} glasses. Keep up the LOVELY work! Keep your bladder in check though :laughing:!\n".format(max_glasses)
        if len(ranking) > 1:
            s_user, s_glasses = ranking[1]
            response += "{} is in 2nd (aka last lol) place with {} glasses, keep drinking ;)\n".format(s_user, s_glasses)
    else:
        response += "You've drunk {} glasses of water, but {} is in first place with {} glasses. You've got this! Keep drinking, hopefully hyperhydroxia isn't a thing ;)\n".format(current[username], max_user, max_glasses)

    # PATCH only the caller's counter so that other users' values, read
    # earlier in this request, are not written back from a stale copy.
    patch(day_path, {username: current[username]})
    return jsonify(
        response_type='in_channel',
        text=response,
    )
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment