Skip to content

Instantly share code, notes, and snippets.

@knkillname
Created July 27, 2019 03:02
Show Gist options
  • Save knkillname/af8eda6890edfab3d5079bcc504b1e53 to your computer and use it in GitHub Desktop.
Save knkillname/af8eda6890edfab3d5079bcc504b1e53 to your computer and use it in GitHub Desktop.
import argparse
import csv
import io
import itertools
import pathlib
import sqlite3
from typing import Any, Dict, Iterable, List, Optional
import zipfile
def read_sql_query(query: str, connection: sqlite3.Connection):
def dict_factory(cursor: sqlite3.Cursor, row: tuple):
return dict(zip(column_names, row))
cursor = connection.execute(query)
column_names = [info[0] for info in cursor.description]
cursor.row_factory = dict_factory
yield from cursor
def get_fieldnames(table_name: str, connection: sqlite3.Connection) \
-> List[str]:
query = f'SELECT * FROM {table_name}'
cursor = connection.execute(query)
return [info[0] for info in cursor.description]
def read_table(table_name: str, connection: sqlite3.Connection):
yield from read_sql_query(f'SELECT * FROM {table_name}', connection)
def table_to_csv(table: Iterable[Dict[str, Any]], file_obj: io.TextIOBase,
fieldnames: Optional[Iterable[str]] = None):
if fieldnames is None:
table = tuple(table)
fieldnames = set(itertools.chain.from_iterable(
row.keys() for row in table))
else:
fieldnames = tuple(fieldnames)
writer = csv.DictWriter(file_obj, fieldnames=fieldnames)
writer.writeheader()
writer.writerows(table)
def get_table_names(connection: sqlite3.Connection) -> List[str]:
query = "SELECT tbl_name FROM sqlite_master WHERE type LIKE 'table'"
return [row[0] for row in connection.execute(query)]
def sqlite_to_zip(connection: sqlite3.Connection, zip_file: zipfile.ZipFile):
for table_name in get_table_names(connection):
fieldnames = get_fieldnames(table_name, connection)
table = read_table(table_name, connection)
with zip_file.open(table_name + '.csv', 'w') as zipped_csv:
table_to_csv(table, io.TextIOWrapper(zipped_csv), fieldnames)
def main(sqlite3_path: str, zip_path: Optional[str] = None):
if zip_path is None:
zip_path = pathlib.Path(sqlite3_path).with_suffix('.zip')
connection = sqlite3.connect(sqlite3_path)
with zipfile.ZipFile(zip_path, 'w') as zip_file:
sqlite_to_zip(connection, zip_file)
connection.close()
if __name__ == '__main__':
PARSER = argparse.ArgumentParser()
PARSER.add_argument("db_path", help='Path to sqlite3 DB.')
PARSER.add_argument('-o', '--output', help='Path to output Zip file.')
ARGS = PARSER.parse_args()
main(ARGS.db_path, ARGS.output)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment