@bioinfornatics · Last active March 19, 2020 15:29
Benchmark: Parquet vs SQLite vs raw text
#!/usr/bin/env python3
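# Benchmark: compare write time and point-lookup time ("SELECT ... WHERE txt1 IN (...)")
# for a plain text file, a SQLite database (with and without a PRIMARY KEY on txt1),
# and a Parquet file queried through PySpark.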
import sqlite3
import random
import string
from uuid import uuid4
from typing import List, Tuple
from statistics import mean
import pyarrow as pa
import pyarrow.parquet as pq
from pyarrow.lib import Schema
from pyspark.sql import SparkSession, SQLContext
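# Generate a random lowercase string of 40 to 50 characters (the payload columns).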
def random_str():
    str_length = random.randint(40, 50)
    return ''.join(random.choice(string.ascii_lowercase) for _ in range(str_length))
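# Open/create the SQLite database and the plain some_text table (no index on txt1).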
def sql_init(db_file: str):
    conn = sqlite3.connect(db_file)
    c = conn.cursor()
    configure_sqlite(c)
    c.execute('''CREATE TABLE IF NOT EXISTS some_text
                 (txt1 TEXT, txt2 TEXT, txt3 TEXT, txt4 TEXT, txt5 TEXT)''')
    return conn
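# Same table, but txt1 is a PRIMARY KEY, so lookups can use its implicit index.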
def sql_init_with_constraint(db_file: str):
    conn = sqlite3.connect(db_file)
    c = conn.cursor()
    configure_sqlite(c)
    c.execute('''CREATE TABLE IF NOT EXISTS some_text
                 (txt1 TEXT PRIMARY KEY, txt2 TEXT, txt3 TEXT, txt4 TEXT, txt5 TEXT)''')
    return conn
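# SQLite tuning pragmas applied to every connection before it is used.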
def configure_sqlite(cursor: sqlite3.Cursor):
    cursor.execute('PRAGMA optimize')
    cursor.execute('PRAGMA journal_mode=WAL')
    cursor.execute('PRAGMA locking_mode=EXCLUSIVE')
    cursor.execute('PRAGMA synchronous=NORMAL')
    cursor.execute('PRAGMA cache_size=-4000')  # ~4 MiB page cache (a negative value is in KiB)
    cursor.execute('PRAGMA threads=4')         # allow up to 4 auxiliary sort threads
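# Arrow schema for the Parquet file: five string columns mirroring the SQLite table.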
def parquet_init() -> Schema:
    my_schema = pa.schema([('txt1', pa.string()),
                           ('txt2', pa.string()),
                           ('txt3', pa.string()),
                           ('txt4', pa.string()),
                           ('txt5', pa.string())])
    return my_schema
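# Start a local Spark session; the SQLContext is used to read Parquet files and run SQL on them.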
def spark_init() -> Tuple[SparkSession, SQLContext]:
    spark = SparkSession.builder.master("local") \
        .appName("Word selector") \
        .getOrCreate()
    sc = SQLContext(spark.sparkContext, sparkSession=spark)
    return spark, sc
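# Bulk-insert every row with a single executemany, commit, and close the connection.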
def sql_write(conn: sqlite3.Connection, data: List[List[str]]):
    cursor = conn.cursor()
    cursor.executemany('''INSERT INTO some_text VALUES
                          (?,?,?,?,?)''', data)
    conn.commit()
    conn.close()
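# Write each row as one ', '-separated line of text.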
def txt_write(txt_file: str, data: List[List[str]]):
    with open(txt_file, 'w') as f:
        for row in data:
            f.write(', '.join(row) + '\n')
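# Transpose the rows into columns, build an Arrow table, and write it as a single row group.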
def parquet_write(parquet_file: str, schema, data: List[List[str]]):
    columns = [[] for _ in range(len(data[0]))]  # each row has the same number of columns
    for row in data:
        for i, item in enumerate(row):
            columns[i].append(item)
    arrays = [pa.array(column) for column in columns]
    table = pa.Table.from_arrays(arrays, schema=schema)
    with pq.ParquetWriter(parquet_file, table.schema,
                          use_dictionary=True, version='2.0') as writer:
        writer.write_table(table)  # write one row group
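# Fetch rows whose txt1 matches any of the requested keys, via a parameterised IN (...) query.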
def sql_select(conn: sqlite3.Connection, to_select: List[str]):
    cursor = conn.cursor()
    query = f'''SELECT * FROM some_text
                WHERE txt1 IN ({','.join(['?'] * len(to_select))})'''
    cursor.execute(query, to_select)
    return cursor.fetchall()
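# Linear scan of the text file, keeping the lines whose first field is one of the requested keys.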
def txt_select(txt_file: str, to_select: List[str]):
    result = []
    with open(txt_file, 'r') as f:
        for line in f:
            # rows were written ', '-separated, so split on that instead of whitespace
            fields = line.rstrip('\n').split(', ')
            if fields[0] in to_select:
                result.append(line)
    return result
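# Register the Parquet file as a temporary view and filter it with a Spark SQL IN (...) query.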
def parquet_sql_select(parquet_file: str, sc: SQLContext, to_select: List[str]):
    data_frame = sc.read.parquet(parquet_file)
    data_frame.createOrReplaceTempView("some_text")
    query = f'''SELECT * FROM some_text
                WHERE txt1 IN ({','.join(f'"{txt1}"' for txt1 in to_select)})'''
    results = sc.sql(query)
    return results.collect()
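# Alternative lookup: join the Parquet data against a one-column DataFrame of keys
# (defined for reference, but not timed in the benchmark below).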
def parquet_sql_select2(parquet_file: str, sc: SQLContext, spark: SparkSession, to_select: List[str]):
    data_frame = sc.read.parquet(parquet_file)
    filter_df = spark.createDataFrame(to_select, data_frame.schema['txt1'].dataType)
    return data_frame.join(filter_df, data_frame['txt1'] == filter_df["value"])
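# Same lookup expressed with the DataFrame API instead of SQL.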
def parquet_dataframe_select(parquet_file: str, sc: SQLContext, to_select: List[str]):
    data_frame = sc.read.parquet(parquet_file)
    # collect() forces execution; without it only the lazy query plan would be timed
    results = data_frame.where(data_frame.txt1.isin(to_select)).collect()
    return results
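# Test data: 100000 rows of one UUID key plus four random strings, and 100 keys to look up.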
nb_rows = 100000
rows = [[str(uuid4())] + [random_str() for _ in range(4)] for iteration in range(0,nb_rows)]
txt1_field_selector = [rows[random.randint(0, nb_rows-1)][0] for _ in range(0, 100)]
db_file = 'perf_pq_db_txt.db'
db_constraint_file = 'perf_pq_db_txt.constraint.db'
txt_file = 'perf_pq_db_txt.txt'
pq_file = 'perf_pq_db_txt.parquet'
if __name__ == '__main__':
    from timeit import repeat
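    # Each benchmark is repeated 6 times (number=1) and the mean is reported below;
    # the setup string re-creates the target file or connection before every run.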
    txt_write_t1 = repeat(stmt='txt_write(txt_file,rows)', repeat=6, number=1, setup='''
from __main__ import txt_write, rows, txt_file
import os
if os.path.exists(txt_file):
    os.remove(txt_file)
''')
    sql_write_t1 = repeat(stmt='sql_write(conn,rows)', repeat=6, number=1, setup='''
from __main__ import sql_init, sql_write, rows, db_file
import os
if os.path.exists(db_file):
    os.remove(db_file)
conn = sql_init(db_file)
''')
    sql_write_t2 = repeat(stmt='sql_write(conn,rows)', repeat=6, number=1, setup='''
from __main__ import sql_init_with_constraint, sql_write, rows, db_constraint_file
import os
if os.path.exists(db_constraint_file):
    os.remove(db_constraint_file)
conn = sql_init_with_constraint(db_constraint_file)
''')
    parquet_write_t1 = repeat(stmt='parquet_write(pq_file,schema,rows)', repeat=6, number=1, setup='''
from __main__ import parquet_init, parquet_write, rows, pq_file
import os
if os.path.exists(pq_file):
    os.remove(pq_file)
schema = parquet_init()
''')
    txt_select_t1 = repeat(stmt='txt_select(txt_file,txt1_field_selector)', repeat=6, number=1, setup='''
from __main__ import txt_select, txt_file, txt1_field_selector
''')
    sql_select_t1 = repeat(stmt='sql_select(conn,txt1_field_selector)', repeat=6, number=1, setup='''
from __main__ import sql_init, sql_select, txt1_field_selector, db_file
conn = sql_init(db_file)
''')
    sql_select_t2 = repeat(stmt='sql_select(conn,txt1_field_selector)', repeat=6, number=1, setup='''
from __main__ import sql_init, sql_select, txt1_field_selector, db_constraint_file
conn = sql_init(db_constraint_file)
''')
    parquet_select_t1 = repeat(stmt='parquet_sql_select(pq_file,sc,txt1_field_selector)', repeat=6, number=1, setup='''
from __main__ import spark_init, parquet_sql_select, txt1_field_selector, pq_file
spark, sc = spark_init()
''')
    parquet_select_t2 = repeat(stmt='parquet_dataframe_select(pq_file,sc,txt1_field_selector)', repeat=6, number=1, setup='''
from __main__ import spark_init, parquet_dataframe_select, txt1_field_selector, pq_file
spark, sc = spark_init()
''')
print(f'{"text":<40} was writed in {mean(txt_write_t1):.3f} seconds.')
print(f'{"database":<40} was writed in {mean(sql_write_t1):.3f} seconds.')
print(f'{"database with constraint":<40} was writed in {mean(sql_write_t2):.3f} seconds.')
print(f'{"parquet":<40} was writed in {mean(parquet_write_t1):.3f} seconds.')
print(f'{"text":<40} selected in {mean(txt_select_t1):.3f} seconds.')
print(f'{"database":<40} selected in {mean(sql_select_t1):.3f} seconds.')
print(f'{"database with constraint":<40} selected in {mean(sql_select_t2):.3f} seconds.')
print(f'{"parquet SQL":<40} selected in {mean(parquet_select_t1):.3f} seconds.')
print(f'{"parquet using dataframe":<40} selected in {mean(parquet_select_t2):.3f} seconds.')