-
-
Save thelink2012/485fb6af0e45c150d5ea to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
import sys, os, csv, tempfile, shutil | |
import io, StringIO | |
from os.path import join, isfile, isdir | |
from zipfile import ZipFile, ZIP_DEFLATED | |
def read_routes(dirname):
    """Load route records from every ``.txt`` CSV file directly inside *dirname*.

    Each matching file is parsed with ``csv.DictReader``. Rows whose
    ``route_id`` is empty or missing are ignored. When the same ``route_id``
    occurs more than once, the row read last replaces the earlier one
    (``os.listdir`` order, which is unspecified).

    The ``weelchair`` key (spelled exactly as in the code) is removed from
    every kept row when present.
    NOTE(review): presumably stripped because the routes writer does not
    list that column -- confirm.

    Args:
        dirname: Path of the directory to scan; sub-directories are not
            visited.

    Returns:
        A dict mapping ``route_id`` to its row dict, or ``None`` when
        *dirname* is not an existing directory.
    """
    if not isdir(dirname):
        return None

    output = {}
    for filename in os.listdir(dirname):
        filepath = join(dirname, filename)
        if not (isfile(filepath) and filepath.endswith(".txt")):
            continue
        # Close each file once consumed instead of leaking the handle.
        with open(filepath) as stream:
            for entry in csv.DictReader(stream):
                # Truthiness also covers None, which DictReader yields for
                # fields missing from a short row.
                if not entry['route_id']:
                    continue
                # Tolerate files that do not carry this column at all.
                entry.pop('weelchair', None)
                output[entry['route_id']] = entry
    return output
def read_lines(fstream):
    """Index the rows of a lines CSV by ``"<network>:<line_code>"``.

    A row is kept only when its ``line_id`` splits on ``':'`` into exactly
    three parts. The key is the middle part, a colon, and the row's
    ``line_code``. Later rows replace earlier ones with the same key.

    Rows with an empty ``line_id`` are skipped. So are short rows, where
    ``csv.DictReader`` reports the missing ``line_id`` or ``line_code`` as
    ``None``; these used to raise ``TypeError``.

    Args:
        fstream: Iterable of CSV text lines (for example an open file) whose
            header names at least ``line_id`` and ``line_code``.

    Returns:
        A dict mapping the composed key to the full row dict.
    """
    output = {}
    for entry in csv.DictReader(fstream):
        line_id = entry['line_id']
        if not line_id:
            continue
        tokens = line_id.split(':')
        if len(tokens) != 3:
            continue
        line_code = entry['line_code']
        if line_code is None:
            # Short row: the column is absent, so no key can be built.
            continue
        output[tokens[1] + ':' + line_code] = entry
    return output
def read_desserte(fstream):
    """Index the rows of a CSV stream by their ``code_mission`` column.

    Args:
        fstream: Iterable of CSV text lines whose header includes
            ``code_mission``.

    Returns:
        A dict mapping each ``code_mission`` value to its row dict; when a
        code occurs several times, the last row read is the one kept.
    """
    return {
        record['code_mission']: record
        for record in csv.DictReader(fstream)
    }
def write_trips(trips):
    """Serialize trip rows to CSV text held in an in-memory buffer.

    The header and column order are fixed to ``route_id``, ``service_id``,
    ``trip_id``, ``trip_headsign``, ``direction_id`` and ``block_id``.
    Lines end with ``'\\n'``. Keys missing from a row are written as empty
    fields.

    Args:
        trips: Iterable of dicts, one per trip.

    Returns:
        The ``StringIO`` buffer holding the CSV text; read it with
        ``getvalue()``. The caller is responsible for closing it.

    Raises:
        ValueError: If a row contains a key outside the fixed column list
            (``csv.DictWriter`` default ``extrasaction='raise'``).
    """
    # Resolve the buffer class locally so the function works on both major
    # Python versions without touching module-level imports.
    try:
        from StringIO import StringIO  # Python 2: accepts the str csv emits
    except ImportError:
        from io import StringIO  # Python 3: the StringIO module is gone

    output = StringIO()
    fieldnames = ['route_id', 'service_id', 'trip_id', 'trip_headsign', 'direction_id', 'block_id']
    writer = csv.DictWriter(output, fieldnames, lineterminator='\n')
    writer.writeheader()
    writer.writerows(trips)
    return output
def write_routes(routes):
    """Serialize route rows to CSV text held in an in-memory buffer.

    The header and column order are fixed to ``route_id``, ``agency_id``,
    ``route_short_name``, ``route_long_name``, ``route_desc``,
    ``route_type``, ``route_url``, ``route_color`` and ``route_text_color``.
    Lines end with ``'\\n'``. Keys missing from a row are written as empty
    fields.

    Args:
        routes: Iterable of dicts, one per route.

    Returns:
        The ``StringIO`` buffer holding the CSV text; read it with
        ``getvalue()``. The caller is responsible for closing it.

    Raises:
        ValueError: If a row contains a key outside the fixed column list
            (``csv.DictWriter`` default ``extrasaction='raise'``).
    """
    # Resolve the buffer class locally so the function works on both major
    # Python versions without touching module-level imports.
    try:
        from StringIO import StringIO  # Python 2: accepts the str csv emits
    except ImportError:
        from io import StringIO  # Python 3: the StringIO module is gone

    output = StringIO()
    fieldnames = [
        'route_id', 'agency_id', 'route_short_name', 'route_long_name',
        'route_desc', 'route_type', 'route_url', 'route_color',
        'route_text_color',
    ]
    writer = csv.DictWriter(output, fieldnames, lineterminator='\n')
    writer.writeheader()
    writer.writerows(routes)
    return output
def zipdir(path, ziph):
    """Add every file found under *path* to the open archive *ziph*.

    The tree is walked recursively, but each file is stored under its bare
    file name, so the archive is flat: directory structure is not kept.

    Args:
        path: Root directory to walk.
        ziph: Writable archive object exposing ``write(filename, arcname)``.
    """
    for folder, _subdirs, names in os.walk(path):
        for name in names:
            source = os.path.join(folder, name)
            ziph.write(source, name)
def _rewrite_headsign(row, lines, desserte):
    """Replace the ``trip_headsign`` of *row* in place when a better label is known."""
    # NOTE(review): the original indentation was lost; the ``else`` branch is
    # taken to pair with the numeric-headsign test below -- confirm.
    headsign = row['trip_headsign']
    if headsign.isdigit():
        # Numeric headsign: use the line's directional name, if the line is known.
        line = lines.get(row['route_id'])
        if line:
            if row['direction_id'] == '0':
                row['trip_headsign'] = line['forward_line_name']
            elif row['direction_id'] == '1':
                row['trip_headsign'] = line['backward_line_name']
    else:
        # Otherwise treat the headsign as a mission code to look up.
        dess = desserte.get(headsign)
        if dess:
            row['trip_headsign'] = dess['code_mission'] + " - " + dess['destination']


def perform(input_zip, prod_zip, tmp_path,
            routes_dir="fr-idf_stif", desserte_path="desserteRER.txt"):
    """Rewrite ``trips.txt`` (and optionally ``routes.txt``) inside *tmp_path*.

    Trips are read from *input_zip* and their headsigns are rewritten using
    ``lines.txt`` from *prod_zip* and the mission table at *desserte_path*.
    When *routes_dir* yields route records, each row of the input
    ``routes.txt`` is replaced by the record with the same ``route_id`` if
    one exists, and kept as-is otherwise. When it yields none, ``routes.txt``
    is left untouched. Progress messages are printed to stdout.

    Every stream opened here is closed before returning. Each output is
    serialized in memory before its file is opened, so a serialization error
    does not leave a truncated file behind.

    Args:
        input_zip: Open ``ZipFile`` containing ``trips.txt`` and ``routes.txt``.
        prod_zip: Open ``ZipFile`` containing ``lines.txt``.
        tmp_path: Directory in which the rewritten files are written.
        routes_dir: Directory of replacement route CSV files.
        desserte_path: Path of the mission-code CSV file.
    """
    print("Fetching necessary files...")
    with prod_zip.open("lines.txt") as stream:
        lines = read_lines(stream)
    routes = read_routes(routes_dir)
    with open(desserte_path) as stream:
        desserte = read_desserte(stream)

    print("Processing trips...")
    new_trips = []
    with input_zip.open("trips.txt") as stream:
        for row in csv.DictReader(stream):
            _rewrite_headsign(row, lines, desserte)
            new_trips.append(row)

    new_routes = []
    if routes:
        print("Updating routes...")
        with input_zip.open("routes.txt") as stream:
            for row in csv.DictReader(stream):
                # Prefer the replacement record; fall back to the input row.
                new_routes.append(routes.get(row['route_id']) or row)
    else:
        print("Skipping routes, no '%s' directory found..." % routes_dir)

    print("Writing new trips...")
    output = write_trips(new_trips)
    try:
        with open(join(tmp_path, "trips.txt"), "w") as f:
            f.write(output.getvalue())
    finally:
        output.close()

    if routes:
        print("Writing new routes...")
        output = write_routes(new_routes)
        try:
            with open(join(tmp_path, "routes.txt"), "w") as f:
                f.write(output.getvalue())
        finally:
            output.close()
# Script entry point: update_gtfs <input_gtfs_zip> <zip_with_lines_txt> <output_gtfs_zip>
if __name__ == "__main__":
    if len(sys.argv) < 4:
        # A string argument makes sys.exit print it to stderr and exit with
        # status 1, so a bad invocation is no longer reported as success.
        sys.exit("Usage: update_gtfs <input_gtfs_zip> <zip_with_lines_txt> <output_gtfs_zip>")

    arg_input_zip, arg_prod_zip, arg_output_zip = sys.argv[1:4]

    # Work on an extracted copy of the feed; the directory is always removed.
    tmp_path = tempfile.mkdtemp()
    try:
        print("Fetching zip...")
        with ZipFile(arg_input_zip) as input_zip, ZipFile(arg_prod_zip) as prod_zip:
            input_zip.extractall(tmp_path)
            perform(input_zip, prod_zip, tmp_path)

        # The source archives are closed before the output archive is written.
        print("Building new zip file...")
        with ZipFile(arg_output_zip, mode="w", compression=ZIP_DEFLATED) as output_zip:
            zipdir(tmp_path, output_zip)
    finally:
        shutil.rmtree(tmp_path)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment