Skip to content

Instantly share code, notes, and snippets.

meejah meejah

Block or report user

Report or block meejah

Hide content and notifications from this user.

Learn more about blocking users

Contact Support about this user’s behavior.

Learn more about reporting abuse

Report abuse
View GitHub Profile
meejah /
Created Sep 3, 2013
a temporary directory like tempfile.NamedTemporaryFile
class TempDir(object):
def __enter__(self, *args):
self.dir_name = tempfile.mkdtemp()
return self
def __exit__(self, *args):
def __str__(self):
return self.dir_name
from __future__ import print_function
import pytest
def pytest_fixture_setup(fixturedef, request):
print("pytest_fixture_setup before", fixturedef, request)
print("pytest_fixture_setup after", fixturedef, request)
View Python3 txtorcon Event Stream
import txtorcon
from twisted.internet import defer, task
async def main(reactor):
tor = await txtorcon.connect(reactor)
print("tor {}".format(tor))
state = await tor.create_state()
print("state {}".format(state))
View gist:6bae3497cf0f71d87060e10e26ae7737
class _HashSumProtocol(Protocol):
def __init__(self, kind='sha256'):
self._hash =
def dataReceived(self, data):
self.bytes_received += len(data)
#!/usr/bin/env python
## this will set up a VDE switch which has a tap interface (tap_tor)
## which is also set up by this script. This tap interface has all its
## data pushed out through tor (or dropped, if it's UDP and not port
## 53) via some iptables rules. probably you need to run this as
## root. when the VM shuts down, the processes started here are killed
## as well.
from eliot import start_action
from eliot.twisted import DeferredContext
import eliot
from twisted.internet.task import react, deferLater
from twisted.internet.defer import _inlineCallbacks, Deferred
def thing(reactor):
with start_action(action_type=u"a thing"):
View gist:4453531267b14956d5c26497da616cfb
def do_stuff(arg):
if arg == 1 or arg == 2:
return "flarg"
return "glumphy"
def test_a():
assert do_stuff(1) == "flarg"
def test_b():
import asyncio
import aiohttp
from aiosocks.connector import ProxyConnector, ProxyClientRequest
async def fetch(session, socks_port, url):
conn = ProxyConnector(remote_resolve=True)
async with aiohttp.ClientSession(connector=conn, request_class=ProxyClientRequest) as session:
request = session.get(
# -*- mode: python -*-
from __future__ import print_function
import sys
import functools
from tempfile import mkdtemp
from os.path import dirname, exists, join
from os import mkdir
from StringIO import StringIO
View gist:b81dcab079562deeb3bfc3e96b4ab6db
import treq
from twisted.internet.task import react
from twisted.internet.defer import ensureDeferred
urls = [
You can’t perform that action at this time.