Skip to content

Instantly share code, notes, and snippets.

Kent Ross mumbleskates

Block or report user

Report or block mumbleskates

Hide content and notifications from this user.

Learn more about blocking users

Contact Support about this user’s behavior.

Learn more about reporting abuse

Report abuse
View GitHub Profile
View fetch_with_pool.py
from threading import Thread
import requests
from multithreading import Channel
def fetch_all_with_pool(iterable_of_urls, pool_size=20):
"""
Fetches URLs with a thread pool.
View aw8.py
# coding=utf-8
"""Lazy hacks to enable easier usage of async functions, context managers, and iterators in the REPL."""
from asyncio import set_event_loop
try:
import uvloop as loop_maker
except ImportError:
import asyncio as loop_maker
loop = loop_maker.new_event_loop()
@mumbleskates
mumbleskates / multithreading.py
Created Nov 28, 2018
closeable channel and threadsafe iterator wrapper
View multithreading.py
# coding=utf-8
from collections import deque
from queue import Empty, Full
from threading import Condition, RLock, Thread
from time import monotonic as now
from weakref import finalize
class ChannelClosed(Exception):
"""Exception raised when a channel has been closed."""
View proto_inspect.py
# coding=utf-8
# The abstract base classes live in collections.abc; the aliases that used to
# be re-exported from `collections` were removed in Python 3.10.
from collections.abc import Mapping, Iterable
from operator import attrgetter
from struct import pack, unpack
__doc__ = """
Pure python tools for inspecting unknown protobuf data. Written for py3.6+.
Author: Kent Ross
View requests_future_pool.py
# coding=utf-8
from mumblecode.multithreading import CloseableQueue
from threading import Thread
from concurrent.futures import Future
import requests
class RequestFuturePool(object):
def __init__(self, pool_size=20, queue_size=64, rate_limiter=lambda: None):
View requests_pool.py
# coding=utf-8
from mumblecode.multithreading import CloseableQueue
from queue import Queue, Empty
from threading import Thread
import requests
_STOP = object()
@mumbleskates
mumbleskates / heap.py
Last active May 16, 2017
Pure Python N-ary Heap implementation
View heap.py
# coding=utf-8
class NaryHeap(object):
"""implements an n-ary heap"""
def __init__(self, items=(), *, n=2, direction=min):
"""
create a new heap
View shorturl.py
# coding=utf-8
from math import log2, ceil
# valid chars for a url path component: a-z A-Z 0-9 .-_~!$&'()*+,;=:@
# for the default set here (base 72) we have excluded $'();:@
radix_alphabet = sorted(
"0123456789"
"abcdefghijklmnopqrstuvwxyz"
"ABCDEFGHIJKLMNOPQRSTUVWXYZ"
@mumbleskates
mumbleskates / pybullet.py
Last active Apr 25, 2016
Python extension plugin for weechat that pushes smart notifications to PushBullet. Notifications are grouped per channel, have increasing delays, preview the first few lines, go away when you have been active in the channel, and detect when notifications have been dismissed remotely.
View pybullet.py
# -*- coding: utf-8 -*-
from __future__ import unicode_literals
try:
# noinspection PyUnresolvedReferences,PyShadowingBuiltins
str = unicode
except NameError:
# noinspection PyShadowingBuiltins,PyUnboundLocalVariable
str = str
View get_pins.py
# coding=utf-8
from itertools import product
# Maps each digit character '0'..'9' to the string of digits that may be
# substituted for it when expanding an observed PIN (the digit itself plus,
# presumably, its keypad neighbours -- confirm against the puzzle statement).
adjacents = dict(zip(
    '0123456789',
    (
        '08', '124', '1253', '236', '1457',
        '24568', '3569', '478', '57890', '689',
    ),
))
def get_pins_itertools(observed):
for pins in product(*(adjacents[ch] for ch in observed)):
You can’t perform that action at this time.