Skip to content

Instantly share code, notes, and snippets.

@cryocaustik
Last active May 29, 2021 15:12
Show Gist options
  • Save cryocaustik/974973239ad7a423c5c9799389df2bdd to your computer and use it in GitHub Desktop.
Save cryocaustik/974973239ad7a423c5c9799389df2bdd to your computer and use it in GitHub Desktop.
finds all CSV files in specified directory and loads them into specified sqlite database, using a row limit for reading large files

sqlite_load

prerequisites

how to use

  • install python packages using pip
    • pip install -r python_requirements.txt
  • execute script
    • python sqlite_load.py
import sqlite3
import csv
from os.path import isdir, isfile, join as path_join
from os import listdir
from tqdm import tqdm
def exception_handler(caller_name, given_exception, quit_app=False):
"""elegantly handle exceptions and quit execution if specified.
Parameters:
----------
caller_name : {str}
name of functon/method calling the exception_handler
given_exception : {exception}
valid python exception
quit_app : {bool}, optional
specify wether the app should quit after handeling the exception (the default is False, which [default_description])
Raises
------
ValueError
raises a ValueError exception if an invalid caller_name is passed (e.g. blank value)
"""
try:
if not len(str(caller_name).strip()) >= 1:
raise ValueError('invalid caller_name value')
_msg = '{call_name} error:\n\terror: {err_type}\n\tmsg: {err_msg}'.format(
call_name = caller_name,
err_type = type(given_exception).__name__,
err_msg = given_exception.args
)
print(_msg)
if quit_app:
input('\n\npress enter to exit...')
except Exception as _err:
_msg = 'exception_handler error:\n\terror: {err_type}\n\tmsg: {err_msg}\n\npress enter to exit...'.format(
err_type = type(_err).__name__,
err_msg = _err.args
)
input(_msg)
exit()
def find_files(dir):
"""fine all CSV files in specified directory and return a list of full paths for found files.
Parameters:
----------
dir : {str}
directory where target CSV files are stored
Raises
------
ValueError
raises ValueError when an invalid directory is specified
FileNotFoundError
raises a FileNotFoundError when no CSV files are found
Returns
-------
[list]
list of full paths for found files
"""
try:
if not isdir(dir):
raise ValueError('specified directory does not exist')
file_name_list = set()
for file_name in listdir(dir):
if file_name[-4:] == '.csv':
file_name_list.add(path_join(dir, file_name))
if not file_name_list:
raise FileNotFoundError('no CSV files found in specified dir')
else:
return list(file_name_list)
except Exception as _err:
exception_handler('find_files', _err, True)
def load_files(sqlite_db_path, file_dir):
"""load all found csv files into specified sqlite database
Parameters:
----------
sqlite_db_path : {str}
full path to sqlite database
file_dir : {str}
directory of where target CSV files are stored
Raises
------
ValueError
raises ValueError if sqlite database is not found at given path
"""
try:
if not isfile(sqlite_db_path):
raise ValueError('specified sqlite database was not found')
# find all csv files in specified dir; you can also set static values here
files = find_files(file_dir)
# set the limit of how many rows we want to read;
# this is important for large files so that we do not read more than your machine can handle
read_row_limit = 100000
# connect to the sqlite database and create our conn (connection) and cs (cursor) variables
conn = sqlite3.connect(sqlite_db_path)
cs = conn.cursor()
# create our sql variables
sql_destination_tbl = 'tst_tbl'
sql_insert_qry_template = 'insert into {dest_tbl} values ({col_plc_hldr})'
sql_insert_qry = None
sql_table_exists = False
# create progress bar
with tqdm(total=len(files)) as _pbar:
for _file in files:
# open the file and prepare to read it
with open(_file, 'r', encoding = 'utf-8-sig') as _fn:
# setup the csv parser; you can set your delimiter and qoutechar here if needed
_reader = csv.reader(_fn)
# capture file headers;
# if your file does not have headers, you will need to specify the column names of your table here
_headers = next(_reader)
# use headers to determine column count and update the qry; skip after initial file as columns should remain the same.
# if your file does not have headers, use a static insert query instead
if not sql_insert_qry:
sql_insert_qry = sql_insert_qry_template.format(
dest_tbl = sql_destination_tbl,
col_plc_hldr = ','.join(['?']*len(_headers))
)
# create destination table if it does not exist; this can be toggled by changing sql_table_exists to true
if not sql_table_exists:
qry_create_tbl = 'create table if not exists {dest_tbl} ({col_plc_hldr});'.format(
dest_tbl = sql_destination_tbl,
col_plc_hldr = ' text,'.join(_headers)
)
cs.execute(qry_create_tbl)
conn.commit()
sql_table_exists = True
# read the specified amount of rows
_data = list()
for _indx, _row in enumerate(_reader):
_data.append(tuple(_row))
if _indx == read_row_limit:
# load the values from csv into sqlite
cs.executemany(sql_insert_qry, _data)
conn.commit()
data = list()
else:
if _data:
# load the and remaining values into sqlite
cs.executemany(sql_insert_qry, _data)
conn.commit()
conn.close()
# close the current file, at which point we loop back to the next file
_fn.close()
# update progress bar
_pbar.update(1)
except Exception as _err:
exception_handler('load_files', _err, True)
def main():
"""obtain variables from user and execute the program
"""
try:
csv_files_dir = input('specify full path to csv files: ')
sqlite_db_path = input('specify full path to sqlite database: ')
load_files(sqlite_db_path, csv_files_dir)
except Exception as _err:
exception_handler('main', _err, True)
if __name__ == '__main__':
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment