Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save kagesenshi/2bfeb28a6b39deb3491d to your computer and use it in GitHub Desktop.
Save kagesenshi/2bfeb28a6b39deb3491d to your computer and use it in GitHub Desktop.
from datetime import datetime, timedelta
import urllib
import re
from lxml.html import fromstring
from cssselect import GenericTranslator, SelectorError
import os
import json
# Root address under which the hourly table pages are fetched.
base_url = 'http://apims.doe.gov.my/v2/'

# Maps the page-name prefix ('hour1' .. 'hour4') to the six consecutive hours
# of the day whose readings that page carries: 0-5, 6-11, 12-17 and 18-23.
HOURS = {
    'hour%d' % (n + 1): range(6 * n, 6 * n + 6)
    for n in range(4)
}
def table_filenames(dt):
    """Return the table page filenames to fetch for the day of *dt*.

    Filenames have the form ``hourN_YYYY-MM-DD.html`` with N counting up
    from 1.

    For any day other than the current one, all four pages are returned.
    For the current day the number of pages depends on ``dt.hour``:
    before 7 -> none, before 12 -> one, before 18 -> two, otherwise three.

    Parameters:
        dt: a ``datetime`` identifying the day (and, for today, the hour).

    Returns:
        A list of filename strings, possibly empty.
    """
    # Format the day once; it is needed for the comparison and every name.
    day = dt.strftime("%Y-%m-%d")

    if day != datetime.now().strftime("%Y-%m-%d"):
        limit = 4
    # NOTE(review): 7 breaks the 6-hour spacing of the other thresholds
    # (12, 18) -- confirm whether the extra hour is intentional.
    elif dt.hour < 7:
        limit = 0
    elif dt.hour < 12:
        limit = 1
    elif dt.hour < 18:
        limit = 2
    else:
        limit = 3

    return ['hour%s_%s.html' % (number, day)
            for number in range(1, limit + 1)]
def get_data(url, date):
    """Download one hourly table page and yield a dict per data row.

    Parameters:
        url: address of the page. Every ``HOURS`` key contained in the URL
            text selects the hours that the reading columns stand for.
        date: object providing ``strftime``; supplies the ``YYYY-MM-DD``
            part of the timestamp keys.

    Yields:
        A dict with ``'state'`` and ``'area'`` (text of the first two
        cells) plus one ``'YYYY-MM-DDTHH:00'`` entry per reading cell
        present in the row, holding the raw cell text.

    Errors raised by ``urlopen`` or the HTML parser are not caught here.
    """
    # Imported locally so this function brings its own dependencies:
    # ``urlopen`` lives in ``urllib.request`` on Python 3 and directly in
    # ``urllib`` on Python 2.
    from contextlib import closing
    try:
        from urllib.request import urlopen
    except ImportError:
        from urllib import urlopen

    ds = date.strftime("%Y-%m-%d")

    # ``closing`` releases the connection on both Python 2 and 3.
    with closing(urlopen(url)) as response:
        document = fromstring(response.read())

    # Which hour ranges apply depends only on the URL, so resolve it once
    # instead of once per table row.
    hour_ranges = [hours for name, hours in HOURS.items() if name in url]

    css = GenericTranslator().css_to_xpath
    for row in document.xpath(css("table.table1 > tr")):
        if row.attrib.get("class", "") == "tablehead":
            continue

        raw_data = [td.text_content() for td in row.findall("td")]
        if len(raw_data) < 2:
            # No state/area cells in this row, so there is nothing to report.
            continue

        data = {
            'state': raw_data[0],
            'area': raw_data[1],
        }
        readings = raw_data[2:]
        for hours in hour_ranges:
            # zip() stops at the shorter sequence: a row with fewer reading
            # cells yields fewer timestamps instead of raising IndexError
            # (which previously dropped the script into the debugger).
            for h, value in zip(hours, readings):
                data['%sT%02d:00' % (ds, h)] = value
        yield data
# One datetime per calendar day, newest first: the current moment, then
# stepping back one day at a time down to 2015-09-29 (inclusive). Every entry
# keeps the time of day at which the script started.
now = datetime.now()
dates = [now]

# Compare dates with ">" rather than formatted strings with "!=", so the loop
# also terminates when the clock is already at or before the first day.
first_day = datetime(2015, 9, 29).date()
while now.date() > first_day:
    now = now - timedelta(days=1)
    dates.append(now)
# Stage 1: download every table page of every collected day and gather the
# row dicts produced by get_data() into one flat list.
stage1 = []
for d in dates:
    for u in table_filenames(d):
        url = base_url + u
        stage1.extend(get_data(url, d))
# Stage 2: merge the rows by station. The key is the upper-cased
# (area, state) pair; the value maps each timestamp key to its raw reading.
# A later row overwrites an earlier one for the same timestamp.
stage2 = {}
for d in stage1:
    key = (d['area'].upper(), d['state'].upper())
    readings = stage2.setdefault(key, {})
    for k, v in d.items():
        if k not in ('area', 'state'):
            readings[k] = v
# Stage 3: flatten the per-station mapping into one record per
# (area, state, timestamp), ordered by station and then by timestamp.
stage3 = []
for k in sorted(stage2):
    area, state = k
    # Timestamps are unique per station, so sorting the items orders by
    # timestamp alone.
    for t, v in sorted(stage2[k].items()):
        try:
            # Readings may carry a "*" marker; drop it before parsing.
            api = int(v.replace("*", ''))
        except (AttributeError, ValueError):
            # Not a number (or not text at all): record the reading as
            # missing. Other exceptions, e.g. KeyboardInterrupt, are no
            # longer swallowed.
            api = None
        datum = {
            'area': area,
            'state': state,
            'time': t,
            'api': api,
        }
        stage3.append(datum)
# Write the flattened records as JSON Lines (one JSON object per line) to
# 'historical.jsonl' in the directory containing this script.
out_path = os.path.join(os.path.dirname(__file__), 'historical.jsonl')
with open(out_path, 'w') as o:
    for d in stage3:
        o.write(json.dumps(d) + '\n')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment