Skip to content

Instantly share code, notes, and snippets.

@roaramburu
Created October 22, 2019 16:47
Show Gist options
  • Save roaramburu/0688eab585e1372ead1905489c4acc7e to your computer and use it in GitHub Desktop.
Save roaramburu/0688eab585e1372ead1905489c4acc7e to your computer and use it in GitHub Desktop.
#!/usr/bin/env python
# coding: utf-8
# # BlazingSQL TPC-H Benchmark Notebook
#
# In this notebook we will set up a distributed BlazingSQL cluster with Dask, running cuDF on every node as well. We will query data residing on Google Cloud Storage (GCS) in Apache Parquet files.
#
# First we will import the necessary packages and launch the distributed services.
# In[ ]:
from blazingsql import BlazingContext
from dask.distributed import Client
import time
# Attach to the Dask scheduler at a fixed local address and hand the
# resulting client to BlazingSQL so queries run through that cluster.
scheduler_address = '127.0.0.1:8786'
client = Client(scheduler_address)
# NOTE(review): run_engine=False presumably means the engine processes are
# started externally -- confirm against the BlazingContext documentation.
bc = BlazingContext(
    dask_client=client,
    logs_destination="test_logs",
    run_engine=False,
)
# ## Create Tables
#
# Here we are registering a Google Cloud Storage bucket, and then creating tables on the Parquet files inside said bucket.
# In[2]:
# Register the GCS bucket under the 'gcs_100gb' prefix, then create one
# BlazingSQL table per TPC-H relation from the Parquet files beneath it.
bc.gcs('gcs_100gb', project_id='blazingsql-enduser', bucket_name='bsql')
dir_data_fs = "gcs://gcs_100gb" + "/" + "tpch_sf100"

# (table name, number of Parquet parts). None marks a table registered from
# a single file path rather than a list of paths. Order matches the original
# registration order.
table_layout = [
    ('nation', None),
    ('region', None),
    ('orders', 16),
    ('lineitem', 72),
    ('customer', 4),
    ('part', 2),
    ('partsupp', 6),
    ('supplier', None),
]

for table_name, part_count in table_layout:
    table_prefix = dir_data_fs + '/' + table_name + '/0_0_'
    if part_count is None:
        # Single-file tables are passed as a plain string path.
        bc.create_table(table_name, table_prefix + '0.parquet')
    else:
        # Multi-file tables are passed as a list of paths 0_0_0 .. 0_0_{N-1}.
        files = [table_prefix + str(num) + '.parquet' for num in range(part_count)]
        bc.create_table(table_name, files)
# ## Run Queries
#
# ### Q3
# In[5]:
# Q3: discounted revenue per order for 'BUILDING' customers, restricted to
# orders placed before 1995-03-15 whose line items shipped after that date.
t0 = time.time()
queryId = 'TEST_03'
print(queryId)
query = """ select
l.l_orderkey, sum(l.l_extendedprice * (1 - l.l_discount)) as revenue, o.o_orderdate, o.o_shippriority
from
orders as o
INNER JOIN lineitem as l ON l.l_orderkey = o.o_orderkey
INNER JOIN customer as c ON c.c_custkey = o.o_custkey
where
c.c_mktsegment = 'BUILDING'
and o.o_orderdate < date '1995-03-15'
and l.l_shipdate > date '1995-03-15'
group by
l.l_orderkey, o.o_orderdate, o.o_shippriority
order by
revenue desc, o.o_orderdate"""
query_handle = bc.sql(query)
result = query_handle.get()
# Stop the clock before displaying anything so printing is not timed.
t2 = time.time()
print(result.head())
elapsed = t2 - t0
print("query time", elapsed, sep="\n")
# ### Q4
# In[4]:
# Q4: number of orders per priority for orders placed in
# [1993-07-01, 1994-10-01) that have at least one line item whose commit
# date is earlier than its receipt date.
t0 = time.time()
queryId = 'TEST_04'
print(queryId)
query = """ select
o_orderpriority, count(*) as order_count
from
orders
where
o_orderdate >= date '1993-07-01'
and o_orderdate < date '1994-10-01'
and exists (select
*
from
lineitem
where
l_orderkey = o_orderkey
and l_commitdate < l_receiptdate)
group by
o_orderpriority
order by
o_orderpriority"""
result = bc.sql(query).get()
# Take the end timestamp right after the query returns, as the other timed
# cells do, so displaying the result is not counted in the query time.
t2 = time.time()
# NOTE(review): head(2,2) is kept as written; the second positional argument
# is presumably dask's `npartitions` -- confirm the result is a dask frame.
print(result.head(2,2))
print("query time")
print(t2 - t0)
# ### Q5
# In[3]:
# Q5: discounted revenue per nation within region 'ASIA' for orders placed
# in 1994, where customer and supplier share the same nation.
t0 = time.time()
queryId = 'TEST_05'
print(queryId)
query = """ select
n_name, sum(l_extendedprice * (1 - l_discount)) as revenue
from orders
inner join lineitem on l_orderkey = o_orderkey
inner join customer on o_custkey = c_custkey
inner join supplier on l_suppkey = s_suppkey and c_nationkey = s_nationkey
inner join nation on n_nationkey = s_nationkey
inner join region on n_regionkey = r_regionkey
where
r_name = 'ASIA'
and o_orderdate >= date '1994-01-01'
and o_orderdate < date '1995-01-01'
group by
n_name
order by
revenue desc"""
query_handle = bc.sql(query)
result = query_handle.get()
t2 = time.time()
# Report the elapsed wall-clock time first, then preview the result.
elapsed = t2 - t0
print("query time", elapsed, sep="\n")
print(result.head())
# ### Q6
# In[6]:
# Q6: total of extendedprice * discount over line items shipped in 1994
# with a discount between 0.05 and 0.07 and a quantity below 24.
t0 = time.time()
queryId = 'TEST_06'
print(queryId)
query = """ select
sum(l_extendedprice*l_discount) as revenue
from
lineitem
where
l_shipdate >= date '1994-01-01'
and l_shipdate < date '1995-01-01'
and l_discount between 0.05 and 0.07
and l_quantity < 24"""
query_handle = bc.sql(query)
result = query_handle.get()
t2 = time.time()
# Report the elapsed wall-clock time first, then preview the result.
elapsed = t2 - t0
print("query time", elapsed, sep="\n")
print(result.head())
# ### Q8
# In[ ]:
# Q8: per order year (1995-1996), the share of volume supplied from 'BRAZIL'
# among 'ECONOMY ANODIZED STEEL' parts bought by customers in 'AMERICA'.
t0 = time.time()
queryId = 'TEST_08'
print(queryId)
query = """ select
o_year, sum(case when nationl = 'BRAZIL' then volume else 0 end) / sum(volume) as mkt_share
from (
select
extract(year from o.o_orderdate) as o_year, l.l_extendedprice * (1 - l.l_discount) as volume, n2.n_name as nationl
from
part as p
INNER JOIN lineitem as l ON p.p_partkey = l.l_partkey
INNER JOIN supplier as s ON s.s_suppkey = l.l_suppkey
INNER JOIN orders as o ON o.o_orderkey = l.l_orderkey
INNER JOIN customer as c ON c.c_custkey = o.o_custkey
INNER JOIN nation as n1 ON n1.n_nationkey = c.c_nationkey
INNER JOIN region as r ON r.r_regionkey = n1.n_regionkey
INNER JOIN nation as n2 ON n2.n_nationkey = s.s_nationkey
where
r.r_name = 'AMERICA'
and o.o_orderdate >= date '1995-01-01' and o.o_orderdate <= date '1996-12-31'
and p.p_type = 'ECONOMY ANODIZED STEEL' ) as all_nations
group by
o_year
order by
o_year"""
query_handle = bc.sql(query)
result = query_handle.get()
t2 = time.time()
# Report the elapsed wall-clock time first, then preview the result.
elapsed = t2 - t0
print("query time", elapsed, sep="\n")
print(result.head())
# ### Q9
# In[ ]:
# Q9: profit (discounted price minus supply cost * quantity) summed per
# supplier nation and order year, for parts whose name contains 'green'.
t0 = time.time()
queryId = 'TEST_09'
print(queryId)
query = """ select
nationl, o_year, sum(amount) as sum_profit
from
( select n_name as nationl, extract(year from o_orderdate) as o_year,
l_extendedprice * (1 - l_discount) - ps_supplycost * l_quantity as amount
from lineitem
INNER JOIN orders ON o_orderkey = l_orderkey
INNER JOIN partsupp ON ps_suppkey = l_suppkey and ps_partkey = l_partkey
INNER JOIN part ON p_partkey = l_partkey
INNER JOIN supplier ON s_suppkey = l_suppkey
INNER JOIN nation ON n_nationkey = s_nationkey
where
p_name like '%green%' ) as profit
group by
nationl, o_year
order by
nationl, o_year desc"""
result = bc.sql(query).get()
# Capture the end timestamp for THIS query. Without it the subtraction
# below reuses the stale t2 left over from the previous cell and reports a
# meaningless (typically negative) duration.
t2 = time.time()
print("query time")
print(t2 - t0)
print(result.head())
# ### Q13
# In[ ]:
# Q13: distribution of customers by number of orders whose comment does not
# match '%special%requests%'.
# Timed like the other query cells so the benchmark covers every query.
t0 = time.time()
queryId = 'TEST_13'
print(queryId)
# NOTE(review): the predicate on o.o_comment sits in WHERE rather than in the
# join's ON clause, so rows that the LEFT OUTER JOIN NULL-extends (customers
# without orders) are filtered out and the join behaves like an inner join.
# The query text is left unchanged; confirm whether this is intentional.
query = """ select
c_count, count(*) as custdist
from
(select
c.c_custkey, count(o.o_orderkey)
from
customer as c
LEFT OUTER JOIN orders as o ON c.c_custkey = o.o_custkey
where o.o_comment not like '%special%requests%'
group by
c.c_custkey) as c_orders (c_custkey, c_count)
group by
c_count
order by
custdist desc, c_count desc"""
result = bc.sql(query).get()
t2 = time.time()
print("query time")
print(t2 - t0)
print(result.head())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment