@auxten
Last active May 29, 2023 02:53
Benchmark different implementations of running chdb directly on a dataframe
import os
import time
import pandas as pd
import pyarrow as pa
import chdb
import subprocess
# file size 117MB
data_path = '/home/Clickhouse/bench/hits_0.parquet'
# Run query directly on parquet file
t = time.time()
res = chdb.query(f"""SELECT RegionID, SUM(AdvEngineID), COUNT(*) AS c, AVG(ResolutionWidth), COUNT(DISTINCT UserID) \
FROM file('{data_path}', Parquet) GROUP BY RegionID ORDER BY c DESC LIMIT 10""", output_format="Dataframe")
# print(res)
print(f"Run query on parquet file in {time.time() - t} seconds")
# read dataframe from parquet file
t = time.time()
df = pd.read_parquet(data_path, engine='pyarrow', dtype_backend='pyarrow')
print(f"Read parquet file in {time.time() - t} seconds")
t = time.time()
data = pa.BufferOutputStream()
table = pa.Table.from_pandas(df)
print(f"Convert Dataframe to Arrow stream in {time.time() - t} seconds")
t = time.time()
with pa.RecordBatchStreamWriter(data, table.schema) as writer:
    writer.write_table(table)
data = data.getvalue()
print(f"Write Arrow stream to buffer in {time.time() - t} seconds")
query = '''
CREATE TABLE table ENGINE = File(ArrowStream, 0);
SELECT RegionID, SUM(AdvEngineID), COUNT(*) AS c, AVG(ResolutionWidth), COUNT(DISTINCT UserID)
FROM table GROUP BY RegionID ORDER BY c DESC LIMIT 10;
'''
# Spawn the subprocess and pass custom input from the parent process
def spawn_chdb(query, stdin, format):
    proc = subprocess.Popen(['python3', '-m', 'chdb', query, format], stdin=subprocess.PIPE, stdout=subprocess.PIPE)
    stdout = proc.communicate(input=stdin)[0].decode('utf-8')
    return stdout
t = time.time()
# Call the function to spawn the subprocess with custom input
res = spawn_chdb(query, data, "Dataframe")
print(f"Run query in {time.time() - t} seconds")
# print(res)
# create memfd
fd = os.memfd_create("./tmp_mem_fd", flags=os.MFD_CLOEXEC)
# fd = open("./hits_0.arrow", "wb")
# time.sleep(1000)
# create writer from file descriptor
t = time.time()
# with pa.RecordBatchFileWriter(fd, table.schema) as writer:
# writer.write_table(table)
ffd = os.fdopen(fd, "wb")
with pa.RecordBatchFileWriter(ffd, table.schema) as writer:
    writer.write_table(table)
ffd.flush()
# truncate the last byte
# print(fd.tell())
# fd.truncate(fd.tell() - 1)
# fd.close()
print(f"Write Arrow file to memfd in {time.time() - t} seconds")
os.lseek(fd, 0, os.SEEK_SET)
#get file size from fd
# print(os.fstat(fd))
# print(os.lseek(fd, 0, os.SEEK_END))
# copy file content to another file
# with os.fdopen(fd, "rb") as f:
# f.seek(0)
# with open("./hits_0_memfd.arrow", "wb") as f2:
# while True:
# buf = f.read(1024)
# if buf:
# f2.write(buf)
# else:
# break
# f.seek(0)
t = time.time()
# Query with Arrow file descriptor.
os.lseek(fd, 0, os.SEEK_SET)
# query = f'''CREATE TABLE table ENGINE = File(Arrow, {fd});
# SELECT RegionID, SUM(AdvEngineID), COUNT(*) AS c, AVG(ResolutionWidth), COUNT(DISTINCT UserID)
# FROM table GROUP BY RegionID ORDER BY c DESC LIMIT 10;'''
query = f'''SELECT RegionID, SUM(AdvEngineID), COUNT(*) AS c, AVG(ResolutionWidth), COUNT(DISTINCT UserID)
FROM file('/dev/fd/{fd}', Arrow) GROUP BY RegionID ORDER BY c DESC LIMIT 10'''
# query=f'''SELECT *
# FROM file('hits_0.arrow', Arrow) LIMIT 10'''
ret = chdb.query(query, output_format="Dataframe")
# os.close(fd)
# fd.close()
print(f"Run query in {time.time() - t} seconds")
print(ret)
# Output
# Run query on parquet file in 0.11854410171508789 seconds
# Read parquet file in 0.503279447555542 seconds
# Convert Dataframe to Arrow stream in 0.01825261116027832 seconds
# Write Arrow stream to buffer in 1.214848279953003 seconds
# Run query in 2.638005495071411 seconds
# Write Arrow file to memfd in 0.5541248321533203 seconds
# Run query in 0.4575684070587158 seconds
# RegionID sum(AdvEngineID) c avg(ResolutionWidth) uniqExact(UserID)
# 0 229 38044 426435 1612.787187 27961
# 1 2 12801 148193 1593.870891 10413
# 2 208 2673 30614 1490.615111 3073
# 3 1 1802 28577 1623.851699 1720
# 4 34 508 14329 1592.897201 1428
# 5 47 1041 13661 1637.851914 943
# 6 158 78 13294 1576.340605 1110
# 7 7 1166 11679 1627.319034 647
# 8 42 642 11547 1625.601022 956
# 9 184 30 10157 1614.693807 987
auxten (Author) commented May 26, 2023

Timer output:

Run query on parquet file in 0.10411977767944336 seconds
Read parquet file in 1.4163477420806885 seconds
Convert Dataframe to Arrow stream in 1.2699682712554932 seconds
Write Arrow stream to buffer in 1.0464630126953125 seconds
Run query in 2.665607452392578 seconds
Write Arrow stream to memfd in 0.5679309368133545 seconds
Run query in 1.9212286472320557 seconds

auxten (Author) commented May 28, 2023

Using the Arrow format instead of ArrowStream, and reading with FROM file('/dev/fd/{fd}', Arrow), gives:

Write Arrow file to memfd in 0.5541248321533203 seconds
Run query in 0.4575684070587158 seconds

CREATE TABLE table ENGINE = File(Arrow, {fd}) runs much slower than SELECT ... FROM file('/dev/fd/{fd}', Arrow):

Run query in 3.3507769107818604 seconds
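A minimal sketch of the faster path described here (a hypothetical helper, not part of the gist): write the Arrow IPC file into a memfd and let chdb read it through the /dev/fd path. It assumes Linux (for os.memfd_create) and a pyarrow Table already built from the DataFrame, as in the script above; the slower variant from this comment would swap the SELECT below for CREATE TABLE table ENGINE = File(Arrow, {fd}) followed by a SELECT from that table.

import os
import pandas as pd
import pyarrow as pa
import chdb

def query_arrow_via_dev_fd(table: pa.Table) -> pd.DataFrame:
    # Anonymous in-memory file; /dev/fd/{fd} exposes it to chdb as a regular path (Linux only).
    fd = os.memfd_create("arrow_buf", flags=os.MFD_CLOEXEC)
    try:
        # Write the table as an Arrow IPC *file* (not a stream), matching the Arrow input format.
        with os.fdopen(fd, "wb", closefd=False) as f:
            with pa.RecordBatchFileWriter(f, table.schema) as writer:
                writer.write_table(table)
        os.lseek(fd, 0, os.SEEK_SET)
        # Same aggregation as the benchmark, read via the /dev/fd path.
        sql = (
            f"SELECT RegionID, SUM(AdvEngineID), COUNT(*) AS c, "
            f"AVG(ResolutionWidth), COUNT(DISTINCT UserID) "
            f"FROM file('/dev/fd/{fd}', Arrow) "
            f"GROUP BY RegionID ORDER BY c DESC LIMIT 10"
        )
        return chdb.query(sql, output_format="Dataframe")
    finally:
        os.close(fd)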
