Skip to content

Instantly share code, notes, and snippets.

@JeffersGlass

JeffersGlass/ctafunctions.py Secret

Created Sep 13, 2020
Embed
What would you like to do?
#ctafunctions.py
import requests
from datetime import datetime, date, timedelta
import json
from time import sleep
baseUrls = {'arrivals' : 'http://lapi.transitchicago.com/api/1.0/ttarrivals.aspx',
'follow' : 'http://lapi.transitchicago.com/api/1.0/ttfollow.aspx',
'locations' : 'http://lapi.transitchicago.com/api/1.0/ttpositions.aspx'
}
#TODO: re-add yellow line.
allRoutes = ['red', 'blue', 'brn', 'g', 'org', 'p', 'pink','y']
fullRouteNames = {'red':'red', 'blue':'blue', 'brn':'brown', 'g':'green', 'org':'orange', 'p':'purple', 'pink':'pink', 'y':'yellow'}
trackAlpha = 10
'''
routeColors = { 'red': {'train':(255,0,0), 'track':(127,0,0,trackAlpha)},
'blue': {'train':(0,0,200), 'track':(0,0,100,trackAlpha)},
'brown': {'train':(102,59,27), 'track':(50,30,15,trackAlpha)},
'green': {'train':(0,120,10), 'track':(0,60,5,trackAlpha)},
'orange': {'train':(255,100,2), 'track':(127,50,1,trackAlpha)},
'purple': {'train':(200,0,255), 'track':(100,0,127,trackAlpha)},
'pink': {'train':(226,126,166), 'track':(108,66,84,trackAlpha)},
'yellow': {'train':(255,255,0), 'track':(127,127,0,trackAlpha)}
}'''
routeColors = { 'red': {'train':(255,0,0), 'track':(127,0,0)},
'blue': {'train':(0,0,200), 'track':(0,0,100)},
'brown': {'train':(102,59,27), 'track':(50,30,15)},
'green': {'train':(0,190,10), 'track':(0,90,5)},
'orange': {'train':(255,100,2), 'track':(127,50,1)},
'purple': {'train':(200,0,255), 'track':(100,0,127)},
'pink': {'train':(226,126,166), 'track':(108,66,84)},
'yellow': {'train':(255,255,0), 'track':(127,127,0)}
}
#returns a list of dictionarys with keys {route, lat, lon}
#TODO: Deal with lines that have no trains
def getAllTrainLocations(key):
payload = {'key': key, 'rt':','.join(allRoutes), 'outputType':'JSON'}
r = requests.get(baseUrls['locations'], params = payload)
j = json.loads(r.text)
routes = j['ctatt']['route']
myTrains = list()
for r in routes:
routeName = fullRouteNames[r['@name']]
#print("-------" +routeName + "-------")
if 'train' in r:
for t in (r['train']):
#print(t)
if isinstance(t, dict):
if t['lat'] != '0' and t['lon'] != 0:
myTrains.append({'route':routeName, 'lat': float(t['lat']), 'lon': float(t['lon'])})
else:
pass
#print ("Train not in route " + routeName)
#print(r)
return myTrains
def getMaxLatLon(key, scanPeriod = 30, interval = 15):
maxLat = -90.0
maxLon = -90.0
minLat = 90.0
minLon = 180.0
startTime = datetime.now()
endTime = startTime + timedelta(minutes=scanPeriod)
while (datetime.now() < endTime):
print ("Checking trains at " + str(datetime.now()))
try:
currentTrains = getAllTrainLocations(key)
except TypeError as e:
print("Oops there was an error at " + str(datetime.now()))
print(e)
print(str({'minLon':minLon, 'maxLon':maxLon, 'minLat':minLat, 'maxLat':maxLat}))
for t in currentTrains:
if float(t['lon']) == 0 or float(t['lat']) == 0:
pass
else:
if float(t['lon']) < float(minLon): minLon = t['lon']; print("New Minimum Longitude: " + t['route'] + " line train at " + str(minLon))
if float(t['lon']) > float(maxLon): maxLon = t['lon']; print("New Maximum Longitude: " + t['route'] + " line train at " + str(maxLon))
if float(t['lat']) < float(minLat): minLat = t['lat']; print("New Minimum Latitude: " + t['route'] + " line train at " + str(minLat))
if float(t['lat']) > float(maxLat): maxLat = t['lat']; print("New Maximum Latitude: " + t['route'] + " line train at " + str(maxLat))
sleep(interval)
return {'minLon':minLon, 'maxLon':maxLon, 'minLat':minLat, 'maxLat':maxLat}
def getRoutePoints(key, scanPeriod = 30, interval = 15):
startTime = datetime.now()
endTime = startTime + timedelta(minutes=scanPeriod)
pointsList = list()
while (datetime.now() < endTime):
print ("Checking trains at " + str(datetime.now()))
try:
currentTrains = getAllTrainLocations(key)
except TypeError as e:
print("Oops there was an error at " + str(datetime.now()))
print(e)
print(str({'minLon':minLon, 'maxLon':maxLon, 'minLat':minLat, 'maxLat':maxLat}))
for t in currentTrains:
if float(t['lon']) == 0 or float(t['lat']) == 0:
pass
else:
point = {'route':t['route'], 'lon':float(t['lon']), 'lat':float(t['lat'])}
if point not in pointsList:
pointsList.append(point)
print("Adding point to list: " + str(point))
else:
print("Point " + str(point) + " is already in list")
sleep(interval)
return pointsList
def getArrivals(key, stopId):
payload = {'key': key, 'mapid': stopId, 'outputType':'JSON'}
r = requests.get(baseUrls['arrivals'], params = payload)
c = r.json()['ctatt']['eta']
colString = "{:<30}{:<20}{:<20}"
print (colString.format("Direction", "Arrival Time", "Time From Now"))
print ("------")
for t in c:
arrivalTime = datetime.strptime(t['arrT'], "%Y-%m-%dT%H:%M:%S")
direction = t['stpDe']
arrTime = str(arrivalTime.time())
timeFromNow = str(arrivalTime - datetime.now())[2:7]
print (colString.format(direction, arrTime, timeFromNow))
def getLocations(key, rt):
myUrl = baseUrls['locations'] + "?key=" + key
routeUrl = myUrl + "&rt=" + str(rt) + "&outputType=JSON"
myRouteData = dict()
r = requests.get(routeUrl)
c = r.json()['ctatt']['route']
for route in c:
print ("=== " + route['@name'] + " Line ===")
colString = "{:<10}{:<20}{:<30}"
print (colString.format("Ride #", "Destination", "Next Station"))
for t in route:
print (t)# (colString.format(t['rn'], t['destNm'], t['nextStaNm']))
import pygame
import pickle
import humanize
from os import path
from ctafunctions import *
from arcgisfunctions import *
from time import sleep
class Map():
def __init__(self, extents, mHeight, mWidth):
self.extents = extents
self.mHeight = mHeight
self.mWidth = mWidth
self.surf = pygame.Surface((mWidth, mHeight))
def getMapPoint(self, lat, lon):
#if lat < self.extents['minLat'] or lat > self.extents['maxLat'] or lon < self.extents['minLon'] or lon > self.extents['maxLon']:
#return None
#else:
x = int((lon - self.extents['minLon']) * (self.mWidth) / (self.extents['maxLon'] - self.extents['minLon']) + 0)
y = int((lat - self.extents['minLat']) * (-self.mHeight) / (self.extents['maxLat'] - self.extents['minLat']) + self.mHeight)
return (x, y)
def getMapPoints(self, inList):
rList = list()
for i in inList:
rList.append(self.getMapPoint(lat=i[1], lon=i[0]))
return rList
def drawPoint(self, lat, lon, color=(127,127,127), radius = 0, width = 0):
point = self.getMapPoint(lat=lat, lon=lon)
if point != None:
center = (int(point[0]), int(point[1]))
pygame.draw.circle(self.surf, color, center, radius, width)
else:
print (f"Could not draw point at {lat},{lon}")
def main():
with open('cta.keys', "r") as keyfile:
key = keyfile.readline()
if path.exists("extents.pkl"):
with open('extents.pkl', 'rb') as f:
extents = pickle.load(f)
else:
extents = {'minLon':-87.90422, 'maxLon':-87.60586, 'minLat':41.70, 'maxLat':42.08}
#figure out map aspect ratio
diffLon = extents['maxLon'] - extents['minLon']
diffLat = extents['maxLat'] - extents['minLat']
mHeight = 800
lakeWidth = 100
mWidth = round(mHeight * (diffLon/diffLat))
padding = 30
RESCAN = False
EXPORT = True
SCANTIME = 60
UPDATE_SECONDS=10
background = (20,20,20)
keyColor = (0,255,0)
waterColor = (32, 171, 199)
freewayColor = (140,140,140)
if RESCAN and path.exists("routepoints.pkl"):
rp = getRoutePoints(key, scanPeriod=SCANTIME)
with open("routepoints.pkl", "wb") as f:
pickle.dump(rp, f, 0)
else:
with open("routepoints.pkl", "rb") as f:
rp = pickle.load(f)
pygame.init()
screen = pygame.display.set_mode((mWidth+padding*2+lakeWidth,mHeight+padding*2))
pygame.display.set_caption('CTA Train Map')
screen.fill((33,33,33))
myMap = Map(extents, mHeight, mWidth)
#while(True):
myMap.surf.set_colorkey(keyColor)
myMap.surf.fill(keyColor)
lake = pygame.Surface((mWidth+lakeWidth, mHeight))
lake.fill(background)
lake.set_colorkey(keyColor)
for r in getLakeGeometry():
#print("=====New Ring=====")
#print(myMap.getMapPoints(r))
pygame.draw.polygon(lake,waterColor, myMap.getMapPoints(r))
for r in getRiverList():
for p in r:
pygame.draw.polygon(lake,waterColor,myMap.getMapPoints(p))
screen.blit(lake, (padding, padding))
freeways = pygame.Surface((mWidth, mHeight))
freeways.fill(keyColor)
freeways.set_colorkey(keyColor)
for f in getFreeways():
point = myMap.getMapPoint(lat = float(f[1]), lon=float(f[0]))
pygame.draw.circle(freeways, freewayColor, point, 0)
screen.blit(freeways, (padding, padding))
#draw tracks from GPS lat/lon data
for t in rp:
if t != None:
myMap.drawPoint(lon=t['lon'], lat=t['lat'], radius=2, color=routeColors[t['route']]['track'])
screen.blit(myMap.surf, (padding, padding))
pygame.display.flip()
#sleep(1)
#draw currentTrainPositions
for l in getAllTrainLocations(key):
if l != None:
myMap.drawPoint(lon=l['lon'], lat=l['lat'], radius=6, color=routeColors[l['route']]['train'], width=2)
screen.blit(myMap.surf, (padding, padding))
#sleep(.05)
pygame.display.flip()
sleep(UPDATE_SECONDS)
n = datetime.now()
dateString = '-'.join([str(n.year),str(n.month),str(n.day),str(n.hour),str(n.minute),str(n.second)])
if (EXPORT): pygame.image.save(screen, "mapImages/TrainMap-" + dateString + ".png")
#myMap.drawPoint(lon=-87.8, lat=41.9, radius=2)
#screen.blit(myMap.surf, (10, 10))
#pygame.display.flip()
sleep(1)
if __name__ == '__main__':
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.