Skip to content

Instantly share code, notes, and snippets.

@athoune
athoune / server.py
Created Sep 28, 2020
Route to healthy server
View server.py
#!/usr/bin/env python3
import signal
import os
from flask import Flask, abort
from werkzeug.serving import run_simple
state = True
@athoune
athoune / gist:01a739ed722257e613e1
Created Nov 10, 2014
Sending Luigi failure to Sentry
View gist:01a739ed722257e613e1
import os
import sys
import luigi
from raven import Client
client = Client()
# Define your Luigi tasks here
@luigi.Task.event_handler(luigi.Event.FAILURE)
@athoune
athoune / cgroup_swap.py
Last active Jun 2, 2020
display swap and rss in cgroup services
View cgroup_swap.py
#!/usr/bin/env python3
from pathlib import Path
def containerd():
for p in Path("/sys/fs/cgroup/memory/system.slice/containerd.service/").iterdir():
if not p.is_dir():
continue
print(p.name)
with (p / "memory.stat").open("r") as f:
@athoune
athoune / github_webhook.py
Last active Jun 2, 2020
aiohttp webhook for github
View github_webhook.py
async def github_webhook(secret: str, request: web.Request) -> dict:
    """Authenticate a GitHub webhook delivery and return its JSON payload.

    The request must declare a body smaller than 1,000,000 bytes and carry an
    ``X-HUB-SIGNATURE`` header of the form ``sha1=<hexdigest>``, where the
    digest is the HMAC-SHA1 of the raw body keyed with ``secret``.

    Args:
        secret: Shared webhook secret, encoded as UTF-8 to key the HMAC.
        request: Incoming request; ``content_length``, ``headers`` and
            ``content.read()`` are used.

    Returns:
        The request body decoded as UTF-8 and parsed as JSON.

    Raises:
        AssertionError: If the declared length is missing or too large, the
            digest scheme is not ``sha1``, or the signature does not match.
        KeyError: If the ``X-HUB-SIGNATURE`` header is absent.
    """
    max_length = 1000000  # 1M

    # Raise explicitly instead of using ``assert`` statements: asserts are
    # stripped under ``python -O``, which would silently disable every check
    # below. AssertionError is kept so existing callers keep working.
    declared_length = request.content_length
    if declared_length is None:
        # Comparing None with an int would raise TypeError; reject instead of
        # reading a body of unknown size.
        raise AssertionError("Missing Content-Length")
    if declared_length >= max_length:
        raise AssertionError("Request content too fat")

    # partition() never fails to unpack: a header without "=" yields a digest
    # name that is rejected by the scheme check or an empty signature that is
    # rejected by the comparison.
    digest, _, signature = request.headers['X-HUB-SIGNATURE'].partition("=")
    if digest != "sha1":
        raise AssertionError("Digest must be sha1")

    body = await request.content.read()
    expected = hmac.new(
        secret.encode("utf-8"), msg=body, digestmod="sha1"
    ).hexdigest()

    # Constant-time comparison avoids leaking how many leading characters of
    # the signature matched. Compare bytes so a non-ASCII header value is
    # rejected rather than raising TypeError.
    if not hmac.compare_digest(expected.encode("ascii"),
                               signature.encode("utf-8")):
        raise AssertionError("Bad signature")

    return json.loads(body.decode('UTF8'))
@athoune
athoune / who_connect.py
Last active May 29, 2020
nia nia nia ssh public key
View who_connect.py
#!/usr/bin/env python3
import pwd
import sys
import re
import subprocess
import tempfile
import os
import io
from collections import namedtuple
@athoune
athoune / mailq.py
Last active Oct 17, 2019
Parsing mailq format (from Postfix 2.x)
View mailq.py
#!/usr/bin/env python3
import subprocess
import re
from pypred import Predicate
SPACE = re.compile(r"\s+")
def mailq():
@athoune
athoune / ping.py
Created Oct 10, 2016
async ping for python 3
View ping.py
#!/usr/bin/env python3
import asyncio
@asyncio.coroutine
def ping(loop, target, dump=False):
create = asyncio.create_subprocess_exec('ping', '-c', '10', target,
stdout=asyncio.subprocess.PIPE)
proc = yield from create
@athoune
athoune / Dockerfile
Created Sep 5, 2016
Dockerfile for tmate-slave with Archlinux
View Dockerfile
# Arch Linux based image; the gist describes it as the image for tmate-slave.
FROM base/archlinux
# Populate the pacman keyring with the archlinux keys, refresh them, and
# fetch the key identified by arojas@us.es.
RUN pacman-key --populate archlinux &&\
pacman-key --refresh-keys &&\
pacman-key -r arojas@us.es
# Synchronise the package databases.
RUN pacman -Sy
# Install libraries and build tools; `yes` answers pacman's confirmation prompts.
# NOTE(review): presumably these are the build dependencies of tmate-slave — confirm.
RUN yes | pacman -Sy libevent libssh libutempter msgpack-c openssl zlib automake cmake ruby autoconf gcc pkg-config make libunistring git
@athoune
athoune / metaping.py
Created Jun 28, 2016
Ping all the ping
View metaping.py
#!/usr/bin/env python3
import os
import asyncio
import aiohttp
from aiohttp import web
@asyncio.coroutine
def do_ping(session, domain):
View pubsub.go
package pubsub
import (
"sync"
)
type Pubsub struct {
channels map[string]map[int]*Channel
cpt int
lock sync.Mutex