Theron Luhn luhn
@luhn
luhn / README.md
Last active May 9, 2023 20:25
Celery SQS Error

Celery SQS bug

  1. Install dependencies with pip install -r requirements.txt
  2. Fill the queue by running python tasks.py 500
  3. Start Celery with celery -A tasks worker --loglevel=INFO -c 3
  4. Disconnect the computer from the internet and wait until you receive a CANCEL_TASKS_BY_DEFAULT warning. This may take a couple of minutes, and several errors will appear first, but you must wait for the warning itself.
  5. Reconnect to the internet. After a couple of seconds, Celery will reconnect and tasks will start flowing again.
  6. Let tasks run for about a minute. You'll notice a slight slowdown, and then tasks will suddenly stop entirely for a couple of minutes before resuming.
@luhn
luhn / encoder.py
Last active September 6, 2022 20:56
Override default JSON encoder to add support for dataclasses
import json
from dataclasses import dataclass, is_dataclass, asdict
class DataclassEncoder(json.JSONEncoder):
    def default(self, obj):
        if is_dataclass(obj):
            return asdict(obj)
        return super().default(obj)
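A minimal usage sketch for the encoder above, assuming a hypothetical `Point` dataclass that is not part of the gist. The encoder class is repeated so the block runs on its own; it is passed to `json.dumps` through the standard `cls=` argument.

```python
import json
from dataclasses import dataclass, is_dataclass, asdict


class DataclassEncoder(json.JSONEncoder):
    def default(self, obj):
        # Called only for objects the stock encoder cannot serialize.
        if is_dataclass(obj):
            return asdict(obj)
        return super().default(obj)


@dataclass
class Point:  # hypothetical example type, not from the gist
    x: int
    y: int


# The dataclass instance is nested in a dict to show that default()
# is invoked for values at any depth.
encoded = json.dumps({"origin": Point(0, 0)}, cls=DataclassEncoder)
print(encoded)
```

Objects that are neither JSON-native nor dataclasses still fall through to `super().default(obj)`, which raises `TypeError` as usual.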
@luhn
luhn / abcbench.py
Created July 13, 2022 19:47
ABC Benchmark
import timeit
from abc import ABC, abstractmethod
class A(ABC):
    @abstractmethod
    def a(self):
        ...
@luhn
luhn / mount.sh
Created April 5, 2021 21:21
Mount NVMe EBS volume
# Mount the data disk
# Adapted from https://gist.github.com/ctompkinson/30f570882af38879b36fc7bffe3d1a60
device=/dev/sdh
mount_point=/monitor-config
apt-get install -y nvme-cli
while [ true ]; do
    for blkdev in $(nvme list | awk '/^\/dev/ { print $1 }'); do
        mapping=$(nvme id-ctrl --raw-binary "${blkdev}" | cut -c3073-3104 | tr -s ' ' | sed 's/ $//g')
        # Quote both sides so an empty mapping does not break the test
        if [ "${mapping}" = "${device}" ]; then
            echo "Found $device on $blkdev"

Keybase proof

I hereby claim:

  • I am luhn on github.
  • I am luhn (https://keybase.io/luhn) on keybase.
  • I have a public key whose fingerprint is 8E57 AB5E 6FB9 441F 92C4 C3D8 9234 9742 8E86 184E

To claim this, I am signing this object:

@luhn
luhn / docker-compose.yml
Last active June 11, 2019 22:32
Fluentd Keepalive Bug
version: "3.7"
services:
  src:
    build:
      context: .
      dockerfile: src.Dockerfile
  dst:
    build:
import sys
import inspect
def wrap(func):
    attach()
    return func
def attach():
@luhn
luhn / auth.py
Created November 20, 2018 07:09
# Interfaces
class IUserIdentity:
    userid = None
class IIdentityPolicy:
    def __call__(request):
        """ Return the claimed identity of the user associated with the given
@luhn
luhn / data.txt
Last active April 28, 2017 17:36
InfluxDB issue #8164
mymeasure,context=baz,view=3 duration=0.743992881887 1493397808910022759
mymeasure,context=foo,view=4 duration=0.340297307361 1493397809442500681
mymeasure,context=bar,view=2 duration=0.351295066647 1493397809994147837
mymeasure,context=bar,view=1 duration=0.485869769473 1493397810730819905
mymeasure,context=bar,view=1 duration=0.104839181702 1493397810993959410
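The sample points above are in InfluxDB line protocol: a measurement name with comma-separated tags, then the fields, then a nanosecond timestamp. As a rough sketch of how one such line breaks apart, the hypothetical helper `parse_line` below (not part of the gist or of any InfluxDB client) handles only the shape seen in this data: no escaped spaces or commas, and float-valued fields only.

```python
def parse_line(line):
    """Split one simple line-protocol point into its parts.

    Assumes exactly three space-separated sections and float fields,
    which holds for the sample data but not for line protocol in general.
    """
    head, fields, timestamp = line.split(" ")
    measurement, *tag_pairs = head.split(",")
    tags = dict(pair.split("=", 1) for pair in tag_pairs)
    values = {}
    for pair in fields.split(","):
        key, raw = pair.split("=", 1)
        values[key] = float(raw)
    return measurement, tags, values, int(timestamp)


point = parse_line(
    "mymeasure,context=baz,view=3 duration=0.743992881887 1493397808910022759"
)
print(point)
```

Tag values stay strings here because line protocol tags are always strings; only the field value and the timestamp are converted.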
@luhn
luhn / traversal_paths.py
Created April 12, 2017 00:25
URL traversal sitemap
"""
Inspect a Pyramid app using URL traversal and construct URLs for all the views.
"""
def get_resources(self, root):
    resources = {
        type(root): root,
    }
    for resource in root.values():
        resources.update(self.get_resources(resource))
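The preview above is cut off, so the following is only a sketch of the recursion it appears to perform, written as a standalone function. The `Resource`, `Root`, `Folder`, and `Document` classes are hypothetical stand-ins for dict-like traversal resources, and the final `return resources` is an assumption, since the original ending is not shown.

```python
class Resource(dict):
    """Hypothetical dict-like resource; children are stored as dict values."""


class Root(Resource):
    pass


class Folder(Resource):
    pass


class Document(Resource):
    pass


def get_resources(root):
    # Map each resource class to one instance of it found in the tree.
    resources = {
        type(root): root,
    }
    for resource in root.values():
        resources.update(get_resources(resource))
    return resources  # assumed; the original snippet ends before this point


tree = Root(docs=Folder(readme=Document()))
found = get_resources(tree)
names = sorted(cls.__name__ for cls in found)
print(names)
```

Because the result is keyed by type, a tree with several resources of the same class keeps only the last one visited, which is enough when the goal is one representative instance per resource type.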