Skip to content

Instantly share code, notes, and snippets.

@vladiibine
Last active August 4, 2019 18:18
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save vladiibine/ab8cfc4d8badabecc7c83ea5fdb9fe18 to your computer and use it in GitHub Desktop.
Save vladiibine/ab8cfc4d8badabecc7c83ea5fdb9fe18 to your computer and use it in GitHub Desktop.
"""
Examples of usage of the twisted library
"""
from datetime import timedelta, datetime
# Step 1:
# create a server that each time it receives a http request, it will return "hello"
from twisted.web import server, resource
from twisted.internet import reactor
class Simple(resource.Resource):
isLeaf = True
def render_GET(self, request):
return "<html>Hello, world!</html>".encode()
def run_simple_server():
site = server.Site(Simple())
reactor.listenTCP(8080, site)
reactor.run()
# Step 2:
# create a server which makes a request to google.com, and then prints "hello" when that returns
from twisted.web import client
from twisted.internet import reactor
from twisted.web import server
class ReturnRequestAfterResponse(resource.Resource):
isLeaf = True
def render_GET(self, request):
a = client.Agent(reactor)
r = a.request(b'GET', b'http://google.com')
def write_google_response_length(response):
request.write(f'the status from google was {response.code}! niiice, right?'.encode())
request.finish()
r.addBoth(write_google_response_length)
return server.NOT_DONE_YET
def run_request_after_response_server():
site = server.Site(ReturnRequestAfterResponse())
reactor.listenTCP(8080, site)
reactor.run()
# Step 3:
# make a process that schedules a few requests every few seconds
# (to simulate a POLL)
from twisted.internet import reactor
def periodic_print():
def boom():
print("BOOM!")
reactor.callLater(2, boom)
reactor.callLater(2, boom)
reactor.run()
from twisted.internet import defer
from twisted.web.client import Agent, readBody
def chain_multiple_deferred():
a = client.Agent(reactor)
r = a.request(
b'GET',
b'https://asdf.com',
)
def finish_request(response):
resp_d = readBody(response)
resp_d.addBoth(print_beginning_of_response)
return resp_d
r.addBoth(finish_request)
def print_beginning_of_response(value):
print(value[:100])
# d.addBoth(print_beginning_of_response)
reactor.callLater(1, print, "it's taking quite a while man!")
reactor.callLater(1.5, reactor.stop)
reactor.run()
# print('Hi!')
# return d
from twisted.internet import defer, reactor
from twisted.web.client import Agent, readBody
from twisted.internet.defer import ensureDeferred
def chain_multiple_callbacks_with_async_await_syntax_fml_callbacks():
async def make_request():
a = Agent(reactor)
try:
res1 = await a.request(b'GET', b'http://asdf.com')
except Exception as err:
print(err)
raise err
body = await readBody(res1)
return body
d = ensureDeferred(make_request())
d.addBoth(print)
reactor.callLater(2, print, "finishing the run!")
reactor.callLater(2.1, reactor.stop)
reactor.run()
from twisted.internet import defer, reactor
from twisted.web.client import Agent, readBody
from twisted.internet.defer import ensureDeferred
import txrequests
def multiple_requests_txrequess():
async def main():
with txrequests.Session() as session:
responses = []
for url in (
'http://asdf.com',
'http://yahoo.com',
'http://python.org',
'http://abc.xyz',
'http://facebook.com',
'http://google.com',
):
responses.append(await session.get(url))
return [r.text[:40] for r in responses]
def alert_and_stop(result):
print('stopping after result: ', result)
reactor.stop()
t2 = datetime.now()
print('took ', t2 - t1)
t1 = datetime.now()
d = ensureDeferred(main())
d.addBoth(alert_and_stop)
reactor.run()
from twisted.internet import defer, reactor
from twisted.web.client import Agent, readBody
from twisted.internet.defer import ensureDeferred
import txrequests
def multiple_parallel_requests_txrequests():
async def main():
with txrequests.Session() as session:
reqs = [
session.get(url)
for url in (
'http://asdf.com',
'http://yahoo.com',
'http://python.org',
'http://abc.xyz',
'http://facebook.com',
'http://google.com',
)
]
response_list = await defer.DeferredList(reqs)
responses = [r[1].text[:30] for r in response_list]
return responses
def alert_and_stop(result):
print('stopping after result: ', result)
reactor.stop()
t2 = datetime.now()
print("took", t2 - t1)
t1 = datetime.now()
d = ensureDeferred(main())
d.addBoth(alert_and_stop)
reactor.run()
from twisted.internet import reactor
from twisted.internet.defer import DeferredList, Deferred, ensureDeferred
class DeferredOnlyQueue:
"""This demonstrates how to handle with Deferreds situations where the
request can't be fulfilled "now" by blocking, because some other process
needs to do something first, and we don't control that.
STILL, even if some other process needs to do something before we can
produce a result, we still can fire the Deferred object in the future,
and so we can compose deferreds in all kinds of patterns.
This can happen many times:
Component 1 makes a request to component 2 with args
Comp 2 is unable to return a result now
Comp 2 enqueues the arguments, creates a Deferred obj (d1) and returns it
Time passes
Comp 3 makes Comp 2 be able to return a result
Comp 2 dequeues all arguments and fires all deferreds like d1
"""
def __init__(self):
self.cache = []
self.open = False
def read_from_queue(self, name):
d = Deferred()
self.cache.append((name, d))
self.execute_read()
# if self.open:
# self.execute_read()
return d
def execute_read(self):
if not self.open:
return
while self.cache:
name, deferred = self.cache.pop(0)
try:
# Complicated process that can fail!
result = f"hello {name}!"
deferred.callback(result)
except Exception as err:
deferred.errback(err)
async def enqueue_and_wait(self):
print("Starting read attempt")
r1 = await self.read_from_queue('petre')
print(f"Finished reading r1: {r1}")
async def multiple_enqueue_and_wait(self):
print("Starting multiple reads")
deferreds = [
self.read_from_queue(f"petre-{num}")
for num in range(4)
]
results = await DeferredList(deferreds)
print("Received results")
for result in results:
print(result)
reactor.stop()
def fulfill_actions(self):
self.open = True
self.execute_read()
class QueueWithCallbacks:
"""Same like DeferredOnlyQueue, but there's an extra step, that
the deferreds d1 also have a callback attached to them
"""
def __init__(self):
self.cache = []
self.open = False
def read_from_queue(self, name, callback):
d = Deferred()
# !!!!!!Adding the callback here makes the DeferredList receive
# the callback's result
# d.addCallback(callback)
self.cache.append((name, callback, d))
self.execute_read()
# if self.open:
# self.execute_read()
return d
def execute_read(self):
if not self.open:
return
while self.cache:
name, callback, deferred = self.cache.pop(0) # type: (str, object, Deferred)
# !!!! Adding the callback here makes the DeferredList NOT
# receive the callback's result. It's fine in this situation.
deferred.addCallback(callback)
try:
# Complicated process that can fail!
result = f"hello {name}!"
deferred.callback(result)
except Exception as err:
deferred.errback(err)
async def enqueue_and_wait(self, callback):
print("Starting read attempt")
r1 = await self.read_from_queue('petre', callback)
print(f"Finished reading r1: {r1}")
async def multiple_enqueue_and_wait(self, callback):
print("Starting multiple reads")
deferreds = [
self.read_from_queue(f"petre-{num}", callback)
for num in range(4)
]
results = await DeferredList(deferreds)
print("Received results")
for result in results:
print(result)
reactor.stop()
def fulfill_actions(self):
self.open = True
self.execute_read()
def main_deferreds_only():
queue = DeferredOnlyQueue()
print(1)
# ensureDeferred(queue.enqueue_and_wait())
ensureDeferred(queue.multiple_enqueue_and_wait())
print(2)
# ensureDeferred(fulfill_actions(queue))
reactor.callLater(5, lambda: (queue.fulfill_actions()))
# ensureDeferred()
print(3)
reactor.run()
print(4)
def main_deferreds_and_callbacks():
queue = QueueWithCallbacks()
def print_callback(*a, **kw):
print(f"Inside the callback, a={a}, kw={kw}")
# Something very interesting happens here.
# If the callback is added BEFORE the DeferredList is created,
# then the result in the DeferredList is 444
# If however this callback is added AFTER the DeferredList is created,
# then its results will be "hello petre-X". It's very interesting
# how the DeferredList just adds a callback/errback pair on the
# deferred. As such, even if the Deferred's result will be 444,
# the DeferredList sees only the result from the callbacks
# added BEFORE it was created
# I
return 444
return a
print(1)
# ensureDeferred(queue.enqueue_and_wait())
ensureDeferred(queue.multiple_enqueue_and_wait(print_callback))
print(2)
# ensureDeferred(fulfill_actions(queue))
reactor.callLater(5, lambda: (queue.fulfill_actions()))
# ensureDeferred()
print(3)
reactor.run()
print(4)
if __name__ == '__main__':
# HOW TO RUN: uncomment one of these lines below. Twisted's reactor is not
# restartable, so you can't use more than one of these "main" functions
# run_simple_server()
# run_request_after_response_server()
# periodic_print()
# chain_multiple_deferred()
# chain_multiple_callbacks_with_async_await_syntax_fml_callbacks()
# multiple_requests_txrequess() # takes 2.3 -> 2.4 seconds for 6 requests
# multiple_parallel_requests_txrequests() # takes 0.9 seconds for 6 requests
# main_deferreds_only()
# main_deferreds_and_callbacks()
pass
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment