Instantly share code, notes, and snippets.

View gist:b81dcab079562deeb3bfc3e96b4ab6db
import treq
from twisted.internet.task import react
from twisted.internet.defer import ensureDeferred
urls = [
b"https://www.torproject.org",
b"https://meejah.ca",
]
View carml_docs.py
# -*- mode: python -*-
from __future__ import print_function
import sys
import functools
from tempfile import mkdtemp
from os.path import dirname, exists, join
from os import mkdir
from StringIO import StringIO
View aio_client.py
import asyncio
import aiohttp
from aiosocks.connector import ProxyConnector, ProxyClientRequest
async def fetch(session, socks_port, url):
conn = ProxyConnector(remote_resolve=True)
async with aiohttp.ClientSession(connector=conn, request_class=ProxyClientRequest) as session:
request = session.get(
View gist:4453531267b14956d5c26497da616cfb
def do_stuff(arg):
if arg == 1 or arg == 2:
return "flarg"
return "glumphy"
def test_a():
assert do_stuff(1) == "flarg"
def test_b():
View eliot-test.py
from eliot import start_action
from eliot.twisted import DeferredContext
import eliot
from twisted.internet.task import react, deferLater
from twisted.internet.defer import _inlineCallbacks, Deferred
def thing(reactor):
with start_action(action_type=u"a thing"):
View gist:6bae3497cf0f71d87060e10e26ae7737
class _HashSumProtocol(Protocol):
def __init__(self, kind='sha256'):
self._hash = hashlib.new(kind)
def dataReceived(self, data):
self.bytes_received += len(data)
self._hash.update(data)
View Python3 txtorcon Event Stream
import txtorcon
from twisted.internet import defer, task
async def main(reactor):
tor = await txtorcon.connect(reactor)
print("tor {}".format(tor))
state = await tor.create_state()
print("state {}".format(state))
View conftest.py
from __future__ import print_function
import pytest
@pytest.hookimpl(hookwrapper=True)
def pytest_fixture_setup(fixturedef, request):
print("pytest_fixture_setup before", fixturedef, request)
yield
print("pytest_fixture_setup after", fixturedef, request)
View twisted3.py
from twisted.internet.defer import Deferred, inlineCallbacks, returnValue
d0 = Deferred()
d1 = Deferred()
d2 = Deferred()
def check_value(arg, gold):
print "check_value", arg, gold
assert gold == arg
View gist:d6f9fba8c080c35cba6f
from twisted.internet.defer import Deferred, inlineCallbacks, returnValue
d0 = Deferred()
d1 = Deferred()
d2 = Deferred()
def check_value(arg, gold):
print "check_value", arg, gold
assert gold == arg