Skip to content

Instantly share code, notes, and snippets.

@roder
Created December 16, 2010 04:49
Show Gist options
  • Star 15 You must be signed in to star a gist
  • Fork 12 You must be signed in to fork a gist
  • Save roder/743047 to your computer and use it in GitHub Desktop.
Save roder/743047 to your computer and use it in GitHub Desktop.
Convert XML files into SQLite
#!/usr/bin/env python
from optparse import OptionParser
from xml.dom.minidom import parse
import os
import sqlite3
datatypeMap = {
'integer': 'INT',
'datetime': 'DATETIME',
'boolean': 'BOOLEAN'
}
defaultDataType = 'TEXT'
def get_xml_doms(directory):
result = []
for filename in directory:
if filename.endswith('.xml'):
dom = parse(filename)
result.append(dom)
return result
def yield_db_schema(dbDef):
result = ''
for (table, tableDef) in dbDef.items():
result += create_table(table, tableDef)
return result
def exec_create_schema(dbDef, conn, db):
for (table, tableDef) in dbDef.items():
create = create_table(table, tableDef)
db.execute(create)
def yield_inserts(recordSet):
inserts = ''
for (table, rows) in recordSet.items():
for row in rows:
fields = "\'" + '\', \''.join(row.keys()) + "\'"
data = "\'" + '\', \''.join(row.values()) + "\'"
if fields != "''":
inserts += "INSERT INTO \'%s\' (%s) VALUES (%s);\n" % (table, fields, data)
return inserts
def exec_insert(recordSet, conn, db):
for (table, rows) in recordSet.items():
for row in rows:
fields = "\'" + '\', \''.join(row.keys()) + "\'"
data = "\'" + '\', \''.join(row.values()) + "\'"
if len(row.keys()) >0:
marklist = ["?"] * len(row.keys())
marks = ', '.join(marklist)
insert = "INSERT INTO \'%s\' (%s) VALUES (%s)" % (table, fields, marks)
values = tuple(row.values())
db.execute(insert, values)
conn.commit()
def create_table(table, tableDef):
fields = []
begin = 'CREATE TABLE \'%s\' ( \n' % table
for field, fieldDef in tableDef.items():
fields.append(create_field(field, fieldDef))
end = '\n);\n\n'
result = begin + ',\n'.join(fields) + end
return result
def create_field(field, fieldDef):
if fieldDef.has_key(u'type'):
datatype = fieldDef.get(u'type')
else:
datatype = defaultDataType
return " '%s' %s" % (field, datatype)
def collect_structure(doms):
db = {}
records = {}
for dom in doms:
db = gen_db_struct(dom.childNodes, db)
return db
def collect_data(dbDef, doms):
recordset = {}
for dom in doms:
for (table, fieldDef) in dbDef.items():
if not recordset.has_key(table):
recordset[table] = []
for row in dom.getElementsByTagName(table):
record = {}
for (column, _) in fieldDef.items():
for node in row.getElementsByTagName(column):
if node.hasChildNodes():
for item in node.childNodes:
if hasattr(item, 'data'):
if len(item.data.strip()) > 0:
record[column] = item.data
recordset[table].append(record)
return recordset
def gen_db_struct(nodeList, db = {}):
for node in nodeList:
if not node.hasChildNodes() and node.parentNode.parentNode.nodeName != '#document':
# a new field of data
field = node.parentNode
fieldName = field.nodeName
table = field.parentNode
tableName = table.nodeName
if not db.has_key(tableName):
db[tableName] = {}
db[tableName][fieldName] = {}
if field.hasAttributes():
for (Key, Value) in field.attributes.items():
if Key != u'type' and Value != u'array':
db[tableName][fieldName][Key] = datatypeMap[Value]
else:
gen_db_struct(node.childNodes, db)
return db
def run(inputDir, outputFile):
files = []
for filename in os.listdir(inputDir):
files.append(os.path.join(inputDir, filename))
domList = get_xml_doms(files)
dbDef = collect_structure(domList)
records = collect_data(dbDef, domList)
conn = sqlite3.connect(outputFile)
db = conn.cursor()
exec_create_schema(dbDef, conn, db)
exec_insert(records, conn, db)
db.close()
def main():
usage = "usage: %prog [options] /path/to/dir/with/xml"
parser = OptionParser(usage)
parser.add_option("-f", "--file", dest="outputFile", default = 'xmlsqlite.db3',
help="Specify the filename for the sqlite database. It will be created if it does not exist [Default: xmlsqlite.db3]")
(options, args) = parser.parse_args()
if len(args) != 1:
parser.error("incorrect number of arguments")
inputDir = os.path.abspath(os.path.expanduser(args[0]))
run(inputDir, options.outputFile)
if __name__ == "__main__": main()
@Oposum01
Copy link

Oposum01 commented Jul 5, 2013

Is line 113 correct? I mean the "parentNode.parentNode".
"if not node.hasChildNodes() and node.parentNode.parentNode.nodeName != '#document':"

@vickkeshri
Copy link

I have tried this code but getting error in line no. 149.
please guide me how to solve it.

@nigelhorne
Copy link

Doesn't work:

Traceback (most recent call last):
File "y", line 162, in
if name == "main": main()
File "y", line 159, in main
run(inputDir, options.outputFile)
File "y", line 140, in run
dbDef = collect_structure(domList)
File "y", line 86, in collect_structure
db = gen_db_struct(dom.childNodes, db)
File "y", line 130, in gen_db_struct
gen_db_struct(node.childNodes, db)
File "y", line 130, in gen_db_struct
gen_db_struct(node.childNodes, db)
File "y", line 128, in gen_db_struct
db[tableName][fieldName][Key] = datatypeMap[Value]
KeyError: u'1'

@Axel-Erfurt
Copy link

has_key was removed

from optparse import OptionParser
from xml.dom.minidom import parse
import os
import sqlite3


datatypeMap = {
  'integer': 'INT',
  'datetime': 'DATETIME',
  'boolean': 'BOOLEAN'
}

defaultDataType = 'TEXT'


def get_xml_doms(directory):
  result = []
  for filename in directory:
    if filename.endswith('.xml'):
      dom = parse(filename)
      result.append(dom)
  return result  

def yield_db_schema(dbDef):
  result = ''
  for (table, tableDef) in dbDef.items():
    result += create_table(table, tableDef) 
    
  return result

def exec_create_schema(dbDef, conn, db):
  for (table, tableDef) in dbDef.items():
    create = create_table(table, tableDef) 
    db.execute(create)
    
def yield_inserts(recordSet):
  inserts = ''
  for (table, rows) in recordSet.items():
    for row in rows:
      fields = "\'" + '\', \''.join(row.keys()) + "\'"
      data = "\'" + '\', \''.join(row.values()) + "\'"
      if fields != "''":
        inserts += "INSERT INTO \'%s\' (%s) VALUES (%s);\n" % (table, fields, data)
  
  return inserts
  
def exec_insert(recordSet, conn, db):
  for (table, rows) in recordSet.items():
    for row in rows:
        
      fields = "\'" + '\', \''.join(row.keys()) + "\'"
      data = "\'" + '\', \''.join(row.values()) + "\'"
      if len(row.keys()) >0:
        marklist = ["?"] * len(row.keys())
        marks = ', '.join(marklist)
          
        insert = "INSERT INTO \'%s\' (%s) VALUES (%s)" %  (table, fields, marks)
        values = tuple(row.values())
        db.execute(insert, values)
        conn.commit()
        

def create_table(table, tableDef):
  fields = []
  begin = 'CREATE TABLE \'%s\' ( \n' % table
  for field, fieldDef in tableDef.items():
    fields.append(create_field(field, fieldDef))
  end = '\n);\n\n'  
  result = begin + ',\n'.join(fields) + end
  
  return result

def create_field(field, fieldDef):
  if (u'type') in fieldDef:
    datatype =  fieldDef.get(u'type')
  else:
    datatype = defaultDataType
  
  return "  '%s' %s" % (field, datatype) 

def collect_structure(doms):
  db = {}
  records = {}
  for dom in doms:
    db = gen_db_struct(dom.childNodes, db)
 
  return db
  
def collect_data(dbDef, doms):
  recordset = {}
  for dom in doms:
    for (table, fieldDef) in dbDef.items():
      if not table in recordset:
        recordset[table] = []
      for row in dom.getElementsByTagName(table):
        record = {}
        for (column, _) in fieldDef.items():
          for node in row.getElementsByTagName(column):
            if node.hasChildNodes():
              for item in node.childNodes:
                if hasattr(item, 'data'):
                  if len(item.data.strip()) > 0:
                    record[column] = item.data
              
        recordset[table].append(record)
  
  return recordset
              

def gen_db_struct(nodeList, db = {}):
  for node in nodeList:
    if not node.hasChildNodes() and node.parentNode.parentNode.nodeName != '#document':
      # a new field of data
      field = node.parentNode
      fieldName = field.nodeName
      table = field.parentNode
      tableName = table.nodeName
        
      if not tableName in db:
        db[tableName] = {}
  
      db[tableName][fieldName] = {}

      if field.hasAttributes():
        for (Key, Value) in field.attributes.items():
          if Key != u'type' and Value != u'array':
            db[tableName][fieldName][Key] = datatypeMap[Value]   
    else:
      gen_db_struct(node.childNodes, db)
      
  return db

def run(inputDir, outputFile):
  files = []
  for filename in os.listdir(inputDir):
    files.append(os.path.join(inputDir, filename))
    
  domList = get_xml_doms(files)
  dbDef = collect_structure(domList)
  records = collect_data(dbDef, domList)
  conn = sqlite3.connect(outputFile)
  db = conn.cursor()
  exec_create_schema(dbDef, conn, db)
  exec_insert(records, conn, db)
  db.close()
  
def main():
  usage = "usage: %prog [options] /path/to/dir/with/xml"
  parser = OptionParser(usage)
  parser.add_option("-f", "--file", dest="outputFile", default = 'xmlsqlite.db3',
                    help="Specify the filename for the sqlite database.  It will be created if it does not exist [Default: xmlsqlite.db3]")

  (options, args) = parser.parse_args()
  if len(args) != 1:
      parser.error("incorrect number of arguments")
  
  inputDir = os.path.abspath(os.path.expanduser(args[0]))
  run(inputDir, options.outputFile)

  
if __name__ == "__main__": main()

@guest0815
Copy link

With my data I get the error:
"KeyError: '1'
If I replace line 128 with
db[tableName][fieldName][Key] = Value
the script runs, but does not create a foreign key in the sr_fa table.
Here is an example of my data:

<sr_id>5430</sr_id>
<sr_vorname>Bernd</sr_vorname>
<sr_faecher>
<sr_fa ind="1">
<srf_kurz>GE</srf_kurz>
<srf_kurs hj="1">
<srfk_kurz>GE1</srfk_kurz>
<srfk_punkte>12</srfk_punkte>
</srf_kurs>
<srf_kurs hj="2">
<srfk_kurz>E1</srfk_kurz>
</srf_kurs>
<srf_kurs hj="3">
<srfk_kurz>E1</srfk_kurz>
</srf_kurs>
<srf_kurs hj="4">
<srfk_kurz>E1</srfk_kurz>
</srf_kurs>
</sr_fa>
<sr_fa ind="2">
<srf_kurz>DE</srf_kurz>
<srf_kurs hj="1">
<srfk_kurz>DE1</srfk_kurz>
<srfk_punkte>12</srfk_punkte>
</srf_kurs>
<srf_kurs hj="2">
<srfk_kurz>DE1</srfk_kurz>
</srf_kurs>
<srf_kurs hj="3">
<srfk_kurz>DE1</srfk_kurz>
</srf_kurs>
<srf_kurs hj="4">
<srfk_kurz>DE1</srfk_kurz>
</srf_kurs>
</sr_fa>
<sr_fa ind="3">
<srf_kurz>PW</srf_kurz>
<srf_kurs hj="1">
<srfk_kurz>PW1</srfk_kurz>
<srfk_punkte>3</srfk_punkte>
</srf_kurs>
<srf_kurs hj="2">
<srfk_kurz>PW1</srfk_kurz>
</srf_kurs>
<srf_kurs hj="3">
<srfk_kurz>PW1</srfk_kurz>
</srf_kurs>
<srf_kurs hj="4">
<srfk_kurz>PW1</srfk_kurz>
</srf_kurs>
</sr_fa>
</sr_faecher>

Hope, someone can help me.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment