BigBingo (as of early July 2014)
"""Alternative deciders that can be used in BigBingo experiments.
An alternative decider is a function that determines which user gets which
alternative in a BigBingo experiment. It takes as arguments the experiment
itself and the bingo ID and returns the chosen alternative. See the Experiment
class in bigbingo/config.py for more info.
The alternative decider should take the following arguments:
experiment: A config.Experiment object containing all information about
the experiment (most notably, an OrderedDict mapping alternative
name to weight).
bingo_id: A string with the ID for the given user.
It should return one of the experiment alternative names, or None if the user
should not participate in the experiment. It should be a deterministic
function of experiment and bingo_id.
"""
import hashlib
import intl.request
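# A minimal sketch (not used by any real experiment) of a custom decider that
# satisfies the contract described in the module docstring: it deterministically
# assigns every user to the experiment's first alternative.
#
#     def first_alternative(experiment, bingo_id):
#         return next(iter(experiment.alternative_weights))
#
# Real deciders must likewise be deterministic functions of (experiment,
# bingo_id); see by_weights below for the standard weighted implementation.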
def legacy(experiment, bingo_id):
"""An alternative decider function that emulates GAE/Bingo.
This function should match modulo_choose in gae_bingo/gae_bingo.py. This
decider should not be used for new experiments; it only exists for
experiments that were ported from GAE/Bingo to BigBingo and are still
running.
"""
alternatives_weight = sum(experiment.alternative_weights.itervalues())
sig = hashlib.md5(
experiment.legacy_hashable_name + bingo_id).hexdigest()
sig_num = int(sig, base=16)
index_weight = sig_num % alternatives_weight
current_weight = alternatives_weight
alternative_data = [
(index, alt, weight)
for (index, (alt, weight))
in enumerate(experiment.alternative_weights.iteritems())]
for _, alt, weight in sorted(
alternative_data,
key=lambda (alt_index, _, alt_weight): (alt_weight, alt_index),
reverse=True):
current_weight -= weight
if index_weight >= current_weight:
return alt
assert False, "Didn't find a suitable alternative."
def english_only(experiment, bingo_id):
"""Test the experiment on a (deterministic) pseudorandom set of
English-speaking users.
"""
if intl.request.locale_for_mo() == 'en':
return by_weights(experiment, bingo_id)
else:
return None
def by_weights(experiment, bingo_id):
"""Test the experiment on a (deterministic) pseudorandom set of users."""
total_weight = sum(experiment.alternative_weights.itervalues())
weight_index = hash_on_interval(experiment.id + bingo_id) * total_weight
current_index = 0
for alternative, weight in experiment.alternative_weights.iteritems():
if current_index <= weight_index < current_index + weight:
return alternative
current_index += weight
assert False, 'Hash was out of bounds when computing BigBingo alternative.'
def limited_to_range(decider, range_start, range_end):
"""Return a decider that only allows a predictable set of users.
This is useful for making experiments that are mutually exclusive with each
other and re-using the participants of one experiment in a later
experiment, since the assignment of bingo_id to location in the [0, 1)
range is experiment-independent.
Example: limited_to_range(english_only, 0.2, 0.4) returns an alternative
decider that only allows a specific 20% of users to be in the experiment,
and of those, only allows english users. If you add another experiment with
decider limited_to_range(by_weights, 0.4, 0.5), English and non-English
users will be included and the experiment will be disjoint with the first
experiment.
Arguments:
decider: The alternative decider to use if the user is in the
experiment.
range_start: The inclusive start of the range, between 0 and 1.
range_end: The exclusive end of the range, between 0 and 1.
"""
assert 0 <= range_start < range_end <= 1
def wrapper_decider(experiment, bingo_id):
if range_start <= hash_on_interval(bingo_id) < range_end:
return decider(experiment, bingo_id)
else:
return None
return wrapper_decider
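# A hedged sketch of how deciders compose (the range endpoints here are made
# up; real values would live alongside each Experiment in bigbingo/config.py):
#
#     decider_a = limited_to_range(english_only, 0.0, 0.2)
#     decider_b = limited_to_range(by_weights, 0.2, 0.3)
#
# Because hash_on_interval(bingo_id) is experiment-independent, the two
# resulting deciders draw from disjoint slices of users, so no user can end up
# in both experiments.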
def hash_on_interval(value):
"""Given a string, return a hash of the string in the interval [0, 1).
For example, you can multiply the result by n to get a deterministic hash
that's a float value between 0 (inclusive) and n (exclusive).
The common way to use this is to call hash_on_interval(base + bingo_id), in
which case the right way to think about it is that each bingo_id for a base
gets a random (but deterministic) point in the "hash space"; the full set
of hash space mappings is re-randomized for each base.
"""
# Using a cryptographic hash function is overkill here, but it looks like
# Python doesn't have a good non-cryptographic hash function that's stable
# across versions, and MD5 is still plenty fast for our use case.
hash_str = hashlib.md5(value).hexdigest()
hash_uint32 = int(hash_str, 16) & 0xFFFFFFFF
return float(hash_uint32) / (1 << 32)
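# For illustration (values hypothetical): hash_on_interval('expt1_' + bingo_id)
# gives a float in [0, 1), and int(hash_on_interval(bingo_id) * 10) gives a
# deterministic bucket index in 0..9, as described in the docstring above.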
Snapshot of Khan Academy's BigBingo A/B testing framework and related code.
Here's a basic overview:
-summarize.py is the most interesting file. It contains all stages of the
summarize task, as well as the publish and archive steps that happen at the
end.
-bq_pipelines.py contains lots of useful pipelines for interacting with
BigQuery. QueryToTableBatchPipeline can run many simultaneous queries, and will
properly handle all batching and retry logic.
-config.py is where all experiment configuration lives. For this Gist, I
replaced the big list of active and archived experiments with simple examples.
The summarize task depends heavily on the App Engine Pipeline API
( https://code.google.com/p/appengine-pipeline/ ), more specifically, on KA's
fork of it here: https://github.com/Khan/appengine-mapreduce .
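For a rough sense of how callers use the API defined below (the import path,
experiment ID, conversion ID, and alternative name here are hypothetical):

    import bigbingo  # exact import path assumed

    alternative = bigbingo.ab_test('sample_experiment', bingo_id)
    if alternative == 'new_flow':
        serve_new_flow()
    else:
        serve_control()
    # Later, when the user does the thing we care about:
    bigbingo.mark_conversion('sample_conversion_binary', bingo_id)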
"""API for interacting with BigBingo."""
from __future__ import absolute_import
import logging
from bigbingo import config
from bigbingo import log
import cookie_util
def ab_test(experiment_id, bingo_id):
"""Participates the given user in the given experiment.
Logs a participation event and determines which alternative to use for this
user. If this is the first participation event for this experiment and
user, the time is used as the user's participation time for the experiment
and conversions start being tracked for the user; otherwise, the BigBingo
summarizer will ignore the participation event.
Arguments:
experiment_id: A string with the ID of the experiment.
bingo_id: A string identity for the user that should participate in the
experiment.
Returns:
A string name of the alternative for this user in the experiment.
"""
experiment = config.get_active_experiment_by_id(experiment_id)
if not isinstance(bingo_id, basestring):
logging.error(
'When calling bigbingo.ab_test on the experiment %s, the bingo_id '
'was %s. Non-strings were allowed in GAE/Bingo, but are no longer '
'allowed in BigBingo, so the event will be ignored.' %
(experiment_id, bingo_id))
return experiment.control
# If the user set the alternative in a cookie, don't log it as a real
# participation event, just return the alternative value.
cookie_alternative = _get_cookie_alternative(experiment)
if cookie_alternative is not None:
logging.info(
'bigbingo.ab_test was forced to return alternative "%s" for '
'experiment "%s" (bingo_id "%s") due to a cookie override. No '
'participation event will be logged.' %
(cookie_alternative, experiment_id, bingo_id))
return cookie_alternative
alternative = experiment.choose_alternative(bingo_id)
if alternative is None:
# The user should not participate in this experiment, so return the
# control and don't log anything.
return experiment.control
else:
log.log_participation_event(bingo_id, experiment.logged_name,
alternative)
return alternative
def get_alternative_for_user(experiment_id, bingo_id):
"""Determine which alternative a user would be in.
This is similar to ab_test, but doesn't actually enter the user into the
experiment. This is mostly useful for "passive" systems that want to get
the information without disturbing anything, so this function is also
designed to swallow and log exceptions rather than propagating them in most
cases.
Arguments:
experiment_id: An ID of a valid BigBingo experiment.
bingo_id: A string with the identity of the current user.
Returns:
A string with the alternative to return, or None if there was a
problem getting it.
"""
try:
experiment = config.get_active_experiment_by_id(experiment_id)
except KeyError:
logging.error('Tried to get the alternative for an experiment that is '
'not an active experiment: "%s".', experiment_id)
return None
# Any well-written choose_alternative function should never crash, but
# we want to be resistant to crashes in it anyway, which could especially
# come up if the bingo_id is accidentally None.
try:
alternative = experiment.choose_alternative(bingo_id)
except Exception:
logging.exception(
'Error when trying to determine alternative for bingo_id "%s" in '
'BigBingo experiment "%s".' % (bingo_id, experiment_id))
return None
return alternative or experiment.control
def _get_cookie_alternative(experiment):
"""Return any alternative override that the user may have set.
Returns either a valid alternative for the experiment or None if there
wasn't a valid alternative set.
"""
alt_index = cookie_util.get_cookie_value('bingo_force_' + experiment.id)
if not alt_index:
return None
try:
return experiment.alternatives[int(alt_index)]
except (IndexError, ValueError):
logging.warn('Cookie overriding experiment %s specified index %s, '
'which is not valid. Ignoring.' %
(experiment.id, alt_index))
return None
def mark_conversion(conversion_id, bingo_id):
"""Indicates that the given user has triggered the given conversion.
The BigBingo summarizer will count the conversion event toward all
experiments that the given user is currently a part of.
"""
try:
conv = config.get_source_conversion_by_id(conversion_id)
except KeyError:
conv = (try_conversion_id(conversion_id + '_count') or
try_conversion_id(conversion_id + '_binary'))
if conv:
logging.error(
'The BigBingo conversion "%s" is invalid, but we found one to '
'use instead: "%s".' % (conversion_id, conv.id))
else:
logging.error("The conversion \"%s\" was used, but is not a valid "
"BigBingo conversion. We'll treat the conversion_id "
"as if it's a valid logged_name.", conversion_id)
log.log_conversion_event(bingo_id, conversion_id)
return
log.log_conversion_event(bingo_id, conv.id)
def try_conversion_id(conversion_id):
try:
return config.get_source_conversion_by_id(conversion_id)
except KeyError:
return None
"""This module makes it easy for other modules to connect to BigQuery."""
import json
import logging
import os
import re
import time
from third_party.google_api_python_client import httplib2
from third_party.google_api_python_client.apiclient import (
discovery as apiclient_discovery,
errors as apiclient_errors,
)
from third_party.google_api_python_client.oauth2client import (
appengine as oauth2client_appengine,
client as oauth2client_client,
)
import ka_globals
import ka_root
# Hard-coded project ID.
BQ_PROJECT_ID = ''
# Keywords that BigQuery will refuse to parse as column names.
BQ_RESERVED_KEYWORDS = frozenset(["profile"])
class BigQueryError(Exception):
pass
class BigQueryService(object):
"""A wrapper for the BigQuery Python client service.
This page enumerates the calls available to the API:
https://developers.google.com/resources/api-libraries/documentation/bigquery/v2/python/latest/index.html
It doesn't describe the interface for making calls. Do so like this:
service = BigQueryService.get_service()
request = service.tables().list(<ARGS>)
response = request.execute()
# response is a Python object representation of the JSON response.
"""
_SERVICE = None
@classmethod
def get_service(cls):
if not cls._SERVICE:
# Service creation does an HTTP fetch, so we only do it once
# per instance and cache the service object.
credentials = get_credentials()
# Use a timeout higher than the default 5 seconds since this
# service is currently only used for background jobs, not
# user-facing requests, and batching and BigQuery throttling can
# make us exceed that default.
http = httplib2.Http(timeout=30)
http = credentials.authorize(http)
cls._SERVICE = apiclient_discovery.build('bigquery', 'v2',
http=http)
return cls._SERVICE
def get_credentials():
if ka_globals.is_dev_server:
return oauth2client_client.AccessTokenCredentials(
get_dev_access_token(), 'log2bq/0.1')
else:
return oauth2client_appengine.AppAssertionCredentials(
scope='https://www.googleapis.com/auth/bigquery')
def get_dev_access_token():
"""Gets the developer's BigQuery access token for use in a dev environment.
You can get a token by first installing the BigQuery command line tool "bq"
in a new virtual environment (to avoid conflicts with App Engine):
virtualenv ~/.virtualenv/bigquery_env
source ~/.virtualenv/bigquery_env/bin/activate
pip install bigquery
bq ls
(If you don't yet have access to BigQuery, you'll need to ask someone for
access). The first time you run "bq ls", it will provide instructions for
authenticating, then create a JSON file called .bigquery.v2.token in your
home directory, which contains the access token as one of its fields.
Running "bq ls" again (in the right virtualenv) will refresh that token.
The most robust approach is to have a file called bigquery_token in the
webapp directory that contains the contents of the .bigquery.v2.token file that
is created/updated in your home directory when running "bq ls". You can
accomplish this using a symlink or by making a shell script to both run "bq
ls" and copy the token file to the right place. (See
https://gist.github.com/alangpierce/8e01f71b664761cec0a3) (We can't read
this file directly since the app engine sandbox disallows reading files
outside of webapp and also disallows reading files starting with a dot.)
You can also provide the access token more directly by setting the
BQ_ACCESS_TOKEN environment variable in app.yaml or by temporarily changing
this code to return the access token as a string literal.
"""
env_token = os.getenv('BQ_ACCESS_TOKEN')
if env_token:
return env_token
token_file_path = ka_root.join('bigquery_token')
if not os.path.isfile(token_file_path):
raise BigQueryError('You do not have a BigQuery token configured. '
'See get_dev_access_token in bq_connection.py for '
'instructions on how to configure one.')
with open(token_file_path, 'r') as token_file:
token_config_obj = json.load(token_file)
return token_config_obj['access_token']
def ensure_bigquery_dataset_exists(name, description=""):
"""Create a new bigquery dataset if it does not already exist.
Datasets contain tables. When loading a new table, the
dataset must be specified. If the specified dataset does
not already exist then the call will fail.
Args:
name: the unique name of the dataset (alphanumeric or _)
description: (optional) string describing the dataset
"""
# Setup a dataset creation request
request = BigQueryService.get_service().datasets().insert(
projectId=BQ_PROJECT_ID, body={
"datasetReference": {
"datasetId": name,
"projectId": BQ_PROJECT_ID,
},
"friendlyName": name,
"description": description
})
# Try to create the dataset
# Catch the error that is returned if the dataset already exists
# pass forward all other errors
try:
request.execute()
except apiclient_errors.HttpError as e:
code = json.loads(e.content)['error']['code']
dataset_already_exists_code = 409
if code == dataset_already_exists_code:
pass
else:
raise
def ensure_bigquery_table_exists(
dataset, table, new_schema, create_if_necessary=False,
patch_schema_if_necessary=False, ttl_days=None):
"""Makes sure that the given table exists and has the given columns.
The schema check does not check for an exact schema match; instead it
ensures that all fields in the given schema exist within the given table.
By default, an exception is thrown if the given table does not exist or
does not match.
Arguments:
dataset: The name of the dataset of the table to check/create.
table: The name (not including the dataset) of the table to
check/create.
new_schema: A dict containing the schema to use when creating the
table, or the schema we expect the table to have if the table
already exists. The schema format is specified in the bigquery docs
(a link to them is in the BigQueryService docstring) for the
bq_service.tables().insert(...) call. For example, here's a schema
with a single nullable string, 'foo':
{'fields': [{'name': 'foo', 'type': 'STRING', 'mode': 'NULLABLE'}]}
Note that all three components of each field should be specified,
or else the schema check will fail if the table already exists.
create_if_necessary: If the table does not exist, we create it with the
given schema rather than throwing an exception.
patch_schema_if_necessary: If the table exists but does not contain
the given schema, it is patched to include the given schema. Note
that BigQuery does not allow columns to be removed with a patch
operation, so any previously-existing columns not in new_schema are
moved to the end of the table. Also, a failure can still occur here
if the schema update is invalid (for example, changing an integer
column to a string column). Not all valid patch options are
currently supported: in particular, the operation will fail if the
mode of a column is relaxed or if a column is added to a nested
record, even though both types of operations are supported by
BigQuery.
ttl_days: If present and this operation creates a new table, the table
is created with an expiration time that makes it expire after this
many days.
"""
old_schema = get_table_schema(dataset, table)
if not old_schema:
# Table does not exist.
if create_if_necessary:
create_empty_table(dataset, table, new_schema, ttl_days)
return
else:
raise BigQueryError('Expected table %s.%s to exist.' %
(dataset, table))
# Table exists, so validate the schema.
if schema_is_subset(new_schema, old_schema):
return
# Schema mismatch, so error or fix.
if patch_schema_if_necessary:
patch_schema_to_table(dataset, table, old_schema, new_schema)
else:
raise BigQueryError(
'Schema mismatch for table %s.%s. Expected schema %s to '
'contain %s.' % (dataset, table, old_schema, new_schema))
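# A minimal usage sketch (dataset, table, and column names are hypothetical):
#
#     ensure_bigquery_table_exists(
#         'bingo_raw', 'participation_events',
#         {'fields': [
#             {'name': 'bingo_id', 'type': 'STRING', 'mode': 'NULLABLE'},
#             {'name': 'logged_time', 'type': 'TIMESTAMP', 'mode': 'NULLABLE'},
#         ]},
#         create_if_necessary=True, ttl_days=30)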
def schema_is_subset(subset_schema, full_schema):
"""Returns true if the given schema is a subset of the full schema."""
full_schema_fields_by_name = {
field['name']: field for field in full_schema['fields']}
for subset_field in subset_schema['fields']:
full_field = full_schema_fields_by_name.get(subset_field['name'])
if full_field is None or subset_field != full_field:
return False
return True
def patch_schema_to_table(dataset, table, old_schema, patch_schema):
"""Patch the given table to include the given schema.
If the patch_schema removes any fields from old_schema, the old fields are
kept and put at the end of the schema.
Arguments:
dataset: The dataset of the table to patch.
table: The table to patch.
old_schema: The current schema of the table to patch.
patch_schema: A schema containing the new fields.
"""
BigQueryService.get_service().tables().patch(
projectId=BQ_PROJECT_ID,
datasetId=dataset,
tableId=table,
body={'schema': get_schema_for_patch(old_schema, patch_schema)}
).execute()
def get_schema_for_patch(old_schema, patch_schema):
"""Returns the schema to send to patch patch_schema onto old_schema."""
patch_schema_names = {field['name'] for field in patch_schema['fields']}
new_schema_fields = list(patch_schema['fields'])
for old_field in old_schema['fields']:
if old_field['name'] not in patch_schema_names:
new_schema_fields.append(old_field)
return {
'fields': new_schema_fields
}
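# For example, patching {'fields': [foo_v2, bar]} onto {'fields': [foo, baz]}
# yields {'fields': [foo_v2, bar, baz]}: fields from the patch come first (and
# win on name collisions), and old fields missing from the patch are appended.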
def get_table_schema(dataset, table):
"""If the table exists, returns its schema. Otherwise, returns None."""
table_service = BigQueryService.get_service().tables()
try:
get_result = table_service.get(
projectId=BQ_PROJECT_ID,
datasetId=dataset,
tableId=table
).execute()
return get_result['schema']
except apiclient_errors.HttpError as e:
# Return None if the table doesn't exist.
if json.loads(e.content)['error']['code'] == 404:
return None
else:
raise
def get_leaf_column_selectors(dataset, table):
"""Parses the table's schema to generate a list of column selectors.
BigQuery tables may have record fields, which have a hierarchical schema.
Each subfield is delimited with a dot, but BigQuery views cannot have dots
in their names. So, instead of defining the view like:
SELECT
__key__.namespace,
__key__.path
FROM
[MyTable]
You have to define it like:
SELECT
__key__.namespace as __key___namespace,
__key__.path as __key___path
FROM
[MyTable]
For more information, see http://stackoverflow.com/questions/23840038
"""
schema = get_table_schema(dataset, table)
if not schema:
raise BigQueryError('Expected table %s.%s to exist.' % (
dataset, table))
return ",\n".join([
_get_leaf_selectors("", top_field)
for top_field in schema["fields"]
])
def _get_leaf_selectors(prefix, field, depth=0):
"""Recursive helper for get_leaf_column_selectors()"""
field_name = field["name"]
if prefix:
field_name = prefix + "." + field_name
if 'fields' not in field:
if "." in field_name:
# If we translate user.email to user_email, the user_email field
# may already exist as a top-level field, so prepend an underscore
# to signify this is a record-turned-regular field. There shouldn't
# be any top-level actual fields that start with an underscore.
safe_name = field_name.replace(".", "_")
return "%s as _%s" % (field_name, safe_name)
elif depth == 0 and field_name.lower() in BQ_RESERVED_KEYWORDS:
# Reserved keywords mess up BigQuery's SQL parsing, so make sure we
# don't use the bare keyword as the column name
return "[%s]" % field_name
else:
return field_name
else:
# Recursive case
return ",\n".join([
_get_leaf_selectors(field_name, sub_field, depth + 1)
for sub_field in field["fields"]
])
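# For instance, a record column "user" with leaf sub-fields "id" and "email"
# (hypothetical schema) would produce the selectors:
#
#     user.id as _user_id,
#     user.email as _user_email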
def delete_table_if_exists(dataset, table):
"""If the table exists, delete it. Otherwise, do nothing."""
table_service = BigQueryService.get_service().tables()
try:
table_service.delete(
projectId=BQ_PROJECT_ID,
datasetId=dataset,
tableId=table
).execute()
except apiclient_errors.HttpError as e:
# Return (do nothing) if the table doesn't exist.
if json.loads(e.content)['error']['code'] == 404:
return
else:
raise
def create_empty_table(dataset, table, schema, ttl_days=None):
"""Create the given table
An exception will be thrown if the table already exists.
Arguments:
dataset: The name of the dataset of the table to check/create.
table: The name (not including the dataset) of the table to
check/create.
schema: A dict describing the schema of the new table. See the
docstring on ensure_bigquery_table_exists for a description of the
dict format.
ttl_days: If present, the table is set to expire after this many days.
"""
table_service = BigQueryService.get_service().tables()
table_service.insert(
projectId=BQ_PROJECT_ID,
datasetId=dataset,
body={
'tableReference': {
'projectId': BQ_PROJECT_ID,
'datasetId': dataset,
'tableId': table
},
'schema': schema,
}
).execute()
# We could technically set the expiration time in the regular request that
# creates the table, but setting it after-the-fact makes the code simpler.
if ttl_days is not None:
set_table_ttl_in_days(dataset, table, ttl_days)
def create_or_update_view(dataset, view_name, query):
"""Create a BigQuery view from from the given query.
If the table already exists, it is changed to a view (if necessary) and the
query for the view is updated to the given query.
Arguments:
dataset: The dataset where the view will go.
view_name: The name of the view to create or update.
query: A string with the SQL for the view.
"""
table_service = BigQueryService.get_service().tables()
table_body = {
'tableReference': {
'projectId': BQ_PROJECT_ID,
'datasetId': dataset,
'tableId': view_name
},
'view': {
'query': query
}
}
try:
table_service.insert(
projectId=BQ_PROJECT_ID,
datasetId=dataset,
body=table_body).execute()
except apiclient_errors.HttpError as e:
if json.loads(e.content)['error']['code'] == 409: # Already exists
table_service.update(
projectId=BQ_PROJECT_ID,
datasetId=dataset,
tableId=view_name,
body=table_body).execute()
else:
logging.error("Failed with query: %s", query)
raise
def set_table_ttl_in_days(dataset, table, num_days):
"""Set the table to expire after the given number of days.
Tables last forever by default, but this sets a time-to-live so that
BigQuery will later delete the table. This function relies on the system
clock to compute an expiration time.
Arguments:
dataset: The dataset containing the table to modify.
table: The name of the table to modify.
num_days: A number (not necessarily an integer) with the number of days
to keep the table around.
"""
assert num_days > 0
current_time_ms = time.time() * 1000
time_to_live_ms = num_days * 24 * 60 * 60 * 1000
expiration_time_ms = int(current_time_ms + time_to_live_ms)
BigQueryService.get_service().tables().patch(
projectId=BQ_PROJECT_ID,
datasetId=dataset,
tableId=table,
body={'expirationTime': expiration_time_ms}
).execute()
def _get_all_pages(query_function, parameters, key_to_merge):
"""Pages through results returned by the BigQuery API
Arguments:
query_function - the function to call, such as datasets().list
parameters - the initial parameters to send to the function. This will
be augmented with the page tokens if there are multiple pages.
key_to_merge - the key within the dictionary of results that contains
an array of objects to merge together to form the final list.
Returns:
A really long list in the format of the original API call.
"""
def get_list(the_result):
# Sometimes the query is valid, but returns nothing. In this case the
# key is not present, so we should just return an empty list.
if key_to_merge not in the_result:
return []
else:
return the_result[key_to_merge]
result = query_function(**parameters).execute()
final_result = {
key_to_merge: get_list(result)
}
while "nextPageToken" in result:
parameters["pageToken"] = result["nextPageToken"]
result = query_function(**parameters).execute()
final_result[key_to_merge].extend(get_list(result))
return final_result
def get_all_backup_dataset_names():
"""Return the names of all datastore backup datasets.
https://developers.google.com/bigquery/docs/reference/v2/datasets/list
"""
backup_dataset_names = []
parameters = {
"projectId": BQ_PROJECT_ID
}
datasets_list = _get_all_pages(
BigQueryService.get_service().datasets().list, parameters, "datasets")
for d in datasets_list['datasets']:
dataset_name = d['datasetReference']['datasetId']
# Must match YYYY_MM_DD format
if re.search('(^[0-9]{4}_[0-9]{2}_[0-9]{2}$)', dataset_name):
backup_dataset_names.append(dataset_name)
return backup_dataset_names
def get_all_tables_in_dataset(dataset):
"""Return the names of all a dataset's tables, or None for a bad dataset.
Note that there is currently no way to disambiguate between a view and a
table, so both are returned.
https://developers.google.com/bigquery/docs/reference/v2/tables/list
"""
parameters = {
"projectId": BQ_PROJECT_ID,
"datasetId": dataset,
"maxResults": 500,
}
try:
tables_list = _get_all_pages(
BigQueryService.get_service().tables().list, parameters, "tables")
return [table['tableReference']['tableId']
for table in tables_list['tables']]
except apiclient_errors.HttpError as e:
# Return None if the dataset doesn't exist.
if json.loads(e.content)['error']['code'] == 404:
return None
else:
raise
def get_most_recent_backup_dataset():
"""Return the name of the most recent datastore backup dataset."""
backup_names = get_all_backup_dataset_names()
if backup_names:
return sorted(backup_names)[-1]
else:
return None
"""Useful pipeline classes for interacting with BigQuery."""
import json
import logging
import os
import time
from third_party.google_api_python_client.apiclient import (
errors as apiclient_errors,
http as apiclient_http
)
from third_party.mapreduce.lib.pipeline import pipeline
from third_party.mapreduce.lib.pipeline import common
from bigquery import bq_connection
# In some situations, we want to submit lots of API requests at once. Batching
# helps with this, but BigQuery will give an error if we send way too many API
# requests in the same batch, so we limit the number of requests we're willing
# to send at once.
DEFAULT_NUM_REQUESTS_PER_BATCH = 10
# When hitting retriable errors (rate limit exceeded or internal BigQuery
# errors) when submitting HTTP requests, automatically retry up to this many
# times.
#
# Note that automatic pipeline retry accomplishes some of this, but this HTTP
# retry works better because only failed requests in a batch are retried rather
# than retrying the whole batch. Also, since we limit retrying to definitely-
# retriable errors, it's ok to be more aggressive about retrying.
NUM_HTTP_RETRY_ATTEMPTS = 10
# Sets a limit on the amount of time we're willing to wait before giving up on
# a query. This setting is necessary because sometimes, BigQuery will spend a
# very long time on a small number of queries. For example, there have been
# times where 50 queries are submitted at the same time and 48 of those queries
# finish within a few minutes, but the remaining two take multiple hours to
# finish. In these cases, simply submitting another copy of the two stalled
# queries would work, so we treat the time limit as a retriable error.
# TODO(alan): This time limit used to be 15 minutes, but I disabled the feature
# by setting it to None to work around a correctness issue in BigQuery that
# happens when you submit the same query twice:
# http://stackoverflow.com/questions/23424289/what-atomicity-guarantees-does-bigquery-provide-for-query-jobs
# If we keep running into issues, it may make sense to set this time limit
# again (potentially to a number higher than 15 minutes), and if we stop
# running into the issue, we may want to remove this feature completely.
QUERY_TIME_LIMIT_SECONDS = None
# Initial amount of time, in seconds, to sleep when waiting for a job to finish
# (assuming it's not done immediately).
JOB_WAIT_INITIAL_TIME = 5
# Increase the sleeping time by the following factor when waiting for a job to
# finish. Back off exponentially so the task hierarchy doesn't get ridiculously
# deep in the pipeline viewer when we wait on a long-running job like a batch
# query.
JOB_WAIT_BACKOFF_MULTIPLIER = 1.5
DEFAULT_QUERY_PRIORITY = os.environ.get('DEFAULT_BIGQUERY_PRIORITY', 'BATCH')
class ReturnOutputs(pipeline.Pipeline):
"""General-purpose mechanism for returning named outputs.
This makes it more convenient to return pipeline results as outputs, since
the normal self.fill mechanism requires returning already-resolved values,
not pipeline futures. To use it, this pipeline should be yielded last.
Also, any outputs specified will be ignored unless they are specified in
the output_names list of the calling pipeline.
For example, this code snippet lets you combine the results from two child
pipelines into three outputs (one default and two named):
default_and_foo = yield ComputeDefaultAndFoo()
bar = yield ComputeBar()
yield ReturnOutputs(default_and_foo, foo=default_and_foo.foo, bar=bar)
Note that this pipeline is unnecessary if the last pipeline yielded already
returns outputs with the desired names and values.
"""
def run(self, default_output, **kwargs):
for key in kwargs:
self.fill(key, kwargs[key])
return default_output
class ListIndex(pipeline.Pipeline):
"""Returns the element at the given index from the given list."""
def run(self, input_list, index):
return input_list[index]
class SmallResultQueryPipeline(pipeline.Pipeline):
"""Pipeline for running a query and getting its results.
This pipeline is convenient if the results are small (e.g. if you're
querying an aggregate) and you want to view the results directly in Python
rather than sending the results to a table.
Arguments:
query: A string with the text of the query to run.
Returns:
The query results as returned in the getQueryResults method in the jobs
section of the BigQuery API. For example,
results['rows'][0]['f'][0]['v'] gets the first field of the first row.
"""
def run(self, query):
job_metadata_list = yield RunJobBatchPipeline([{
'query': {
'query': query
}
}])
job_metadata = yield ListIndex(job_metadata_list, 0)
yield GetQueryResultsPipeline(job_metadata)
class GetQueryResultsPipeline(pipeline.Pipeline):
"""Fetches and returns the actual query results for a job.
Arguments:
job_metadata: A dict containing information about the job to get query
results for, as returned by insert or get in the jobs section of
the BigQuery API.
Returns:
The query results as returned in the getQueryResults method in the jobs
section of the BigQuery API. For example,
results['rows'][0]['f'][0]['v'] gets the first field of the first row.
"""
def run(self, job_metadata):
job_service = bq_connection.BigQueryService.get_service().jobs()
return job_service.getQueryResults(
projectId=job_metadata['jobReference']['projectId'],
jobId=job_metadata['jobReference']['jobId']).execute()
class QueryToTablePipeline(pipeline.Pipeline):
"""Pipeline for running a query and sending the output to a table.
This pipeline will submit a query and will not mark itself as done until
the query has either finished or failed.
Arguments:
query: A string with the text of the query to run.
output_dataset: The dataset of the output table.
output_table: The name of the output table.
create_if_needed: If true (the default), we create the table (rather
than failing) if it doesn't already exist.
append_to_table: If false (the default), we overwrite the table
contents rather than appending to the end of the table.
priority: A string, either 'INTERACTIVE' or 'BATCH'. In both cases, the
query will be submitted asynchronously and the pipeline will wait
until the query completes, but interactive queries generally run
faster and are subject to rate limits, while batch queries are
marked as lower priority in BigQuery's scheduler and have no
restriction on the number of concurrent queries. The default
priority is BATCH since pipelines are most commonly run in
background jobs where stability and isolation (i.e. not affecting
humans using the BigQuery webapp) are more important than low
latency.
allow_large_results: Setting to True (the default) may slow the query
down, but allows the query to return more than 128MB of results.
result_table_ttl_days: If specified, the resulting table is
asynchronously set to expire after the given number of days.
table_desc: A string with a human-readable description of this BigQuery
table. If this is None, we do not set the table description.
schema_desc: A list of dictionaries, where each dictionary is of the
form as described in [1] in the patch/body/schema/fields section.
If this is None, we do not set the schema descriptions.
Returns:
default: Metadata describing the status of the query job (the return
value of the "get" function in the "jobs" section of the BigQuery
API).
cost: The number of cents spent on the query.
[1] https://developers.google.com/resources/api-libraries/documentation/
bigquery/v2/python/latest/bigquery_v2.tables.html
"""
output_names = ['cost']
def run(self, query, output_dataset, output_table, create_if_needed=True,
append_to_table=False, priority=DEFAULT_QUERY_PRIORITY,
allow_large_results=True, result_table_ttl_days=None,
table_desc=None, schema_desc=None):
job_results = yield RunJobBatchPipeline(
[query_to_table_job_config(query, output_dataset, output_table,
create_if_needed, append_to_table,
priority, allow_large_results)],
time_limit=QUERY_TIME_LIMIT_SECONDS)
if result_table_ttl_days is not None:
with pipeline.After(job_results):
yield SetTableTTLInDaysPipeline(
output_dataset, output_table, result_table_ttl_days)
if table_desc is not None or schema_desc is not None:
with pipeline.After(job_results):
yield UpdateTableInfoPipeline(
output_dataset, output_table, table_desc, schema_desc)
cost = yield GetCostPipeline(job_results)
job_result = yield ListIndex(job_results, 0)
yield ReturnOutputs(job_result, cost=cost)
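# A hedged sketch of a parent pipeline using QueryToTablePipeline (the query
# and table names are made up for illustration):
#
#     class ExampleSummaryPipeline(pipeline.Pipeline):
#         output_names = ['cost']
#         def run(self):
#             job = yield QueryToTablePipeline(
#                 'SELECT bingo_id FROM [bigbingo_raw.participation_events]',
#                 'bigbingo_scratch', 'example_participants',
#                 result_table_ttl_days=7)
#             yield ReturnOutputs(job, cost=job.cost)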
class QueryToTableBatchPipeline(pipeline.Pipeline):
"""Batched version of QueryToTablePipeline.
This pipeline lets you submit many queries at the same time and block until
all of them have finished. The API requests are batched, so this version is
more performant, and the requests are automatically throttled as necessary
to avoid hitting BigQuery's request limit.
Arguments:
query_specs: A list of dictionaries, each of which describes a query
to make and the table that it should output to. The dictionary keys
are the same as the arguments to QueryToTablePipeline, although
result_table_ttl_days is not yet supported for this pipeline.
Returns:
default: A list of job results, each of which has the same format as
the return value of bq_service.jobs().get(...).
cost: The combined cost of all queries.
"""
output_names = ['cost']
def run(self, query_specs):
job_configs = [query_to_table_job_config(**spec)
for spec in query_specs]
results = yield RunJobBatchPipeline(
job_configs, time_limit=QUERY_TIME_LIMIT_SECONDS)
cost = yield GetCostPipeline(results)
yield ReturnOutputs(results, cost=cost)
def query_to_table_job_config(
query, output_dataset, output_table, create_if_needed=True,
append_to_table=False, priority=DEFAULT_QUERY_PRIORITY,
allow_large_results=True):
"""Gets the 'configuration' dict to use when starting a BigQuery query.
See the documentation for bq_service.jobs().insert(...) for more
information.
"""
logging.info('Submitting query %s' % query)
create_mode = 'CREATE_IF_NEEDED' if create_if_needed else 'CREATE_NEVER'
# Note that the API also allows for a WRITE_EMPTY option where we
# expect the table to be empty, and there's no reason we can't add
# support for that here if we need to.
write_mode = 'WRITE_APPEND' if append_to_table else 'WRITE_TRUNCATE'
return {
'query': {
'query': query,
'priority': priority,
'destinationTable': {
'projectId': bq_connection.BQ_PROJECT_ID,
'datasetId': output_dataset,
'tableId': output_table
},
'createDisposition': create_mode,
'writeDisposition': write_mode,
'allowLargeResults': allow_large_results,
}
}
class GetCostPipeline(pipeline.Pipeline):
"""Given a list of query metadata, compute the total cost of all queries.
Arguments:
job_infos: A list of dicts, each containing information about a query
job. The format is the return format for bq_service.jobs().get(...), as
documented in the BigQuery API (see the documentation on
BigQueryService for a link).
Returns: The combined cost of all queries, in cents.
"""
def run(self, job_infos):
result = 0
for job_info in job_infos:
num_bytes = int(job_info['statistics']['query']
['totalBytesProcessed'])
# Queries cost 0.5 cents per GB of data touched.
result += 0.5 * num_bytes / (2 ** 30)
return result
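# For example, a single job whose totalBytesProcessed is 10 GiB comes out to
# 0.5 * 10 = 5.0 cents under the rate assumed above.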
class CopyTablePipeline(pipeline.Pipeline):
"""Copies the contents of the source table to the destination table.
Both tables are specified by their dataset name and table name. If
append_to_table is False and the destination table already exists, it is
overwritten with the contents of the source table.
This pipeline is not marked as finished until the copy job is complete.
"""
def run(self, src_dataset, src_table, dest_dataset, dest_table,
append_to_table=False, result_table_ttl_days=None):
result_list = yield RunJobBatchPipeline([copy_table_job_config(
src_dataset, src_table, dest_dataset, dest_table, append_to_table)
])
if result_table_ttl_days is not None:
with pipeline.After(result_list):
yield SetTableTTLInDaysPipeline(
dest_dataset, dest_table, result_table_ttl_days)
yield ListIndex(result_list, 0)
class CopyTableBatchPipeline(pipeline.Pipeline):
"""Performs multiple copy operations in a batch.
Arguments:
copy_specs: A list of dictionaries, each of which describes a copy
operation. The dict keys should be the same as the arguments to
CopyTablePipeline.
"""
def run(self, copy_specs):
yield RunJobBatchPipeline(
[copy_table_job_config(**copy_spec) for copy_spec in copy_specs])
def copy_table_job_config(src_dataset, src_table, dest_dataset, dest_table,
append_to_table=False):
write_mode = 'WRITE_APPEND' if append_to_table else 'WRITE_TRUNCATE'
return {
'copy': {
'sourceTable': {
'projectId': bq_connection.BQ_PROJECT_ID,
'datasetId': src_dataset,
'tableId': src_table
},
'destinationTable': {
'projectId': bq_connection.BQ_PROJECT_ID,
'datasetId': dest_dataset,
'tableId': dest_table
},
'createDisposition': 'CREATE_IF_NEEDED',
'writeDisposition': write_mode
}
}
class UpdateTableInfoPipeline(pipeline.Pipeline):
"""Updates the descriptions of an existing BigQuery table.
This pipeline assumes that the table already exists and that its schema is as
specified. The entire purpose of this class, as it stands, is to add a
description for the table itself and/or for the schema in the table.
For an example of how to include descriptions when creating a table, please
refer to bigquery_reports/user_sessions.py.
TODO(ilan): It would be ideal to automatically generate the schema for our
existing tables based on the docstrings we've already written.
"""
def run(self, dataset_name, table_name, table_desc=None, schema_desc=None):
"""Updates the descriptions for the BigQuery table and its schema.
Args:
dataset_name: A string with the ID of the dataset where this table
resides.
table_name: A string with the ID of this table.
table_desc: A string with a human-readable description of this
BigQuery table. If this is None, we do not set the
table description.
schema_desc: A list of dictionaries, where each dictionary is of
the form as described in [1] in the patch/body/
schema/fields section. If this is None, we do not set
the schema descriptions.
[1] https://developers.google.com/resources/api-libraries/
documentation/bigquery/v2/python/latest/bigquery_v2.tables.html
"""
table_service = bq_connection.BigQueryService.get_service().tables()
# I'm really not sure what to put here, so I'm just copying from
# EnsureTablesDeletedPipeline for now
def handle_result(request_id, response, exception):
# Ignore missing tables
if exception and not (
isinstance(exception, apiclient_errors.HttpError) and
json.loads(exception.content)['error']['code'] == 404):
raise exception
run_http_requests(
[table_service.patch(
projectId=bq_connection.BQ_PROJECT_ID,
datasetId=dataset_name,
tableId=table_name,
body=patch_table_job_config(dataset_name, table_name,
table_desc, schema_desc))],
handle_result)
def patch_table_job_config(dataset_name, table_name, table_desc=None,
schema_desc=None):
"""Returns the job config dictionary for this table patch method call.
Args:
dataset_name: A string with the ID of the dataset where this table
resides.
table_name: A string with the ID of this table.
table_desc: A string with a human-readable description of this
BigQuery table. If this is None, we do not set the
table description.
schema_desc: A list of dictionaries, where each dictionary is of
the form as described in [1] in the patch/body/
schema/fields section. If this is None, we do not set
the schema descriptions.
[1] https://developers.google.com/resources/api-libraries/
documentation/bigquery/v2/python/latest/bigquery_v2.tables.html
Return:
A dictionary of the form required to run the patch job.
"""
result = {
'tableReference': {
'projectId': bq_connection.BQ_PROJECT_ID,
'tableId': table_name,
'datasetId': dataset_name,
},
}
if table_desc is not None:
result['description'] = table_desc
if schema_desc is not None:
result['schema'] = {
'fields': schema_desc,
}
return result
class LoadTablePipeline(pipeline.Pipeline):
"""Loads a table from Cloud Storage.
If the specified table does not already exist, it is created. This pipeline
is not marked as finished until the load job is complete, and the pipeline
is aborted if there is a job failure.
Arguments:
cloud_storage_paths: A list of strings specifying the cloud storage
files to load, e.g. "gs://foo/bar".
dataset: The dataset of the table to write to.
table: The name of the table to write to.
schema: The schema of the table. If the table already exists, this must
be a subset of the existing schema. See the documentation for
bq_service.jobs().insert(...) (the BigQueryService documentation
has a link) for a description of the schema format.
source_format: A string, either NEWLINE_DELIMITED_JSON or CSV.
write_mode: A string, either WRITE_APPEND or WRITE_TRUNCATE.
result_table_ttl_days: If specified, the resulting table is
asynchronously set to expire after the given number of days.
Returns:
Metadata about the job result, in the format of the return value of
bq_service.jobs().get(...).
"""
def run(self, cloud_storage_paths, dataset, table, schema, source_format,
write_mode, result_table_ttl_days=None):
result_list = yield RunJobBatchPipeline([{
'load': {
'sourceFormat': source_format,
'sourceUris': cloud_storage_paths,
'schema': schema,
'destinationTable': {
'projectId': bq_connection.BQ_PROJECT_ID,
'datasetId': dataset,
'tableId': table,
},
'createDisposition': 'CREATE_IF_NEEDED',
'writeDisposition': write_mode,
'encoding': 'UTF-8',
}
}])
if result_table_ttl_days is not None:
with pipeline.After(result_list):
yield SetTableTTLInDaysPipeline(
dataset, table, result_table_ttl_days)
yield ListIndex(result_list, 0)
class EnsureTablesDeletedPipeline(pipeline.Pipeline):
"""Deletes the tables with the given names in the given dataset.
Tables that don't already exist are ignored. The delete operation
happens synchronously, so the tables will be deleted when the pipeline is
marked as finished.
"""
def run(self, dataset_name, table_names):
table_service = bq_connection.BigQueryService.get_service().tables()
def handle_result(request_id, response, exception):
# Ignore missing tables
if exception and not (
isinstance(exception, apiclient_errors.HttpError) and
json.loads(exception.content)['error']['code'] == 404):
raise exception
run_http_requests(
[table_service.delete(
projectId=bq_connection.BQ_PROJECT_ID,
datasetId=dataset_name,
tableId=table_name)
for table_name in table_names],
handle_result)
class SetTableTTLInDaysPipeline(pipeline.Pipeline):
"""Set the table to expire after the given number of days.
See bq_connection.set_table_ttl_in_days for more info.
"""
def run(self, dataset, table, num_days):
bq_connection.set_table_ttl_in_days(dataset, table, num_days)
def should_retry_job(job_info):
"""Return True if the given job failed and is eligible for retry.
Jobs should be retried if they failed due to BigQuery internal errors.
BigQuery job statuses provide a 'reason' error code that helps distinguish
between transient BigQuery errors and bugs in our code or other errors that
are not worth retrying.
Arguments:
job_info: The result type of bq_service.jobs().get(...), which provides
information about the status and other details of a job.
"""
return ('errorResult' in job_info['status'] and
job_info['status']['errorResult']['reason'] in
('internalError', 'backendError', 'quotaExceeded',
'ka:timeLimitExceeded'))
class RunJobBatchPipeline(pipeline.Pipeline):
"""Pipeline to run a list of BigQuery job and block on their completion.
Arguments:
configs: A list of job configurations. The format is defined in the
"configuration" section of the documentation for
bq_service.jobs().insert(...).
time_limit: The number of seconds we're willing to wait before
considering a job stalled and trying again. Or None if we should
wait indefinitely. If specified, the jobs passed in should all be
idempotent.
retries_remaining: The number of times we're willing to retry after
this attempt. This is mostly used internally.
Returns:
A list of metadata dicts, each of which has the format of the return
value of bq_service.jobs().get(...).
"""
def run(self, configs, time_limit=None, retries_remaining=2):
job_service = bq_connection.BigQueryService.get_service().jobs()
job_ids = [None] * len(configs)
def handler(index, response):
job_ids[index] = response['jobReference']['jobId']
run_http_job_requests(
[job_service.insert(
projectId=bq_connection.BQ_PROJECT_ID,
body={
'projectId': bq_connection.BQ_PROJECT_ID,
'configuration': config
})
for config in configs],
handler,
# Setting this to be higher than 1 causes things to break. Looks
# like a BigQuery bug.
num_requests_per_batch=1)
job_results = yield WaitForJobBatchPipeline(
job_ids, wait_time_seconds=JOB_WAIT_INITIAL_TIME,
time_remaining_seconds=time_limit)
yield ConsiderJobRetry(job_results, time_limit, retries_remaining)
class ConsiderJobRetry(pipeline.Pipeline):
"""If any jobs failed and should be retried, retry them.
If any jobs actually are retried successfully, we patch the given
job_results list so that the new successful results replace the old failed
results.
TODO(alan): In the quotaExceeded case, we can be smarter about our response
rather than just blindly retrying with a cap on the number of times. I
think these semantics make the most sense:
-If we're not making progress (every job had quotaExceeded), add in a delay
of 30 seconds or so before retrying to let the system calm down.
-If we're making progress (some jobs were successful, while others had
quotaExceeded), don't decrement the retry count (or maybe reset it) and
retry immediately.
This should make it possible to throw hundreds of queries at
RunJobBatchPipeline and have BigQuery process them as fast as it can
without ever failing, while still protecting against the case where an
individual query fails due to too many queries from other jobs.
Arguments:
job_results: A list of job result dicts (the return value of
bq_service.jobs().get(...)).
time_limit: The job time limit to send to RunJobBatchPipeline.
retries_remaining: The number of times we're willing to retry.
Returns:
The job_results list, which may be updated with newly-retried job info.
"""
def run(self, job_results, time_limit, retries_remaining):
# List of (index, config) pairs. We need to track the original indexes
# so we can replace the right ones when we finish retrying.
retry_configs = []
for index, job_result in enumerate(job_results):
if should_retry_job(job_result):
logging.info('The following failure occurred, and will be '
'considered for retry: %s' % job_result)
retry_configs.append((index, job_result['configuration']))
if retry_configs:
if retries_remaining == 0:
raise bq_connection.BigQueryError(
'The following jobs failed and were retried too many '
'times: %s' % [config for _, config in retry_configs])
new_job_results = yield RunJobBatchPipeline(
[config for _, config in retry_configs], time_limit,
retries_remaining - 1)
yield UpdateJobResultsList(job_results, new_job_results,
[index for index, _ in retry_configs])
else:
yield common.Return(job_results)
class UpdateJobResultsList(pipeline.Pipeline):
"""Update some entries in a list of job results with new results.
This is used after retrying some failed jobs to combine their new
successful result values with the values of the jobs that succeeded
originally.
Arguments:
original_job_results: A list of job results for the entire batch, some
of which need to be replaced.
new_job_results: A list of job results to replace the original results.
replacement_indices: A list of indices. The nth index is the place in
original_job_results to put new_job_results[n].
Returns:
A list based on original_job_results, but with some replacements made,
as described by new_job_results and replacement_indices.
"""
def run(self, original_job_results, new_job_results, replacement_indices):
for new_job_result, index in zip(new_job_results, replacement_indices):
original_job_results[index] = new_job_result
return original_job_results
class WaitForJobBatchPipeline(pipeline.Pipeline):
"""Pipeline which waits for a list of BigQuery jobs to finish.
Arguments:
job_ids: A list of BigQuery job IDs to wait on.
wait_time_seconds: The initial amount of time to wait for the job to
finish. This is used to implement exponential backoff (the pipeline
will always wait indefinitely for the jobs to finish).
time_remaining_seconds: Either an integer number of seconds or None. If
not None, this pipeline will stop waiting after that many seconds
have passed and any jobs that are still not done will have their
statuses modified so that they're marked as DONE with a
timeLimitExceeded error (we use the same error format that BigQuery
uses since error-handling code expects that format).
Returns:
A list of metadata dicts describing the jobs' results, as returned by
bq_service.jobs().get(...). See the docstring on BigQueryService for a
link to the docs. Since we retry until the jobs are finished, this
metadata will always show the jobs as finished.
"""
def run(self, job_ids, wait_time_seconds, time_remaining_seconds):
job_service = bq_connection.BigQueryService.get_service().jobs()
# This list starts out with all Nones and gets filled in.
completed_results = [None] * len(job_ids)
remaining_job_results = []
remaining_job_indices = []
def handle_result(index, response):
if response['status']['state'] in ('PENDING', 'RUNNING'):
remaining_job_results.append(response)
remaining_job_indices.append(index)
else:
completed_results[index] = response
run_http_job_requests(
[job_service.get(projectId=bq_connection.BQ_PROJECT_ID,
jobId=job_id)
for job_id in job_ids],
handle_result)
# Compute other_results, an ordered list of job results for jobs that
# we wait for recursively.
if remaining_job_results:
if (time_remaining_seconds is not None and
time_remaining_seconds <= 0):
# If we're out of time, pretend the job finished with an error
# so we retry it. In practice, these long-running jobs can go
# for hours before finishing, so it's generally best to give up
# and try again.
for job in remaining_job_results:
job['status']['state'] = 'DONE'
job['status']['errorResult'] = {
'reason': 'ka:timeLimitExceeded'
}
other_results = remaining_job_results
else:
# Otherwise, keep waiting.
remaining_job_ids = [job['jobReference']['jobId']
for job in remaining_job_results]
with pipeline.InOrder():
yield common.Delay(seconds=wait_time_seconds)
if time_remaining_seconds is not None:
time_remaining_seconds -= wait_time_seconds
other_results = yield WaitForJobBatchPipeline(
remaining_job_ids,
wait_time_seconds * JOB_WAIT_BACKOFF_MULTIPLIER,
time_remaining_seconds)
else:
other_results = []
yield UpdateJobResultsList(completed_results, other_results,
remaining_job_indices)
def run_http_job_requests(
requests, callback,
num_requests_per_batch=DEFAULT_NUM_REQUESTS_PER_BATCH):
"""Runs the given HTTP requests relating to BigQuery jobs.
If any HTTP request fails or any request returns a job failure, an
exception is thrown.
Arguments:
requests: A list of HttpRequest objects.
callback: A function taking an index and response parameter. It is
called for each request that is returned. The "index" parameter is
the index of the http request in the requests list, and the
response is the response to the HTTP request.
num_requests_per_batch: The number of requests we're willing to
send in the same request batch.
Unlike the more general batch call provided by BatchHttpRequest, the given
callback should only have a single parameter for the response.
"""
def new_callback(request_id, response, exception):
if exception:
raise exception
# If the job failed and is eligible for retry, return the failure
# normally instead of failing now so that the parent pipeline has a
# chance to handle the failure.
if ('errorResult' in response['status'] and
not should_retry_job(response)):
raise bq_connection.BigQueryError(
'Job failed: %s' % response['status']['errorResult'])
index = int(request_id)
callback(index, response)
run_http_requests(requests, new_callback, num_requests_per_batch)
def run_http_requests(
requests, callback,
num_requests_per_batch=DEFAULT_NUM_REQUESTS_PER_BATCH):
"""Runs all HttpRequests as a batch with the same callback.
If BigQuery sends back errors because the request rate limit is exceeded,
this function attempts to back off and retry those failures later, although
it eventually will give up.
Arguments:
requests: A list of HttpRequest objects (such as those returned by
function calls on bq_service).
callback: A callback to run on each result. See the documentation on
BatchHttpRequest for more details. The request_ids provided to the
callback will be the string value of the index into the requests
list, but the order in which the callbacks are called is not
guaranteed to be the same as the order of the requests.
num_requests_per_batch: The number of requests we're willing to send in
the same request batch. This throttling exists to avoid BigQuery
errors from sending too many requests at once, and to work around
a bug where BigQuery will only let you insert one job at a time.
"""
request_ids = [str(num) for num in xrange(len(requests))]
for id_and_request_batch in partition(zip(request_ids, requests),
num_requests_per_batch):
requests_by_id = dict(id_and_request_batch)
run_http_requests_with_retry(
requests_by_id, callback,
retries_remaining=NUM_HTTP_RETRY_ATTEMPTS - 1)
def run_http_requests_with_retry(requests_by_id, callback, retries_remaining):
"""Run the given requests as a batch, retrying as necessary.
If we hit any rate limiting errors or internal BigQuery errors, we sleep
for half a second and try again with the failed requests. If the retry
limit is exceeded, it is reported to the callback in the same way that
other exceptions are reported.
Arguments:
requests_by_id: A dictionary mapping string request ID to the request
itself.
callback: A callback to handle the requests, as described in the docs
for BatchHttpRequest.
retries_remaining: The number of times we're willing to retry again
before giving up.
"""
ids_needing_retry = []
def retry_collecting_callback(request_id, response, exception):
if should_retry_http_exception(exception) and retries_remaining > 0:
ids_needing_retry.append(request_id)
else:
callback(request_id, response, exception)
http_batch = apiclient_http.BatchHttpRequest()
for request_id, request in requests_by_id.iteritems():
http_batch.add(request, callback=retry_collecting_callback,
request_id=request_id)
http_batch.execute()
# If we get a retriable exception (which happens in the case of rate
# limiting and internal BigQuery errors), back off a bit to let the system
# settle down, then retry.
if ids_needing_retry:
logging.warning('After submitting %s requests at once, %s needed to '
'be retried.' %
(len(requests_by_id), len(ids_needing_retry)))
time.sleep(0.5)
requests_by_id_to_retry = {request_id: requests_by_id[request_id]
for request_id in ids_needing_retry}
run_http_requests_with_retry(requests_by_id_to_retry, callback,
retries_remaining - 1)
def should_retry_http_exception(exception):
"""Returns True if the given HTTP exception should be retried.
The two main cases that are worth retrying are when we've hit BigQuery API
request rate limits or when BigQuery has some other error and gives a
message saying "Unexpected. Please try again.".
"""
if not exception or not isinstance(exception, apiclient_errors.HttpError):
return False
content = json.loads(exception.content)
code, message = content['error']['code'], content['error']['message']
return ((code == 403 and message.startswith('Exceeded rate limits')) or
(code == 500 and 'Please try again' in message))
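# Illustrative example (hypothetical error payload, shaped like the JSON this
# function reads out of exception.content):
#   {"error": {"code": 403, "message": "Exceeded rate limits: ..."}}
# would be retried, while e.g. a 404 "Not Found" error would not.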
def partition(source_list, partition_size):
"""Break the given list up into a list of lists, each of the given size.
Every resulting list will have size partition_size except possibly the last
one, and no list will be empty.
"""
return [source_list[n:n + partition_size]
for n in xrange(0, len(source_list), partition_size)]
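# Illustrative example: partition([1, 2, 3, 4, 5], 2) returns
# [[1, 2], [3, 4], [5]].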
"""Module responsible for providing the active experiments and conversions."""
from __future__ import absolute_import
import collections
from third_party import markdown
from bigbingo import alternative_deciders
class Experiment(object):
"""Configuration for an experiment.
Fields:
id: A string identifying the experiment, which can consist only of
alphanumeric characters and underscores. This string is used in
table names, so it is recommended that the ID be relatively short.
Every experiment must have a unique ID.
logged_name: The name of the experiment that shows up in logs and in
strings in BigQuery. For legacy experiments, this is display_name,
and for new-style experiments, this is experiment_id. This field
only exists for transition reasons, and should be removed when the
last legacy experiment is removed.
display_name: A human-readable name for the experiment, used for
display purposes, which is allowed to have arbitrary characters.
legacy_hashable_name: For legacy experiments, the name for hashing
purposes, which is either the family name or the canonical name of
the experiment. None if the experiment is not a legacy experiment.
alternative_weights: An OrderedDict from alternative name (a string) to
weight (an integer).
control: A string of the experiment's control, which is used for
display purposes.
alternative_decider: A function from (Experiment, bingo_id) to
alternative which determines the alternative assigned to the given
user. This function should be stable in the sense that it always
gives the same alternative to any particular user. If the function
returns None, the user will be excluded from the test entirely.
owner: A string name of the person responsible for the experiment.
description: A string explaining the experiment, which will show up
when viewing the experiment in the BigBingo dashboard.
conclusion: Specify None for active experiments. For finished
experiments, provide a string explaining any results and
conclusions from the experiment.
"""
# We can't serialize alternative_decider because it's a function. We also
# shouldn't serialize alternative_weights because it's scary to send
# alternative strings as dictionary keys since they could get inadvertently
# camel-cased.
_serialize_blacklist = ['alternative_decider', 'alternative_weights']
def __init__(self, exp_id, logged_name, display_name, legacy_hashable_name,
alternative_weights, control, alternative_decider, owner,
description, conclusion):
assert control in alternative_weights, (
'The control must be one of the alternatives.')
assert alternative_decider is not None
self.id = exp_id
self.logged_name = logged_name
self.display_name = display_name
self.legacy_hashable_name = legacy_hashable_name
self.alternative_weights = alternative_weights
self.control = control
self.alternative_decider = alternative_decider
self.owner = owner
self.description = description
self.conclusion = conclusion
def choose_alternative(self, bingo_id):
result = self.alternative_decider(self, bingo_id)
if result is not None and result not in self.alternatives:
raise ValueError(
'The chosen alternative in experiment %s was unexpectedly %s, '
'was expecting one of %s.' % (self.id, result,
self.alternatives))
return result
@property
def alternatives(self):
return self.alternative_weights.keys()
@property
def is_archived(self):
return self.conclusion is not None
@property
def description_html(self):
return markdown.markdown(self.description)
@property
def conclusion_html(self):
if self.conclusion is not None:
return markdown.markdown(self.conclusion)
@property
def is_previewable(self):
return not self.is_archived
class Conversion(object):
"""Configuration for a conversion.
Fields:
id: A string identifying the conversion, which must consist only of
alphanumeric characters and underscores.
column_name: The string to use in column names in BigQuery. This is
often the same as the ID, but may be different for compatibility
reasons.
logged_name: The name of the conversion that shows up in the logs.
display_name: A friendly name of the conversion.
"""
# Ignore logged_name for consistency with DerivedConversion (and since it's
# more of an implementation detail anyway).
_serialize_blacklist = ['logged_name']
def __init__(self, conv_id, column_name, logged_name, display_name):
self.id = conv_id
self.column_name = column_name
self.logged_name = logged_name
self.display_name = display_name
class DerivedConversion(object):
"""Configuration for a derived conversion.
A derived conversion looks like a regular conversion but isn't valid to
log. Instead, it's created by the summarize task based on other conversion
values using the "aggregator" field.
"""
_serialize_blacklist = ['aggregator']
def __init__(self, conv_id, display_name, aggregator):
self.id = conv_id
self.column_name = conv_id
self.display_name = display_name
self.aggregator = aggregator
def experiment(exp_id, display_name, alts, owner, description, conclusion=None,
control=None, decider=alternative_deciders.english_only):
"""Creates a standard BigBingo experiment.
The experiment may not be used in GAE/Bingo. See the fields on the
Experiment class for information on how to call this function.
"""
if control is None:
control = default_control(alts)
if description:
description = strip_indentation(description)
if conclusion:
conclusion = strip_indentation(conclusion)
return Experiment(
exp_id=exp_id, logged_name=exp_id, display_name=display_name,
legacy_hashable_name=None, alternative_weights=alts, control=control,
alternative_decider=decider, owner=owner, description=description,
conclusion=conclusion)
def default_control(alts):
"""Find the alternative that looks most like the control."""
for alternative in alts:
if any(word in alternative.lower()
for word in ('old', 'original', 'false', 'control')):
return alternative
return alts.keys()[0]
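# Illustrative example: for alts like alternatives('old_layout', 'new_layout'),
# default_control returns 'old_layout' because it contains "old"; if no
# alternative name looks like a control, the first alternative is used.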
def strip_indentation(s):
return '\n'.join(line.lstrip() for line in s.split('\n'))
def bigbingo_conversion(conversion_id, display_name):
return Conversion(conv_id=conversion_id, column_name=conversion_id,
logged_name=conversion_id, display_name=display_name)
def alternatives(*alts):
""""Specifies a list of equal-weighted alternatives."""
return weighted_alternatives(*((alt, 1) for alt in alts))
def weighted_alternatives(*weighted_alts):
"""Specifies a list of weighted alternatives for an experiment.
Each argument should be a pair of (string, int) that specifies the name and
weight of the given alternative.
"""
for name, weight in weighted_alts:
assert isinstance(name, basestring), 'Alternatives must be strings.'
assert isinstance(weight, int), 'Weights must be integers.'
return collections.OrderedDict(weighted_alts)
# This list started out alphabetical and then became chronologically ordered,
# so you should put new experiments at the bottom.
_ACTIVE_EXPERIMENTS = [
experiment(
exp_id='sample_expt',
display_name='Sample Experiment',
alts=alternatives('control', 'alt_1', 'alt_2'),
owner='alan',
description="""
This is an example experiment description.
Arbitrary markdown is allowed here.
""")
]
# Organized roughly in reverse chronological order. Experiments at the top
# were archived more recently.
_ARCHIVED_EXPERIMENTS = [
experiment(
exp_id='sample_archived_expt',
display_name='Sample Archived Experiment',
alts=weighted_alternatives(('control', 9), ('alt', 1)),
owner='alan',
description="""
All experiments end up here when they're archived. This makes sure
there is a record of all past experiments and lets the dashboard
still display old experiment information.
""",
conclusion="""
All archived experiments need to have a conclusion filled in.
"""
)
]
# Set of string experiment names that are allowed to be used in the code, but
# are not tracked by BigBingo. This should be the logged name, which for legacy
# experiments (the common case) is the full experiment name.
_UNKNOWN_EXPERIMENT_WHITELIST = {
}
# A source conversion is a conversion that is logged directly. This is in
# contrast to a derived conversion, which is computed during the BigBingo
# process.
# Alphabetical order, with deprecated conversions at the bottom.
# It's currently impossible to remove conversions from the BigBingo system.
_SOURCE_CONVERSIONS = [
bigbingo_conversion('problem_attempt', 'Attempted to do a problem'),
bigbingo_conversion('return_visit', 'Returned to the site'),
]
# These conversions are allowed to be used in the code, but are not tracked by
# BigBingo.
_UNKNOWN_SOURCE_CONVERSION_WHITELIST = {
}
# A derived conversion is something that is computed from other existing
# conversion values and simple rules. For example, we may be interested in
# the question "did the user do any exercise problems the day after being
# exposed to an experiment?". This can be computed from the
# `problem_attempt_count` conversion by only counting the ones that were
# between 24hr and 48hr after the participation event.
#
# The format of an entry is as follows:
# '<derived_conversion_name>': {
# # A sub-section of a SELECT statement that deals with the column
# # of the resulting conversion. Records being aggregated are
# # rows from a join of the conversion events, and participation
# # events.
# 'aggregator': '<part of a SELECT>'
# }
_DERIVED_CONVERSIONS = [
DerivedConversion(
'problem_attempt_next_day', 'Problem attempt next day',
aggregator="""
SUM(IF(
conv_problem_attempt AND
logs.time >= part.participation_time + (24 * 3600) AND
logs.time < part.participation_time + (48 * 3600),
1, 0))
"""),
]
# Normally, experiments can't be deleted; removing them from config.py will cause
# the notification system to complain until it is added back. If you really
# want to remove an experiment, though, you can add its ID here and the next
# summarize task will forget it ever existed. This is mostly useful for testing
# or to remove experiments that never should have existed in the first place.
_ALLOWED_EXPERIMENT_IDS_TO_DELETE = []
_LOGGED_EXPERIMENT_NAME_SET = {exp.logged_name for exp in _ACTIVE_EXPERIMENTS}
_LOGGED_SOURCE_CONVERSION_NAME_SET = {conversion.logged_name
for conversion in _SOURCE_CONVERSIONS}
_ACTIVE_EXPERIMENTS_BY_ID = {exp.id: exp for exp in _ACTIVE_EXPERIMENTS}
_ARCHIVED_EXPERIMENTS_BY_ID = {exp.id: exp for exp in _ARCHIVED_EXPERIMENTS}
_FINAL_CONVERSIONS_BY_ID = collections.OrderedDict(
[(conversion.id, conversion) for conversion in _SOURCE_CONVERSIONS] +
[(conversion.id, conversion) for conversion in _DERIVED_CONVERSIONS]
)
def get_active_experiments():
return _ACTIVE_EXPERIMENTS
def get_archived_experiments():
return _ARCHIVED_EXPERIMENTS
def get_active_experiment_by_id(exp_id):
"""Returns the experiment with the given ID.
Raises KeyError if the experiment doesn't exist.
"""
return _ACTIVE_EXPERIMENTS_BY_ID[exp_id]
def get_possibly_archived_experiment(exp_id):
"""Returns the given experiment, even if it's in the archive.
Returns None if the experiment doesn't exist.
"""
return (_ACTIVE_EXPERIMENTS_BY_ID.get(exp_id) or
_ARCHIVED_EXPERIMENTS_BY_ID.get(exp_id))
def is_experiment_valid(experiment_name):
"""Determines if the experiment string is allowed to be logged."""
return (experiment_name in _LOGGED_EXPERIMENT_NAME_SET or
is_experiment_whitelisted(experiment_name))
def is_experiment_whitelisted(experiment_name):
return experiment_name in _UNKNOWN_EXPERIMENT_WHITELIST
def get_conversion_by_id(conv_id):
"""Gets a conversion by ID, or crashes if the conversion doesn't exist.
Note that the result might be a regular Conversion or it might be a
DerivedConversion.
"""
return _FINAL_CONVERSIONS_BY_ID[conv_id]
def get_source_conversion_by_id(conv_id):
conv = get_conversion_by_id(conv_id)
if not isinstance(conv, Conversion):
raise KeyError('The conversion %s was not a source conversion.' %
conv_id)
return conv
def is_source_conversion_id_valid(conv_id):
try:
get_source_conversion_by_id(conv_id)
return True
except KeyError:
return False
def get_source_conversions():
return _SOURCE_CONVERSIONS
def get_source_conversion_ids():
"""Return a list of all conversions to extract from the logs.
Note the final list of conversions tracked and aggregated may be
expanded to include derived conversions. See get_final_conversion_ids()
and get_derived_conversions() for more details.
"""
return [conversion.id for conversion in _SOURCE_CONVERSIONS]
def is_source_conversion_valid(conversion_name):
return (conversion_name in _LOGGED_SOURCE_CONVERSION_NAME_SET or
conversion_name in _UNKNOWN_SOURCE_CONVERSION_WHITELIST)
def get_final_conversion_columns():
"""Return the final list of column names to use."""
return [conv.id for conv in _FINAL_CONVERSIONS_BY_ID.itervalues()]
def is_final_conversion_id_valid(conversion_id):
return conversion_id in _FINAL_CONVERSIONS_BY_ID
def get_derived_conversions():
"""Return information about derived conversions.
See the comment on _DERIVED_CONVERSIONS for details of the format.
"""
return _DERIVED_CONVERSIONS
def get_allowed_deletions():
return _ALLOWED_EXPERIMENT_IDS_TO_DELETE
"""Logic for config information kept in the datastore.
Although the primary source of truth for BigBingo configuration is config.py,
we store some information in the datastore so that we can detect what changed
and send the appropriate notifications.
"""
from __future__ import absolute_import
from google.appengine.ext import ndb
from third_party import alertlib
import db_util
from bigbingo import config
@db_util.disable_ndb_memcache
class BigBingoExperiment(ndb.Model):
"""Entity tracking the existence of an experiment.
Note that config.py is still the canonical source of truth for the active
experiments and their configuration. The datastore is useful for detecting
when config.py is different from last time and responding appropriately
(e.g. sending notifications for new/stopped experiments and giving a
warning when an experiment has mysteriously disappeared).
"""
start_time = ndb.DateTimeProperty(indexed=False, auto_now_add=True)
last_updated = ndb.DateTimeProperty(indexed=False, auto_now=True)
owner = ndb.StringProperty(indexed=False)
is_archived = ndb.BooleanProperty(indexed=False)
def send_notification(subject, notification, short_notification, color):
"""Sends a notification to HipChat.
Arguments:
subject: The subject line for the email to send.
notification: A possibly-long string with the notification text.
short_notification: A version of the notification that can fit
comfortably in a HipChat message.
color: A valid color to use in HipChat. See the documentation on
send_to_hipchat for more details.
"""
alertlib.Alert(short_notification, html=True).send_to_hipchat(
'1s and 0s', color, sender='BigBingo Baboon')
alertlib.Alert(notification, subject, html=True).send_to_email(
'experiments-blackhole@khanacademy.org')
def notify_new_active_experiment(experiment):
subject = 'Experiment started: %s' % experiment.display_name
format_str = ("%s's new BigBingo experiment %s just started.\n"
"Description: %s")
link = experiment_link(experiment)
notification = format_str % (experiment.owner, link,
experiment.description_html)
short_notification = format_str % (experiment.owner, link,
truncate(experiment.description))
send_notification(subject, notification, short_notification, 'purple')
def notify_archived_experiment(experiment):
subject = 'Experiment stopped: %s' % experiment.display_name
format_str = "%s's BigBingo experiment %s just stopped.\nConclusion: %s"
link = experiment_link(experiment)
notification = format_str % (experiment.owner, link,
experiment.conclusion_html)
short_notification = format_str % (experiment.owner, link,
truncate(experiment.conclusion))
send_notification(subject, notification, short_notification, 'purple')
def notify_missing_experiment(experiment_entity):
subject = 'Missing experiment: %s' % experiment_entity.key.string_id()
message = (
'Error: The BigBingo experiment %s, owned by %s, has gone missing! '
'Please add it back to bigbingo/config.py. If you meant to stop the '
'experiment, you can do so by adding it to the list of archived '
'experiments.' %
(experiment_entity.key.string_id(), experiment_entity.owner))
send_notification(subject, message, message, 'red')
def notify_unarchived_experiment(experiment):
subject = 'Unarchived experiment: %s' % experiment.display_name
message = (
"That's strange, %s's BigBingo experiment %s was moved from the "
"archive back to being an active experiment. If you actually want to "
"resume the experiment, you'll have missing data unless you manually "
"re-run the summarize task over the time that the experiment was "
"archived." % (experiment.owner, experiment_link(experiment)))
send_notification(subject, message, message, 'yellow')
def notify_new_archived_experiment(experiment):
subject = 'New archived experiment: %s' % experiment.display_name
message = (
"That's strange, %s's BigBingo experiment %s was seen in the list of "
"archived experiments, but it hasn't ever been active!" %
(experiment.owner, experiment_link(experiment)))
send_notification(subject, message, message, 'yellow')
def experiment_link(experiment):
return ('<a href="http://khanacademy.org/bigbingo/experiment/%s">%s</a>' %
(experiment.id, experiment.display_name))
def truncate(string):
return string if len(string) < 50 else (string[:50] + '...')
def warn_for_missing_experiments(experiment_entities):
# If any experiments are in the datastore but not the config, send a
# notification that you're not supposed to do that. Note that we
# intentionally do NOT remove the entity from the datastore, since we want
# to keep poking HipChat until it gets fixed. If people really want to make
# an exception to that rule, though, they can set config.py to allow some
# experiments to be deleted, in which case we actually delete them here.
keys_to_delete = []
for entity in experiment_entities:
experiment_id = entity.key.string_id()
if config.get_possibly_archived_experiment(experiment_id) is None:
if experiment_id in config.get_allowed_deletions():
keys_to_delete.append(entity.key)
else:
notify_missing_experiment(entity)
if keys_to_delete:
ndb.delete_multi(keys_to_delete)
def update_config_and_notify():
"""Update the datastore with any config changes and send notifications.
Every change to config.py is worth broadcasting, so we keep track of the
previous set of experiments so we can see what changed. For new and
newly-stopped experiments, we announce them so people have an understanding
of what experiments are going on. If an experiment starts in the archive
or changes from archived to active, it is probably worth having a human
sanity-check it anyway, so we broadcast warnings for those cases. Finally,
if an experiment used to exist and doesn't anymore, we repeatedly give an
error, because the experiment was really supposed to be archived instead.
"""
# There currently aren't that many experiments, so it should be fine to
# just fetch all of them. Also, this query is only eventually consistent,
# but that's fine since it runs so infrequently and occasional incorrect
# notifications aren't a big deal anyway.
all_experiment_entities = BigBingoExperiment.query().fetch()
experiment_entities_by_key = {
entity.key: entity for entity in all_experiment_entities}
warn_for_missing_experiments(all_experiment_entities)
entities_to_store = []
for experiment in (config.get_active_experiments() +
config.get_archived_experiments()):
experiment_key = ndb.Key(BigBingoExperiment, experiment.id)
experiment_entity = experiment_entities_by_key.get(experiment_key)
if experiment_entity is None:
# The entry in config.py is new.
if experiment.is_archived:
notify_new_archived_experiment(experiment)
else:
notify_new_active_experiment(experiment)
entities_to_store.append(
BigBingoExperiment(key=experiment_key, owner=experiment.owner,
is_archived=experiment.is_archived))
elif experiment_entity.is_archived != experiment.is_archived:
# The entry in config.py moved between the archive and active list.
if experiment.is_archived:
notify_archived_experiment(experiment)
else:
notify_unarchived_experiment(experiment)
experiment_entity.is_archived = experiment.is_archived
entities_to_store.append(experiment_entity)
if entities_to_store:
ndb.put_multi(entities_to_store)
The MIT License (MIT)
Copyright (c) 2014 Khan Academy
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
"""Utilities for dealing with BigBingo events in the logs."""
from __future__ import absolute_import
import json
import logging
from bigbingo import config
import event_log
import ka_globals
_PARTICIPATION_EVENT_PREFIX = 'BINGO_PARTICIPATION_EVENT:'
_CONVERSION_EVENT_PREFIX = 'BINGO_CONVERSION_EVENT:'
class InvalidBingoEventError(Exception):
pass
def log_participation_event(bingo_identity, experiment_name, alternative):
if not config.is_experiment_valid(experiment_name):
message = ('The experiment named "%s" was used, but was not found in '
'bigbingo/config.py . All active experiments must be registered '
'there (or added to the experiment whitelist).' % experiment_name)
if ka_globals.is_dev_server:
raise InvalidBingoEventError(message)
else:
logging.error(message)
logging.debug(_PARTICIPATION_EVENT_PREFIX +
json.dumps({
'bingo_id': bingo_identity,
'experiment': experiment_name,
'alternative': alternative
}))
def log_conversion_event(bingo_identity, conversion_id):
"""Records that the given conversion event happened.
Normal application code should not call this directly; it should use
bigbingo.mark_conversion instead.
"""
event_log.log_event('bingo.param', conversion_id)
logging.debug(_CONVERSION_EVENT_PREFIX +
json.dumps({
'bingo_id': bingo_identity,
'conversion': conversion_id,
}))
def parse_participation_events(log_lines):
"""Returns a generator for all participation events in the given logs.
The yielded results are dicts constructed from the logged JSON.
Arguments:
log_lines: A list of strings, which typically correspond to all log
lines in a particular request.
"""
return _parse_events_with_prefix(log_lines, _PARTICIPATION_EVENT_PREFIX)
def parse_conversion_events(log_lines):
"""Returns a generator for all conversion events in the given logs.
The input and output format are the same as parse_participation_events, so
see its docstring for more details.
"""
return _parse_events_with_prefix(log_lines, _CONVERSION_EVENT_PREFIX)
def _parse_events_with_prefix(log_lines, prefix):
for line in log_lines:
if not line.startswith(prefix):
continue
yield json.loads(line[len(prefix):])
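# Illustrative example (hypothetical values): a log line of the form
#   BINGO_PARTICIPATION_EVENT:{"bingo_id": "u1", "experiment": "sample_expt", "alternative": "alt_1"}
# is parsed by parse_participation_events into a dict like
#   {'bingo_id': 'u1', 'experiment': 'sample_expt', 'alternative': 'alt_1'}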
"""Summarize task for BigBingo experiments
This task regularly runs in the background to aggregate the latest bingo log
data. The raw data that we operate on are the bingo_participation_events and
bingo_conversion_events records generated from the request logs in
log_to_bigquery.py.
"""
from __future__ import absolute_import
import time
from third_party.mapreduce.lib.pipeline import common
from third_party.mapreduce.lib.pipeline import pipeline
from bigbingo import config
from bigbingo import datastore_config
from bigquery import bq_connection
from bigquery import bq_pipelines
import log_to_bigquery
import pipeline_util
import request_handler
import user_util
# All fields should be nullable because the participants table is replaced by
# the result of the query when the summarize process runs, and the inferred
# schema has every field as nullable when writing a query result to a table.
PARTICIPANTS_SCHEMA = {
'fields': [
{'name': 'experiment', 'type': 'STRING', 'mode': 'NULLABLE'},
{'name': 'bingo_id', 'type': 'STRING', 'mode': 'NULLABLE'},
{'name': 'alternative', 'type': 'STRING', 'mode': 'NULLABLE'},
{'name': 'participation_time', 'type': 'FLOAT', 'mode': 'NULLABLE'},
]
}
def get_conversions_schema():
return {
'fields': [
{'name': 'experiment', 'type': 'STRING', 'mode': 'NULLABLE'},
{'name': 'bingo_id', 'type': 'STRING', 'mode': 'NULLABLE'},
{'name': 'alternative', 'type': 'STRING', 'mode': 'NULLABLE'},
{'name': 'last_updated_time', 'type': 'FLOAT', 'mode': 'NULLABLE'}
] + [
{'name': 'conv_' + conv_col, 'type': 'INTEGER', 'mode': 'NULLABLE'}
for conv_col in config.get_final_conversion_columns()
]
}
def get_conversion_snapshots_schema():
fields = [
{'name': 'experiment', 'type': 'STRING', 'mode': 'NULLABLE'},
{'name': 'alternative', 'type': 'STRING', 'mode': 'NULLABLE'},
{'name': 'snapshot_time', 'type': 'INTEGER', 'mode': 'NULLABLE'}
]
for conv_id in config.get_final_conversion_columns():
fields.extend([
{'name': 'count_' + conv_id,
'type': 'INTEGER', 'mode': 'NULLABLE'},
{'name': 'binary_' + conv_id,
'type': 'INTEGER', 'mode': 'NULLABLE'}
])
return {'fields': fields}
class StartSummarize(request_handler.RequestHandler):
"""Kicks off the BigBingo summarize task to process a time range of logs.
Arguments:
dataset: The name of the dataset to work with. In production, this is
always the "bigbingo" dataset, but for manual testing, you should
choose a different one, such as "bigbingo_test". This parameter is
required since a mistake could cause data corruption.
logs_dataset: The name of the "base dataset" used in LogToBigQuery.
In practice, and by default, this is a dataset called "logs". The
actual logs table(s) used are either in this dataset or in the
hourly version of this dataset (normally "logs_hourly"), depending
on whether use_daily_logs is set.
publish_dataset: The name of the dataset to copy result tables to, or
None if no publishing should occur.
archive_dataset: The name of the dataset to which to archive old
experiments, or None if no archiving should occur.
end_time: The upper bound on the time range to process, specified as a
UNIX timestamp in seconds. This time must be on an hour boundary.
If not specified, the hour boundary before the most recent one is
used (e.g. end_time is 3:00 if the current time is 4:30). This
allows the LogToBigQuery process to have enough time to export the
logs to BigQuery before we use them.
num_hours: The number of hours of data to process (going backward from
end_time). All hours processed must fall within the same day (UTC).
If not specified, we process 2 hours.
is_initial_run: If 0 (the default) we expect to find tables from a
previous run of the summarize task. If 1, dummy empty versions of
these tables will be created.
use_daily_logs: If 0 (the default), we read from logs tables that each
contain an hour of data. If 1, we read from the daily logs table
containing the hours that we are processing. It is usually
preferable to use the hourly logs tables so that the summarize task
will automatically fail if LogToBigQuery is failing.
"""
# The admin requirement is enforced by the /admin path. We can't directly
# use admin_required here since it stops cron from being able to access it.
@user_util.manual_access_checking
def get(self):
dataset = self.request_string('dataset')
if not dataset:
raise ValueError('The dataset name must be specified.')
logs_dataset = self.request_string('logs_dataset', 'logs')
publish_dataset = self.request_string('publish_dataset', None)
archive_dataset = self.request_string('archive_dataset', None)
# If unspecified, end_time is computed in get_and_validate_time_window.
end_time = self.request_int('end_time', -1)
num_hours = self.request_int('num_hours', 2)
is_initial_run = self.request_bool('is_initial_run', False)
use_daily_logs = self.request_bool('use_daily_logs', False)
start_time, end_time = self.get_and_validate_time_window(
end_time, num_hours, use_daily_logs)
summarize_pipeline = SummarizePipeline(
dataset, logs_dataset, publish_dataset, archive_dataset,
start_time, end_time, is_initial_run,
use_daily_logs).with_params(target='mapreduce')
summarize_pipeline.start(queue_name='bigbingo-queue')
self.response.out.write(
'Successfully started summarize task. '
'<a href="/_ah/pipeline/status?root=%s">Monitor task</a>' %
summarize_pipeline.pipeline_id)
@staticmethod
def get_and_validate_time_window(end_time, num_hours, use_daily_logs):
"""Validates and returns a [start_time, end_time) time range to use.
The time window is defined by ending at end_time and going back
num_hours hours. The start and end time need to be hour-aligned.
Arguments:
end_time: The last timestamp to process, as a UNIX time in seconds,
or -1. If this argument is -1, the hour mark before the most
recent one is used.
num_hours: The size of the window to process, in hours.
use_daily_logs: True if the daily logs table should be used instead
of the hourly logs tables.
Returns:
A pair (start_time, end_time). The end_time is returned since it is
filled in when it is -1.
"""
if end_time == -1:
current_time_seconds = time.time()
end_time = (int(current_time_seconds / 3600) - 1) * 3600
if end_time % 3600 != 0:
raise ValueError('The end_time parameter must be exactly on an '
'hour boundary.')
start_time = end_time - num_hours * 3600
if use_daily_logs and (
start_time // (24 * 3600) != (end_time - 1) // (24 * 3600)):
raise ValueError('The specified time window crosses multiple '
'days, which is not allowed.')
return start_time, end_time
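# Illustrative example (assuming num_hours=2): if end_time is passed as -1 and
# the current time is 16:30 UTC, end_time becomes 15:00 (the hour boundary
# before the most recent one) and the returned window is [13:00, 15:00).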
class StartSummarizeBackfill(request_handler.RequestHandler):
"""Kicks off multiple summarize tasks in series.
The time range can span multiple days, but each individual summarize task
should be contained within one day. The arguments are very similar to the
arguments for StartSummarize.
Arguments:
dataset: The name of the dataset for the BigBingo tables.
logs_dataset: The base dataset containing the logs to process.
publish_dataset: (Optional) The dataset to copy result tables to.
archive_dataset: The name of the dataset to which to archive old
experiments, or None if no archiving should occur. For backfill
jobs not running up to the present, this should be None unless you
know what you're doing. (Otherwise, data on experiments recently
archived from the time after your backfill ends until the
experiment was archived could be lost.)
start_time: The start of the time range to process. This must be on an
hour boundary.
end_time: The end of the time range to process. This must be on an hour
boundary.
num_hours_per_run: The size of each summarize task to run. Larger
values make the backfill faster and cheaper, but more likely to
fail.
is_initial_run: False if we expect previous tables to already exist.
use_daily_logs: True if we should use daily rather than hourly logs.
"""
@user_util.admin_required
def get(self):
dataset = self.request_string('dataset')
if not dataset:
raise ValueError('The dataset name must be specified.')
logs_dataset = self.request_string('logs_dataset', 'logs')
publish_dataset = self.request_string('publish_dataset', None)
archive_dataset = self.request_string('archive_dataset', None)
start_time = self.request_int('start_time')
end_time = self.request_int('end_time')
num_hours_per_run = self.request_int('num_hours_per_run')
is_initial_run = self.request_bool('is_initial_run', False)
use_daily_logs = self.request_bool('use_daily_logs', False)
# TODO(alan): This first check is overly defensive in a lot of cases.
if 24 % num_hours_per_run != 0:
raise ValueError('num_hours_per_run must evenly divide a day.')
if start_time % (num_hours_per_run * 3600) != 0:
raise ValueError('start_time must be on a boundary defined by '
'num_hours_per_run.')
if end_time % (num_hours_per_run * 3600) != 0:
raise ValueError('end_time must be on a boundary defined by '
'num_hours_per_run.')
summarize_pipeline = BackfillSummarizePipeline(
dataset, logs_dataset, publish_dataset, archive_dataset,
start_time, end_time, num_hours_per_run, is_initial_run,
use_daily_logs).with_params(target='mapreduce')
summarize_pipeline.start(queue_name='bigbingo-queue')
self.response.out.write(
'Successfully started backfill summarize task. '
'<a href="/_ah/pipeline/status?root=%s">Monitor task</a>' %
summarize_pipeline.pipeline_id)
def get_logs_table_expr(logs_base_dataset, start_time, end_time,
use_daily_logs):
"""Returns a table expression for the logs table(s) to query over.
The simple (although non-default) case is that we just return a single
daily table, like "logs.requestlogs_20140421". The default case, though, is
that we want to select over a logs table for each hour, like
"logs_hourly.20140421_00, logs_hourly.20140421_01" (note that the comma
here is the BigQuery way to do UNION ALL).
Arguments:
logs_base_dataset: The base dataset name to use, usually "logs". This
is the name of the daily logs dataset, and appending "_hourly" to
the end finds the hourly logs dataset.
start_time: The start of the time range we are processing.
end_time: The end of the time range we are processing.
use_daily_logs: False if we should read from the hourly logs, True if
we should read from the daily logs.
"""
if use_daily_logs:
dataset = logs_base_dataset
return dataset + '.' + log_to_bigquery.daily_logs_table_name(end_time)
else:
dataset = logs_base_dataset + '_hourly'
# The name function takes the end time, so compute it from each start
# time by adding 1 hour.
return ', '.join(
dataset + '.' + log_to_bigquery.hourly_logs_table_name(
hour_start_time + 3600)
for hour_start_time in xrange(start_time, end_time, 3600)
)
def participants_table(unix_time):
"""Returns the name of the given participants table."""
return table_for_time('participants', unix_time)
def conversions_table(unix_time):
"""Returns the name of the given conversions table."""
return table_for_time('conversions', unix_time)
def pending_conversions_table(start_time, end_time):
"""Returns the name of the given pending conversions table."""
return table_for_time_range('pending_conversions', start_time, end_time)
def new_participation_events_table(start_time, end_time):
return table_for_time_range('new_participation_events', start_time,
end_time)
def new_conversion_events_table(start_time, end_time):
return table_for_time_range('new_conversion_events', start_time, end_time)
def table_for_time(base_name, unix_time):
"""Returns the name to use for the given table as of the given time.
BigBingo tables are given timestamps to more reliably ensure that the
summarize task is working off of the correct data, and so that the
summarize task can fail at any point in the middle without causing
problems.
The given unix_time should be the start or end time for a summarize task,
so it should be exactly on an hour boundary. The actual hour given is in
GMT.
"""
return base_name + '_' + time.strftime('%Y_%m_%d_%H',
time.gmtime(unix_time))
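# Illustrative example: for a unix_time on the hour boundary
# 2014-04-21 13:00 UTC, table_for_time('participants', unix_time) returns
# 'participants_2014_04_21_13'.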
def table_for_time_range(base_name, start_time, end_time):
"""Returns the name to use for a table with data from a time range.
When a table represents data corresponding to a range of time (e.g. the
conversion events happening between 3pm and 6pm), we name it with a time
range. This is good practice because it helps in debugging and it guards
against pathological cases such as the time ranges [1pm, 3pm] and
[2pm, 3pm] being processed at the same time.
"""
return table_for_time(table_for_time(base_name, start_time), end_time)
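# Illustrative example: for a window from 2014-04-21 13:00 UTC to
# 2014-04-21 15:00 UTC, table_for_time_range('pending_conversions', start_time,
# end_time) returns 'pending_conversions_2014_04_21_13_2014_04_21_15'.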
def raw_table(experiment_id):
"""The name of the table where the raw experiment data is stored.
This is a view in the publish dataset and a regular table in the archive.
It stores the complete per-bingo_id data for the experiment, to allow for
further analysis as desired.
"""
return 'raw_' + experiment_id + '_data'
def totals_table(experiment_id):
"""The name of the table where summary experiment data is archived.
This only exists in the archive, and stores the snapshots every two hours
that are displayed in the BigBingo dashboard.
"""
return 'historical_' + experiment_id + '_totals'
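# Illustrative example: for the experiment ID 'sample_expt', raw_table returns
# 'raw_sample_expt_data' and totals_table returns
# 'historical_sample_expt_totals'.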
class BackfillSummarizePipeline(pipeline.Pipeline):
"""Run multiple summarize tasks in series.
See the documentation for StartSummarizeBackfill and SummarizePipeline for
descriptions of the arguments.
"""
def run(self, dataset, logs_dataset, publish_dataset, archive_dataset,
start_time, end_time, num_hours_per_run, is_initial_run,
use_daily_logs):
with pipeline.InOrder():
for batch_start_time in xrange(start_time, end_time,
num_hours_per_run * 3600):
batch_end_time = batch_start_time + num_hours_per_run * 3600
yield SummarizePipeline(
dataset, logs_dataset, publish_dataset, None,
batch_start_time, batch_end_time, is_initial_run,
use_daily_logs)
is_initial_run = False
if archive_dataset:
# Always do all the archiving at the end.
experiments_to_archive = [
e.id for e in config.get_archived_experiments()]
yield ArchiveExperimentsPipeline(
dataset, publish_dataset, archive_dataset, end_time,
experiments_to_archive)
class SummarizePipeline(pipeline.Pipeline):
"""Top-level pipeline for the summarize task.
Arguments:
dataset: The BigQuery dataset for the BigBingo input and output tables.
logs_dataset: The dataset containing the logs table to use.
publish_dataset: The dataset to copy the result tables to at the end,
or None if we shouldn't do any publishing.
archive_dataset: The dataset to which to archive any experiments that
should be archived, or None if we shouldn't do any archiving.
start_time: The inclusive low bound on the time range to process.
end_time: The exclusive high bound on the time range to process.
is_initial_run: False if we should expect the previous tables to exist,
True otherwise.
use_daily_logs: True if we should read from the daily logs table instead
of the hourly logs tables.
"""
output_names = ['cost']
def run(self, dataset, logs_base_dataset, publish_dataset, archive_dataset,
start_time, end_time, is_initial_run, use_daily_logs):
# Before actually running summarize, make sure we keep the datastore
# BigBingo knowledge up-to-date and send any relevant notifications.
datastore_config.update_config_and_notify()
logs_table_expr = get_logs_table_expr(
logs_base_dataset, start_time, end_time, use_daily_logs)
parts_extracted = yield ExtractParticipationEventsPipeline(
dataset, logs_table_expr, start_time, end_time)
convs_extracted = yield ExtractConversionEventsPipeline(
dataset, logs_table_expr, start_time, end_time)
old_tables_checked = yield CheckOrCreateOldTablesPipeline(
dataset, is_initial_run, start_time)
with pipeline.After(old_tables_checked, parts_extracted):
parts_processed = yield ProcessParticipationEventsPipeline(
dataset, start_time, end_time)
# Processing conversions depends on the new participants table, so
# wait until parts_processed is ready.
with pipeline.After(convs_extracted, parts_processed):
convs_processed = yield ProcessConversionEventsPipeline(
dataset, start_time, end_time)
with pipeline.After(convs_processed):
convs_built = yield BuildConversionsTablePipeline(
dataset, start_time, end_time)
with pipeline.After(parts_processed):
parts_computed = yield ComputeParticipantCountSummaryPipeline(
dataset, end_time)
with pipeline.After(convs_built):
convs_computed = yield ComputeConversionCountSummaryPipeline(
dataset, end_time)
# Track all pipelines that did any queries (since those are the
# only actions that cost money) so we can return the total cost
# at the end.
futures_with_cost = [
parts_extracted, convs_extracted, parts_processed,
convs_processed, convs_built, parts_computed,
convs_computed]
if publish_dataset:
with pipeline.After(parts_computed, convs_computed):
tables_published = yield PublishTablesPipeline(
dataset, publish_dataset, end_time)
futures_with_cost.append(tables_published)
if archive_dataset:
with pipeline.After(tables_published):
experiments_to_archive = [
e.id for e in config.get_archived_experiments()]
experiments_archived = yield ArchiveExperimentsPipeline(
dataset, publish_dataset, archive_dataset, end_time,
experiments_to_archive)
futures_with_cost.append(experiments_archived)
total_cost = yield common.Sum(
*[future.cost for future in futures_with_cost])
yield bq_pipelines.ReturnOutputs("Success", cost=total_cost)
def finalized(self):
pipeline_util.send_pipeline_result_email(
self,
sender='big-bingo-bot',
to='analytics-admin',
job_name='BigBingo Summarize')
class CheckOrCreateOldTablesPipeline(pipeline.Pipeline):
"""Sanity check the "previous" tables that we will be working off of.
The common case is that we're adding to existing experiment data, so the
tables corresponding to start_time should exist. If we know that we're
starting from scratch, though, we should just create stub tables instead.
"""
def run(self, dataset, is_initial_run, start_time):
old_participants_table = participants_table(start_time)
old_conversions_table = conversions_table(start_time)
bq_connection.ensure_bigquery_table_exists(
dataset, old_participants_table, PARTICIPANTS_SCHEMA,
create_if_necessary=is_initial_run,
patch_schema_if_necessary=False, ttl_days=7)
bq_connection.ensure_bigquery_table_exists(
dataset, old_conversions_table, get_conversions_schema(),
create_if_necessary=is_initial_run, patch_schema_if_necessary=True,
ttl_days=7)
# The conversion_snapshots table also needs to be updated with any new
# conversions, or else appending to that table will fail. It is not
# necessary for this operation to remove unused conversions, since
# BigQuery will automatically insert nulls for those columns when
# appending to the table.
# TODO(alan): The schema for this table will only grow over time, which
# can eventually cause problems and create bloat. One solution to this
# problem is to generate a new snapshots table every summarize run
# (since columns can be removed when making a new table), but that's not
# totally straightforward because of the way the table gets appended
# to. Another approach is to get rid of conversion columns in the
# schema for this table and just have a row for every conversion.
bq_connection.ensure_bigquery_table_exists(
dataset, 'conversion_snapshots',
get_conversion_snapshots_schema(), create_if_necessary=True,
patch_schema_if_necessary=True)
class ExtractParticipationEventsPipeline(pipeline.Pipeline):
"""Extract participation events from the logs.
You can't join on a repeated column, so this query extracts the events that
we're interested in into a flat table. It also gets rid of redundant events
(we know up-front that we can ignore participation events after the first
for each user and experiment).
"""
output_names = ['cost']
def run(self, dataset, logs_table_expr, start_time, end_time):
# Note that the logservice considers the end time to be the time for
# the request, so we need to use that as our notion of "request time"
# or else some hours will be split across two tables.
query = """
SELECT
bingo_participation_events.experiment AS experiment,
bingo_participation_events.bingo_id AS bingo_id,
bingo_participation_events.alternative AS alternative,
MIN(end_time) AS participation_time
FROM %(logs_table_expr)s
WHERE
bingo_participation_events.experiment IS NOT NULL
AND
end_time >= %(start_time)s
AND
end_time < %(end_time)s
GROUP EACH BY experiment, bingo_id, alternative
""" % {
'logs_table_expr': logs_table_expr,
'start_time': start_time,
'end_time': end_time
}
yield bq_pipelines.QueryToTablePipeline(
query, dataset,
new_participation_events_table(start_time, end_time),
result_table_ttl_days=1)
class ExtractConversionEventsPipeline(pipeline.Pipeline):
"""Builds a table of the conversion events for this summarize run."""
output_names = ['cost']
def run(self, dataset, logs_table_expr, start_time, end_time):
conversions = config.get_source_conversions()
query = """
SELECT
bingo_conversion_events.bingo_id AS bingo_id,
end_time AS time,
%(conversion_selects)s
FROM %(logs_table_expr)s
WHERE
bingo_conversion_events.bingo_id IS NOT NULL
AND
end_time >= %(start_time)s
AND
end_time < %(end_time)s
AND
bingo_conversion_events.conversion IN %(conversion_names)s
""" % {
'logs_table_expr': logs_table_expr,
'start_time': start_time,
'end_time': end_time,
# Store false as null since nulls are free.
'conversion_selects': ','.join(["""
IF(bingo_conversion_events.conversion = "%(conv_id)s",
true, null) AS conv_%(conv_id)s
""" % {
'conv_id': conv.id
} for conv in conversions]),
'conversion_names': '(' +
','.join('"%s"' % conv.id for conv in conversions) + ')'
}
yield bq_pipelines.QueryToTablePipeline(
query, dataset, new_conversion_events_table(start_time, end_time),
result_table_ttl_days=1)
class ProcessParticipationEventsPipeline(pipeline.Pipeline):
"""Determine the new set of participants for each experiment.
This stage unions the existing participants with the new participants and
does a GROUP BY to ensure that we don't include participants more than
once.
"""
output_names = ['cost']
def run(self, dataset, start_time, end_time):
old_participants_table = participants_table(start_time)
new_participants_table = participants_table(end_time)
events_table = new_participation_events_table(start_time, end_time)
query = """
SELECT
experiment,
bingo_id,
alternative,
MIN(participation_time) AS participation_time
FROM
%(events_table)s, -- UNION ALL
%(participants_table)s
GROUP EACH BY
experiment, bingo_id, alternative
""" % {
'events_table': dataset + '.' + events_table,
'participants_table': dataset + '.' + old_participants_table
}
yield bq_pipelines.QueryToTablePipeline(
query, dataset, new_participants_table, result_table_ttl_days=7)
class ProcessConversionEventsPipeline(pipeline.Pipeline):
"""Computes the new conversion values to add for this summarize job.
Given the new_conversion_events table (an experiment-independent table of
conversion events for this summarize task) and the newly-computed
participants table, we determine what values to add to what conversions.
The output table has the same format as the full conversions table, but
only contains conversion numbers for this summarize task. The new
conversions table is computed in BuildConversionsTablePipeline.
"""
output_names = ['cost']
def run(self, dataset, start_time, end_time):
conversions = config.get_source_conversions()
derived_conversions = config.get_derived_conversions()
new_participants_table = participants_table(end_time)
events_table = new_conversion_events_table(start_time, end_time)
pending_table = pending_conversions_table(start_time, end_time)
conversion_select_aggregators = [
'IFNULL(SUM(conv_%(conv_id)s), 0) AS conv_%(conv_id)s' % {
'conv_id': conv.id
} for conv in conversions
]
derived_conversion_aggregators = [
'%(aggregator)s AS conv_%(conv_id)s' % {
'conv_id': conv.id,
'aggregator': conv.aggregator,
}
for conv in derived_conversions
]
# Note: The JOIN EACH in this query might in the future fail with
# "Resources exceeded during query execution" as we get more
# participants. If that happens, we should fix this by changing the
# participants table to a subquery that only includes rows that are
# also in the logs table. This should make it small enough to be
# suitable as the right side of a JOIN.
# In particular, we should NOT switch the order of the tables, since
# the logs table might have many, many rows for a single bingo_id (e.g.
# if a bot is spamming us), and uneven key distributions on the right
# side of a JOIN can cause a failure.
query = """
SELECT
part.experiment AS experiment,
part.bingo_id AS bingo_id,
part.alternative AS alternative,
MAX(logs.time) AS last_updated_time,
%(conversion_selects)s
FROM
%(new_conversion_events_table)s logs
JOIN EACH
%(participants_table)s part
ON
logs.bingo_id = part.bingo_id
WHERE logs.time >= part.participation_time
GROUP EACH BY
experiment, bingo_id, alternative
""" % {
'new_conversion_events_table': dataset + '.' + events_table,
'participants_table': dataset + '.' + new_participants_table,
# For each conversion, take the sum over a boolean column which
# consists of true values (treated as 1s) and nulls. Since the
# sum over nulls is null, we need to convert that back to 0.
# Note that keeping null for the conversion total would work
# just fine and save quite a bit of space, but we need the
# table to be big enough that we can be confident that the
# inner GROUP EACH BY in ComputeConversionCountSummaryPipeline
# won't fail (a bigger table helps because BigQuery allocates
# resources based on the size of the tables being queried).
'conversion_selects': ','.join(
conversion_select_aggregators +
derived_conversion_aggregators)
}
yield bq_pipelines.QueryToTablePipeline(
query, dataset, pending_table, result_table_ttl_days=1)
class BuildConversionsTablePipeline(pipeline.Pipeline):
"""Compute the new per-user conversion summaries."""
output_names = ['cost']
def run(self, dataset, start_time, end_time):
conversion_columns = config.get_final_conversion_columns()
old_conversions_table = conversions_table(start_time)
new_conversions_table = conversions_table(end_time)
pending_table = pending_conversions_table(start_time, end_time)
# We have to be careful with this query, since it is by far the most
# likely to run out of resources. One simple way to merge the tables
# would be to union them, group by experiment and bingo_id, and take
# the sum over every conversion, but this ends up using more space than
# necessary, since it effectively needs to build a giant hash table
# with all bingo_ids in both tables. Instead, we take advantage of the
# fact that the pending conversions table is much smaller than the old
# conversions table, which means that it's much less memory-intensive
# to make a hash table of pending conversions and do lookups in that.
# We accomplish this by using a LEFT OUTER JOIN to find which rows in
# the old conversions table are being updated and which ones are
# staying the same and only doing the GROUP BY logic for the ones that
# are being updated.
conversions_table_columns = [
field['name'] for field in get_conversions_schema()['fields']]
columns_from_join = ', '.join('t1.%(col)s AS %(col)s' % {'col': col}
for col in conversions_table_columns)
untouched_old_rows_template = """
SELECT %(columns_from_join)s
FROM %(old_conversions)s t1
LEFT OUTER JOIN EACH %(pending_conversions)s t2
ON t1.experiment = t2.experiment
AND t1.bingo_id = t2.bingo_id
AND t1.alternative = t2.alternative
WHERE t2.bingo_id IS NULL
"""
touched_old_rows_template = """
SELECT %(columns_from_join)s
FROM %(old_conversions)s t1
JOIN EACH %(pending_conversions)s t2
ON t1.experiment = t2.experiment
AND t1.bingo_id = t2.bingo_id
AND t1.alternative = t2.alternative
"""
updated_and_new_rows_template = """
SELECT experiment, bingo_id, alternative,
MAX(last_updated_time) AS last_updated_time,
%(conversion_selects)s
FROM (""" + touched_old_rows_template + """),
%(pending_conversions)s
GROUP EACH BY experiment, alternative, bingo_id
"""
# There are three cases here:
# 1.) A row is in old_conversions but not pending_conversions.
# 2.) A row is in both tables.
# 3.) A row is only in pending_conversions.
# The first case is handled by untouched_old_rows_template and the
# other two cases are handled by updated_and_new_rows_template.
query = ("""
SELECT experiment, bingo_id, alternative, last_updated_time,
%(convert_null_selects)s
FROM
(""" + untouched_old_rows_template + """),
(""" + updated_and_new_rows_template + """)
""") % {
'old_conversions': dataset + '.' + old_conversions_table,
'pending_conversions': dataset + '.' + pending_table,
'columns_from_join': columns_from_join,
'conversion_selects': ','.join(
['SUM(IFNULL(conv_%(column)s, 0)) AS conv_%(column)s' % {
'column': conv_col
} for conv_col in conversion_columns]),
'convert_null_selects': ', '.join(
'IF(%(conv)s = 0, null, %(conv)s) as %(conv)s' % {
'conv': 'conv_' + conv_col
}
for conv_col in conversion_columns)
}
yield bq_pipelines.QueryToTablePipeline(
query, dataset, new_conversions_table, result_table_ttl_days=7)
class ComputeParticipantCountSummaryPipeline(pipeline.Pipeline):
"""Computes the latest participant counts for each experiment.
The resulting counts are computed on a per-alternative basis and are appended
to the end of the participant_snapshots table.
"""
output_names = ['cost']
def run(self, dataset, end_time):
new_participants_table = participants_table(end_time)
query = """
SELECT
experiment,
alternative,
COUNT(*) AS num_participants,
%(end_time)s AS snapshot_time
FROM %(new_participants_table)s
GROUP EACH BY experiment, alternative
""" % {
'end_time': end_time,
'new_participants_table':
dataset + '.' + new_participants_table
}
yield bq_pipelines.QueryToTablePipeline(
query, dataset, 'participant_snapshots', append_to_table=True)
class ComputeConversionCountSummaryPipeline(pipeline.Pipeline):
"""Computes the latest conversion counts for each experiment.
For each conversion, we separately compute the conversion as a count
conversion and as a binary conversion, and we output a column for each. For
example, the conversion foo (which has column conv_foo in the conversions
table) would have count_foo and binary_foo as columns in the conversion
snapshots table.
The results are appended to the end of the conversion_snapshots table.
"""
output_names = ['cost']
def run(self, dataset, end_time):
conversion_columns = config.get_final_conversion_columns()
# For each conversion, we output a column for the total number of
# conversions across all users and a column for the number of users
# with a nonzero conversion count.
conversion_selects = ','.join(
['SUM(conv_%(column)s) AS count_%(column)s, '
'SUM(conv_%(column)s > 0) AS binary_%(column)s' % {
'column': conv_col
} for conv_col in conversion_columns])
new_conversions_table = conversions_table(end_time)
query = """
SELECT
experiment,
alternative,
%(end_time)s AS snapshot_time,
%(conversion_selects)s
FROM
%(new_conversions_table)s
GROUP EACH BY experiment, alternative
""" % {
'new_conversions_table': dataset + '.' + new_conversions_table,
'end_time': end_time,
'conversion_selects': conversion_selects,
}
yield bq_pipelines.QueryToTablePipeline(
query, dataset, 'conversion_snapshots', append_to_table=True)
class PublishTablesPipeline(pipeline.Pipeline):
"""Copy the result tables to a common dataset.
The dataset containing all timestamped tables can get kind of crowded, so
this makes it convenient for people to look at the most up-to-date data.
-We copy the latest participants and conversions tables as
source_participants and source_conversions.
-We create (or update) a view for each experiment that joins and filters
the source tables into meaningful data.
-We join the participation and conversion snapshots tables into a single
table called historical_experiment_totals. We also remove any duplicates
(which can happen in rare failure cases).
-We extract the latest historical totals into a table called
experiment_totals.
These table names are convenient because the most useful tables are first
when ordered alphabetically.
"""
output_names = ['cost']
def run(self, dataset, publish_dataset, end_time):
copy_specs = []
def add_copy_spec(src_table, dest_table):
copy_specs.append({
'src_dataset': dataset,
'src_table': src_table,
'dest_dataset': publish_dataset,
'dest_table': dest_table
})
add_copy_spec(participants_table(end_time), 'source_participants')
add_copy_spec(conversions_table(end_time), 'source_conversions')
with pipeline.InOrder():
yield bq_pipelines.CopyTableBatchPipeline(copy_specs)
views_created = yield CreatePublishViewsPipeline(publish_dataset)
conversion_columns = config.get_final_conversion_columns()
conversion_selects = ','.join(
['MAX(conv.count_%(column)s) AS count_%(column)s, '
'MAX(conv.binary_%(column)s) AS binary_%(column)s' % {
'column': conv_col
} for conv_col in conversion_columns])
extract_historical_totals_query = """
SELECT
part.experiment AS experiment,
part.alternative AS alternative,
part.snapshot_time AS snapshot_time,
MAX(part.num_participants) AS num_participants,
%(conversion_selects)s
FROM
%(conversion_snapshots_table)s conv
JOIN EACH
%(participant_snapshots_table)s part
ON part.experiment = conv.experiment
AND part.alternative = conv.alternative
AND part.snapshot_time = conv.snapshot_time
GROUP EACH BY experiment, alternative, snapshot_time
""" % {
'participant_snapshots_table': dataset + '.participant_snapshots',
'conversion_snapshots_table': dataset + '.conversion_snapshots',
'conversion_selects': conversion_selects
}
extract_latest_totals_query = """
SELECT * FROM %(historical_experiment_totals)s
WHERE snapshot_time = %(end_time)s
""" % {
'historical_experiment_totals':
publish_dataset + '.historical_experiment_totals',
'end_time': end_time
}
with pipeline.InOrder():
query1_result = yield bq_pipelines.QueryToTablePipeline(
extract_historical_totals_query, publish_dataset,
'historical_experiment_totals')
query2_result = yield bq_pipelines.QueryToTablePipeline(
extract_latest_totals_query, publish_dataset,
'experiment_totals')
total_cost = yield common.Sum(query1_result.cost, query2_result.cost)
# Explicitly make ReturnOutputs block on the CopyTable pipeline above.
# Since ReturnOutputs is the last pipeline yielded, the pipeline API
# will view its completion as the completion of the whole publish
# pipeline, so we need to make sure it doesn't run until all of the
# intermediate steps are done.
with pipeline.After(views_created):
yield bq_pipelines.ReturnOutputs("Success", cost=total_cost)
class CreatePublishViewsPipeline(pipeline.Pipeline):
"""Create views in the publish dataset pointing into the raw data tables.
These views make it easy for people to dig into experiment data manually
without having to worry about the slightly-weird way that BigBingo stores
its data.
The view does three things that humans would otherwise have to do:
-It joins the participants and conversions tables, which is necessary since
the conversions table doesn't have rows for bingo_ids that have never
triggered any conversions.
-It filters out the rows to only include the experiment in question.
-It converts all null conversion values to 0, since nulls have weird
behavior (e.g. they're ignored when taking averages).
Note that this only actually changes anything if there was a schema change,
but we write every view every time since it's easy and making views is
cheap.
"""
def run(self, publish_dataset):
experiments = config.get_active_experiments()
# If the experiment was just archived, it will still have a view here.
# That view might be out of date if the schema has changed, so we
        # should update it, too.
publish_tables = set(bq_connection.get_all_tables_in_dataset(
publish_dataset))
experiments.extend(
experiment for experiment in config.get_archived_experiments()
if raw_table(experiment.id) in publish_tables)
for experiment in experiments:
view_name = raw_table(experiment.id)
conversion_column_names = [
'conv_' + conv_col
for conv_col in config.get_final_conversion_columns()]
# Unlike all other queries, this one is visible in the BigQuery UI,
# so we make an attempt at making it look nice.
query = """\
SELECT parts.bingo_id AS bingo_id,
parts.alternative AS alternative,
parts.participation_time AS participation_time,
convs.last_updated_time AS last_conversion_updated_time,
%(conv_selects)s
FROM
(SELECT * FROM %(dataset)s.source_participants
WHERE experiment = "%(exp_name)s") parts
LEFT OUTER JOIN EACH
(SELECT * FROM %(dataset)s.source_conversions
WHERE experiment = "%(exp_name)s") convs
ON parts.alternative = convs.alternative
AND parts.bingo_id = convs.bingo_id""" % {
'exp_name': experiment.logged_name,
'dataset': publish_dataset,
'conv_selects': ',\n '.join(
'IFNULL(%(column)s, 0) AS %(column)s' % {'column': column}
for column in conversion_column_names
)
}
bq_connection.create_or_update_view(publish_dataset, view_name,
query)
class ArchiveExperimentsPipeline(pipeline.Pipeline):
"""Archive the given experiments if they have not already been.
This pipeline moves the summary and raw data for the experiment into the
archive dataset, and removes it from the live datasets.
    If an experiment has already been archived, this pipeline ignores it, so
    the pipeline should be idempotent; if no experiments have been newly
    archived since the last time it was run, it will do nothing.
-We copy the view raw_<experiment>_data to a real table in the archive.
-We copy all snapshots from historical_experiment_totals to
      historical_<experiment>_totals in the archive.
-We remove all data from the experiment from:
-the view raw_<experiment>_data (removing the view since it will be
empty)
-participant_snapshots and conversion_snapshots
-participants_<timestamp> and conversions_<timestamp>
"""
output_names = ['cost', 'num_archived']
def run(self, dataset, publish_dataset, archive_dataset, timestamp,
experiment_ids):
experiments = [config.get_possibly_archived_experiment(exp_id)
for exp_id in experiment_ids]
archive_tables = set(bq_connection.get_all_tables_in_dataset(
archive_dataset))
publish_tables = set(bq_connection.get_all_tables_in_dataset(
publish_dataset))
# Since the totals table is created after the raw table, we can
# assume that the totals schema exists if and only if the
# experiment has been successfully archived. (Here
# successfully does not necessarily include deleting the data,
# but if that doesn't happen we don't worry about it.) We also want to
# check that the view in the publish dataset exists; if not, we won't
# be able to archive the experiment. (If the view exists but the
# experiment has no data in the other tables, everything will work as
# expected and make empty tables.) This should really never happen
# unless no summarize jobs run between the deploy where an experiment
# is created and the deploy where it is archived.
unarchived_experiments = [
experiment for experiment in experiments
if totals_table(experiment.id) not in archive_tables
and raw_table(experiment.id) in publish_tables]
if not unarchived_experiments:
yield bq_pipelines.ReturnOutputs("Success", num_archived=0, cost=0)
else:
archive_futures = []
for experiment in unarchived_experiments:
archive_result = yield ArchiveSingleExperimentPipeline(
publish_dataset, archive_dataset, experiment.id,
experiment.logged_name)
archive_futures.append(archive_result)
with pipeline.After(*archive_futures):
# To save on queries, we remove all the data from the main
# dataset at once. This must happen after the data we want
# gets archived, of course. Needless to say, we want to be
# extra-careful with this part.
experiment_ids = [exp.id for exp in unarchived_experiments]
remove_result = yield RemoveExperimentsPipeline(
dataset, publish_dataset, timestamp, experiment_ids)
total_cost = yield common.Sum(
remove_result.cost, *[p.cost for p in archive_futures])
yield bq_pipelines.ReturnOutputs(
"Success", num_archived=len(archive_futures), cost=total_cost)
class ArchiveSingleExperimentPipeline(pipeline.Pipeline):
"""Copies the tables to archive a single experiment.
This pipeline does not remove anything from the main dataset, so it should
be idempotent. It will be called from ArchiveExperimentsPipeline, which
gives more detail as to exactly which tables get copied.
"""
output_names = ['cost']
def run(self, publish_dataset, archive_dataset, experiment_id,
experiment_name):
with pipeline.InOrder():
# Do this in order so that the existence of the totals table
# guarantees that everything worked.
dump_query = """
SELECT *
FROM %(publish_dataset)s.%(raw_table)s
""" % {
'publish_dataset': publish_dataset,
'raw_table': raw_table(experiment_id),
}
raw_dump = yield bq_pipelines.QueryToTablePipeline(
dump_query, archive_dataset, raw_table(experiment_id))
totals_query = """
SELECT *
FROM %(publish_dataset)s.historical_experiment_totals
WHERE experiment = "%(experiment)s"
""" % {
'publish_dataset': publish_dataset,
'experiment': experiment_name
}
totals_dump = yield bq_pipelines.QueryToTablePipeline(
totals_query, archive_dataset, totals_table(experiment_id))
total_cost = yield common.Sum(raw_dump.cost, totals_dump.cost)
yield bq_pipelines.ReturnOutputs("Success", cost=total_cost)
class RemoveExperimentsPipeline(pipeline.Pipeline):
"""Removes the given experiments from the main dataset.
In particular, removes the views publish_dataset.raw_<experiment>_data, and
removes all of the experiments from participant_snapshots,
conversion_snapshots, participants_<timestamp>, and
conversions_<timestamp>. They may still persist in the publish dataset
until the next summarize task run, but that's fine.
Timestamp should be a Unix timestamp in seconds.
Needless to say, be careful with this; it deletes data, without directly
checking if that data has been archived elsewhere.
TODO(benkraft): There is a potential race condition here: if this is
running at the same time as a summarize task, the summarize task might be
generating the next participants_<timestamp> and conversions_<timestamp>
while we're busy removing this data from the old one. This would cause the
data to not get deleted from the main dataset. This isn't a big problem,
but we might want to run something to garbage-collect these occasionally.
Another potential race condition could happen if the snapshots tables are
updated by the summarize task while this task is running. This could cause
the newly summarized data to get clobbered by the archiver, since this
pipeline is not atomic.
"""
output_names = ['cost']
def run(self, dataset, publish_dataset, timestamp, experiment_ids):
views_to_delete = [raw_table(e) for e in experiment_ids]
experiment_names = [
config.get_possibly_archived_experiment(e).logged_name
for e in experiment_ids]
yield bq_pipelines.EnsureTablesDeletedPipeline(
publish_dataset, views_to_delete)
delete_part_snapshots = yield RemoveExperimentsFromTablePipeline(
dataset, 'participant_snapshots', experiment_names)
delete_conv_snapshots = yield RemoveExperimentsFromTablePipeline(
dataset, 'conversion_snapshots', experiment_names)
delete_part_timestamp = yield RemoveExperimentsFromTablePipeline(
dataset, participants_table(timestamp), experiment_names,
result_table_ttl_days=7)
delete_conv_timestamp = yield RemoveExperimentsFromTablePipeline(
dataset, conversions_table(timestamp), experiment_names,
result_table_ttl_days=7)
total_cost = yield common.Sum(
delete_part_snapshots.cost, delete_conv_snapshots.cost,
delete_part_timestamp.cost, delete_conv_timestamp.cost)
yield bq_pipelines.ReturnOutputs("Success", cost=total_cost)
class RemoveExperimentsFromTablePipeline(pipeline.Pipeline):
"""Removes some experiments from a table.
Queries the table for all except the given experiments, saving the result
in a temporary table, and then clobbering the original table with that
temporary table. See the warning about potential race conditions in
RemoveExperimentsPipeline.
Since this operation overwrites the table, the expiration time may need to
be reset, in which case result_table_ttl_days should be specified.
"""
output_names = ['cost']
def run(self, dataset, table_name, experiment_names,
result_table_ttl_days=None):
tmp_table_name = table_name + '_tmp'
query = """
SELECT *
FROM %(dataset)s.%(table_name)s
WHERE experiment NOT IN("%(experiments)s")
""" % {
'dataset': dataset,
'table_name': table_name,
'experiments': '", "'.join(experiment_names)
}
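        # For experiment_names ['foo', 'bar'] (hypothetical), the generated
        # query ends with:
        #   WHERE experiment NOT IN("foo", "bar")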
with pipeline.InOrder():
clone = yield bq_pipelines.QueryToTablePipeline(query, dataset,
tmp_table_name)
yield bq_pipelines.CopyTablePipeline(dataset, tmp_table_name,
dataset, table_name,
result_table_ttl_days=result_table_ttl_days)
yield bq_pipelines.EnsureTablesDeletedPipeline(dataset,
[tmp_table_name])
yield bq_pipelines.ReturnOutputs("Success", cost=clone.cost)
StevenACoffman commented Aug 1, 2016
Super neato. Thanks! I found my way here from your blog post. Any updates on this code since 2014, or better yet plans to turn this into a reusable open source project like GAEBingo was?