Skip to content

Instantly share code, notes, and snippets.

meejah meejah

Block or report user

Report or block meejah

Hide content and notifications from this user.

Learn more about blocking users

Contact Support about this user’s behavior.

Learn more about reporting abuse

Report abuse
View GitHub Profile
View aio_abort_behavior.py
import asyncio
class Protocol:
    """Minimal protocol that aborts its transport as soon as it connects.

    Both callbacks print the value they were handed, so the sequence of
    events can be read straight from stdout.
    """

    def connection_made(self, transport):
        """Print the new transport, then abort it immediately."""
        message = "connection_made: {}".format(transport)
        print(message)
        transport.abort()

    def connection_lost(self, why):
        """Print the value passed in as the reason for the loss."""
        message = "connection_lost: {}".format(why)
        print(message)
View gist:b81dcab079562deeb3bfc3e96b4ab6db
import treq
from twisted.internet.task import react
from twisted.internet.defer import ensureDeferred
# Target URLs, kept as byte strings.
urls = [
    b"https://www.torproject.org",
    b"https://meejah.ca",
]
View carml_docs.py
# -*- mode: python -*-
from __future__ import print_function
import sys
import functools
from tempfile import mkdtemp
from os.path import dirname, exists, join
from os import mkdir
from StringIO import StringIO
View aio_client.py
import asyncio
import aiohttp
from aiosocks.connector import ProxyConnector, ProxyClientRequest
async def fetch(session, socks_port, url):
conn = ProxyConnector(remote_resolve=True)
async with aiohttp.ClientSession(connector=conn, request_class=ProxyClientRequest) as session:
request = session.get(
View gist:4453531267b14956d5c26497da616cfb
def do_stuff(arg):
    """Return ``"flarg"`` when *arg* equals 1 or 2, otherwise ``"glumphy"``."""
    is_special = arg == 1 or arg == 2
    return "flarg" if is_special else "glumphy"
def test_a():
    """An argument of 1 must map to "flarg"."""
    result = do_stuff(1)
    assert result == "flarg"
def test_b():
View eliot-test.py
from eliot import start_action
from eliot.twisted import DeferredContext
import eliot
from twisted.internet.task import react, deferLater
from twisted.internet.defer import _inlineCallbacks, Deferred
def thing(reactor):
with start_action(action_type=u"a thing"):
View gist:6bae3497cf0f71d87060e10e26ae7737
class _HashSumProtocol(Protocol):
    """Protocol that hashes every chunk it receives and counts the bytes.

    :param kind: name of the digest algorithm handed to ``hashlib.new``
        (defaults to ``'sha256'``).
    """

    def __init__(self, kind='sha256'):
        self._hash = hashlib.new(kind)
        # Must exist before the first dataReceived() call: that method uses
        # ``+=`` on it, which raises AttributeError if it was never set.
        self.bytes_received = 0

    def dataReceived(self, data):
        """Add ``len(data)`` to the byte count and feed *data* to the hash."""
        self.bytes_received += len(data)
        self._hash.update(data)
View Python3 txtorcon Event Stream
import txtorcon
from twisted.internet import defer, task
async def main(reactor):
tor = await txtorcon.connect(reactor)
print("tor {}".format(tor))
state = await tor.create_state()
print("state {}".format(state))
View conftest.py
from __future__ import print_function
import pytest
@pytest.hookimpl(hookwrapper=True)
def pytest_fixture_setup(fixturedef, request):
    """Hook wrapper that prints its arguments on both sides of the ``yield``.

    As a pytest hook wrapper, the ``yield`` hands control to the remaining
    ``pytest_fixture_setup`` implementations; the second print runs once
    they have finished.
    """
    hook_args = (fixturedef, request)
    print("pytest_fixture_setup before", *hook_args)
    yield
    print("pytest_fixture_setup after", *hook_args)
View twisted3.py
from twisted.internet.defer import Deferred, inlineCallbacks, returnValue
d0 = Deferred()
d1 = Deferred()
d2 = Deferred()
def check_value(arg, gold):
    """Print both values, then assert that *arg* equals *gold*.

    :param arg: the value actually received.
    :param gold: the expected value.
    :raises AssertionError: if the two values are not equal.
    """
    # A single pre-formatted argument keeps the printed line identical
    # whether ``print`` is the Python 2 statement or the Python 3 function.
    # The original ``print "check_value", arg, gold`` statement is a
    # SyntaxError on Python 3.
    print("check_value {} {}".format(arg, gold))
    assert gold == arg
You can’t perform that action at this time.