Skip to content

Instantly share code, notes, and snippets.

@prggmr
Last active February 20, 2023 21:35
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save prggmr/2161849 to your computer and use it in GitHub Desktop.
Save prggmr/2161849 to your computer and use it in GitHub Desktop.
Tool to import enormous XML files into mysql using lxml.etree
def does_exist(tbl, where=None, where_field='id'):
    """Return True when ``tbl`` holds at least one row matching the filter.

    Runs ``SELECT COUNT(`id`) ... WHERE `where_field` = %s`` on the
    module-level cursor ``cur`` and inspects the returned count.

    :param tbl: table name; interpolated into the SQL text as-is, so it must
        come from trusted configuration, never from the feed.
    :param where: value compared against ``where_field`` (passed as a bound
        parameter).
    :param where_field: column used in the WHERE clause; interpolated as-is.
    :returns boolean: True if the count is non-zero, otherwise False.
    """
    query = "SELECT COUNT(`id`) AS total FROM %s WHERE `%s` = %%s" % (
        tbl, where_field
    )
    cur.execute(query, (where,))
    # A COUNT query always yields exactly one row, and ``cur.rownumber`` is
    # only the cursor's position inside the result set, so neither can tell
    # whether a match exists. The count itself has to be read.
    row = cur.fetchone()
    if row is None:
        return False
    try:
        # Dict-style cursors expose the aliased column by name.
        total = row['total']
    except (TypeError, KeyError, IndexError):
        # Tuple-style cursors expose it by position.
        total = row[0]
    return bool(total)
def insert_query(tbl, fields):
    """Insert one row into ``tbl`` and commit it.

    Uses the module-level cursor ``cur`` and connection ``con``.

    :param tbl: table name; interpolated into the SQL text as-is (trusted
        configuration only).
    :param fields: mapping of column name to value. Column names are
        interpolated (back-tick quoted); values are sent as bound parameters.
    :returns: True on success. On failure the transaction is rolled back and
        the caught exception object is *returned* (not raised), so callers
        must check for a non-bool result.
    """
    # Snapshot the items once so columns and values are guaranteed to line up.
    items = list(fields.items())
    columns = ','.join('`%s`' % name for name, _ in items)
    placeholders = ','.join(['%s'] * len(items))
    values = [value for _, value in items]
    query = "INSERT INTO %s (%s) VALUES (%s)" % (tbl, columns, placeholders)
    try:
        cur.execute(query, values)
        con.commit()
        return True
    except Exception as e:  # "as" form is valid on Python 2.6+ and Python 3
        con.rollback()
        return e
def update_query(tbl, fields, id):
    """Update the row of ``tbl`` whose ``id`` column equals ``id`` and commit.

    Uses the module-level cursor ``cur`` and connection ``con``.

    :param tbl: table name; interpolated into the SQL text as-is (trusted
        configuration only).
    :param fields: mapping of column name to new value. Column names are
        interpolated (back-tick quoted); values are sent as bound parameters.
    :param id: value bound to the ``WHERE id = %s`` clause.
    :returns: True on success. On failure the transaction is rolled back and
        the caught exception object is *returned* (not raised), so callers
        must check for a non-bool result.
    """
    # Snapshot the items once so assignments and values are guaranteed to
    # line up.
    items = list(fields.items())
    assignments = ','.join('`%s` = %%s' % name for name, _ in items)
    query = "UPDATE %s SET %s WHERE id = %%s" % (tbl, assignments)
    # The id is the last placeholder, so it goes last in the parameter list.
    values = [value for _, value in items]
    values.append(id)
    try:
        cur.execute(query, values)
        con.commit()
        return True
    except Exception as e:  # "as" form is valid on Python 2.6+ and Python 3
        con.rollback()
        return e
class Import(object):
    """
    Imports XML Data from a Content Feed.

    There are two separate ways of importing a feed: FULL and INC.

    A FULL will only insert data and not look for any changes. This should
    ONLY be used when doing a **full** sync.

    A INC will look for data changes and update the database as needed, along
    with inserting new data it receives. This should be used when importing
    the incremental feed updates provided by MN each day.

    NOTE: as written, the import type only affects one debug log line; every
    record goes through the same exists -> insert/update path regardless of
    the type passed in.

    The whole import runs inside the constructor: creating an instance
    parses ``source`` and writes to the database.
    """

    def __init__(self, type, source):
        """Run the import.

        :param type: import mode; compared against ``IMPORT_FULL``.
        :param source: path of the XML feed. When the path contains ``.gz``
            the file is opened through ``gzip`` and decompressed on the fly.

        Exits the process with status 2 if the compressed file cannot be
        opened.
        """
        self._log = logging.getLogger('import')
        self._type = type
        self._source = source
        self._log.info("Import %s started" % self._source)
        if self._type == IMPORT_FULL:
            self._log.debug("FULL IMPORT PERFORMED")
        opened = None
        # Has the file been uncompressed
        if '.gz' in self._source:
            self._log.info("Compressed data file encountered.")
            # Uncompress file
            try:
                self._log.info("Starting uncompression of data file.")
                opened = gzip.open(self._source, 'rb')
                self._source = opened
                self._log.info("Uncompression finished.")
            except Exception as error:
                self._log.critical(error)
                sys.exit(2)
        # Each entry is
        #   (record_tag, field_specs, table_name[, exists_function])
        # where a field spec is (column, node_name) or
        # (column, node_name, sub_node). The names below are placeholders
        # that must be replaced with the real feed layout.
        lookup_element = (
            (
                'parent_node',
                (
                    ('tbl_column', 'node_name'),
                ),
                'tbl_name'
            ),
            (
                'parent_node',
                (
                    # Allow for choosing elements in the case of
                    # <parent_node><node_name><sub_node/></node_name>
                    # (This used to be a string literal, which Python parsed
                    # as a call on a str and raised TypeError.)
                    ('tbl_column', 'node_name', 'sub_node'),
                ),
                'table_name',
                # Optional 4th item: called with the collected fields to
                # decide whether the record already exists.
                custom_insert_function
            ),
        )
        try:
            elements = etree.iterparse(
                self._source, events=('start', 'end')
            )
            self._process(elements, lookup_element)
        finally:
            # Only close what this constructor opened itself.
            if opened is not None:
                opened.close()
        self._log.info("Import complete")

    @staticmethod
    def _match_column(lookup, tag, parent):
        """Find the column that the element named ``tag`` feeds, if any.

        :param lookup: field specs of the current record.
        :param tag: tag of the element that just started.
        :param parent: tag currently tracked as the enclosing node for
            three-item specs, or None.
        :returns: ``(column, parent)`` where column is None when nothing
            matched and parent is the (possibly newly set) tracked tag.
        """
        for field in lookup:
            if len(field) == 3:
                # First element seen while no parent is tracked becomes the
                # candidate parent for (column, node_name, sub_node) specs.
                if parent is None:
                    parent = tag
                matched = parent == field[1] and tag == field[2]
            else:
                matched = tag == field[1]
            if matched:
                return field[0], parent
        return None, parent

    def _process(self, elements, lookup_element):
        """Walk ``(event, element)`` pairs and hand each record to the DB.

        Matching is decided on the ``start`` event, but the text is only read
        on the matching ``end`` event, because an element's text is not
        guaranteed to be parsed yet when ``start`` fires.
        """
        root = None
        fields = {}
        lookup = None
        tbl = None
        exists_function = None
        parent = None
        # One entry per currently open descendant of the record element:
        # the column it feeds, or None.
        pending = []
        for event, element in elements:
            if event == 'start':
                if root is None:
                    # No break: when several entries share a tag the last
                    # one wins, as before.
                    for tup in lookup_element:
                        if tup[0] == element.tag:
                            lookup = tup[1]
                            root = element.tag
                            fields = {}
                            tbl = tup[2]
                            exists_function = tup[3] if len(tup) > 3 else None
                else:
                    column, parent = self._match_column(
                        lookup, element.tag, parent
                    )
                    pending.append(column)
            elif event == 'end':
                if parent is not None and element.tag == parent:
                    parent = None
                if pending:
                    column = pending.pop()
                    if column is not None:
                        text = element.text
                        # Non-ASCII characters are dropped, empty elements
                        # yield None (unchanged from the original handling).
                        if text is not None:
                            text = text.encode('ascii', 'ignore')
                        fields[column] = text
                elif root is not None and element.tag == root:
                    self._insert_record(root, fields, tbl, exists_function)
                    lookup = None
                    fields = {}
                    tbl = None
                    root = None
                    exists_function = None
                    parent = None
                # Values are already copied into ``fields``, so the element
                # can be released to keep memory flat on very large feeds.
                self._cleanup(element)

    def _cleanup(self, element):
        """Release ``element`` and every sibling that precedes it."""
        element.clear()
        while element.getprevious() is not None:
            del element.getparent()[0]

    def _insert_record(self, type, fields, tbl, exists_function=None):
        """
        Import a Record

        Inserts the record when it does not exist yet, otherwise updates the
        row whose id equals ``fields['id']``.

        :param type: tag of the record element; used in log messages only.
        :param fields: mapping of column name to value for this record.
        :param tbl: destination table.
        :param exists_function: optional callable taking ``fields`` and
            returning a truthy value when the record already exists; when
            omitted ``does_exist(tbl, fields['id'])`` is used.
        :returns boolean: True when the write succeeded, False otherwise.
        """
        record_id = fields.get('id')
        if exists_function is not None:
            exists = exists_function(fields)
        elif record_id is None:
            # Nothing to look up; let the INSERT (and the database) decide.
            exists = False
        else:
            exists = does_exist(tbl, record_id)
        if not exists:
            result = insert_query(tbl, fields)
        elif record_id is None:
            self._log.error(
                "Cannot update %s: record has no id (%s)" % (tbl, fields)
            )
            return False
        else:
            self._log.info("Updating %s [%s]" % (tbl, record_id))
            result = update_query(tbl, fields, record_id)
        # The query helpers return True on success and the exception object
        # on failure.
        if not isinstance(result, bool):
            self._log.error("Failed to import %s %s (%s)" %
                (type, fields, result)
            )
            return False
        return result
@PizzaShift
Copy link

Hi. I wanted to use your code to import a 15GB XML file to MySQL. I am new to this part of database migration so bear with me. Here is the error I get when running the snippet in VS Code:

line 26
except Exception, e:
^^^^^^^^^^^^
SyntaxError: multiple exception types must be parenthesized

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment