Created
July 27, 2019 03:02
-
-
Save knkillname/af8eda6890edfab3d5079bcc504b1e53 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import argparse | |
import csv | |
import io | |
import itertools | |
import pathlib | |
import sqlite3 | |
from typing import Any, Dict, Iterable, List, Optional | |
import zipfile | |
def read_sql_query(query: str, connection: sqlite3.Connection):
    """Lazily run *query* and yield each result row as a dict.

    Each yielded dict maps the column names reported by the cursor to the
    values of one row.  If two result columns share a name, the later one
    wins, as with any ``dict`` built from pairs.

    Args:
        query: SQL text passed unchanged to ``connection.execute``.
        connection: Open SQLite connection to run the query on.

    Yields:
        One ``dict`` per result row.  Statements that produce no result set
        (e.g. ``CREATE TABLE``) are executed and yield nothing.
    """
    cursor = connection.execute(query)
    try:
        # ``description`` is None for statements that return no rows;
        # iterating it would raise TypeError, so stop cleanly instead.
        if cursor.description is None:
            return
        column_names = [info[0] for info in cursor.description]
        # Override any row factory inherited from the connection so the
        # factory below always receives the raw row tuple.
        cursor.row_factory = (
            lambda _cursor, row: dict(zip(column_names, row)))
        yield from cursor
    finally:
        # Runs on exhaustion, on error, and when the generator is closed
        # early, so the cursor is always released.
        cursor.close()
def get_fieldnames(table_name: str, connection: sqlite3.Connection) \
        -> List[str]:
    """Return the column names of *table_name*, in table order.

    Args:
        table_name: Bare (unquoted) name of the table to inspect.
        connection: Open SQLite connection that contains the table.

    Returns:
        The column names as reported by the cursor description of a
        ``SELECT *`` on the table.

    Raises:
        sqlite3.OperationalError: If the table does not exist.
    """
    # Quote the identifier so names containing spaces, punctuation or SQL
    # keywords (e.g. "order") are valid; embedded quotes are doubled.
    quoted_name = '"' + table_name.replace('"', '""') + '"'
    # LIMIT 0: only the column metadata is needed, never any rows.
    cursor = connection.execute(f'SELECT * FROM {quoted_name} LIMIT 0')
    try:
        return [info[0] for info in cursor.description]
    finally:
        cursor.close()
def read_table(table_name: str, connection: sqlite3.Connection):
    """Lazily yield every row of *table_name* as a dict.

    Args:
        table_name: Bare (unquoted) name of the table to read.
        connection: Open SQLite connection that contains the table.

    Yields:
        Whatever ``read_sql_query`` yields for ``SELECT *`` on the table.
    """
    # Quote the identifier so names containing spaces, punctuation or SQL
    # keywords (e.g. "order") are valid; embedded quotes are doubled.
    quoted_name = '"' + table_name.replace('"', '""') + '"'
    yield from read_sql_query(f'SELECT * FROM {quoted_name}', connection)
def table_to_csv(table: Iterable[Dict[str, Any]], file_obj: io.TextIOBase,
                 fieldnames: Optional[Iterable[str]] = None):
    """Write *table* to *file_obj* as CSV with a header row.

    Args:
        table: Rows to write; each row maps column name to value.
        file_obj: Writable text stream that receives the CSV data.
        fieldnames: Column names, in output order.  When omitted, the
            columns are the union of all row keys in first-seen order,
            which requires loading the whole table into memory first.

    Raises:
        ValueError: From ``csv.DictWriter`` if a row has a key that is not
            in an explicitly given *fieldnames*.
    """
    if fieldnames is None:
        # The rows are scanned twice (keys, then values), so a one-shot
        # iterable must be materialised before the first pass.
        table = tuple(table)
        # dict.fromkeys de-duplicates while keeping first-seen order, so
        # the column order is deterministic (a set would make it vary
        # from run to run).
        fieldnames = tuple(dict.fromkeys(itertools.chain.from_iterable(
            row.keys() for row in table)))
    else:
        fieldnames = tuple(fieldnames)
    writer = csv.DictWriter(file_obj, fieldnames=fieldnames)
    writer.writeheader()
    writer.writerows(table)
def get_table_names(connection: sqlite3.Connection) -> List[str]:
    """List the ``tbl_name`` of every ``sqlite_master`` entry of type table.

    Args:
        connection: Open SQLite connection to inspect.

    Returns:
        The first column of each matching ``sqlite_master`` row, in the
        order the query returns them.
    """
    names = []
    master_rows = connection.execute(
        "SELECT tbl_name FROM sqlite_master WHERE type LIKE 'table'")
    for record in master_rows:
        # Only one column is selected, so index 0 is the table name.
        names.append(record[0])
    return names
def sqlite_to_zip(connection: sqlite3.Connection, zip_file: zipfile.ZipFile):
    """Write every table of the database into *zip_file* as a CSV member.

    Each table returned by ``get_table_names`` becomes one archive member
    named ``<table name>.csv``, with a header row followed by the rows.

    Args:
        connection: Open SQLite connection to export.
        zip_file: Zip archive opened in a writable mode.
    """
    for table_name in get_table_names(connection):
        fieldnames = get_fieldnames(table_name, connection)
        table = read_table(table_name, connection)
        # The text wrapper is closed explicitly so buffered CSV text is
        # flushed into the member before it is finalised, instead of
        # relying on garbage collection.  newline='' is what the csv module
        # requires (otherwise '\r\n' can become '\r\r\n'), and a fixed UTF-8
        # encoding keeps the output independent of the machine's locale.
        with zip_file.open(table_name + '.csv', 'w') as zipped_csv, \
                io.TextIOWrapper(zipped_csv, encoding='utf-8',
                                 newline='') as text_stream:
            table_to_csv(table, text_stream, fieldnames)
def main(sqlite3_path: str, zip_path: Optional[str] = None):
    """Export the SQLite database at *sqlite3_path* to a zip of CSV files.

    Args:
        sqlite3_path: Path of the SQLite database to read.
        zip_path: Path of the zip archive to create.  Defaults to
            *sqlite3_path* with its suffix replaced by ``.zip``.
    """
    if zip_path is None:
        zip_path = pathlib.Path(sqlite3_path).with_suffix('.zip')
    connection = sqlite3.connect(sqlite3_path)
    try:
        with zipfile.ZipFile(zip_path, 'w') as zip_file:
            sqlite_to_zip(connection, zip_file)
    finally:
        # Close the connection even if creating or writing the archive
        # fails, so the database handle is never leaked.
        connection.close()
if __name__ == '__main__':
    # Command-line entry point: one positional database path and an
    # optional -o/--output path, both handed to main().
    PARSER = argparse.ArgumentParser()
    PARSER.add_argument('db_path', help='Path to sqlite3 DB.')
    PARSER.add_argument(
        '-o', '--output',
        help='Path to output Zip file.',
    )
    ARGS = PARSER.parse_args()
    main(sqlite3_path=ARGS.db_path, zip_path=ARGS.output)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment