Skip to content

Instantly share code, notes, and snippets.

@joneskoo
Last active May 12, 2023 03:20
Show Gist options
  • Save joneskoo/ffe1610729497a6de4d1 to your computer and use it in GitHub Desktop.
Save joneskoo/ffe1610729497a6de4d1 to your computer and use it in GitHub Desktop.
Parse HTML table to CSV
#!/usr/bin/env python3.4
import sys
import sqlite3
from html.parser import HTMLParser
class TableParser(HTMLParser):
    """Collect the text of every ``<td>`` cell of an HTML document, row by row.

    After ``feed()``, ``table`` holds one list of cell strings for each
    closed ``<tr>`` element, in document order. Only text found while a
    ``<td>`` is open is recorded.

    Raises (from ``feed()``):
        AssertionError: an end tag does not match the innermost open tag.
        IndexError: an end tag is seen while no tag is open.
    """

    # Elements that never have an end tag in HTML. They must not be tracked
    # on the open-tag stack, otherwise the next real end tag (for example
    # ``</td>`` after ``<br>``, or ``</head>`` after ``<meta>``) would be
    # reported as mismatched.
    _VOID_ELEMENTS = frozenset((
        'area', 'base', 'br', 'col', 'embed', 'hr', 'img', 'input',
        'link', 'meta', 'param', 'source', 'track', 'wbr',
    ))

    def __init__(self):
        # HTMLParser.__init__ accepts no extra positional argument; passing
        # ``self`` explicitly raises TypeError on Python 3.5+.
        super().__init__()
        self._path = []     # stack of currently open tag names
        self.table = []     # completed rows (lists of cell strings)
        self._row = []      # cells of the row currently being built
        self._content = []  # text fragments of the cell being built

    def handle_starttag(self, tag, attrs):
        """Push *tag* on the open-tag stack; a ``<tr>`` starts a fresh row."""
        if tag in self._VOID_ELEMENTS:
            return
        self._path.append(tag)
        if tag == 'tr':
            self._row = []

    def handle_data(self, data):
        """Buffer text that appears anywhere inside an open ``<td>``."""
        if 'td' in self._path:
            self._content.append(data)

    def handle_endtag(self, tag):
        """Pop *tag* from the stack and finish the current cell or row."""
        if tag in self._VOID_ELEMENTS:
            # Reached for ``<br/>`` (reported as start + end) or a stray
            # ``</br>``; nothing was pushed for these, so nothing to pop.
            return
        old_tag = self._path.pop()
        if old_tag != tag:
            # Explicit raise instead of ``assert`` so the check still runs
            # under ``python -O``.
            raise AssertionError("Should end this tag")
        if tag == 'td':
            self._row.append("".join(self._content))
            self._content = []
        elif tag == 'tr':
            self.table.append(self._row)
            self._row = []
def write_sqlite(header, rows, db_path=None):
    """Insert the Date, Hours and FI columns of *rows* into table ``elspot``.

    Args:
        header: Column names of the parsed table. Dots in names are replaced
            by underscores and the first column is always treated as
            ``Date``, whatever its original name.
        rows: Iterable of rows; each row is a sequence of strings laid out
            like *header*.
        db_path: SQLite database file to write to. Defaults to
            ``sys.argv[1]`` (the original command-line behaviour).

    Raises:
        IndexError: *header* is empty, or a row is shorter than the position
            of a required column.
        KeyError: *header* lacks ``Hours`` or ``FI`` (raised when the first
            row is processed).
        sqlite3.Error: the insert fails, e.g. the ``elspot`` table is missing.

    Nothing is committed unless every row was inserted successfully.
    """
    header = [name.replace('.', '_') for name in header]
    header[0] = 'Date'
    h_idx = {name: pos for pos, name in enumerate(header)}

    write_header = ['Date', 'Hours', 'FI']
    query = ("INSERT INTO elspot ({h}) VALUES ({v})"
             .format(h=",".join(write_header),
                     v=",".join(['?'] * len(write_header))))

    def row_values(row):
        # Commas become dots in every stored value (presumably decimal
        # commas in the source data -- confirm against the input files).
        return [row[h_idx[key]].replace(',', '.') for key in write_header]

    if db_path is None:
        db_path = sys.argv[1]
    conn = sqlite3.connect(db_path)
    try:
        print(query)
        # The generator keeps row processing lazy, so a bad row aborts
        # before commit just as the original per-row loop did.
        conn.executemany(query, (row_values(row) for row in rows))
        conn.commit()
    finally:
        # Always release the database handle, even when an insert fails.
        conn.close()
def main():
    """Parse the HTML file named by argv[2] and load it into the argv[1] DB.

    The third parsed table row is used as the header and every following
    row as data. Exits with a message on stderr when arguments are missing
    or the document yields fewer than three rows.
    """
    if len(sys.argv) < 3:
        sys.exit("usage: {} DATABASE HTML_FILE".format(sys.argv[0]))
    p = TableParser()
    with open(sys.argv[2]) as fd:
        p.feed(fd.read())
    if len(p.table) < 3:
        # Row index 2 is the header; without it there is nothing to import.
        sys.exit("expected at least 3 table rows, found {}"
                 .format(len(p.table)))
    write_sqlite(header=p.table[2], rows=p.table[3:])
# Script entry point: run the import only when executed directly.
if __name__ == '__main__':
    main()
#!/usr/bin/env python3.4
import sys
import csv
from html.parser import HTMLParser
class TableParser(HTMLParser):
    """Collect the text of every ``<td>`` cell of an HTML document, row by row.

    After ``feed()``, ``table`` holds one list of cell strings for each
    closed ``<tr>`` element, in document order. Only text found while a
    ``<td>`` is open is recorded.

    Raises (from ``feed()``):
        AssertionError: an end tag does not match the innermost open tag.
        IndexError: an end tag is seen while no tag is open.
    """

    # Elements that never have an end tag in HTML. They must not be tracked
    # on the open-tag stack, otherwise the next real end tag (for example
    # ``</td>`` after ``<br>``, or ``</head>`` after ``<meta>``) would be
    # reported as mismatched.
    _VOID_ELEMENTS = frozenset((
        'area', 'base', 'br', 'col', 'embed', 'hr', 'img', 'input',
        'link', 'meta', 'param', 'source', 'track', 'wbr',
    ))

    def __init__(self):
        # HTMLParser.__init__ accepts no extra positional argument; passing
        # ``self`` explicitly raises TypeError on Python 3.5+.
        super().__init__()
        self._path = []     # stack of currently open tag names
        self.table = []     # completed rows (lists of cell strings)
        self._row = []      # cells of the row currently being built
        self._content = []  # text fragments of the cell being built

    def handle_starttag(self, tag, attrs):
        """Push *tag* on the open-tag stack; a ``<tr>`` starts a fresh row."""
        if tag in self._VOID_ELEMENTS:
            return
        self._path.append(tag)
        if tag == 'tr':
            self._row = []

    def handle_data(self, data):
        """Buffer text that appears anywhere inside an open ``<td>``."""
        if 'td' in self._path:
            self._content.append(data)

    def handle_endtag(self, tag):
        """Pop *tag* from the stack and finish the current cell or row."""
        if tag in self._VOID_ELEMENTS:
            # Reached for ``<br/>`` (reported as start + end) or a stray
            # ``</br>``; nothing was pushed for these, so nothing to pop.
            return
        old_tag = self._path.pop()
        if old_tag != tag:
            # Explicit raise instead of ``assert`` so the check still runs
            # under ``python -O``.
            raise AssertionError("Should end this tag")
        if tag == 'td':
            self._row.append("".join(self._content))
            self._content = []
        elif tag == 'tr':
            self.table.append(self._row)
            self._row = []
def main():
    """Parse the HTML file named by argv[1] and write its rows as CSV.

    Every parsed table row is written to standard output. Exits with a
    usage message on stderr when the file argument is missing.
    """
    if len(sys.argv) < 2:
        sys.exit("usage: {} HTML_FILE".format(sys.argv[0]))
    p = TableParser()
    with open(sys.argv[1]) as fd:
        p.feed(fd.read())
    c = csv.writer(sys.stdout)
    c.writerows(p.table)
# Script entry point: run the conversion only when executed directly.
if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment