import logging
import os
import sys
import time
from google.appengine.ext import db
import instance_cache
import user_util
Based on work by Juan Pablo Guereca, with additional modifications
for thread safety, and simplified to reduce time spent in critical areas.
Module which implements a per GAE instance data cache, similar to what
you can achieve with APC in PHP instances.
Each GAE instance caches the global scope, keeping the state of every
variable on the global scope.
An overview of the generic helper classes that could make the pipeline library much easier and more powerful to use while running on pristine production data.
class DataWriter(object):
    """Used by the QueryDrainerPipeline to coalesce intermediate results
    into their final resting place.

    ``write_metadata`` is abstract in this base class and must be
    overridden before it can be called.
    """

    def write_metadata(self, metadata):
        """Write ``metadata``; not implemented in the base class.

        Raises:
            NotImplementedError: always, until a subclass overrides it.
        """
        # NotImplemented is the rich-comparison sentinel, not an exception
        # class; calling it fails with a confusing TypeError. Raise the
        # exception that actually signals "abstract method".
        raise NotImplementedError()

    def write_result(self, data):
        """Writes all data."""
A quick experiment with the combiner_spec parameter in the appengine MapreducePipeline()
import random
import logging
def map(data):
    """Yield ``data`` paired with a random integer key.

    Args:
        data: The input record handed to the mapper; passed through as-is.

    Yields:
        A single ``(key, data)`` tuple, where ``key`` is a random integer
        between 1 and 10 inclusive.
    """
    # Generate a random key from 1..10
    key = random.randint(1, 10)
    # Record which key each record was assigned to.
    logging.info("%s %s", key, data)
    yield (key, data)
A custom appengine-mapreduce input_reader and output_writer. The ContentRevisionsInputReader iterates over a complex data structure we use to keep track of modifications to each of the content nodes within the Khan Academy content management system. This is a utility class that is used to intelligently rollout schema changes to these nodes. The …
class ContentRevisionsInputReader(
"""Mapper that loads the latest commit's revisions from the datastore."""
def __init__(self, kinds, start, step, current=0):
# List of content kinds to filter on
self._kinds = kinds
# Start index in snapshot array
self._start = start
# Stride through the snapshot array
self._step = step
A technique for storing a JSON-serializable datetime as a computed property so that mapreduce can filter against it.
class UserData(db.Model):
    """Datastore model holding a user's birthdate as an unindexed date."""

    birthdate = db.DateProperty(indexed=False)

    def compute_birthdate_str(self):
        """Return the birthdate in ISO format, or the unset value unchanged.

        When ``birthdate`` is falsy (i.e. not set) it is returned as-is
        rather than being formatted.
        """
        born = self.birthdate
        return born.isoformat() if born else born
# You will probably need to add this to index.yaml
A way to programmatically list the versions deployed to a Google AppEngine application by impersonating an RPC to Google's management servers, using an oauth2 token from an administrator of the application.
# Here's how I got this to work:
# 1. Download this client library into your GAE project:
# 2. Copy this file from the GAE SDK installed on your development machine
# google/appengine/tools/
# 3. Modify the import statements as necessary
# 4. Create a file that defines an appcfg_refresh_token property
# 5. Obtain the refresh token by
# Calling list_versions . --oauth2, this will open a browser so you can login with your Google Account
A high level overview of the pipeline job Khan Academy uses to download analytics data about each of its videos to perform more complex cross-video analysis.
class YouTubeQueryMasterPipeline(pipeline.Pipeline):
def run(self, user_ids):
"""Launches worker pipeline jobs to query the YouTube API.
user_ids: The user_ids of stored OAuth2 credentials.
for user_id in user_ids:
A function to programmatically build and query funnel data via the MixPanel API.
class MixPanel(object):
def get_page_view_funnel(self, content_urls):
# Build up the events array. Each "event" is a step in the funnel
events = []
for cu in content_urls:
"event": "Page View",
"selector": 'properties["Page View Page"] == "%s"' % (cu,),
public function checkBuyOrders($currentBuyPrice) {
$sqlWhere = Array('status' => 'ACTIVE', 'action' => 'BUY', "$currentBuyPrice <= `at_price`");
foreach ($this->context->orders->findAll()->where($sqlWhere) as $order) { //double check the price for each before buying
try {
$userAssociatedWithOrder = $this->context->authenticator->getUser($order->user_id);
if(!empty($order->at_price) && !empty($order->amount) && $userAssociatedWithOrder->coinbase_access_token){ //his Coinbase API tokens are set
$totalBuyPrice = $this->context->coinbase->user($order->user_id)->order($order)->getBuyPrice($order->amount);
if ($totalBuyPrice !== NULL && $totalBuyPrice->subtotal->amount <= $order->at_price * $order->amount) {
$result = $this->context->coinbase->user($order->user_id)->order($order)->buy($order->amount); //Buy the coins
$this->context->orders->findAll()->get($order->order_id)->update(Array('status' => 'EXECUTED')); //Update order status