Skip to content

Instantly share code, notes, and snippets.

meejah meejah

Block or report user

Report or block meejah

Hide content and notifications from this user.

Learn more about blocking users

Contact Support about this user’s behavior.

Learn more about reporting abuse

Report abuse
View GitHub Profile
import asyncio
class Protocol(object):
    """Minimal protocol object that reports its callbacks on stdout."""

    def connection_made(self, transport):
        """Print the ``transport`` argument this callback was given."""
        print("connection_made: {}".format(transport))

    def connection_lost(self, why):
        """Print the ``why`` argument this callback was given."""
        print("connection_lost: {}".format(why))
View gist:b81dcab079562deeb3bfc3e96b4ab6db
import treq
from twisted.internet.task import react
from twisted.internet.defer import ensureDeferred
urls = [
# -*- mode: python -*-
from __future__ import print_function
import sys
import functools
from tempfile import mkdtemp
from os.path import dirname, exists, join
from os import mkdir
from StringIO import StringIO
import asyncio
import aiohttp
from aiosocks.connector import ProxyConnector, ProxyClientRequest
async def fetch(session, socks_port, url):
conn = ProxyConnector(remote_resolve=True)
async with aiohttp.ClientSession(connector=conn, request_class=ProxyClientRequest) as session:
request = session.get(
View gist:4453531267b14956d5c26497da616cfb
def do_stuff(arg):
    """Return ``"flarg"`` when ``arg`` equals 1 or 2, otherwise ``"glumphy"``."""
    # The two comparisons are kept as separate ``or`` operands on purpose:
    # the short-circuit evaluation order is part of the observable behavior.
    if arg == 1 or arg == 2:
        return "flarg"
    return "glumphy"
def test_a():
    """Check that ``do_stuff(1)`` returns ``"flarg"``."""
    assert do_stuff(1) == "flarg"
def test_b():
from eliot import start_action
from eliot.twisted import DeferredContext
import eliot
from twisted.internet.task import react, deferLater
from twisted.internet.defer import _inlineCallbacks, Deferred
def thing(reactor):
with start_action(action_type=u"a thing"):
View gist:6bae3497cf0f71d87060e10e26ae7737
class _HashSumProtocol(Protocol):
def __init__(self, kind='sha256'):
self._hash =
def dataReceived(self, data):
self.bytes_received += len(data)
View Python3 txtorcon Event Stream
import txtorcon
from twisted.internet import defer, task
async def main(reactor):
    """Connect with txtorcon, create a state object, and print both.

    :param reactor: passed straight through to ``txtorcon.connect``.

    NOTE(review): only the first lines of this coroutine are visible in
    this listing; confirm nothing is missing after the last ``print``.
    """
    tor = await txtorcon.connect(reactor)
    print("tor {}".format(tor))
    state = await tor.create_state()
    print("state {}".format(state))
from __future__ import print_function
import pytest
def pytest_fixture_setup(fixturedef, request):
    """Print both arguments twice, tagged "before" and then "after".

    NOTE(review): the name matches pytest's ``pytest_fixture_setup`` hook.
    Nothing runs between the two prints in the code visible here -- confirm
    whether a ``yield`` (hook-wrapper style) was intended in between.
    """
    print("pytest_fixture_setup before", fixturedef, request)
    print("pytest_fixture_setup after", fixturedef, request)
from twisted.internet.defer import Deferred, inlineCallbacks, returnValue
# Three independent Deferred instances, constructed in order d0, d1, d2.
d0, d1, d2 = Deferred(), Deferred(), Deferred()
def check_value(arg, gold):
    """Print both values, then assert that ``arg`` equals ``gold``.

    :param arg: the value being checked.
    :param gold: the expected value.
    :raises AssertionError: when ``gold == arg`` is false.
    """
    # A single pre-formatted string prints the same text whether ``print``
    # is the Python 2 statement or the Python 3 function.
    print("check_value {} {}".format(arg, gold))
    assert gold == arg
You can’t perform that action at this time.