Skip to content

Instantly share code, notes, and snippets.

@jolynch
Created September 20, 2016 08:08
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save jolynch/b9761f829db9f511b70d1b5fdf18ac69 to your computer and use it in GitHub Desktop.
Save jolynch/b9761f829db9f511b70d1b5fdf18ac69 to your computer and use it in GitHub Desktop.
# -*- coding: utf-8 -*-
import logging
import gevent
import gevent.event
import time
logger = logging.getLogger(__name__)
class Speculate(object):
"""
Forwards all incoming requests to client. Tees a configurable portion
of incoming requests to another client, but the result is ignored.
This class is primarily useful for dark launching new clients.
:param toggle: staticconf key to load toggle weight from
:param client: The client used to actually handle the request
:param pool_size: Maximum number of speculations to process at once
"""
def __init__(self, client, speculate_after_s, max_speculations=1):
self.client = client
self._speculate_after_s = speculate_after_s
self._max_speculations = min(max_speculations, 3)
self._num_speculated = 0
def _handle(self, method_name, query):
"""Returns a dictionary with the client result under 'client_result'
and the thread computing the tee client result under 'tee_thread',
if a tee is attempted for this request."""
greenlets = []
result = gevent.event.AsyncResult()
def done_callback(response_greenlet):
if response_greenlet != greenlets[0]:
self._num_speculated += 1
for greenlet in greenlets:
greenlet.unlink(done_callback)
gevent.killall(greenlets, block=False)
result.set(response_greenlet.value)
speculation_request = gevent.Greenlet(
self.client.handle,
method_name,
query
)
real_request = gevent.Greenlet(
self.client.handle,
method_name,
query
)
greenlets = [real_request, speculation_request]
for greenlet in greenlets:
greenlet.link_value(done_callback)
speculation_request.start_later(self._speculate_after_s)
real_request.start()
res = result.get(1.0)
return res
def handle(self, method_name, query):
return self._handle(method_name, query)
def healthcheck(self):
health, client_info = self.client.healthcheck()
return (health, {
self.__class__.__name__: {
'client': client_info,
'speculated': self._num_speculated
}
})
def exposed_methods(self):
return self.client.exposed_methods()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment