Created
July 6, 2015 18:20
-
-
Save wchargin/5d4fa36dbd2a61429a7c to your computer and use it in GitHub Desktop.
source for gae-sandbox-997.appspot.com
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# App Engine configuration for the sandbox application (Python 2.7 runtime).
version: 1
runtime: python27
api_version: 1
threadsafe: true

handlers:
# Requests under /_ah/pipeline are served by the Pipeline library's own app.
- url: /_ah/pipeline(/.*)?
  script: pipeline.handlers._APP
# Every other URL is served by the `application` object in the `main` module.
- url: /.*
  script: main.application
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import webapp2 | |
import plumber | |
import logging | |
import time | |
from google.appengine.api import taskqueue | |
from google.appengine.api import memcache | |
# Proof-of-concept only: task completion is signalled through two shared
# memcache entries. There are better ways to poll on tasks.

# Flag entry: reset to False before a task is queued, set to True by the worker.
MEMCACHE_STATUS_KEY = "PipelineTaskDone"
# Value entry: holds whatever result the worker obtained from the pipeline.
MEMCACHE_RESULT_KEY = "PipelineTaskResult"
def poll_and_print(request, pipe):
    """Wait for a started pipeline and write the outcome to the response.

    Args:
        request: Handler-like object; its ``response.headers`` and
            ``response.write`` are used to emit a plain-text report.
        pipe: Pipeline object; only its ``pipeline_id`` attribute is read.

    Returns:
        The value returned by ``plumber.poll_on_pipeline``: the pipeline's
        result, or ``None`` when polling gave up.
    """
    request.response.headers['Content-Type'] = 'text/plain'
    result = plumber.poll_on_pipeline(pipe.pipeline_id)
    logging.info("Done polling. Result: %s", result)

    # Only None means "timed out". A falsy but valid result (an empty dict,
    # 0, ...) must still be reported as a success.
    if result is not None:
        logging.info("Pipeline computation succeeded.")
        request.response.write('Computation complete: %s' % result)
    else:
        logging.error("Pipeline computation timed out.")
        request.response.write('Pipeline computation timed out.')
    return result
def spawn_and_wait(request):
    """Start the demo pipeline and block until its outcome is reported.

    The pipeline is started with the fixed entries ``first=2`` and
    ``second=1``; the outcome is written to ``request.response`` by
    ``poll_and_print`` and also returned to the caller.
    """
    demo_entries = {'first': 2, 'second': 1}
    return poll_and_print(request, plumber.start_pipeline(demo_entries))
class MainPage(webapp2.RequestHandler):
    """Landing page: an HTML list of links to the two pipeline demos."""

    def get(self):
        """Write a ``<ul>`` with one link per demo endpoint."""
        demo_links = (
            ("Start a standalone pipeline", "/pipeline-standalone"),
            ("Start a pipeline via a TaskQueue task", "/pipeline-task"),
        )
        out = self.response
        out.headers['Content-Type'] = 'text/html'
        out.write('<ul>')
        out.write(''.join(
            '<li><a href="%s">%s</a></li>' % (target, label)
            for (label, target) in demo_links))
        out.write('</ul>')
class PipelineStandalone(webapp2.RequestHandler):
    """Runs the demo pipeline directly from within the GET request."""

    def get(self):
        """Start the pipeline and wait for it; the helper writes the response."""
        # The helper's return value is not needed here: the outcome has
        # already been written to self.response.
        spawn_and_wait(request=self)
class PipelineTask(webapp2.RequestHandler):
    """Runs the demo pipeline from a TaskQueue task and waits for it."""

    def get(self):
        """Queue the worker task, then poll memcache for its completion flag.

        Polls up to 30 times, sleeping one second before each check. Writes
        either the pipeline result or a timeout message to the response.
        """
        self.response.headers['Content-Type'] = 'text/plain'

        # Clear the completion flag before queueing so that a flag left over
        # from an earlier run is not mistaken for this task's completion.
        memcache.set(MEMCACHE_STATUS_KEY, False)
        taskqueue.add(url='/pipeline-task-worker')

        poll_interval = 1
        max_polls = 30
        for attempt in xrange(max_polls):
            logging.info("(%s) Waiting on task...", attempt)
            time.sleep(poll_interval)
            if not memcache.get(MEMCACHE_STATUS_KEY):
                continue
            outcome = memcache.get(MEMCACHE_RESULT_KEY)
            self.response.write("Task done! Pipeline result: %s" % outcome)
            logging.info("Task done!")
            return

        # Every poll came back without the flag set.
        self.response.write("Task timed out.")
        logging.error("Giving up on task.")
class PipelineTaskWorker(webapp2.RequestHandler):
    """Handler for POSTs to the worker URL that PipelineTask enqueues."""

    def post(self):
        """Run the demo pipeline and publish its outcome through memcache."""
        outcome = spawn_and_wait(self)
        # Store the result first and raise the flag second, so a poller that
        # sees the flag set will find the result already in place.
        memcache.set(MEMCACHE_RESULT_KEY, outcome)
        memcache.set(MEMCACHE_STATUS_KEY, True)
# URL path -> request handler class.
routes = [
    ('/', MainPage),
    ('/pipeline-standalone', PipelineStandalone),
    ('/pipeline-task', PipelineTask),
    ('/pipeline-task-worker', PipelineTaskWorker),
]

# WSGI application object built from the routes above, with debug enabled.
application = webapp2.WSGIApplication(routes, debug=True)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import pipeline | |
import pipeline.common | |
import time | |
import logging | |
def start_pipeline(entries):
    """Create a ``TestPipelineRoot`` for ``entries``, start it, and return it.

    Args:
        entries: Passed unchanged to ``TestPipelineRoot``; its ``run`` method
            iterates it as a mapping of key -> number.

    Returns:
        The started ``TestPipelineRoot`` instance.
    """
    root = TestPipelineRoot(entries)
    root.start()
    return root
def poll_on_pipeline(pipeline_id, delay=1, attempts=30):
    """Poll a pipeline until it has finalized, or give up.

    Args:
        pipeline_id: ID of the pipeline to look up on every attempt.
        delay: Seconds to sleep before each lookup.
        attempts: Maximum number of lookups before giving up.

    Returns:
        The pipeline's default output value once it has finalized, or
        ``None`` if it did not finalize within ``attempts`` lookups.
    """
    for attempt in xrange(attempts):
        logging.info("(%s) Waiting on pipeline %s...", attempt, pipeline_id)
        time.sleep(delay)
        pipe = pipeline.Pipeline.from_id(pipeline_id)
        # from_id returns None when no pipeline is found for the ID. Treat
        # that as "not ready yet" instead of failing on the attribute access.
        if pipe is None:
            logging.warning("Pipeline %s not found.", pipeline_id)
            continue
        if pipe.has_finalized:
            logging.info("Pipeline done!")
            return pipe.outputs.default.value

    logging.error("Giving up on pipeline %s.", pipeline_id)
    return None
class TestPipelineRoot(pipeline.Pipeline):
    """Root pipeline: one ``TestPipelineWorker`` per entry, gathered in a dict."""

    def run(self, entries):
        """Yield a worker per entry, then a ``Dict`` keyed like ``entries``.

        Args:
            entries: Mapping of key -> number; each number is handed to its
                own ``TestPipelineWorker``.
        """
        logging.info("Root pipeline starting.")
        outputs_by_key = {}
        for name, value in entries.iteritems():
            # Whatever the yield expression evaluates to is stored under the
            # entry's key and later passed on to Dict.
            outputs_by_key[name] = yield TestPipelineWorker(value)
        yield pipeline.common.Dict(**outputs_by_key)

    def finalized(self):
        """Do nothing on finalization.

        Deliberate no-op override: per the original author, the superclass
        implementation sends a bunch of emails.
        """
class TestPipelineWorker(pipeline.Pipeline):
    """Leaf pipeline that increments a single number."""

    def run(self, number):
        """Log the input and return ``number + 1``.

        Args:
            number: Value to increment; must support ``+ 1``.

        Returns:
            ``number + 1``.
        """
        # Pass the value as a logging argument instead of pre-formatting with
        # ``%``, so a tuple-valued argument cannot break the format string.
        logging.info("Worker starting with number=%s.", number)
        return number + 1
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment