Benchmark pandas 2.0.0 with TPC-H

@UranusSeven · Last active August 31, 2023

TPC-H is a decision support benchmark. It consists of a suite of business-oriented ad-hoc queries and concurrent data modifications.

Pandas 2.0.0 RC0 and RC1 have been released. According to the release notes, the main new features in 2.0.0 focus on performance improvements.

The benchmark runs on my laptop:

MacBook Pro (16-inch, 2021)
Chip Apple M1 Max
Memory 32 GB

I used pandas 1.5.3 as the baseline and tested all four combinations of the two new features:

2.0.0rc0
2.0.0rc0 + lazy copy
2.0.0rc0 + pyarrow dtype backend
2.0.0rc0 + lazy copy + pyarrow dtype backend
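
Both features are toggled through pandas options, which is how the script below enables them. A minimal sketch (note that the `mode.dtype_backend` option is specific to the 2.0.0 RCs; the final 2.0.0 release exposes a `dtype_backend=` keyword on the readers instead):

```python
import pandas as pd

# Lazy copy (Copy-on-Write)
pd.set_option("mode.copy_on_write", True)

# PyArrow-backed dtypes (RC-era option name)
pd.set_option("mode.dtype_backend", "pyarrow")
```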

Here are the results. The numbers are running times in seconds; lower is better.

| configuration | round 1 | round 2 | round 3 | average |
| --- | ---: | ---: | ---: | ---: |
| 1.5.3 | 11.94 | 11.81 | 11.94 | 11.89 |
| 2.0.0rc0 | 17.38 | 17.50 | 17.25 | 17.37 |
| 2.0.0rc0 + lazy copy | 16.39 | 16.51 | 16.52 | 16.47 |
| 2.0.0rc0 + pyarrow dtype backend | 51.89 | 52.55 | 52.60 | 52.34 |
| 2.0.0rc0 + lazy copy + pyarrow dtype backend | 53.51 | 53.92 | 54.19 | 53.87 |
import argparse
import functools
import inspect
import json
import time
from typing import Callable, List, Dict
import pandas as pd
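
# --- Table loaders ---
# Each loader reads one TPC-H table from a Parquet directory. functools.lru_cache
# memoizes the result, so queries that share a table hit the disk only once.
# Date columns are parsed eagerly where queries filter on them.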
@functools.lru_cache
def load_lineitem(
data_folder: str, **storage_options
) -> pd.DataFrame:
data_path = data_folder + "/lineitem"
df = pd.read_parquet(
data_path, storage_options=storage_options
)
df["L_SHIPDATE"] = pd.to_datetime(df.L_SHIPDATE, format="%Y-%m-%d")
df["L_RECEIPTDATE"] = pd.to_datetime(df.L_RECEIPTDATE, format="%Y-%m-%d")
df["L_COMMITDATE"] = pd.to_datetime(df.L_COMMITDATE, format="%Y-%m-%d")
return df
@functools.lru_cache
def load_part(
data_folder: str, **storage_options
) -> pd.DataFrame:
data_path = data_folder + "/part"
df = pd.read_parquet(
data_path, storage_options=storage_options
)
return df
@functools.lru_cache
def load_orders(
data_folder: str, **storage_options
) -> pd.DataFrame:
data_path = data_folder + "/orders"
df = pd.read_parquet(
data_path, storage_options=storage_options
)
df["O_ORDERDATE"] = pd.to_datetime(df.O_ORDERDATE, format="%Y-%m-%d")
return df
@functools.lru_cache
def load_customer(
data_folder: str, **storage_options
) -> pd.DataFrame:
data_path = data_folder + "/customer"
df = pd.read_parquet(
data_path, storage_options=storage_options
)
return df
@functools.lru_cache
def load_nation(
data_folder: str, **storage_options
) -> pd.DataFrame:
data_path = data_folder + "/nation"
df = pd.read_parquet(
data_path, storage_options=storage_options
)
return df
@functools.lru_cache
def load_region(
data_folder: str, **storage_options
) -> pd.DataFrame:
data_path = data_folder + "/region"
df = pd.read_parquet(
data_path, storage_options=storage_options
)
return df
@functools.lru_cache
def load_supplier(
data_folder: str, **storage_options
) -> pd.DataFrame:
data_path = data_folder + "/supplier"
df = pd.read_parquet(
data_path, storage_options=storage_options
)
return df
@functools.lru_cache
def load_partsupp(
data_folder: str, **storage_options
) -> pd.DataFrame:
data_path = data_folder + "/partsupp"
df = pd.read_parquet(
data_path, storage_options=storage_options
)
return df
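
# Decorator that runs a query function and prints its wall-clock execution time.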
def timethis(q: Callable):
@functools.wraps(q)
def wrapped(*args, **kwargs):
t = time.time()
q(*args, **kwargs)
print("%s Execution time (s): %f" % (q.__name__.upper(), time.time() - t))
return wrapped
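
# Registry mapping each query number to the names of the TPC-H tables it needs;
# collect_datasets derives the table names from the query function's parameters.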
_query_to_datasets: Dict[int, List[str]] = dict()
def collect_datasets(func: Callable):
_query_to_datasets[int(func.__name__[1:])] = list(inspect.signature(func).parameters)
return func
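
# TPC-H Q1: pricing summary report, aggregating lineitem rows shipped on or before a cutoff date.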
@timethis
@collect_datasets
def q01(lineitem: pd.DataFrame):
date = pd.Timestamp("1998-09-02")
lineitem_filtered = lineitem.loc[
:,
[
"L_QUANTITY",
"L_EXTENDEDPRICE",
"L_DISCOUNT",
"L_TAX",
"L_RETURNFLAG",
"L_LINESTATUS",
"L_SHIPDATE",
"L_ORDERKEY",
],
]
sel = lineitem_filtered.L_SHIPDATE <= date
lineitem_filtered = lineitem_filtered[sel]
lineitem_filtered["AVG_QTY"] = lineitem_filtered.L_QUANTITY
lineitem_filtered["AVG_PRICE"] = lineitem_filtered.L_EXTENDEDPRICE
lineitem_filtered["DISC_PRICE"] = lineitem_filtered.L_EXTENDEDPRICE * (
1 - lineitem_filtered.L_DISCOUNT
)
lineitem_filtered["CHARGE"] = (
lineitem_filtered.L_EXTENDEDPRICE
* (1 - lineitem_filtered.L_DISCOUNT)
* (1 + lineitem_filtered.L_TAX)
)
gb = lineitem_filtered.groupby(["L_RETURNFLAG", "L_LINESTATUS"], as_index=False)[
[
"L_QUANTITY",
"L_EXTENDEDPRICE",
"DISC_PRICE",
"CHARGE",
"AVG_QTY",
"AVG_PRICE",
"L_DISCOUNT",
"L_ORDERKEY",
]
]
total = gb.agg(
{
"L_QUANTITY": "sum",
"L_EXTENDEDPRICE": "sum",
"DISC_PRICE": "sum",
"CHARGE": "sum",
"AVG_QTY": "mean",
"AVG_PRICE": "mean",
"L_DISCOUNT": "mean",
"L_ORDERKEY": "count",
}
)
    # Skip the sort; in the Mars version of this benchmark, groupby sorts by default.
    # total = total.sort_values(["L_RETURNFLAG", "L_LINESTATUS"])
print(total)
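
# TPC-H Q2: minimum cost supplier for brass parts of a given size in EUROPE.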
@timethis
@collect_datasets
def q02(part, partsupp, supplier, nation, region):
nation_filtered = nation.loc[:, ["N_NATIONKEY", "N_NAME", "N_REGIONKEY"]]
region_filtered = region[(region["R_NAME"] == "EUROPE")]
region_filtered = region_filtered.loc[:, ["R_REGIONKEY"]]
r_n_merged = nation_filtered.merge(
region_filtered, left_on="N_REGIONKEY", right_on="R_REGIONKEY", how="inner"
)
r_n_merged = r_n_merged.loc[:, ["N_NATIONKEY", "N_NAME"]]
supplier_filtered = supplier.loc[
:,
[
"S_SUPPKEY",
"S_NAME",
"S_ADDRESS",
"S_NATIONKEY",
"S_PHONE",
"S_ACCTBAL",
"S_COMMENT",
],
]
s_r_n_merged = r_n_merged.merge(
supplier_filtered, left_on="N_NATIONKEY", right_on="S_NATIONKEY", how="inner"
)
s_r_n_merged = s_r_n_merged.loc[
:,
[
"N_NAME",
"S_SUPPKEY",
"S_NAME",
"S_ADDRESS",
"S_PHONE",
"S_ACCTBAL",
"S_COMMENT",
],
]
partsupp_filtered = partsupp.loc[:, ["PS_PARTKEY", "PS_SUPPKEY", "PS_SUPPLYCOST"]]
ps_s_r_n_merged = s_r_n_merged.merge(
partsupp_filtered, left_on="S_SUPPKEY", right_on="PS_SUPPKEY", how="inner"
)
ps_s_r_n_merged = ps_s_r_n_merged.loc[
:,
[
"N_NAME",
"S_NAME",
"S_ADDRESS",
"S_PHONE",
"S_ACCTBAL",
"S_COMMENT",
"PS_PARTKEY",
"PS_SUPPLYCOST",
],
]
part_filtered = part.loc[:, ["P_PARTKEY", "P_MFGR", "P_SIZE", "P_TYPE"]]
part_filtered = part_filtered[
(part_filtered["P_SIZE"] == 15)
& (part_filtered["P_TYPE"].str.endswith("BRASS"))
]
part_filtered = part_filtered.loc[:, ["P_PARTKEY", "P_MFGR"]]
merged_df = part_filtered.merge(
ps_s_r_n_merged, left_on="P_PARTKEY", right_on="PS_PARTKEY", how="inner"
)
merged_df = merged_df.loc[
:,
[
"N_NAME",
"S_NAME",
"S_ADDRESS",
"S_PHONE",
"S_ACCTBAL",
"S_COMMENT",
"PS_SUPPLYCOST",
"P_PARTKEY",
"P_MFGR",
],
]
min_values = merged_df.groupby("P_PARTKEY", as_index=False, sort=False)[
"PS_SUPPLYCOST"
].min()
min_values.columns = ["P_PARTKEY_CPY", "MIN_SUPPLYCOST"]
merged_df = merged_df.merge(
min_values,
left_on=["P_PARTKEY", "PS_SUPPLYCOST"],
right_on=["P_PARTKEY_CPY", "MIN_SUPPLYCOST"],
how="inner",
)
total = merged_df.loc[
:,
[
"S_ACCTBAL",
"S_NAME",
"N_NAME",
"P_PARTKEY",
"P_MFGR",
"S_ADDRESS",
"S_PHONE",
"S_COMMENT",
],
]
total = total.sort_values(
by=["S_ACCTBAL", "N_NAME", "S_NAME", "P_PARTKEY"],
ascending=[False, True, True, True],
)
print(total)
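
# TPC-H Q3: shipping priority, the ten highest-revenue unshipped orders in one market segment.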
@timethis
@collect_datasets
def q03(lineitem, orders, customer):
date = pd.Timestamp("1995-03-04")
lineitem_filtered = lineitem.loc[
:, ["L_ORDERKEY", "L_EXTENDEDPRICE", "L_DISCOUNT", "L_SHIPDATE"]
]
orders_filtered = orders.loc[
:, ["O_ORDERKEY", "O_CUSTKEY", "O_ORDERDATE", "O_SHIPPRIORITY"]
]
customer_filtered = customer.loc[:, ["C_MKTSEGMENT", "C_CUSTKEY"]]
lsel = lineitem_filtered.L_SHIPDATE > date
osel = orders_filtered.O_ORDERDATE < date
csel = customer_filtered.C_MKTSEGMENT == "HOUSEHOLD"
flineitem = lineitem_filtered[lsel]
forders = orders_filtered[osel]
fcustomer = customer_filtered[csel]
jn1 = fcustomer.merge(forders, left_on="C_CUSTKEY", right_on="O_CUSTKEY")
jn2 = jn1.merge(flineitem, left_on="O_ORDERKEY", right_on="L_ORDERKEY")
jn2["TMP"] = jn2.L_EXTENDEDPRICE * (1 - jn2.L_DISCOUNT)
total = (
jn2.groupby(
["L_ORDERKEY", "O_ORDERDATE", "O_SHIPPRIORITY"], as_index=False, sort=False
)["TMP"]
.sum()
.sort_values(["TMP"], ascending=False)
)
res = total.loc[:, ["L_ORDERKEY", "TMP", "O_ORDERDATE", "O_SHIPPRIORITY"]]
print(res.head(10))
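
# TPC-H Q4: order priority checking, counting orders with late lineitems per priority.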
@timethis
@collect_datasets
def q04(lineitem, orders):
date1 = pd.Timestamp("1993-11-01")
date2 = pd.Timestamp("1993-08-01")
lsel = lineitem.L_COMMITDATE < lineitem.L_RECEIPTDATE
osel = (orders.O_ORDERDATE < date1) & (orders.O_ORDERDATE >= date2)
flineitem = lineitem[lsel]
forders = orders[osel]
jn = forders[forders["O_ORDERKEY"].isin(flineitem["L_ORDERKEY"])]
total = (
jn.groupby("O_ORDERPRIORITY", as_index=False)["O_ORDERKEY"].count()
        # Skip the sort; in the Mars version of this benchmark, groupby sorts by default.
        # .sort_values(["O_ORDERPRIORITY"])
)
print(total)
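
# TPC-H Q5: local supplier volume, revenue per nation within the ASIA region.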
@timethis
@collect_datasets
def q05(lineitem, orders, customer, nation, region, supplier):
date1 = pd.Timestamp("1996-01-01")
date2 = pd.Timestamp("1997-01-01")
rsel = region.R_NAME == "ASIA"
osel = (orders.O_ORDERDATE >= date1) & (orders.O_ORDERDATE < date2)
forders = orders[osel]
fregion = region[rsel]
jn1 = fregion.merge(nation, left_on="R_REGIONKEY", right_on="N_REGIONKEY")
jn2 = jn1.merge(customer, left_on="N_NATIONKEY", right_on="C_NATIONKEY")
jn3 = jn2.merge(forders, left_on="C_CUSTKEY", right_on="O_CUSTKEY")
jn4 = jn3.merge(lineitem, left_on="O_ORDERKEY", right_on="L_ORDERKEY")
jn5 = supplier.merge(
jn4, left_on=["S_SUPPKEY", "S_NATIONKEY"], right_on=["L_SUPPKEY", "N_NATIONKEY"]
)
jn5["TMP"] = jn5.L_EXTENDEDPRICE * (1.0 - jn5.L_DISCOUNT)
gb = jn5.groupby("N_NAME", as_index=False, sort=False)["TMP"].sum()
total = gb.sort_values("TMP", ascending=False)
print(total)
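
# TPC-H Q6: forecasting revenue change, a simple filter-and-sum over lineitem.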
@timethis
@collect_datasets
def q06(lineitem):
date1 = pd.Timestamp("1996-01-01")
date2 = pd.Timestamp("1997-01-01")
lineitem_filtered = lineitem.loc[
:, ["L_QUANTITY", "L_EXTENDEDPRICE", "L_DISCOUNT", "L_SHIPDATE"]
]
sel = (
(lineitem_filtered.L_SHIPDATE >= date1)
& (lineitem_filtered.L_SHIPDATE < date2)
& (lineitem_filtered.L_DISCOUNT >= 0.08)
& (lineitem_filtered.L_DISCOUNT <= 0.1)
& (lineitem_filtered.L_QUANTITY < 24)
)
flineitem = lineitem_filtered[sel]
total = (flineitem.L_EXTENDEDPRICE * flineitem.L_DISCOUNT).sum()
print(total)
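
# TPC-H Q7: volume shipping between FRANCE and GERMANY, by year.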
@timethis
@collect_datasets
def q07(lineitem, supplier, orders, customer, nation):
"""This version is faster than q07_old. Keeping the old one for reference"""
lineitem_filtered = lineitem[
(lineitem["L_SHIPDATE"] >= pd.Timestamp("1995-01-01"))
& (lineitem["L_SHIPDATE"] < pd.Timestamp("1997-01-01"))
]
lineitem_filtered["L_YEAR"] = lineitem_filtered["L_SHIPDATE"].dt.year
lineitem_filtered["VOLUME"] = lineitem_filtered["L_EXTENDEDPRICE"] * (
1.0 - lineitem_filtered["L_DISCOUNT"]
)
lineitem_filtered = lineitem_filtered.loc[
:, ["L_ORDERKEY", "L_SUPPKEY", "L_YEAR", "VOLUME"]
]
supplier_filtered = supplier.loc[:, ["S_SUPPKEY", "S_NATIONKEY"]]
orders_filtered = orders.loc[:, ["O_ORDERKEY", "O_CUSTKEY"]]
customer_filtered = customer.loc[:, ["C_CUSTKEY", "C_NATIONKEY"]]
n1 = nation[(nation["N_NAME"] == "FRANCE")].loc[:, ["N_NATIONKEY", "N_NAME"]]
n2 = nation[(nation["N_NAME"] == "GERMANY")].loc[:, ["N_NATIONKEY", "N_NAME"]]
# ----- do nation 1 -----
N1_C = customer_filtered.merge(
n1, left_on="C_NATIONKEY", right_on="N_NATIONKEY", how="inner"
)
N1_C = N1_C.drop(columns=["C_NATIONKEY", "N_NATIONKEY"]).rename(
columns={"N_NAME": "CUST_NATION"}
)
N1_C_O = N1_C.merge(
orders_filtered, left_on="C_CUSTKEY", right_on="O_CUSTKEY", how="inner"
)
N1_C_O = N1_C_O.drop(columns=["C_CUSTKEY", "O_CUSTKEY"])
N2_S = supplier_filtered.merge(
n2, left_on="S_NATIONKEY", right_on="N_NATIONKEY", how="inner"
)
N2_S = N2_S.drop(columns=["S_NATIONKEY", "N_NATIONKEY"]).rename(
columns={"N_NAME": "SUPP_NATION"}
)
N2_S_L = N2_S.merge(
lineitem_filtered, left_on="S_SUPPKEY", right_on="L_SUPPKEY", how="inner"
)
N2_S_L = N2_S_L.drop(columns=["S_SUPPKEY", "L_SUPPKEY"])
total1 = N1_C_O.merge(
N2_S_L, left_on="O_ORDERKEY", right_on="L_ORDERKEY", how="inner"
)
total1 = total1.drop(columns=["O_ORDERKEY", "L_ORDERKEY"])
# ----- do nation 2 -----
N2_C = customer_filtered.merge(
n2, left_on="C_NATIONKEY", right_on="N_NATIONKEY", how="inner"
)
N2_C = N2_C.drop(columns=["C_NATIONKEY", "N_NATIONKEY"]).rename(
columns={"N_NAME": "CUST_NATION"}
)
N2_C_O = N2_C.merge(
orders_filtered, left_on="C_CUSTKEY", right_on="O_CUSTKEY", how="inner"
)
N2_C_O = N2_C_O.drop(columns=["C_CUSTKEY", "O_CUSTKEY"])
N1_S = supplier_filtered.merge(
n1, left_on="S_NATIONKEY", right_on="N_NATIONKEY", how="inner"
)
N1_S = N1_S.drop(columns=["S_NATIONKEY", "N_NATIONKEY"]).rename(
columns={"N_NAME": "SUPP_NATION"}
)
N1_S_L = N1_S.merge(
lineitem_filtered, left_on="S_SUPPKEY", right_on="L_SUPPKEY", how="inner"
)
N1_S_L = N1_S_L.drop(columns=["S_SUPPKEY", "L_SUPPKEY"])
total2 = N2_C_O.merge(
N1_S_L, left_on="O_ORDERKEY", right_on="L_ORDERKEY", how="inner"
)
total2 = total2.drop(columns=["O_ORDERKEY", "L_ORDERKEY"])
# concat results
total = pd.concat([total1, total2])
total = total.groupby(["SUPP_NATION", "CUST_NATION", "L_YEAR"], as_index=False).agg(
REVENUE=pd.NamedAgg(column="VOLUME", aggfunc="sum")
)
    # Skip the sort; in the Mars version of this benchmark, groupby sorts by default.
    # total = total.sort_values(
    #     by=["SUPP_NATION", "CUST_NATION", "L_YEAR"], ascending=[True, True, True]
    # )
print(total)
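
# TPC-H Q8: national market share, BRAZIL's share of revenue within AMERICA by year.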
@timethis
@collect_datasets
def q08(part, lineitem, supplier, orders, customer, nation, region):
part_filtered = part[(part["P_TYPE"] == "ECONOMY ANODIZED STEEL")]
part_filtered = part_filtered.loc[:, ["P_PARTKEY"]]
lineitem_filtered = lineitem.loc[:, ["L_PARTKEY", "L_SUPPKEY", "L_ORDERKEY"]]
lineitem_filtered["VOLUME"] = lineitem["L_EXTENDEDPRICE"] * (
1.0 - lineitem["L_DISCOUNT"]
)
total = part_filtered.merge(
lineitem_filtered, left_on="P_PARTKEY", right_on="L_PARTKEY", how="inner"
)
total = total.loc[:, ["L_SUPPKEY", "L_ORDERKEY", "VOLUME"]]
supplier_filtered = supplier.loc[:, ["S_SUPPKEY", "S_NATIONKEY"]]
total = total.merge(
supplier_filtered, left_on="L_SUPPKEY", right_on="S_SUPPKEY", how="inner"
)
total = total.loc[:, ["L_ORDERKEY", "VOLUME", "S_NATIONKEY"]]
orders_filtered = orders[
(orders["O_ORDERDATE"] >= pd.Timestamp("1995-01-01"))
& (orders["O_ORDERDATE"] < pd.Timestamp("1997-01-01"))
]
orders_filtered["O_YEAR"] = orders_filtered["O_ORDERDATE"].dt.year
orders_filtered = orders_filtered.loc[:, ["O_ORDERKEY", "O_CUSTKEY", "O_YEAR"]]
total = total.merge(
orders_filtered, left_on="L_ORDERKEY", right_on="O_ORDERKEY", how="inner"
)
total = total.loc[:, ["VOLUME", "S_NATIONKEY", "O_CUSTKEY", "O_YEAR"]]
customer_filtered = customer.loc[:, ["C_CUSTKEY", "C_NATIONKEY"]]
total = total.merge(
customer_filtered, left_on="O_CUSTKEY", right_on="C_CUSTKEY", how="inner"
)
total = total.loc[:, ["VOLUME", "S_NATIONKEY", "O_YEAR", "C_NATIONKEY"]]
n1_filtered = nation.loc[:, ["N_NATIONKEY", "N_REGIONKEY"]]
n2_filtered = nation.loc[:, ["N_NATIONKEY", "N_NAME"]].rename(
columns={"N_NAME": "NATION"}
)
total = total.merge(
n1_filtered, left_on="C_NATIONKEY", right_on="N_NATIONKEY", how="inner"
)
total = total.loc[:, ["VOLUME", "S_NATIONKEY", "O_YEAR", "N_REGIONKEY"]]
total = total.merge(
n2_filtered, left_on="S_NATIONKEY", right_on="N_NATIONKEY", how="inner"
)
total = total.loc[:, ["VOLUME", "O_YEAR", "N_REGIONKEY", "NATION"]]
region_filtered = region[(region["R_NAME"] == "AMERICA")]
region_filtered = region_filtered.loc[:, ["R_REGIONKEY"]]
total = total.merge(
region_filtered, left_on="N_REGIONKEY", right_on="R_REGIONKEY", how="inner"
)
total = total.loc[:, ["VOLUME", "O_YEAR", "NATION"]]
    def udf(df):
        denominator = df["VOLUME"].sum()
        df = df[df["NATION"] == "BRAZIL"]
        numerator = df["VOLUME"].sum()
        return numerator / denominator
total = total.groupby("O_YEAR", as_index=False).apply(udf)
total.columns = ["O_YEAR", "MKT_SHARE"]
total = total.sort_values(by=["O_YEAR"], ascending=[True])
print(total)
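
# TPC-H Q9: product type profit measure, profit by nation and year for matching parts.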
@timethis
@collect_datasets
def q09(lineitem, orders, part, nation, partsupp, supplier):
psel = part.P_NAME.str.contains("ghost")
fpart = part[psel]
jn1 = lineitem.merge(fpart, left_on="L_PARTKEY", right_on="P_PARTKEY")
jn2 = jn1.merge(supplier, left_on="L_SUPPKEY", right_on="S_SUPPKEY")
jn3 = jn2.merge(nation, left_on="S_NATIONKEY", right_on="N_NATIONKEY")
jn4 = partsupp.merge(
jn3, left_on=["PS_PARTKEY", "PS_SUPPKEY"], right_on=["L_PARTKEY", "L_SUPPKEY"]
)
jn5 = jn4.merge(orders, left_on="L_ORDERKEY", right_on="O_ORDERKEY")
jn5["TMP"] = jn5.L_EXTENDEDPRICE * (1 - jn5.L_DISCOUNT) - (
(1 * jn5.PS_SUPPLYCOST) * jn5.L_QUANTITY
)
jn5["O_YEAR"] = jn5.O_ORDERDATE.dt.year
gb = jn5.groupby(["N_NAME", "O_YEAR"], as_index=False, sort=False)["TMP"].sum()
total = gb.sort_values(["N_NAME", "O_YEAR"], ascending=[True, False])
print(total)
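
# TPC-H Q10: returned item reporting, the top 20 customers by lost revenue.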
@timethis
@collect_datasets
def q10(lineitem, orders, customer, nation):
date1 = pd.Timestamp("1994-11-01")
date2 = pd.Timestamp("1995-02-01")
osel = (orders.O_ORDERDATE >= date1) & (orders.O_ORDERDATE < date2)
lsel = lineitem.L_RETURNFLAG == "R"
forders = orders[osel]
flineitem = lineitem[lsel]
jn1 = flineitem.merge(forders, left_on="L_ORDERKEY", right_on="O_ORDERKEY")
jn2 = jn1.merge(customer, left_on="O_CUSTKEY", right_on="C_CUSTKEY")
jn3 = jn2.merge(nation, left_on="C_NATIONKEY", right_on="N_NATIONKEY")
jn3["TMP"] = jn3.L_EXTENDEDPRICE * (1.0 - jn3.L_DISCOUNT)
gb = jn3.groupby(
[
"C_CUSTKEY",
"C_NAME",
"C_ACCTBAL",
"C_PHONE",
"N_NAME",
"C_ADDRESS",
"C_COMMENT",
],
as_index=False,
sort=False,
)["TMP"].sum()
total = gb.sort_values("TMP", ascending=False)
print(total.head(20))
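
# TPC-H Q11: important stock identification for suppliers in GERMANY.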
@timethis
@collect_datasets
def q11(partsupp, supplier, nation):
partsupp_filtered = partsupp.loc[:, ["PS_PARTKEY", "PS_SUPPKEY"]]
partsupp_filtered["TOTAL_COST"] = (
partsupp["PS_SUPPLYCOST"] * partsupp["PS_AVAILQTY"]
)
supplier_filtered = supplier.loc[:, ["S_SUPPKEY", "S_NATIONKEY"]]
ps_supp_merge = partsupp_filtered.merge(
supplier_filtered, left_on="PS_SUPPKEY", right_on="S_SUPPKEY", how="inner"
)
ps_supp_merge = ps_supp_merge.loc[:, ["PS_PARTKEY", "S_NATIONKEY", "TOTAL_COST"]]
nation_filtered = nation[(nation["N_NAME"] == "GERMANY")]
nation_filtered = nation_filtered.loc[:, ["N_NATIONKEY"]]
ps_supp_n_merge = ps_supp_merge.merge(
nation_filtered, left_on="S_NATIONKEY", right_on="N_NATIONKEY", how="inner"
)
ps_supp_n_merge = ps_supp_n_merge.loc[:, ["PS_PARTKEY", "TOTAL_COST"]]
sum_val = ps_supp_n_merge["TOTAL_COST"].sum() * 0.0001
total = ps_supp_n_merge.groupby(["PS_PARTKEY"], as_index=False, sort=False).agg(
VALUE=pd.NamedAgg(column="TOTAL_COST", aggfunc="sum")
)
total = total[total["VALUE"] > sum_val]
total = total.sort_values("VALUE", ascending=False)
print(total)
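
# TPC-H Q12: shipping modes and order priority for late lineitems.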
@timethis
@collect_datasets
def q12(lineitem, orders):
date1 = pd.Timestamp("1994-01-01")
date2 = pd.Timestamp("1995-01-01")
sel = (
(lineitem.L_RECEIPTDATE < date2)
& (lineitem.L_COMMITDATE < date2)
& (lineitem.L_SHIPDATE < date2)
& (lineitem.L_SHIPDATE < lineitem.L_COMMITDATE)
& (lineitem.L_COMMITDATE < lineitem.L_RECEIPTDATE)
& (lineitem.L_RECEIPTDATE >= date1)
& ((lineitem.L_SHIPMODE == "MAIL") | (lineitem.L_SHIPMODE == "SHIP"))
)
flineitem = lineitem[sel]
jn = flineitem.merge(orders, left_on="L_ORDERKEY", right_on="O_ORDERKEY")
def g1(x):
return ((x == "1-URGENT") | (x == "2-HIGH")).sum()
def g2(x):
return ((x != "1-URGENT") & (x != "2-HIGH")).sum()
total = jn.groupby("L_SHIPMODE", as_index=False)["O_ORDERPRIORITY"].agg((g1, g2))
    total = total.reset_index()  # kept from the Mars version for output consistency
    # Skip the sort; in the Mars version of this benchmark, groupby sorts by default.
    # total = total.sort_values("L_SHIPMODE")
print(total)
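
# TPC-H Q13: customer distribution by order count.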
@timethis
@collect_datasets
def q13(customer, orders):
customer_filtered = customer.loc[:, ["C_CUSTKEY"]]
orders_filtered = orders[
~orders["O_COMMENT"].str.contains(r"special[\S|\s]*requests")
]
orders_filtered = orders_filtered.loc[:, ["O_ORDERKEY", "O_CUSTKEY"]]
c_o_merged = customer_filtered.merge(
orders_filtered, left_on="C_CUSTKEY", right_on="O_CUSTKEY", how="left"
)
c_o_merged = c_o_merged.loc[:, ["C_CUSTKEY", "O_ORDERKEY"]]
count_df = c_o_merged.groupby(["C_CUSTKEY"], as_index=False, sort=False).agg(
C_COUNT=pd.NamedAgg(column="O_ORDERKEY", aggfunc="count")
)
total = count_df.groupby(["C_COUNT"], as_index=False, sort=False).size()
total.columns = ["C_COUNT", "CUSTDIST"]
total = total.sort_values(by=["CUSTDIST", "C_COUNT"], ascending=[False, False])
print(total)
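
# TPC-H Q14: promotion effect, the revenue share of promotional parts in one month.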
@timethis
@collect_datasets
def q14(lineitem, part):
startDate = pd.Timestamp("1994-03-01")
endDate = pd.Timestamp("1994-04-01")
p_type_like = "PROMO"
part_filtered = part.loc[:, ["P_PARTKEY", "P_TYPE"]]
lineitem_filtered = lineitem.loc[
:, ["L_EXTENDEDPRICE", "L_DISCOUNT", "L_SHIPDATE", "L_PARTKEY"]
]
sel = (lineitem_filtered.L_SHIPDATE >= startDate) & (
lineitem_filtered.L_SHIPDATE < endDate
)
flineitem = lineitem_filtered[sel]
jn = flineitem.merge(part_filtered, left_on="L_PARTKEY", right_on="P_PARTKEY")
jn["TMP"] = jn.L_EXTENDEDPRICE * (1.0 - jn.L_DISCOUNT)
total = jn[jn.P_TYPE.str.startswith(p_type_like)].TMP.sum() * 100 / jn.TMP.sum()
print(total)
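
# TPC-H Q15: top supplier, the supplier(s) with maximum revenue over a three-month window.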
@timethis
@collect_datasets
def q15(lineitem, supplier):
lineitem_filtered = lineitem[
(lineitem["L_SHIPDATE"] >= pd.Timestamp("1996-01-01"))
& (
lineitem["L_SHIPDATE"]
< (pd.Timestamp("1996-01-01") + pd.DateOffset(months=3))
)
]
lineitem_filtered["REVENUE_PARTS"] = lineitem_filtered["L_EXTENDEDPRICE"] * (
1.0 - lineitem_filtered["L_DISCOUNT"]
)
lineitem_filtered = lineitem_filtered.loc[:, ["L_SUPPKEY", "REVENUE_PARTS"]]
revenue_table = (
lineitem_filtered.groupby("L_SUPPKEY", as_index=False, sort=False)
.agg(TOTAL_REVENUE=pd.NamedAgg(column="REVENUE_PARTS", aggfunc="sum"))
.rename(columns={"L_SUPPKEY": "SUPPLIER_NO"})
)
max_revenue = revenue_table["TOTAL_REVENUE"].max()
revenue_table = revenue_table[revenue_table["TOTAL_REVENUE"] == max_revenue]
supplier_filtered = supplier.loc[:, ["S_SUPPKEY", "S_NAME", "S_ADDRESS", "S_PHONE"]]
total = supplier_filtered.merge(
revenue_table, left_on="S_SUPPKEY", right_on="SUPPLIER_NO", how="inner"
)
total = total.loc[
:, ["S_SUPPKEY", "S_NAME", "S_ADDRESS", "S_PHONE", "TOTAL_REVENUE"]
]
print(total)
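
# TPC-H Q16: parts/supplier relationship, counting suppliers per brand, type, and size.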
@timethis
@collect_datasets
def q16(part, partsupp, supplier):
part_filtered = part[
(part["P_BRAND"] != "Brand#45")
& (~part["P_TYPE"].str.contains("^MEDIUM POLISHED"))
& part["P_SIZE"].isin([49, 14, 23, 45, 19, 3, 36, 9])
]
part_filtered = part_filtered.loc[:, ["P_PARTKEY", "P_BRAND", "P_TYPE", "P_SIZE"]]
partsupp_filtered = partsupp.loc[:, ["PS_PARTKEY", "PS_SUPPKEY"]]
total = part_filtered.merge(
partsupp_filtered, left_on="P_PARTKEY", right_on="PS_PARTKEY", how="inner"
)
total = total.loc[:, ["P_BRAND", "P_TYPE", "P_SIZE", "PS_SUPPKEY"]]
supplier_filtered = supplier[
supplier["S_COMMENT"].str.contains(r"Customer(\S|\s)*Complaints")
]
supplier_filtered = supplier_filtered.loc[:, ["S_SUPPKEY"]].drop_duplicates()
# left merge to select only PS_SUPPKEY values not in supplier_filtered
total = total.merge(
supplier_filtered, left_on="PS_SUPPKEY", right_on="S_SUPPKEY", how="left"
)
total = total[total["S_SUPPKEY"].isna()]
total = total.loc[:, ["P_BRAND", "P_TYPE", "P_SIZE", "PS_SUPPKEY"]]
total = total.groupby(["P_BRAND", "P_TYPE", "P_SIZE"], as_index=False, sort=False)[
"PS_SUPPKEY"
].nunique()
total.columns = ["P_BRAND", "P_TYPE", "P_SIZE", "SUPPLIER_CNT"]
total = total.sort_values(
by=["SUPPLIER_CNT", "P_BRAND", "P_TYPE", "P_SIZE"],
ascending=[False, True, True, True],
)
print(total)
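
# TPC-H Q17: small-quantity-order revenue for one brand and container type.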
@timethis
@collect_datasets
def q17(lineitem, part):
left = lineitem.loc[:, ["L_PARTKEY", "L_QUANTITY", "L_EXTENDEDPRICE"]]
right = part[((part["P_BRAND"] == "Brand#23") & (part["P_CONTAINER"] == "MED BOX"))]
right = right.loc[:, ["P_PARTKEY"]]
line_part_merge = left.merge(
right, left_on="L_PARTKEY", right_on="P_PARTKEY", how="inner"
)
line_part_merge = line_part_merge.loc[
:, ["L_QUANTITY", "L_EXTENDEDPRICE", "P_PARTKEY"]
]
lineitem_filtered = lineitem.loc[:, ["L_PARTKEY", "L_QUANTITY"]]
lineitem_avg = lineitem_filtered.groupby(
["L_PARTKEY"], as_index=False, sort=False
).agg(avg=pd.NamedAgg(column="L_QUANTITY", aggfunc="mean"))
lineitem_avg["avg"] = 0.2 * lineitem_avg["avg"]
lineitem_avg = lineitem_avg.loc[:, ["L_PARTKEY", "avg"]]
total = line_part_merge.merge(
lineitem_avg, left_on="P_PARTKEY", right_on="L_PARTKEY", how="inner"
)
total = total[total["L_QUANTITY"] < total["avg"]]
total = pd.DataFrame({"avg_yearly": [total["L_EXTENDEDPRICE"].sum() / 7.0]})
print(total)
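
# TPC-H Q18: large volume customers, orders with total quantity above 300.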
@timethis
@collect_datasets
def q18(lineitem, orders, customer):
gb1 = lineitem.groupby("L_ORDERKEY", as_index=False, sort=False)["L_QUANTITY"].sum()
fgb1 = gb1[gb1.L_QUANTITY > 300]
jn1 = fgb1.merge(orders, left_on="L_ORDERKEY", right_on="O_ORDERKEY")
jn2 = jn1.merge(customer, left_on="O_CUSTKEY", right_on="C_CUSTKEY")
gb2 = jn2.groupby(
["C_NAME", "C_CUSTKEY", "O_ORDERKEY", "O_ORDERDATE", "O_TOTALPRICE"],
as_index=False,
sort=False,
)["L_QUANTITY"].sum()
total = gb2.sort_values(["O_TOTALPRICE", "O_ORDERDATE"], ascending=[False, True])
print(total.head(100))
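
# TPC-H Q19: discounted revenue under disjunctive brand, container, and quantity predicates.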
@timethis
@collect_datasets
def q19(lineitem, part):
Brand31 = "Brand#31"
Brand43 = "Brand#43"
SMBOX = "SM BOX"
SMCASE = "SM CASE"
SMPACK = "SM PACK"
SMPKG = "SM PKG"
MEDBAG = "MED BAG"
MEDBOX = "MED BOX"
MEDPACK = "MED PACK"
MEDPKG = "MED PKG"
LGBOX = "LG BOX"
LGCASE = "LG CASE"
LGPACK = "LG PACK"
LGPKG = "LG PKG"
DELIVERINPERSON = "DELIVER IN PERSON"
AIR = "AIR"
AIRREG = "AIRREG"
lsel = (
(
((lineitem.L_QUANTITY <= 36) & (lineitem.L_QUANTITY >= 26))
| ((lineitem.L_QUANTITY <= 25) & (lineitem.L_QUANTITY >= 15))
| ((lineitem.L_QUANTITY <= 14) & (lineitem.L_QUANTITY >= 4))
)
& (lineitem.L_SHIPINSTRUCT == DELIVERINPERSON)
& ((lineitem.L_SHIPMODE == AIR) | (lineitem.L_SHIPMODE == AIRREG))
)
psel = (part.P_SIZE >= 1) & (
(
(part.P_SIZE <= 5)
& (part.P_BRAND == Brand31)
& (
(part.P_CONTAINER == SMBOX)
| (part.P_CONTAINER == SMCASE)
| (part.P_CONTAINER == SMPACK)
| (part.P_CONTAINER == SMPKG)
)
)
| (
(part.P_SIZE <= 10)
& (part.P_BRAND == Brand43)
& (
(part.P_CONTAINER == MEDBAG)
| (part.P_CONTAINER == MEDBOX)
| (part.P_CONTAINER == MEDPACK)
| (part.P_CONTAINER == MEDPKG)
)
)
| (
(part.P_SIZE <= 15)
& (part.P_BRAND == Brand43)
& (
(part.P_CONTAINER == LGBOX)
| (part.P_CONTAINER == LGCASE)
| (part.P_CONTAINER == LGPACK)
| (part.P_CONTAINER == LGPKG)
)
)
)
flineitem = lineitem[lsel]
fpart = part[psel]
jn = flineitem.merge(fpart, left_on="L_PARTKEY", right_on="P_PARTKEY")
jnsel = (
(jn.P_BRAND == Brand31)
& (
(jn.P_CONTAINER == SMBOX)
| (jn.P_CONTAINER == SMCASE)
| (jn.P_CONTAINER == SMPACK)
| (jn.P_CONTAINER == SMPKG)
)
& (jn.L_QUANTITY >= 4)
& (jn.L_QUANTITY <= 14)
& (jn.P_SIZE <= 5)
| (jn.P_BRAND == Brand43)
& (
(jn.P_CONTAINER == MEDBAG)
| (jn.P_CONTAINER == MEDBOX)
| (jn.P_CONTAINER == MEDPACK)
| (jn.P_CONTAINER == MEDPKG)
)
& (jn.L_QUANTITY >= 15)
& (jn.L_QUANTITY <= 25)
& (jn.P_SIZE <= 10)
| (jn.P_BRAND == Brand43)
& (
(jn.P_CONTAINER == LGBOX)
| (jn.P_CONTAINER == LGCASE)
| (jn.P_CONTAINER == LGPACK)
| (jn.P_CONTAINER == LGPKG)
)
& (jn.L_QUANTITY >= 26)
& (jn.L_QUANTITY <= 36)
& (jn.P_SIZE <= 15)
)
jn = jn[jnsel]
total = (jn.L_EXTENDEDPRICE * (1.0 - jn.L_DISCOUNT)).sum()
print(total)
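
# TPC-H Q20: potential part promotion, suppliers holding excess stock of matching parts.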
@timethis
@collect_datasets
def q20(lineitem, part, nation, partsupp, supplier):
date1 = pd.Timestamp("1996-01-01")
date2 = pd.Timestamp("1997-01-01")
psel = part.P_NAME.str.startswith("azure")
nsel = nation.N_NAME == "JORDAN"
lsel = (lineitem.L_SHIPDATE >= date1) & (lineitem.L_SHIPDATE < date2)
fpart = part[psel]
fnation = nation[nsel]
flineitem = lineitem[lsel]
jn1 = fpart.merge(partsupp, left_on="P_PARTKEY", right_on="PS_PARTKEY")
jn2 = jn1.merge(
flineitem,
left_on=["PS_PARTKEY", "PS_SUPPKEY"],
right_on=["L_PARTKEY", "L_SUPPKEY"],
)
gb = jn2.groupby(
["PS_PARTKEY", "PS_SUPPKEY", "PS_AVAILQTY"], as_index=False, sort=False
)["L_QUANTITY"].sum()
gbsel = gb.PS_AVAILQTY > (0.5 * gb.L_QUANTITY)
fgb = gb[gbsel]
jn3 = fgb.merge(supplier, left_on="PS_SUPPKEY", right_on="S_SUPPKEY")
jn4 = fnation.merge(jn3, left_on="N_NATIONKEY", right_on="S_NATIONKEY")
jn4 = jn4.loc[:, ["S_NAME", "S_ADDRESS"]]
total = jn4.sort_values("S_NAME").drop_duplicates()
print(total)
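
# TPC-H Q21: suppliers who kept orders waiting.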
@timethis
@collect_datasets
def q21(lineitem, orders, supplier, nation):
lineitem_filtered = lineitem.loc[
:, ["L_ORDERKEY", "L_SUPPKEY", "L_RECEIPTDATE", "L_COMMITDATE"]
]
    # Keep all rows that have another row in lineitem with the same orderkey and a different suppkey.
lineitem_orderkeys = (
lineitem_filtered.loc[:, ["L_ORDERKEY", "L_SUPPKEY"]]
.groupby("L_ORDERKEY", as_index=False, sort=False)["L_SUPPKEY"]
.nunique()
)
lineitem_orderkeys.columns = ["L_ORDERKEY", "nunique_col"]
lineitem_orderkeys = lineitem_orderkeys[lineitem_orderkeys["nunique_col"] > 1]
lineitem_orderkeys = lineitem_orderkeys.loc[:, ["L_ORDERKEY"]]
# Keep all rows that have l_receiptdate > l_commitdate
lineitem_filtered = lineitem_filtered[
lineitem_filtered["L_RECEIPTDATE"] > lineitem_filtered["L_COMMITDATE"]
]
lineitem_filtered = lineitem_filtered.loc[:, ["L_ORDERKEY", "L_SUPPKEY"]]
# Merge Filter + Exists
lineitem_filtered = lineitem_filtered.merge(
lineitem_orderkeys, on="L_ORDERKEY", how="inner"
)
# Not Exists: Check the exists condition isn't still satisfied on the output.
lineitem_orderkeys = lineitem_filtered.groupby(
"L_ORDERKEY", as_index=False, sort=False
)["L_SUPPKEY"].nunique()
lineitem_orderkeys.columns = ["L_ORDERKEY", "nunique_col"]
lineitem_orderkeys = lineitem_orderkeys[lineitem_orderkeys["nunique_col"] == 1]
lineitem_orderkeys = lineitem_orderkeys.loc[:, ["L_ORDERKEY"]]
# Merge Filter + Not Exists
lineitem_filtered = lineitem_filtered.merge(
lineitem_orderkeys, on="L_ORDERKEY", how="inner"
)
orders_filtered = orders.loc[:, ["O_ORDERSTATUS", "O_ORDERKEY"]]
orders_filtered = orders_filtered[orders_filtered["O_ORDERSTATUS"] == "F"]
orders_filtered = orders_filtered.loc[:, ["O_ORDERKEY"]]
total = lineitem_filtered.merge(
orders_filtered, left_on="L_ORDERKEY", right_on="O_ORDERKEY", how="inner"
)
total = total.loc[:, ["L_SUPPKEY"]]
supplier_filtered = supplier.loc[:, ["S_SUPPKEY", "S_NATIONKEY", "S_NAME"]]
total = total.merge(
supplier_filtered, left_on="L_SUPPKEY", right_on="S_SUPPKEY", how="inner"
)
total = total.loc[:, ["S_NATIONKEY", "S_NAME"]]
nation_filtered = nation.loc[:, ["N_NAME", "N_NATIONKEY"]]
nation_filtered = nation_filtered[nation_filtered["N_NAME"] == "SAUDI ARABIA"]
total = total.merge(
nation_filtered, left_on="S_NATIONKEY", right_on="N_NATIONKEY", how="inner"
)
total = total.loc[:, ["S_NAME"]]
total = total.groupby("S_NAME", as_index=False, sort=False).size()
total.columns = ["S_NAME", "NUMWAIT"]
total = total.sort_values(by=["NUMWAIT", "S_NAME"], ascending=[False, True])
print(total)
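
# TPC-H Q22: global sales opportunity, grouped by customer country code.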
@timethis
@collect_datasets
def q22(customer, orders):
customer_filtered = customer.loc[:, ["C_ACCTBAL", "C_CUSTKEY"]]
customer_filtered["CNTRYCODE"] = customer["C_PHONE"].str.slice(0, 2)
customer_filtered = customer_filtered[
(customer["C_ACCTBAL"] > 0.00)
& customer_filtered["CNTRYCODE"].isin(
["13", "31", "23", "29", "30", "18", "17"]
)
]
avg_value = customer_filtered["C_ACCTBAL"].mean()
customer_filtered = customer_filtered[customer_filtered["C_ACCTBAL"] > avg_value]
# Select only the keys that don't match by performing a left join and only selecting columns with an na value
orders_filtered = orders.loc[:, ["O_CUSTKEY"]].drop_duplicates()
customer_keys = customer_filtered.loc[:, ["C_CUSTKEY"]].drop_duplicates()
customer_selected = customer_keys.merge(
orders_filtered, left_on="C_CUSTKEY", right_on="O_CUSTKEY", how="left"
)
customer_selected = customer_selected[customer_selected["O_CUSTKEY"].isna()]
customer_selected = customer_selected.loc[:, ["C_CUSTKEY"]]
customer_selected = customer_selected.merge(
customer_filtered, on="C_CUSTKEY", how="inner"
)
customer_selected = customer_selected.loc[:, ["CNTRYCODE", "C_ACCTBAL"]]
agg1 = customer_selected.groupby(["CNTRYCODE"], as_index=False, sort=False).size()
agg1.columns = ["CNTRYCODE", "NUMCUST"]
agg2 = customer_selected.groupby(["CNTRYCODE"], as_index=False, sort=False).agg(
TOTACCTBAL=pd.NamedAgg(column="C_ACCTBAL", aggfunc="sum")
)
total = agg1.merge(agg2, on="CNTRYCODE", how="inner")
total = total.sort_values(by=["CNTRYCODE"], ascending=[True])
print(total)
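
# Load the tables needed by the selected queries (loaders are cached), then
# execute and time each query in turn.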
def run_queries(
root: str,
storage_options: Dict[str, str],
queries: List[int],
):
total_start = time.time()
print("Start data loading")
queries_to_args = dict()
for query in queries:
args = []
for dataset in _query_to_datasets[query]:
args.append(
globals()[f"load_{dataset}"](root, **storage_options)
)
queries_to_args[query] = args
print(f"Data loading time (s): {time.time() - total_start}")
total_start = time.time()
for query in queries:
globals()[f"q{query:02}"](*queries_to_args[query])
print(f"Total query execution time (s): {time.time() - total_start}")
def main():
parser = argparse.ArgumentParser(description="tpch-queries")
parser.add_argument(
"--data_set",
type=str,
required=True,
help="Path to the TPC-H dataset.",
)
parser.add_argument(
"--storage_options",
type=str,
required=False,
help="Path to the storage options json file.",
)
parser.add_argument(
"--queries",
type=int,
nargs="+",
required=False,
help="Comma separated TPC-H queries to run.",
)
parser.add_argument(
"--pyarrow-dtype",
default=False,
action="store_true",
help="Use arrow dtype.",
)
parser.add_argument(
"--lazy-copy",
default=False,
action="store_true",
help="Use arrow dtype.",
)
args = parser.parse_args()
data_set = args.data_set
if args.pyarrow_dtype:
print("Enable pyarrow dtype")
pd.set_option("mode.dtype_backend", "pyarrow")
if args.lazy_copy:
print("Enable lazy copy")
pd.set_option("mode.copy_on_write", True)
# credentials to access the datasource.
storage_options = {}
if args.storage_options is not None:
with open(args.storage_options, "r") as fp:
storage_options = json.load(fp)
print(f"Storage options: {storage_options}")
queries = list(range(1, 23))
if args.queries is not None:
queries = args.queries
print(f"Queries to run: {queries}")
run_queries(
data_set,
storage_options=storage_options,
queries=queries,
)
if __name__ == "__main__":
print(f"Running TPC-H against pandas v{pd.__version__}")
main()