Skip to content

Instantly share code, notes, and snippets.

@blakev
Created May 19, 2016 23:35
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save blakev/dc39e4747e847c034705c7d9c64c74e8 to your computer and use it in GitHub Desktop.
Save blakev/dc39e4747e847c034705c7d9c64c74e8 to your computer and use it in GitHub Desktop.
import os
import re
import sys
import json
import time
import requests
from bs4 import BeautifulSoup
from requests_oauthlib import OAuth1Session
from jinja2.environment import Environment
from jinja2.loaders import FileSystemLoader
from tornado.web import Application, RequestHandler, StaticFileHandler, url
from tornado.ioloop import IOLoop, PeriodicCallback
from tornado.httpclient import AsyncHTTPClient
from tornado.websocket import WebSocketHandler
CONFIG = {
'port': 8888,
'host': '0.0.0.0',
'connection': 'localhost:8888'
}
USER_CONF = json.load(open(r'user-settings.json', 'r'))
CWD = os.getcwd()
httpclient = AsyncHTTPClient()
class RelayWebSocket(WebSocketHandler):
all_stats = dict()
clients = set()
def open(self):
self.clients.add(self)
self.write_message(json.dumps(self.all_stats))
def on_close(self):
self.clients.remove(self)
@classmethod
def propagate(cls, data):
for provider, d in data.items():
cls.all_stats[provider] = d
for c in cls.clients:
c.write_message(json.dumps(data))
class TemplateRender(object):
def render_template(self, name, **kwargs):
template_path = self.settings.get('template_path', os.path.join(CWD, 'templates'))
env = Environment(loader=FileSystemLoader([template_path]))
template = env.get_template(name)
# bootstrap
settings = dict(self.settings)
settings.update(CONFIG)
kwargs['settings'] = settings
kwargs['user'] = dict(USER_CONF)
return template.render(kwargs)
class IndexHandler(TemplateRender, RequestHandler):
def get(self):
self.write(self.render_template('index.html', stats=RelayWebSocket.all_stats))
def social_response(provider, response):
response['time'] = int(time.time())
data = {provider: response}
RelayWebSocket.propagate(data)
def create_tasks():
def get_youtube():
def process_youtube(response):
resp = json.loads(response.body)
data = resp.get('items', [])[0].get('statistics')
# limit page data
data = {'subscriberCount': data.get('subscriberCount', -1)}
social_response('youtube', data)
data = USER_CONF['services']['youtube']
url = data.get('url').format(channel=data['channel'],api_key=data['api_key'])
httpclient.fetch(url, process_youtube)
def get_twitter():
def process_twitter(response):
data = json.loads(response.content)
# limit page data
data = {'followers': int(data.get('followers_count', -1))}
social_response('twitter', data)
data = USER_CONF['services']['twitter']
twitter = OAuth1Session(
data.get('client_key'),
client_secret=data.get('client_secret'),
resource_owner_key=data.get('owner_key'),
resource_owner_secret=data.get('owner_secret'))
response = twitter.get(data.get('url').format(account=data.get('account')))
process_twitter(response)
def get_instagram():
def process_instagram(response):
content = response.content
objs = IG_FOLLOW_REGEX.findall(content)
if objs:
data = json.loads(objs[0])
data = {'followers': data.get('count', -1)}
social_response('instagram', data)
data = USER_CONF['services']['instagram']
resp = requests.get(data.get('url').format(account=data.get('account')))
if resp.status_code == 200:
process_instagram(resp)
IG_FOLLOW_REGEX = re.compile(r'\"followed_by\":([\{\}\":a-z\d]+),')
funcs = [
get_youtube,
get_twitter,
get_instagram
]
# pre-run
for f in funcs:
f()
tasks = [PeriodicCallback(f, USER_CONF.get('variables').get('rotation')) for f in funcs]
# scheduled-run
for t in tasks:
t.start()
return tasks
def make_app():
config = {
'debug': False,
'compress_response': True,
'static_hash_cash': False,
'template_path': 'templates'
}
app = Application([
url(r'/static/(.*)', StaticFileHandler, {'path': os.path.join(CWD, 'static')}),
url(r'/ws', RelayWebSocket),
url(r'/', IndexHandler)
], **config)
app.tasks = create_tasks()
return app
if __name__ == '__main__':
app = make_app()
app.listen(CONFIG.get('port'), CONFIG.get('host'))
try:
IOLoop.current().start()
except KeyboardInterrupt as e:
sys.exit(1)
except Exception as e:
raise e
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment