- install python packages using pip
pip install -r python_requirements.txt
- execute script
python sqlite_load.py
Last active
May 29, 2021 15:12
-
-
Save cryocaustik/974973239ad7a423c5c9799389df2bdd to your computer and use it in GitHub Desktop.
Finds all CSV files in a specified directory and loads them into a specified SQLite database, reading large files in row-limited batches so they never have to fit in memory at once.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
tqdm>=4.19.8
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import sqlite3 | |
import csv | |
from os.path import isdir, isfile, join as path_join | |
from os import listdir | |
from tqdm import tqdm | |
def exception_handler(caller_name, given_exception, quit_app=False):
    """Report an exception on stdout and optionally terminate the program.

    Parameters
    ----------
    caller_name : str
        name of the function/method calling the exception_handler
    given_exception : Exception
        the exception to report
    quit_app : bool, optional
        whether the app should quit after handling the exception (the
        default is False, which only prints the report and returns)

    Raises
    ------
    SystemExit
        raised with exit status 1 after the user presses enter, either
        because quit_app is true or because the handler itself failed
        (e.g. a blank caller_name, reported internally as a ValueError)
    """
    try:
        if not str(caller_name).strip():
            raise ValueError('invalid caller_name value')

        print(
            '{call_name} error:\n\terror: {err_type}\n\tmsg: {err_msg}'.format(
                call_name=caller_name,
                err_type=type(given_exception).__name__,
                err_msg=given_exception.args,
            )
        )
    except Exception as _err:
        # the handler itself failed: report that failure and stop, since the
        # original error can no longer be reported reliably
        input(
            'exception_handler error:\n\terror: {err_type}\n\tmsg: {err_msg}'
            '\n\npress enter to exit...'.format(
                err_type=type(_err).__name__,
                err_msg=_err.args,
            )
        )
        raise SystemExit(1)

    if quit_app:
        # give the user a chance to read the report before the window closes
        input('\n\npress enter to exit...')
        # SystemExit is raised directly rather than calling exit(), which is
        # only injected by the site module and may be missing
        raise SystemExit(1)
def find_files(dir):
    """Find all CSV files in the specified directory and return their full paths.

    Only regular files are returned, the ``.csv`` extension is matched
    case-insensitively, and the result is sorted so the load order is the
    same on every run.

    Parameters
    ----------
    dir : str
        directory where target CSV files are stored (the name shadows the
        builtin ``dir`` but is kept for backward compatibility)

    Returns
    -------
    list
        sorted list of full paths for found files

    Notes
    -----
    Errors are not propagated to the caller. An invalid directory
    (ValueError) or a directory without CSV files (FileNotFoundError) is
    passed to exception_handler with quit_app=True.
    """
    try:
        if not isdir(dir):
            raise ValueError('specified directory does not exist')

        csv_paths = sorted(
            path_join(dir, file_name)
            for file_name in listdir(dir)
            # skip directories that merely happen to be named "*.csv"
            if file_name.lower().endswith('.csv')
            and isfile(path_join(dir, file_name))
        )

        if not csv_paths:
            raise FileNotFoundError('no CSV files found in specified dir')
        return csv_paths
    except Exception as _err:
        exception_handler('find_files', _err, True)
def _quote_identifier(identifier):
    """Return identifier as a double-quoted SQL identifier, escaping embedded quotes."""
    return '"{}"'.format(str(identifier).replace('"', '""'))


def load_files(sqlite_db_path, file_dir, table_name='tst_tbl', read_row_limit=100000):
    """Load all CSV files found in file_dir into one table of a sqlite database.

    The first row of each file is treated as the header. The destination
    table is created from the headers of the first non-empty file (every
    column typed ``text``) if it does not already exist. Rows are inserted
    in batches of at most read_row_limit so large files are never held in
    memory all at once.

    Parameters
    ----------
    sqlite_db_path : str
        full path to an existing sqlite database
    file_dir : str
        directory where target CSV files are stored
    table_name : str, optional
        destination table (the default is 'tst_tbl')
    read_row_limit : int, optional
        maximum number of rows buffered before they are written to the
        database (the default is 100000)

    Notes
    -----
    Errors are not propagated to the caller. A missing database or an
    invalid read_row_limit (ValueError), and any error raised while reading
    or inserting, is passed to exception_handler with quit_app=True. The
    database connection is closed in every case.
    """
    conn = None
    try:
        # sqlite3.connect would silently create a new file, so require an
        # existing database explicitly
        if not isfile(sqlite_db_path):
            raise ValueError('specified sqlite database was not found')
        if int(read_row_limit) < 1:
            raise ValueError('read_row_limit must be at least 1')

        # find all csv files in specified dir; you can also set static values here
        files = find_files(file_dir)
        if not files:
            # find_files has already reported the problem
            return

        conn = sqlite3.connect(sqlite_db_path)
        cs = conn.cursor()

        dest_tbl = _quote_identifier(table_name)
        sql_table_exists = False

        with tqdm(total=len(files)) as _pbar:
            for _file in files:
                # newline='' lets the csv module handle embedded line breaks itself
                with open(_file, 'r', encoding='utf-8-sig', newline='') as _fn:
                    # set your delimiter and quotechar here if needed
                    _reader = csv.reader(_fn)

                    # capture file headers; an empty file has none and is skipped
                    _headers = next(_reader, None)
                    if _headers:
                        # one placeholder per column of this file
                        sql_insert_qry = 'insert into {dest_tbl} values ({col_plc_hldr})'.format(
                            dest_tbl=dest_tbl,
                            col_plc_hldr=','.join(['?'] * len(_headers)),
                        )

                        # create destination table once, from the first headers seen
                        if not sql_table_exists:
                            cs.execute(
                                'create table if not exists {dest_tbl} ({cols});'.format(
                                    dest_tbl=dest_tbl,
                                    # quote names so headers with spaces or
                                    # punctuation stay valid identifiers
                                    cols=','.join(
                                        '{} text'.format(_quote_identifier(_col))
                                        for _col in _headers
                                    ),
                                )
                            )
                            conn.commit()
                            sql_table_exists = True

                        _batch = []
                        for _row in _reader:
                            if not _row:
                                # blank line in the file; nothing to insert
                                continue
                            _batch.append(tuple(_row))
                            if len(_batch) >= read_row_limit:
                                # flush the full batch and start a fresh one
                                cs.executemany(sql_insert_qry, _batch)
                                conn.commit()
                                _batch = []

                        if _batch:
                            # load the remaining rows into sqlite
                            cs.executemany(sql_insert_qry, _batch)
                            conn.commit()

                # update progress bar once per processed file
                _pbar.update(1)
    except Exception as _err:
        exception_handler('load_files', _err, True)
    finally:
        # close the connection exactly once, after all files or on any failure
        if conn is not None:
            conn.close()
def main():
    """Prompt the user for the CSV directory and the sqlite database path, then run the load.

    Any exception raised along the way is handed to exception_handler with
    quit_app=True.
    """
    try:
        source_dir = input('specify full path to csv files: ')
        database_path = input('specify full path to sqlite database: ')
    except Exception as _err:
        exception_handler('main', _err, True)
    else:
        try:
            load_files(database_path, source_dir)
        except Exception as _err:
            exception_handler('main', _err, True)
# run the interactive loader only when executed as a script, not on import
if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment