Skip to content

Instantly share code, notes, and snippets.

@proffalken
Created April 11, 2022 08:20
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save proffalken/3fad9ee7b55dd03160aa85bbbece826f to your computer and use it in GitHub Desktop.
Save proffalken/3fad9ee7b55dd03160aa85bbbece826f to your computer and use it in GitHub Desktop.
Import the schedule data from Network Rail
#!/usr/bin/env python
import sentry_sdk
import json
import os
import requests
import sys
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from rrltdb.models import Tiploc, Schedule, AtocCodes, TrainStatus
sentry_sdk.init(
os.getenv("SENTRY_DSN"),
# Set traces_sample_rate to 1.0 to capture 100%
# of transactions for performance monitoring.
# We recommend adjusting this value in production.
traces_sample_rate=1.0
)
def main():
# Check if the schedule file exists and download if it doesn't
if not os.path.exists("/tmp/schedule.json"):
print("Schedule file does not exist, Please run the helper download script")
sys.exit(1)
#nrod_user = os.getenv("NROD_USER")
#nrod_pass = os.getenv("NROD_PASS")
#corpus_uri = f"https://{nrod_user}:{nrod_pass}@datafeeds.networkrail.co.uk/ntrod/CifFileAuthenticate?type=CIF_ALL_FULL_DAILY&day=toc-full"
#corpus_dl = requests.get(corpus_uri, allow_redirects=True)
#open("/tmp/schedule.json.gz", "wb").write(corpus_dl.content)
schedules_checked = 0
schedules_created = 0
commit_loop = 0
engine = create_engine(os.getenv("ALEMBIC_DB_URI"))
Session = sessionmaker()
Session.configure(bind=engine)
session = Session()
schedule_json = {}
atoc_codes = session.query(AtocCodes)
atoc_map = {}
for code in atoc_codes:
atoc_map[code.atoc_code] = code.atoc_internal_id
train_status = session.query(TrainStatus)
status_map = {}
for status in train_status:
status_map[status.train_status_code] = status.train_status_internal_id
schedule_file = open("/tmp/schedule.json", 'r')
for schedule_data in schedule_file:
schedule_json = json.loads(schedule_data)
if "JsonScheduleV1" in schedule_json:
schedule_entry = schedule_json["JsonScheduleV1"]
"""
CIF_train_uid
transaction_type
schedule_start_date
schedule_end_date
schedule_days_runs
CIF_bank_holiday_running
train_status
CIF_stp_indicator
atoc_code
applicable_timetable
"""
# Check that we have a TOC and that this is a CREATE record
if "atoc_code" in schedule_entry and schedule_entry['transaction_type'] == "Create":
# Find the schedule if it exists
schedules_checked = schedules_checked + 1
curr_schedule = session.query(Schedule).filter_by(CIF_train_uid=schedule_entry["CIF_train_uid"])
if curr_schedule.count() < 1:
schedules_created = schedules_created + 1
commit_loop = commit_loop + 1
if schedule_entry["applicable_timetable"] == "Y":
applicable_timetable = True
else:
applicable_timetable = False
atoc_code = atoc_map[schedule_entry["atoc_code"]]
train_status = status_map[schedule_entry["train_status"]]
new_schedule = Schedule()
new_schedule.CIF_train_uid = schedule_entry["CIF_train_uid"]
new_schedule.schedule_start_date = schedule_entry["schedule_start_date"]
new_schedule.schedule_end_date = schedule_entry["schedule_end_date"]
new_schedule.schedule_days_runs = schedule_entry["schedule_days_runs"]
new_schedule.CIF_bankh_holiday_running = schedule_entry["CIF_bank_holiday_running"]
new_schedule.train_status = train_status
new_schedule.CIF_stp_indicator = schedule_entry["CIF_stp_indicator"]
new_schedule.atoc_code = atoc_code
new_schedule.applicable_timetable = applicable_timetable
session.add(new_schedule)
if commit_loop > 1000:
print(f"Schedules created so far: {schedules_created} / {schedules_checked}")
session.commit()
commit_loop = 0
print(f"Checked={schedules_checked} created={schedules_created}")
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment