Skip to content

Instantly share code, notes, and snippets.

@wchargin
Created July 6, 2015 18:20
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save wchargin/5d4fa36dbd2a61429a7c to your computer and use it in GitHub Desktop.
Save wchargin/5d4fa36dbd2a61429a7c to your computer and use it in GitHub Desktop.
source for gae-sandbox-997.appspot.com
# App Engine application configuration (Python 2.7 runtime).
version: 1
runtime: python27
api_version: 1
threadsafe: true

handlers:
# Requests under /_ah/pipeline are served by the pipeline package's handlers.
- url: /_ah/pipeline(/.*)?
  script: pipeline.handlers._APP
# Every other URL is served by the `application` object in the `main` module.
- url: /.*
  script: main.application
import webapp2
import plumber
import logging
import time
from google.appengine.api import taskqueue
from google.appengine.api import memcache
# there are better ways to poll on tasks, but this is a proof-of-concept
# Memcache key for the completion flag: PipelineTask resets it to False before
# enqueueing the worker, and PipelineTaskWorker sets it to True when finished.
MEMCACHE_STATUS_KEY = "PipelineTaskDone"
# Memcache key under which PipelineTaskWorker stores the pipeline's result.
MEMCACHE_RESULT_KEY = "PipelineTaskResult"
def poll_and_print(request, pipe):
    """Wait for a pipeline to finish and report the outcome as plain text.

    Args:
        request: handler object whose ``response`` receives the report.
        pipe: the started pipeline; only its ``pipeline_id`` is read.

    Returns:
        The value returned by ``plumber.poll_on_pipeline``: the pipeline's
        output, or ``None`` when polling gave up.
    """
    request.response.headers['Content-Type'] = 'text/plain'
    result = plumber.poll_on_pipeline(pipe.pipeline_id)
    # Pass the value as a logging argument so a tuple result cannot be
    # mistaken for a tuple of format arguments.
    logging.info("Done polling. Result: %s", result)
    # Only None signals a timeout. A falsy-but-valid output (empty dict, 0,
    # empty string, ...) is still a successful computation.
    if result is not None:
        logging.info("Pipeline computation succeeded.")
        # Wrap in a 1-tuple so tuple-valued results format as a single value.
        request.response.write('Computation complete: %s' % (result,))
    else:
        logging.error("Pipeline computation timed out.")
        request.response.write('Pipeline computation timed out.')
    return result
def spawn_and_wait(request):
    """Start the demo pipeline and report its outcome on ``request``.

    The pipeline is started with the fixed entries ``{'first': 2,
    'second': 1}``; waiting and reporting are delegated to
    ``poll_and_print``, whose return value is passed through unchanged.
    """
    started = plumber.start_pipeline({'first': 2, 'second': 1})
    return poll_and_print(request, started)
class MainPage(webapp2.RequestHandler):
    """Index page listing links to the two pipeline demos."""

    def get(self):
        """Write an HTML unordered list with one link per demo endpoint."""
        demos = (
            ("Start a standalone pipeline", "/pipeline-standalone"),
            ("Start a pipeline via a TaskQueue task", "/pipeline-task"),
        )
        out = self.response
        out.headers['Content-Type'] = 'text/html'
        out.write('<ul>')
        for label, target in demos:
            out.write('<li><a href="%s">%s</a></li>' % (target, label))
        out.write('</ul>')
class PipelineStandalone(webapp2.RequestHandler):
    """Runs the demo pipeline directly from the incoming request."""

    def get(self):
        """Start the pipeline and write its outcome to this response."""
        # The handler itself is passed along because the helper writes to
        # its ``response``; the helper's return value is not needed here.
        spawn_and_wait(self)
class PipelineTask(webapp2.RequestHandler):
    """Runs the demo pipeline from a TaskQueue task and waits for its result."""

    def get(self):
        """Enqueue the worker task, then poll memcache until it reports back.

        The completion flag is checked after each one-second sleep, for at
        most 30 attempts. The response is either the pipeline result
        published by the worker or a timeout message.
        """
        self.response.headers['Content-Type'] = 'text/plain'
        # Clear the flag first so an earlier run's completion is not
        # mistaken for this one.
        memcache.set(MEMCACHE_STATUS_KEY, False)
        # The task object returned by add() is not needed; completion is
        # signalled through memcache instead.
        taskqueue.add(url='/pipeline-task-worker')
        delay = 1
        attempts = 30
        for i in xrange(attempts):
            logging.info("(%s) Waiting on task...", i)
            time.sleep(delay)
            if memcache.get(MEMCACHE_STATUS_KEY):
                task_result = memcache.get(MEMCACHE_RESULT_KEY)
                # Wrap in a 1-tuple so a tuple-valued result is formatted as
                # one value instead of being unpacked as format arguments.
                self.response.write("Task done! Pipeline result: %s" %
                                    (task_result,))
                logging.info("Task done!")
                break
        else:
            # Reached only when the loop ran out of attempts without a break.
            self.response.write("Task timed out.")
            logging.error("Giving up on task.")
class PipelineTaskWorker(webapp2.RequestHandler):
    """Task handler that runs the pipeline and publishes the result."""

    def post(self):
        """Run the pipeline, then store its result and the done flag."""
        outcome = spawn_and_wait(self)
        # Store the result before raising the flag, so a poller that sees
        # the flag can already read the result.
        memcache.set(MEMCACHE_RESULT_KEY, outcome)
        memcache.set(MEMCACHE_STATUS_KEY, True)
# URL path -> handler class table for the WSGI application below.
routes = [
    ('/', MainPage),
    ('/pipeline-standalone', PipelineStandalone),
    ('/pipeline-task', PipelineTask),
    ('/pipeline-task-worker', PipelineTaskWorker),
]

# WSGI entry point; debug mode is switched on.
application = webapp2.WSGIApplication(routes, debug=True)
import pipeline
import pipeline.common
import time
import logging
def start_pipeline(entries):
    """Create a ``TestPipelineRoot`` for ``entries``, start it and return it."""
    root = TestPipelineRoot(entries)
    root.start()
    return root
def poll_on_pipeline(pipeline_id, delay=1, attempts=30):
    """Poll a pipeline until it has finalized or the attempts run out.

    Args:
        pipeline_id: ID of the pipeline to look up on each attempt.
        delay: seconds to sleep before each lookup.
        attempts: maximum number of lookups before giving up.

    Returns:
        The pipeline's default output value once it has finalized, or
        ``None`` if it did not finalize within ``attempts`` lookups.
    """
    for i in xrange(attempts):
        logging.info("(%s) Waiting on pipeline %s...", i, pipeline_id)
        time.sleep(delay)
        pipe = pipeline.Pipeline.from_id(pipeline_id)
        # Per the Pipeline API docs, from_id returns None when no pipeline
        # can be found for the ID. Treat that as "not finished yet" and keep
        # polling instead of crashing on the attribute access.
        if pipe is not None and pipe.has_finalized:
            logging.info("Pipeline done!")
            return pipe.outputs.default.value
    # Reached only when every attempt was used up without finalizing.
    logging.error("Giving up on pipeline %s.", pipeline_id)
    return None
class TestPipelineRoot(pipeline.Pipeline):
    """Root pipeline that runs one ``TestPipelineWorker`` per entry."""

    def run(self, entries):
        """Yield a worker for each entry, then a Dict of what came back.

        Args:
            entries: mapping of key -> number; each number is handed to its
                own ``TestPipelineWorker``.
        """
        logging.info("Root pipeline starting.")
        child_outputs = {}
        for name, value in entries.iteritems():
            # Whatever is sent back for the yielded worker is kept under
            # the entry's key.
            child_outputs[name] = yield TestPipelineWorker(value)
        yield pipeline.common.Dict(**child_outputs)

    def finalized(self):
        """Do nothing on finalization."""
        # Overridden as a no-op: per the original author, the superclass
        # implementation sends a bunch of emails.
        pass
class TestPipelineWorker(pipeline.Pipeline):
    """Leaf pipeline that adds one to the number it is given."""

    def run(self, number):
        """Log the input and return ``number + 1``."""
        message = "Worker starting with number=%s." % number
        logging.info(message)
        return number + 1
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment