Created March 23, 2015 12:53
-
-
Save berlincount/39b1017336e6d55b9338 to your computer and use it in GitHub Desktop.
Twisted HTTP client example with parallel requests, timeouts and authentication
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python
from twisted.internet import defer, reactor
from twisted.internet.protocol import Protocol
from twisted.web.client import Agent, ResponseDone
from twisted.web.http import PotentialDataLoss
from twisted.web.http_headers import Headers
def run():
    """Schedule the demo requests and drive them with the Twisted reactor.

    The reactor is only started when createSomeRequests scheduled something.
    The DeferredList it returns has ``finish`` chained onto it, which stops
    the reactor once every request is done, so reactor.run() returns.
    """
    # NOTE: in production, 16 parallel requests should be fine - 4 for demo
    requests = createSomeRequests(maxparallel=4)
    # A Deferred object is always truthy, so None is the only value that
    # means "nothing was scheduled" - test for it explicitly.
    if requests is not None:
        print("Starting reactor to run requests")
        reactor.run()
    else:
        print("No requests were created.")
    print("Everything done.")
def _requestContext(url, **options):
    """Return a request context for *url* wired to the default handlers.

    Extra keyword arguments (timeouts, credentials, ...) are merged into the
    context dict that requestHTTP reads.
    """
    context = {
        'url': url,
        'callback': handleResponse,
        'errback': handleError,
    }
    context.update(options)
    return context


def _scheduleRequest(semaphore, context):
    """Queue one request behind *semaphore* and return its Deferred."""
    print("Scheduling new request for '%s'" % context['url'])
    return semaphore.run(requestHTTP, context=context)


def _reportFailure(failure):
    """Print a failure raised while summarising the results, then absorb it.

    Absorbing it lets the following ``finish`` step still stop the reactor.
    """
    print("Error while processing responses: %s" % failure.getTraceback())


def createSomeRequests(maxparallel, baseurl='http://localhost:9990'):
    """Schedule the demo requests, running at most *maxparallel* at once.

    :param maxparallel: size of the DeferredSemaphore that limits how many
        requests are in flight concurrently.
    :param baseurl: scheme, host and port of the demo server, without a
        trailing slash.
    :return: a DeferredList over all scheduled requests that prints every
        outcome and then stops the reactor, or None if nothing was scheduled.
    """
    semaphore = defer.DeferredSemaphore(maxparallel)

    # Twenty requests the server is expected to answer immediately ...
    contexts = [
        _requestContext('%s/immediate/%d' % (baseurl, i), connect_timeout=10)
        for i in range(20)
    ]
    # ... one request which gets delayed, guarded by a body timeout ...
    contexts.append(_requestContext(
        baseurl + '/delayed', connect_timeout=10, body_timeout=10))
    # ... one request which uses basic authentication ...
    contexts.append(_requestContext(
        baseurl + '/protected', connect_timeout=10,
        username='username', password='password'))
    # ... and one which will not return: this address is usually
    # unreachable, so only the connect timeout ends it.
    contexts.append(_requestContext(
        'http://192.168.254.254:9990/unreachable', connect_timeout=10))

    deferreds = [_scheduleRequest(semaphore, context) for context in contexts]
    print("Scheduled %d deferreds" % len(deferreds))
    if not deferreds:
        return None

    dl = defer.DeferredList(deferreds, consumeErrors=True)
    dl.addCallback(showResponses)
    # Without this, an exception in showResponses would skip finish and
    # leave the reactor running forever.
    dl.addErrback(_reportFailure)
    dl.addBoth(finish)
    return dl
def requestHTTP(context=None):
    """Issue a GET request described by *context* and return its Deferred.

    Context keys read here:
      url               -- target URL, as text or bytes
      username/password -- optional; sent as HTTP basic authentication
      connect_timeout   -- optional; seconds after which the request is
                           cancelled unless the callback cancels it first
      callback/errback  -- optional; added to the Deferred, each invoked as
                           f(result, context)

    The scheduled timeout DelayedCall is stored in context['timeoutCall'].
    Whatever timeout is stored there is cancelled once the request completes.
    The context dict is mutated in place.
    """
    if context is None:
        # never share a mutable default: this function writes into context
        context = {}
    agent = Agent(reactor)
    headers = {}
    # optional basic authentication
    # NOTE: base64 is an encoding, not encryption - the credentials travel in
    # plain text unless the URL uses TLS; consider OAuth.
    if 'username' in context and 'password' in context:
        import base64
        credentials = ('%s:%s' % (context['username'],
                                  context['password'])).encode('utf-8')
        # b64encode (unlike the removed encodestring) never inserts line
        # breaks, which would corrupt the header for long credentials.
        headers[b'Authorization'] = [b'Basic ' + base64.b64encode(credentials)]

    url = context['url']
    # Agent.request expects the method and the URI as bytes.
    if not isinstance(url, bytes):
        url = url.encode('ascii')

    d = agent.request(b'GET', url, Headers(headers))

    # A timeout for the response to arrive. When called through
    # DeferredSemaphore.run this only runs once a slot has been acquired, so
    # time spent queueing for the semaphore is NOT counted.
    if 'connect_timeout' in context:
        context['timeoutCall'] = \
            reactor.callLater(context['connect_timeout'], d.cancel)
    if 'callback' in context:
        d.addCallback(context['callback'], context)
    if 'errback' in context:
        d.addErrback(context['errback'], context)

    def cancelPendingTimeout(result):
        # The request is finished. Whichever timeout is currently stored
        # (connect or body) must not fire later. Pass the result through.
        timeoutCall = context.get('timeoutCall')
        if timeoutCall is not None and timeoutCall.active():
            timeoutCall.cancel()
        return result

    d.addBoth(cancelPendingTimeout)
    return d
def handleResponse(response, context):
    """Record the response headers and start capturing the body.

    Intended as the request callback (requestHTTP adds it with the context
    as extra argument). It does the following:
      * stores status, reason phrase and raw headers in *context*;
      * cancels the pending connect timeout;
      * if context['body_timeout'] is set, arms a timeout for the body to
        finish arriving;
      * returns a Deferred that BodyCapture fires with *context* once the
        body has been received.
    """
    context.update({
        'status': response.code,
        'message': response.phrase,
        # getAllRawHeaders() returns an iterator, which can only be consumed
        # once - store a real dict instead.
        'headers': dict(response.headers.getAllRawHeaders()),
    })
    # the response arrived, so the connect timeout is no longer needed
    if 'timeoutCall' in context:
        if context['timeoutCall'].active():
            context['timeoutCall'].cancel()

    body = defer.Deferred()
    capture = BodyCapture(body, context)

    # a timeout for the request body to be delivered
    if 'body_timeout' in context:
        def onBodyTimeout():
            # body has no canceller, so cancel() errbacks it with
            # CancelledError and Deferred ignores the late callback from
            # BodyCapture. Cancel first, then abort the connection (if the
            # transport supports it) so the server stops sending.
            body.cancel()
            abort = getattr(capture.transport, 'abortConnection', None)
            if abort is not None:
                abort()

        bodyTimeoutCall = reactor.callLater(context['body_timeout'],
                                            onBodyTimeout)
        context['timeoutCall'] = bodyTimeoutCall

        def cancelBodyTimeout(result):
            # the body finished (or failed) - the timeout must not fire later
            if bodyTimeoutCall.active():
                bodyTimeoutCall.cancel()
            return result

        body.addBoth(cancelBodyTimeout)

    # capture the body data
    response.deliverBody(capture)
    return body
class BodyCapture(Protocol):
    """Collect a response body and report it through the *finished* Deferred.

    If the body ended normally (ResponseDone, or PotentialDataLoss for
    responses without a length), the following happens:
      * the bytes are stored in context['content'], with context['length'];
      * *finished* is called back with the context.

    Any other reason the connection was lost is passed to finished.errback,
    so a failed or truncated body is not mistaken for success.
    """

    def __init__(self, finished, context):
        self.body = b""
        self._chunks = []
        self.context = context
        self.finished = finished

    def dataReceived(self, data):
        # collect chunks and join once at the end; repeated bytes += is
        # quadratic in the body size
        self._chunks.append(data)

    def connectionLost(self, reason):
        self.body = b"".join(self._chunks)
        # The transfer is over either way, so a pending (body) timeout must
        # not fire later.
        timeoutCall = self.context.get('timeoutCall')
        if timeoutCall is not None and timeoutCall.active():
            timeoutCall.cancel()
        if self.finished.called:
            # already resolved elsewhere (e.g. cancelled by a body timeout);
            # firing it again would raise AlreadyCalledError
            return
        self.context.update({
            'length': len(self.body),
            'content': self.body,
        })
        if reason.check(ResponseDone, PotentialDataLoss):
            print("got '%s' with '%d' bytes " %
                  (self.context['url'], self.context['length']))
            self.finished.callback(self.context)
        else:
            # the partial body stays in the context for inspection
            self.finished.errback(reason)
def handleError(error, context):
    """Fold a request failure into *context* and hand the context back.

    Stores three entries in *context*:
      * 'msg' -- the failure's error message;
      * 'trace' -- its formatted traceback;
      * 'err' -- the failure object itself.

    It also cancels a still-active DelayedCall kept in
    context['timeoutCall']. Returning the context instead of the failure
    puts the Deferred chain back on its success path. Consumers such as
    showResponses recognise the error by the presence of the 'err' key.
    """
    message = error.getErrorMessage()
    traceText = error.getTraceback()
    context.update(msg=message, trace=traceText, err=error)
    # the request is over - a timeout that has not fired yet must not fire
    if 'timeoutCall' in context:
        pending = context['timeoutCall']
        if pending.active():
            pending.cancel()
    return context
def showResponses(result):
    """Print one summary line per request from a DeferredList result.

    :param result: list of (success, data) pairs. For successes, *data* is
        the request context dict. It holds an 'err' key when the request's
        errback absorbed a failure; otherwise it holds 'status' and
        'content'. For unabsorbed failures, *data* is the failure.
    """
    for success, data in result:
        if not success:
            print("? -> %s" % data)
        elif 'err' in data:
            print("%s -> ERR: %s" % (data['url'], data['err']))
        else:
            # .get(): one incomplete context must not abort the whole report
            print("%s -> Result(%s): %s" %
                  (data['url'], data.get('status'), data.get('content')))
def finish(ign):
    """Stop the reactor; the incoming result *ign* is ignored.

    reactor.callWhenRunning invokes reactor.stop immediately when the
    reactor is running (the normal case). If this fires before reactor.run()
    has been entered, the stop is deferred until startup. A plain
    reactor.stop() would instead raise ReactorNotRunning and leave the later
    reactor.run() running forever.
    """
    print("Stopping reactor.")
    reactor.callWhenRunning(reactor.stop)
# Start the demo only when executed as a script, not when imported.
if __name__ == "__main__":
    run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment.