Last active
August 4, 2019 18:18
-
-
Save vladiibine/ab8cfc4d8badabecc7c83ea5fdb9fe18 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Examples of usage of the twisted library | |
""" | |
from datetime import timedelta, datetime | |
# Step 1:
# Create a server that returns a "Hello, world!" HTML page each time it receives an HTTP GET request.
from twisted.web import server, resource | |
from twisted.internet import reactor | |
class Simple(resource.Resource):
    """Leaf resource that answers every GET with a fixed HTML greeting."""

    # Marks this resource as a leaf node of the site's resource tree.
    isLeaf = True

    def render_GET(self, request):
        """Return the greeting page as bytes; ``request`` is not inspected."""
        page = "<html>Hello, world!</html>"
        return page.encode()
def run_simple_server():
    """Serve ``Simple`` over HTTP on TCP port 8080 and start the reactor."""
    reactor.listenTCP(8080, server.Site(Simple()))
    reactor.run()
# Step 2:
# Create a server that, for each GET it receives, makes a request to google.com and replies with the status code once that request returns.
from twisted.web import client | |
from twisted.internet import reactor | |
from twisted.web import server | |
class ReturnRequestAfterResponse(resource.Resource):
    """Leaf resource that replies to a GET only after an outbound request settles.

    Each GET triggers a request to google.com. The reply to our own client
    is written from that request's callbacks, which is why ``render_GET``
    returns ``server.NOT_DONE_YET`` instead of a body.
    """

    isLeaf = True

    def render_GET(self, request):
        """Start the outbound request and reply once it succeeds or fails.

        Returns ``server.NOT_DONE_YET``; one of the two handlers below
        writes the body and calls ``request.finish()`` later.
        """
        agent = client.Agent(reactor)
        pending = agent.request(b'GET', b'http://google.com')

        def write_google_status(response):
            # Success path: ``response`` carries the upstream status code.
            request.write(
                f'the status from google was {response.code}! niiice, right?'.encode()
            )
            request.finish()

        def write_upstream_failure(failure):
            # Failure path: the argument is a Failure, which has no ``code``
            # attribute. Report the error and still finish the request so
            # the client is not left waiting on an open connection.
            request.setResponseCode(502)
            request.write(
                f'the request to google failed: {failure.getErrorMessage()}'.encode()
            )
            request.finish()

        # Separate success/failure handlers instead of addBoth: a single
        # function cannot treat a response and a Failure the same way.
        pending.addCallbacks(write_google_status, write_upstream_failure)
        return server.NOT_DONE_YET
def run_request_after_response_server():
    """Serve ``ReturnRequestAfterResponse`` on TCP port 8080 and start the reactor."""
    root = ReturnRequestAfterResponse()
    reactor.listenTCP(8080, server.Site(root))
    reactor.run()
# Step 3:
# Make a process that schedules an action again every few seconds
# (to simulate polling).
from twisted.internet import reactor | |
def periodic_print():
    """Print "BOOM!" every two seconds for as long as the reactor runs."""
    interval = 2

    def tick():
        print("BOOM!")
        # Re-arm the timer so that the next print is scheduled from here.
        reactor.callLater(interval, tick)

    # First print happens ``interval`` seconds after the reactor starts.
    reactor.callLater(interval, tick)
    reactor.run()
from twisted.internet import defer | |
from twisted.web.client import Agent, readBody | |
def chain_multiple_deferred():
    """Fetch a page and print the first 100 bytes of its body via chained Deferreds.

    The request Deferred gets a callback that itself returns a second
    Deferred (the body read). A message is printed after 1 second and the
    reactor is stopped after 1.5 seconds, whether or not the fetch is done.
    """
    agent = client.Agent(reactor)
    pending = agent.request(
        b'GET',
        b'https://asdf.com',
    )

    def print_beginning_of_response(value):
        print(value[:100])

    def finish_request(response):
        # The Deferred returned here is chained into ``pending``: later
        # callbacks/errbacks on ``pending`` see the outcome of the body read.
        body_d = readBody(response)
        body_d.addCallback(print_beginning_of_response)
        return body_d

    def report_failure(failure):
        # Failures from the request or from the body read land here instead
        # of being fed to handlers that expect a response or bytes.
        print(failure.getErrorMessage())

    pending.addCallback(finish_request)
    pending.addErrback(report_failure)

    reactor.callLater(1, print, "it's taking quite a while man!")
    reactor.callLater(1.5, reactor.stop)
    reactor.run()
from twisted.internet import defer, reactor | |
from twisted.web.client import Agent, readBody | |
from twisted.internet.defer import ensureDeferred | |
def chain_multiple_callbacks_with_async_await_syntax_fml_callbacks():
    """Request a page and read its body using a coroutine instead of callbacks.

    The coroutine is wrapped with ``ensureDeferred`` and its outcome
    (body or failure) is printed. The reactor is stopped after 2.1 seconds.
    """

    async def make_request():
        agent = Agent(reactor)
        try:
            response = await agent.request(b'GET', b'http://asdf.com')
        except Exception as err:
            # Show the error here, then let it propagate to the Deferred.
            print(err)
            raise err
        return await readBody(response)

    outcome = ensureDeferred(make_request())
    outcome.addBoth(print)

    reactor.callLater(2, print, "finishing the run!")
    reactor.callLater(2.1, reactor.stop)
    reactor.run()
from twisted.internet import defer, reactor | |
from twisted.web.client import Agent, readBody | |
from twisted.internet.defer import ensureDeferred | |
import txrequests | |
def multiple_requests_txrequess():
    """Fetch six URLs one after another with txrequests and print the elapsed time."""
    urls = (
        'http://asdf.com',
        'http://yahoo.com',
        'http://python.org',
        'http://abc.xyz',
        'http://facebook.com',
        'http://google.com',
    )

    async def main():
        with txrequests.Session() as session:
            # Each request is awaited before the next one is issued.
            fetched = [await session.get(url) for url in urls]
            return [resp.text[:40] for resp in fetched]

    def alert_and_stop(result):
        print('stopping after result: ', result)
        reactor.stop()
        print('took ', datetime.now() - started)

    # Read by alert_and_stop through the closure once the run finishes.
    started = datetime.now()
    run = ensureDeferred(main())
    run.addBoth(alert_and_stop)
    reactor.run()
from twisted.internet import defer, reactor | |
from twisted.web.client import Agent, readBody | |
from twisted.internet.defer import ensureDeferred | |
import txrequests | |
def multiple_parallel_requests_txrequests():
    """Fetch six URLs concurrently with txrequests and print the elapsed time.

    All requests are started before any is awaited. The final result is a
    list with one entry per URL: the first 30 characters of the response
    text, or an error description if that request failed.
    """
    urls = (
        'http://asdf.com',
        'http://yahoo.com',
        'http://python.org',
        'http://abc.xyz',
        'http://facebook.com',
        'http://google.com',
    )

    async def main():
        with txrequests.Session() as session:
            reqs = [session.get(url) for url in urls]
            # DeferredList yields (success, value) pairs. consumeErrors=True
            # keeps failed requests from also being reported as unhandled
            # errors, since they are handled below.
            response_list = await defer.DeferredList(reqs, consumeErrors=True)
            # A failed entry holds a Failure, which has no ``.text``; check
            # the success flag so one bad URL does not discard the rest.
            return [
                value.text[:30] if ok
                else f'request failed: {value.getErrorMessage()}'
                for ok, value in response_list
            ]

    def alert_and_stop(result):
        print('stopping after result: ', result)
        reactor.stop()
        t2 = datetime.now()
        print("took", t2 - t1)

    t1 = datetime.now()
    d = ensureDeferred(main())
    d.addBoth(alert_and_stop)
    reactor.run()
from twisted.internet import reactor | |
from twisted.internet.defer import DeferredList, Deferred, ensureDeferred | |
class DeferredOnlyQueue:
    """Show how Deferreds handle requests that cannot be answered right away.

    Sometimes a result cannot be produced "now" by blocking, because some
    other process, outside our control, has to do something first. Even
    so, the Deferred handed out can be fired at a later time, which lets
    Deferreds be composed in all kinds of patterns.

    The sequence below can happen many times:
        Component 1 makes a request to component 2 with args
        Comp 2 is unable to return a result now
        Comp 2 enqueues the arguments, creates a Deferred obj (d1) and returns it
        Time passes
        Comp 3 makes Comp 2 able to return a result
        Comp 2 dequeues all arguments and fires all deferreds like d1
    """

    def __init__(self):
        # ``open`` gates processing; ``cache`` holds (name, deferred) pairs
        # that are waiting to be answered.
        self.open = False
        self.cache = []

    def read_from_queue(self, name):
        """Queue a read for ``name`` and return the Deferred for its result."""
        pending = Deferred()
        self.cache.append((name, pending))
        # Does nothing while the queue is closed; the entry just waits.
        self.execute_read()
        return pending

    def execute_read(self):
        """Fire every queued Deferred, oldest first, if the queue is open."""
        if self.open:
            while self.cache:
                entry = self.cache.pop(0)
                name, waiter = entry
                try:
                    # Complicated process that can fail!
                    greeting = f"hello {name}!"
                    waiter.callback(greeting)
                except Exception as err:
                    waiter.errback(err)

    async def enqueue_and_wait(self):
        """Queue a single read and print its result once it arrives."""
        print("Starting read attempt")
        r1 = await self.read_from_queue('petre')
        print(f"Finished reading r1: {r1}")

    async def multiple_enqueue_and_wait(self):
        """Queue four reads, wait for all of them, print them, stop the reactor."""
        print("Starting multiple reads")
        names = [f"petre-{num}" for num in range(4)]
        outcomes = await DeferredList(
            [self.read_from_queue(name) for name in names]
        )
        print("Received results")
        for outcome in outcomes:
            print(outcome)
        reactor.stop()

    def fulfill_actions(self):
        """Open the queue and answer everything queued so far."""
        self.open = True
        self.execute_read()
class QueueWithCallbacks:
    """Variant of DeferredOnlyQueue with one extra step: every Deferred
    handed out (d1) also gets a caller-supplied callback attached to it.
    """

    def __init__(self):
        # ``open`` gates processing; ``cache`` holds
        # (name, callback, deferred) triples waiting to be answered.
        self.open = False
        self.cache = []

    def read_from_queue(self, name, callback):
        """Queue a read for ``name`` and return the Deferred for its result.

        ``callback`` is stored with the entry and is attached only when the
        entry is processed in ``execute_read``.
        """
        pending = Deferred()
        # !!!!!! Attaching the callback here (pending.addCallback(callback))
        # would make the DeferredList receive the callback's result.
        self.cache.append((name, callback, pending))
        # Does nothing while the queue is closed; the entry just waits.
        self.execute_read()
        return pending

    def execute_read(self):
        """Attach callbacks to, and fire, every queued Deferred if open."""
        if self.open:
            while self.cache:
                # Entry layout: (str, callable, Deferred)
                name, on_result, waiter = self.cache.pop(0)
                # !!!! Attaching the callback here makes the DeferredList NOT
                # receive the callback's result. It's fine in this situation.
                waiter.addCallback(on_result)
                try:
                    # Complicated process that can fail!
                    greeting = f"hello {name}!"
                    waiter.callback(greeting)
                except Exception as err:
                    waiter.errback(err)

    async def enqueue_and_wait(self, callback):
        """Queue a single read and print its result once it arrives."""
        print("Starting read attempt")
        r1 = await self.read_from_queue('petre', callback)
        print(f"Finished reading r1: {r1}")

    async def multiple_enqueue_and_wait(self, callback):
        """Queue four reads, wait for all of them, print them, stop the reactor."""
        print("Starting multiple reads")
        names = [f"petre-{num}" for num in range(4)]
        outcomes = await DeferredList(
            [self.read_from_queue(name, callback) for name in names]
        )
        print("Received results")
        for outcome in outcomes:
            print(outcome)
        reactor.stop()

    def fulfill_actions(self):
        """Open the queue and answer everything queued so far."""
        self.open = True
        self.execute_read()
def main_deferreds_only():
    """Run the DeferredOnlyQueue demo: queue reads now, answer them after 5 seconds.

    The numbers 1-4 are printed as progress markers around each step.
    """
    queue = DeferredOnlyQueue()

    print(1)
    # Start the coroutine; its reads stay pending until the queue is opened.
    ensureDeferred(queue.multiple_enqueue_and_wait())

    print(2)
    # Open the queue later, which fires all the pending reads.
    reactor.callLater(5, queue.fulfill_actions)

    print(3)
    reactor.run()

    print(4)
def main_deferreds_and_callbacks():
    """Run the QueueWithCallbacks demo: queue reads now, answer them after 5 seconds.

    The numbers 1-4 are printed as progress markers around each step.
    """
    queue = QueueWithCallbacks()

    def print_callback(*a, **kw):
        """Print the arguments received and replace the result with 444."""
        print(f"Inside the callback, a={a}, kw={kw}")
        # Something very interesting happens here.
        # If the callback is added BEFORE the DeferredList is created,
        # then the result in the DeferredList is 444.
        # If however this callback is added AFTER the DeferredList is created,
        # then its results will be "hello petre-X". It's very interesting
        # how the DeferredList just adds a callback/errback pair on the
        # deferred. As such, even if the Deferred's result will be 444,
        # the DeferredList sees only the result from the callbacks
        # added BEFORE it was created.
        return 444

    print(1)
    # Start the coroutine; its reads stay pending until the queue is opened.
    ensureDeferred(queue.multiple_enqueue_and_wait(print_callback))
    print(2)
    # Open the queue later, which fires all the pending reads.
    reactor.callLater(5, queue.fulfill_actions)
    print(3)
    reactor.run()
    print(4)
if __name__ == '__main__':
    # HOW TO RUN: uncomment exactly one of the lines below. Twisted's reactor
    # is not restartable, so you can't use more than one of these "main"
    # functions in the same run.
    # run_simple_server()
    # run_request_after_response_server()
    # periodic_print()
    # chain_multiple_deferred()
    # chain_multiple_callbacks_with_async_await_syntax_fml_callbacks()
    # multiple_requests_txrequess()  # takes 2.3 -> 2.4 seconds for 6 requests
    # multiple_parallel_requests_txrequests()  # takes 0.9 seconds for 6 requests
    # main_deferreds_only()
    # main_deferreds_and_callbacks()
    pass
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment