Skip to content

Instantly share code, notes, and snippets.

@thejevans
Last active February 23, 2022 02:03
Show Gist options
  • Save thejevans/274407d26ebacd1b451aad61a1e13cd0 to your computer and use it in GitHub Desktop.
Save thejevans/274407d26ebacd1b451aad61a1e13cd0 to your computer and use it in GitHub Desktop.
import ast
import json
import os
import sys
import time
import pandas as pd
import py_nextbus
def parse_and_list(pairs: list) -> dict:
    """Convert raw key/value pairs into single-element column lists.

    Each value is wrapped in a one-item list so the result can be handed
    straight to ``pandas.DataFrame.from_dict`` as a one-row frame.

    String values are interpreted as follows:

    * ``'true'`` / ``'false'`` (lower-case, as in JSON) become ``bool``.
    * Anything ``ast.literal_eval`` understands (ints, floats, quoted
      strings, ...) becomes the corresponding Python object.
    * Everything else is kept as the original string.

    Non-string values are kept unchanged.

    Args:
        pairs: Any iterable of ``(key, value)`` pairs, e.g. ``dict.items()``.

    Returns:
        A dict mapping each key to a one-element list holding the parsed
        value.
    """
    res = {}
    for key, val in pairs:
        # Guard with isinstance so unhashable values (lists, dicts) do not
        # blow up the set-membership test.
        if isinstance(val, str) and val in {'true', 'false'}:
            res[key] = [val == 'true']
            continue
        try:
            res[key] = [ast.literal_eval(val)]
        except (ValueError, TypeError, SyntaxError, MemoryError,
                RecursionError):
            # literal_eval is documented to raise any of these on input it
            # cannot evaluate; fall back to the raw value in every case.
            res[key] = [val]
    return res
def update_csv(
    csv_dir: str,
    this_time: int,
    route_id: str,
    route_update: dict,
) -> None:
    """Append the vehicle data from one route update to that route's CSV.

    Does nothing when ``route_update`` has no ``'vehicle'`` entry. Otherwise
    every vehicle record is parsed with ``parse_and_list``, stamped with
    ``this_time`` in a ``time`` column, and appended to the CSV file
    ``<route_id>_<bucket>.csv`` inside ``csv_dir`` (the file is created if
    it does not exist yet).

    Args:
        csv_dir: Directory the CSV files live in. A trailing path separator
            is optional.
        this_time: Timestamp recorded for every row. ``main`` passes epoch
            milliseconds; the file-name bucket is ``round(this_time * 1e-9)``,
            so with millisecond input each file presumably covers roughly
            1e6 seconds (~11.6 days).
        route_id: Route tag used as the file-name prefix.
        route_update: Decoded vehicle-locations response for the route.
            ``'vehicle'`` may be a single dict or a list of dicts.
            NOTE(review): the NextBus JSON feed is reported to return a list
            when several vehicles are present -- confirm against the actual
            py_nextbus output.
    """
    if 'vehicle' not in route_update:
        return

    vehicles = route_update['vehicle']
    if isinstance(vehicles, dict):
        # Single vehicle: normalise to a list so both shapes share one path.
        vehicles = [vehicles]
    if not vehicles:
        return

    csv_id = '_'.join([route_id, str(round(this_time * 1e-9))])
    # os.path.join inserts the separator when csv_dir lacks a trailing one;
    # plain concatenation would silently write next to the directory instead.
    csv_file = os.path.join(csv_dir, csv_id + '.csv')

    frames = []
    for vehicle in vehicles:
        dict_for_df = parse_and_list(vehicle.items())
        dict_for_df['time'] = [this_time]
        frames.append(pd.DataFrame.from_dict(dict_for_df))

    if os.path.exists(csv_file):
        # Existing rows go first so the file stays in chronological order.
        frames.insert(0, pd.read_csv(csv_file))

    df = pd.concat(frames, ignore_index=True)
    df.to_csv(csv_file, index=False)
def main(config_file: str, csv_dir: str) -> None:
    """Fetch vehicle locations for every UMD route and record them as CSV.

    Reads the JSON config at ``config_file``, queries the NextBus client for
    the ``'umd'`` agency's route list, requests vehicle locations for each
    route using the stored ``last_time``, passes every response to
    ``update_csv``, and finally writes the config back with ``last_time``
    set to the timestamp taken for this run.

    Args:
        config_file: Path to a JSON file; its ``'last_time'`` entry is read
            when routes are fetched and overwritten at the end.
        csv_dir: Directory argument forwarded to ``update_csv``.
    """
    with open(config_file, 'rb') as f:
        config = json.load(f)

    client = py_nextbus.NextBusClient(output_format='json')
    umd_route_list = client.get_route_list(agency='umd')

    # Local wall-clock time in milliseconds, taken before the vehicle queries.
    this_time = int(time.time() * 1e3)
    routes = umd_route_list['route']

    # Phase 1: fetch everything first, so a failed request aborts the run
    # before any CSV has been touched.
    fetched = {}
    for route in routes:
        tag = route['tag']
        fetched[tag] = client.get_vehicle_locations(
            tag, config['last_time'], agency='umd')

    # Phase 2: persist each route's update.
    for route in routes:
        tag = route['tag']
        update_csv(csv_dir, this_time, tag, fetched[tag])

    config['last_time'] = this_time
    with open(config_file, 'w') as f:
        json.dump(config, f)
if __name__ == '__main__':
    # Usage: <script> CONFIG_FILE CSV_DIR
    config_path = sys.argv[1]
    output_dir = sys.argv[2]
    main(config_path, output_dir)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment