Skip to content

Instantly share code, notes, and snippets.

Last active May 26, 2022 02:05
Show Gist options
  • Save alangpierce/f0ad63643b446a4f84ad to your computer and use it in GitHub Desktop.
Save alangpierce/f0ad63643b446a4f84ad to your computer and use it in GitHub Desktop.
BigBingo (as of early July 2014)
"""Alternative deciders that can be used in BigBingo experiments.
An alternative decider is a function that determines which user gets which
alternative in a BigBingo experiment. It takes as arguments the experiment
itself and the bingo ID and returns the chosen alternative. See the Experiment
class in bigbingo/ for more info.
The alternative decider should take the following arguments:
experiment: A config.Experiment object containing all information about
the experiment (most notably, an OrderedDict mapping alternative
name to weight).
bingo_id: A string with the ID for the given user.
It should return one of the experiment alternative names, or None if the user
should not participate in the experiment. It should be a deterministic
function of experiment and bingo_id.
import hashlib
import intl.request
def legacy(experiment, bingo_id):
"""An alternative decider function that emulates GAE/Bingo.
This function should match modulo_choose in gae_bingo/ This
decider should not be used for new experiments; it only exists for
experiments that were ported from GAE/Bingo to BigBingo and are still
alternatives_weight = sum(experiment.alternative_weights.itervalues())
sig = hashlib.md5(
experiment.legacy_hashable_name + bingo_id).hexdigest()
sig_num = int(sig, base=16)
index_weight = sig_num % alternatives_weight
current_weight = alternatives_weight
alternative_data = [
(index, alt, weight)
for (index, (alt, weight))
in enumerate(experiment.alternative_weights.iteritems())]
for _, alt, weight in sorted(
key=lambda (alt_index, _, alt_weight): (alt_weight, alt_index),
current_weight -= weight
if index_weight >= current_weight:
return alt
assert False, "Didn't find a suitable alternative."
def english_only(experiment, bingo_id):
"""Test the experiment on a (deterministic) pseudorandom set of
English-speaking users.
if intl.request.locale_for_mo() == 'en':
return by_weights(experiment, bingo_id)
return None
def by_weights(experiment, bingo_id):
"""Test the experiment on a (deterministic) pseudorandom set of users."""
total_weight = sum(experiment.alternative_weights.itervalues())
weight_index = hash_on_interval( + bingo_id) * total_weight
current_index = 0
for alternative, weight in experiment.alternative_weights.iteritems():
if current_index <= weight_index < current_index + weight:
return alternative
current_index += weight
assert False, 'Hash was out of bounds when computing BigBingo alternative.'
def limited_to_range(decider, range_start, range_end):
"""Return a decider that only allows a predictable set of users.
This is useful for making experiments that are mutually exclusive with each
other and re-using the participants of one experiment in a later
experiment, since the assignment of bingo_id to location in the [0, 1)
range is experiment-independent.
Example: limited_to_range(english_only, 0.2, 0.4) returns an alternative
decider that only allows a specific 20% of users to be in the experiment,
and of those, only allows english users. If you add another experiment with
decider limited_to_range(by_weights, 0.4, 0.5), English and non-English
users will be included and the experiment will be disjoint with the first
decider: The alternative decider to use if the user is in the
range_start: The inclusive start of the range, between 0 and 1.
range_end: The exclusive end of the range, between 0 and 1.
assert 0 <= range_start < range_end <= 1
def wrapper_decider(experiment, bingo_id):
if range_start <= hash_on_interval(bingo_id) < range_end:
return decider(experiment, bingo_id)
return None
return wrapper_decider
def hash_on_interval(value):
"""Given a string, return a hash of the string in the interval [0, 1).
For example, you can multiply the result by n to get a deterministic hash
that's a float value between 0 (inclusive) and n (exclusive).
The common way to use this is to call hash_on_interval(base + bingo_id), in
which case the right way to think about it is that each bingo_id for a base
gets a random (but deterministic) point on the "hash space", the full set
of hash space mappings is re-randomzied for each base.
# Using a cryptographic hash function is overkill here, but it looks like
# Python doesn't have a good non-cryptographic hash function that's stable
# across versions, and MD5 is still plenty fast for our use case.
hash_str = hashlib.md5(value).hexdigest()
hash_uint32 = int(hash_str, 16) & 0xFFFFFFFF
return float(hash_uint32) / (1 << 32)
Snapshot of Khan Academy's BigBingo A/B testing framework and related code.
Here's a basic overview: is the most interesting file. It contains all stages of the
summarize task, as well as the publish and archive steps that happen at the
end. contains lots of useful pipelines for interacting with
BigQuery. QueryToTableBatchPipeline can run many simultaneous queries, and will
properly handle all batching and retry logic. is where all experiment configuraiton lives. For this Gist, I
replaced the big list of active and archived experiments with simple examples.
The summarize task depends heavily on the App Engine Pipeline API
( ), more specifically, on KA's
fork of it here: .
"""API for interacting with BigBingo."""
from __future__ import absolute_import
import logging
from bigbingo import config
from bigbingo import log
import cookie_util
def ab_test(experiment_id, bingo_id):
"""Participates the given user in the given experiment.
Logs a participation event and determines which alternative to use for this
user. If this is the first participation event for this experiment and
user, the time is used as the user's participation time for the experiment
and conversions start being tracked for the user; otherwise, the BigBingo
summarizer will ignore the participation event.
experiment_id: A string name of the ID of the experiment.
bingo_id: A string identity for the user that should participate in the
A string name of the alternative for this user in the experiment.
experiment = config.get_active_experiment_by_id(experiment_id)
if not isinstance(bingo_id, basestring):
'When calling bigbingo.ab_test on the experiment %s, the bingo_id '
'was %s. Non-strings were allowed in GAE/Bingo, but is no longer '
'allowed in BigBingo, so the event will be ignored.' %
(experiment_id, bingo_id))
return experiment.control
# If the user set the alternative in a cookie, don't log it as a real
# participation event, just return the alternative value.
cookie_alternative = _get_cookie_alternative(experiment)
if cookie_alternative is not None:
'bigbingo.ab_test was forced to return alternative "%s" for '
'experiment "%s" (bingo_id "%s") due to a cookie override. No '
'participation event will be logged.' %
(cookie_alternative, experiment_id, bingo_id))
return cookie_alternative
alternative = experiment.choose_alternative(bingo_id)
if alternative is None:
# The user should not participate in this experiment, so return the
# control and don't log anything.
return experiment.control
log.log_participation_event(bingo_id, experiment.logged_name,
return alternative
def get_alternative_for_user(experiment_id, bingo_id):
"""Determine which alternative a user would be in.
This is similar to ab_test, but doesn't actually enter the user into the
experiment. This is mostly useful for "passive" systems that want to get
the information without disturbing anything, so this function is also
designed to swallow and log exceptions rather than propagating them in most
experiment_id: An ID of a valid BigBingo experiment.
bingo_id: A string with the identity of the current user.
A string with the alternative to return, or None if there was a
problem getting it.
experiment = config.get_active_experiment_by_id(experiment_id)
except KeyError:
logging.error('Tried to get the alternative for an experiment that is '
'not an active experiment: "%s".', experiment_id)
return None
# Any well-written choose_alternative function should never crash, but
# we want to be resistant to crashes in it anyway, which could especially
# come up if the bingo_id is accidentally None.
alternative = experiment.choose_alternative(bingo_id)
except Exception:
'Error when trying to determine alternative for bingo_id "%s" in '
'BigBingo experiment "%s".' % (bingo_id, experiment_id))
return None
return alternative or experiment.control
def _get_cookie_alternative(experiment):
"""Return any alternative override that the user may have set.
Returns either a valid alternative for the experiment or None if there
wasn't a valid alternative set.
alt_index = cookie_util.get_cookie_value('bingo_force_' +
if not alt_index:
return None
return experiment.alternatives[int(alt_index)]
except (IndexError, ValueError):
logging.warn('Cookie overriding experiment %s specified index %s, '
'which is not valid. Ignoring.' %
(, alt_index))
return None
def mark_conversion(conversion_id, bingo_id):
"""Indicates that the given user has triggered the given conversion.
The BigBingo summarizer will count the conversion event toward all
experiments that the given user is currently a part of.
conv = config.get_source_conversion_by_id(conversion_id)
except KeyError:
conv = (try_conversion_id(conversion_id + '_count') or
try_conversion_id(conversion_id + '_binary'))
if conv:
'The BigBingo conversion "%s" is invalid, but we found one to '
'use instead: "%s".' % (conversion_id,
logging.error("The conversion \"%s\" was used, but is not a valid "
"BigBingo conversion. We'll treat the conversion_id "
"as if it's a valid logged_name.", conversion_id)
log.log_conversion_event(bingo_id, conversion_id)
def try_conversion_id(conversion_id):
return config.get_source_conversion_by_id(conversion_id)
except KeyError:
return None
"""This module makes it easy for other modules to connect to BigQuery."""
import json
import logging
import os
import re
import time
from third_party.google_api_python_client import httplib2
from third_party.google_api_python_client.apiclient import (
discovery as apiclient_discovery,
errors as apiclient_errors,
from third_party.google_api_python_client.oauth2client import (
appengine as oauth2client_appengine,
client as oauth2client_client,
import ka_globals
import ka_root
# Hard-coded project ID.
# Keywords that BigQuery will refuse to parse as column names.
BQ_RESERVED_KEYWORDS = frozenset(["profile"])
class BigQueryError(Exception):
class BigQueryService(object):
"""A wrapper for the BigQuery Python client service.
This page enumerates the calls available to the API:
It doesn't describe the interface for making calls. Do so like this:
service = BigQueryService.get_service()
request = service.tables().list(<ARGS>)
response = request.execute()
# response is a Python object representation of the JSON response.
def get_service(cls):
if not cls._SERVICE:
# Service creation does an HTTP fetch so we only do it one
# per instance and cache the service object.
credentials = get_credentials()
# Use a timeout higher than the default 5 seconds since this
# service is currently only used for background jobs, not
# user-facing requests, and batching and BigQuery throttling can
# make us exceed that default.
http = httplib2.Http(timeout=30)
http = credentials.authorize(http)
cls._SERVICE ='bigquery', 'v2',
return cls._SERVICE
def get_credentials():
if ka_globals.is_dev_server:
return oauth2client_client.AccessTokenCredentials(
get_dev_access_token(), 'log2bq/0.1')
return oauth2client_appengine.AppAssertionCredentials(
def get_dev_access_token():
"""Gets the developer's BigQuery access token for use in a dev environment.
You can get a token by first installing the BigQuery command line tool "bq"
in a new virtual environment (to avoid conflicts with App Engine):
virtualenv ~/.virtualenv/bigquery_env
source ~/.virtualenv/bigquery_env/bin/activate
pip install bigquery
bq ls
(If you don't yet have access to BigQuery, you'll need to ask someone for
access). The first time you run "bq ls", it will provide instructions for
authenticating, then create a JSON file called .bigquery.v2.token in your
home directory, which contains the access token as one of its fields.
Running "bq ls" again (in the right virtualenv) will refresh that token.
The most robust approach is to have a file called bigquery_token in the
webapp directory contains the contents of the .bigquery.v2.token file that
is created/updated in your home directory when running "bq ls". You can
accomplish this using a symlink or by making a shell script to both run "bq
ls" and copy the token file to the right place. (See (We can't read
this file directly since the app engine sandbox disallows reading files
outside of webapp and also disallows reading files starting with a dot.)
You can also provide the access token more directly by setting the
BQ_ACCESS_TOKEN environment variable in app.yaml or by temporarily changing
this code to return the access token as a string literal.
env_token = os.getenv('BQ_ACCESS_TOKEN')
if env_token:
return env_token
token_file_path = ka_root.join('bigquery_token')
if not os.path.isfile(token_file_path):
raise BigQueryError('You do not have a BigQuery token configured. '
'See get_dev_access_token in for '
'instructions on how to configure one.')
with open(token_file_path, 'r') as token_file:
token_config_obj = json.load(token_file)
return token_config_obj['access_token']
def ensure_bigquery_dataset_exists(name, description=""):
"""Create a new bigquery dataset if it does not already exist.
Datasets contain tables. When loading a new table, the
dataset must be specified. If the specified dataset does
not already exist then the call will fail.
bigquery: the BigQuery service interface.
name: the unique name of the dataset (alphanumeric or _)
description: (optional) string describing the dataset
# Setup a dataset creation request
request = BigQueryService.get_service().datasets().insert(
projectId=BQ_PROJECT_ID, body={
"datasetReference": {
"datasetId": name,
"projectId": BQ_PROJECT_ID,
"friendlyName": name,
"description": description
# Try to create the dataset
# Catch the error that is returned if the dataset already exists
# pass forward all other errors
except apiclient_errors.HttpError, e:
code = json.loads(e.content)['error']['code']
table_already_exists_code = 409
if code == table_already_exists_code:
def ensure_bigquery_table_exists(
dataset, table, new_schema, create_if_necessary=False,
patch_schema_if_necessary=False, ttl_days=None):
"""Makes sure that the given table exists and has the given columns.
The schema check does not check for an exact schema match; instead it
ensures that all fields in the given schema exist within the given table.
By default, an exception is thrown if the given table does not exist or
does not match.
dataset: The name of the dataset of the table to check/create.
table: The name (not including the dataset) of the table to
new_schema: A dict containing the schema to use when creating the
table, or the schema we expect the table to have if the table
already exists. The schema format is specified in the bigquery docs
(a link to them is in the BigQueryService docstring) for the
bq_service.tables().insert(...) call. For example, here's a schema
with a single nullable string, 'foo':
{'fields': [{'name': 'foo', 'type': 'STRING', 'mode', 'NULLABLE'}]}
Note that all three components of each field should be specified,
or else the schema check will fail if the table already exists.
create_if_necessary: If the table does not exist, we create it with the
given schema rather than throwing an exception.
patch_schema_if_necessary: If the table exists but does not contain
the given schema, it is patched to include the given schema. Note
that BigQuery does not allow columns to be removed with a patch
operation, so any previously-existing columns not in new_schema are
moved to the end of the table. Also, a failure can still occur here
if the schema update is invalid (for example, changing an integer
column to a string column). Not all valid patch options are
currently supported: in particular, the operation will fail if the
mode of a column is relaxed or if a column is added to a nested
record, even though both types of operations are supported by
ttl_days: If present and this operation creates a new table, the table
is created with an expiration time that makes it expire after this
many days.
old_schema = get_table_schema(dataset, table)
if not old_schema:
# Table does not exist.
if create_if_necessary:
create_empty_table(dataset, table, new_schema, ttl_days)
raise BigQueryError('Expected table %s.%s to exist.' %
(dataset, table))
# Table exists, so validate the schema.
if schema_is_subset(new_schema, old_schema):
# Schema mismatch, so error or fix.
if patch_schema_if_necessary:
patch_schema_to_table(dataset, table, old_schema, new_schema)
raise BigQueryError(
'Schema mismatch for table %s.%s. Expected schema %s to '
'contain %s.' % (dataset, table, old_schema, new_schema))
def schema_is_subset(subset_schema, full_schema):
"""Returns true if the given schema is a subset of the full schema."""
full_schema_fields_by_name = {
field['name']: field for field in full_schema['fields']}
for subset_field in subset_schema['fields']:
full_field = full_schema_fields_by_name.get(subset_field['name'])
if full_field is None or subset_field != full_field:
return False
return True
def patch_schema_to_table(dataset, table, old_schema, patch_schema):
"""Patch the given table to include the given schema.
If the patch_schema removes any fields from old_schema, the old fields are
kept and put at the end of the schema.
dataset: The dataset of the table to patch.
table: The table to patch.
old_schema: The current schema of the table to patch.
patch_schema: A schema containing the new fields.
body={'schema': get_schema_for_patch(old_schema, patch_schema)}
def get_schema_for_patch(old_schema, patch_schema):
"""Returns the schema to send to patch patch_schema onto old_schema."""
patch_schema_names = {field['name'] for field in patch_schema['fields']}
new_schema_fields = list(patch_schema['fields'])
for old_field in old_schema['fields']:
if old_field['name'] not in patch_schema_names:
return {
'fields': new_schema_fields
def get_table_schema(dataset, table):
"""If the table exists, returns its schema. Otherwise, returns None."""
table_service = BigQueryService.get_service().tables()
get_result = table_service.get(
return get_result['schema']
except apiclient_errors.HttpError as e:
# Return None if the table doesn't exist.
if json.loads(e.content)['error']['code'] == 404:
return None
def get_leaf_column_selectors(dataset, table):
"""Parses the table's schema to generate a list of column selectors.
BigQuery tables may have record fields, which have a hierarchical schema.
Each subfield is delimited with a dot, but BigQuery views cannot have dots
in their names. So, instead of defining the view like:
You have to define it like:
__key__.namespace as __key___namespace,
__key__.path as __key___path
For more information, see
schema = get_table_schema(dataset, table)
if not schema:
raise BigQueryError('Expected table %s.%s to exist.' % (
dataset, table))
return ",\n".join([
_get_leaf_selectors("", top_field)
for top_field in schema["fields"]
def _get_leaf_selectors(prefix, field, depth=0):
"""Recursive helper for get_leaf_column_selectors()"""
field_name = field["name"]
if prefix:
field_name = prefix + "." + field_name
if 'fields' not in field:
if "." in field_name:
# If we translate to user_email, the user_email field
# may already exist as a top-level field, so prepend an underscore
# to signify this is a record-turned-regular field. There shouldn't
# be any top-level actual fields that start with an underscore.
safe_name = field_name.replace(".", "_")
return "%s as _%s" % (field_name, safe_name)
elif depth == 0 and field_name.lower() in BQ_RESERVED_KEYWORDS:
# Reserved keywords mess up BigQuery's SQL parsing, so make sure we
# don't use the bare keyword as the column name
return "[%s]" % field_name
return field_name
# Recursive case
return ",\n".join([
_get_leaf_selectors(field_name, sub_field, depth + 1)
for sub_field in field["fields"]
def delete_table_if_exists(dataset, table):
"""If the table exists, delete it. Otherwise, do nothing."""
table_service = BigQueryService.get_service().tables()
except apiclient_errors.HttpError as e:
# Return (do nothing) if the table doesn't exist.
if json.loads(e.content)['error']['code'] == 404:
def create_empty_table(dataset, table, schema, ttl_days=None):
"""Create the given table
An exception will be thrown if the table already exists.
dataset: The name of the dataset of the table to check/create.
table: The name (not including the dataset) of the table to
schema: A dict describing the schema of the new table. See the
docstring on ensure_bigquery_table_exists for a description of the
dict format.
ttl_days: If present, the table is set to expire after this many days.
table_service = BigQueryService.get_service().tables()
'tableReference': {
'projectId': BQ_PROJECT_ID,
'datasetId': dataset,
'tableId': table
'schema': schema,
# We could technically set the expiration time in the regular request that
# creates the table, but setting it after-the-fact makes the code simpler.
if ttl_days is not None:
set_table_ttl_in_days(dataset, table, ttl_days)
def create_or_update_view(dataset, view_name, query):
"""Create a BigQuery view from from the given query.
If the table already exists, it is changed to a view (if necessary) and the
query for the view is updated to the given query.
dataset: The dataset where the view will go.
view_name: The name of the view to create or update.
query: A string with the SQL for the view.
table_service = BigQueryService.get_service().tables()
table_body = {
'tableReference': {
'projectId': BQ_PROJECT_ID,
'datasetId': dataset,
'tableId': view_name
'view': {
'query': query
except apiclient_errors.HttpError as e:
if json.loads(e.content)['error']['code'] == 409: # Already exists
logging.error("Failed with query: %s", query)
def set_table_ttl_in_days(dataset, table, num_days):
"""Set the table to expire after the given number of days.
Tables last forever by default, but this sets a time-to-live so that
BigQuery will later delete the table. This function relies on the system
clock to compute an expiration time.
dataset: The dataset containing the table to modify.
table: The name of the table to modify.
num_days: A number (not necessarily an integer) with the number of days
to keep the table around.
assert num_days > 0
current_time_ms = time.time() * 1000
time_to_live_ms = num_days * 24 * 60 * 60 * 1000
expiration_time_ms = int(current_time_ms + time_to_live_ms)
body={'expirationTime': expiration_time_ms}
def _get_all_pages(query_function, parameters, key_to_merge):
"""Pages through results returned by the BigQuery API
query_function - the function to call, such as datasets().list
parameters - the initial parameters to send to the function. This will
be augmented with the page tokens if there are multiple pages.
key_to_merge - the key within the dictionary of results that contains
an array of objects to merge together to form the final list.
A really long list in the format of the original API call.
def get_list(the_result):
# Sometimes the query is valid, but returns nothing. In this case the
# key is not present, so we should just return an empty list.
if key_to_merge not in the_result:
return []
return the_result[key_to_merge]
result = query_function(**parameters).execute()
final_result = {
key_to_merge: get_list(result)
while "nextPageToken" in result:
parameters["pageToken"] = result["nextPageToken"]
result = query_function(**parameters).execute()
return final_result
def get_all_backup_dataset_names():
"""Return the names of all datastore backup datasets.
backup_dataset_names = []
parameters = {
"projectId": BQ_PROJECT_ID
datasets_list = _get_all_pages(
BigQueryService.get_service().datasets().list, parameters, "datasets")
for d in datasets_list['datasets']:
dataset_name = d['datasetReference']['datasetId']
# Must match YYYY_MM_DD format
if'(^[0-9]{4}_[0-9]{2}_[0-9]{2}$)', dataset_name):
return backup_dataset_names
def get_all_tables_in_dataset(dataset):
"""Return the names of all a dataset's tables, or None for a bad dataset.
Note that there is currently no way to disambiguate between a view and a
table, so both are returned.
parameters = {
"projectId": BQ_PROJECT_ID,
"datasetId": dataset,
"maxResults": 500,
tables_list = _get_all_pages(
BigQueryService.get_service().tables().list, parameters, "tables")
return [table['tableReference']['tableId']
for table in tables_list['tables']]
except apiclient_errors.HttpError as e:
# Return None if the dataset doesn't exist.
if json.loads(e.content)['error']['code'] == 404:
return None
def get_most_recent_backup_dataset():
"""Return the name of the most recent datastore backup dataset."""
backup_names = get_all_backup_dataset_names()
if backup_names:
return sorted(backup_names)[-1]
return None
"""Useful pipeline classes for interacting with BigQuery."""
import json
import logging
import os
import time
from third_party.google_api_python_client.apiclient import (
errors as apiclient_errors,
http as apiclient_http
from third_party.mapreduce.lib.pipeline import pipeline
from third_party.mapreduce.lib.pipeline import common
from bigquery import bq_connection
# In some situations, we want to submit lots of API requests at once. Batching
# helps with this, but BigQuery will give an error if we send way too many API
# requests in the same batch, so we limit the number of requests we're willing
# to send at once.
# When hitting retriable errors (rate limit exceeded or internal BigQuery
# errors) when submitting HTTP requests, automatically retry up to this many
# times.
# Note that automatic pipeline retry accomplishes some of this, but this HTTP
# retry works better because only failed requests in a batch are retried rather
# than retrying the whole batch. Also, since we limit retrying to definitely-
# retriable errors, it's ok to be more aggressive about retrying.
# Sets a limit on the amount of time we're willing to wait before giving up on
# a query. This setting is necessary because sometimes, BigQuery will spend a
# very long time on a small number of queries. For example, there have been
# times where 50 queries are submitted at the same time and 48 of those queries
# finish within a few minutes, but the remaining two take multiple hours to
# finish. In these cases, simply submitting another copy of the two stalled
# queries would work, so we treat the time limit as a retriable error.
# TODO(alan): This time limit used to be 15 minutes, but I disabled the feature
# by setting it to None to work around a correctness issue in BigQuery that
# happens when you submit the same query twice:
# If we keep running into issues, it may make sense to set this time limit
# again (potentially to a number higher than 15 minutes), and if we stop
# running into the issue, we may want to remove this feature completely.
# Initial amount of time, in seconds, to sleep when waiting for a job to finish
# (assuming it's not done immediately).
# Increase the sleeping time by the following factor when waiting for a job to
# finish. Back off exponentially so the task hierarchy doesn't get ridiculously
# deep in the pipeline viewer when we wait on a long-running job like a batch
# query.
class ReturnOutputs(pipeline.Pipeline):
"""General-purpose mechanism for returning named outputs.
This makes it more convenient to return pipeline results as outputs, since
the normal self.fill mechanism requires returning already-resolved values,
not pipeline futures. To use this, this pipeline should be yielded last.
Also, any outputs specified will be ignored unless they are specified in
the output_names list of the calling pipeline.
For example, this code snippet lets you combine the results from two child
pipelines into three outputs (one default and two named):
default_and_foo = yield ComputeDefaultAndFoo()
bar = yield ComputeBar()
yield ReturnOutputs(default_and_foo,, bar=bar)
Note that this pipeline is unnecessary if the last pipeline yielded already
returns outputs with the desired names and values.
def run(self, default_output, **kwargs):
for key in kwargs:
self.fill(key, kwargs[key])
return default_output
class ListIndex(pipeline.Pipeline):
"""Returns the element at the given index from the given list."""
def run(self, input_list, index):
return input_list[index]
class SmallResultQueryPipeline(pipeline.Pipeline):
"""Pipeline for running a query and getting its results.
This pipeline is convenient if the results are small (e.g. if you're
querying an aggregate) and you want to view the results directly in Python
rather than sending the results to a table.
query: A string with the text of the query to run.
The query results as returned in the getQueryResults method in the jobs
section of the BigQuery API. For example,
results['rows'][0]['f'][0]['v'] gets the first field of the first row.
def run(self, query):
job_metadata_list = yield RunJobBatchPipeline([{
'query': {
'query': query
job_metadata = yield ListIndex(job_metadata_list, 0)
yield GetQueryResultsPipeline(job_metadata)
class GetQueryResultsPipeline(pipeline.Pipeline):
"""Fetches and returns the actual query results for a job.
job_metadata: A dict containing information about the job to get query
results for, as returned by insert or get in the jobs section of
the BigQuery API.
The query results as returned in the getQueryResults method in the jobs
section of the BigQuery API. For example,
results['rows'][0]['f'][0]['v'] gets the first field of the first row.
def run(self, job_metadata):
job_service = bq_connection.BigQueryService.get_service().jobs()
return job_service.getQueryResults(
class QueryToTablePipeline(pipeline.Pipeline):
"""Pipeline for running a query and sending the output to a table.
This pipeline will submit a query and will not mark itself as done until
the query has either finished or failed.
query: A string with the text of the query to run.
output_dataset: The dataset of the output table.
output_table: The name of the output table.
create_if_needed: If true (the default), we create the table (rather
than failing) if it doesn't already exist.
append_to_table: If false (the default), we overwrite the table
contents rather than appending to the end of the table.
priority: A string, either 'INTERACTIVE' or 'BATCH'. In both cases, the
query will be submitted asynchronously and the pipeline will wait
until the query completes, but interactive queries generally run
faster and are subject to rate limits, while batch queries are
marked as lower priority in BigQuery's scheduler and have no
restriction on the number of concurrent queries. The default
priority is BATCH since pipelines are most commonly run in
background jobs where stability and isolation (i.e. not affecting
humans using the BigQuery webapp) are more important than low
allow_large_results: Setting to True (the default) may slow the query
down, but allows the query to return more than 128MB of results.
result_table_ttl_days: If specified, the resulting table is
asynchronously set to expire after the given number of days.
table_desc: A string with a human-readable description of this BigQuery
table. If this is None, we do not set the table description.
schema_desc: A list of dictionaries, where each dictionary is of the
form as described in [1] in the patch/body/schema/fields section.
If this is None, we do not set the schema descriptions.
default: Metadata describing the status of the query job (the return
value of the "get" function in the "jobs" section of the BigQuery
cost: The number of cents spent on the query.
output_names = ['cost']
def run(self, query, output_dataset, output_table, create_if_needed=True,
append_to_table=False, priority=DEFAULT_QUERY_PRIORITY,
allow_large_results=True, result_table_ttl_days=None,
table_desc=None, schema_desc=None):
job_results = yield RunJobBatchPipeline(
[query_to_table_job_config(query, output_dataset, output_table,
create_if_needed, append_to_table,
priority, allow_large_results)],
if result_table_ttl_days is not None:
with pipeline.After(job_results):
yield SetTableTTLInDaysPipeline(
output_dataset, output_table, result_table_ttl_days)
if table_desc is not None or schema_desc is not None:
with pipeline.After(job_results):
yield UpdateTableInfoPipeline(
output_dataset, output_table, table_desc, schema_desc)
cost = yield GetCostPipeline(job_results)
job_result = yield ListIndex(job_results, 0)
yield ReturnOutputs(job_result, cost=cost)
class QueryToTableBatchPipeline(pipeline.Pipeline):
"""Batched version of QueryToTablePipeline.
This pipeline lets you submit many queries at the same time and block until
all of them have finished. The API requests are batched, so this version is
more performant, and the requests are automatically throttled as necessary
to avoid hitting BigQuery's request limit.
query_specs: A list of dictionaries, each of which describes a query
to make and the table that it should output to. The dictionary keys
are the same as the arguments to QueryToTablePipeline, although
result_table_ttl_days is not yet supported for this pipeline.
default: A list of job results, each of which has the same format as
the return value of
cost: The combined cost of all queries.
output_names = ['cost']
def run(self, query_specs):
job_configs = [query_to_table_job_config(**spec)
for spec in query_specs]
results = yield RunJobBatchPipeline(
job_configs, time_limit=QUERY_TIME_LIMIT_SECONDS)
cost = yield GetCostPipeline(results)
yield ReturnOutputs(results, cost=cost)
def query_to_table_job_config(
query, output_dataset, output_table, create_if_needed=True,
append_to_table=False, priority=DEFAULT_QUERY_PRIORITY,
"""Gets the 'configuration' dict to use when starting a BigQuery query.
See the documentation for for more
"""'Submitting query %s' % query)
create_mode = 'CREATE_IF_NEEDED' if create_if_needed else 'CREATE_NEVER'
# Note that the API also allows for a WRITE_EMPTY option where we
# expect the table to be empty, and there's no reason we can't add
# support for that here if we need to.
write_mode = 'WRITE_APPEND' if append_to_table else 'WRITE_TRUNCATE'
return {
'query': {
'query': query,
'priority': priority,
'destinationTable': {
'projectId': bq_connection.BQ_PROJECT_ID,
'datasetId': output_dataset,
'tableId': output_table
'createDisposition': create_mode,
'writeDisposition': write_mode,
'allowLargeResults': allow_large_results,
class GetCostPipeline(pipeline.Pipeline):
"""Given a list of query metadata, compute the total cost of all queries.
job_infos: A list of dicts, each containing information about a query
job. The format is the return format for, as
documented in the BigQuery API (see the documentation on
BigQueryService for a link).
Returns: The combined cost of all queries, in cents.
def run(self, job_infos):
result = 0
for job_info in job_infos:
num_bytes = int(job_info['statistics']['query']
# Queries cost 0.5 cents per GB of data touched.
result += 0.5 * num_bytes / (2 ** 30)
return result
class CopyTablePipeline(pipeline.Pipeline):
"""Copies the contents of the source table to the destination table.
Both tables are specified by their dataset name and table name. If
append_to_table is False and the destination table already exists, it is
overwritten with the contents of the source table.
This pipeline is not marked as finished until the copy job is complete.
def run(self, src_dataset, src_table, dest_dataset, dest_table,
append_to_table=False, result_table_ttl_days=None):
result_list = yield RunJobBatchPipeline([copy_table_job_config(
src_dataset, src_table, dest_dataset, dest_table, append_to_table)
if result_table_ttl_days is not None:
with pipeline.After(result_list):
yield SetTableTTLInDaysPipeline(
dest_dataset, dest_table, result_table_ttl_days)
yield ListIndex(result_list, 0)
class CopyTableBatchPipeline(pipeline.Pipeline):
"""Performs multiple copy operations in a batch.
copy_specs: A list of dictionaries, each of which describes a copy
operation. The dict keys should be the same as the arguments to
def run(self, copy_specs):
yield RunJobBatchPipeline(
[copy_table_job_config(**copy_spec) for copy_spec in copy_specs])
def copy_table_job_config(src_dataset, src_table, dest_dataset, dest_table,
write_mode = 'WRITE_APPEND' if append_to_table else 'WRITE_TRUNCATE'
return {
'copy': {
'sourceTable': {
'projectId': bq_connection.BQ_PROJECT_ID,
'datasetId': src_dataset,
'tableId': src_table
'destinationTable': {
'projectId': bq_connection.BQ_PROJECT_ID,
'datasetId': dest_dataset,
'tableId': dest_table
'createDisposition': 'CREATE_IF_NEEDED',
'writeDisposition': write_mode
class UpdateTableInfoPipeline(pipeline.Pipeline):
"""Updates the descriptions of an existing BigQuery table.
This pipeline assumes that the table already exists and the schema are as
specified. The entire purpose of this class as is is to add a description
for the table itself and/or for the schema in the table.
For an example of how to include descriptions when creating a table, please
refer to bigquery_reports/
TODO(ilan): It would be ideal to automatically generate the schema for our
existing tables based on the docstrings we've already written.
def run(self, dataset_name, table_name, table_desc=None, schema_desc=None):
"""Updates the descriptions for the BigQuery table and its schema.
dataset_name: A string with the ID of the dataset where this table
table_name: A string with the ID of this table.
table_desc: A string with a human-readable description of this
BigQuery table. If this is None, we do not set the
table description.
schema_desc: A list of dictionaries, where each dictionary is of
the form as described in [1] in the patch/body/
schema/fields section. If this is None, we do not set
the schema descriptions.
table_service = bq_connection.BigQueryService.get_service().tables()
# I'm really not sure what to put here, so I'm just copying from
# EnsureTablesDeletedPipeline for now
def handle_result(request_id, response, exception):
# Ignore missing tables
if exception and not (
isinstance(exception, apiclient_errors.HttpError) and
json.loads(exception.content)['error']['code'] == 404):
raise exception
body=patch_table_job_config(dataset_name, table_name,
table_desc, schema_desc))],
def patch_table_job_config(dataset_name, table_name, table_desc=None,
"""Returns the job config dictionary for this table patch method call.
dataset_name: A string with the ID of the dataset where this table
table_name: A string with the ID of this table.
table_desc: A string with a human-readable description of this
BigQuery table. If this is None, we do not set the
table description.
schema_desc: A list of dictionaries, where each dictionary is of
the form as described in [1] in the patch/body/
schema/fields section. If this is None, we do not set
the schema descriptions.
A dictionary of the form required to run the patch job.
result = {
'tableReference': {
'projectId': bq_connection.BQ_PROJECT_ID,
'tableId': table_name,
'datasetId': dataset_name,
if table_desc is not None:
result['description'] = table_desc
if schema_desc is not None:
result['schema'] = {
'fields': schema_desc,
return result
class LoadTablePipeline(pipeline.Pipeline):
"""Loads a table from Cloud Storage.
If the specified table does not already exist, it is created. This pipeline
is not marked as finished until the load job is complete, and the pipeline
is aborted if there is a job failure.
cloud_storage_paths: A list of strings specifying the cloud storage
files to load, e.g. "gs://foo/bar".
dataset: The dataset of the table to write to.
table: The name of the table to write to.
schema: The schema of the table. If the table already exists, this must
be a subset of the existing schema. See the documentation for (the BigQueryService documentation
has a link) for a description of the schema format.
source_format: A string, either NEWLINE_DELIMITED_JSON or CSV.
write_mode: A string, either WRITE_APPEND or WRITE_TRUNCATE.
result_table_ttl_days: If specified, the resulting table is
asynchronously set to expire after the given number of days.
Metadata about the job result, in the format of the return value of
def run(self, cloud_storage_paths, dataset, table, schema, source_format,
write_mode, result_table_ttl_days=None):
result_list = yield RunJobBatchPipeline([{
'load': {
'sourceFormat': source_format,
'sourceUris': cloud_storage_paths,
'schema': schema,
'destinationTable': {
'projectId': bq_connection.BQ_PROJECT_ID,
'datasetId': dataset,
'tableId': table,
'createDisposition': 'CREATE_IF_NEEDED',
'writeDisposition': write_mode,
'encoding': 'UTF-8',
if result_table_ttl_days is not None:
with pipeline.After(result_list):
yield SetTableTTLInDaysPipeline(
dataset, table, result_table_ttl_days)
yield ListIndex(result_list, 0)
class EnsureTablesDeletedPipeline(pipeline.Pipeline):
"""Deletes the tables with the given names in the given dataset.
Any tables that already don't exist are ignored. The delete operation
happens synchronously, so the tables will be deleted when the pipeline is
marked as finished.
def run(self, dataset_name, table_names):
table_service = bq_connection.BigQueryService.get_service().tables()
def handle_result(request_id, response, exception):
# Ignore missing tables
if exception and not (
isinstance(exception, apiclient_errors.HttpError) and
json.loads(exception.content)['error']['code'] == 404):
raise exception
for table_name in table_names],
class SetTableTTLInDaysPipeline(pipeline.Pipeline):
"""Set the table to expire after the given number of days.
See bq_connection.set_table_ttl_in_days for more info.
def run(self, dataset, table, num_days):
bq_connection.set_table_ttl_in_days(dataset, table, num_days)
def should_retry_job(job_info):
"""Return True if the given job failed and is eligible for retry.
Jobs should be retried if they failed due to BigQuery internal errors.
BigQuery job statuses provide a 'reason' error code that helps distinguish
between transient BigQuery errors and bugs in our code or other errors that
are not worth retrying.
job_info: The result type of, which provides
information about the status and other details of a job.
return ('errorResult' in job_info['status'] and
job_info['status']['errorResult']['reason'] in
('internalError', 'backendError', 'quotaExceeded',
class RunJobBatchPipeline(pipeline.Pipeline):
"""Pipeline to run a list of BigQuery job and block on their completion.
configs: A list of job configurations. The format is defined in the
"configuration" section of the documentation for
time_limit: The number of seconds we're willing to wait before
considering a job stalled and trying again. Or None if we should
wait indefinitely. If specified, the jobs passed in should all be
retries_remaining: The number of times we're willing to retry after
this time. This is mostly used internally.
A list of metadata dicts, each of which has the format of the return
value of
def run(self, configs, time_limit=None, retries_remaining=2):
job_service = bq_connection.BigQueryService.get_service().jobs()
job_ids = [None] * len(configs)
def handler(index, response):
job_ids[index] = response['jobReference']['jobId']
'projectId': bq_connection.BQ_PROJECT_ID,
'configuration': config
for config in configs],
# Setting this to be higher than 1 causes things to break. Looks
# like a BigQuery bug.
job_results = yield WaitForJobBatchPipeline(
job_ids, wait_time_seconds=JOB_WAIT_INITIAL_TIME,
yield ConsiderJobRetry(job_results, time_limit, retries_remaining)
class ConsiderJobRetry(pipeline.Pipeline):
"""If any jobs failed and should be retried, retry them.
If any jobs actually are retried successfully, we patch the given
job_results list so that the new successful results replace the old failed
TODO(alan): In the quotaExceeded case, we can be smarter about our response
rather than just blindly retrying with a cap on the number of times. I
think these semantics make the most sense:
-If we're not making progress (every job had quotaExceeded), add in a delay
of 30 seconds or so before retrying to let the system calm down.
-If we're making progress (some jobs were successful, while others had
quotaExceeded), don't decrement the retry count (or maybe reset it) and
retry immediately.
This should make it possible to throw hundreds of queries at
RunJobBatchPipeline and have BigQuery process them as fast as it can
without ever failing, while still protecting against the case where an
individual query fails due to too many queries from other jobs.
job_results: A list of job result dicts (the return value of
time_limit: The job time limit to send to RunJobBatchPipeline.
retries_remaining: The number of times we're willing to retry.
The job_results list, which may be updated with newly-retried job info.
def run(self, job_results, time_limit, retries_remaining):
# List of (index, config) pairs. We need to track the original indexes
# so we can replace the right ones when we finish retrying.
retry_configs = []
for index, job_result in enumerate(job_results):
if should_retry_job(job_result):'The following failure occurred, and will be '
'considered for retry: %s' % job_result)
retry_configs.append((index, job_result['configuration']))
if retry_configs:
if retries_remaining == 0:
raise bq_connection.BigQueryError(
'The following jobs failed and were retried too many '
'times: %s' % [config for _, config in retry_configs])
new_job_results = yield RunJobBatchPipeline(
[config for _, config in retry_configs], time_limit,
retries_remaining - 1)
yield UpdateJobResultsList(job_results, new_job_results,
[index for index, _ in retry_configs])
yield common.Return(job_results)
class UpdateJobResultsList(pipeline.Pipeline):
"""Update some entries in a list of job results with new results.
This is used after retrying some failed jobs to combine their new
successful result values with the values of the jobs that succeeded
original_job_results: A list of job results for the entire batch, some
of which need to be replaced.
new_job_results: A list of job results to replace the original results.
replacement_indices: A list of indices. The nth index is the place in
original_job_results to put new_job_results[n].
A list based on original_job_results, but with some replacements made,
as described by new_job_results and replacement_indices.
def run(self, original_job_results, new_job_results, replacement_indices):
for new_job_result, index in zip(new_job_results, replacement_indices):
original_job_results[index] = new_job_result
return original_job_results
class WaitForJobBatchPipeline(pipeline.Pipeline):
"""Pipeline which waits for a list of BigQuery jobs to finish.
job_ids: A list of BigQuery job IDs to wait on.
wait_time_seconds: The initial amount of time to wait for the job to
finish. This is used to implement exponential backoff (the pipeline
will always wait indefinitely for the jobs to finish).
time_remaining_seconds: Either an integer number of seconds or None. If
not None, this pipeline will stop waiting after that many seconds
have passed and any jobs that are still not done will have their
statuses modified so that they're marked as DONE with a
timeLimitExceeded error (we use the same error format that BigQuery
uses since error-handling code expects that format).
A list of metadata dicts describing the jobs' results, as returned by See the docstring on BigQueryService for a
link to the docs. Since we retry until the jobs are finished, this
metadata will always show the jobs as finished.
def run(self, job_ids, wait_time_seconds, time_remaining_seconds):
job_service = bq_connection.BigQueryService.get_service().jobs()
# This list starts out with all Nones and gets filled in.
completed_results = [None] * len(job_ids)
remaining_job_results = []
remaining_job_indices = []
def handle_result(index, response):
if response['status']['state'] in ('PENDING', 'RUNNING'):
completed_results[index] = response
for job_id in job_ids],
# Compute other_results, an ordered list of job results for jobs that
# we wait for recursively.
if remaining_job_results:
if (time_remaining_seconds is not None and
time_remaining_seconds <= 0):
# If we're out of time, pretend the job finished with an error
# so we retry it. In practice, these long-running jobs can go
# for hours before finishing, so it's generally best to give up
# and try again.
for job in remaining_job_results:
job['status']['state'] = 'DONE'
job['status']['errorResult'] = {
'reason': 'ka:timeLimitExceeded'
other_results = remaining_job_results
# Otherwise, keep waiting.
remaining_job_ids = [job['jobReference']['jobId']
for job in remaining_job_results]
with pipeline.InOrder():
yield common.Delay(seconds=wait_time_seconds)
if time_remaining_seconds is not None:
time_remaining_seconds -= wait_time_seconds
other_results = yield WaitForJobBatchPipeline(
wait_time_seconds * JOB_WAIT_BACKOFF_MULTIPLIER,
other_results = []
yield UpdateJobResultsList(completed_results, other_results,
def run_http_job_requests(
requests, callback,
"""Runs the given HTTP requests relating to BigQuery jobs.
If any HTTP request fails or any request returns a job failure, an
exception is thrown.
requests: A list of HttpRequest objects.
callback: A function taking an index and response parameter. It is
called for each request that is returned. The "index" parameter is
the index of the http request in the requests list, and the
response is the response to the HTTP request.
num_requests_per_batch: The number of requests we're we're willing to
send in the same request batch.
Unlike the more general batch call provided by BatchHttpRequest, the given
callback should only have a single parameter for the response.
def new_callback(request_id, response, exception):
if exception:
raise exception
# If the job failed and is eligible for retry, return the failure
# normally instead of failing now so that the parent pipeline has a
# chance to handle the failure.
if ('errorResult' in response['status'] and
not should_retry_job(response)):
raise bq_connection.BigQueryError(
'Job failed: %s' % response['status']['errorResult'])
index = int(request_id)
callback(index, response)
run_http_requests(requests, new_callback, num_requests_per_batch)
def run_http_requests(
requests, callback,
"""Runs all HttpRequests as a batch with the same callback.
If BigQuery sends back errors because the request rate limit is exceeded,
this function attempts to back off and retry those failures later, although
it eventually will give up.
requests: A list of HttpRequest objects (such as those returned by
function calls on bq_service).
callback: A callback to run on each result. See the documentation on
BatchHttpRequest for more details. The request_ids provided to the
callback will be the string value of the index into the requests
list, but the order in which the callbacks are called is not
guaranteed to be the same as the order of the requests.
num_requests_per_batch: The number of requests we're willing to send in
the same request batch. This throttling exists to avoid BigQuery
errors from sending too many requests at once, and to work around
a bug where BigQuery will only let you insert one job at a time.
request_ids = [str(num) for num in xrange(len(requests))]
for id_and_request_batch in partition(zip(request_ids, requests),
requests_by_id = dict(id_and_request_batch)
requests_by_id, callback,
retries_remaining=NUM_HTTP_RETRY_ATTEMPTS - 1)
def run_http_requests_with_retry(requests_by_id, callback, retries_remaining):
"""Run the given requests as a batch, retrying as necessary.
If we hit any rate limiting errors or internal BigQuery errors, we sleep
for half a second and try again with the failed requests. If the retry
limit is exceeded, it is reported to the callback in the same way that
other exceptions are reported.
requests_by_id: A dictionary mapping string request ID to the request
callback: A callback to handle the requests, as described in the docs
for BatchHttpRequest.
retries_remaining: The number of times we're willing to retry again
before giving up.
ids_needing_retry = []
def retry_collecting_callback(request_id, response, exception):
if should_retry_http_exception(exception) and retries_remaining > 0:
callback(request_id, response, exception)
http_batch = apiclient_http.BatchHttpRequest()
for request_id, request in requests_by_id.iteritems():
http_batch.add(request, callback=retry_collecting_callback,
# If we get a retriable exception (which happens in the case of rate
# limiting and internal BigQuery errors), back off a bit to let the system
# settle down, then retry.
if ids_needing_retry:
logging.warning('After submitting %s requests at once, %s needed to '
'be retried.' %
(len(requests_by_id), len(ids_needing_retry)))
requests_by_id_to_retry = {request_id: requests_by_id[request_id]
for request_id in ids_needing_retry}
run_http_requests_with_retry(requests_by_id_to_retry, callback,
retries_remaining - 1)
def should_retry_http_exception(exception):
"""Returns True if the given HTTP exception should be retried.
The two main cases that are worth retrying are when we've hit BigQuery API
request rate limits or when BigQuery has some other error and gives a
message saying "Unexpected. Please try again.".
if not exception or not isinstance(exception, apiclient_errors.HttpError):
return False
content = json.loads(exception.content)
code, message = content['error']['code'], content['error']['message']
return ((code == 403 and message.startswith('Exceeded rate limits')) or
(code == 500 and 'Please try again' in message))
def partition(source_list, partition_size):
"""Break the given list up into a list of lists, each of the given size.
Every resulting list will have size partition_size except possibly the last
one, and no list will be empty.
return [source_list[n:n + partition_size]
for n in xrange(0, len(source_list), partition_size)]
"""Module responsible for providing the active experiments and conversions."""
from __future__ import absolute_import
import collections
from third_party import markdown
from bigbingo import alternative_deciders
class Experiment(object):
"""Configuration for an experiment.
id: A string identifying the experiment, which can consist only of
alphanumeric characters and underscores. This string is used in
table names, so it is recommended that the ID be relatively short.
Every experiment must have a unique ID.
logged_name: The name of the experiment that shows up in logs and in
strings in BigQuery. For legacy experiments, this is display_name,
and for new-style experiments, this is experiment_id. This field
only exists for transition reasons, and should be removed when the
last legacy experiment is removed.
display_name: A human-readable name for the experiment, used for
display purposes, which is allowed to have arbitrary characters.
legacy_hashable_name: For legacy experiments, the name for hashing
purposes, which is either the family name or the canonical name of
the experiment. None if the experiment is not a legacy experiment.
alternative_weights: An OrderedDict from alternative name (a string) to
weight (an integer).
control: A string of the experiment's control, which is used for
display purposes.
alternative_decider: A function from (Experiment, bingo_id) to
alternative which determines the alternative assigned to the given
user. This function should be stable in the sense that it always
gives the same alternative to any particular user. If the function
returns None, the user will be excluded from the test entirely.
owner: A string name of the person responsible for the experiment.
description: A string explaining the experiment, which will show up
when viewing the experiment in the BigBingo dashboard.
conclusion: Specify None for active experiments. For finished
experiments, provide a string explaining any results and
conclusions from the experiment.
# We can't serialize alternative_decider because it's a function. We also
# shouldn't serialize alternative_weights because it's scary to send
# alternative strings as dictionary keys since they could get inadvertently
# camel-cased.
_serialize_blacklist = ['alternative_decider', 'alternative_weights']
def __init__(self, exp_id, logged_name, display_name, legacy_hashable_name,
alternative_weights, control, alternative_decider, owner,
description, conclusion):
assert control in alternative_weights, (
'The control must be one of the alternatives.')
assert alternative_decider is not None = exp_id
self.logged_name = logged_name
self.display_name = display_name
self.legacy_hashable_name = legacy_hashable_name
self.alternative_weights = alternative_weights
self.control = control
self.alternative_decider = alternative_decider
self.owner = owner
self.description = description
self.conclusion = conclusion
def choose_alternative(self, bingo_id):
result = self.alternative_decider(self, bingo_id)
if result is not None and result not in self.alternatives:
raise ValueError(
'The chosen alternative in experiment %s was unexpectedly %s, '
'was expecting one of %s.' % (, result,
return result
def alternatives(self):
return self.alternative_weights.keys()
def is_archived(self):
return self.conclusion is not None
def description_html(self):
return markdown.markdown(self.description)
def conclusion_html(self):
if self.conclusion is not None:
return markdown.markdown(self.conclusion)
def is_previewable(self):
return not self.is_archived
class Conversion(object):
"""Configuration for a conversion.
id: A string identifying the conversion, which must consist only of
alphanumeric characters and underscores.
column_name: The string to use in column names in BigQuery. This is
often the same as the ID, but may be different for compatibility
logged_name: The name of the conversion that shows up in the logs.
display_name: A friendly name of the conversion.
# Ignore logged_name for consistency with DerivedConversion (and since it's
# more of an implementation detail anyway).
_serialize_blacklist = ['logged_name']
def __init__(self, conv_id, column_name, logged_name, display_name): = conv_id
self.column_name = column_name
self.logged_name = logged_name
self.display_name = display_name
class DerivedConversion(object):
"""Configuration for a derived conversion.
A derived conversion looks like a regular conversion but isn't valid to
log. Instead, it's created by the summarize task based on other conversion
values using the "aggregator" field.
_serialize_blacklist = ['aggregator']
def __init__(self, conv_id, display_name, aggregator): = conv_id
self.column_name = conv_id
self.display_name = display_name
self.aggregator = aggregator
def experiment(exp_id, display_name, alts, owner, description, conclusion=None,
control=None, decider=alternative_deciders.english_only):
"""Creates a standard BigBingo experiment.
The experiment may not be used in GAE/Bingo. See the fields on the
Experiment class for information on how to call this function.
if control is None:
control = default_control(alts)
if description:
description = strip_indentation(description)
if conclusion:
conclusion = strip_indentation(conclusion)
return Experiment(
exp_id=exp_id, logged_name=exp_id, display_name=display_name,
legacy_hashable_name=None, alternative_weights=alts, control=control,
alternative_decider=decider, owner=owner, description=description,
def default_control(alts):
"""Find the alternative that looks most like the control."""
for alternative in alts:
if any(word in alternative.lower()
for word in ('old', 'original', 'false', 'control')):
return alternative
return alts.keys()[0]
def strip_indentation(s):
return '\n'.join(line.lstrip() for line in s.split('\n'))
def bigbingo_conversion(conversion_id, display_name):
return Conversion(conv_id=conversion_id, column_name=conversion_id,
logged_name=conversion_id, display_name=display_name)
def alternatives(*alts):
""""Specifies a list of equal-weighted alternatives."""
return weighted_alternatives(*((alt, 1) for alt in alts))
def weighted_alternatives(*weighted_alts):
"""Specifies a list of weighted alternatives for an experiment.
Each argument should be a pair of (string, int) that specifies the name and
weight of the given alternative.
for name, weight in weighted_alts:
assert isinstance(name, basestring), 'Alternatives must be strings.'
assert isinstance(weight, int), 'Weights must be integers.'
return collections.OrderedDict(weighted_alts)
# This list started out as alphabetical and then became chronologically ordered
# So, you should put new experiments at the bottom.
display_name='Sample Experiment',
alts=alternatives('control', 'alt_1', 'alt_2'),
This is an example experiment description.
Arbitrary markdown is allowed here.
# Organized roughly in reverse chronological order. Experiments at the top
# were archived more recently.
display_name='Sample Archived Experiment',
alts=weighted_alternatives(('control', 9), ('alt', 1)),
All experiments end up here when they're archived. This makes sure
there is a record of all past experiments and lets the dashboard
still display old experiment information.
All archived experiments need to have a conclusion filled in.
# Set of string experiment names that are allowed to be used in the code, but
# are not tracked by BigBingo. This should be the logged name, so for legacy
# experiments (the common case), the full experiment name.
# A source conversion is a conversion that is logged directly. This is contrast
# to a derived conversion, which is computed during the BigBingo process.
# Alphabetical order, with deprecated conversions at the bottom.
# It's currently impossible to remove conversions from the BigBingo system.
bigbingo_conversion('problem_attempt', 'Attempted to do a problem'),
bigbingo_conversion('return_visit', 'Returned to the site'),
# These conversions are allowed to be used in the code, but are not tracked by
# BigBingo.
# A derived conversion is something that is computed from other existing
# conversion values and simple rules. For example, we may be interested in
# the question "did the user do any exercise problems the day after being
# exposed to an experiment?". This can be computed from the
# `problem_attempt_count` conversion by only counting the ones that were
# between 24hr and 48hr after the participation event.
# The format of an entry is as follows:
# '<derived_conversion_name>': {
# # A sub-section of a SELECT statement that deals with the column
# # of the resulting conversion. Records being aggregated are
# # rows from a join of the conversion events, and participation
# # events.
# 'aggregator': '<part of a SELECT>'
# }
'problem_attempt_next_day', 'Problem attempt next day',
conv_problem_attempt AND
logs.time >= part.participation_time + (24 * 3600) AND
logs.time < part.participation_time + (48 * 3600),
1, 0))
# Normally, experiments can't be deleted; removing them will cause
# the notification system to complain until it is added back. If you really
# want to remove an experiment, though, you can add its ID here and the next
# summarize task will forget it ever existed. This is mostly useful for testing
# or to remove experiments that never should have existed in the first place.
_LOGGED_SOURCE_CONVERSION_NAME_SET = {conversion.logged_name
for conversion in _SOURCE_CONVERSIONS}
_FINAL_CONVERSIONS_BY_ID = collections.OrderedDict(
[(, conversion) for conversion in _SOURCE_CONVERSIONS] +
[(, conversion) for conversion in _DERIVED_CONVERSIONS]
def get_active_experiments():
def get_archived_experiments():
def get_active_experiment_by_id(exp_id):
"""Returns the experiment with the given ID.
Raises KeyError if the experiment doesn't exist.
def get_possibly_archived_experiment(exp_id):
"""Returns the given experiment, even if it's in the archive.
Returns None if the experiment doesn't exist.
return (_ACTIVE_EXPERIMENTS_BY_ID.get(exp_id) or
def is_experiment_valid(experiment_name):
"""Determines if the experiment string is allowed to be logged."""
return (experiment_name in _LOGGED_EXPERIMENT_NAME_SET or
def is_experiment_whitelisted(experiment_name):
return experiment_name in _UNKNOWN_EXPERIMENT_WHITELIST
def get_conversion_by_id(conv_id):
"""Gets a conversion by ID, or crashes if the conversion doesn't exist.
Note that the result might be a regular Conversion or it might be a
return _FINAL_CONVERSIONS_BY_ID[conv_id]
def get_source_conversion_by_id(conv_id):
conv = get_conversion_by_id(conv_id)
if not isinstance(conv, Conversion):
raise KeyError('The conversion %s was not a source conversion.' %
return conv
def is_source_conversion_id_valid(conv_id):
return True
except KeyError:
return False
def get_source_conversions():
def get_source_conversion_ids():
"""Return a list of all conversions to extract from the logs.
Note the final list of conversions tracked and aggregated may be
expanded to include derived conversions. See get_final_conversion_ids()
and get_derived_conversions() for more details.
return [ for conversion in _SOURCE_CONVERSIONS]
def is_source_conversion_valid(conversion_name):
return (conversion_name in _LOGGED_SOURCE_CONVERSION_NAME_SET or
def get_final_conversion_columns():
"""Return the final list of column names to use."""
return [ for conv in _FINAL_CONVERSIONS_BY_ID.itervalues()]
def is_final_conversion_id_valid(conversion_id):
return conversion_id in _FINAL_CONVERSIONS_BY_ID
def get_derived_conversions():
"""Return information about derived conversions.
See the comment on _DERIVED_CONVERSIONS for details of the format.
def get_allowed_deletions():
"""Logic for config information kept in the datastore.
Although the primary source of truth for BigBingo configuration is,
we store some information in the datastore so that we can detect what changed
and send the appropriate notifications.
from __future__ import absolute_import
from google.appengine.ext import ndb
from third_party import alertlib
import db_util
from bigbingo import config
class BigBingoExperiment(ndb.Model):
"""Entity tracking the existence of an experiment.
Note that is still the canonical source of truth for the active
experiments and their configuration. The datastore is useful for detecting
when is different from last time and responding appropriately
(e.g. sending notifications for new/stopped experiments and giving a
warning when an experiment has mysteriously disappeared.)
start_time = ndb.DateTimeProperty(indexed=False, auto_now_add=True)
last_updated = ndb.DateTimeProperty(indexed=False, auto_now=True)
owner = ndb.StringProperty(indexed=False)
is_archived = ndb.BooleanProperty(indexed=False)
def send_notification(subject, notification, short_notification, color):
"""Sends a notification to HipChat.
subject: The subject line for the email to send.
notification: A possibly-long string with the notification text.
short_notification: A version of the notification that can fit
comfortably in a HipChat message.
color: A valid color to use in HipChat. See the documentation on
send_to_hipchat for more details.
alertlib.Alert(short_notification, html=True).send_to_hipchat(
'1s and 0s', color, sender='BigBingo Baboon')
alertlib.Alert(notification, subject, html=True).send_to_email(
def notify_new_active_experiment(experiment):
subject = 'Experiment started: %s' % experiment.display_name
format_str = ("%s's new BigBingo experiment %s just started.\n"
"Description: %s")
link = experiment_link(experiment)
notification = format_str % (experiment.owner, link,
short_notification = format_str % (experiment.owner, link,
send_notification(subject, notification, short_notification, 'purple')
def notify_archived_experiment(experiment):
subject = 'Experiment stopped: %s' % experiment.display_name
format_str = "%s's BigBingo experiment %s just stopped.\nConclusion: %s"
link = experiment_link(experiment)
notification = format_str % (experiment.owner, link,
short_notification = format_str % (experiment.owner, link,
send_notification(subject, notification, short_notification, 'purple')
def notify_missing_experiment(experiment_entity):
subject = 'Missing experiment: %s' % experiment_entity.key.string_id()
message = (
'Error: The BigBingo experiment %s, owned by %s, has gone missing! '
'Please add it back to bigbingo/ If you meant to stop the '
'experiment, you can do so by adding it to the list of archived '
'experiments.' %
(experiment_entity.key.string_id(), experiment_entity.owner))
send_notification(subject, message, message, 'red')
def notify_unarchived_experiment(experiment):
subject = 'Unarchived experiment: %s' % experiment.display_name
message = (
"That's strange, the %s's BigBingo experiment %s was moved from the "
"archive back to being an active experiment. If you actually want to "
"resume the experiment, you'll have missing data unless you manually "
"re-run the summarize task over the time that the experiment was "
"archived." % (experiment.owner, experiment_link(experiment)))
send_notification(subject, message, message, 'yellow')
def notify_new_archived_experiment(experiment):
subject = 'New archived experiment: %s' % experiment.display_name
message = (
"That's strange, %s's BigBingo experiment %s was seen in the list of "
"archived experiments, but it hasn't ever been active!" %
(experiment.owner, experiment_link(experiment)))
send_notification(subject, message, message, 'yellow')
def experiment_link(experiment):
return ('<a href="">%s</a>' %
(, experiment.display_name))
def truncate(string):
return string if len(string) < 50 else (string[:50] + '...')
def warn_for_missing_experiments(experiment_entities):
# If any experiments are in the datastore but not the config, send a
# notification that you're not supposed to do that. Note that we
# intentionally do NOT remove the entity from the datastore, since we want
# to keep poking HipChat until it gets fixed. If people really want to make
# an exception to that rule, though, they can set to allow some
# experiments to be deleted, in which case we actually delete them here.
keys_to_delete = []
for entity in experiment_entities:
experiment_id = entity.key.string_id()
if config.get_possibly_archived_experiment(experiment_id) is None:
if experiment_id in config.get_allowed_deletions():
if keys_to_delete:
def update_config_and_notify():
"""Update the datastore with any config changes and send notifications.
Every change to is worth broadcasting, so we keep track of the
previous set of experiments so we can see what changed. For new and
newly-stopped experiments, we announce them so people have an understanding
of what experiments are going on. If an experiment starts in the archive
or changes from archived to active, it is probably worth having a human
sanity-check it anyway, so we broadcast warnings for those cases. Finally,
if an experiment used to exist and doesn't anymore, we repeatedly give an
error, because the experiment was really supposed to be archived instead.
# There currently aren't that many experiments, so it should be fine to
# just fetch all of them. Also, this query is only eventually consistent,
# but that's fine since it runs so infrequently and occasional incorrect
# notifications aren't a big deal anyway.
all_experiment_entities = BigBingoExperiment.query().fetch()
experiment_entities_by_key = {
entity.key: entity for entity in all_experiment_entities}
entities_to_store = []
for experiment in (config.get_active_experiments() +
experiment_key = ndb.Key(BigBingoExperiment,
experiment_entity = experiment_entities_by_key.get(experiment_key)
if experiment_entity is None:
# The entry in is new.
if experiment.is_archived:
BigBingoExperiment(key=experiment_key, owner=experiment.owner,
elif experiment_entity.is_archived != experiment.is_archived:
# The entry in moved between the archive and active list.
if experiment.is_archived:
experiment_entity.is_archived = experiment.is_archived
if entities_to_store:
The MIT License (MIT)
Copyright (c) 2014 Khan Academy
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
"""Utilities for dealing with BigBingo events in the logs."""
from __future__ import absolute_import
import json
import logging
from bigbingo import config
import event_log
import ka_globals
class InvalidBingoEventError(Exception):
def log_participation_event(bingo_identity, experiment_name, alternative):
if not config.is_experiment_valid(experiment_name):
message = ('The experiment named "%s" was used, but was not found in '
'bigbingo/ . All active experiments must be registered '
'there (or added to the experiment whitelist).' % experiment_name)
if ka_globals.is_dev_server:
raise InvalidBingoEventError(message)
'bingo_id': bingo_identity,
'experiment': experiment_name,
'alternative': alternative
def log_conversion_event(bingo_identity, conversion_id):
"""Records that the given conversion event happened.
Normal application code should not call this directly; it should use
bigbingo.mark_conversion instead.
event_log.log_event('bingo.param', conversion_id)
'bingo_id': bingo_identity,
'conversion': conversion_id,
def parse_participation_events(log_lines):
"""Returns a generator for all participation events in the given logs.
The yielded results are dicts constructed from the logged JSON.
log_lines: A list of strings, which typically correspond to all log
lines in a particular request.
return _parse_events_with_prefix(log_lines, _PARTICIPATION_EVENT_PREFIX)
def parse_conversion_events(log_lines):
"""Returns a generator for all conversion events in the given logs.
The input and output format are the same as parse_participation_events, so
see its docstring for more details.
return _parse_events_with_prefix(log_lines, _CONVERSION_EVENT_PREFIX)
def _parse_events_with_prefix(log_lines, prefix):
for line in log_lines:
if not line.startswith(prefix):
yield json.loads(line[len(prefix):])
"""Summarize task for BigBingo experiments
This task regularly runs in the background to aggregate the latest bingo log
data. The raw data that we operate on are the bingo_participation_events and
bingo_conversion_events records generated from the request logs in
from __future__ import absolute_import
import time
from third_party.mapreduce.lib.pipeline import common
from third_party.mapreduce.lib.pipeline import pipeline
from bigbingo import config
from bigbingo import datastore_config
from bigquery import bq_connection
from bigquery import bq_pipelines
import log_to_bigquery
import pipeline_util
import request_handler
import user_util
# All fields should be nullable because the participants table is replaced by
# the result of the query when the summarize process runs, and the inferred
# schema has every field as nullable when writing a query result to a table.
'fields': [
{'name': 'experiment', 'type': 'STRING', 'mode': 'NULLABLE'},
{'name': 'bingo_id', 'type': 'STRING', 'mode': 'NULLABLE'},
{'name': 'alternative', 'type': 'STRING', 'mode': 'NULLABLE'},
{'name': 'participation_time', 'type': 'FLOAT', 'mode': 'NULLABLE'},
def get_conversions_schema():
return {
'fields': [
{'name': 'experiment', 'type': 'STRING', 'mode': 'NULLABLE'},
{'name': 'bingo_id', 'type': 'STRING', 'mode': 'NULLABLE'},
{'name': 'alternative', 'type': 'STRING', 'mode': 'NULLABLE'},
{'name': 'last_updated_time', 'type': 'FLOAT', 'mode': 'NULLABLE'}
] + [
{'name': 'conv_' + conv_col, 'type': 'INTEGER', 'mode': 'NULLABLE'}
for conv_col in config.get_final_conversion_columns()
def get_conversion_snapshots_schema():
fields = [
{'name': 'experiment', 'type': 'STRING', 'mode': 'NULLABLE'},
{'name': 'alternative', 'type': 'STRING', 'mode': 'NULLABLE'},
{'name': 'snapshot_time', 'type': 'INTEGER', 'mode': 'NULLABLE'}
for conv_id in config.get_final_conversion_columns():
{'name': 'count_' + conv_id,
'type': 'INTEGER', 'mode': 'NULLABLE'},
{'name': 'binary_' + conv_id,
'type': 'INTEGER', 'mode': 'NULLABLE'}
return {'fields': fields}
class StartSummarize(request_handler.RequestHandler):
"""Kicks off the BigBingo summarize task to process a time range of logs.
dataset: The name of the dataset to work with. In production, this is
always the "bigbingo" dataset, but for manual testing, you should
choose a different one, such as "bigbingo_test". This paramater is
required since a mistake could cause data corruption.
logs_dataset: The name of the "base dataset" used in LogToBigQuery.
In practice, and by default, this is a dataset called "logs". The
actual logs table(s) used are either in this dataset or in the
hourly version of this dataset (normally "logs_hourly"), depending
on whether use_daily_logs is set.
publish_dataset: The name of the dataset to copy result tables to, or
None if no publishing should occur.
archive_dataset: The name of the dataset to which to archive old
experiments, or None if no archiving should occur.
end_time: The upper bound on the time range to process, specified as a
UNIX timestamp in seconds. This time must be on an hour boundary.
If not specified, the hour boundary before the most recent one is
used (e.g. end_time is 3:00 if the current time is 4:30). This
allows the LogToBigQuery process to have enough time to export the
logs to BigQuery before we use them.
num_hours: The number of hours of data to process (going backward from
end_time). All hours processed must fall within the same day (UTC).
If not specified, we process 2 hours.
is_initial_run: If 0 (the default) we expect to find tables from a
previous run of the summarize task. If 1, dummy empty versions of
these tables will be created.
use_daily_logs: If 0 (the default), we read from logs tables that each
contain an hour of data. If 1, we read from the daily logs table
containing the hours that we are processing. It is usually
preferable to use the hourly logs tables so that the summarize task
will automatically fail if LogToBigQuery is failing.
# The admin requirement is enforced by the /admin path. We can't directly
# use admin_required here since it stops cron from being able to access it.
def get(self):
dataset = self.request_string('dataset')
if not dataset:
raise ValueError('The dataset name must be specified.')
logs_dataset = self.request_string('logs_dataset', 'logs')
publish_dataset = self.request_string('publish_dataset', None)
archive_dataset = self.request_string('archive_dataset', None)
# If unspecified, end_time is computed in get_and_validate_time_window.
end_time = self.request_int('end_time', -1)
num_hours = self.request_int('num_hours', 2)
is_initial_run = self.request_bool('is_initial_run', False)
use_daily_logs = self.request_bool('use_daily_logs', False)
start_time, end_time = self.get_and_validate_time_window(
end_time, num_hours, use_daily_logs)
summarize_pipeline = SummarizePipeline(
dataset, logs_dataset, publish_dataset, archive_dataset,
start_time, end_time, is_initial_run,
'Successfully started summarize task. '
'<a href="/_ah/pipeline/status?root=%s">Monitor task</a>' %
def get_and_validate_time_window(end_time, num_hours, use_daily_logs):
"""Validates and returns a [start_time, end_time) time range to use.
The time window is defined by ending at end_time and going back
num_hours hours. The start and end time need to be hour-aligned.
end_time: The last timestamp to process, as a UNIX time in seconds,
or -1. If this argument is -1, the hour mark before the most
recent one is used.
num_hours: The size of the window to process, in hours.
use_daily_logs: True if the daily logs table should be used instead
of the hourly logs tables.
A pair (start_time, end_time). The end_time is returned since it is
filled in when it is -1.
if end_time == -1:
current_time_seconds = time.time()
end_time = (int(current_time_seconds / 3600) - 1) * 3600
if end_time % 3600 != 0:
raise ValueError('The end_time parameter must be exactly on an '
'hour boundary.')
start_time = end_time - num_hours * 3600
if use_daily_logs and (
start_time // (24 * 3600) != (end_time - 1) // (24 * 3600)):
raise ValueError('The specified time window crosses multiple '
'days, which is not allowed.')
return start_time, end_time
class StartSummarizeBackfill(request_handler.RequestHandler):
"""Kicks off multiple summarize tasks in series.
The time range can span multiple days, but each individual summarize task
should be contained within one day. The arguments are very similar to the
arguments for StartSummarize.
dataset: The name of the dataset for the BigBingo tables.
logs_dataset: The base dataset containing the logs to process.
publish_dataset: (Optional) The dataset to copy result tables to.
archive_dataset: The name of the dataset to which to archive old
experiments, or None if no archiving should occur. For backfill
jobs not running up to the present, this should be None unless you
know what you're doing. (Otherwise, data on experiments recently
archived from the time after your backfill ends until the
experiment was archived could be lost.)
start_time: The start of the time range to process. This must be on an
hour boundary.
end_time: The end of the time range to process. This must be on an hour
num_hours_per_run: The size of each summarize task to run. Larger
values make the backfill faster and cheaper, but more likely to
is_initial_run: False if we expect previous tables to already exist.
use_daily_logs: True if we should use daily rather than hourly logs.
def get(self):
dataset = self.request_string('dataset')
if not dataset:
raise ValueError('The dataset name must be specified.')
logs_dataset = self.request_string('logs_dataset', 'logs')
publish_dataset = self.request_string('publish_dataset', None)
archive_dataset = self.request_string('archive_dataset', None)
start_time = self.request_int('start_time')
end_time = self.request_int('end_time')
num_hours_per_run = self.request_int('num_hours_per_run')
is_initial_run = self.request_bool('is_initial_run', False)
use_daily_logs = self.request_bool('use_daily_logs', False)
# TODO(alan): This first check is overly defensive in a lot of cases.
if 24 % num_hours_per_run != 0:
raise ValueError('num_hours_per_run must evenly divide a day.')
if start_time % (num_hours_per_run * 3600) != 0:
raise ValueError('start_time must be on a boundary defined by '
if end_time % (num_hours_per_run * 3600) != 0:
raise ValueError('end_time must be on a boundary defined by '
summarize_pipeline = BackfillSummarizePipeline(
dataset, logs_dataset, publish_dataset, archive_dataset,
start_time, end_time, num_hours_per_run, is_initial_run,
'Successfully started backfill summarize task. '
'<a href="/_ah/pipeline/status?root=%s">Monitor task</a>' %
def get_logs_table_expr(logs_base_dataset, start_time, end_time,
"""Returns a table expression for the logs table(s) to query over.
The simple (although non-default) case is that we just return a single
daily table, like "logs.requestlogs_20140421". The default case, though, is
that we want to select over a logs table for each hour, like
"logs_hourly.20140421_00, logs_hourly.20140421_01" (note that the comma
here is the BigQuery way to do UNION ALL).
logs_base_dataset: The base dataset name to use, usually "logs". This
is the name of the daily logs dataset, and appending "_hourly" to
the end finds the hourly logs dataset.
start_time: The start of the time range we are processing.
end_time: The end of the time range we are processing.
use_daily_logs: False if we should read from the hourly logs, True if
we should read from the daily logs.
if use_daily_logs:
dataset = logs_base_dataset
return dataset + '.' + log_to_bigquery.daily_logs_table_name(end_time)
dataset = logs_base_dataset + '_hourly'
# The name function takes the end time, so compute it from each start
# time by adding 1 hour.
return ', '.join(
dataset + '.' + log_to_bigquery.hourly_logs_table_name(
hour_start_time + 3600)
for hour_start_time in xrange(start_time, end_time, 3600)
def participants_table(unix_time):
"""Returns the name of the given participants table."""
return table_for_time('participants', unix_time)
def conversions_table(unix_time):
"""Returns the name of the given conversions table."""
return table_for_time('conversions', unix_time)
def pending_conversions_table(start_time, end_time):
"""Returns the name of the given pending conversions table."""
return table_for_time_range('pending_conversions', start_time, end_time)
def new_participation_events_table(start_time, end_time):
return table_for_time_range('new_participation_events', start_time,
def new_conversion_events_table(start_time, end_time):
return table_for_time_range('new_conversion_events', start_time, end_time)
def table_for_time(base_name, unix_time):
"""Returns the name to use for the given table as of the given time.
BigBingo tables are given timestamps to more reliably ensure that the
summarize task is working off of the correct data, and so that the
summarize task can fail at any point in the middle without causing
The given unix_time should be the start or end time for a summarize task,
so it should be exactly on an hour boundary. The actual hour given is in
return base_name + '_' + time.strftime('%Y_%m_%d_%H',
def table_for_time_range(base_name, start_time, end_time):
"""Returns the name to use for a table with data from a time range.
When a table represents data corresponding to a range of time (e.g. the
conversion events happening between 3pm and 6pm), we name it with a time
range. This is good practice because it helps in debugging and it guards
against pathological cases such as the time ranges [1pm, 3pm] and
[2pm, 3pm] being processed at the same time.
return table_for_time(table_for_time(base_name, start_time), end_time)
def raw_table(experiment_id):
"""The name of the table where the raw experiment data is stored.
This is a view in the publish dataset and a regular table in the archive.
It stores the complete per-bingo_id data for the experiment, to allow for
further analysis as desired.
return 'raw_' + experiment_id + '_data'
def totals_table(experiment_id):
"""The name of the table where summary experiment data is archived.
This only exists in the archive, and stores the snapshots every two hours
that are displayed in the BigBingo dashboard.
return 'historical_' + experiment_id + '_totals'
class BackfillSummarizePipeline(pipeline.Pipeline):
"""Run multiple summarize tasks in series.
See the documentation for StartSummarizeBackfill and SummarizePipeline for
descriptions of the arguments.
def run(self, dataset, logs_dataset, publish_dataset, archive_dataset,
start_time, end_time, num_hours_per_run, is_initial_run,
with pipeline.InOrder():
for batch_start_time in xrange(start_time, end_time,
num_hours_per_run * 3600):
batch_end_time = batch_start_time + num_hours_per_run * 3600
yield SummarizePipeline(
dataset, logs_dataset, publish_dataset, None,
batch_start_time, batch_end_time, is_initial_run,
is_initial_run = False
if archive_dataset:
# Always do all the archiving at the end.
experiments_to_archive = [ for e in config.get_archived_experiments()]
yield ArchiveExperimentsPipeline(
dataset, publish_dataset, archive_dataset, end_time,
class SummarizePipeline(pipeline.Pipeline):
"""Top-level pipeline for the summarize task.
dataset: The BigQuery dataset for the BigBingo input and output tables.
logs_dataset: The dataset containing the logs table to use.
publish_dataset: The dataset to copy the result tables to at the end,
or None if we shouldn't do any publishing.
archive_dataset: The dataset to which to archive any experiments that
should be archived, or None if we shouldn't do any archiving.
start_time: The inclusive low bound on the time range to process.
end_time: The exclusive high bound on the time range to process.
num_hours: The number of hours to process. This must be the difference
in hours between end_time and start_time.
is_initial_run: False if we should expect the previous tables to exist,
True otherwise.
use_daily_logs: True if we should
output_names = ['cost']
def run(self, dataset, logs_base_dataset, publish_dataset, archive_dataset,
start_time, end_time, is_initial_run, use_daily_logs):
# Before actually running summarize, make sure we keep the datastore
# BigBingo knowledge up-to-date and send any relevant notifications.
logs_table_expr = get_logs_table_expr(
logs_base_dataset, start_time, end_time, use_daily_logs)
parts_extracted = yield ExtractParticipationEventsPipeline(
dataset, logs_table_expr, start_time, end_time)
convs_extracted = yield ExtractConversionEventsPipeline(
dataset, logs_table_expr, start_time, end_time)
old_tables_checked = yield CheckOrCreateOldTablesPipeline(
dataset, is_initial_run, start_time)
with pipeline.After(old_tables_checked, parts_extracted):
parts_processed = yield ProcessParticipationEventsPipeline(
dataset, start_time, end_time)
# Processing conversions depends on the new participants table, so
# wait until parts_processed is ready.
with pipeline.After(convs_extracted, parts_processed):
convs_processed = yield ProcessConversionEventsPipeline(
dataset, start_time, end_time)
with pipeline.After(convs_processed):
convs_built = yield BuildConversionsTablePipeline(
dataset, start_time, end_time)
with pipeline.After(parts_processed):
parts_computed = yield ComputeParticipantCountSummaryPipeline(
dataset, end_time)
with pipeline.After(convs_built):
convs_computed = yield ComputeConversionCountSummaryPipeline(
dataset, end_time)
# Track all pipelines that did any queries (since those are the
# only actions that cost money) so we can return the total cost
# at the end.
futures_with_cost = [
parts_extracted, convs_extracted, parts_processed,
convs_processed, convs_built, parts_computed,
if publish_dataset:
with pipeline.After(parts_computed, convs_computed):
tables_published = yield PublishTablesPipeline(
dataset, publish_dataset, end_time)
if archive_dataset:
with pipeline.After(tables_published):
experiments_to_archive = [ for e in config.get_archived_experiments()]
experiments_archived = yield ArchiveExperimentsPipeline(
dataset, publish_dataset, archive_dataset, end_time,
total_cost = yield common.Sum(
*[future.cost for future in futures_with_cost])
yield bq_pipelines.ReturnOutputs("Success", cost=total_cost)
def finalized(self):
job_name='BigBingo Summarize')
class CheckOrCreateOldTablesPipeline(pipeline.Pipeline):
"""Sanity check the "previous" tables that we will be working off of.
The common case is that we're adding to existing experiment data, so the
tables corresponding to start_time should exist. If we know that we're
starting from scratch, though, we should just create stub tables instead.
def run(self, dataset, is_initial_run, start_time):
old_participants_table = participants_table(start_time)
old_conversions_table = conversions_table(start_time)
dataset, old_participants_table, PARTICIPANTS_SCHEMA,
patch_schema_if_necessary=False, ttl_days=7)
dataset, old_conversions_table, get_conversions_schema(),
create_if_necessary=is_initial_run, patch_schema_if_necessary=True,
# The conversion_snapshots table also needs to be updated with any new
# conversions, or else appending to that table will fail. It is not
# necessary for this operation to remove unused conversions, since
# BigQuery will automatically insert nulls for those columns when
# appending to the table.
# TODO(alan): The schema for this table will only grow over time, which
# can eventually cause problems and create bloat. One solution to this
# problem is to generate a new snapshots table every summarize run
# (since columns can be remove when making a new table), but that's not
# totally straightforward because of the way the table gets appended
# to. Another approach is to get rid of conversion columns in the
# schema for this table and just have a row for every conversion.
dataset, 'conversion_snapshots',
get_conversion_snapshots_schema(), create_if_necessary=True,
class ExtractParticipationEventsPipeline(pipeline.Pipeline):
"""Extract participation events from the logs.
You can't join on a repeated column, so this query extracts the events that
we're interested in into a flat table. It also gets rid of redundant events
(we know up-front that we can ignore participation events after the first
for each user and experiment).
output_names = ['cost']
def run(self, dataset, logs_table_expr, start_time, end_time):
# Note that the logservice considers the end time to be the time for
# the request, so we need to use that as our notion of "request time"
# or else some hours will be split across two tables.
query = """
bingo_participation_events.experiment AS experiment,
bingo_participation_events.bingo_id AS bingo_id,
bingo_participation_events.alternative AS alternative,
MIN(end_time) AS participation_time
FROM %(logs_table_expr)s
bingo_participation_events.experiment IS NOT NULL
end_time >= %(start_time)s
end_time < %(end_time)s
GROUP EACH BY experiment, bingo_id, alternative
""" % {
'logs_table_expr': logs_table_expr,
'start_time': start_time,
'end_time': end_time
yield bq_pipelines.QueryToTablePipeline(
query, dataset,
new_participation_events_table(start_time, end_time),
class ExtractConversionEventsPipeline(pipeline.Pipeline):
"""Builds a table of the conversion events for this summarize run."""
output_names = ['cost']
def run(self, dataset, logs_table_expr, start_time, end_time):
conversions = config.get_source_conversions()
query = """
bingo_conversion_events.bingo_id AS bingo_id,
end_time AS time,
FROM %(logs_table_expr)s
bingo_conversion_events.bingo_id IS NOT NULL
end_time >= %(start_time)s
end_time < %(end_time)s
bingo_conversion_events.conversion IN %(conversion_names)s
""" % {
'logs_table_expr': logs_table_expr,
'start_time': start_time,
'end_time': end_time,
# Store false as null since nulls are free.
'conversion_selects': ','.join(["""
IF(bingo_conversion_events.conversion = "%(conv_id)s",
true, null) AS conv_%(conv_id)s
""" % {
} for conv in conversions]),
'conversion_names': '(' +
','.join('"%s"' % for conv in conversions) + ')'
yield bq_pipelines.QueryToTablePipeline(
query, dataset, new_conversion_events_table(start_time, end_time),
class ProcessParticipationEventsPipeline(pipeline.Pipeline):
"""Determine the new set of participants for each experiment.
This stage unions the existing participants with the new participants and
does a GROUP BY to ensure that we don't include participants more than
output_names = ['cost']
def run(self, dataset, start_time, end_time):
old_participants_table = participants_table(start_time)
new_participants_table = participants_table(end_time)
events_table = new_participation_events_table(start_time, end_time)
query = """
MIN(participation_time) AS participation_time
%(events_table)s, -- UNION ALL
experiment, bingo_id, alternative
""" % {
'events_table': dataset + '.' + events_table,
'participants_table': dataset + '.' + old_participants_table
yield bq_pipelines.QueryToTablePipeline(
query, dataset, new_participants_table, result_table_ttl_days=7)
class ProcessConversionEventsPipeline(pipeline.Pipeline):
"""Computes the new conversion values to add for this summarize job.
Given the new_conversion_events table (an experiment-independent table of
conversion events for this summarize task) and the newly-computed
participants table, we determine what values to add to what conversions.
The output table has the same format as the full conversions table, but
only contains conversion numbers for this summarize task. The new
conversions table is computed in BuildConversionsTablePipeline.
output_names = ['cost']
def run(self, dataset, start_time, end_time):
conversions = config.get_source_conversions()
derived_conversions = config.get_derived_conversions()
new_participants_table = participants_table(end_time)
events_table = new_conversion_events_table(start_time, end_time)
pending_table = pending_conversions_table(start_time, end_time)
conversion_select_aggregators = [
'IFNULL(SUM(conv_%(conv_id)s), 0) AS conv_%(conv_id)s' % {