Serverless Distributed Decision Forests with AWS Lambda
# build the environment ... this will take a while
docker run -v $(pwd):/outputs -it amazonlinux:2016.09 /bin/bash /outputs/build.sh
# should end with something like "venv compressed size 51M"
# unpack that environment
unzip venv.zip -d ve_package/
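The build.sh above comes from the ryansb/sklearn-build-lambda repo cloned in the setup steps at the bottom of this gist. As a rough sketch of what that step does (package names and paths here are illustrative, not the exact script):

#!/bin/bash
# Sketch only: build a virtualenv with the scientific stack inside
# amazonlinux, shrink it, and zip it for Lambda.
set -ex
yum -y install atlas-devel blas-devel lapack-devel gcc gcc-c++ \
    python27-devel python27-pip findutils zip
pip install virtualenv
virtualenv /tmp/venv
source /tmp/venv/bin/activate
pip install numpy scipy scikit-learn
# strip debug symbols from shared objects to cut the package size
find /tmp/venv -name "*.so" | xargs strip
cd /tmp/venv && zip -r -9 -q /outputs/venv.zip .
echo "venv compressed size $(du -sh /outputs/venv.zip | cut -f1)"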
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "logs:CreateLogGroup",
        "logs:CreateLogStream",
        "logs:PutLogEvents"
      ],
      "Resource": "arn:aws:logs:*:*:*"
    },
    {
      "Effect": "Allow",
      "Action": [
        "s3:GetObject",
        "s3:PutObject",
        "s3:ListBucket"
      ],
      "Resource": [
        "arn:aws:s3:::YOUR-BUCKET-NAME",
        "arn:aws:s3:::YOUR-BUCKET-NAME/*"
      ]
    }
  ]
}
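To wire this policy to the Lambda execution role, something like the following works (the role, policy, and file names are placeholders, and the trust policy is the standard one for Lambda):

# trust policy letting Lambda assume the role
cat > lambda-trust.json <<'EOF'
{
  "Version": "2012-10-17",
  "Statement": [{
    "Effect": "Allow",
    "Principal": {"Service": "lambda.amazonaws.com"},
    "Action": "sts:AssumeRole"
  }]
}
EOF
aws iam create-role --role-name lambda-sklearn \
    --assume-role-policy-document file://lambda-trust.json
# attach the logs + S3 policy above (saved here as policy.json)
aws iam put-role-policy --role-name lambda-sklearn \
    --policy-name sklearn-logs-and-s3 \
    --policy-document file://policy.json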
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
lamlearn -- using AWS Lambda to do distributed decision forest learning

License: BSD
Author: Josh Bloom (josh.bloom@ge.com)
GE Digital, 2017. All rights reserved.
"""
import time
import json
from StringIO import StringIO
from multiprocessing import Process

import numpy as np
from sklearn.externals import joblib
import boto3

# this is the topic that triggers lambda ... we could invoke
# lambda directly, but SNS lets other listeners subscribe
# to the same messages as well
SNS_TOPIC_ARN = "arn:aws:sns:us-east-1:YOURSNS:sklearn_test"
SEED = 1234
S3_BUCKET_NAME = "YOUR-BUCKET-NAME"
S3_FOLDER = "lambda_learn"

np.random.seed(SEED)
sns = boto3.client('sns')


def trigger_sns(seed=1234, nest=10, build_id="m6", grove_id=1):
    """fires off an AWS SNS message, triggering a build of a grove"""
    m = {"seed": seed, "build_id": build_id, "n_estimators": nest,
         "grove_id": grove_id}
    sns.publish(TopicArn=SNS_TOPIC_ARN, Message=json.dumps(m))


def combine_rfs(rf_a, rf_b):
    """merges the trees of two sklearn forest estimators"""
    rf_a.estimators_ += rf_b.estimators_
    rf_a.n_estimators = len(rf_a.estimators_)
    return rf_a


def assemble_clf(model_name):
    """combine the models built by all the groves"""
    s3 = boto3.resource('s3')
    my_bucket = s3.Bucket(S3_BUCKET_NAME)
    print "Getting all the grove models from s3..."
    groves = [joblib.load(StringIO(x.get()['Body'].read()))
              for x in my_bucket.objects.filter(
                  Prefix=S3_FOLDER + "/models/%s" % model_name)]
    print "Merging grove models..."
    rf_combined = reduce(combine_rfs, groves)
    return rf_combined


def learn(model_name="hack1", ntrees=30, grove_size=10, maxtime=100,
          pred=False, do_parallel=True):
    """fire off many grove learns via AWS SNS messages"""
    nmess = int(ntrees / grove_size)
    print "n_groves: %i" % nmess
    if not do_parallel:
        # launch synchronously
        start = time.time()
        for i in range(nmess):
            print "triggering grove %i" % i
            # cast to a plain int so the seed is JSON serializable
            trigger_sns(seed=int(np.random.randint(0, 1e6)),
                        nest=grove_size,
                        build_id=model_name,
                        grove_id=i)
    else:
        # launch asynchronously
        processes = []
        for i in range(nmess):
            kws = {"seed": int(np.random.randint(0, 1e6)),
                   "nest": grove_size,
                   "build_id": model_name,
                   "grove_id": i}
            print kws
            processes.append(Process(target=trigger_sns, kwargs=kws))
        print "triggering %i groves asynchronously" % nmess
        start = time.time()
        # Run
        for p in processes:
            p.start()
        # wait until all messages are fired
        for p in processes:
            p.join()
    start_look = time.time()
    s3 = boto3.resource('s3')
    my_bucket = s3.Bucket(S3_BUCKET_NAME)
    finished = False
    print "Waiting for all the λ's to complete"
    while (time.time() < start + maxtime) and not finished:
        if len([x for x in my_bucket.objects.filter(
                Prefix=S3_FOLDER + "/models/%s" % model_name)]) >= nmess:
            finished = True
        time.sleep(1)
    tot_time = time.time() - start
    tot_start_look = time.time() - start_look
    if finished:
        print "🤑 Completed in %f sec ... found all %i groves" % \
            (tot_time, nmess)
        print "🤑 time since last λ fired until complete: %f sec" % \
            (tot_start_look)
        print "The joblib save files for each grove are in: s3://" + \
            S3_BUCKET_NAME + "/" + \
            S3_FOLDER + "/models/%s" % model_name
        if pred:
            print "Prediction..."
            clf = assemble_clf(model_name)
            print clf
            from sklearn.datasets import load_digits
            digits = load_digits()
            X_digits = digits.data
            y_digits = digits.target
            print "Score: %f" % clf.score(X_digits[1000:], y_digits[1000:])
    else:
        print "🙁 Not completed ... did not find all %i groves" % nmess


if __name__ == "__main__":
    import argparse
    parser = argparse.ArgumentParser()
    parser.add_argument("name", help="model name (e.g. smelly-owl-1)",
                        type=str)
    parser.add_argument("-g", "--grove", help="grove size [default=10]",
                        type=int, default=10)
    parser.add_argument("-t", "--trees", help="n trees total [default=30]",
                        type=int, default=30)
    parser.add_argument("-p", "--pred", help="predict",
                        default=False, action="store_true")
    parser.add_argument("-s", "--s", help="run synchronously",
                        default=False, action="store_true")
    args = parser.parse_args()
    learn(model_name=args.name, ntrees=args.trees, grove_size=args.grove,
          pred=args.pred, do_parallel=not args.s)
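Assuming the driver above is saved as lamlearn.py (the gist doesn't pin the filename) and the SNS-triggered Lambda below is deployed, a typical run looks like:

# 100 trees total, built as 10 groves of 10 in parallel,
# then merged locally and scored on the digits data
python lamlearn.py smelly-owl-1 --trees 100 --grove 10 --pred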
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""main - builds decision forest models triggered by SNS messages

License: BSD
Author: Josh Bloom (josh.bloom@ge.com)
GE Digital, 2017. All rights reserved.

save as ve_package/main.py
but you'll want to test this either inside the amazonlinux
docker container or in your local environment (and not
inside the ve_package/ directory)
"""
import os
import ctypes
import json
import traceback

import boto3
import sklearn
import numpy as np
from sklearn import datasets
from sklearn.externals import joblib

S3_BUCKET_NAME = "YOUR-BUCKET-NAME"
S3_FOLDER = "lambda_learn"


def _save_model_to_s3(clf, build_id=1, grove_id=2):
    # Saving to s3 directly doesn't work, so we need a local save and copy;
    # only /tmp is writeable in Lambda
    bucketname = S3_BUCKET_NAME
    local_file = "/tmp/build_%s_grove_%s.sav" % (build_id, grove_id)
    s3_file = "%s/models/%s/grove_%s.sav" % (
        S3_FOLDER, build_id, grove_id)
    print "Saving model to local file: %s" % local_file
    joblib.dump(clf, local_file)
    # Copy file to s3:
    s3 = boto3.resource('s3')
    print "Uploading %s --> s3://%s/%s" % (
        local_file, bucketname, s3_file)
    s3.meta.client.upload_file(local_file, bucketname, s3_file)
    # Return s3 file:
    return {(build_id, grove_id): s3_file}


def load_data():
    # Load small MNIST ... TOY FOR NOW
    # you could pull from disk or HBase here,
    # e.g. "s3://your-bucket/lambda_learn/feats_train.csv"
    from sklearn.datasets import load_digits
    digits = load_digits()
    n_samples = len(digits.images)
    data = digits.images.reshape((n_samples, -1))
    return (data, digits.target)


def learn_model(train_X, train_y, n_estimators=10):
    # Create and fit an RF classifier
    from sklearn.ensemble import RandomForestClassifier
    clf = RandomForestClassifier(n_estimators=n_estimators)
    clf.fit(train_X, train_y)
    return clf


def handler(event, context):
    try:
        # print("Received event: " + json.dumps(event, indent=2))
        message = event['Records'][0]['Sns']['Message']
        timestamp = event['Records'][0]['Sns']['Timestamp']
        sig = event['Records'][0]['Sns']['Signature']
        print("message: " + message)
        print("timestamp: " + timestamp)
        print("sig: " + sig)
        mm = json.loads(message)
        # e.g. {"seed": 1234, "n_estimators": 10,
        #       "build_id": "m6", "grove_id": 1}
        n_estimators = mm[u'n_estimators']
        build_id = mm[u'build_id']
        grove_id = mm[u'grove_id']
        seed = mm[u'seed']
    except Exception:
        traceback.print_exc()
        # fall back to defaults so a bare test invocation still runs
        n_estimators = 10
        build_id = "test_m1"
        grove_id = 2
        seed = 10
    print {"nest": n_estimators,
           "build_id": build_id,
           "grove_id": grove_id,
           "seed": seed}
    rez = None
    try:
        np.random.seed(seed)
        train_X, train_y = load_data()
        clf = learn_model(train_X, train_y, n_estimators=n_estimators)
        rez = _save_model_to_s3(clf, build_id=build_id, grove_id=grove_id)
        print rez
    except Exception:
        traceback.print_exc()
    return {"rez": repr(rez),
            "nest": n_estimators,
            "build_id": build_id,
            "grove_id": grove_id, "seed": seed}


def test():
    print handler(1, 1)
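Deployment isn't spelled out in this gist; a hedged sketch, assuming the function name, role, and account-ID placeholders below: drop main.py into the unpacked environment, zip it, and point SNS at the function.

# add the handler to the unpacked venv and re-zip
cp main.py ve_package/ && cd ve_package
zip -r -9 -q ../lambda_package.zip . && cd ..
# the package is likely over Lambda's direct-upload limit,
# so stage it in S3 first
aws s3 cp lambda_package.zip s3://YOUR-BUCKET-NAME/lambda_learn/
aws lambda create-function \
    --function-name sklearn-grove-builder \
    --runtime python2.7 \
    --handler main.handler \
    --memory-size 1536 --timeout 300 \
    --role arn:aws:iam::YOUR-ACCOUNT:role/lambda-sklearn \
    --code S3Bucket=YOUR-BUCKET-NAME,S3Key=lambda_learn/lambda_package.zip
# let SNS invoke the function, then subscribe it to the topic
aws lambda add-permission --function-name sklearn-grove-builder \
    --statement-id sns-invoke --action lambda:InvokeFunction \
    --principal sns.amazonaws.com \
    --source-arn arn:aws:sns:us-east-1:YOURSNS:sklearn_test
aws sns subscribe \
    --topic-arn arn:aws:sns:us-east-1:YOURSNS:sklearn_test \
    --protocol lambda \
    --notification-endpoint arn:aws:lambda:us-east-1:YOUR-ACCOUNT:function:sklearn-grove-builder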
mkdir SDDF; cd SDDF
docker pull amazonlinux:2016.09
git clone https://github.com/ryansb/sklearn-build-lambda.git
cd sklearn-build-lambda