Instantly share code, notes, and snippets.

What would you like to do?
Serverless Distributed Decision Forests with AWS Lambda
# build the environment ... this will take a while
docker run -v $(pwd):/outputs -it amazonlinux:2016.09 /bin/bash /outputs/build.sh
# should end with something like "venv compressed size 51M"
# unpack that environment
unzip venv.zip -d ve_package/
"Version": "2012-10-17",
"Statement": [
"Effect": "Allow",
"Action": [
"Resource": "arn:aws:logs:*:*:*"
"Effect": "Allow",
"Action": [
"Resource": [
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
lamlearn -- using AWS Lambda to do distributed Decision Forest learning

License: BSD
Author: Josh Bloom
GE Digital, 2017. All rights reserved.
"""
import time
from StringIO import StringIO
import json
from multiprocessing import Process, TimeoutError
import numpy as np
from sklearn.externals import joblib
import boto3
# This is the SNS topic that triggers the Lambda function. Lambda could be
# invoked directly, but publishing to a topic allows other listeners to
# subscribe to the same messages as well.
SNS_TOPIC_ARN = "arn:aws:sns:us-east-1:YOURSNS:sklearn_test"
# Bucket holding the grove models; read by assemble_clf() and learn().
# Placeholder -- replace with the name of your own bucket.
S3_BUCKET_NAME = "YOUR_S3_BUCKET"
SEED = 1234
# Key prefix (folder) inside the bucket under which models are stored.
S3_FOLDER = "lambda_learn"
sns = boto3.client('sns')
def trigger_sns(seed=1234, nest=10, build_id="test_m1", grove_id=1):
    """Fire off an AWS SNS message, triggering a build of a grove.

    Args:
        seed: random seed forwarded to the grove builder.
        nest: number of estimators (trees) the grove should contain;
            sent under the message key ``n_estimators``.
        build_id: name of the model build the grove belongs to.
        grove_id: index of the grove within the build.

    Returns:
        The response returned by ``sns.publish``.
    """
    # int() turns numpy integers (e.g. from np.random.randint) into plain
    # ints, which json.dumps can always serialize.
    m = {"seed": int(seed), "build_id": build_id, "n_estimators": int(nest),
         "grove_id": grove_id}
    response = sns.publish(TopicArn=SNS_TOPIC_ARN, Message=json.dumps(m))
    return response
def combine_rfs(rf_a, rf_b):
    """Merge two fitted sklearn ensemble estimators.

    The estimators of ``rf_b`` are appended to ``rf_a`` and ``rf_a``'s
    ``n_estimators`` is updated to the new count.

    Args:
        rf_a: ensemble that receives the extra estimators (modified in place).
        rf_b: ensemble whose ``estimators_`` are appended; left unchanged.

    Returns:
        ``rf_a``, so the function can be used directly with ``reduce``.
    """
    rf_a.estimators_ += rf_b.estimators_
    rf_a.n_estimators = len(rf_a.estimators_)
    return rf_a
def assemble_clf(model_name):
    """Combine the models built by all the groves into one estimator.

    Every object stored under ``<S3_FOLDER>/models/<model_name>/`` in the
    bucket ``S3_BUCKET_NAME`` is downloaded, loaded with joblib and merged
    pairwise with ``combine_rfs``.

    NOTE: joblib.load unpickles the downloaded bytes; only use this with a
    bucket whose contents you trust.

    Args:
        model_name: build name used as the folder of the grove files.

    Returns:
        The first grove model, extended with the estimators of all others.

    Raises:
        ValueError: if no grove model is found for ``model_name``.
    """
    # Local imports: reduce is a builtin on Python 2 only, and BytesIO
    # handles the raw S3 bytes on both Python 2 and Python 3.
    from functools import reduce
    from io import BytesIO

    s3 = boto3.resource('s3')
    my_bucket = s3.Bucket(S3_BUCKET_NAME)
    # The trailing "/" keeps e.g. "hack1" from also matching "hack10/...".
    prefix = "%s/models/%s/" % (S3_FOLDER, model_name)
    print("Getting all the grove models from s3...")
    groves = [joblib.load(BytesIO(x.get()['Body'].read()))
              for x in my_bucket.objects.filter(Prefix=prefix)]
    if not groves:
        raise ValueError("no grove models found under s3://%s/%s"
                         % (S3_BUCKET_NAME, prefix))
    print("Merging grove models...")
    rf_combined = reduce(combine_rfs, groves)
    return rf_combined
def learn(model_name="hack1", ntrees=30, grove_size=10, maxtime=100,
          pred=False, do_parallel=True):
    """Fire off many grove learns via AWS SNS messages and wait for them.

    One SNS message is published per grove, then S3 is polled until a model
    file for every grove exists or ``maxtime`` seconds have elapsed.

    Args:
        model_name: build name; used as ``build_id`` in the messages and as
            the S3 folder of the grove models.
        ntrees: total number of trees wanted.
        grove_size: number of trees per grove; ``ntrees / grove_size``
            (truncated to an int) messages are sent.
        maxtime: seconds, counted from when triggering starts, to wait for
            all grove models to appear.
        pred: if true and all groves completed, assemble the combined model
            and print its score on the digits dataset.
        do_parallel: if true, publish each message from its own process;
            otherwise publish them one after another.
    """
    nmess = int(ntrees / grove_size)
    print("n_groves: %i" % nmess)
    if not do_parallel:
        # launch synchronously
        start = time.time()
        for i in range(nmess):
            print("triggering grove %i" % i)
            # int() makes the numpy integer a plain, JSON-serializable int
            trigger_sns(seed=int(np.random.randint(0, 1e6, 1)[0]),
                        nest=grove_size,
                        build_id=model_name,
                        grove_id=i)
    else:
        # launch asynchronously
        processes = []
        for i in range(nmess):
            kws = {"seed": int(np.random.randint(0, 1e6, 1)[0]),
                   "nest": grove_size,
                   "build_id": model_name,
                   "grove_id": i}
            print(kws)
            processes.append(Process(target=trigger_sns, kwargs=kws))
        print("triggering %i groves asynchronously" % nmess)
        start = time.time()
        # Run
        for p in processes:
            p.start()
        # wait until all messages are fired
        for p in processes:
            p.join()

    start_look = time.time()
    s3 = boto3.resource('s3')
    my_bucket = s3.Bucket(S3_BUCKET_NAME)
    # The trailing "/" keeps e.g. "hack1" from also counting "hack10/...".
    prefix = "%s/models/%s/" % (S3_FOLDER, model_name)
    finished = False
    print("Waiting for all the λ's to complete")
    # NOTE(review): this polls S3 with no delay between list calls;
    # consider sleeping between iterations.
    while (time.time() < start + maxtime) and not finished:
        if len([x for x in my_bucket.objects.filter(
                Prefix=prefix)]) >= nmess:
            finished = True
    tot_time = time.time() - start
    tot_start_look = time.time() - start_look

    if finished:
        print("🤑 Completed in %f sec ... found all %i groves" %
              (tot_time, nmess))
        print("🤑 time since last λ fired until complete: %f sec" %
              (tot_start_look))
        print("The joblib save files for each grove are in: s3://" +
              S3_BUCKET_NAME + "/" +
              S3_FOLDER + "/models/%s" % model_name)
        if pred:
            print("Prediction...")
            clf = assemble_clf(model_name)
            print(clf)
            from sklearn.datasets import load_digits
            digits = load_digits()
            n_samples = len(digits.images)
            # flatten each image into one feature row
            X_digits = digits.images.reshape((n_samples, -1))
            y_digits = digits.target
            print("Score: %f" % clf.score(X_digits[1000:], y_digits[1000:]))
    else:
        print("🙁 Not completed ... did not find all %i groves" % nmess)
# Command-line entry point: parse the options and start a distributed build.
if __name__ == "__main__":
    import argparse
    parser = argparse.ArgumentParser()
    parser.add_argument("name", help="model name (e.g. smelly-owl-1)",
                        type=str)
    parser.add_argument("-g", "--grove", help="grove size [default=10]",
                        type=int, default=10)
    parser.add_argument("-t", "--trees", help="n trees total [default=30]",
                        type=int, default=30)
    parser.add_argument("-p", "--pred", help="predict",
                        default=False, action="store_true")
    parser.add_argument("-s", "--s", help="run synchronously",
                        default=False, action="store_true")
    args = parser.parse_args()
    # -s asks for synchronous triggering, i.e. the opposite of do_parallel
    learn(args.name, ntrees=args.trees, grove_size=args.grove,
          pred=args.pred, do_parallel=not args.s)
#!/usr/bin/env python
# -*- coding: utf-8 -*-
""" main - builds decision forest models triggered by SNS messages

License: BSD
Author: Josh Bloom
GE Digital, 2017. All rights reserved.

Save as ve_package/main.py,
but you'll want to test this either inside the amazonlinux
docker container or in your local environment (and not
inside the ve_package/ directory).
"""
import os
import ctypes
import json
import traceback
import boto3
import sklearn
import numpy as np
from sklearn import datasets
from sklearn.externals import joblib
# Bucket that receives the trained grove models; read by _save_model_to_s3().
# Placeholder -- replace with the name of your own bucket.
S3_BUCKET_NAME = "YOUR_S3_BUCKET"
# Key prefix (folder) inside the bucket under which models are stored.
S3_FOLDER = "lambda_learn"
def _save_model_to_s3(clf, build_id=1, grove_id=2):
    """Dump a fitted model with joblib and upload the file to S3.

    Saving to S3 directly doesn't work, so the model is first written to a
    local file and that file is then uploaded.

    Args:
        clf: the model object to persist.
        build_id: build name; becomes the folder of the S3 key.
        grove_id: grove index; becomes part of the file name.

    Returns:
        A one-entry dict mapping ``(build_id, grove_id)`` to the S3 key the
        model was uploaded to.
    """
    bucketname = S3_BUCKET_NAME
    # only /tmp is writeable in Lambda
    local_file = "/tmp/build_%s_grove_%s.sav" % (build_id, grove_id)
    s3_file = "%s/models/%s/grove_%s.sav" % (
        S3_FOLDER, build_id, grove_id)
    print("Saving model to local file: %s" % local_file)
    try:
        joblib.dump(clf, local_file)
        # Copy file to s3:
        s3 = boto3.resource('s3')
        print("Uploading %s --> s3://%s/%s" % (
            local_file, bucketname, s3_file))
        s3.meta.client.upload_file(local_file, bucketname, s3_file)
    finally:
        # Do not let model files pile up in /tmp across invocations.
        if os.path.exists(local_file):
            os.remove(local_file)
    # Return s3 file:
    return {(build_id, grove_id): s3_file}
def load_data():
    """Load the training data.

    Load small MNIST (sklearn's digits dataset) ... TOY FOR NOW.
    You could pull from disk or HBase here,
    e.g. "s3://your-bucket/lambda_learn/feats_train.csv".

    Returns:
        A ``(data, target)`` tuple: the digit images with each image
        flattened into one row, and the corresponding labels.
    """
    from sklearn.datasets import load_digits
    digits = load_digits()
    n_samples = len(digits.images)
    # flatten each image into a single feature vector
    data = digits.images.reshape((n_samples, -1))
    return (data,
            digits.target)
def learn_model(train_X, train_y, n_estimators=10, random_state=None):
    """Create and fit a random forest classifier.

    Args:
        train_X: training features passed to ``fit``.
        train_y: training labels passed to ``fit``.
        n_estimators: number of trees in the forest.
        random_state: forwarded to ``RandomForestClassifier``; the default
            ``None`` is also the classifier's own default.

    Returns:
        The fitted ``RandomForestClassifier``.
    """
    from sklearn.ensemble import RandomForestClassifier
    clf = RandomForestClassifier(n_estimators=n_estimators,
                                 random_state=random_state)
    clf.fit(train_X, train_y)
    return clf
def handler(event, context):
    """Lambda entry point: build one grove from an SNS message.

    The build parameters are read from the JSON message of the first SNS
    record in ``event``. If they cannot be read, the error is logged and
    fixed test parameters are used instead. A model is then trained and
    uploaded to S3; a failure in that step is logged, not raised.

    Args:
        event: the Lambda event; expected to carry an SNS record.
        context: the Lambda context (unused).

    Returns:
        A dict with the parameters used and, under ``"rez"``, the repr of
        the upload result (``'None'`` if training or upload failed).
    """
    # print("Received event: " + json.dumps(event, indent=2))
    try:
        sns_record = event['Records'][0]['Sns']
        message = sns_record['Message']
        timestamp = sns_record['Timestamp']
        sig = sns_record['Signature']
        print("message: " + message)
        print("timestamp: " + timestamp)
        print("sig: " + sig)
        mm = json.loads(message)
        # message keys read: n_estimators, build_id, grove_id, seed
        n_estimators = mm[u'n_estimators']
        build_id = mm[u'build_id']
        grove_id = mm[u'grove_id']
        seed = mm[u'seed']
    except Exception:
        # Best effort: log why the event was unusable and fall back to
        # test parameters (this is what a direct handler(1, 1) call hits).
        traceback.print_exc()
        n_estimators = 10
        build_id = "test_m1"
        grove_id = 2
        seed = 10

    # NOTE(review): seed is echoed and returned but is not passed to the
    # training step here -- confirm whether it should be applied.
    print({"nest": n_estimators,
           "build_id": build_id,
           "grove_id": grove_id,
           "seed": seed})

    # Pre-set so the return below works even when training/upload fails.
    rez = None
    try:
        train_X, train_y = load_data()
        clf = learn_model(train_X, train_y, n_estimators=n_estimators)
        rez = _save_model_to_s3(clf, build_id=build_id, grove_id=grove_id)
        print(rez)
    except Exception:
        traceback.print_exc()

    return {"rez": repr(rez),
            "nest": n_estimators,
            "build_id": build_id,
            "grove_id": grove_id, "seed": seed}
def test():
    """Call handler() with placeholder arguments and print its result."""
    print(handler(1, 1))
mkdir SDDF; cd SDDF
docker pull amazonlinux:2016.09
git clone https://github.com/ryansb/sklearn-build-lambda
cd sklearn-build-lambda
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment