Last active
August 29, 2015 14:02
-
-
Save derwolfe/49cbe3a23db981dff5e6 to your computer and use it in GitHub Desktop.
example of long running task with intermediate status result
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# XXX many of the addBoths should be changed to involve error handling | |
""" | |
longrunning | |
This is an example of how one can develop a system using twisted.web | |
that can process long running results but still provide a short | |
request-response cycle. | |
This implements the following pattern: | |
1) request received by twisted application | |
2) the request is passed to the correct handler | |
3) the handler immediately retuns a response stating: | |
"I have received your request and the response will be | |
available at http://foo.bar/?key=plop" | |
At this point, an intermediate result will exist at http://foo.bar/?key=plop. | |
The result that exists can be polled by the client. | |
4) Begin processing the long running task. | |
5) While result is being processed, the client can poll the resource located | |
at http://foo.bar/?key=plop to see if the calculation exists. | |
To run this example you must have twisted in your path. Type the following | |
to start the server: | |
$ python longrunning.py | |
Once the server is running, issue a request to the server via curl: | |
$ curl -d "number=23" http://127.0.0.1:58888 | |
This will create the initial request and start the calculation. It will also | |
return the location of where the intermediate object exists. To see either | |
the final or immediate result, type: | |
$ curl http://127.0.0.1:58888/?key=23 | |
""" | |
from twisted.internet import reactor, defer, threads | |
from twisted.web.resource import Resource | |
from twisted.web.server import NOT_DONE_YET, Site | |
from twisted.python import log | |
from sys import stdout | |
from time import sleep | |
from decimal import Decimal | |
import json | |
def longRunning(number): | |
""" | |
This is the blocking, long running calculation | |
""" | |
log.msg("begin long running calculation") | |
sleep(20) | |
dec_number = Decimal(number) | |
result = dec_number * dec_number | |
log.msg("finish long running calculation") | |
return result | |
class MultiplyResource(Resource): | |
def __init__(self, results): | |
Resource.__init__(self) | |
self._results = results | |
def render_POST(self, request): | |
""" | |
start a long running calculation | |
""" | |
request.setHeader("content-type", "application/json") | |
def process(request): | |
""" | |
Take a request for a long running calculation and respond with | |
with the url of where the result will eventually reside. | |
""" | |
number = request.args["number"][0] | |
self._results[number] = "notready" | |
url = str(request.URLPath()) + "?key=" + number | |
request.write(json.dumps({"result_at": url})) | |
request.finish() # close the response cycle, but continue to work | |
return number | |
def calculateResult(number): | |
""" | |
Invoke the long running calculation in a thread; once ready | |
save the result to the result set. | |
""" | |
d = threads.deferToThread(longRunning, number) | |
def getResult(result): | |
self._results[number] = result | |
d.addBoth(getResult) | |
return d | |
d = defer.Deferred() | |
d.addBoth(process) | |
d.addBoth(calculateResult) | |
d.callback(request) | |
return NOT_DONE_YET | |
def render_GET(self, request): | |
""" | |
Display the results contained in self._results. This is meant to house | |
both complete and intermediate results. | |
""" | |
request.setHeader("content-type", "application/json") | |
def getArgs(request): | |
""" | |
extract the key argument from the request | |
""" | |
key = request.args.get("key") | |
if key is not None: | |
return key[0] | |
return key | |
def whichOperation(key): | |
""" | |
This resource returns two types of responses: | |
1) A dump of all results held in the system | |
2) The status of a specific result. | |
To determine which result should be returned, whichOperation | |
checks for the presence of a `key` variable. If this variable | |
exists, then whichOperation will return the status of the given key. | |
If no `key` variable is provided, then the request will return the | |
entire result set. | |
If the result stored at a given key is no longer being | |
processed AND has been returned to the client, then the result | |
stored at that key will be deleted. | |
""" | |
if key is None: | |
request.write(json.dumps(self._results)) | |
else: | |
result = self._results.get(key) | |
if result is None: | |
request.setResponseCode(404) | |
request.write(json.dumps({ | |
"processing": False, | |
"status": 404, | |
"answer": None, | |
})) | |
else: | |
if result == "notready": | |
request.write(json.dumps({ | |
"processing": True, | |
"status": 200, | |
"answer": None | |
})) | |
else: | |
request.write(json.dumps({ | |
"processing": False, | |
"status": 200, | |
"answer": float(result) | |
})) | |
del self._results[key] | |
request.finish() | |
d = defer.Deferred() | |
d.addBoth(getArgs) | |
d.addBoth(whichOperation) | |
d.callback(request) | |
return NOT_DONE_YET | |
def main(): | |
log.startLogging(stdout) | |
port = 58888 | |
root = Resource() | |
root.putChild('', MultiplyResource({})) | |
reactor.listenTCP(port, Site(root)) | |
reactor.run() | |
if __name__ == "__main__": | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment