@kzzzr
Last active April 9, 2024 11:59
File formats comparison: CSV, JSON, Parquet, ORC


Key results

Whenever you store data on S3, in a data lake, or behind an external table, choose the file format wisely:

  • Parquet and ORC are the best options thanks to their efficient data layout, compression, and indexing capabilities
  • Columnar formats allow column projection and pruning of irrelevant row groups (reading only relevant data!)
  • Self-describing binary formats support schema evolution, which is valuable in a constantly changing business environment
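
To make the column projection point concrete, here is a minimal Python sketch (not part of the benchmark). It stores the same toy rows row-wise and column-wise and counts how many bytes a query touching two columns has to read. The toy table, the column names, and the `bytes_read_*` helpers are assumptions made for illustration; real Parquet and ORC files add encodings, compression, and metadata on top of this idea.

```python
import struct

# Toy table: 1,000 rows with 4 unsigned 32-bit columns (hypothetical data).
COLUMNS = ["order_key", "cust_key", "revenue", "supply_cost"]
rows = [(i, i % 100, i * 10, i * 3) for i in range(1000)]

# Row-oriented layout: the values of one row sit next to each other.
row_store = b"".join(struct.pack("<4I", *r) for r in rows)

# Column-oriented layout: each column is one contiguous byte block.
column_store = {
    name: struct.pack(f"<{len(rows)}I", *(r[i] for r in rows))
    for i, name in enumerate(COLUMNS)
}

def bytes_read_row_store(needed):
    # Every row is scanned in full, whichever columns are needed.
    return len(row_store)

def bytes_read_column_store(needed):
    # Only the blocks of the requested columns are read.
    return sum(len(column_store[name]) for name in needed)

def sum_column(name):
    # Decode a single column block without touching the others.
    block = column_store[name]
    return sum(struct.unpack(f"<{len(block) // 4}I", block))

needed = ["cust_key", "revenue"]
print(bytes_read_row_store(needed))     # 16000
print(bytes_read_column_store(needed))  # 8000
print(sum_column("revenue"))            # 4995000
```

With 2 of 4 equally sized columns the saving is 50%; a query that needs 3 of the 17 lineorder columns would skip a much larger share.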

Inputs

  • I used ClickHouse with the S3 table engine to compare different file formats
  • A single-node ClickHouse database was used (s2.small preset: 4 vCPU, 100% vCPU rate, 16 GB RAM)
  • Source data: TPC-H-based synthetic dataset (the lineorder table of the Star Schema Benchmark) for 1 year: 18.2M rows, 2 GB raw CSV size
  • A single query is run at a time to ensure 100% dedicated resources

To reproduce this yourself, you need a Yandex.Cloud account, a ClickHouse database, and S3 keys. The source data is available via a public S3 link: https://storage.yandexcloud.net/otus-dwh/dbgen/lineorder.tbl.

Comparison measures

1. Time to serialize / deserialize

How long does it take to write the data in a particular format?

Let's have a look at the execution log:

  • Compressed columnar formats (ORC, Parquet) lead here
  • On average, writing JSON takes about 6 times longer than writing columnar formats (120 sec. vs 20 sec.)
  • The less data you write, the less time it takes, which is no surprise
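
As a rough, local way to see where text serialization time goes, the following Python sketch times writing the same rows as CSV and as one JSON object per line. It does not reproduce ClickHouse's writers or the numbers above. The sample rows, the field subset, and the `timed` helper are assumptions for illustration, and the timings depend on the machine.

```python
import csv
import io
import json
import time

FIELDS = ["LO_ORDERKEY", "LO_CUSTKEY", "LO_ORDERDATE", "LO_REVENUE"]
# Hypothetical rows shaped like a few lineorder columns.
rows = [(i, i % 1000, "1992-12-01", i * 7) for i in range(20_000)]

def write_csv(rows):
    buf = io.StringIO()
    csv.writer(buf).writerows(rows)
    return buf.getvalue()

def write_json_each_row(rows):
    # One JSON object per line, with field names repeated in every row.
    return "".join(json.dumps(dict(zip(FIELDS, r))) + "\n" for r in rows)

def timed(writer, rows):
    # Return the wall-clock seconds spent serializing the rows.
    start = time.perf_counter()
    writer(rows)
    return time.perf_counter() - start

for name, writer in [("CSV", write_csv), ("JSONEachRow", write_json_each_row)]:
    print(f"{name}: {timed(writer, rows):.3f} s")
```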

2. Storage size

How much disk space is used to store the data?

aws --endpoint-url=https://storage.yandexcloud.net \
	--profile yc \
	s3 ls s3://otus-dwh/file_formats/ \
	--human-readable \
	--summarize \
	--recursive

  • Unsurprisingly, zstd-compressed ORC and Parquet show the best results
  • The worst result is uncompressed JSON, which is almost 3 times larger than the source CSV data (field names are repeated in every row!)
  • zstd-compressed CSV also does well: 632 MB vs 2 GB uncompressed
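
The effect of repeated field names and of compression can be sketched locally with the Python standard library. This is an illustration under assumptions, not a reproduction of the measurements: the rows are made up, only five lineorder-like fields are used, and `zlib` stands in for zstd, so the ratios will differ from the ones above.

```python
import csv
import io
import json
import zlib

FIELDS = ["LO_ORDERKEY", "LO_LINENUMBER", "LO_CUSTKEY", "LO_ORDERDATE", "LO_REVENUE"]
# Hypothetical rows shaped like a few lineorder columns.
rows = [(i, i % 7 + 1, i % 1000, "1992-12-01", i * 7) for i in range(10_000)]

def to_csv(rows):
    buf = io.StringIO()
    csv.writer(buf).writerows(rows)
    return buf.getvalue().encode("utf-8")

def to_json_each_row(rows):
    # Field names are written again for every single row.
    lines = (json.dumps(dict(zip(FIELDS, r))) for r in rows)
    return ("\n".join(lines) + "\n").encode("utf-8")

payloads = {"CSV": to_csv(rows), "JSONEachRow": to_json_each_row(rows)}

for name, raw in payloads.items():
    packed = zlib.compress(raw, 6)  # zlib is a stand-in for zstd here
    print(f"{name}: raw={len(raw)} B, compressed={len(packed)} B")
```

Compression removes much of the JSON overhead because the repeated field names are highly redundant, but the reader still has to decompress and parse them for every row.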

3. Query latency (response time)

How fast can one get query results for a simple analytical query?

  1. OLAP query over the whole dataset (12 / 12 months)
SELECT
	date_trunc('month', LO_ORDERDATE) AS mnth
	, count(DISTINCT LO_ORDERKEY) AS num_orders
	, count(DISTINCT LO_CUSTKEY) AS num_customer
	, sum(LO_REVENUE) AS revenue
FROM
	lineorder_csv
GROUP BY
	date_trunc('month', LO_ORDERDATE)
ORDER BY date_trunc('month', LO_ORDERDATE)
;

  • Columnar formats outperform text formats because they allow reading only the required columns, avoiding excess I/O
  • Compression reduces the amount of I/O and therefore latency
  2. OLAP query over a subset of rows (1 / 12 months)
SELECT
	date_trunc('month', LO_ORDERDATE) AS mnth
	, count(DISTINCT LO_ORDERKEY) AS num_orders
	, count(DISTINCT LO_CUSTKEY) AS num_customer
	, sum(LO_REVENUE) AS revenue
FROM
	lineorder_csv
WHERE LO_ORDERDATE BETWEEN '1992-12-01' AND '1992-12-31'
GROUP BY
	date_trunc('month', LO_ORDERDATE)
ORDER BY date_trunc('month', LO_ORDERDATE)
;

  • Results for this query are pretty much the same as for the previous one without the WHERE condition
  • Although columnar formats allow for partition pruning and reading only relevant rows (according to the WHERE condition), the filter is not pushed down to the file reader
  • The ClickHouse EXPLAIN command revealed that the filter is applied only after the whole dataset is read from S3 😕
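
For contrast, here is a minimal Python sketch of what a pushed-down filter could do. Parquet row groups and ORC stripes carry min/max statistics per column, so a reader can skip chunks whose value range cannot match the WHERE condition. The toy row-group size and the helper names are assumptions for illustration; this is not how ClickHouse reads S3 files.

```python
from datetime import date, timedelta

ROW_GROUP_SIZE = 31  # hypothetical; real files hold far more rows per group

# Toy column sorted by date: one value per day of 1992 (a leap year).
order_dates = [date(1992, 1, 1) + timedelta(days=d) for d in range(366)]

# Split into row groups and record min/max, as a columnar writer would.
row_groups = [
    order_dates[i:i + ROW_GROUP_SIZE]
    for i in range(0, len(order_dates), ROW_GROUP_SIZE)
]
stats = [(min(g), max(g)) for g in row_groups]

def groups_to_read(lo, hi):
    # Keep only groups whose [min, max] range overlaps the filter range.
    return [i for i, (gmin, gmax) in enumerate(stats) if gmax >= lo and gmin <= hi]

def count_with_pushdown(lo, hi):
    selected = groups_to_read(lo, hi)
    rows_scanned = sum(len(row_groups[i]) for i in selected)
    matches = sum(lo <= d <= hi for i in selected for d in row_groups[i])
    return matches, rows_scanned

def count_without_pushdown(lo, hi):
    # The filter is applied only after every row has been read.
    matches = sum(lo <= d <= hi for d in order_dates)
    return matches, len(order_dates)

lo, hi = date(1992, 12, 1), date(1992, 12, 31)
print(count_with_pushdown(lo, hi))     # (31, 56)
print(count_without_pushdown(lo, hi))  # (31, 366)
```

Both variants return the same 31 matching rows, but the pushed-down version scans 56 rows instead of 366. The benefit depends on the data being sorted or clustered by the filtered column.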

-- 1. Get the data
CREATE TABLE src_lineorder
(
LO_ORDERKEY UInt32,
LO_LINENUMBER UInt8,
LO_CUSTKEY UInt32,
LO_PARTKEY UInt32,
LO_SUPPKEY UInt32,
LO_ORDERDATE Date,
LO_ORDERPRIORITY LowCardinality(String),
LO_SHIPPRIORITY UInt8,
LO_QUANTITY UInt8,
LO_EXTENDEDPRICE UInt32,
LO_ORDTOTALPRICE UInt32,
LO_DISCOUNT UInt8,
LO_REVENUE UInt32,
LO_SUPPLYCOST UInt32,
LO_TAX UInt8,
LO_COMMITDATE Date,
LO_SHIPMODE LowCardinality(String)
)
ENGINE = S3('https://storage.yandexcloud.net/otus-dwh/dbgen/lineorder.tbl', 'CSV')
;
select * from src_lineorder limit 200 ;
-- 2. Clickhouse internal storage
CREATE TABLE lineorder_internal
engine = MergeTree()
ORDER BY
(
LO_ORDERDATE
, LO_ORDERKEY
)
PARTITION BY (toYear(LO_ORDERDATE))
AS SELECT * FROM src_lineorder
;
select count(*) from lineorder_internal ;
select * from lineorder_internal limit 200 ;
-- 3. Check storage size
SELECT table,
formatReadableSize(sum(bytes)) as size,
min(min_date) as min_date,
max(max_date) as max_date
FROM system.parts
WHERE active
AND ("table" ilike '%lineorder%')
GROUP BY "table"
ORDER BY sum(bytes) DESC
;
-- 4. ClickHouse table with 1 year of data (1992)
CREATE TABLE lineorder_clickhouse
engine = MergeTree()
ORDER BY
(
LO_ORDERDATE
, LO_ORDERKEY
)
AS SELECT * FROM lineorder_internal
WHERE LO_ORDERDATE BETWEEN '1992-01-01' AND '1992-12-31'
;
select count(*) from lineorder_clickhouse ;
-- 5. Unload to JSON
DROP TABLE IF EXISTS lineorder_json ;
CREATE TABLE lineorder_json
(
LO_ORDERKEY UInt32,
LO_LINENUMBER UInt8,
LO_CUSTKEY UInt32,
LO_PARTKEY UInt32,
LO_SUPPKEY UInt32,
LO_ORDERDATE Date,
LO_ORDERPRIORITY LowCardinality(String),
LO_SHIPPRIORITY UInt8,
LO_QUANTITY UInt8,
LO_EXTENDEDPRICE UInt32,
LO_ORDTOTALPRICE UInt32,
LO_DISCOUNT UInt8,
LO_REVENUE UInt32,
LO_SUPPLYCOST UInt32,
LO_TAX UInt8,
LO_COMMITDATE Date,
LO_SHIPMODE LowCardinality(String)
)
ENGINE = S3('https://storage.yandexcloud.net/otus-dwh/file_formats/json/',
'aws_access_key_id',
'aws_secret_access_key',
'JSONEachRow'
)
;
INSERT INTO lineorder_json
SELECT * FROM lineorder_clickhouse ;
SELECT * FROM lineorder_json LIMIT 200 ;
-- 6. Unload to CSV
DROP TABLE IF EXISTS lineorder_csv ;
CREATE TABLE lineorder_csv
(
LO_ORDERKEY UInt32,
LO_LINENUMBER UInt8,
LO_CUSTKEY UInt32,
LO_PARTKEY UInt32,
LO_SUPPKEY UInt32,
LO_ORDERDATE Date,
LO_ORDERPRIORITY LowCardinality(String),
LO_SHIPPRIORITY UInt8,
LO_QUANTITY UInt8,
LO_EXTENDEDPRICE UInt32,
LO_ORDTOTALPRICE UInt32,
LO_DISCOUNT UInt8,
LO_REVENUE UInt32,
LO_SUPPLYCOST UInt32,
LO_TAX UInt8,
LO_COMMITDATE Date,
LO_SHIPMODE LowCardinality(String)
)
ENGINE = S3('https://storage.yandexcloud.net/otus-dwh/file_formats/csv/',
'aws_access_key_id',
'aws_secret_access_key',
'CSV'
)
;
INSERT INTO lineorder_csv
SELECT * FROM lineorder_clickhouse ;
-- 7. Unload to CSV compressed (zstd)
DROP TABLE IF EXISTS lineorder_csv_compressed ;
CREATE TABLE lineorder_csv_compressed
(
LO_ORDERKEY UInt32,
LO_LINENUMBER UInt8,
LO_CUSTKEY UInt32,
LO_PARTKEY UInt32,
LO_SUPPKEY UInt32,
LO_ORDERDATE Date,
LO_ORDERPRIORITY LowCardinality(String),
LO_SHIPPRIORITY UInt8,
LO_QUANTITY UInt8,
LO_EXTENDEDPRICE UInt32,
LO_ORDTOTALPRICE UInt32,
LO_DISCOUNT UInt8,
LO_REVENUE UInt32,
LO_SUPPLYCOST UInt32,
LO_TAX UInt8,
LO_COMMITDATE Date,
LO_SHIPMODE LowCardinality(String)
)
ENGINE = S3('https://storage.yandexcloud.net/otus-dwh/file_formats/csv_compressed/',
'aws_access_key_id',
'aws_secret_access_key',
'CSV',
'zstd'
)
;
INSERT INTO lineorder_csv_compressed
SELECT * FROM lineorder_clickhouse ;
select count(*) from lineorder_csv_compressed ;
-- 8. Unload to Parquet
DROP TABLE IF EXISTS lineorder_parquet ;
CREATE TABLE lineorder_parquet
(
LO_ORDERKEY UInt32,
LO_LINENUMBER UInt8,
LO_CUSTKEY UInt32,
LO_PARTKEY UInt32,
LO_SUPPKEY UInt32,
LO_ORDERDATE Date,
LO_ORDERPRIORITY LowCardinality(String),
LO_SHIPPRIORITY UInt8,
LO_QUANTITY UInt8,
LO_EXTENDEDPRICE UInt32,
LO_ORDTOTALPRICE UInt32,
LO_DISCOUNT UInt8,
LO_REVENUE UInt32,
LO_SUPPLYCOST UInt32,
LO_TAX UInt8,
LO_COMMITDATE Date,
LO_SHIPMODE LowCardinality(String)
)
ENGINE = S3('https://storage.yandexcloud.net/otus-dwh/file_formats/parquet/',
'aws_access_key_id',
'aws_secret_access_key',
'Parquet'
)
;
INSERT INTO lineorder_parquet
SELECT * FROM lineorder_clickhouse ;
-- 9. Unload to Parquet compressed (zstd)
DROP TABLE IF EXISTS lineorder_parquet_compressed ;
CREATE TABLE lineorder_parquet_compressed
(
LO_ORDERKEY UInt32,
LO_LINENUMBER UInt8,
LO_CUSTKEY UInt32,
LO_PARTKEY UInt32,
LO_SUPPKEY UInt32,
LO_ORDERDATE Date,
LO_ORDERPRIORITY LowCardinality(String),
LO_SHIPPRIORITY UInt8,
LO_QUANTITY UInt8,
LO_EXTENDEDPRICE UInt32,
LO_ORDTOTALPRICE UInt32,
LO_DISCOUNT UInt8,
LO_REVENUE UInt32,
LO_SUPPLYCOST UInt32,
LO_TAX UInt8,
LO_COMMITDATE Date,
LO_SHIPMODE LowCardinality(String)
)
ENGINE = S3('https://storage.yandexcloud.net/otus-dwh/file_formats/parquet_compressed/',
'aws_access_key_id',
'aws_secret_access_key',
'Parquet',
'zstd'
)
;
INSERT INTO lineorder_parquet_compressed
SELECT * FROM lineorder_clickhouse ;
-- 10. Unload to ORC
DROP TABLE IF EXISTS lineorder_orc ;
CREATE TABLE lineorder_orc
(
LO_ORDERKEY UInt32,
LO_LINENUMBER UInt8,
LO_CUSTKEY UInt32,
LO_PARTKEY UInt32,
LO_SUPPKEY UInt32,
LO_ORDERDATE Date,
LO_ORDERPRIORITY LowCardinality(String),
LO_SHIPPRIORITY UInt8,
LO_QUANTITY UInt8,
LO_EXTENDEDPRICE UInt32,
LO_ORDTOTALPRICE UInt32,
LO_DISCOUNT UInt8,
LO_REVENUE UInt32,
LO_SUPPLYCOST UInt32,
LO_TAX UInt8,
LO_COMMITDATE Date,
LO_SHIPMODE LowCardinality(String)
)
ENGINE = S3('https://storage.yandexcloud.net/otus-dwh/file_formats/orc/',
'aws_access_key_id',
'aws_secret_access_key',
'ORC'
)
;
INSERT INTO lineorder_orc
SELECT * FROM lineorder_clickhouse ;
-- 11. Unload to ORC compressed
DROP TABLE IF EXISTS lineorder_orc_compressed ;
CREATE TABLE lineorder_orc_compressed
(
LO_ORDERKEY UInt32,
LO_LINENUMBER UInt8,
LO_CUSTKEY UInt32,
LO_PARTKEY UInt32,
LO_SUPPKEY UInt32,
LO_ORDERDATE Date,
LO_ORDERPRIORITY LowCardinality(String),
LO_SHIPPRIORITY UInt8,
LO_QUANTITY UInt8,
LO_EXTENDEDPRICE UInt32,
LO_ORDTOTALPRICE UInt32,
LO_DISCOUNT UInt8,
LO_REVENUE UInt32,
LO_SUPPLYCOST UInt32,
LO_TAX UInt8,
LO_COMMITDATE Date,
LO_SHIPMODE LowCardinality(String)
)
ENGINE = S3('https://storage.yandexcloud.net/otus-dwh/file_formats/orc_compressed/',
'aws_access_key_id',
'aws_secret_access_key',
'ORC',
'zstd'
)
;
INSERT INTO lineorder_orc_compressed
SELECT * FROM lineorder_clickhouse ;
-- 12. Benchmark query latency
-- CSV
SELECT
date_trunc('month', LO_ORDERDATE) AS mnth
, count(DISTINCT LO_ORDERKEY) AS num_orders
, count(DISTINCT LO_CUSTKEY) AS num_customer
, sum(LO_REVENUE) AS revenue
FROM
lineorder_csv
WHERE LO_ORDERDATE BETWEEN '1992-12-01' AND '1992-12-31'
GROUP BY
date_trunc('month', LO_ORDERDATE)
ORDER BY date_trunc('month', LO_ORDERDATE)
;
-- JSON
SELECT
date_trunc('month', LO_ORDERDATE) AS mnth
, count(DISTINCT LO_ORDERKEY) AS num_orders
, count(DISTINCT LO_CUSTKEY) AS num_customer
, sum(LO_REVENUE) AS revenue
FROM
lineorder_json
WHERE LO_ORDERDATE BETWEEN '1992-12-01' AND '1992-12-31'
GROUP BY
date_trunc('month', LO_ORDERDATE)
ORDER BY date_trunc('month', LO_ORDERDATE)
;
-- CSV compressed
SELECT
date_trunc('month', LO_ORDERDATE) AS mnth
, count(DISTINCT LO_ORDERKEY) AS num_orders
, count(DISTINCT LO_CUSTKEY) AS num_customer
, sum(LO_REVENUE) AS revenue
FROM
lineorder_csv_compressed
WHERE LO_ORDERDATE BETWEEN '1992-12-01' AND '1992-12-31'
GROUP BY
date_trunc('month', LO_ORDERDATE)
ORDER BY date_trunc('month', LO_ORDERDATE)
;
-- Parquet
SELECT
date_trunc('month', LO_ORDERDATE) AS mnth
, count(DISTINCT LO_ORDERKEY) AS num_orders
, count(DISTINCT LO_CUSTKEY) AS num_customer
, sum(LO_REVENUE) AS revenue
FROM
lineorder_parquet
WHERE LO_ORDERDATE BETWEEN '1992-12-01' AND '1992-12-31'
GROUP BY
date_trunc('month', LO_ORDERDATE)
ORDER BY date_trunc('month', LO_ORDERDATE)
;
-- ORC (prefixed with EXPLAIN to inspect the query plan; remove EXPLAIN to measure latency)
EXPLAIN
SELECT
date_trunc('month', LO_ORDERDATE) AS mnth
, count(DISTINCT LO_ORDERKEY) AS num_orders
, count(DISTINCT LO_CUSTKEY) AS num_customer
, sum(LO_REVENUE) AS revenue
FROM
lineorder_orc
WHERE LO_ORDERDATE BETWEEN '1992-12-01' AND '1992-12-31'
GROUP BY
date_trunc('month', LO_ORDERDATE)
ORDER BY date_trunc('month', LO_ORDERDATE)
;
-- Parquet compressed
SELECT
date_trunc('month', LO_ORDERDATE) AS mnth
, count(DISTINCT LO_ORDERKEY) AS num_orders
, count(DISTINCT LO_CUSTKEY) AS num_customer
, sum(LO_REVENUE) AS revenue
FROM
lineorder_parquet_compressed
WHERE LO_ORDERDATE BETWEEN '1992-12-01' AND '1992-12-31'
GROUP BY
date_trunc('month', LO_ORDERDATE)
ORDER BY date_trunc('month', LO_ORDERDATE)
;
-- ORC compressed
SELECT
date_trunc('month', LO_ORDERDATE) AS mnth
, count(DISTINCT LO_ORDERKEY) AS num_orders
, count(DISTINCT LO_CUSTKEY) AS num_customer
, sum(LO_REVENUE) AS revenue
FROM
lineorder_orc_compressed
WHERE LO_ORDERDATE BETWEEN '1992-12-01' AND '1992-12-31'
GROUP BY
date_trunc('month', LO_ORDERDATE)
ORDER BY date_trunc('month', LO_ORDERDATE)
;