Skip to content

Instantly share code, notes, and snippets.

View athoune's full-sized avatar

Mathieu Lecarme athoune

View GitHub Profile
@athoune
athoune / test_smtp.py
Created February 12, 2024 16:47
Test your SMTP settings
import smtplib
import base64
user = 'bob'
password = 'password'
# The server handle TLS connection, not the boring STARTTLS upgrade
server = smtplib.SMTP_SSL(host='smtp.example.com', timeout=5)
server.ehlo() # the server say its name and handled commands, you should see "AUTH PLAIN"
# The RFC for SMTP doesn't say wich text encoding must be used, smtplib use low ASCII, lets forge the command
@athoune
athoune / server.py
Created September 28, 2020 19:52
Route to healthy server
#!/usr/bin/env python3
import signal
import os
from flask import Flask, abort
from werkzeug.serving import run_simple
state = True
@athoune
athoune / cgroup_swap.py
Last active June 2, 2020 15:13
display swap and rss in cgroup services
#!/usr/bin/env python3
from pathlib import Path
def containerd():
for p in Path("/sys/fs/cgroup/memory/system.slice/containerd.service/").iterdir():
if not p.is_dir():
continue
print(p.name)
with (p / "memory.stat").open("r") as f:
@athoune
athoune / who_connect.py
Last active May 29, 2020 08:46
nia nia nia ssh public key
#!/usr/bin/env python3
import pwd
import sys
import re
import subprocess
import tempfile
import os
import io
from collections import namedtuple
@athoune
athoune / pubsub.go
Created December 17, 2018 23:08
Pubsub
package pubsub
import (
"sync"
)
type Pubsub struct {
channels map[string]map[int]*Channel
cpt int
lock sync.Mutex
@athoune
athoune / pubsub.py
Created November 2, 2018 16:16
aio pubsub pattern
from collections import defaultdict
from asyncio import gather
class Pubsub:
channels = defaultdict(list)
def add_handler(self, key: str, handler):
self.channels[key].append(handler)
@athoune
athoune / github_webhook.py
Last active October 15, 2022 13:51
aiohttp webhook for github
async def github_webhook(secret :str, request :web.Request) -> dict:
assert request.content_length < 1000000, "Request content too fat" # 1M
digest, signature = request.headers['X-HUB-SIGNATURE'].split("=", 1)
assert digest == "sha1", "Digest must be sha1" # use a whitelist
body = await request.content.read()
h = hmac.HMAC(bytes(secret, "UTF8"), msg=body, digestmod=digest)
assert h.hexdigest() == signature, "Bad signature"
return json.loads(body.decode('UTF8'))
@athoune
athoune / jsonrpc.py
Created January 7, 2018 21:57
jsonrpc2 with aiohttp
import logging
from jsonrpc.jsonrpc2 import JSONRPC20Request
def jsonrpcrequest(data):
logging.debug(data)
if isinstance(data, list):
for d in data:
yield JSONRPC20Request.from_data(d)
@athoune
athoune / sidekiq.lua
Last active December 11, 2017 09:50
Get all Sidekiq's queues length
local a={}
local queues=redis.call('smembers', 'resque:gitlab:queues')
for i,v in ipairs(queues) do
a[i*2-1] = 'queue:' .. v
a[i*2] = redis.call('llen', 'resqueue:gitlab:queue:' .. v)
end
return a
@athoune
athoune / acme.go
Created May 23, 2017 13:53
Steal certificates from Traefik
package acme
import (
"crypto/tls"
"encoding/base64"
"encoding/json"
"io/ioutil"
)
type Cert struct {