Last active Aug 29, 2015
GTFS-realtime test harness for frequency-based trips


This is a Python-based test harness that simulates a GTFS-realtime feed with trips on a frequency-based route. Specifically, it simulates arrivals on the AirBART route (defined in the BART GTFS feed), at the AirBART_OAK stop. For convenience, the script simulates two vehicles, which are perpetually three and seven minutes away, respectively.
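To make "perpetually three and seven minutes away" concrete, the following minimal sketch computes the predicted arrival timestamps for the two simulated vehicles. The vehicle IDs and offsets are the ones used in the script; the names `VEHICLE_OFFSETS` and `predicted_arrivals` are hypothetical and exist only for this illustration.

```python
import time

# Offsets in seconds for the two simulated vehicles (three and seven minutes).
VEHICLE_OFFSETS = {"1000": 3 * 60, "2000": 7 * 60}


def predicted_arrivals(now=None):
    """Return {vehicle_id: arrival POSIX timestamp} relative to `now`."""
    if now is None:
        now = int(time.time())
    # Each prediction is recomputed from the current time, so the vehicles
    # never get any closer between requests.
    return {vid: now + offset for vid, offset in VEHICLE_OFFSETS.items()}


print(predicted_arrivals(1398647506))
# {'1000': 1398647686, '2000': 1398647926}
```

Because the offsets are added to the request time, every fetch of the feed yields the same three-minute and seven-minute predictions.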


  1. Install the Google Protocol Buffers library, including Python bindings. On Ubuntu, this can be done with sudo apt-get install libprotobuf-dev libprotobuf7.
  2. Get gtfs-realtime.proto from the GTFS-realtime specification and place it in the same directory as the script.
  3. Generate the Python GTFS-realtime bindings by running protoc --python_out=. gtfs-realtime.proto.
  4. Install the remaining Python dependencies with pip install flask.
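Once the steps above are done, a quick check can confirm that everything the script imports is actually available. This is only a sketch: `missing_modules` is a hypothetical helper, and it assumes it is run from the directory that contains the generated `gtfs_realtime_pb2.py`.

```python
import importlib.util


def missing_modules(names):
    """Return the subset of `names` that cannot be located for import."""
    missing = []
    for name in names:
        try:
            found = importlib.util.find_spec(name) is not None
        except (ImportError, ValueError):
            # Raised, for example, when a parent package such as "google"
            # is absent while looking up "google.protobuf".
            found = False
        if not found:
            missing.append(name)
    return missing


if __name__ == "__main__":
    required = ["google.protobuf", "flask", "gtfs_realtime_pb2"]
    print("Missing:", missing_modules(required) or "none")
```

An empty result means the protobuf runtime, Flask, and the generated bindings can all be found on the import path.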


Run the script with python, passing the script's filename. The generated feed is available at http://localhost:5000/feed, and the text debug output is available at http://localhost:5000/debug. Note that the test configuration is sensitive to the time of day and day of week: AirBART does not run all day long, and the Sunday service has a different trip ID from the Monday-Saturday service.

For Monday-Saturday service between 6 AM and midnight San Francisco time, the trip ID is M-FSAT2UP, while for Sunday service between 8 AM and midnight, the trip ID is SUN1UP. Of course, the times in frequencies.txt could be manually edited to make the feed more useful for testing at all times.
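Rather than commenting and uncommenting the trip ID by hand, the choice could be derived from the current day and hour. The sketch below encodes the service windows described above; `pick_trip_id` is a hypothetical helper that is not part of the script, and it assumes the datetime passed in is already expressed in San Francisco local time.

```python
from datetime import datetime


def pick_trip_id(local_now):
    """Return the trip ID in service at `local_now`, or None if no service.

    `local_now` is assumed to be a naive datetime in San Francisco local time.
    """
    is_sunday = local_now.weekday() == 6  # Monday is 0, Sunday is 6
    first_hour = 8 if is_sunday else 6    # service then runs until midnight
    if local_now.hour < first_hour:
        return None
    return "SUN1UP" if is_sunday else "M-FSAT2UP"


print(pick_trip_id(datetime(2014, 4, 27, 18, 11)))  # a Sunday evening
print(pick_trip_id(datetime(2014, 4, 28, 7, 0)))    # a Monday morning
```

A `None` result indicates a time outside both service windows, when neither trip ID would match a running trip.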

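import gtfs_realtime_pb2 as gtfsrtpb
import time
from datetime import datetime
from google.protobuf import text_format
from flask import Flask, send_file
import io

app = Flask(__name__)


def buildFeed():
    # Build a FeedMessage with one trip update per simulated vehicle.
    fm = gtfsrtpb.FeedMessage()
    fh = fm.header
    fh.gtfs_realtime_version = "1.0"
    fh.timestamp = int(time.mktime(datetime.now().timetuple()))
    fe = fm.entity.add()
    fe.id = "1"
    buildTripUpdate(fe.trip_update, "1000", 3*60, "13:10:00")
    fe = fm.entity.add()
    fe.id = "2"
    buildTripUpdate(fe.trip_update, "2000", 7*60, "13:20:00")
    return fm


def buildTripUpdate(tu, vehicleId, offset, startTime):
    # Fill in a TripUpdate whose single arrival is `offset` seconds from now.
    tu.timestamp = int(time.mktime(datetime.now().timetuple()))
    tu.vehicle.id = vehicleId
    # Use "M-FSAT2UP" for Monday-Saturday service, "SUN1UP" for Sunday.
    #tu.trip.trip_id = "M-FSAT2UP"
    tu.trip.trip_id = "SUN1UP"
    tu.trip.route_id = "AirBART"
    tu.trip.start_time = startTime
    stu = tu.stop_time_update.add()
    stu.stop_id = "OAK"
    stu.arrival.time = int(time.mktime(datetime.now().timetuple())) + offset
    stu.departure.time = stu.arrival.time


@app.route("/feed")
def feed():
    # Binary protobuf feed; an explicit MIME type is needed for a BytesIO body.
    return send_file(io.BytesIO(buildFeed().SerializeToString()),
                     mimetype="application/octet-stream")


@app.route("/debug")
def debugFeed():
    # Human-readable text rendering of the same feed.
    return text_format.MessageToString(buildFeed())


if __name__ == "__main__":
    app.run()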

Example output from /debug:

header { gtfs_realtime_version: "1.0" timestamp: 1398647506 }
entity {
  id: "1"
  trip_update {
    trip { trip_id: "SUN1UP" start_time: "13:10:00" route_id: "AirBART" }
    stop_time_update {
      arrival { time: 1398647686 }
      departure { time: 1398647686 }
      stop_id: "OAK"
    }
    vehicle { id: "1000" }
    timestamp: 1398647506
  }
}
entity {
  id: "2"
  trip_update {
    trip { trip_id: "SUN1UP" start_time: "13:20:00" route_id: "AirBART" }
    stop_time_update {
      arrival { time: 1398647926 }
      departure { time: 1398647926 }
      stop_id: "OAK"
    }
    vehicle { id: "2000" }
    timestamp: 1398647506
  }
}