Skip to content

Instantly share code, notes, and snippets.

@Tristramg
Created April 17, 2020 07:26
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Tristramg/c97e658f9ec2dca2df5d737ffdd6ef14 to your computer and use it in GitHub Desktop.
Save Tristramg/c97e658f9ec2dca2df5d737ffdd6ef14 to your computer and use it in GitHub Desktop.
"""
This tool is meant to process GTFS files from transport.data.gouv.fr,
convert them to the NeTEx format,
and upload them as community resources to transport.data.gouv.fr
"""
import logging
import os
import shutil
import subprocess
import tempfile
import urllib.request

import requests
# Dedicated logger for this tool: everything down to DEBUG goes to a stream
# handler (stderr by default).
logger = logging.getLogger('uploader')
logger.setLevel(logging.DEBUG)
stream_handler = logging.StreamHandler()
stream_handler.setLevel(logging.DEBUG)
logger.addHandler(stream_handler)

# Deployment-specific settings. Each one can be overridden through an
# environment variable of the same purpose; the defaults are the values that
# were previously hard-coded, so existing setups keep working unchanged.

# Path to the gtfs2netexfr executable used by convert().
CONVERTER = os.environ.get(
    "GTFS2NETEX_CONVERTER",
    "/home/tristram/beta.gouv.fr/tartare-tools/target/release/gtfs2netexfr",
)
# Base URL of the data.gouv.fr API (no trailing slash).
DATAGOUV_API = os.environ.get("DATAGOUV_API", "https://demo.data.gouv.fr/api/1")
# NOTE(review): a credential is committed here as the fallback value. It should
# be revoked and supplied only through the DATAGOUV_API_KEY environment
# variable; the default is kept solely for backward compatibility.
DATAGOUV_API_KEY = os.environ.get(
    "DATAGOUV_API_KEY",
    "eyJhbGciOiJIUzUxMiJ9.eyJ1c2VyIjoiNTlmMzM1Mjk4OGVlMzg3M2IxNTgzNTY2IiwidGltZSI6MTU2MjA4NDQ5My43MjY4MDF9.bAs_wRzy1qbn5MfJRVq9t0G_aAkv7u8h-Cb4NiY8NI-sb_BdoDoXmIp_Mr1_uGAhhyBAFygxDwCu0Sy4imEcng",
)
# Organization under which the community resources are searched and published.
TRANSPORT_ORGANIZATION_ID = "5abca8d588ee386ee6ece479"
def download_gtfs(url):
    """
    Fetches the GTFS located at the given url.
    The file name is chosen by urllib; its path is returned.
    """
    # urlretrieve returns a (filename, headers) pair; only the filename matters here.
    return urllib.request.urlretrieve(url)[0]
def convert(gtfs_src, publisher):
    """
    Converts a given gtfs file and returns the path to the generated netex zip file.

    The conversion is delegated to the external CONVERTER executable, which is
    asked to write its output into a temporary directory. The content of that
    directory is then zipped into a sibling archive named
    "<temporary directory>.zip", with file names stored relative to the
    directory. The temporary directory is removed when the function returns;
    the archive is kept and has to be deleted by the caller.

    gtfs_src: path to the GTFS file to convert.
    publisher: name of the organization that published that dataset, passed to
    the converter as "--participant".

    Raises RuntimeError if the converter exits with a non-zero status or if the
    archive cannot be written, and OSError if the converter cannot be started.
    """
    with tempfile.TemporaryDirectory() as netex_dir:
        # Argument list without a shell: paths and publisher names are passed
        # verbatim, whatever characters they contain.
        ret = subprocess.run(
            [
                CONVERTER,
                "--input", gtfs_src,
                "--output", netex_dir,
                "--participant", publisher,
            ],
            check=False,
        )
        if ret.returncode != 0:
            # Python 3 cannot raise a bare string, so use a real exception.
            raise RuntimeError("Unable to convert file")

        netex_zip = f"{netex_dir}.zip"
        try:
            # make_archive appends ".zip" to the base name itself, and does not
            # depend on an external `zip` binary or on shell globbing.
            shutil.make_archive(netex_dir, "zip", root_dir=netex_dir)
        except (OSError, ValueError) as err:
            raise RuntimeError("Unable to zip file") from err
        return netex_zip
def find_community_resources(dataset_id, netex_file):
    """
    Looks for a community resource of the dataset whose title is netex_file,
    among the ones published by the transport organization.
    Returns the first matching resource, or None if there is no match.
    Raises if the request fails or if the response carries no data.
    """
    logger.debug("Searching community ressource %s in dataset %s", netex_file, dataset_id)
    response = requests.get(
        f"{DATAGOUV_API}/datasets/community_resources/",
        params={
            'dataset': dataset_id,
            'organization': TRANSPORT_ORGANIZATION_ID,
        },
    )
    response.raise_for_status()

    resources = response.json()['data']
    if resources is None:
        raise Exception(f"Searched community ressources of dataset {dataset_id}, could not understand response")

    matches = [resource for resource in resources if resource['title'] == netex_file]
    if not matches:
        logger.debug("Found the dataset %s, but no existing ressource", dataset_id)
        return None

    # Several matches are only reported; the first one is still used.
    if len(matches) > 1:
        logger.warning("More that one community resource %s in dataset %s",
                       netex_file, dataset_id)
    first_match = matches[0]
    logger.debug("Found dataset %s and matching community resource, with id %s",
                 dataset_id, first_match['id'])
    return first_match
def create_community_resource(dataset_id, netex_file):
    """
    Creates a community resource and uploads the file.
    This call will not link the resource to the dataset: it requires an extra
    call (see update_resource_metadata).

    dataset_id: identifier of the dataset the upload endpoint is called on.
    netex_file: path of the local file to upload.

    Returns the decoded json body of the response, which holds the 'id' of the
    new resource.
    Raises requests.HTTPError if the API answers with an error status, and
    OSError if the file cannot be opened.
    """
    logger.debug('Creating a community resource on dataset %s', dataset_id)
    headers = {'X-API-KEY': DATAGOUV_API_KEY}
    url = f"{DATAGOUV_API}/datasets/{dataset_id}/upload/community/"
    # The context manager closes the file once the request is done, even when
    # the request fails.
    with open(netex_file, 'rb') as netex_content:
        ret = requests.post(url, headers=headers, files={'file': netex_content})
    ret.raise_for_status()
    created = ret.json()
    logger.debug("Created a new community resource %s on dataset %s", created['id'], dataset_id)
    return created
def find_or_create_community_resource(dataset_id, netex_file):
    """
    Returns the community resource that holds the file.
    If no resource matches yet, a new one is created with the file.
    Otherwise the file of the existing resource is replaced.
    """
    existing = find_community_resources(dataset_id, netex_file)
    if existing is None:
        return create_community_resource(dataset_id, netex_file)
    upload_resource(existing['id'], netex_file)
    return existing
def update_resource_metadata(dataset_id, resource):
    """
    Sends the metadata of the resource to the API.
    The dataset, organization, description and format are set on the given
    resource (which is modified in place) before it is sent.
    Setting the dataset is what links the resource to it.
    Does not return
    """
    resource_id = resource['id']
    logger.debug("Updating metadata of resource %s", resource_id)
    resource.update({
        'dataset': dataset_id,
        'organization': TRANSPORT_ORGANIZATION_ID,
        'description': "Converstion du fichier code-code-code",
        'format': 'NeTEx',
    })
    response = requests.put(
        f"{DATAGOUV_API}/datasets/community_resources/{resource_id}/",
        headers={'X-API-KEY': DATAGOUV_API_KEY},
        json=resource,
    )
    response.raise_for_status()
    logger.debug("Updating of resource %s done", resource['id'])
def upload_resource(resource_id, filename):
    """
    Replaces the file of an existing resource.
    After the call, an update of that resource is needed.

    resource_id: identifier of the community resource to update.
    filename: path of the local file to upload.

    Does not return.
    Raises requests.HTTPError if the API answers with an error status, and
    OSError if the file cannot be opened.
    """
    logger.debug("Uploading an new file %s on resource %s", filename, resource_id)
    url = f"{DATAGOUV_API}/datasets/community_resources/{resource_id}/upload/"
    headers = {'X-API-KEY': DATAGOUV_API_KEY}
    # The context manager closes the file once the request is done, even when
    # the request fails.
    with open(filename, 'rb') as content:
        ret = requests.post(url, headers=headers, files={'file': content})
    ret.raise_for_status()
    logger.debug("Uploading an new file %s on resource %s done", filename, resource_id)
def publish_to_datagouv(dataset_id, netex_file):
    """
    Publishes the netex file as a community resource of the dataset:
    the resource is found or created, then its metadata are updated.
    Failures are not propagated, they are logged as warnings.
    """
    try:
        logger.info("Going to add the file %s as community ressource to the dataset %s",
                    netex_file, dataset_id)
        update_resource_metadata(
            dataset_id,
            find_or_create_community_resource(dataset_id, netex_file),
        )
        logger.info("Added %s to the dataset %s", netex_file, dataset_id)
    except requests.HTTPError as http_error:
        # Must stay ahead of the generic handler to get its own message.
        logger.warning("Unable to add %s to the dataset %s. Http Error %s",
                       netex_file, dataset_id, http_error)
    except Exception as error:
        logger.warning("Unable to add %s to the dataset %s. Generic Error %s",
                       netex_file, dataset_id, error)
def main():
    """Main entry point: publishes a fixed file on a fixed dataset"""
    dataset_id = "588a238d88ee3846659b81a4"
    netex_file = 'test2.txt'
    publish_to_datagouv(dataset_id, netex_file)


# Only run when executed as a script, not when imported.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment