-
-
Save vmuriart/1ef50b5523dc23d995154f41e6afb544 to your computer and use it in GitHub Desktop.
sqlite2parquet
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import sqlite3 | |
import os | |
import argparse | |
try: | |
import pyspark | |
import pyspark.sql | |
except ImportError: | |
import sys | |
import os | |
# TODO: unhardcode these | |
os.environ["SPARK_HOME"] = '/opt/spark' | |
sys.path.append('/opt/spark/python') | |
sys.path.append('/opt/spark/python/lib/py4j-0.8.2.1-src.zip') | |
import pyspark | |
import pyspark.sql | |
from pyspark.sql.types import * | |
# Module-level Spark setup: a single local SparkContext and its SQLContext,
# shared by every function in this file.
conf = (
    pyspark.SparkConf()
    .set('spark.executor.memory', '4g')
    # Codec used when writing parquet files (alternative: 'snappy').
    .set('spark.sql.parquet.compression.codec', 'gzip')
)
sc = pyspark.SparkContext(master="local", conf=conf)
sqlContext = pyspark.SQLContext(sc)
def get_table_list(sqldb_loc):
    """Return the names of all tables in a SQLite database.

    Args:
        sqldb_loc: File path of the SQLite database.

    Returns:
        A list with one table name per ``type='table'`` row of
        ``sqlite_master`` (empty if the database has no tables).
    """
    conn = sqlite3.connect(sqldb_loc)
    try:
        cur = conn.cursor()
        try:
            cur.execute("""SELECT name
                           FROM sqlite_master
                           WHERE type='table'""")
            return [row[0] for row in cur]
        finally:
            cur.close()
    finally:
        # Always release the connection, even when the query fails.
        conn.close()
def get_generator_from_table(sqldb_loc, table_name): | |
conn = sqlite3.connect(sqldb_loc) | |
cur = conn.cursor() | |
res = cur.execute(""" | |
SELECT * FROM {0} | |
""".format(table_name)) | |
for row in res: | |
yield row | |
cur.close() | |
conn.close() | |
def get_column_names_from_table(sqldb_loc, table_name):
    """Return the column names of one table of a SQLite database.

    Args:
        sqldb_loc: File path of the SQLite database.
        table_name: Unquoted name of the table to inspect.

    Returns:
        A list of column names in the order ``SELECT *`` reports them.
    """
    # Quote the identifier so names containing spaces, punctuation or SQL
    # keywords are handled correctly.
    quoted = '"{0}"'.format(table_name.replace('"', '""'))
    conn = sqlite3.connect(sqldb_loc)
    try:
        cur = conn.cursor()
        try:
            # Only the result-set metadata is needed, so request no rows.
            cur.execute("SELECT * FROM {0} LIMIT 0".format(quoted))
            return [column[0] for column in cur.description]
        finally:
            cur.close()
    finally:
        conn.close()
def create_df_from_generator(gen, names):
    """Build a Spark DataFrame from an iterable of rows.

    Uses the module-level ``sc`` and ``sqlContext``.

    Args:
        gen: Iterable of rows to distribute with ``sc.parallelize``.
        names: Value passed as ``schema`` to ``sqlContext.createDataFrame``.

    Returns:
        The resulting DataFrame, repartitioned into 20 partitions.
    """
    # Positional flags: useDisk, useMemory, useOffHeap, deserialized,
    # replication.
    storage_level = pyspark.StorageLevel(True, True, False, True, 1)
    rows_rdd = sc.parallelize(gen, 20)
    rows_rdd.persist(storage_level)
    frame = sqlContext.createDataFrame(rows_rdd, schema=names, samplingRatio=None)
    return frame.repartition(20)
def sqlite2parquet(db_path, output_dir, skip_tables=('sqlite_sequence',), schemas=None):
    """Write every table of a SQLite database to its own parquet file.

    Each converted table is saved as ``<output_dir>/<table>.parquet``.

    Args:
        db_path: File path of the SQLite database.
        output_dir: Directory that receives the parquet output.
        skip_tables: Collection of table names that are not converted.
        schemas: Optional mapping of table name to the schema handed to
            ``create_df_from_generator``. When omitted, a module-level
            ``schemas`` mapping is used if one exists; tables without an
            entry fall back to their plain column names.
    """
    if schemas is None:
        # The module does not necessarily define ``schemas``; look it up
        # defensively so a missing global no longer raises NameError.
        schemas = globals().get('schemas') or {}

    for table in get_table_list(db_path):
        if table in skip_tables:
            print("Skipping: {0}".format(table))
            continue

        print("Converting: {0}".format(table))
        rows = get_generator_from_table(db_path, table)
        if table in schemas:
            print("\t known schema")
            schema = schemas[table]
        else:
            schema = get_column_names_from_table(db_path, table)

        print("\t converting to data-frame")
        df = create_df_from_generator(rows, schema)
        target = os.path.join(output_dir, table + '.parquet')
        print("\t saving...")
        df.saveAsParquetFile(target)
def main():
    """Command-line entry point: convert one SQLite file to parquet.

    Takes a single positional argument, ``sqlite_db_path``, and writes the
    output into the directory ``<sqlite_db_path>.parquets``.

    Raises:
        OSError: If the output directory cannot be created and does not
            already exist.
    """
    parser = argparse.ArgumentParser(description='Convert sqlite database into parquet files')
    parser.add_argument('sqlite_db_path')
    args = parser.parse_args()

    output_dir = args.sqlite_db_path + '.parquets'
    try:
        os.makedirs(output_dir)
    except OSError:
        # An already existing output directory is fine; any other failure
        # (permissions, a file in the way, ...) must not be hidden.
        if not os.path.isdir(output_dir):
            raise

    sqlite2parquet(
        args.sqlite_db_path,
        output_dir,
        skip_tables=['task', 'sqlite_sequence', 'sqlite_stat1', 'crawl'],
    )
# Run the conversion only when executed as a script, not when imported.
if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment