Skip to content

Instantly share code, notes, and snippets.

@berlincount
Created March 23, 2015 12:53
Show Gist options
  • Save berlincount/39b1017336e6d55b9338 to your computer and use it in GitHub Desktop.
Save berlincount/39b1017336e6d55b9338 to your computer and use it in GitHub Desktop.
Twisted HTTP client example with parallel requests, timeouts and authentication
#!/usr/bin/env python
from twisted.internet import defer, reactor
from twisted.internet.protocol import Protocol
from twisted.web.client import Agent
from twisted.web.http_headers import Headers
def run():
    """Schedule the demo requests and run the reactor until they are done.

    The reactor is only started when ``createSomeRequests`` hands back a
    truthy value; otherwise a short notice is printed instead.
    """
    # NOTE: in production, 16 parallel should be fine - 4 for demo
    pending = createSomeRequests(maxparallel=4)
    if not pending:
        print("No requests where created.")
    else:
        print("Starting reactor to run requests")
        # blocks until something stops the reactor
        reactor.run()
    print("Everything done.")
def createSomeRequests(maxparallel):
    """Schedule the demo HTTP requests, at most ``maxparallel`` at a time.

    Every request is described by a context dict and is started through a
    ``DeferredSemaphore`` so that no more than ``maxparallel`` of them are
    in flight simultaneously.

    maxparallel -- number of tokens handed to the DeferredSemaphore.

    Returns a ``DeferredList`` covering all scheduled requests, with
    ``showResponses`` and ``finish`` already attached as callbacks, or
    ``None`` when nothing was scheduled.
    """
    def describe(url, **extra):
        # Settings shared by every request, plus the per-request extras.
        context = {
            'url': url,
            'callback': handleResponse,
            'errback': handleError,
            'connect_timeout': 10,
        }
        context.update(extra)
        return context

    # Twenty dummy requests ..
    contexts = [
        describe('http://localhost:9990/immediate/%d' % i)
        for i in range(20)
    ]
    # One request which gets delayed ...
    contexts.append(
        describe('http://localhost:9990/delayed', body_timeout=10))
    # One request which uses authentication ...
    contexts.append(
        describe('http://localhost:9990/protected',
                 username='username', password='password'))
    # One request which will not return ...
    # (this address is usually unreachable)
    contexts.append(describe('http://192.168.254.254:9990/unreachable'))

    # we're lining up the requests in a semaphore
    semaphore = defer.DeferredSemaphore(maxparallel)
    deferreds = []
    for context in contexts:
        print("Scheduling new request for '%s'" % context['url'])
        deferreds.append(semaphore.run(requestHTTP, context=context))

    print("Scheduled %d deferreds" % len(deferreds))
    if not deferreds:
        return None

    # .. and return a DeferredList with all of them; consumeErrors keeps
    # failures of individual requests from being logged as unhandled.
    dl = defer.DeferredList(deferreds, consumeErrors=True)
    dl.addCallback(showResponses)
    dl.addCallback(finish)
    return dl
def requestHTTP(context=None):
    """Issue a GET request described by ``context`` and return its Deferred.

    Keys read from ``context``:
      'url'              -- required; target of the request (must be ASCII,
                            otherwise encoding it raises UnicodeEncodeError)
      'username' and
      'password'         -- optional; when both are present an HTTP Basic
                            ``Authorization`` header is sent
      'connect_timeout'  -- optional; seconds after which the request
                            Deferred is cancelled
      'callback'         -- optional; added as callback, called with
                            (response, context)
      'errback'          -- optional; added as errback, called with
                            (failure, context)

    Key written to ``context``:
      'timeoutCall'      -- the pending delayed call of the timeout, so that
                            the handlers can cancel it

    Raises KeyError when ``context`` has no 'url'.
    """
    import base64

    def as_bytes(value, encoding='utf-8'):
        # Agent.request() wants bytes on Python 3; on Python 2 a plain str
        # already is bytes and passes through untouched.
        if isinstance(value, bytes):
            return value
        if not hasattr(value, 'encode'):
            value = '%s' % (value,)
        return value.encode(encoding)

    # A fresh dict per call: this function stores 'timeoutCall' in the
    # context, so a shared default dict would leak state between calls.
    if context is None:
        context = {}

    agent = Agent(reactor)
    headers = {}

    # optional basic authentication
    # NOTE: this transfers authentication data in plain - consider OAuth
    if 'username' in context and 'password' in context:
        credentials = (as_bytes(context['username']) + b':' +
                       as_bytes(context['password']))
        # b64encode() never inserts line breaks, so long credentials cannot
        # corrupt the header (encodestring() wrapped its output and is gone
        # from Python 3.9 onwards).
        headers[b'Authorization'] = [
            b'Basic ' + base64.b64encode(credentials)]

    # deferred for the request
    d = agent.request(
        b'GET',
        as_bytes(context['url'], 'ascii'),
        Headers(headers))

    # a timeout for the request to have run
    # NOTE: the clock starts when this function is called and keeps running
    # until one of the handlers cancels the delayed call. When invoked via
    # DeferredSemaphore.run() that should be after a slot was acquired, i.e.
    # queueing time is presumably not included - verify if it matters.
    if 'connect_timeout' in context:
        context['timeoutCall'] = reactor.callLater(
            context['connect_timeout'], d.cancel)

    # errback goes last so it also sees failures raised by the callback
    if 'callback' in context:
        d.addCallback(context['callback'], context)
    if 'errback' in context:
        d.addErrback(context['errback'], context)
    return d
def handleResponse(response, context):
    """Record the response head in ``context`` and start reading the body.

    response -- the response object delivered by the request Deferred; its
                ``code``, ``phrase`` and ``headers`` are read and its body
                is delivered into a ``BodyCapture``.
    context  -- dict describing the request; updated in place with
                'status', 'message' and 'headers'. An optional
                'body_timeout' (seconds) limits how long the body may take.

    Returns a Deferred which ``BodyCapture`` fires with ``context`` once
    the body has been received, or which is cancelled when the body
    timeout expires first.
    """
    # update with information we already have
    context.update({
        'status': response.code,
        'message': response.phrase,
        # materialised so the headers can be read more than once
        'headers': list(response.headers.getAllRawHeaders()),
    })

    # cancel a possible connect timeout
    if 'timeoutCall' in context:
        if context['timeoutCall'].active():
            context['timeoutCall'].cancel()

    body = defer.Deferred()

    # a timeout for the request body to be delivered
    if 'body_timeout' in context:
        timeout = reactor.callLater(context['body_timeout'], body.cancel)
        context['timeoutCall'] = timeout

        def disarm(result):
            # Body finished (or failed) before the timeout: drop the
            # delayed call so it does not linger in the reactor.
            if timeout.active():
                timeout.cancel()
            return result

        body.addBoth(disarm)

    # capture the body data
    response.deliverBody(BodyCapture(body, context))
    return body
class BodyCapture(Protocol):
    """Protocol which collects a response body and reports it via a Deferred.

    finished -- Deferred fired with ``context`` once the connection is lost
    context  -- dict describing the request; receives 'length' and
                'content' when the body is complete
    """

    def __init__(self, finished, context):
        # Chunks are collected in a list and joined once at the end; this
        # avoids repeated copying and works with the bytes objects the
        # transport delivers on Python 3 (on Python 2 b"" is simply "").
        self._chunks = []
        # complete body; only filled in by connectionLost()
        self.body = b""
        self.context = context
        self.finished = finished

    def dataReceived(self, bytes):
        # NOTE: the parameter name shadows the builtin; it is kept as-is so
        # the method signature stays unchanged.
        self._chunks.append(bytes)

    def connectionLost(self, reason):
        # NOTE(review): ``reason`` is not inspected, so a connection which
        # broke off early is reported just like a complete body - confirm
        # that this is intended.
        self.body = b"".join(self._chunks)
        self.context.update({
            'length': len(self.body),
            'content': self.body,
        })
        print("got '%s' with '%d' bytes " %
              (self.context['url'], self.context['length']))
        self.finished.callback(self.context)
def handleError(error, context):
    """Store the details of a failed request in ``context`` and return it.

    error   -- failure object; must offer ``getErrorMessage()`` and
               ``getTraceback()``
    context -- dict describing the request; receives 'msg', 'trace' and
               'err', and its 'timeoutCall' (if any) is cancelled while
               still active

    Returns ``context``; because a plain value is returned rather than the
    failure, the callback chain continues on its success path.
    """
    message = error.getErrorMessage()
    traceback_text = error.getTraceback()
    context['msg'] = message
    context['trace'] = traceback_text
    context['err'] = error

    # cancel a possible timeout
    try:
        pending = context['timeoutCall']
    except KeyError:
        return context
    if pending.active():
        pending.cancel()
    return context
def showResponses(result):
    """Print one summary line for every entry of a DeferredList result.

    result -- iterable of ``(success, data)`` pairs. For successful entries
              ``data`` is the request's context dict: when it has an 'err'
              key the error is printed, otherwise 'status' and 'content'.
              For unsuccessful entries ``data`` is printed as it is.

    Returns None.
    """
    for success, data in result:
        if not success:
            print("? -> %s" % data)
            continue
        if 'err' in data:
            # handleError() turned the failure into a regular result
            print("%s -> ERR: %s" % (data['url'], data['err']))
            continue
        print("%s -> Result(%s): %s" %
              (data['url'], data['status'], data['content']))
def finish(ign):
    """Stop the reactor; meant as the last callback of the request list.

    ign -- result of the previous callback; ignored.
    """
    print("Stopping reactor.")
    # callWhenRunning() calls reactor.stop right away when the reactor is
    # running, and otherwise defers it until the reactor has started. A
    # plain reactor.stop() would raise ReactorNotRunning if every request
    # had already finished before reactor.run() was reached, and the
    # reactor started afterwards would then never be stopped.
    reactor.callWhenRunning(reactor.stop)
# Only run the demo when executed as a script, not when imported.
if __name__ == '__main__':
    run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment