This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
class Protocol(object):
    """Minimal protocol that aborts its transport as soon as it is connected.

    Both callbacks print the value they receive so the sequence of events
    can be followed on stdout.
    """

    def connection_made(self, transport):
        """Print the transport, then abort it immediately.

        :param transport: object handed to this callback; it must provide
            an ``abort()`` method.
        """
        print("connection_made: {}".format(transport))
        transport.abort()

    def connection_lost(self, why):
        """Print the value passed to this callback.

        :param why: whatever the caller supplies as the reason.
        """
        print("connection_lost: {}".format(why))
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- mode: python -*- | |
from __future__ import print_function | |
import sys | |
import functools | |
from tempfile import mkdtemp | |
from os.path import dirname, exists, join | |
from os import mkdir | |
from StringIO import StringIO |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
import aiohttp | |
from aiosocks.connector import ProxyConnector, ProxyClientRequest | |
async def fetch(session, socks_port, url): | |
conn = ProxyConnector(remote_resolve=True) | |
async with aiohttp.ClientSession(connector=conn, request_class=ProxyClientRequest) as session: | |
request = session.get( |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def do_stuff(arg):
    """Return ``"flarg"`` when *arg* equals 1 or 2, otherwise ``"glumphy"``."""
    if arg == 1 or arg == 2:
        return "flarg"
    return "glumphy"
def test_a():
    """Check that ``do_stuff(1)`` returns ``"flarg"``."""
    assert do_stuff(1) == "flarg"
def test_b(): |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from eliot import start_action | |
from eliot.twisted import DeferredContext | |
import eliot | |
from twisted.internet.task import react, deferLater | |
from twisted.internet.defer import _inlineCallbacks, Deferred | |
def thing(reactor): | |
with start_action(action_type=u"a thing"): |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class _HashSumProtocol(Protocol):
    """Protocol that hashes every chunk it receives and counts the bytes.

    :param kind: algorithm name passed to ``hashlib.new``
        (default ``'sha256'``).
    """

    def __init__(self, kind='sha256'):
        self._hash = hashlib.new(kind)
        # dataReceived() applies ``+=`` to this counter, so it has to exist
        # before the first chunk arrives.
        self.bytes_received = 0

    def dataReceived(self, data):
        """Add ``len(data)`` to the byte count and feed *data* to the hash."""
        self.bytes_received += len(data)
        self._hash.update(data)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import txtorcon | |
from twisted.internet import defer, task | |
async def main(reactor): | |
tor = await txtorcon.connect(reactor) | |
print("tor {}".format(tor)) | |
state = await tor.create_state() | |
print("state {}".format(state)) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from __future__ import print_function | |
import pytest | |
@pytest.hookimpl(hookwrapper=True)
def pytest_fixture_setup(fixturedef, request):
    """Print a line before and after the wrapped ``pytest_fixture_setup`` call.

    Registered as a hook wrapper: the code up to the ``yield`` runs first,
    and the code after it runs once control returns to this generator.

    :param fixturedef: printed as-is in both messages.
    :param request: printed as-is in both messages.
    """
    print("pytest_fixture_setup before", fixturedef, request)
    yield
    print("pytest_fixture_setup after", fixturedef, request)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from twisted.internet.defer import Deferred, inlineCallbacks, returnValue | |
# Three separate Deferred instances created at module level.
d0 = Deferred()
d1 = Deferred()
d2 = Deferred()
def check_value(arg, gold):
    """Print both values, then assert that *gold* equals *arg*.

    :param arg: the value being checked.
    :param gold: the expected value.
    :raises AssertionError: if the two values differ.
    """
    # One pre-formatted string prints the same text whether ``print`` is the
    # Python 2 statement or the Python 3 function.
    print("check_value {} {}".format(arg, gold))
    assert gold == arg
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from twisted.internet.defer import Deferred, inlineCallbacks, returnValue | |
# Three separate Deferred instances created at module level.
d0 = Deferred()
d1 = Deferred()
d2 = Deferred()
def check_value(arg, gold):
    """Print both values, then assert that *gold* equals *arg*.

    :param arg: the value being checked.
    :param gold: the expected value.
    :raises AssertionError: if the two values differ.
    """
    # One pre-formatted string prints the same text whether ``print`` is the
    # Python 2 statement or the Python 3 function.
    print("check_value {} {}".format(arg, gold))
    assert gold == arg
Newer Older