# sqlite2parquet.py
"""Convert every table of a SQLite database into a Parquet file using
PySpark (written against Spark 1.x, where DataFrame.saveAsParquetFile()
is the save API)."""

import argparse
import errno
import os
import sqlite3

try:
    import pyspark
    import pyspark.sql
except ImportError:
    # Fall back to a local Spark install that is not on PYTHONPATH.
    # TODO: unhardcode these paths
    import sys
    os.environ["SPARK_HOME"] = '/opt/spark'
    sys.path.append('/opt/spark/python')
    sys.path.append('/opt/spark/python/lib/py4j-0.8.2.1-src.zip')
    import pyspark
    import pyspark.sql
from pyspark.sql.types import *

# Spark setup: a local context writing gzip-compressed Parquet.
conf = pyspark.SparkConf()
conf.set('spark.executor.memory', '4g')
conf.set('spark.sql.parquet.compression.codec', 'gzip')
# conf.set('spark.sql.parquet.compression.codec', 'snappy')
sc = pyspark.SparkContext("local", conf=conf)
sqlContext = pyspark.sql.SQLContext(sc)

# Optional explicit schemas, keyed by table name; tables missing from this
# dict fall back to plain column names with types inferred from the data.
schemas = {}
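# A minimal sketch of an explicit schema entry, assuming a hypothetical
# 'pages' table with an integer 'id' and a text 'url' column (StructType,
# StructField, etc. come from the pyspark.sql.types star import above):
#
#     schemas['pages'] = StructType([
#         StructField('id', IntegerType(), False),
#         StructField('url', StringType(), True),
#     ])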

def get_table_list(sqldb_loc):
    """Get the list of table names in the sqlite database at `sqldb_loc`."""
    conn = sqlite3.connect(sqldb_loc)
    cur = conn.cursor()
    res = cur.execute("""SELECT name
                         FROM sqlite_master
                         WHERE type='table'""")
    names = [x[0] for x in res]
    cur.close()
    conn.close()
    return names

def get_generator_from_table(sqldb_loc, table_name):
    """Yield rows from `table_name` lazily; the connection stays open
    until the generator is exhausted."""
    conn = sqlite3.connect(sqldb_loc)
    cur = conn.cursor()
    # Table names cannot be bound as parameters, so quote the identifier.
    res = cur.execute('SELECT * FROM "{0}"'.format(table_name))
    for row in res:
        yield row
    cur.close()
    conn.close()

def get_column_names_from_table(sqldb_loc, table_name):
    """Get the column names of `table_name` from the cursor description."""
    conn = sqlite3.connect(sqldb_loc)
    cur = conn.cursor()
    # LIMIT 0: only the header is needed, not the rows.
    cur.execute('SELECT * FROM "{0}" LIMIT 0'.format(table_name))
    names = [x[0] for x in cur.description]
    cur.close()
    conn.close()
    return names

def create_df_from_generator(gen, names):
    """Build a DataFrame from a row generator and a schema (or column names)."""
    a = sc.parallelize(gen, 20)
    # StorageLevel(useDisk=True, useMemory=True, useOffHeap=False,
    #              deserialized=True, replication=1): memory, spilling to disk.
    a.persist(pyspark.StorageLevel(True, True, False, True, 1))
    df = sqlContext.createDataFrame(a, schema=names,
                                    samplingRatio=None).repartition(20)
    # df.persist(pyspark.StorageLevel(True, True, False, True, 1))
    return df

def sqlite2parquet(db_path, output_dir, skip_tables=('sqlite_sequence',)):
    """Convert every table in `db_path`, except those in `skip_tables`,
    to a Parquet file under `output_dir`."""
    tables = get_table_list(db_path)
    for table in tables:
        if table in skip_tables:
            print "Skipping: {0}".format(table)
            continue
        print "Converting: {0}".format(table)
        gen = get_generator_from_table(db_path, table)
        if table in schemas:
            print "\t known schema"
            schema = schemas[table]
        else:
            schema = get_column_names_from_table(db_path, table)
        print "\t converting to data-frame"
        df = create_df_from_generator(gen, schema)
        p = os.path.join(output_dir, table + '.parquet')
        print "\t saving..."
        df.saveAsParquetFile(p)

def main():
    parser = argparse.ArgumentParser(
        description='Convert a sqlite database into parquet files')
    parser.add_argument('sqlite_db_path')
    args = parser.parse_args()
    out_dir = args.sqlite_db_path + '.parquets'
    try:
        os.makedirs(out_dir)
    except OSError as e:
        # Only ignore "directory already exists"; re-raise anything else.
        if e.errno != errno.EEXIST:
            raise
    sqlite2parquet(args.sqlite_db_path, out_dir,
                   skip_tables=['task', 'sqlite_sequence',
                                'sqlite_stat1', 'crawl'])


if __name__ == "__main__":
    main()
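
# Usage sketch (hypothetical paths): after running
#
#     python sqlite2parquet.py /data/crawl.db
#
# each table lands in /data/crawl.db.parquets/<table>.parquet and can be
# read back in a Spark 1.x session with:
#
#     df = sqlContext.parquetFile('/data/crawl.db.parquets/pages.parquet')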