{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### 1. Automated cluster provisioning using dask-ec2"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"```\n",
"$ pip install dask-ec2\n",
"$ dask-ec2 up\n",
" --keyname ${AWS_KEY}\n",
" --keypair ${SSL_KEY}\n",
" --type ${EC2_INSTANCE_TYPE}\n",
" --count ${NUM_NODES}\n",
" --volume-size ${DISK_SIZE_GB}\n",
" --no-notebook\n",
"```"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"The dask-ec2 command will set aside one of the created nodes as the scheduler for the worker nodes. The following commands assume that two nodes were provisioned (one scheduler, one worker). The installation of bayeslite and its dependencies need to be repeated for all nodes (scheduler and workers). This can be simplified with improvements to bayeslite's setup.py."
]
},
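{
"cell_type": "markdown",
"metadata": {},
"source": [
"Once the cluster is up, its topology can be checked from any machine that can reach the scheduler. The sketch below is a minimal check (not part of the original demo); it assumes the scheduler listens on dask's default port 8786, and `SCHEDULER_HOST` is a hypothetical placeholder for the address printed by dask-ec2."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"```python\n",
"from distributed import Client\n",
"\n",
"# Hypothetical address; substitute the scheduler host reported by dask-ec2.\n",
"SCHEDULER_HOST = 'ec2-XX-XX-XX-XX.compute-1.amazonaws.com'\n",
"\n",
"# 8786 is the default dask scheduler port used throughout this demo.\n",
"client = Client('{}:{}'.format(SCHEDULER_HOST, 8786))\n",
"\n",
"# One entry per worker; with two provisioned nodes we expect one worker.\n",
"print(client.scheduler_info()['workers'])\n",
"```"
]
},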
{
"cell_type": "markdown",
"metadata": {},
"source": [
"```\n",
"$ dask-ec2 ssh 1\n",
"$ DEPS=\"cgpm crosscat bayeslite-apsw bayeslite\"\n",
"$ for DEP in ${DEPS}\n",
"do\n",
" git clone http://github.com/probcomp/${DEP}\n",
" cd ${DEP} && python setup.py build && pip install . && cd ..\n",
"done\n",
"```"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### 2. Bootstrap simulated database"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"The \"cluster\" command is a custom Python to handle common tasks. Available subcommands are \"upload\" and \"analyze\". See appendix for source code."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"```\n",
"$ python bootstrap.py\n",
"$ ./cluster --bucket ${S3_BUCKET} upload ${BAYESDB_INSTANCE}\n",
"```"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### bootstrap.py"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"```python\n",
"import os\n",
"import socket\n",
"\n",
"import bayeslite\n",
"\n",
"\n",
"if __name__=='__main__':\n",
" os.environ['BAYESDB_WIZARD_MODE'] = '1'\n",
" with bayeslite.bayesdb_open(filename) as bdb:\n",
" data_multivariate = [\n",
" ('foo', 6, 7, None),\n",
" ('bar', 1, 1, 2),\n",
" ('baz', 100, 100, 200),\n",
" ('quux', 1000, 2000, 3000),\n",
" ('zot', 0, 2, 2),\n",
" ('mumble', 20, 10, 30),\n",
" ('frotz', 4, 13, 17),\n",
" ('gargle', 34, 2, 36),\n",
" ('mumph', 78, 4, 82),\n",
" ('hunf', 90, 1, 91),\n",
" ('blort', 80, 80, 160)\n",
" ]\n",
" bdb.sql_execute('CREATE TABLE t(x TEXT, y NUMERIC, z NUMERIC, w NUMERIC)')\n",
" for row in data_multivariate:\n",
" bdb.sql_execute('INSERT INTO t (x, y, z, w) VALUES (?, ?, ?, ?)', row)\n",
" bdb.execute('''\n",
" CREATE POPULATION t_p FOR t WITH SCHEMA {\n",
" MODEL y, z, w AS NUMERICAL;\n",
" IGNORE x\n",
" }\n",
" ''')\n",
" bdb.execute('CREATE GENERATOR t_g FOR t_p USING crosscat()')\n",
" bdb.execute('INITIALIZE 10 MODELS FOR t_g')\n",
"```"
]
},
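{
"cell_type": "markdown",
"metadata": {},
"source": [
"Before uploading, the bootstrapped instance can be sanity-checked by reopening it. A minimal sketch (not part of the original demo), assuming bootstrap.py wrote the instance to simulate.bdb as above:"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"```python\n",
"import bayeslite\n",
"\n",
"# Reopen the instance written by bootstrap.py.\n",
"with bayeslite.bayesdb_open('simulate.bdb') as bdb:\n",
"    # The table t should contain the 11 rows inserted by bootstrap.py.\n",
"    print(bdb.sql_execute('SELECT COUNT(*) FROM t').fetchall())  # [(11,)]\n",
"```"
]
},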
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### 3. Run stateless analysis using dask on local cluster"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"```\n",
"$ time python dask-local-stateless.py\n",
"Completed: 20 iterations in 0.006460 seconds.\n",
"[(49.42808813754712,), (38.68859819807129,), (68.27989051257737,), (61.778704630672266,), (121.8726137770563,), (91.7205437793169,), (34.89512801651901,), (87.5497744842578,), (27.160744144262782,), (14.359817629540146,)]\n",
"\n",
"real 0m2.578s\n",
"user 0m1.071s\n",
"sys 0m0.600s\n",
"```"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### dask-local-stateless.py"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"```python\n",
"import os\n",
"import socket\n",
"\n",
"import bayeslite\n",
"from distributed import Client, LocalCluster\n",
"\n",
"\n",
"def query():\n",
" os.environ['BAYESDB_WIZARD_MODE'] = '1'\n",
" with bayeslite.bayesdb_open() as bdb:\n",
" # Disable multiprocessing engine for Dask\n",
" bdb.metamodels['cgpm'].set_multiprocess(False)\n",
"\n",
" data_multivariate = [\n",
" ('foo', 6, 7, None),\n",
" ('bar', 1, 1, 2),\n",
" ('baz', 100, 100, 200),\n",
" ('quux', 1000, 2000, 3000),\n",
" ('zot', 0, 2, 2),\n",
" ('mumble', 20, 10, 30),\n",
" ('frotz', 4, 13, 17),\n",
" ('gargle', 34, 2, 36),\n",
" ('mumph', 78, 4, 82),\n",
" ('hunf', 90, 1, 91),\n",
" ('blort', 80, 80, 160)\n",
" ]\n",
" bdb.sql_execute(\n",
" 'CREATE TABLE t(x TEXT, y NUMERIC, z NUMERIC, w NUMERIC)')\n",
" for row in data_multivariate:\n",
" bdb.sql_execute(\n",
" 'INSERT INTO t (x, y, z, w) VALUES (?, ?, ?, ?)', row)\n",
" bdb.execute('''\n",
" CREATE POPULATION t_p FOR t WITH SCHEMA {\n",
" MODEL y, z, w AS NUMERICAL;\n",
" IGNORE x\n",
" }\n",
" ''')\n",
" bdb.execute('CREATE GENERATOR t_g FOR t_p')\n",
" bdb.execute('INITIALIZE 1 MODEL FOR t_g')\n",
" bdb.execute('ANALYZE t_g FOR 20 ITERATION WAIT ( OPTIMIZED )')\n",
"\n",
" result = bdb.execute('''\n",
" SIMULATE y FROM t_p\n",
" GIVEN oid = 1, w = 3 LIMIT 10;\n",
" ''').fetchall()\n",
"\n",
" return result\n",
"\n",
"\n",
"if __name__=='__main__':\n",
" cluster = LocalCluster()\n",
" client = Client(cluster)\n",
" task = client.submit(query)\n",
" print(task.result())\n",
"```"
]
},
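{
"cell_type": "markdown",
"metadata": {},
"source": [
"The script above submits a single task. The same client can also fan several independent analyses out across the cluster; the following sketch is an illustration, not part of the original demo. Note that dask treats `query` as a pure function and would cache its first result, so repeated submissions need `pure=False` to actually run in parallel."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"```python\n",
"from distributed import Client, LocalCluster\n",
"\n",
"# Assumes the query function defined in dask-local-stateless.py is in scope.\n",
"cluster = LocalCluster()\n",
"client = Client(cluster)\n",
"\n",
"# pure=False prevents dask from deduplicating the identical submissions.\n",
"tasks = [client.submit(query, pure=False) for _ in range(4)]\n",
"for result in client.gather(tasks):\n",
"    print(result)\n",
"```"
]
},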
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### 4. Run stateless analysis using dask on EC2 cluster"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"```\n",
"$ time python dask-ec2-stateless.py\n",
"[(49.42808813754712,), (38.68859819807129,), (68.27989051257737,), (61.778704630672266,), (121.8726137770563,), (91.7205437793169,), (34.89512801651901,), (87.5497744842578,), (27.160744144262782,), (14.359817629540146,)]\n",
"\n",
"real 0m1.179s\n",
"user 0m0.506s\n",
"sys 0m0.154s\n",
"```"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### dask-ec2-stateless.py"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"```python\n",
"import os\n",
"import socket\n",
"\n",
"import bayeslite\n",
"from distributed import Client\n",
"\n",
"\n",
"def query():\n",
" os.environ['BAYESDB_WIZARD_MODE'] = '1'\n",
" with bayeslite.bayesdb_open() as bdb:\n",
" # Disable multiprocessing engine for Dask\n",
" bdb.metamodels['cgpm'].set_multiprocess(False)\n",
"\n",
" data_multivariate = [\n",
" ('foo', 6, 7, None),\n",
" ('bar', 1, 1, 2),\n",
" ('baz', 100, 100, 200),\n",
" ('quux', 1000, 2000, 3000),\n",
" ('zot', 0, 2, 2),\n",
" ('mumble', 20, 10, 30),\n",
" ('frotz', 4, 13, 17),\n",
" ('gargle', 34, 2, 36),\n",
" ('mumph', 78, 4, 82),\n",
" ('hunf', 90, 1, 91),\n",
" ('blort', 80, 80, 160)\n",
" ]\n",
" bdb.sql_execute(\n",
" 'CREATE TABLE t(x TEXT, y NUMERIC, z NUMERIC, w NUMERIC)')\n",
" for row in data_multivariate:\n",
" bdb.sql_execute(\n",
" 'INSERT INTO t (x, y, z, w) VALUES (?, ?, ?, ?)', row)\n",
" bdb.execute('''\n",
" CREATE POPULATION t_p FOR t WITH SCHEMA {\n",
" MODEL y, z, w AS NUMERICAL;\n",
" IGNORE x\n",
" }\n",
" ''')\n",
" bdb.execute('CREATE GENERATOR t_g FOR t_p')\n",
" bdb.execute('INITIALIZE 1 MODEL FOR t_g')\n",
" bdb.execute('ANALYZE t_g FOR 20 ITERATION WAIT ( OPTIMIZED )')\n",
"\n",
" result = bdb.execute('''\n",
" SIMULATE y FROM t_p\n",
" GIVEN oid = 1, w = 3 LIMIT 10;\n",
" ''').fetchall()\n",
"\n",
" return result\n",
"\n",
"\n",
"if __name__=='__main__':\n",
" address = \"{}:{}\".format(host, 8786)\n",
" client = Client(address)\n",
" task = client.submit(query)\n",
" print(task.result())\n",
"```"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### 5. Run stateful analysis using dask on EC2 cluster"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"The following command assumes the BayesDB instance was created with 10 models. The BayesDB instance is uploaded to the given S3 bucket and each worker node copies locally prior to analysis."
]
},
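{
"cell_type": "markdown",
"metadata": {},
"source": [
"The worker-side copy is a small s3fs download to a unique temporary path. The sketch below isolates that step from the full analyze function in the appendix; `fetch_instance` is a hypothetical helper name used only for illustration."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"```python\n",
"import os\n",
"import tempfile\n",
"\n",
"import s3fs\n",
"\n",
"\n",
"def fetch_instance(s3_bucket, database, s3_key, s3_secret):\n",
"    # Download the .bdb instance from S3 to a unique local path.\n",
"    s3 = s3fs.S3FileSystem(anon=False, key=s3_key, secret=s3_secret)\n",
"    s3_path = os.path.join(s3_bucket, database)\n",
"    _, local_path = tempfile.mkstemp()\n",
"    s3.get(s3_path, local_path)\n",
"    return local_path\n",
"```"
]
},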
{
"cell_type": "markdown",
"metadata": {},
"source": [
"The command will then create 10 jobs that are executed by the workers. Each line represents an analysis result for each model. The merge step for all analysis results is not implemented at this time."
]
},
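{
"cell_type": "markdown",
"metadata": {},
"source": [
"The merge step is future work (see below). One simple possibility, shown here as an assumption rather than part of the demo, is to pool the simulated draws from all models into a single list, since each per-model result is a list of simulated rows:"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"```python\n",
"from itertools import chain\n",
"\n",
"\n",
"def merge_results(results):\n",
"    # Hypothetical merge: pool the simulated rows from every model.\n",
"    return list(chain.from_iterable(results))\n",
"\n",
"\n",
"# Per-model result lists shaped like the output below.\n",
"print(merge_results([[(68.4,), (39.2,)], [(71.9,), (46.4,)]]))\n",
"```"
]
},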
{
"cell_type": "markdown",
"metadata": {},
"source": [
"```\n",
"$ ./cluster --bucket ${S3_BUCKET} analyze -m 0-9 ${CLUSTER_ROOT_ADDRESS} ${BAYESDB_INSTANCE}\n",
"analyzing database (simulate.bdb)\n",
"[(68.44879985673725,), (39.19803755284626,), (57.83632083190053,), (187.87527696804176,), (-7.37146061821425,), (105.35129673496527,), (156.3770783100769,), (182.16702988014572,), (-11.953060800697891,), (-23.25023795525391,)]\n",
"[(71.96460230508964,), (46.367638176929205,), (62.068987599900765,), (184.85865646255232,), (7.93847974375069,), (101.2872819847918,), (153.86177008119003,), (168.37734456155388,), (5.613419920035298,), (-6.629220236876655,)]\n",
"[(71.96460230508964,), (46.367638176929205,), (62.068987599900765,), (184.85865646255232,), (7.93847974375069,), (101.2872819847918,), (153.86177008119003,), (168.37734456155388,), (5.613419920035298,), (-6.629220236876655,)]\n",
"[(59.964025190358164,), (39.94363406398395,), (52.24679131631255,), (147.91810231235712,), (9.798844918411511,), (83.00943940065602,), (123.81797713973219,), (135.49144531101808,), (7.920677533495031,), (-1.5709962859286932,)]\n",
"[(71.96460230508964,), (46.367638176929205,), (62.068987599900765,), (184.85865646255232,), (7.93847974375069,), (101.2872819847918,), (153.86177008119003,), (168.37734456155388,), (5.613419920035298,), (-6.629220236876655,)]\n",
"[(59.964025190358164,), (39.94363406398395,), (52.24679131631255,), (147.91810231235712,), (9.798844918411511,), (83.00943940065602,), (123.81797713973219,), (135.49144531101808,), (7.920677533495031,), (-1.5709962859286932,)]\n",
"[(61.11837171046152,), (38.16002129103355,), (52.15318280828974,), (163.78302893784,), (4.043815052282547,), (86.97533848348814,), (135.39170788726034,), (147.11353417045154,), (2.193581042222725,), (-9.117566630506296,)]\n",
"[(71.96460230508964,), (46.367638176929205,), (62.068987599900765,), (184.85865646255232,), (7.93847974375069,), (101.2872819847918,), (153.86177008119003,), (168.37734456155388,), (5.613419920035298,), (-6.629220236876655,)]\n",
"[(71.96460230508964,), (46.367638176929205,), (62.068987599900765,), (184.85865646255232,), (7.93847974375069,), (101.2872819847918,), (153.86177008119003,), (168.37734456155388,), (5.613419920035298,), (-6.629220236876655,)]\n",
"[(71.96460230508964,), (46.367638176929205,), (62.068987599900765,), (184.85865646255232,), (7.93847974375069,), (101.2872819847918,), (153.86177008119003,), (168.37734456155388,), (5.613419920035298,), (-6.629220236876655,)]\n",
"\n",
"real 0m2.288s\n",
"user 0m0.199s\n",
"sys 0m0.109s\n",
"```"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Future Work"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"* In order to parallelize ANALYZE statement, we drop all models except one and then run the ANALYZE statement. Find out if we can support executing an ANALYZE statement on individual models without need to drop models.\n",
"* Individual results are returned without merging. Explore how to merge those results and return to client.\n",
"* Currently, we parallelize at the statement level (above the bayeslite library). Explore if we can parallelize at the cgpm level (below the bayeslite library).\n",
"* Speak with @fsaad about other parallelization strategies."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Appendix. Cluster CLI"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"```python\n",
"#!/usr/bin/env python\n",
"\n",
"import os\n",
"import os.path\n",
"import re\n",
"\n",
"import boto3\n",
"import botocore\n",
"import click\n",
"import distributed\n",
"\n",
"\n",
"DEFAULT_S3_BUCKET = 'probcomp-lambda'\n",
"\n",
"\n",
"def get_session(ctx):\n",
" if ctx.obj['PROFILE']:\n",
" return boto3.Session(profile_name=ctx.obj['PROFILE'])\n",
" return boto3.session.Session()\n",
"\n",
"\n",
"@click.group()\n",
"@click.option('--role', envvar='AWS_ROLE_ARN', type=str, help='Role ARN')\n",
"@click.option('--profile', default='default', help='AWS credential profile')\n",
"@click.option('--bucket', envvar='AWS_S3_BUCKET', default=DEFAULT_S3_BUCKET,\n",
" type=str, help='S3 bucket to upload deployment')\n",
"@click.pass_context\n",
"def cli(ctx, role, profile, bucket):\n",
" ctx.obj['ROLE'] = role\n",
" ctx.obj['PROFILE'] = profile\n",
" ctx.obj['BUCKET'] = bucket\n",
"\n",
"\n",
"@cli.command('upload')\n",
"@click.argument('database')\n",
"@click.pass_context\n",
"def cluster_upload(ctx, database):\n",
" \"\"\"\n",
" Upload to AWS S3 bucket\n",
" \"\"\"\n",
" session = get_session(ctx)\n",
" s3_client = session.client('s3')\n",
"\n",
" click.secho(\"creating bucket if not available ({})\".format(ctx.obj['BUCKET']), fg='green')\n",
"\n",
" try:\n",
" s3_client.head_bucket(Bucket=ctx.obj['BUCKET'])\n",
" except botocore.exceptions.ClientError:\n",
" s3_client.create_bucket(Bucket=ctx.obj['BUCKET'])\n",
"\n",
" click.secho(\"uploading database ({})\".format(database), fg='green')\n",
"\n",
" with open(database, 'rb') as f:\n",
" s3_client.upload_fileobj(f, ctx.obj['BUCKET'], os.path.basename(database))\n",
"\n",
"\n",
"def validate_modelset(ctx, param, values):\n",
" models = []\n",
" for value in values:\n",
" value = value.replace(' ', '')\n",
" if '-' in value:\n",
" (lower, upper) = map(int, value.split('-'))\n",
" models.extend(list(range(lower, upper+1)))\n",
" else:\n",
" models.append(int(value))\n",
" return sorted(models)\n",
"\n",
"\n",
"@cli.command('analyze')\n",
"@click.argument('host')\n",
"@click.argument('database')\n",
"@click.option('-m', '--modelset', callback=validate_modelset, multiple=True, default='1')\n",
"@click.pass_context\n",
"def cluster_analyze(ctx, host, database, modelset):\n",
" click.secho(\"analyzing database ({})\".format(database), fg='green')\n",
"\n",
" session = get_session(ctx)\n",
" credentials = session.get_credentials().get_frozen_credentials()\n",
"\n",
" def exclude(model, modelset):\n",
" return list(set(modelset) - set([model]))\n",
"\n",
" def analyze(database, model, excluded, s3_bucket, s3_key, s3_secret):\n",
" import bayeslite\n",
" import s3fs\n",
" import tempfile\n",
"\n",
" # Copy DB instance from S3 and place in local unique location\n",
" s3 = s3fs.S3FileSystem(anon=False, key=s3_key, secret=s3_secret)\n",
" s3_path = os.path.join(s3_bucket, database)\n",
" _, local_path = tempfile.mkstemp()\n",
" s3.get(s3_path, local_path)\n",
"\n",
" result = None\n",
" with bayeslite.bayesdb_open(local_path) as bdb:\n",
" # Enable ANALYZE statements\n",
" os.environ['BAYESDB_WIZARD_MODE'] = '1'\n",
"\n",
" # Disable multiprocessing engine for Dask\n",
" bdb.metamodels['cgpm'].set_multiprocess(False)\n",
"\n",
" for model in excluded:\n",
" bdb.execute('DROP MODELS {} FROM t_g'.format(model))\n",
" bdb.execute('ANALYZE t_g FOR 20 ITERATIONS WAIT')\n",
" result = bdb.execute('SIMULATE y FROM t_p GIVEN oid = 1, w = 3 LIMIT 10').fetchall()\n",
"\n",
" os.remove(local_path)\n",
" return result\n",
"\n",
" address = '{}:{}'.format(host, 8786)\n",
" client = distributed.Client(address)\n",
" tasks = [client.submit(analyze, database, model, exclude(model, modelset), ctx.obj['BUCKET'], credentials.access_key, credentials.secret_key) for model in modelset]\n",
" results = client.gather(tasks)\n",
" for result in results:\n",
" click.echo(result)\n",
" # TODO: Merge multiple results\n",
"\n",
"\n",
"if __name__ == '__main__':\n",
" cli(obj={})\n",
"```"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 2",
"language": "python",
"name": "python2"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 2
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython2",
"version": "2.7.12"
}
},
"nbformat": 4,
"nbformat_minor": 0
}