Skip to content

Instantly share code, notes, and snippets.

@mshroyer
Created April 8, 2014 20:52
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mshroyer/10190254 to your computer and use it in GitHub Desktop.
Save mshroyer/10190254 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python
"""webhook
Implements web hooks for BitBucket, etc. on my Amazon EC2 instance.
"""
__author__ = 'Mark Shroyer'
from flask import Flask
from flask import request
import subprocess
from datetime import datetime, timedelta
# Flask application object; the route decorators in this file register their
# handlers on it.
app = Flask(__name__)
# NOTE(review): debug mode is enabled unconditionally. Flask's debug mode is
# intended for development -- confirm this is never exposed to untrusted
# clients.
app.debug = True
class WebHookThrottle(object):
    """Sliding-window rate limiter for web hook invocations.

    Allows us to impose a throttle on our S3 uploads so we don't get hit
    with a huge bill if Bitbucket loses its mind...

    Every accepted call to ``submit()`` is timestamped. A call is refused
    once ``threshold`` accepted calls already fall inside the trailing
    ``period``.
    """

    def __init__(self, period=timedelta(hours=1), threshold=5):
        """Create a throttle.

        Args:
            period: Length of the trailing window, as a ``timedelta``.
            threshold: Maximum number of accepted events inside one window.
        """
        self.period = period
        self.threshold = threshold
        # Naive UTC timestamps of the events accepted so far.
        self.events = []

    def submit(self):
        """Record one event, or refuse it if the window is already full.

        Raises:
            Exception: If ``threshold`` events were already accepted within
                the last ``period``.
        """
        now = datetime.utcnow()
        # Build a real list instead of calling filter(): on Python 3 filter()
        # returns a lazy iterator, which supports neither len() nor append().
        self.events = [evt for evt in self.events if now - evt <= self.period]
        if len(self.events) >= self.threshold:
            raise Exception("Web hook throttle threshold exceeded!")
        self.events.append(now)
# Module-level throttle consulted by the /pelican hook: at most 30 accepted
# submissions within any trailing one-hour window.
pelican_throttle = WebHookThrottle(period=timedelta(hours=1), threshold=30)
@app.route('/pelican', methods=['POST'])
def pelican():
    """Run the Pelican build script in response to a Bitbucket POST hook.

    Returns:
        An empty body (HTTP 200) once the build script has exited
        successfully, or a 403 response when the request does not originate
        from one of the allowed source addresses.

    Raises:
        Exception: Propagated from ``pelican_throttle.submit()`` when the
            throttle threshold has been exceeded.
        subprocess.CalledProcessError: If the build script exits non-zero.
    """
    # Make sure the request actually came from Bitbucket. Answer with an
    # explicit 403 instead of raising: an unhandled exception turns into a
    # 500 and, when Flask runs in debug mode, may show the traceback page to
    # the remote client.
    if request.remote_addr not in ("131.103.20.165", "131.103.20.166"):
        return "Invalid source IP address for pelican hook", 403

    pelican_throttle.submit()

    # The command is a fixed path with no arguments, so execute it directly
    # rather than through a shell. The script must be executable and begin
    # with a shebang line.
    subprocess.check_call(["/opt/webhook/pelican.sh"])
    return ""
@app.route('/')
def root():
    """Serve a short plain-text banner identifying this service."""
    banner = "Web hook manager"
    return banner
# Start Flask's built-in server only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == '__main__':
    app.run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment