Skip to content

Instantly share code, notes, and snippets.

@meyer1994
Last active June 18, 2022 14:54
Show Gist options
  • Save meyer1994/6c93bdbc1c1bb7168be5788d6e59cab3 to your computer and use it in GitHub Desktop.
Script used to import CSV files to SQL databases
import sys
import logging
from argparse import ArgumentParser
import pandas as pd
from pandasql import sqldf
fmt = '[%(asctime)s] %(levelname)s [%(name)s:%(lineno)03d] %(message)s'
logger = logging.getLogger(__name__)


def read(files: list, chunk: int, **kwargs) -> list:
    """Lazily read every input as CSV, yielding dataframes of at most
    ``chunk`` rows each.

    Any extra keyword arguments are forwarded verbatim to
    ``pandas.read_csv``.
    """
    logger.info('Reading %d files', len(files))
    for path in files:
        logger.info('Reading file: %s', path)
        # read_csv with `chunksize` returns an iterator of DataFrames;
        # delegate to it so callers see one flat stream of chunks
        yield from pd.read_csv(path, chunksize=chunk, **kwargs)
def filter_kwargs(args: object, prefix: str) -> dict:
    """Collect the attributes of ``args`` whose name starts with ``prefix``.

    The prefix is stripped from each matching name, and entries whose
    value is ``None`` (options the user did not pass) are dropped, so
    the result can be splatted directly into a function call.

    :param args: an ``argparse.Namespace`` (anything ``vars()`` accepts)
    :param prefix: option namespace, e.g. ``'read:'`` or ``'sql:'``
    :return: dict mapping stripped option name -> value
    """
    logger.info('Filtering args with prefix: %s', prefix)
    # Strip by prefix length rather than `_, key = key.split(':')`:
    # the unpacking form raises ValueError as soon as an option name
    # contains more than one ':'.
    return {
        key[len(prefix):]: val
        for key, val in vars(args).items()
        if key.startswith(prefix) and val is not None
    }
parser = ArgumentParser()
# Positional inputs; with no arguments the script reads CSV from stdin
parser.add_argument('files', nargs='*', default=[sys.stdin.buffer])
parser.add_argument('-t', '--table', default='df')
parser.add_argument('--chunk', default=10_000, type=int)
# SQL applied (via pandasql) to each chunk before it is imported;
# the chunk is visible to the query under the table name `df`
parser.add_argument('-f', '--filter', default=r'SELECT * FROM df')
parser.add_argument('-db', '--database', default=r'sqlite:///df.sqlite')
# `read:*` options are forwarded verbatim to pandas.read_csv
parser.add_argument('--read:sep', type=str)
parser.add_argument('--read:quoting', type=int)
parser.add_argument('--read:encoding', type=str)
# `sql:*` options are forwarded verbatim to DataFrame.to_sql
parser.add_argument('--sql:index', action='store_true')
parser.add_argument('--sql:if_exists', default='append', type=str)
parser.add_argument('-v', '--verbose', action='store_true')
args = parser.parse_args()

read_kwargs = filter_kwargs(args, 'read:')
sql_kwargs = filter_kwargs(args, 'sql:')

if args.verbose:
    logging.basicConfig(level=logging.INFO, format=fmt)

# NOTE(review): removed a dead `chunks = sorted(args.files)` here — it
# was immediately overwritten by the line below, so the sort never took
# effect. If deterministic file order is wanted, sort the list that is
# passed to read() instead.
chunks = read(args.files, args.chunk, **read_kwargs)

for i, df in enumerate(chunks):
    logger.info('Filtering dataframe with: %s', args.filter)
    # pandasql resolves the table name `df` from the mapping we pass in
    df = sqldf(args.filter, locals())

    # We should only replace on first iteration, otherwise, every chunk
    # will drop the table and we would end up with a table containing
    # only the data from the last chunk.
    if i > 0:
        sql_kwargs['if_exists'] = 'append'

    logger.info('Importing %d rows chunk', len(df))
    # to_sql's return value carries nothing we use — don't rebind df
    df.to_sql(args.table, con=args.database, **sql_kwargs)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment