
@nselikoff
Last active Mar 25, 2020
READMEs for scribe and librarian, two components of a reliable and scalable cloud-based data pipeline that can fetch and archive mobility data from public transit, shared mobility (bicycles and scooters), and parking APIs.

Librarian

Originally its own app, the librarian registers real-time and static data feeds in a database, with metadata that describes where to fetch them from, how often to fetch them, and the last time they were fetched and archived. It communicates with the scribe to do the actual fetching and archiving.

Registering Feeds

Manually add feed info to the data_feeds table in the database; there is currently no interactive UI for this project. You can, however, visit /data_feeds to see a simple table with the key information for each DataFeed.

Supported Feed Types:

  • GTFS
  • GTFS_RT
  • MDS_TRIPS
  • MDS_STATUS_CHANGES
  • GBFS (free-bike-status)
  • GBFS_GEOFENCING_ZONE_INFORMATION
  • ACTIVE_BLOCKS
  • ARRIVAL_DEPARTURES
  • NFORCE_PARKINGRIGHTS

An example DataFeed may look like this when initially set up:

{
  "id" : 3,
  "feed_type" : "GTFS_RT",
  "slug" : "cfrta-gtfs-rt-vehiclepositions",
  "url" : "http://cfrta.omnimodal.io:8080/api/v1/key/d9b9b402/agency/1/command/gtfs-rt/vehiclePositions",
  "ping_rate" : 10000,
  "check_last_modified" : false,
  "archive" : true,
  "name" : "LYNX GTFS Realtime Vehicle Positions Feed",
  "country_code" : "US",
  "location" : "Orlando, FL",
  "provider_name" : "Omnimodal",
  "provider_id" : "omnimodal",
  "provider_url" : "https://omnimodal.io",
  "auth_type" : null,
  "auth_info" : null,
  "version" : null,
  "meta" : null,
  "last_modified" : null,
  "created_at" : "2019-06-27 16:42:43.96906",
  "updated_at" : "2019-07-02 18:12:00.513381",
}

Running

When the app runs, a clock process checks all registered feeds according to their configuration attributes. When a feed is due, librarian asks scribe to fetch and archive it by POSTing the appropriate info. On success, scribe responds with 201 Created and various metadata; the response is saved back into the database in the meta field, and the last_modified field is updated if that value is available in response.meta.
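The clock's scheduling decision can be sketched as follows. This is a minimal sketch, not librarian's actual code: the README only specifies ping_rate and last-fetched tracking, so isDue, dueFeeds, and the lastFetchedAt field are illustrative names.

```javascript
// Hypothetical sketch of the clock's due-check. Names other than ping_rate
// (isDue, dueFeeds, lastFetchedAt) are assumptions, not librarian's schema.
function isDue(feed, now = Date.now()) {
  if (!feed.lastFetchedAt) return true;              // never fetched yet
  return now - feed.lastFetchedAt >= feed.ping_rate; // ping_rate is in ms
}

function dueFeeds(feeds, now = Date.now()) {
  return feeds.filter((feed) => isDue(feed, now));
}
```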

Authentication

Librarian and Scribe use HTTP Basic authentication (username/password) to authenticate requests. Make sure the ENV variables for username and password are the same on each side in the given environment.
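Both sides must end up sending/expecting the same Authorization header derived from those shared credentials. A minimal sketch of the derivation (the helper name is illustrative; HTTP Basic itself is standard):

```javascript
// Build an HTTP Basic Authorization header from a username/password pair,
// e.g. read from the environment on each side. Helper name is illustrative.
function basicAuthHeader(username, password) {
  const token = Buffer.from(`${username}:${password}`).toString("base64");
  return `Basic ${token}`;
}
```

If the two environments disagree on the credentials, scribe will reject librarian's requests (typically with a 401).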

Librarian uses HTTP Basic authentication with a different username/password to access the UI (also configured in .env).

Scribe

Scribe fetches data feeds and archives them in raw form to AWS S3. It does not keep track of the feeds itself; it acts on API requests, e.g. from the librarian.

Written in Node.js.

Prerequisites

  • npm
  • node

Setup

  • npm install
  • cp .env.example .env
  • enter environment variables in .env

Run

  • heroku local

API

API Authorization

Scribe requires HTTP Basic authentication (username and password). Set the username and password in the .env file on the scribe side, and send them in each request.

Feed Authorization

If a feed requires authorization, include the X-Auth header in the POST request, and scribe will pass along this header in the GET request for the feed.

For example, for a Lime MDS or GBFS feed:

X-Auth: Bearer limebike-A1B2C3...
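One way the pass-through could look on scribe's side, as a sketch only: whether scribe forwards the value upstream under X-Auth or as the standard Authorization header is an assumption here, not something this README specifies.

```javascript
// Hypothetical: map an incoming X-Auth header onto the upstream feed request's
// Authorization header. The exact upstream header scribe uses is an assumption.
function upstreamAuthHeaders(incomingHeaders) {
  const auth = incomingHeaders["x-auth"]; // Node lowercases header names
  return auth ? { Authorization: auth } : {};
}
```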

Archive a Feed

Request

To archive a feed, POST to /archives with the following data:

{
  "feedType": "GTFS_RT",
  "slug": "cfrta-gtfs-rt-vehiclepositions",
  "url": "http://cfrta.omnimodal.io:8080/api/v1/key/d9b9b402/agency/1/command/gtfs-rt/vehiclePositions",
  "timeout": 8000,
  "extension": "pb"
}

Note that timeout is in milliseconds and will be used in the subsequent request that scribe makes for the feed.
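On the caller's side (e.g. librarian), this request body can be assembled from a DataFeed row. A sketch with illustrative names: buildArchivePayload, the default timeout, and the feed-type-to-extension mapping are all assumptions, not librarian's actual code.

```javascript
// Illustrative helper: build the POST /archives body from a DataFeed row.
// The GTFS_RT -> "pb" extension mapping and the default timeout are assumptions.
function buildArchivePayload(feed, timeoutMs = 8000) {
  const payload = {
    feedType: feed.feed_type,
    slug: feed.slug,
    url: feed.url,
    timeout: timeoutMs,
    extension: feed.feed_type === "GTFS_RT" ? "pb" : "json",
  };
  if (feed.last_modified) payload.lastModified = feed.last_modified; // see below
  return payload;
}
```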

Additional fields can be included in the POST; they will be passed back in the response and included as metadata on the archived S3 object and the Kinesis record. Metadata for specific feed types may also be extracted from the fetched feed and included in the response and metadata.

Fields that are currently used within scribe:

  • lastModified: if present, will use a conditional GET request for the feed (i.e. get if updated)
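A conditional GET normally means sending an If-Modified-Since header, so an unchanged feed can answer 304 with no body. A sketch of how scribe might build it (the helper is illustrative; the header and date format are standard HTTP):

```javascript
// Illustrative: turn a lastModified value into standard conditional-GET headers.
// Returns no headers when lastModified is absent, forcing an unconditional GET.
function conditionalGetHeaders(lastModified) {
  if (!lastModified) return {};
  return { "If-Modified-Since": new Date(lastModified).toUTCString() };
}
```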

Response

If the feed was successfully fetched and archived to S3, scribe will return 201 with the following data:

  • region: AWS region where the feed was archived
  • bucket: S3 bucket
  • key: S3 key
  • uri: full S3 uri where the raw data was archived
  • params: params used to fetch the feed (same params that were POSTed)
  • lastModified: last modified time for the data feed (optional, only if the server responds with the 'Last-Modified' header)
  • meta: additional metadata, typically from the data just fetched (e.g. if the feed has a timestamp/date indicating the last time it was updated, like GTFS Realtime and GBFS, that will be in meta.lastUpdated)
{
    "status": 201,
    "region": "us-east-1",
    "bucket": "data.dev.omnimodal.io/raw/GTFS_RT/cfrta-gtfs-rt-vehiclepositions",
    "key": "1561729082.pb",
    "uri": "https://data.dev.omnimodal.io/raw/GTFS_RT/cfrta-gtfs-rt-vehiclepositions/1561729082.pb",
    "params": {
        "slug": "cfrta-gtfs-rt-vehiclepositions",
        "url": "http://cfrta.omnimodal.io:8080/api/v1/key/d9b9b402/agency/1/command/gtfs-rt/vehiclePositions",
        "type": "GTFS_RT",
        "extension": "pb"
    },
    "lastModified": null,
    "meta": {
        "lastUpdated": 1561729082
    }
}

Other response codes:

  • 304 (not modified)
  • 404 (not found)
  • 400 (error)

AWS Kinesis

Scribe will record successful archives to Kinesis if the KINESIS_DATA_STREAM environment variable is set, and failed fetches to Kinesis if the KINESIS_DATA_STREAM_ERROR environment variable is set. Note that these streams need to already exist in Kinesis.
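The record written to Kinesis might be shaped like the parameters of the AWS SDK's putRecord call. A sketch only: the parameter names match the SDK, but the payload contents and partition-key choice here are assumptions, not scribe's actual schema.

```javascript
// Illustrative: build putRecord params for a successful archive. Using the feed
// slug as PartitionKey keeps each feed's records ordered within one shard.
function buildKinesisRecord(streamName, archiveResult) {
  return {
    StreamName: streamName,                  // must already exist in Kinesis
    PartitionKey: archiveResult.params.slug,
    Data: JSON.stringify(archiveResult),
  };
}
```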

Testing

Tests are written with mocha, chai, chai-http and nock.

Run tests:

npm test

Logging is silenced by default. To run tests and see logs (requires mocha globally installed):

mocha --exit
