Created
October 22, 2019 16:47
-
-
Save roaramburu/0688eab585e1372ead1905489c4acc7e to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# coding: utf-8 | |
# # BlazingSQL TPC-H Benchmark Notebook | |
# | |
# In this notebook we set up a distributed BlazingSQL cluster with Dask, running cuDF on every node as well. We will query data residing on Google Cloud Storage (GCS) in Apache Parquet files.
#
# First we import the necessary packages and connect to the Dask scheduler that is already running at 127.0.0.1:8786.
# In[ ]: | |
from blazingsql import BlazingContext | |
from dask.distributed import Client | |
import time | |
# Attach to the Dask scheduler listening on the local machine and build the
# BlazingContext on top of that client. Logs go to the "test_logs" destination.
# NOTE(review): run_engine=False presumably means the BlazingSQL engine is
# started elsewhere (e.g. on the Dask workers) — confirm against BlazingSQL docs.
client = Client('127.0.0.1:8786')
bc = BlazingContext(
    dask_client=client,
    logs_destination="test_logs",
    run_engine=False,
)
# ## Create Tables
#
# Register a Google Cloud Storage bucket, then create one table per TPC-H
# relation from the Parquet files stored in that bucket.

# In[2]:

# Expose the 'bsql' bucket under the name 'gcs_100gb'; paths below address it
# through the "gcs://gcs_100gb" prefix.
bc.gcs('gcs_100gb', project_id='blazingsql-enduser', bucket_name='bsql')
dir_data_fs = "gcs://gcs_100gb/tpch_sf100"

# (table name, number of Parquet parts), in registration order. Parts live at
# <dir_data_fs>/<table>/0_0_<i>.parquet. A part count of None marks a
# single-file table, which is registered with a plain path string rather than
# a list of paths.
tpch_tables = (
    ('nation', None),
    ('region', None),
    ('orders', 16),
    ('lineitem', 72),
    ('customer', 4),
    ('part', 2),
    ('partsupp', 6),
    ('supplier', None),
)

for table_name, part_count in tpch_tables:
    table_dir = dir_data_fs + '/' + table_name
    if part_count is None:
        bc.create_table(table_name, table_dir + '/0_0_0.parquet')
        continue
    files = ['{}/0_0_{}.parquet'.format(table_dir, i) for i in range(part_count)]
    bc.create_table(table_name, files)
# ## Run Queries | |
# | |
# ### Q3
# In[5]:

queryId = 'TEST_03'
print(queryId)
query = """ select
l.l_orderkey, sum(l.l_extendedprice * (1 - l.l_discount)) as revenue, o.o_orderdate, o.o_shippriority
from
orders as o
INNER JOIN lineitem as l ON l.l_orderkey = o.o_orderkey
INNER JOIN customer as c ON c.c_custkey = o.o_custkey
where
c.c_mktsegment = 'BUILDING'
and o.o_orderdate < date '1995-03-15'
and l.l_shipdate > date '1995-03-15'
group by
l.l_orderkey, o.o_orderdate, o.o_shippriority
order by
revenue desc, o.o_orderdate"""
# Bracket only the query call. perf_counter is monotonic and high-resolution.
# time.time() follows the adjustable wall clock and can jump mid-measurement.
# NOTE(review): assumes .get() blocks until the result is materialized, so the
# interval covers query execution — confirm for the distributed API.
t0 = time.perf_counter()
result = bc.sql(query).get()
t2 = time.perf_counter()
print(result.head())
print("query time")
print(t2 - t0)
# ### Q4
# In[4]:

queryId = 'TEST_04'
print(queryId)
# TPC-H Q4 covers a three-month window starting at DATE (validation value
# 1993-07-01). The exclusive upper bound is therefore 1993-10-01; the
# previous value of 1994-10-01 made the window 15 months wide.
query = """ select
o_orderpriority, count(*) as order_count
from
orders
where
o_orderdate >= date '1993-07-01'
and o_orderdate < date '1993-10-01'
and exists (select
*
from
lineitem
where
l_orderkey = o_orderkey
and l_commitdate < l_receiptdate)
group by
o_orderpriority
order by
o_orderpriority"""
# Stop the timer right after the query, as the other cells do, so printing is
# not counted. perf_counter is monotonic, unlike time.time().
# NOTE(review): assumes .get() blocks until the result is materialized — confirm.
t0 = time.perf_counter()
result = bc.sql(query).get()
t2 = time.perf_counter()
# NOTE(review): head(2, 2) is only valid if `result` is a Dask DataFrame
# (head(n, npartitions)); a cuDF/pandas DataFrame's head() takes just n — confirm.
print(result.head(2,2))
print("query time")
print(t2 - t0)
# ### Q5
# In[3]:

queryId = 'TEST_05'
print(queryId)
query = """ select
n_name, sum(l_extendedprice * (1 - l_discount)) as revenue
from orders
inner join lineitem on l_orderkey = o_orderkey
inner join customer on o_custkey = c_custkey
inner join supplier on l_suppkey = s_suppkey and c_nationkey = s_nationkey
inner join nation on n_nationkey = s_nationkey
inner join region on n_regionkey = r_regionkey
where
r_name = 'ASIA'
and o_orderdate >= date '1994-01-01'
and o_orderdate < date '1995-01-01'
group by
n_name
order by
revenue desc"""
# Bracket only the query call with the monotonic perf_counter clock.
# NOTE(review): assumes .get() blocks until the result is materialized — confirm.
t0 = time.perf_counter()
result = bc.sql(query).get()
t2 = time.perf_counter()
print("query time")
print(t2 - t0)
print(result.head())
# ### Q6
# In[6]:

queryId = 'TEST_06'
print(queryId)
query = """ select
sum(l_extendedprice*l_discount) as revenue
from
lineitem
where
l_shipdate >= date '1994-01-01'
and l_shipdate < date '1995-01-01'
and l_discount between 0.05 and 0.07
and l_quantity < 24"""
# Bracket only the query call with the monotonic perf_counter clock.
# NOTE(review): assumes .get() blocks until the result is materialized — confirm.
t0 = time.perf_counter()
result = bc.sql(query).get()
t2 = time.perf_counter()
print("query time")
print(t2 - t0)
print(result.head())
# ### Q8
# In[ ]:

queryId = 'TEST_08'
print(queryId)
query = """ select
o_year, sum(case when nationl = 'BRAZIL' then volume else 0 end) / sum(volume) as mkt_share
from (
select
extract(year from o.o_orderdate) as o_year, l.l_extendedprice * (1 - l.l_discount) as volume, n2.n_name as nationl
from
part as p
INNER JOIN lineitem as l ON p.p_partkey = l.l_partkey
INNER JOIN supplier as s ON s.s_suppkey = l.l_suppkey
INNER JOIN orders as o ON o.o_orderkey = l.l_orderkey
INNER JOIN customer as c ON c.c_custkey = o.o_custkey
INNER JOIN nation as n1 ON n1.n_nationkey = c.c_nationkey
INNER JOIN region as r ON r.r_regionkey = n1.n_regionkey
INNER JOIN nation as n2 ON n2.n_nationkey = s.s_nationkey
where
r.r_name = 'AMERICA'
and o.o_orderdate >= date '1995-01-01' and o.o_orderdate <= date '1996-12-31'
and p.p_type = 'ECONOMY ANODIZED STEEL' ) as all_nations
group by
o_year
order by
o_year"""
# Bracket only the query call with the monotonic perf_counter clock.
# NOTE(review): assumes .get() blocks until the result is materialized — confirm.
t0 = time.perf_counter()
result = bc.sql(query).get()
t2 = time.perf_counter()
print("query time")
print(t2 - t0)
print(result.head())
# ### Q9
# In[ ]:

queryId = 'TEST_09'
print(queryId)
query = """ select
nationl, o_year, sum(amount) as sum_profit
from
( select n_name as nationl, extract(year from o_orderdate) as o_year,
l_extendedprice * (1 - l_discount) - ps_supplycost * l_quantity as amount
from lineitem
INNER JOIN orders ON o_orderkey = l_orderkey
INNER JOIN partsupp ON ps_suppkey = l_suppkey and ps_partkey = l_partkey
INNER JOIN part ON p_partkey = l_partkey
INNER JOIN supplier ON s_suppkey = l_suppkey
INNER JOIN nation ON n_nationkey = s_nationkey
where
p_name like '%green%' ) as profit
group by
nationl, o_year
order by
nationl, o_year desc"""
# Take both timestamps in this cell. Previously t2 was never set here, so the
# printed "query time" used the stale t2 left over from Q8.
# NOTE(review): assumes .get() blocks until the result is materialized — confirm.
t0 = time.perf_counter()
result = bc.sql(query).get()
t2 = time.perf_counter()
print("query time")
print(t2 - t0)
print(result.head())
# ### Q13
# In[ ]:

queryId = 'TEST_13'
print(queryId)
# The o_comment filter must apply to the orders side *before* the outer join.
# As a WHERE clause on the joined rows, it discarded customers with no
# qualifying orders (their o_comment is NULL, so NOT LIKE is not true). That
# dropped the c_count = 0 group that TPC-H Q13 requires. Pre-filtering orders
# in a derived table matches the spec's join-condition form and keeps the join
# a plain equi-join.
query = """ select
c_count, count(*) as custdist
from
(select
c.c_custkey, count(o.o_orderkey)
from
customer as c
LEFT OUTER JOIN (select o_orderkey, o_custkey from orders
where o_comment not like '%special%requests%') as o
ON c.c_custkey = o.o_custkey
group by
c.c_custkey) as c_orders (c_custkey, c_count)
group by
c_count
order by
custdist desc, c_count desc"""
# Time this query like the other cells do, using the monotonic perf_counter.
# NOTE(review): assumes .get() blocks until the result is materialized — confirm.
t0 = time.perf_counter()
result = bc.sql(query).get()
t2 = time.perf_counter()
print(result.head())
print("query time")
print(t2 - t0)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment