Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
#this script will query OpenData API to get the current positions for Sydney Trains. It will then use this information to plot the
#positions on a map
from google.transit import gtfs_realtime_pb2
import requests
import datetime
import boto3
import os
import time
from pytz import timezone
from decimal import Decimal
#list all variables --start
baseURL = 'https://api.transport.nsw.gov.au/v1/gtfs/vehiclepos/' # Define URL for opendata API
headers = {'Authorization': 'apikey <insert your opendata api key here>'} # Define header for opendata API
bucketName = os.environ['BUCKET'] # Set up bucket from environment variable
transportPositionTable = os.environ['TRANSPORTPOSITION_TABLE'] #dynamodb table for pushpin labels
bucketImagesKey = "images/" # set bucket key for images
HtmlLandingpagename = "vehiclelocation.html" #this is the landing html page that will load the image file to show the current position of vehicles
#Define operators for opendata API. Possible operators are ['sydneytrains', 'buses', 'ferries', 'lightrail', 'nswtrains', 'regionbuses', 'metro']
operators = ['sydneytrains']
#we will use bing maps to plot the locations
maps_bing_prefix = "https://dev.virtualearth.net/REST/v1/Imagery/Map/Road?mapsize=20000,20000&"
maps_bing_apikey = "format=jpeg&dcl=1&key=<insert your bing maps api key here>"
maps_error_imageURL = "https://image.shutterstock.com/image-photo/image-white-text-message-will-600w-1012395526.jpg" #show this image when there are errors
#generating map
maps_bing_pushpin_limit = 100 #this is the maximum pushpins that bing maps allows
inactive_vehicle_datetime_limit = 10 #(in minutes) this will be used to identify inactive vehicles. If a vehicle's
#last stop id timestamp is at least inactive_vehicle_datetime_limit minutes old,
# then it is considered to be inactive
maps_active_vehicle_icon = '4' #this is the icon that will be used to show active vehicles on the map
maps_inactive_vehicle_icon = '10' #this is the icon that will be used to show inactive vehicles on the map
legend_vehicle_displayed_on_map_color = "#00bfff" #this color will be used to show legend entries for which the vehicle is displayed on map
lastpushpinLabel = 0 #this will be used to generate labels for pushpins. Each pushpin will have a number, which will be associated with a description.
#the association will be displayed under a legends section on the right hand side of the web page showing the map.
#the vehicle's label will be used as the description
#the dicts below allow generating labels for the pushpins that will be mapped. Each pushpin will have a label and a description.
dict_pushpinLabel_to_desc_map = {} #this dict will associate a pushpin label to its description.
dict_vehicleId_to_pushpinLabel_map = {} #this dict will associate a vehicleId to the pushpin label it has been associated to on the map.
pushpin_url = "" #this is the url for the map with the pushpins containing the vehicle positions
total_vehicles_found = 0 #this is the total vehicles found, mapped or not
dynamodb_item_expiry_in_days = 2 #number of days after which items in dynamodb will be automatically deleted. Using DynamoDB TTL
tz_sydney = timezone('Australia/Sydney') # Set up timezone
#list all variables --end
s3 = boto3.resource('s3') # Define boto3 object to do s3 operations
dynamodb = boto3.resource('dynamodb') # Define boto3 object to do dynamodb operations
pushpinLabelsTable = dynamodb.Table(transportPositionTable)
# Initialize gtfs_realtime_pb2.FeedMessage
feed = gtfs_realtime_pb2.FeedMessage()
#most of the times, lambda functions get re-used by AWS for consequetive runs. To ensure the global variables are sanitised and do not
#cause issues because they contain values from last run, we will initialise them
def initialise_global_variables():
global lastpushpinLabel
global dict_pushpinLabel_to_desc_map
global dict_vehicleId_to_pushpinLabel_map
global pushpin_url
global total_vehicles_found
#initialise the values for the above global variables
print("Initialising global variables")
lastpushpinLabel = 0
dict_pushpinLabel_to_desc_map = {}
dict_vehicleId_to_pushpinLabel_map = {}
pushpin_url = ""
total_vehicles_found = 0
#read the contents of pushpinLabelsTable and populate pushpinLabels_LabelNum and pushpinLabels_Label dict
def load_pushpinLabels_from_dynamodb(today):
global dict_pushpinLabel_to_desc_map
global dict_vehicleId_to_pushpinLabel_map
global lastpushpinLabel
rdate = today.strftime("%Y-%m-%d")
pushpinLabelsTableContents = pushpinLabelsTable.query(
IndexName='TripDate-GSI',
KeyConditionExpression='TripDate = :tripdate',
ExpressionAttributeValues={":tripdate":rdate}
)
#check to see if there were any items returned from the above query
if len(pushpinLabelsTableContents['Items']) > 0:
#there were some items returned
#the vehicle's label could be same across different vehicles however its id is unique for the day. For this reason
#we must store the vehicle id along with the pushpinLabel and pushpinLabelDesc
for item in pushpinLabelsTableContents['Items']:
pushpinLabel = item['pushpinLabel']
pushpinLabelDesc = item['pushpinLabelDesc']
vehicleId = item['VehicleId']
inDynamodbTable = True #we will mark all entries that are being populated from dynamodb, so that when we update dynamodb
#we only write items that are missing, not everything. This saves on write capacity units
#the following fields were added progressively to dynamodb. Ensure that missing items doesn't break the code. Just add blanks if missing
try:
latitude = item['latitude']
except:
latitude = ""
try:
longitude = item['longitude']
except:
longitude = ""
try:
tripId = item['tripId']
except:
tripId = ""
dict_pushpinLabel_to_desc_map[str(pushpinLabel)] = {"desc": str(pushpinLabelDesc), "vehicleId": str(vehicleId), "inDynamodbTable": inDynamodbTable, "latitude": latitude, "longitude": longitude, "tripId": tripId}
dict_vehicleId_to_pushpinLabel_map[str(vehicleId)] = {"label": str(pushpinLabel), "desc": str(pushpinLabelDesc)}
#if a higher pushpinLabel was found in pushpinLabels table, then set the lastPushpinLabel to this
if int(pushpinLabel) > int(lastpushpinLabel):
lastpushpinLabel = int(pushpinLabel)
#else there were no results for the above query, which means there were no entries in dynamodb table for today. Let's start fresh
print("load_pushpinLabels_from_dynamodb: Loaded ", len(pushpinLabelsTableContents['Items']), "pushpinLabels from dynamodb table. lastpushpinLabel:", lastpushpinLabel)
def callOpenData(operator, feed):
print("callOpenData:Obtaining vehicle location for", operator,end="...")
response = requests.get(baseURL + operator, headers=headers)
feed.ParseFromString(response.content)
print("done")
def process_feed(feed, today):
global dict_pushpinLabel_to_desc_map
global dict_vehicleId_to_pushpinLabel_map
global lastpushpinLabel
global pushpin_url
global total_vehicles_found
#we will use the python module 'time' to convert epoch time (this is what gtfsr timestamps are in) to local time
#set the timezone for time
os.environ['TZ'] = 'AEST-10AEDT-11,M10.5.0,M3.5.0'
time.tzset()
print(f'timezone set to {time.tzname}')
num_pushpin_assigned = 0
total_feed_entity = len(feed.entity)
total_vehicles_found += total_feed_entity
print('Total feed.entity:',total_feed_entity," Total Vehicles Found:",total_vehicles_found)
#As we are using Bing maps, there is a limit to the number of pushpins we can specify for our map. If we
#reach this limit, we will break out of the loop below as there is no benefit for continuing on processing
for entity in feed.entity:
if num_pushpin_assigned >= maps_bing_pushpin_limit:
break #we haev exceeded the number of pushpins that can be used with Bing Maps. Exit loop
tripupdatetimestamp_autz = time.ctime(entity.vehicle.timestamp)
vehicleId = entity.vehicle.vehicle.id
#we need to make sure that the fields used for dynamodb keys are not null. If they are then skip this record. Currently these are TripDate which is
#todays date and vehicleId. So just check vehicleId for being not null and TripDate won't be null.
if (vehicleId): #only go ahead if vehicleId is present. Otherwise just print that vehicleId for this record is missing
#lets find out if this vehicle already has a pushpinLabel assigned for today
if vehicleId in dict_vehicleId_to_pushpinLabel_map.keys():
#this vehicle already has a pushpinLabel. Get the label
pushpinLabel = dict_vehicleId_to_pushpinLabel_map[vehicleId]['label']
else:
#this vehicle doesn't have any pushpinLabel already assigned. Generate a new pushpinLabel for it
lastpushpinLabel += 1 #increment the lastpushpinLabel so that it now points to a new number
pushpinLabel = lastpushpinLabel
tripId = entity.vehicle.trip.trip_id
#the pushpinLabelDesc will be set to the vehicle's label. There have been instances where I noticed the vehicle's label is missing/null.
#In these cases, set the pushpinLabelDesc to TripId
if not entity.vehicle.vehicle.label:
pushpinLabelDesc = tripId
else:
pushpinLabelDesc = entity.vehicle.vehicle.label
#since this vehicle had not been previously assigned a pushpinLabel for today, add its details to the two dict
inDynamodbTable = False #this item has not been read from or written to dynamodb yet
latitude = entity.vehicle.position.latitude
longitude = entity.vehicle.position.longitude
dict_pushpinLabel_to_desc_map[str(pushpinLabel)] = {"desc": str(pushpinLabelDesc),"vehicleId": str(vehicleId),"inDynamodbTable":inDynamodbTable,"latitude":latitude,"longitude":longitude,"tripId":tripId}
dict_vehicleId_to_pushpinLabel_map[str(vehicleId)] = {"label": str(pushpinLabel), "desc": str(pushpinLabelDesc)}
#this vehicle will be displayed on the map. Update dict_pushpinLabel_to_desc_map for this vehicle's entry so that when the legend is
#generated, it will be coloured differently to show that it is currently displayed on the map
dict_pushpinLabel_to_desc_map[str(pushpinLabel)]['isDisplayedOnMap'] = True
#add this vehicle's details to the pushpin url
pushpin_url += "pp=" + str(entity.vehicle.position.latitude) + "," + str(entity.vehicle.position.longitude)
#based on how long ago the laststopid timestamp is, calculate if the vehicle is active or inactive and respectively assign the icon
if (datetime.datetime.now() - datetime.datetime.strptime(tripupdatetimestamp_autz,'%a %b %d %H:%M:%S %Y')) > datetime.timedelta(minutes=inactive_vehicle_datetime_limit):
#this vehicle is inactive
pushpin_url += ";" + maps_inactive_vehicle_icon + ";"
else:
pushpin_url += ";" + maps_active_vehicle_icon + ";"
pushpin_url += str(pushpinLabel) + "&"
num_pushpin_assigned += 1 #increment the counter that denotes the number of pushpins added to the pushpin url
else:
print("process_feed:MissingVehicleId:VehicleWillBeSkipped:TripId:", entity.vehicle.trip.trip_id, " VehicleLabel:", entity.vehicle.vehicle.label," Latitude: ", entity.vehicle.position.latitude, " Longitude: ", entity.vehicle.position.longitude)
def generate_vehicle_position_webpage(map_url, body, s3Bucket, s3ImageKey, imagefilename, mainHtmlfilename, today):
global dict_pushpinLabel_to_desc_map
global dict_vehicleId_to_pushpinLabel_map
global lastpushpinLabel
global total_vehicles_found
#get a handle on the s3 bucket
s3 = boto3.resource('s3')
imgObject = s3.Object(s3Bucket,s3ImageKey+imagefilename)
map_webrequest = requests.post(map_url, data=body)
#check if the maps were successfully obtained.
if map_webrequest.status_code != 200:
#there was an error. show the error
print("generate_vehicle_position_webpage:Bing Map request failed")
print("generate_vehicle_position_webpage:Reason:", map_webrequest.reason)
print("generate_vehicle_position_webpage:ErrorMessage:", map_webrequest.text)
print("generate_vehicle_position_webpage:RequestBody:", body)
#as there has been an error generating the map, display an image denoting that there has been an error
map_webrequest = requests.get(maps_error_imageURL)
else:
print("generate_vehicle_position_webpage:Bing Map request was successful. Status code:",map_webrequest.status_code)
#upload the image file to S3 bucket, set it for public read and ensure content-type is image/jpeg
print("generate_vehicle_position_webpage:Uploading map to s3 bucket")
upload_img_result = imgObject.put(Body=map_webrequest.content,ACL='public-read',ContentType='image/jpeg')
print("generate_vehicle_position_webpage:Generating landing html page:",mainHtmlfilename,end="...")
#Generate the label description using all the pushpin labels for the day. The first entry will be the header field
pushpinLabels = "<li>Label - Description <font color=" + legend_vehicle_displayed_on_map_color + ">[blue label descriptions show currently displayed vehicles]</font></li>"
for index in range(1, lastpushpinLabel + 1):
pushpinLabel_desc = dict_pushpinLabel_to_desc_map[str(index)]['desc']
#the isDisplayedOnMap attribute was added later to DynamoDB so there might be items that don't have it. This ensures that a call to get this
#attribute will not break the program
try:
isDisplayedOnMap = dict_pushpinLabel_to_desc_map[str(index)]['isDisplayedOnMap']
except:
isDisplayedOnMap = False
#for all vehicles currently displayed on map, show their label description in a different color. This makes it easy to differentiate between a vehicle
#that is currently displayed and one that was previously displayed today however it is now not mapped.
if isDisplayedOnMap:
pushpinLabels += "<font color=" + legend_vehicle_displayed_on_map_color + "><li>" + str(index) + " - " + str(pushpinLabel_desc) + "</li></font>"
else:
pushpinLabels += "<li>" + str(index) + " - " + str(pushpinLabel_desc) + "</li>"
htmlObject = s3.Object(s3Bucket,mainHtmlfilename)
htmlContent= """<!DOCTYPE html>
<html>
<head>
<meta name="viewport" content="width=device-width, initial-scale=1">
<style>
body {
font-family: Arial;
color: white;
}
.split {
height: 100%;
width: 50%;
position: fixed;
z-index: 1;
top: 0;
overflow-x: hidden;
padding-top: 20px;
}
.left {
left: 0;
width: 70%;
background-color: #111;
}
.right {
right: 0;
width: 30%;
background-color: black;
}
.centered {
position: absolute;
top: 50%;
left: 50%;
transform: translate(-50%, -50%);
text-align: center;
}
</style>
<script>
<!--
function timedRefresh(timeoutPeriod) {
setTimeout("location.reload(true);",timeoutPeriod);
}
window.onload = timedRefresh(60000);
// -->
</script>
</head>"""
htmlContent +="""
<body>
<div class="split left">
<div class="centered">
<img src={} alt="Vehicle Location Map" width="1250" height="1000">
<p> image source {}</p>
</div>
</div>
<div class="split right">
<p>Map last updated at {} [updated every 5 min].
<br>Total vehicles found {} (at most, only the first 100 will be mapped)
<br>
<br>Icon Description
<br>Blue = active vehicles (location reported within last {} minutes)
<br>Red = inactive vehicles (location last reported at least {} minutes ago)
</p>
<ul style="list-style-type:square;">""".format(s3ImageKey+imagefilename, imagefilename, imagefilename[:-4], total_vehicles_found, inactive_vehicle_datetime_limit, inactive_vehicle_datetime_limit)
htmlContent += pushpinLabels
htmlContent +="""
</ul>
</div>
</body>
</html>"""
upload_html_result = htmlObject.put(Body=htmlContent,ACL='public-read',ContentType='text/html')
print("done")
print("generate_vehicle_position_webpage:Uploaded map:",imagefilename," UploadResult:",upload_img_result)
print("generate_vehicle_position_webpage:Uploaded htmlfile:",mainHtmlfilename," UploadResult:",upload_html_result)
def update_pushpinLabelsTable(tripdate):
global lastpushpinLabel
#update the pushpinLabels dynamodb table with all the pushpinLabels that were created in this invocation
print("update_pushpinLabelsTable:Uploading new pushpinLabels to dynamodb Table")
time_now = datetime.datetime.now(tz_sydney) #each item uploaded to dynamodb table will have the time it was inserted
time_now_str = time_now.strftime("%Y-%m-%d %H:%M:%S")
epoch_time_now = time_now.timestamp()
expirationTime = int(epoch_time_now + (dynamodb_item_expiry_in_days * 24 * 3600)) #convert expiry days to seconds
print("update_pushpinLabelsTable:time_now:", time_now_str," epoch_time_now:", epoch_time_now, " expirationTime:", expirationTime)
num_items_added_to_dynamodb = 0
for index in range(1, lastpushpinLabel + 1):
pushpinLabel = index
pushpinLabelDesc = dict_pushpinLabel_to_desc_map[str(index)]['desc']
vehicleId = dict_pushpinLabel_to_desc_map[str(index)]['vehicleId']
inDynamodbTable = dict_pushpinLabel_to_desc_map[str(index)]['inDynamodbTable']
latitude = dict_pushpinLabel_to_desc_map[str(index)]['latitude']
longitude = dict_pushpinLabel_to_desc_map[str(index)]['longitude']
tripId = dict_pushpinLabel_to_desc_map[str(index)]['tripId']
#only write back to dynamodb table those entries that are new. Dynamodb items to be regarded as immutable and should not be changed.
#using TTL (which is set to attribute ExpirationTime) allows for easy cleanup of items as we don't want items longer than 24 hours as they
#are not being mapped (default expirationTime has been set to 2 days)
if (not inDynamodbTable):
try:
pushpinLabelsTable.put_item(
Item={
'TripDate': str(tripdate),
'VehicleId': str(vehicleId),
'tripId': str(tripId),
'pushpinLabel': str(pushpinLabel),
'pushpinLabelDesc': str(pushpinLabelDesc),
'latitude': str(latitude),
'longitude': str(longitude),
'TimeAdded': time_now_str,
'ExpirationTime': expirationTime
}
)
num_items_added_to_dynamodb += 1
except Exception as e:
print("update_pushpinLabelsTable:Error with put_item operation ",str(e))
print("update_pushpinLabelsTable:TripDate:",str(tripdate)," VehicleId:",str(vehicleId)," pushpinLabel:",str(pushpinLabel)," pushpinLabelDesc:",str(pushpinLabelDesc)," Latitude:",str(latitude)," Longitude:",str(longitude)," TimeAdded:",time_now_str," ExpirationTime:",expirationTime)
num_items_added_to_dynamodb += 1
print("update_pushpinLabelsTable:Uploaded ",num_items_added_to_dynamodb," new pushpinLabel(s) to dynamodb table. LastpushpinLabel:",lastpushpinLabel)
def run(event, context):
print(datetime.datetime.now(), 'Started')
today = datetime.datetime.now(tz_sydney)
initialise_global_variables()
load_pushpinLabels_from_dynamodb(today)
#loop through each operator and get their vehicle positions
for operator in operators:
callOpenData(operator, feed)
process_feed(feed, today)
rdate = today.strftime("%Y-%m-%d")
time_now = datetime.datetime.now(tz_sydney)
mapImageName = time_now.strftime("%Y-%m-%dT%H%M") + '.jpg'
#get fhe image file and upload it to s3
generate_vehicle_position_webpage(maps_bing_prefix + maps_bing_apikey,pushpin_url[:-1], bucketName, bucketImagesKey, mapImageName, HtmlLandingpagename, rdate)
update_pushpinLabelsTable(rdate)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.