Skip to content

Instantly share code, notes, and snippets.


meejah meejah

View GitHub Profile
import asyncio
class Protocol(object):
def connection_made(self, transport):
print("connection_made: {}".format(transport))
def connection_lost(self, why):
print("connection_lost: {}".format(why))
# -*- mode: python -*-
from __future__ import print_function
import sys
import functools
from tempfile import mkdtemp
from os.path import dirname, exists, join
from os import mkdir
from StringIO import StringIO
import asyncio
import aiohttp
from aiosocks.connector import ProxyConnector, ProxyClientRequest
async def fetch(session, socks_port, url):
conn = ProxyConnector(remote_resolve=True)
async with aiohttp.ClientSession(connector=conn, request_class=ProxyClientRequest) as session:
request = session.get(
View gist:4453531267b14956d5c26497da616cfb
def do_stuff(arg):
if arg == 1 or arg == 2:
return "flarg"
return "glumphy"
def test_a():
assert do_stuff(1) == "flarg"
def test_b():
from eliot import start_action
from eliot.twisted import DeferredContext
import eliot
from twisted.internet.task import react, deferLater
from twisted.internet.defer import _inlineCallbacks, Deferred
def thing(reactor):
with start_action(action_type=u"a thing"):
#!/usr/bin/env python
## this will set up a VDE switch which has a tap interface (tap_tor)
## which is also set up by this script. This tap interface has all its
## data pushed out through tor (or dropped, if it's UDP and not port
## 53) via some iptables rules. probably you need to run this as
## root. when the VM shuts down, the processes started here are killed
## as well.
View gist:6bae3497cf0f71d87060e10e26ae7737
class _HashSumProtocol(Protocol):
def __init__(self, kind='sha256'):
self._hash =
def dataReceived(self, data):
self.bytes_received += len(data)
View Python3 txtorcon Event Stream
import txtorcon
from twisted.internet import defer, task
async def main(reactor):
tor = await txtorcon.connect(reactor)
print("tor {}".format(tor))
state = await tor.create_state()
print("state {}".format(state))
from __future__ import print_function
import pytest
def pytest_fixture_setup(fixturedef, request):
print("pytest_fixture_setup before", fixturedef, request)
print("pytest_fixture_setup after", fixturedef, request)
meejah /
Created Sep 3, 2013
a temporary directory like tempfile.NamedTemporaryFile
class TempDir(object):
def __enter__(self, *args):
self.dir_name = tempfile.mkdtemp()
return self
def __exit__(self, *args):
def __str__(self):
return self.dir_name
You can’t perform that action at this time.