Skip to content

Instantly share code, notes, and snippets.

@linnil1
Created October 5, 2021 16:31
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save linnil1/5c2b98c0ea0f6766f979d1f8bbebea55 to your computer and use it in GitHub Desktop.
Save linnil1/5c2b98c0ea0f6766f979d1f8bbebea55 to your computer and use it in GitHub Desktop.
GitHub webhook handler using aiohttp
import asyncio
import hmac
import json
from aiohttp import web
# custom import
from datetime import datetime
from pprint import pprint
async def run(cmd):
    """Run *cmd* through the shell and return its exit status.

    The coroutine completes only after the child process has finished.
    """
    process = await asyncio.create_subprocess_shell(cmd)
    exit_status = await process.wait()
    return exit_status
async def webhook(request):
    """Handle a GitHub webhook POST: verify its signature, then enqueue it.

    The raw request body is read once and used both for the HMAC-SHA256
    check (keyed with the module-level ``secret``) and for JSON decoding.

    Args:
        request: The incoming aiohttp request.

    Returns:
        A JSON response with status
        401 if the ``X-Hub-Signature-256`` header is missing, malformed,
        not a sha256 signature, or does not match the body;
        400 if the verified body is not valid UTF-8 JSON;
        200 ``{"message": "ok"}`` once the decoded payload has been handed
        to ``callback``.
    """
    body = await request.content.read()
    signature = request.headers.get('X-HUB-SIGNATURE-256', "")
    # partition() never raises, unlike unpacking the result of split():
    # a header without "=" just yields an empty digest and is rejected below
    # instead of crashing the handler with a ValueError (HTTP 500).
    method, _, code = signature.partition("=")
    if method != "sha256":
        return web.json_response({"message": "Not sha256 hash function"},
                                 status=401)
    expected = hmac.new(secret.encode(), body, "sha256").hexdigest()
    # compare_digest() raises TypeError for non-ASCII str arguments, so such
    # digests are rejected up front; the comparison itself stays constant-time.
    if not code.isascii() or not hmac.compare_digest(expected, code):
        return web.json_response({"message": "sha256 verification fail"},
                                 status=401)
    try:
        data = json.loads(body.decode())
    except ValueError:
        # ValueError covers both UnicodeDecodeError and json.JSONDecodeError.
        return web.json_response({"message": "Invalid JSON payload"},
                                 status=400)
    callback(data)
    return web.json_response({"message": "ok"})
# ## Custom Region Start ###
# In-memory queue of push-event payloads waiting to be processed by the
# background task.  It is a plain list, so nothing is persisted: pending
# events are lost if the process stops.
git_pull_queue: list = list()
# Shared secret used as the HMAC key when verifying webhook signatures.
secret: str = "123"
# TCP port the web server listens on.
web_port: int = 80
def callback(data):
    """Queue a verified webhook payload for the background worker.

    Args:
        data: The decoded JSON payload of the webhook request.
    """
    pending = git_pull_queue
    pending.append(data)
async def check_events(app):
    """Poll the push-event queue and run ``git pull`` for each queued event.

    Runs until cancelled.  Every 3 seconds the queue is checked and at most
    one event is taken from the front, printed, and answered with a
    ``git pull``.  Errors while handling an event are printed and do not stop
    the loop.

    Args:
        app: The aiohttp application; not used by the body, but required by
            the signature because the startup hook passes it in.

    Returns:
        None, once the task has been cancelled.
    """
    try:
        while True:
            try:
                if git_pull_queue:
                    print("Push event", datetime.now().strftime("%c"))
                    data = git_pull_queue.pop(0)
                    pprint(data)
                    exit_code = await run("git pull")
                    if exit_code:
                        # A failed pull used to go unnoticed; report it in
                        # the same format as other errors.
                        print("Error",
                              f"git pull exited with code {exit_code}",
                              datetime.now().strftime("%c"))
            except asyncio.CancelledError:
                # Re-raise explicitly so cancellation is never mistaken for
                # a handling error (CancelledError derives from Exception
                # on Python 3.7).
                raise
            except Exception as ex:
                # Exception rather than BaseException: KeyboardInterrupt and
                # SystemExit must be allowed to propagate.
                print("Error", ex, datetime.now().strftime("%c"))
            await asyncio.sleep(3)
    except asyncio.CancelledError:
        # The outer handler also covers cancellation that arrives during the
        # sleep, which is where the task spends nearly all of its time.
        return
# ## Custom region End ###
async def start_background_tasks(app):
    """Startup hook: launch the queue poller as a background task.

    The task handle is stored under ``app['pull_listener']`` so it can be
    retrieved later.
    """
    listener = asyncio.create_task(check_events(app))
    app['pull_listener'] = listener
async def cleanup_background_tasks(app):
    """Cleanup hook: cancel the background poller and wait for it to stop.

    Args:
        app: Mapping that holds the task under ``'pull_listener'``.
    """
    listener = app['pull_listener']
    listener.cancel()
    try:
        await listener
    except asyncio.CancelledError:
        # Awaiting a task that was just cancelled re-raises CancelledError
        # unless the task swallowed it; that is the expected outcome here
        # and must not abort application cleanup.
        pass
# Build the application: register the webhook route and the background-task
# lifecycle hooks, then serve on the configured port (blocks until shutdown).
app = web.Application()
app.router.add_post('/', webhook)
app.on_startup.append(start_background_tasks)
app.on_cleanup.append(cleanup_background_tasks)
web.run_app(app, port=web_port)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment