Created
April 17, 2020 07:26
-
-
Save Tristramg/c97e658f9ec2dca2df5d737ffdd6ef14 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
This tool is meant to process GTFS files from transport.data.gouv.fr, | |
convert them to the NeTEx format, | |
and upload them as community resources to transport.data.gouv.fr | |
""" | |
import logging
import os
import subprocess
import tempfile
import urllib.request

import requests
# Dedicated logger for this tool; everything from DEBUG upwards goes to a
# stream handler created with no explicit stream argument.
logger = logging.getLogger('uploader')
logger.setLevel(logging.DEBUG)
stream_handler = logging.StreamHandler()
stream_handler.setLevel(logging.DEBUG)
logger.addHandler(stream_handler)

# Settings. Each one can be overridden through the environment variable named
# in the lookup; the fallback is the value that used to be hard-coded, so the
# script behaves as before when nothing is set.

# Path to the gtfs2netexfr converter binary (the default is machine-specific).
CONVERTER = os.environ.get(
    "GTFS2NETEX_CONVERTER",
    "/home/tristram/beta.gouv.fr/tartare-tools/target/release/gtfs2netexfr",
)

# Base URL of the data.gouv.fr API used for every request.
DATAGOUV_API = os.environ.get("DATAGOUV_API", "https://demo.data.gouv.fr/api/1")

# Value sent in the X-API-KEY header of the authenticated calls.
# NOTE(review): a token is committed in source here. It should be rotated and
# supplied only through DATAGOUV_API_KEY; the fallback is kept solely for
# backward compatibility.
DATAGOUV_API_KEY = os.environ.get(
    "DATAGOUV_API_KEY",
    "eyJhbGciOiJIUzUxMiJ9.eyJ1c2VyIjoiNTlmMzM1Mjk4OGVlMzg3M2IxNTgzNTY2IiwidGltZSI6MTU2MjA4NDQ5My43MjY4MDF9.bAs_wRzy1qbn5MfJRVq9t0G_aAkv7u8h-Cb4NiY8NI-sb_BdoDoXmIp_Mr1_uGAhhyBAFygxDwCu0Sy4imEcng",
)

# Identifier of the organization the community resources are attached to.
TRANSPORT_ORGANIZATION_ID = os.environ.get(
    "TRANSPORT_ORGANIZATION_ID", "5abca8d588ee386ee6ece479"
)
def download_gtfs(url):
    """
    Fetch the GTFS located at `url` and store it in a local file.

    The retrieval is delegated to `urllib.request.urlretrieve`, which picks
    the local file name itself.

    Returns the path of the local file.
    """
    retrieved = urllib.request.urlretrieve(url)
    # urlretrieve returns a (filename, headers) pair; only the name is needed.
    return retrieved[0]
def convert(gtfs_src, publisher):
    """
    Convert a GTFS file to NeTEx and return the path to the generated zip file.

    Args:
        gtfs_src: path of the GTFS input, handed to the converter as `--input`.
        publisher: name of the organization that published the dataset,
            handed to the converter as `--participant`.

    Returns:
        The path of the zip archive. It is named after the temporary output
        directory with a `.zip` suffix, so it sits next to that directory
        and is not removed when the directory is cleaned up.

    Raises:
        RuntimeError: if the converter or the `zip` command exits with a
            non-zero status.
    """
    with tempfile.TemporaryDirectory() as netex_dir:
        conversion = subprocess.run([
            CONVERTER,
            "--input", gtfs_src,
            "--output", netex_dir,
            "--participant", publisher,
        ])
        if conversion.returncode != 0:
            raise RuntimeError("Unable to convert file")

        netex_zip = f"{netex_dir}.zip"
        # Run zip from inside the output directory with an argument list:
        # no shell is involved (paths with spaces or metacharacters are
        # safe), and archive entries are relative to the NeTEx output rather
        # than prefixed with the random temporary directory path.
        zipping = subprocess.run(["zip", "-r", netex_zip, "."], cwd=netex_dir)
        if zipping.returncode != 0:
            raise RuntimeError("Unable to zip file")
        return netex_zip
def find_community_resources(dataset_id, netex_file):
    """
    Look up an existing community resource titled `netex_file`.

    The community resources are requested for the dataset `dataset_id` and
    the organization TRANSPORT_ORGANIZATION_ID, then filtered on their title.

    Returns the first matching resource, or None when no resource has that
    title. Raises an HTTP error on a failed request, and a generic Exception
    when the response carries no data.
    """
    logger.debug("Searching community ressource %s in dataset %s", netex_file, dataset_id)
    response = requests.get(
        f"{DATAGOUV_API}/datasets/community_resources/",
        params={
            'dataset': dataset_id,
            'organization': TRANSPORT_ORGANIZATION_ID,
        },
    )
    response.raise_for_status()

    resources = response.json()['data']
    if resources is None:
        raise Exception(f"Searched community ressources of dataset {dataset_id}, could not understand response")

    matches = [candidate for candidate in resources if candidate['title'] == netex_file]
    if not matches:
        logger.debug("Found the dataset %s, but no existing ressource", dataset_id)
        return None

    if len(matches) > 1:
        # Duplicates are only reported; the first match is still used.
        logger.warning("More that one community resource %s in dataset %s",
                       netex_file, dataset_id)
    first_match = matches[0]
    logger.debug("Found dataset %s and matching community resource, with id %s",
                 dataset_id, first_match['id'])
    return first_match
def create_community_resource(dataset_id, netex_file):
    """
    Create a community resource on a dataset by uploading a file.

    This call will not link the resource; that requires an extra call.

    Args:
        dataset_id: identifier of the dataset the upload endpoint is built from.
        netex_file: path of the local file to upload.

    Returns:
        The decoded JSON body of the response, describing the new resource.

    Raises:
        OSError: if `netex_file` cannot be opened.
        requests.HTTPError: if the API answers with an error status.
    """
    logger.debug('Creating a community resource on dataset %s', dataset_id)
    headers = {'X-API-KEY': DATAGOUV_API_KEY}
    url = f"{DATAGOUV_API}/datasets/{dataset_id}/upload/community/"
    # Open the file in a context manager so the handle is closed once the
    # request has been sent, even when the request raises.
    with open(netex_file, 'rb') as netex_handle:
        ret = requests.post(url, headers=headers, files={'file': netex_handle})
    ret.raise_for_status()
    created = ret.json()
    logger.debug("Created a new community resource %s on dataset %s", created['id'], dataset_id)
    return created
def find_or_create_community_resource(dataset_id, netex_file):
    """
    Publish `netex_file` on the dataset, reusing a resource when possible.

    If a community resource for that file already exists, only its file is
    replaced and the existing resource is returned.
    Otherwise a new community resource is created and returned.
    """
    existing = find_community_resources(dataset_id, netex_file)
    if existing is None:
        return create_community_resource(dataset_id, netex_file)

    upload_resource(existing['id'], netex_file)
    return existing
def update_resource_metadata(dataset_id, resource):
    """
    Push the metadata of a community resource.

    The dataset, organization, description and format are written into
    `resource` (the dictionary is modified in place) and the result is sent
    with a PUT request. This call is what links the resource to a dataset.

    Returns nothing; raises an HTTP error on a failed request.
    """
    logger.debug("Updating metadata of resource %s", resource['id'])
    resource.update({
        'dataset': dataset_id,
        'organization': TRANSPORT_ORGANIZATION_ID,
        'description': "Converstion du fichier code-code-code",
        'format': 'NeTEx',
    })
    response = requests.put(
        f"{DATAGOUV_API}/datasets/community_resources/{resource['id']}/",
        headers={'X-API-KEY': DATAGOUV_API_KEY},
        json=resource,
    )
    response.raise_for_status()
    logger.debug("Updating of resource %s done", resource['id'])
def upload_resource(resource_id, filename):
    """
    Replace the file of an existing community resource.

    After this call, an update of that resource is still needed.

    Args:
        resource_id: identifier of the community resource to update.
        filename: path of the local file to upload.

    Raises:
        OSError: if `filename` cannot be opened.
        requests.HTTPError: if the API answers with an error status.
    """
    logger.debug("Uploading an new file %s on resource %s", filename, resource_id)
    url = f"{DATAGOUV_API}/datasets/community_resources/{resource_id}/upload/"
    headers = {'X-API-KEY': DATAGOUV_API_KEY}
    # Open the file in a context manager so the handle is closed once the
    # request has been sent, even when the request raises.
    with open(filename, 'rb') as upload_handle:
        ret = requests.post(url, headers=headers, files={'file': upload_handle})
    ret.raise_for_status()
    logger.debug("Uploading an new file %s on resource %s done", filename, resource_id)
def publish_to_datagouv(dataset_id, netex_file):
    """
    Publish the NeTEx file as a community resource of the dataset.

    An already existing community resource is updated instead of duplicated.
    Failures are not propagated: HTTP errors and any other exception are
    logged as warnings.
    """
    try:
        logger.info(
            "Going to add the file %s as community ressource to the dataset %s",
            netex_file, dataset_id,
        )
        resource = find_or_create_community_resource(dataset_id, netex_file)
        update_resource_metadata(dataset_id, resource)
        logger.info("Added %s to the dataset %s", netex_file, dataset_id)
    except requests.HTTPError as http_error:
        logger.warning(
            "Unable to add %s to the dataset %s. Http Error %s",
            netex_file, dataset_id, http_error,
        )
    except Exception as other_error:
        logger.warning(
            "Unable to add %s to the dataset %s. Generic Error %s",
            netex_file, dataset_id, other_error,
        )
def main():
    """Script entry point: publish a fixed test file to a fixed dataset."""
    dataset_id = "588a238d88ee3846659b81a4"
    netex_file = 'test2.txt'
    publish_to_datagouv(dataset_id, netex_file)


# Only run when executed as a script, not when imported.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment