Skip to content

Instantly share code, notes, and snippets.

@roaramburu
Created November 13, 2019 18:24
Show Gist options
  • Save roaramburu/aacd61ce19dafca561e892d466ed656a to your computer and use it in GitHub Desktop.
Save roaramburu/aacd61ce19dafca561e892d466ed656a to your computer and use it in GitHub Desktop.
Display the source blob
Display the rendered blob
Raw
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# BlazingSQL TPC-H Benchmark Notebook\n",
"\n",
"In this notebook we set up a distributed BlazingSQL cluster with Dask, running cuDF on every node. We will query TPC-H data stored as Apache Parquet files on Google Cloud Storage (GCS).\n",
"\n",
"First, we import the necessary packages and connect to the distributed services."
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [],
"source": [
"import time\n",
"\n",
"from blazingsql import BlazingContext\n",
"from dask.distributed import Client\n",
"\n",
"# Default number of timed runs per query (overridden further down, before the queries run).\n",
"num_runs = 1\n",
"\n",
"# Connect to the Dask scheduler service; the bare `client` on the last line makes Jupyter display it.\n",
"client = Client('blazingsql-dask-scheduler-svc:8786')\n",
"client"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"BlazingContext ready\n"
]
}
],
"source": [
"# Local alternative (not used here):\n",
"# bc = BlazingContext(logs_destination=\"test_logs\", dask_client=client)\n",
"\n",
"# Connect to the orchestrator service; the run_*=False flags tell BlazingContext not to start\n",
"# those processes itself (per the parameter names - confirm against BlazingSQL docs).\n",
"bc = BlazingContext(\n",
"    connection='blazingsql-orchestrator-svc:8889',\n",
"    dask_client=client,\n",
"    run_orchestrator=False,\n",
"    run_engine=False,\n",
"    run_algebra=False,\n",
"    leave_processes_running=True,\n",
")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# NOTE(review): presumably checks/waits until the BlazingSQL cluster is ready - confirm in BlazingSQL docs.\n",
"bc.ready()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Register Google Cloud Storage Bucket"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"OrderedDict([('type', 'gcs'),\n",
" ('project_id', 'blazingdb-demo'),\n",
" ('bucket_name', 'bsql-demos'),\n",
" ('use_default_adc_json_file', True),\n",
" ('adc_json_file', '')])"
]
},
"execution_count": 3,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"# Register the GCS bucket under the name `authority`; table paths below are addressed as gcs://<authority>/...\n",
"authority = \"gsbucket2\"\n",
"bc.gcs(\n",
"    authority,\n",
"    project_id='blazingdb-demo',\n",
"    bucket_name='bsql-demos',\n",
"    use_default_adc_json_file=True,\n",
"    adc_json_file='',\n",
")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Create Tables"
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [],
"source": [
"# Root directory of each TPC-H scale factor, derived from the bucket `authority` registered with bc.gcs,\n",
"# so the bucket name is written in exactly one place.\n",
"gcs_root = \"gcs://\" + authority\n",
"dir_data_fs_100 = gcs_root + \"/tpch_sf100\"\n",
"dir_data_fs_300 = gcs_root + \"/tpch_sf300\"\n",
"dir_data_fs_1000 = gcs_root + \"/tpch_sf1000\""
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"base = dir_data_fs_100\n",
"bc.create_table('nation', base + '/nation/0_0_0.parquet')\n",
"bc.create_table('region', base + '/region/0_0_0.parquet')\n",
"\n",
"# Number of parquet part files (0_0_<n>.parquet) for each partitioned table at sf100.\n",
"part_counts = {'orders': 16, 'lineitem': 72, 'customer': 4, 'part': 2, 'partsupp': 6}\n",
"for table_name, n_parts in part_counts.items():\n",
"    files = [f'{base}/{table_name}/0_0_{num}.parquet' for num in range(n_parts)]\n",
"    t = bc.create_table(table_name, files)\n",
"    print(len(t.datasource_files()))  # number of source files the table reports\n",
"\n",
"t = bc.create_table('supplier', base + '/supplier/0_0_0.parquet')\n",
"print(len(t.datasource_files()))"
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"99\n",
"99\n",
"99\n",
"99\n",
"99\n",
"1\n"
]
}
],
"source": [
"base = dir_data_fs_300\n",
"# nation/region are read from the sf100 directory (presumably because TPC-H keeps them the same size\n",
"# at every scale factor - confirm).\n",
"bc.create_table('nation', dir_data_fs_100 + '/nation/0_0_0.parquet')\n",
"bc.create_table('region', dir_data_fs_100 + '/region/0_0_0.parquet')\n",
"\n",
"# Every partitioned table at sf300 is split into 99 parquet files named <n>.parquet.\n",
"n_parts = 99\n",
"for table_name in ['orders', 'lineitem', 'customer', 'part', 'partsupp']:\n",
"    files = [f'{base}/{table_name}/{num}.parquet' for num in range(n_parts)]\n",
"    t = bc.create_table(table_name, files)\n",
"    print(len(t.datasource_files()))  # number of source files the table reports\n",
"\n",
"t = bc.create_table('supplier', base + '/supplier/0.parquet')\n",
"print(len(t.datasource_files()))"
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {},
"outputs": [],
"source": [
"# Overrides num_runs = 1 from the first cell: each query below is executed and timed this many times.\n",
"num_runs = 3"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Run Queries\n",
"### Q2"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"TEST_02\n"
]
}
],
"source": [
"%%time\n",
"queryId = 'TEST_02'\n",
"print(queryId)\n",
"# NOTE(review): unlike TPC-H Q2 in the spec, the min(ps_supplycost) subquery is not correlated on\n",
"# ps_partkey, so every row is compared against one EUROPE-wide minimum - confirm this is intended.\n",
"query = \"\"\" select \n",
" s.s_acctbal, s.s_name, n.n_name, p.p_partkey, p.p_mfgr, s.s_address, s.s_phone, s.s_comment\n",
" from \n",
" supplier as s \n",
" INNER JOIN nation as n ON s.s_nationkey = n.n_nationkey \n",
" INNER JOIN partsupp as ps ON s.s_suppkey = ps.ps_suppkey\n",
" INNER JOIN part as p ON p.p_partkey = ps.ps_partkey \n",
" INNER JOIN region as r ON r.r_regionkey = n.n_regionkey\n",
" where r.r_name = 'EUROPE' and p.p_size = 15\n",
" and p.p_type like '%BRASS'\n",
" and ps.ps_supplycost = (\n",
" select \n",
" min(psq.ps_supplycost)\n",
" from \n",
" partsupp as psq\n",
" INNER JOIN supplier as sq ON sq.s_suppkey = psq.ps_suppkey\n",
" INNER JOIN nation as nq ON nq.n_nationkey = sq.s_nationkey\n",
" INNER JOIN region as rq ON rq.r_regionkey = nq.n_regionkey\n",
" where\n",
" rq.r_name = 'EUROPE'\n",
" )\n",
" order by \n",
" s.s_acctbal desc, n.n_name, s.s_name, p.p_partkey\"\"\"\n",
"\n",
"# Time num_runs executions; perf_counter is a monotonic clock intended for measuring durations.\n",
"times = []\n",
"for _ in range(num_runs):\n",
"    start = time.perf_counter()\n",
"    result = bc.sql(query).get()\n",
"    times.append(time.perf_counter() - start)\n",
"    print(result.head(5, 15))\n",
"    del result\n",
"    time.sleep(3)  # pause between runs (not included in the recorded time)\n",
"print(\"query times:\")\n",
"for elapsed in times:\n",
"    print(elapsed)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Q3"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"%%time\n",
"queryId = 'TEST_03'\n",
"print(queryId)\n",
"query = \"\"\" select \n",
" l.l_orderkey, sum(l.l_extendedprice * (1 - l.l_discount)) as revenue, o.o_orderdate, o.o_shippriority\n",
" from \n",
" orders as o\n",
" INNER JOIN lineitem as l ON l.l_orderkey = o.o_orderkey\n",
" INNER JOIN customer as c ON c.c_custkey = o.o_custkey\n",
" where\n",
" c.c_mktsegment = 'BUILDING'\n",
" and o.o_orderdate < date '1995-03-15' \n",
" and l.l_shipdate > date '1995-03-15'\n",
" group by\n",
" l.l_orderkey, o.o_orderdate, o.o_shippriority\n",
" order by\n",
" revenue desc, o.o_orderdate\"\"\"\n",
"\n",
"# Time num_runs executions; perf_counter is a monotonic clock intended for measuring durations.\n",
"times = []\n",
"for _ in range(num_runs):\n",
"    start = time.perf_counter()\n",
"    result = bc.sql(query).get()\n",
"    times.append(time.perf_counter() - start)\n",
"    print(result.head(5, 15))\n",
"    del result\n",
"    time.sleep(3)  # pause between runs (not included in the recorded time)\n",
"print(\"query times:\")\n",
"for elapsed in times:\n",
"    print(elapsed)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Q4"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"%%time\n",
"queryId = 'TEST_04'\n",
"print(queryId)\n",
"# NOTE(review): TPC-H Q4 in the spec uses a 3-month window (1993-07-01 to 1993-10-01);\n",
"# the upper bound here is 1994-10-01 - confirm this is intended.\n",
"query = \"\"\" select\n",
" o_orderpriority, count(*) as order_count\n",
" from\n",
" orders\n",
" where\n",
" o_orderdate >= date '1993-07-01'\n",
" and o_orderdate < date '1994-10-01'\n",
" and exists (select\n",
" *\n",
" from\n",
" lineitem\n",
" where\n",
" l_orderkey = o_orderkey\n",
" and l_commitdate < l_receiptdate)\n",
" group by\n",
" o_orderpriority \n",
" order by \n",
" o_orderpriority\"\"\"\n",
"\n",
"# Time num_runs executions; perf_counter is a monotonic clock intended for measuring durations.\n",
"times = []\n",
"for _ in range(num_runs):\n",
"    start = time.perf_counter()\n",
"    result = bc.sql(query).get()\n",
"    times.append(time.perf_counter() - start)\n",
"    print(result.head(5, 15))\n",
"    del result\n",
"    time.sleep(3)  # pause between runs (not included in the recorded time)\n",
"print(\"query times:\")\n",
"for elapsed in times:\n",
"    print(elapsed)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Q5"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"%%time\n",
"queryId = 'TEST_05'\n",
"print(queryId)\n",
"query = \"\"\" select\n",
" n_name, sum(l_extendedprice * (1 - l_discount)) as revenue\n",
" from orders\n",
" inner join lineitem on l_orderkey = o_orderkey\n",
" inner join customer on o_custkey = c_custkey\n",
" inner join supplier on l_suppkey = s_suppkey and c_nationkey = s_nationkey\n",
" inner join nation on n_nationkey = s_nationkey\n",
" inner join region on n_regionkey = r_regionkey\n",
" where\n",
" r_name = 'ASIA' \n",
" and o_orderdate >= date '1994-01-01'\n",
" and o_orderdate < date '1995-01-01' \n",
" group by\n",
" n_name\n",
" order by\n",
" revenue desc\"\"\"\n",
"\n",
"# Time num_runs executions; perf_counter is a monotonic clock intended for measuring durations.\n",
"times = []\n",
"for _ in range(num_runs):\n",
"    start = time.perf_counter()\n",
"    result = bc.sql(query).get()\n",
"    times.append(time.perf_counter() - start)\n",
"    print(result.head(5, 15))\n",
"    del result\n",
"    time.sleep(3)  # pause between runs (not included in the recorded time)\n",
"print(\"query times:\")\n",
"for elapsed in times:\n",
"    print(elapsed)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Q6"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"%%time\n",
"queryId = 'TEST_06'\n",
"print(queryId)\n",
"query = \"\"\" select\n",
" sum(l_extendedprice*l_discount) as revenue\n",
" from\n",
" lineitem\n",
" where\n",
" l_shipdate >= date '1994-01-01' \n",
" and l_shipdate < date '1995-01-01'\n",
" and l_discount between 0.05 and 0.07 \n",
" and l_quantity < 24\"\"\"\n",
"\n",
"# Time num_runs executions; perf_counter is a monotonic clock intended for measuring durations.\n",
"times = []\n",
"for _ in range(num_runs):\n",
"    start = time.perf_counter()\n",
"    result = bc.sql(query).get()\n",
"    times.append(time.perf_counter() - start)\n",
"    print(result.head(5, 15))\n",
"    del result\n",
"    time.sleep(3)  # pause between runs (not included in the recorded time)\n",
"print(\"query times:\")\n",
"for elapsed in times:\n",
"    print(elapsed)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Q8"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"%%time\n",
"queryId = 'TEST_08'\n",
"print(queryId)\n",
"query = \"\"\" select \n",
" o_year, sum(case when nationl = 'BRAZIL' then volume else 0 end) / sum(volume) as mkt_share\n",
" from (\n",
" select \n",
" extract(year from o.o_orderdate) as o_year, l.l_extendedprice * (1 - l.l_discount) as volume, n2.n_name as nationl\n",
" from \n",
" part as p\n",
" INNER JOIN lineitem as l ON p.p_partkey = l.l_partkey\n",
" INNER JOIN supplier as s ON s.s_suppkey = l.l_suppkey\n",
" INNER JOIN orders as o ON o.o_orderkey = l.l_orderkey \n",
" INNER JOIN customer as c ON c.c_custkey = o.o_custkey\n",
" INNER JOIN nation as n1 ON n1.n_nationkey = c.c_nationkey \n",
" INNER JOIN region as r ON r.r_regionkey = n1.n_regionkey\n",
" INNER JOIN nation as n2 ON n2.n_nationkey = s.s_nationkey\n",
" where \n",
" r.r_name = 'AMERICA' \n",
" and o.o_orderdate >= date '1995-01-01' and o.o_orderdate <= date '1996-12-31'\n",
" and p.p_type = 'ECONOMY ANODIZED STEEL' ) as all_nations\n",
" group by\n",
" o_year\n",
" order by\n",
" o_year\"\"\"\n",
"\n",
"# Time num_runs executions; perf_counter is a monotonic clock intended for measuring durations.\n",
"times = []\n",
"for _ in range(num_runs):\n",
"    start = time.perf_counter()\n",
"    result = bc.sql(query).get()\n",
"    times.append(time.perf_counter() - start)\n",
"    print(result.head(5, 15))\n",
"    del result\n",
"    time.sleep(3)  # pause between runs (not included in the recorded time)\n",
"print(\"query times:\")\n",
"for elapsed in times:\n",
"    print(elapsed)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Q13"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"%%time\n",
"queryId = 'TEST_13'\n",
"print(queryId)\n",
"# NOTE(review): the o_comment filter sits in WHERE rather than in the LEFT JOIN's ON clause (as in the\n",
"# TPC-H spec). Customers with no orders get a NULL o_comment and are filtered out, so c_count = 0\n",
"# never appears - confirm this is intended.\n",
"query = \"\"\" select\n",
" c_count, count(*) as custdist\n",
" from\n",
" (select\n",
" c.c_custkey, count(o.o_orderkey)\n",
" from\n",
" customer as c\n",
" LEFT OUTER JOIN orders as o ON c.c_custkey = o.o_custkey\n",
" where o.o_comment not like '%special%requests%'\n",
" group by\n",
" c.c_custkey) as c_orders (c_custkey, c_count)\n",
" group by\n",
" c_count\n",
" order by\n",
" custdist desc, c_count desc\"\"\"\n",
"\n",
"# Time num_runs executions; perf_counter is a monotonic clock intended for measuring durations.\n",
"times = []\n",
"for _ in range(num_runs):\n",
"    start = time.perf_counter()\n",
"    result = bc.sql(query).get()\n",
"    times.append(time.perf_counter() - start)\n",
"    print(result.head(5, 15))\n",
"    del result\n",
"    time.sleep(3)  # pause between runs (not included in the recorded time)\n",
"print(\"query times:\")\n",
"for elapsed in times:\n",
"    print(elapsed)"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.7.3"
}
},
"nbformat": 4,
"nbformat_minor": 4
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment