Skip to content

Instantly share code, notes, and snippets.

@biggers biggers/csvs_db_insert.py
Last active Mar 3, 2019

Embed
What would you like to do?
insert a set of CSVs (spreadsheets, "raw") into a MySQL DB - Python3 example (DB API; generators; PyMySQL)
import pymysql.cursors
import os
from attrdict import AttrDict
import sys
from datetime import datetime as dt
# Insert all /var/tmp/*/"CSV summary-reports" into MySQL - using Py DB-API
# NOTE: schema for the table must have been already created!
#
# INSTALL:
# pip3 install PyMySQL attrdict
#
# REFs:
# PyMySQL pure-Py driver for MySQL - Python DB-API
#
# Run:
# env DB_PASSWD=xyzzy1234 python3 csvs_db_insert.py
from reporting.futils import (
gen_find,
gen_open_each,
gen_concatenate,
)
# CSV row:
# compute,abc-123-kms,instances,36,50,0.72,14,2017-08-17 14:47:51.009463
# compute,abc-123-kms,total_cores,138,200,0.69,62,2017-08-17 14:47:51.009526
def set_unique_index_for_fields(connection_object):
    """Add a UNIQUE index over (cap_type, measurement, utc_datetime).

    Guards against inserting duplicate measurement rows.
    Ref: https://stackoverflow.com/a/5038052
    """
    conn = connection_object
    # Table names cannot be bound as query parameters, so the (trusted,
    # env-configured) table name is interpolated into the DDL string.
    ddl = (
        "ALTER TABLE {db_table} ADD UNIQUE "
        "(cap_type, measurement, utc_datetime)"
    ).format(db_table=conn.db_table)
    with conn.dbc.cursor() as cur:
        cur.execute(ddl)
    conn.dbc.commit()
def put_csv_rows_to_db(lines, connection_object):
    """Batch-INSERT all CSV summary-report rows into the configured table.

    Parameters
    ----------
    lines : iterable of str
        Raw CSV lines; summary-report "header" rows (containing
        'in-use,') are skipped.
    connection_object :
        Must carry ``.dbc`` (an open PyMySQL connection) and
        ``.db_table`` (table name, str).

    Side effects: commits on success, prints errors to stderr, and
    ALWAYS closes the connection when done.
    """
    connobj = connection_object
    try:
        with connobj.dbc.cursor() as cursor:
            values = []
            for line in lines:
                if 'in-use,' in line:  # drop CSV summary-report "header" row
                    continue
                vals = line.strip().split(',')
                try:  # convert the old "Influx timestamp" (ns epoch), if any
                    t = int(vals[7])
                    ts = dt.utcfromtimestamp(t / 1e9)
                    vals[7] = str(ts)
                except ValueError:
                    pass  # vals[7] is already a UTC-datetime string
                except IndexError:
                    pass  # short/malformed row; let executemany report it
                for i in (3, 4):  # get rid of None(s) in server_group rows
                    vals[i] = 0 if 'None' in vals[i] else vals[i]
                values.append(vals)
            # Create new Measurement records (table name is env-configured,
            # values are properly parameterized).
            sql = ("INSERT INTO {db_table}(cap_type, cloud_region, "
                   "measurement, in_use, cap_limit, "
                   "percent, remaining, utc_datetime) "
                   "VALUES (%s, %s, %s, %s, %s, %s, %s, %s)"
                   ).format(db_table=connobj.db_table)
            cursor.executemany(sql, values)
            # PyMySQL is NOT autocommit by default; commit to save!
            connobj.dbc.commit()
    except Exception as e:
        # was BaseException: that also swallowed KeyboardInterrupt/SystemExit
        print(str(e), file=sys.stderr)
    finally:
        connobj.dbc.close()  # release the connection even on error
def main():
    """Find all summary CSV reports under /var/tmp and load them into MySQL.

    Connection settings come from the environment (DB_USER, DB_PASSWD,
    DB_HOST, DB, DB_TABLE); set SET_UNIQUE_IDX=1 to (re)create the
    unique index first.
    """
    # get *all* the Summary CSV reports, across all CSV files/folders
    file_names = gen_find('compute-quotas-*summary-*.csv', '/var/tmp')
    files = gen_open_each(file_names)   # file objects, opened lazily
    lines = gen_concatenate(files)      # all lines, from all files!

    # NOTE(review): hard-coded private-IP default for DB_HOST — confirm
    # this is intentional for the deployment environment.
    connobj = AttrDict(
        user=os.getenv('DB_USER', 'capacity'),
        password=os.getenv('DB_PASSWD', 'xyzzy'),
        host=os.getenv('DB_HOST', '10.203.49.194'),
        db=os.getenv('DB', 'capacity'),
        charset='utf8',
        cursorclass=pymysql.cursors.DictCursor,
    )
    # connect to the Capacity database
    connobj.dbc = pymysql.connect(**connobj)
    connobj.db_table = os.getenv('DB_TABLE', 'report')

    # os.getenv returns a *string* when the variable is set, so the old
    # truthiness test fired even for SET_UNIQUE_IDX=0; parse explicitly.
    set_unique_idx = os.getenv('SET_UNIQUE_IDX', '')
    if set_unique_idx not in ('', '0', 'false', 'False', 'no'):
        set_unique_index_for_fields(connobj)

    put_csv_rows_to_db(lines, connobj)


if __name__ == '__main__':
    main()
#!/usr/bin/python3
import os
import fnmatch
import gzip
import bz2
def walk_files(root, patterns='*', single_level=True, yield_folders=False):
    """Yield (dirpath, name) pairs under *root* matching any glob pattern.

    *patterns* is a ';'-separated string of shell wildcard patterns.
    With *yield_folders*, sub-directory names are matched as well; with
    *single_level* (the default) only *root* itself is scanned.
    Modified, from the Python Cookbook 2nd-ed, ex 2.16
    """
    pattern_list = patterns.split(';')
    for dirpath, dirnames, names in os.walk(root):
        if yield_folders:
            names.extend(dirnames)
        names.sort()
        for name in names:
            if any(fnmatch.fnmatch(name, pat) for pat in pattern_list):
                yield dirpath, name
        if single_level:
            break
def walk_files_2(root, patterns='*', single_level=True, yield_folders=False):
    """Yield full paths under *root* matching any ';'-separated glob pattern.

    Same matching rules as walk_files(), but yields the joined path
    instead of a (dirpath, name) pair.
    Modified, from the Python Cookbook 2nd-ed, ex 2.16
    """
    pattern_list = patterns.split(';')
    for dirpath, dirnames, names in os.walk(root):
        if yield_folders:
            names.extend(dirnames)
        names.sort()
        for name in names:
            if any(fnmatch.fnmatch(name, pat) for pat in pattern_list):
                yield os.path.join(dirpath, name)
        if single_level:
            break
# REF: Python_Cookbook_Third_Edition/
# src/4/creating_data_processing_pipelines/example.py
# (below fns)
def gen_find(filepat, top):
    """
    Find all filenames in the directory tree *top* that match the shell
    wildcard pattern *filepat*, yielding full paths.
    Does *not* sort the filenames that are found!
    """
    for dirpath, _subdirs, names in os.walk(top):
        yield from (os.path.join(dirpath, name)
                    for name in fnmatch.filter(names, filepat))
def gen_open_each(filenames):
    """
    Open a sequence of filenames one at a time, producing a file object.

    Each file is closed when proceeding to the next iteration — and,
    unlike the original plain ``yield; close()`` form, also when the
    consumer abandons the generator early or an exception propagates
    through it (``with`` handles GeneratorExit).  Supports .gz and .bz2
    transparently, in text mode.
    """
    for filename in filenames:
        if filename.endswith('.gz'):
            opener = gzip.open
        elif filename.endswith('.bz2'):
            opener = bz2.open
        else:
            opener = open
        with opener(filename, 'rt') as f:
            yield f
def gen_concatenate(iterators):
    """
    Chain a sequence of iterables together into one flat sequence,
    lazily (like itertools.chain.from_iterable).
    """
    for iterable in iterators:
        for item in iterable:
            yield item
def ilen(it):
    """Count the items of iterable *it* by draining it at C speed.

    A counting iterator is zipped with the input; the deque with
    ``maxlen=0`` discards everything while the zip advances both.
    The count() iterator is 0-based, so its 'next' value after the
    drain is the number of items consumed.
    REF: http://stackoverflow.com/a/34404546/4846773
    """
    from collections import deque
    from itertools import count
    counter = count()
    # counter is the *second* zip argument so it is not advanced one
    # step past the data when `it` runs dry
    deque(zip(it, counter), maxlen=0)
    return next(counter)
def main_test():
    """
    "Integration test" of the generator pipeline in this module:
    find CSVs under /var/tmp, count all their lines, list the files.
    """
    from itertools import tee
    import sys

    found = gen_find('quotas*summary*.csv', '/var/tmp')
    # tee: one branch feeds the line-counting pipeline, the other the
    # per-file report below
    pipeline_names, report_names = tee(found)

    line_stream = gen_concatenate(gen_open_each(pipeline_names))
    total_lines = ilen(line_stream)  # equivalent to sum(1 for _ in line_stream)
    print("total lines, all found files: {}".format(total_lines),
          file=sys.stderr)

    for fname in report_names:
        print("-> {}".format(fname), file=sys.stderr)


if __name__ == "__main__":
    main_test()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.