Skip to content

Instantly share code, notes, and snippets.

@toabctl
Created August 8, 2015 08:08
Show Gist options
  • Save toabctl/649d7a0c3272992b131b to your computer and use it in GitHub Desktop.
Save toabctl/649d7a0c3272992b131b to your computer and use it in GitHub Desktop.
#!/usr/bin/python
from __future__ import print_function
import argparse
import logging
import collections
import os
import json
import requests
import tempfile
import time
import sys
class StateFileStore(object):
    """Persist selected GitHub response headers between runs.

    The headers live in ``self.headers`` and are serialized as JSON to a
    per-organization file named ``gh-event-handler_<org>.json``.  The file
    is placed in ``$XDG_CACHE_HOME`` when that variable is set to a
    non-empty value, otherwise in the system temporary directory.
    """

    def __init__(self, org):
        """Create an empty store for the organization *org*.

        Nothing is read from disk here; call :meth:`read` to load
        previously saved headers.
        """
        self.headers = {}
        self.org = org
        self._state_filename = "gh-event-handler_%s.json" % org
        # FIXME(toabctl): it's stupid to use tempdir to store data!
        # dict.has_key() is gone on Python 3, so use .get(); an empty
        # value is treated like an unset variable instead of silently
        # redirecting the state file to the current working directory.
        self._state_datadir = (
            os.environ.get('XDG_CACHE_HOME') or tempfile.gettempdir())

    def _state_path(self):
        """Return the full path of the JSON state file."""
        return os.path.join(self._state_datadir, self._state_filename)

    def write(self):
        """Serialize ``self.headers`` as JSON into the state file."""
        with open(self._state_path(), "w") as f:
            json.dump(self.headers, f)

    def read(self):
        """Load ``self.headers`` from the state file, if there is one.

        A missing file leaves ``self.headers`` untouched.  A file that
        does not contain a JSON object (e.g. truncated by an interrupted
        write) is ignored with a warning, because the stored headers are
        only a cache and can be rebuilt by the next successful request.
        """
        src = self._state_path()
        if not os.path.exists(src):
            return
        with open(src, "r") as f:
            try:
                loaded = json.load(f)
            except ValueError:
                # json raises ValueError (or a subclass) on invalid input
                logging.warning("ignoring unreadable state file '%s'", src)
                return
        if not isinstance(loaded, dict):
            logging.warning("ignoring unexpected content in '%s'", src)
            return
        self.headers = loaded
def handle_events(events_list):
    """Handle a list of events.

    Each event must be a mapping with a ``"type"`` entry plus nested
    ``"repo" -> "name"`` and ``"actor" -> "login"`` entries; a missing
    key raises ``KeyError``.  At the moment every event is only logged
    at DEBUG level.
    """
    for event in events_list:
        # Hand the values to logging as arguments so the message is only
        # formatted when DEBUG output is actually enabled.
        logging.debug(
            "Received event '%s' for '%s' from '%s'",
            event["type"], event["repo"]["name"], event["actor"]["login"])
        # TODO: dispatch on event["type"], e.g. "PullRequestEvent"
        # (repo name and event["org"]["login"] are of interest there)
        # and "PushEvent".
# Result of one poll: the decoded events and the number of seconds to
# wait before polling again.  Built once instead of on every call.
EventsInfo = collections.namedtuple("Events", ["event_list", "next_run"])

# Seconds to wait for the HTTP request before giving up; without a
# timeout a stalled connection would block the poll loop forever.
_REQUEST_TIMEOUT = 30

# Delay used after a "304 Not Modified" when no poll interval is known.
_FALLBACK_POLL_INTERVAL = 5


def get_events(org, state):
    """Fetch the current events of the GitHub organization *org*.

    :param org: name of the organization
    :param state: object with a ``headers`` dict.  A stored ``"etag"`` is
                  sent as ``If-None-Match``; after a ``200`` response
                  ``state.headers`` is replaced by the new ``etag`` and
                  ``x-poll-interval`` response headers.
    :returns: ``EventsInfo(event_list, next_run)`` where ``next_run`` is
              the delay in seconds before the next poll.
    :raises Exception: for any status code other than 200 or 304.
    """
    # custom headers we want to set
    headers = {}
    if "etag" in state.headers:
        headers["If-None-Match"] = state.headers["etag"]
    url = "https://api.github.com/orgs/%s/events" % org
    r = requests.get(url, headers=headers, timeout=_REQUEST_TIMEOUT)
    if r.status_code == 304:
        # Not modified (and does not count for the rate-limit).  Still
        # honour the poll interval requested by the server: prefer the
        # one on this response, then the remembered one.
        interval = r.headers.get(
            'x-poll-interval',
            state.headers.get('x-poll-interval', _FALLBACK_POLL_INTERVAL))
        return EventsInfo([], int(interval))
    if r.status_code == 200:
        # remember some specific headers
        state.headers = {k: r.headers[k] for k in (
            'etag', 'x-poll-interval')}
        # process events
        return EventsInfo(r.json(), int(r.headers['x-poll-interval']))
    raise Exception(
        "'%s' while trying to access '%s'" % (
            r.status_code, url))
def handle_args(argv=None):
    """Parse the command line and return the options as a dict.

    :param argv: optional list of argument strings to parse instead of
                 the process arguments; ``None`` (the default) makes
                 argparse read ``sys.argv[1:]``.
    :returns: dict with the keys ``"org"`` (organization name) and
              ``"debug"`` (``True`` when ``--debug`` was given).

    Invalid arguments make argparse print a usage message and exit.
    """
    parser = argparse.ArgumentParser(description="process github events for a "
                                     "given organization")
    parser.add_argument("org", help="name of the github organization")
    parser.add_argument("--debug", action="store_true", help="debugging")
    return vars(parser.parse_args(argv))
def setup_logging(debug=False):
    """Configure the root logger to emit timestamped lines on stdout.

    The root level is DEBUG when *debug* is true, INFO otherwise.  Every
    call attaches one more stream handler to the root logger.
    """
    level = logging.DEBUG if debug else logging.INFO
    root_logger = logging.getLogger()
    root_logger.setLevel(level)

    stdout_handler = logging.StreamHandler(sys.stdout)
    stdout_handler.setFormatter(
        logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
    root_logger.addHandler(stdout_handler)
# Script entry point: poll the organization's events forever, handling
# each batch and persisting the request state after every round.
if __name__ == "__main__":
    args = handle_args()
    setup_logging(debug=args["debug"])
    # init state
    state = StateFileStore(args["org"])
    state.read()
    while True:
        events_info = get_events(args["org"], state)
        # values are passed as logging arguments so they are only
        # formatted when DEBUG output is enabled
        logging.debug("received %s events", len(events_info.event_list))
        handle_events(events_info.event_list)
        # remember what we already did
        state.write()
        logging.debug("wait %ss before next run", events_info.next_run)
        time.sleep(events_info.next_run)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment