Skip to content

Instantly share code, notes, and snippets.

View mumbleskates's full-sized avatar
🤔

Kent Ross mumbleskates

🤔
  • 🌵
View GitHub Profile
# coding=utf-8
from itertools import product
adjacents = ('08', '124', '1253', '236', '1457', '24568', '3569', '478', '57890', '689')
adjacents = {str(i): s for i, s in enumerate(adjacents)}
def get_pins_itertools(observed):
for pins in product(*(adjacents[ch] for ch in observed)):
@mumbleskates
mumbleskates / pybullet.py
Last active April 25, 2016 21:11
Python extension plugin for weechat that pushes smart notifications to PushBullet. Notifications are grouped per channel, have increasing delays, preview the first few lines, go away when you have been active in the channel, and detect when notifications have been dismissed remotely.
# -*- coding: utf-8 -*-
from __future__ import unicode_literals
try:
# noinspection PyUnresolvedReferences,PyShadowingBuiltins
str = unicode
except NameError:
# noinspection PyShadowingBuiltins,PyUnboundLocalVariable
str = str
# coding=utf-8
from math import log2, ceil
# valid chars for a url path component: a-z A-Z 0-9 .-_~!$&'()*+,;=:@
# for the default set here (base 72) we have excluded $'();:@
radix_alphabet = sorted(
"0123456789"
"abcdefghijklmnopqrstuvwxyz"
"ABCDEFGHIJKLMNOPQRSTUVWXYZ"
@mumbleskates
mumbleskates / heap.py
Last active July 11, 2023 12:15
Pure Python N-ary Heap implementation
# coding=utf-8
class NaryHeap(object):
"""implements an n-ary heap"""
def __init__(self, items=(), *, n=2, direction=min):
"""
create a new heap
# coding=utf-8
from mumblecode.multithreading import CloseableQueue
from queue import Queue, Empty
from threading import Thread
import requests
_STOP = object()
# coding=utf-8
from mumblecode.multithreading import CloseableQueue
from threading import Thread
from concurrent.futures import Future
import requests
class RequestFuturePool(object):
def __init__(self, pool_size=20, queue_size=64, rate_limiter=lambda: None):
# coding=utf-8
from collections import Mapping, Iterable
from operator import attrgetter
from struct import pack, unpack
__doc__ = """
Pure python tools for inspecting unknown protobuf data. Written for py3.6+.
Author: Kent Ross
@mumbleskates
mumbleskates / multithreading.py
Created November 28, 2018 22:06
closeable channel and threadsafe iterator wrapper
# coding=utf-8
from collections import deque
from queue import Empty, Full
from threading import Condition, RLock, Thread
from time import monotonic as now
from weakref import finalize
class ChannelClosed(Exception):
"""Exception raised when a channel has been closed."""
# coding=utf-8
"""Lazy hacks to enable easier usage of async functions, context managers, and iterators in the REPL."""
from asyncio import set_event_loop
try:
import uvloop as loop_maker
except ImportError:
import asyncio as loop_maker
loop = loop_maker.new_event_loop()
from threading import Thread
import requests
from multithreading import Channel
def fetch_all_with_pool(iterable_of_urls, pool_size=20):
"""
Fetches URLs with a thread pool.