Skip to content

Instantly share code, notes, and snippets.

Avatar

meejah meejah

View GitHub Profile
View aio_abort_behavior.py
import asyncio
class Protocol(object):
def connection_made(self, transport):
print("connection_made: {}".format(transport))
transport.abort()
def connection_lost(self, why):
print("connection_lost: {}".format(why))
View carml_docs.py
# -*- mode: python -*-
from __future__ import print_function
import sys
import functools
from tempfile import mkdtemp
from os.path import dirname, exists, join
from os import mkdir
from StringIO import StringIO
View aio_client.py
import asyncio
import aiohttp
from aiosocks.connector import ProxyConnector, ProxyClientRequest
async def fetch(session, socks_port, url):
conn = ProxyConnector(remote_resolve=True)
async with aiohttp.ClientSession(connector=conn, request_class=ProxyClientRequest) as session:
request = session.get(
View gist:4453531267b14956d5c26497da616cfb
def do_stuff(arg):
if arg == 1 or arg == 2:
return "flarg"
return "glumphy"
def test_a():
assert do_stuff(1) == "flarg"
def test_b():
View eliot-test.py
from eliot import start_action
from eliot.twisted import DeferredContext
import eliot
from twisted.internet.task import react, deferLater
from twisted.internet.defer import _inlineCallbacks, Deferred
def thing(reactor):
with start_action(action_type=u"a thing"):
View setup-tor-vm.py
#!/usr/bin/env python
##
## this will set up a VDE switch which has a tap interface (tap_tor)
## which is also set up by this script. This tap interface has all its
## data pushed out through tor (or dropped, if it's UDP and not port
## 53) via some iptables rules. probably you need to run this as
## root. when the VM shuts down, the processes started here are killed
## as well.
##
View gist:6bae3497cf0f71d87060e10e26ae7737
class _HashSumProtocol(Protocol):
def __init__(self, kind='sha256'):
self._hash = hashlib.new(kind)
def dataReceived(self, data):
self.bytes_received += len(data)
self._hash.update(data)
View Python3 txtorcon Event Stream
import txtorcon
from twisted.internet import defer, task
async def main(reactor):
tor = await txtorcon.connect(reactor)
print("tor {}".format(tor))
state = await tor.create_state()
print("state {}".format(state))
View conftest.py
from __future__ import print_function
import pytest
@pytest.hookimpl(hookwrapper=True)
def pytest_fixture_setup(fixturedef, request):
print("pytest_fixture_setup before", fixturedef, request)
yield
print("pytest_fixture_setup after", fixturedef, request)
@meejah
meejah / tempdir.py
Created Sep 3, 2013
a temporary directory like tempfile.NamedTemporaryFile
View tempdir.py
class TempDir(object):
def __enter__(self, *args):
self.dir_name = tempfile.mkdtemp()
return self
def __exit__(self, *args):
shutil.rmtree(self.dir_name)
def __str__(self):
return self.dir_name
You can’t perform that action at this time.