Last active
June 18, 2022 14:54
-
-
Save meyer1994/6c93bdbc1c1bb7168be5788d6e59cab3 to your computer and use it in GitHub Desktop.
Script used to import CSV files to SQL databases
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import sys | |
import logging | |
from argparse import ArgumentParser | |
import pandas as pd | |
from pandasql import sqldf | |
# Log-line layout used by logging.basicConfig when --verbose is set:
# timestamp, level, logger name + line number, then the message.
fmt = '[%(asctime)s] %(levelname)s [%(name)s:%(lineno)03d] %(message)s'
# Module-level logger; emits nothing until basicConfig is called.
logger = logging.getLogger(__name__)
def read(files: list, chunk: int, **kwargs) -> list:
    """Lazily read every file in *files* as CSV.

    Yields one DataFrame per chunk of at most *chunk* rows, streaming
    through the files in order. Extra keyword arguments are forwarded
    to ``pandas.read_csv``.
    """
    log = logging.getLogger(__name__)
    log.info('Reading %d files', len(files))
    for source in files:
        log.info('Reading file: %s', source)
        reader = pd.read_csv(source, chunksize=chunk, **kwargs)
        for frame in reader:
            yield frame
def filter_kwargs(args: object, prefix: str) -> dict:
    """Extract prefixed options from an argparse namespace.

    Collects every attribute of *args* whose name starts with *prefix*
    (e.g. ``'read:'``) and whose value is not None, and returns them as
    a dict keyed by the name with the prefix stripped — suitable to
    splat into ``pd.read_csv`` / ``DataFrame.to_sql``.
    """
    logging.getLogger(__name__).info('Filtering args with prefix: %s', prefix)
    kwargs = {}
    for key, val in vars(args).items():
        if key.startswith(prefix) and val is not None:
            # maxsplit=1: a name containing a second colon must not
            # make the unpacking raise ValueError.
            key = key.split(':', 1)[1]
            kwargs[key] = val
    return kwargs
# Command line: positional CSV files (stdin by default), target table,
# chunk size, an SQL filter applied to each chunk, the destination
# database URL, plus prefixed pass-through options for read_csv
# ('read:*') and to_sql ('sql:*').
parser = ArgumentParser()
parser.add_argument('files', nargs='*', default=[sys.stdin.buffer])
parser.add_argument('-t', '--table', default='df')
parser.add_argument('--chunk', default=10_000, type=int)
parser.add_argument('-f', '--filter', default=r'SELECT * FROM df')
parser.add_argument('-db', '--database', default=r'sqlite:///df.sqlite')
parser.add_argument('--read:sep', type=str)
parser.add_argument('--read:quoting', type=int)
parser.add_argument('--read:encoding', type=str)
parser.add_argument('--sql:index', action='store_true')
parser.add_argument('--sql:if_exists', default='append', type=str)
parser.add_argument('-v', '--verbose', action='store_true')
args = parser.parse_args()

# Configure logging BEFORE calling any helpers, otherwise their
# logger.info() output is silently dropped.
if args.verbose:
    logging.basicConfig(level=logging.INFO, format=fmt)

read_kwargs = filter_kwargs(args, 'read:')
sql_kwargs = filter_kwargs(args, 'sql:')

chunks = read(args.files, args.chunk, **read_kwargs)
for i, df in enumerate(chunks):
    logger.info('Filtering dataframe with: %s', args.filter)
    # sqldf resolves the table name in the query ('df' by default)
    # against local variables, so the chunk must be bound to `df`.
    df = sqldf(args.filter, locals())
    # We should only replace on the first iteration, otherwise every
    # chunk would drop the table and we would end up with a table
    # containing only the data from the last chunk.
    if i > 0:
        sql_kwargs['if_exists'] = 'append'
    logger.info('Importing %d rows chunk', len(df))
    df.to_sql(args.table, con=args.database, **sql_kwargs)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment.