@MattFaus
MattFaus / SortedGcsCsvShardFileMergeReader.py
Last active February 23, 2022 11:58
Merge-reads several sorted .csv files stored on Google Cloud Storage.
class SortedGcsCsvShardFileMergeReader(object):
    """Merges several sorted .csv files stored on GCS.

    This class is both an iterator and a context manager.

    Let's say there are 2 .csv files stored on GCS, with contents like:

    /bucket/file_1.csv:
        [0, "Matt"],
        [0, "Sam"],
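The full reader is not shown in this preview. As a rough sketch of the same k-way merge idea, assuming plain file-like streams in place of GCS blobs (the helper name below is not from the gist), Python's `heapq.merge` does the heavy lifting:

```python
import csv
import heapq
import io

def merge_sorted_csv_streams(streams, key_column=0):
    """Merge several individually sorted CSV streams into one sorted iterator.

    Each stream must already be sorted by the key column. heapq.merge
    performs the k-way merge lazily, holding only one row per stream
    in memory at a time.
    """
    readers = [csv.reader(s) for s in streams]
    return heapq.merge(*readers, key=lambda row: row[key_column])

# Example with two in-memory "shard files", each sorted by column 0:
shard_1 = io.StringIO("0,Matt\n0,Sam\n2,Ann\n")
shard_2 = io.StringIO("1,Bob\n3,Zoe\n")
merged = list(merge_sorted_csv_streams([shard_1, shard_2]))
# merged rows are ordered by the first column: 0, 0, 1, 2, 3
```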
@MattFaus
MattFaus / ContentRevisionsInputReader.py
Last active August 21, 2021 08:11
A custom appengine-mapreduce input_reader and output_writer. The ContentRevisionsInputReader iterates over a complex data structure we use to keep track of modifications to each of the content nodes within the Khan Academy content management system. This is a utility class that is used to intelligently rollout schema changes to these nodes. The …
class ContentRevisionsInputReader(
        third_party.mapreduce.input_readers.InputReader):
    """Mapper that loads the latest commit's revisions from the datastore."""

    def __init__(self, kinds, start, step, current=0):
        # List of content kinds to filter on
        self._kinds = kinds
        # Start index in snapshot array
        self._start = start
        # Stride through the snapshot array
        self._step = step
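The start/step pair implies each mapper shard strides through the snapshot array. A minimal sketch of that striding, using a hypothetical `shard_indices` helper that is not part of the gist:

```python
def shard_indices(total, start, step, current=0):
    """Yield the snapshot-array indices assigned to one shard.

    Shard k of n covers indices k, k+n, k+2n, ..., so several mapper
    shards can walk the same array without overlapping. `current`
    tracks the resume position, letting a reader be serialized and
    restarted mid-way through its slice.
    """
    index = start + current * step
    while index < total:
        yield index
        index += step

# Two shards splitting a 7-element snapshot array:
list(shard_indices(7, start=0, step=2))  # [0, 2, 4, 6]
list(shard_indices(7, start=1, step=2))  # [1, 3, 5]
```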
@MattFaus
MattFaus / appengine_config.py
Last active August 3, 2018 12:28
All of the code necessary to implement and test protobuf projection in a Google Appengine web application.
import db_util
db_util.enable_db_protobuf_projection()
db_util.enable_ndb_protobuf_projection()
@MattFaus
MattFaus / bq_property_transform.py
Created March 22, 2014 00:28
The mapreduce job we use to transform datastore backups into JSON files that we then load into BigQuery.
mr_pipeline = mapreduce_pipeline.MapperPipeline(
    'bq_property_transform',
    'extbackup.bq_property_transform.property_transform_mapper',
    'third_party.mapreduce.input_readers.RecordsReader',
    'third_party.mapreduce.output_writers.FileOutputWriter',
    params={
        'input_reader': {
            'files': list_backup_files(kind, backup_date),
        },
        'output_writer': {
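The `property_transform_mapper` itself is not shown in this preview. As a hypothetical sketch of the kind of transform such a mapper performs (the function name and type handling below are illustrative, not from the gist), each entity is serialized to one newline-delimited JSON record that BigQuery can load:

```python
import datetime
import json

def entity_to_bq_json(entity):
    """Serialize a datastore-like entity dict to one newline-delimited
    JSON record, coercing types BigQuery cannot ingest directly."""
    row = {}
    for name, value in entity.items():
        if isinstance(value, (datetime.datetime, datetime.date)):
            # Dates become ISO-8601 strings.
            row[name] = value.isoformat()
        elif isinstance(value, bytes):
            row[name] = value.decode("utf-8", errors="replace")
        else:
            row[name] = value
    return json.dumps(row, sort_keys=True)

record = entity_to_bq_json(
    {"name": "Matt", "joined": datetime.date(2013, 7, 24)})
# '{"joined": "2013-07-24", "name": "Matt"}'
```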
@MattFaus
MattFaus / mixpanel_funnels.py
Created January 2, 2014 23:03
A function to programmatically build and query funnel data via the MixPanel API.
class MixPanel(object):

    def get_page_view_funnel(self, content_urls):
        # Build up the events array. Each "event" is a step in the funnel
        events = []
        for cu in content_urls:
            events.append({
                "event": "Page View",
                "selector": 'properties["Page View Page"] == "%s"' % (cu,),
            })
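The event-building step above can be exercised on its own. A self-contained version (the MixPanel HTTP query that follows it in the gist is omitted, and the standalone function name is not from the gist):

```python
def build_page_view_funnel_events(content_urls):
    """Build the MixPanel funnel 'events' array, one step per URL.

    Mirrors the loop in get_page_view_funnel: each step matches
    Page View events whose "Page View Page" property equals the URL.
    """
    return [
        {
            "event": "Page View",
            "selector": 'properties["Page View Page"] == "%s"' % (url,),
        }
        for url in content_urls
    ]

events = build_page_view_funnel_events(["/math/algebra", "/math/calculus"])
# events[0]["selector"] == 'properties["Page View Page"] == "/math/algebra"'
```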
@MattFaus
MattFaus / youtube_query_pipeline.py
Created December 17, 2013 22:36
A high level overview of the pipeline job Khan Academy uses to download analytics data about each of its videos to perform more complex cross-video analysis.
class YouTubeQueryMasterPipeline(pipeline.Pipeline):

    def run(self, user_ids):
        """Launches worker pipeline jobs to query the YouTube API.

        Arguments:
            user_ids: The user_ids of stored OAuth2 credentials.
        """
        try:
            for user_id in user_ids:
@MattFaus
MattFaus / serverside_appcfg.py
Created November 2, 2013 04:22
A way to programmatically list the versions deployed to a Google AppEngine application by impersonating an RPC from appcfg.py to Google's management servers, using an oauth2 token from an administrator of the application.
# Here's how I got this to work:
# 1. Download this client library into your GAE project:
#    https://developers.google.com/api-client-library/python/start/installation#appengine
#    https://code.google.com/p/google-api-python-client/downloads/detail?name=google-api-python-client-gae-1.2.zip&can=2&q=
# 2. Copy this file from the GAE SDK installed on your development machine:
#    google/appengine/tools/appengine_rpc_httplib2.py
# 3. Modify the import statements as necessary
# 4. Create a secrets.py file that defines an appcfg_refresh_token property
# 5. Obtain the refresh token by calling `appcfg.py list_versions . --oauth2`,
#    which opens a browser so you can log in with your Google Account
@MattFaus
MattFaus / datetime_mapreduce.py
Created October 7, 2013 18:22
A technique for storing a JSON-serializable datetime as a computed property so that mapreduce can filter against it.
class UserData(db.Model):
    birthdate = db.DateProperty(indexed=False)

    def compute_birthdate_str(self):
        if self.birthdate:
            return self.birthdate.isoformat()
        return self.birthdate

# You will probably need to add this to index.yaml
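A plain-Python stand-in (no `db.Model`, so it differs from the gist's real class, which would pair the method with a `db.ComputedProperty`) shows why the computed string works for filtering: ISO-8601 strings sort identically to the dates they encode, so mapreduce can do lexicographic range comparisons against them.

```python
import datetime

class UserData(object):
    """Illustrative stand-in for the db.Model above."""

    def __init__(self, birthdate=None):
        self.birthdate = birthdate

    def compute_birthdate_str(self):
        # ISO-8601 strings ("1990-05-17") sort the same way the
        # underlying dates do, which is what makes string-based
        # range filters in mapreduce behave correctly.
        if self.birthdate:
            return self.birthdate.isoformat()
        return self.birthdate

UserData(datetime.date(1990, 5, 17)).compute_birthdate_str()  # "1990-05-17"
UserData().compute_birthdate_str()  # None
```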
@MattFaus
MattFaus / combiner_spec_test.py
Created October 1, 2013 00:41
A quick experiment with the combiner_spec parameter in the appengine MapreducePipeline()
import random
import logging

def map(data):
    try:
        # Generate a random key from 1..10
        key = random.randint(1, 10)
        logging.info("%s %s", key, data)
        yield (key, data)
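The combiner itself is truncated in this preview. As a plain-Python sketch of what a combiner buys you, assuming a simple count-per-key aggregation (not necessarily what the gist's `combiner_spec` does), the combiner pre-aggregates each shard's output so far less data is shuffled to the reducers:

```python
import collections
import random

def map_phase(records):
    """Mirror of map() above: assign each record a random key 1..10."""
    for data in records:
        yield random.randint(1, 10), data

def combine_phase(pairs):
    """Pre-aggregate per key, as a combiner would do within one shard.

    Here we collapse each key's values down to a count, shrinking the
    (key, value) stream handed to the shuffle/reduce stage.
    """
    counts = collections.Counter()
    for key, _ in pairs:
        counts[key] += 1
    for key, count in sorted(counts.items()):
        yield key, count

combined = dict(combine_phase(map_phase(range(100))))
# All 100 records are accounted for, spread over at most 10 keys.
```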
@MattFaus
MattFaus / SimpleGaeMapReduce.py
Created July 24, 2013 02:14
An overview of the generic helper classes that could make the pipeline library much easier and more powerful to use while running on pristine production data.
class DataWriter(object):
    """Used by the QueryDrainerPipeline to coalesce intermediate results
    into their final resting place.
    """

    def write_metadata(self, metadata):
        raise NotImplementedError()

    def write_result(self, data):
        """Writes all data."""