Skip to content

Instantly share code, notes, and snippets.

@paul-english
Created July 10, 2023 23:08
Show Gist options
  • Save paul-english/363a54bac04e23d4e44858c3e9cec55d to your computer and use it in GitHub Desktop.
Save paul-english/363a54bac04e23d4e44858c3e9cec55d to your computer and use it in GitHub Desktop.
import os
import sqlite3
from xml.etree.ElementTree import iterparse
from tqdm import tqdm
import json
def create_database(dbname: str):
# Create a connection to the SQLite database
conn = sqlite3.connect(dbname)
cursor = conn.cursor()
cursor.execute('''
CREATE TABLE dbinfo (
dbinfo_id INTEGER PRIMARY KEY,
dbname TEXT NOT NULL,
version TEXT,
entry_count TEXT,
file_date TEXT
)
''')
cursor.execute('''
CREATE TABLE ipr (
ipr_id INTEGER PRIMARY KEY,
match_id INTEGER NOT NULL,
id TEXT NOT NULL,
name TEXT NOT NULL,
type TEXT NOT NULL,
parent_id TEXT,
FOREIGN KEY (match_id) REFERENCES match (match_id)
)
''')
cursor.execute('''
CREATE TABLE lcn (
lcn_id INTEGER PRIMARY KEY,
match_id INTEGER NOT NULL,
start TEXT NOT NULL,
end TEXT NOT NULL,
fragments TEXT,
score TEXT NOT NULL,
FOREIGN KEY (match_id) REFERENCES match (match_id)
)
''')
cursor.execute('''
CREATE TABLE match (
match_id INTEGER PRIMARY KEY,
protein_id INTEGER NOT NULL,
id TEXT NOT NULL,
name TEXT NOT NULL,
dbname TEXT NOT NULL,
status TEXT NOT NULL,
evd TEXT NOT NULL,
model TEXT NOT NULL,
FOREIGN KEY (protein_id) REFERENCES protein (protein_id)
)
''')
cursor.execute('''
CREATE TABLE protein (
protein_id INTEGER PRIMARY KEY,
id TEXT NOT NULL,
name TEXT NOT NULL,
length TEXT NOT NULL,
crc64 TEXT NOT NULL
)
''')
# Commit the changes and close the connection
conn.commit()
conn.close()
def element_to_dict(element):
result = {}
if element.attrib:
result.update(element.attrib)
for child in element:
child_dict = element_to_dict(child)
if child.tag in result:
if not isinstance(result[child.tag], list):
result[child.tag] = [result[child.tag]]
result[child.tag].append(child_dict)
else:
result[child.tag] = child_dict
return result
def process_element(elem, conn, cursor):
#dbinfo, protein, match, ipr, lcn = 0,0,0,0,0
if elem.tag == 'dbinfo':
dbinfo_attrib = elem.attrib
cursor.execute('INSERT INTO dbinfo (dbname, version, entry_count, file_date) VALUES (?, ?, ?, ?)',
(dbinfo_attrib['dbname'], dbinfo_attrib.get('version'), dbinfo_attrib.get('entry_count'),
dbinfo_attrib.get('file_date')))
#dbinfo += 1
elif elem.tag == 'protein':
protein_attrib = elem.attrib
cursor.execute('INSERT INTO protein (id, name, length, crc64) VALUES (?, ?, ?, ?)',
(protein_attrib['id'], protein_attrib['name'], protein_attrib['length'], protein_attrib['crc64']))
protein_id = cursor.lastrowid
#protein += 1
for child in elem:
if child.tag == 'match':
match_attrib = child.attrib
try:
cursor.execute('INSERT INTO match (protein_id, id, name, dbname, status, evd, model) VALUES (?, ?, ?, ?, ?, ?, ?)',
(protein_id, match_attrib['id'], match_attrib['name'], match_attrib['dbname'],
match_attrib['status'], match_attrib['evd'], match_attrib['model']))
except KeyError as e:
print('--', match_attrib, element_to_dict(elem))
raise e
match_id = cursor.lastrowid
#match += 1
for sub_child in child:
if sub_child.tag == 'ipr':
ipr_attrib = sub_child.attrib
cursor.execute('INSERT INTO ipr (match_id, id, name, type, parent_id) VALUES (?, ?, ?, ?, ?)',
(match_id, ipr_attrib['id'], ipr_attrib['name'], ipr_attrib['type'],
ipr_attrib.get('parent_id')))
#ipr += 1
elif sub_child.tag == 'lcn':
lcn_attrib = sub_child.attrib
cursor.execute('INSERT INTO lcn (match_id, start, end, fragments, score) VALUES (?, ?, ?, ?, ?)',
(match_id, lcn_attrib['start'], lcn_attrib['end'], lcn_attrib.get('fragments'),
lcn_attrib['score']))
#lcn += 1
#return dbinfo, protein, match, ipr, lcn
def main():
filename = './data/interpro/match_complete.xml'
dbname = './data/interpro/match_complete.sqlite'
os.system(f'rm -rf {dbname}')
create_database(dbname)
conn = sqlite3.connect(dbname)
cursor = conn.cursor()
totals = {
'dbinfo': 0,
'protein': 0,
'match': 0,
'ipr': 0,
'lcn': 0,
}
try:
with tqdm() as pbar:
for event, elem in iterparse(filename, events=("end",)):
if event == 'end':
if elem.tag == 'dbinfo' or elem.tag == 'protein':
#dbinfo, protein, match, ipr, lcn = process_element(elem, conn, cursor)
process_element(elem, conn, cursor)
conn.commit()
#totals['dbinfo'] += dbinfo
#totals['protein'] += protein
#totals['match'] += match
#totals['ipr'] += ipr
#totals['lcn'] += lcn
#pbar.set_description(json.dumps(totals))
pbar.update()
elem.clear()
finally:
conn.close()
if __name__ == '__main__':
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment