@cpcloud · Last active May 30, 2023

Generate ClickHouse data and run a simple benchmark
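
Example invocations, assuming a ClickHouse server reachable with the backend's default connection settings and the script saved as genslowdata.py (the name mentioned in main's docstring):

    python genslowdata.py --create --engine rmergetree   # generate the test data, then benchmark
    python genslowdata.py --use-ideal                    # benchmark against the hand-written SQL
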
def gen_data(*, n_ids: int, n_minutes: int):
    """Yield one business day's worth of synthetic timestamps per iteration."""
    import numpy as np
    import tqdm

    start_date = np.datetime64("2013-01-01")
    end_date = np.datetime64("2023-05-20")
    start_time = np.timedelta64(9, "h")

    # Keep business days (Mon-Fri) only.
    dates = np.arange(start_date, end_date)
    dates = dates[np.is_busday(dates)]

    n = n_minutes * n_ids
    # A minute grid starting at 09:00 with n_ids slots per minute, plus
    # sub-second jitter so timestamps are effectively unique.
    times = start_time + np.repeat(np.arange(n_minutes), n_ids).view("timedelta64[m]")
    offsets = np.random.randint(0, 1_000_000, size=n).view("timedelta64[us]")

    for date in tqdm.tqdm(dates, unit_scale=n):
        yield date + times + offsets
        np.random.shuffle(offsets)
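
# Illustrative check (not part of the benchmark): each yielded batch contains
# n_ids * n_minutes timestamps at microsecond resolution, e.g.
# next(gen_data(n_ids=2, n_minutes=3)) has shape (6,) and dtype datetime64[us].
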
def query(q):
    import ibis
    from ibis import _

    # Trailing mean of `feature` over the preceding 30 rows (plus the current
    # row) per (id, time) group, ordered by date; the hand-written SQL
    # equivalent appears in bench() below.
    w = ibis.trailing_window(group_by=["id", "time"], order_by="date", preceding=30)
    return q.filter(_.date >= "2023-05-01").mutate(avg=_.feature.mean().over(w))

def create_table(con, *, engine: str, table_name: str, n_minutes: int, n_ids: int):
    import pyarrow as pa
    import ibis

    options_mapping = {
        "rmergetree": "ReplacingMergeTree",
        "mergetree": "MergeTree",
        "parquet": "File(Parquet)",
        "native": "File(Native)",
        "avro": "File(Avro)",
    }
    engine = options_mapping[engine]
    schema = ibis.schema({"timestamp": "!timestamp(6)"}).to_pyarrow()
    con.raw_sql("SET engine_file_allow_create_multiple_files = true")
    con.drop_table(table_name, force=True)
    # Only the EPHEMERAL `timestamp` column is ever inserted; every other column
    # is derived from it server-side by its DEFAULT expression, with 10-40% NULLs.
    con.raw_sql(
        f"""
        CREATE TABLE {table_name} (
          timestamp DateTime64(6) EPHEMERAL,
          date Nullable(Date) DEFAULT IF(randUniform(0.0, 1.0) < 0.10, NULL, CAST(timestamp AS Nullable(Date))),
          time Nullable(Time) DEFAULT IF(randUniform(0.0, 1.0) < 0.20, NULL, CAST(timestamp AS Nullable(Time))),
          id Nullable(Int64) DEFAULT IF(randUniform(0.0, 1.0) < 0.30, NULL, CAST(round(randUniform(1.0, 3000.0)) AS Nullable(Int64))),
          feature Nullable(Float64) DEFAULT IF(randUniform(0.0, 1.0) < 0.40, NULL, randUniform(0.0, 1.0))
        ) ENGINE = {engine}
        """
        + (
            # Sorting keys only make sense for the *MergeTree engines.
            " PRIMARY KEY (date, time, id) ORDER BY (date, time, id) SETTINGS allow_nullable_key = 1"
            if "mergetree" in engine.lower()
            else ""
        )
    )
    c = con.con
    # Insert one record batch per business day (n_ids * n_minutes rows each).
    for timestamps in gen_data(n_ids=n_ids, n_minutes=n_minutes):
        c.insert_arrow(
            table_name, pa.RecordBatch.from_arrays([timestamps], schema=schema)
        )
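
# Back-of-the-envelope volume with the CLI defaults (assuming ~261 business days
# per year, no holiday calendar): roughly 2,700 business days between 2013-01-01
# and 2023-05-20, times 3000 ids * 400 minutes = 1.2M rows per day, i.e. about
# 3.2 billion rows in total.
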
def bench(con, *, table_name: str, use_ideal: bool):
    import time

    print(f"table: {table_name}")
    t = con.tables[table_name]
    q = query(t)
    if use_ideal:
        # Hand-optimized SQL equivalent of query() above.
        sql = f"""\
SELECT
  *,
  AVG(t0.feature) OVER (PARTITION BY t0.id, t0.time ORDER BY t0.date ROWS BETWEEN 30 PRECEDING AND 0 FOLLOWING) AS avg
FROM default.{table_name} AS t0
WHERE
  t0.date >= '2023-05-01'"""
    else:
        sql = q.compile()

    # Time the Ibis path, excluding the insert-only EPHEMERAL timestamp column.
    start = time.time()
    q.drop("timestamp").execute()
    stop = time.time()
    delta_ibis = stop - start
    print(f"Ibis: {delta_ibis:.2f}s")

    # Time the same query sent as raw SQL through the underlying client.
    start = time.time()
    con.con.query_df(sql)
    stop = time.time()
    delta_sql = stop - start
    print(f"SQL: {delta_sql:.2f}s")

    print(f"Ibis v. SQL: {delta_ibis - delta_sql:.2f}s")
def main(args):
    """Run with python ~/genslowdata.py."""
    import ibis

    con = ibis.clickhouse.connect()
    table_name = args.table_name
    if args.create:
        create_table(
            con,
            engine=args.engine,
            table_name=table_name,
            n_ids=args.n_ids,
            n_minutes=args.n_minutes,
        )
    bench(con, table_name=table_name, use_ideal=args.use_ideal)

if __name__ == "__main__":
    import argparse

    p = argparse.ArgumentParser(
        description="Test clickhouse client performance versus ibis"
    )
    p.add_argument(
        "-c", "--create", action="store_true", help="Whether to create test data"
    )
    p.add_argument(
        "-e",
        "--engine",
        default="rmergetree",
        choices=["rmergetree", "mergetree", "native", "parquet", "avro"],
        help="Table storage engine",
    )
    p.add_argument("-t", "--table-name", default="test", help="Table name")
    p.add_argument(
        "-i", "--n-ids", default=3000, type=int, help="Number of possible unique IDs"
    )
    p.add_argument(
        "-m", "--n-minutes", default=400, type=int, help="Number of minutes of data per day"
    )
    p.add_argument(
        "-I", "--use-ideal", action="store_true", help="Use hand-optimized SQL"
    )
    main(p.parse_args())