Skip to content

Instantly share code, notes, and snippets.

@robertobarreda
Created January 14, 2015 17:18
Show Gist options
  • Save robertobarreda/70d3630742b1cc447076 to your computer and use it in GitHub Desktop.
Save robertobarreda/70d3630742b1cc447076 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python
import argparse
import csv
import gzip
import sys
import re
from contextlib import closing
# Restore the default SIGPIPE disposition. This prevents prematurely
# closed pipes from raising an exception in Python.
from signal import signal, SIGPIPE, SIG_DFL
signal(SIGPIPE, SIG_DFL)
# Raise the csv module's per-field size limit to sys.maxsize; the VALUES
# list of a whole INSERT statement is handed to csv.reader as one line.
csv.field_size_limit(sys.maxsize)
def is_insert(line):
    """
    Return True if the line begins a SQL INSERT statement.

    The match is case-sensitive and anchored at the very first character,
    so leading whitespace or a lower-case ``insert into`` yields False.
    """
    # str.startswith already returns a bool; no ``or False`` needed.
    return line.startswith('INSERT INTO')
def get_values(line):
    """
    Split an INSERT statement into its table name and its VALUES payload.

    ``line`` is expected to look like
    ``INSERT INTO `tbl` VALUES (...),(...);``.  A column list between the
    table name and ``VALUES`` is tolerated, because only the first
    back-quoted identifier of the header is read.

    Returns a ``(tbl_name, values)`` tuple where ``values`` is everything
    after the first `` VALUES `` separator, unmodified (including any
    trailing ``;`` and newline).  The table name is returned verbatim.

    Raises ValueError if the line has no `` VALUES `` separator or no
    back-quoted table name.
    """
    # Split on the first occurrence only: the payload itself may contain
    # the text ' VALUES ' inside a string literal.
    header, separator, values = line.partition(' VALUES ')
    if not separator:
        raise ValueError("no ' VALUES ' clause in statement: %r" % line[:80])
    # Accept any back-quoted identifier, not just [A-Za-z0-9_-]; names
    # containing e.g. '$' or '.' previously made re.search return None
    # and crashed with AttributeError.
    match = re.search(r'INSERT INTO `([^`]+)`', header)
    if match is None:
        raise ValueError('no back-quoted table name in: %r' % header[:80])
    return match.group(1), values
def values_sanity_check(values):
    """
    Ensure that the values from an INSERT statement meet basic checks.

    The payload must be non-empty and must start with an opening
    parenthesis.  Returns True when both checks pass and raises
    AssertionError otherwise.

    The checks are explicit ``raise`` statements rather than ``assert``
    statements so that they still run under ``python -O``, which strips
    asserts; the exception type is unchanged.
    """
    if not values:
        raise AssertionError('INSERT statement has an empty VALUES payload')
    if values[0] != '(':
        raise AssertionError(
            "VALUES payload does not start with '(': %r" % values[:40])
    return True
def _strip_row_terminator(cell):
    """Return ``(cell, found)`` with a trailing ');' or ')' removed."""
    for terminator in (');', ')'):
        if cell.endswith(terminator):
            return cell[:-len(terminator)], True
    return cell, False


def parse_values(values, outfile):
    """
    Given a file handle and the raw values from a MySQL INSERT
    statement, write the equivalent CSV to the file.

    ``values`` is the text after `` VALUES ``, e.g. ``(1,'a'),(2,'b');``.
    It is tokenised with csv.reader (comma-delimited, single-quoted,
    backslash-escaped) and the flat list of cells is regrouped into rows:
    a cell that starts with '(' opens a new row when the previous cell
    closed one with ')' (or ');').  One CSV line is written to
    ``outfile`` per row; nothing is returned.

    Empty-string cells are kept as empty CSV fields so that columns stay
    aligned, and the final row of the statement is written once the input
    is exhausted, provided it is terminated by ')' or ');'.

    Known limitation of the heuristic: inside a single row, a value ending
    in ')' that is directly followed by a value starting with '(' is
    mistaken for a row boundary.
    """
    reader = csv.reader([values], delimiter=',', doublequote=False,
                        escapechar='\\', quotechar="'", strict=True)
    writer = csv.writer(outfile, quoting=csv.QUOTE_MINIMAL)

    latest_row = []
    for reader_row in reader:
        for column in reader_row:
            if column.startswith('('):
                if latest_row:
                    # The previous cell ending in a close paren while
                    # this one starts with an open paren means the row
                    # being filled is complete.
                    stripped, row_closed = _strip_row_terminator(
                        latest_row[-1])
                    if row_closed:
                        latest_row[-1] = stripped
                        writer.writerow(latest_row)
                        latest_row = []
                # Only the paren that opens a row is syntax; a '(' at the
                # start of a later cell is part of the value itself.
                if not latest_row:
                    column = column[1:]
            latest_row.append(column)

    # No '(' cell follows the last row of the statement, so it has to be
    # flushed here, minus its closing ')' / ');'.
    if latest_row:
        stripped, row_closed = _strip_row_terminator(latest_row[-1])
        if row_closed:
            latest_row[-1] = stripped
            writer.writerow(latest_row)
def main(args):
    """
    Export the requested tables of a gzip-compressed mysqldump to CSV.

    ``args`` must provide ``infile`` (path of the gzip-compressed dump)
    and ``tbl_list`` (an iterable of table names, or None for "no
    tables").  For every INSERT statement whose table is in ``tbl_list``
    the rows are appended to ``<table>.csv`` in the current directory.

    A KeyboardInterrupt ends the process with exit status 0.
    """
    try:
        # argparse leaves tbl_list as None when -t is never given;
        # treat that as an empty selection instead of crashing in set().
        tbl_list = set(args.tbl_list or ())
        # Single-argument call form: prints the same text whether print
        # is a statement (Python 2) or a function (Python 3).
        print('Dumping: %s' % (tbl_list,))
        with closing(gzip.open(args.infile, 'rb')) as infile:
            for line in infile:
                # Only INSERT statements carry row data.
                if not is_insert(line):
                    continue
                tbl_name, values = get_values(line)
                if tbl_name not in tbl_list:
                    continue
                if values_sanity_check(values):
                    # Append mode: a table's rows are usually spread over
                    # many INSERT statements.
                    with open(tbl_name + '.csv', 'a') as outfile:
                        parse_values(values, outfile)
    except KeyboardInterrupt:
        sys.exit(0)
if __name__ == "__main__":
    # Command-line entry point.  -f names the dump that main() opens with
    # gzip; each -t adds one table to export.  Both are marked required so
    # that omitting one yields a usage message instead of a traceback.
    parser = argparse.ArgumentParser(description='Convert mysqldump to csv.')
    parser.add_argument('-f', dest='infile', required=True,
                        help='mysqldump to parse')
    parser.add_argument('-t', dest='tbl_list', action='append', required=True,
                        help='table to export; repeat -t for several tables')
    args = parser.parse_args()
    main(args)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment