Skip to content

Instantly share code, notes, and snippets.

@syedarehaq
Created June 11, 2021 20:50
Show Gist options
  • Save syedarehaq/0c459b0b198025acfab096a36483d0e9 to your computer and use it in GitHub Desktop.
Save syedarehaq/0c459b0b198025acfab096a36483d0e9 to your computer and use it in GitHub Desktop.
A slightly modified script to convert mysql dump to csv. Utilizing it for the sql dump of wikipedia interwiki links and pagelinks found here: https://dumps.wikimedia.org/enwiki/latest/ . It is slightly modified to accommodate some special unicode characters from its original source found here: https://github.com/jamesmishra/mysqldump-to-csv
#!/usr/bin/env python
## Source: https://github.com/jamesmishra/mysqldump-to-csv
import fileinput
import csv
import sys
# This prevents prematurely closed pipes from raising
# an exception in Python
from signal import signal, SIGPIPE, SIG_DFL
signal(SIGPIPE, SIG_DFL)
# allow large content in the dump
csv.field_size_limit(sys.maxsize)
def is_insert(line):
"""
Returns true if the line begins a SQL insert statement.
"""
return line.startswith('INSERT INTO') or False
def get_values(line):
"""
Returns the portion of an INSERT statement containing values
"""
return line.partition('` VALUES ')[2]
def values_sanity_check(values):
"""
Ensures that values from the INSERT statement meet basic checks.
"""
assert values
assert values[0] == '('
# Assertions have not been raised
return True
def parse_values(values, outfile):
"""
Given a file handle and the raw values from a MySQL INSERT
statement, write the equivalent CSV to the file
"""
latest_row = []
reader = csv.reader([values], delimiter=',',
doublequote=False,
escapechar='\\',
quotechar="'",
strict=True
)
writer = csv.writer(outfile, quoting=csv.QUOTE_MINIMAL)
for reader_row in reader:
for column in reader_row:
# If our current string is empty...
if len(column) == 0 or column == 'NULL':
latest_row.append(chr(0))
continue
# If our string starts with an open paren
if column[0] == "(":
# Assume that this column does not begin
# a new row.
new_row = False
# If we've been filling out a row
if len(latest_row) > 0:
# Check if the previous entry ended in
# a close paren. If so, the row we've
# been filling out has been COMPLETED
# as:
# 1) the previous entry ended in a )
# 2) the current entry starts with a (
if latest_row[-1][-1] == ")":
# Remove the close paren.
latest_row[-1] = latest_row[-1][:-1]
new_row = True
# If we've found a new row, write it out
# and begin our new one
if new_row:
writer.writerow(latest_row)
latest_row = []
# If we're beginning a new row, eliminate the
# opening parentheses.
if len(latest_row) == 0:
column = column[1:]
# Add our column to the row we're working on.
latest_row.append(column)
# At the end of an INSERT statement, we'll
# have the semicolon.
# Make sure to remove the semicolon and
# the close paren.
if latest_row[-1][-2:] == ");":
latest_row[-1] = latest_row[-1][:-2]
writer.writerow(latest_row)
def main():
"""
Parse arguments and start the program
"""
# Iterate over all lines in all files
# listed in sys.argv[1:]
# or stdin if no args given.
try:
for line in fileinput.input(openhook=fileinput.hook_encoded("utf-8",errors="ignore")):
# Look for an INSERT statement and parse it.
if is_insert(line):
values = get_values(line)
if values_sanity_check(values):
parse_values(values, sys.stdout)
except KeyboardInterrupt:
sys.exit(0)
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment