-
-
Save JeffersGlass/d45610bb0b03366bee3d9117d73c6a77 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#ctafunctions.py | |
import requests | |
from datetime import datetime, date, timedelta | |
import json | |
from time import sleep | |
# Endpoints of the CTA train API, keyed by the kind of query they answer.
# NOTE(review): these are plain-http URLs and the API key travels in the query
# string -- confirm whether the service accepts https before switching.
baseUrls = {
    'arrivals': 'http://lapi.transitchicago.com/api/1.0/ttarrivals.aspx',
    'follow': 'http://lapi.transitchicago.com/api/1.0/ttfollow.aspx',
    'locations': 'http://lapi.transitchicago.com/api/1.0/ttpositions.aspx',
}

# Route codes as sent in the `rt` query parameter.
# NOTE: an older TODO asked to re-add the yellow line; 'y' is already listed.
allRoutes = ['red', 'blue', 'brn', 'g', 'org', 'p', 'pink', 'y']

# Maps each API route code to the colour name used as a key in routeColors.
fullRouteNames = {
    'red': 'red',
    'blue': 'blue',
    'brn': 'brown',
    'g': 'green',
    'org': 'orange',
    'p': 'purple',
    'pink': 'pink',
    'y': 'yellow',
}

# Alpha channel for the RGBA track palette that is currently disabled (see the
# commented table below).  Kept as a module-level name because this module is
# star-imported by other scripts.
trackAlpha = 10

# Disabled RGBA variant of the palette (track colours carry trackAlpha):
#   'red':    train (255,0,0)     track (127,0,0,trackAlpha)
#   'blue':   train (0,0,200)     track (0,0,100,trackAlpha)
#   'brown':  train (102,59,27)   track (50,30,15,trackAlpha)
#   'green':  train (0,120,10)    track (0,60,5,trackAlpha)
#   'orange': train (255,100,2)   track (127,50,1,trackAlpha)
#   'purple': train (200,0,255)   track (100,0,127,trackAlpha)
#   'pink':   train (226,126,166) track (108,66,84,trackAlpha)
#   'yellow': train (255,255,0)   track (127,127,0,trackAlpha)

# RGB colours per line: 'train' for a live train marker, 'track' for the
# dimmer dots that trace the line itself.
routeColors = {
    'red': {'train': (255, 0, 0), 'track': (127, 0, 0)},
    'blue': {'train': (0, 0, 200), 'track': (0, 0, 100)},
    'brown': {'train': (102, 59, 27), 'track': (50, 30, 15)},
    'green': {'train': (0, 190, 10), 'track': (0, 90, 5)},
    'orange': {'train': (255, 100, 2), 'track': (127, 50, 1)},
    'purple': {'train': (200, 0, 255), 'track': (100, 0, 127)},
    'pink': {'train': (226, 126, 166), 'track': (108, 66, 84)},
    'yellow': {'train': (255, 255, 0), 'track': (127, 127, 0)},
}
def getAllTrainLocations(key):
    """Fetch the current position of every train on every route in allRoutes.

    Args:
        key: API key sent as the `key` query parameter.

    Returns:
        A list of dicts with keys 'route' (full colour name from
        fullRouteNames), 'lat' and 'lon' (floats).  Trains reporting a zero
        latitude or longitude are left out.

    Raises:
        KeyError: if the response lacks the expected 'ctatt'/'route' structure
            or names a route that is not in fullRouteNames.
        ValueError: if the body is not JSON or a coordinate is not numeric.
    """
    payload = {'key': key, 'rt': ','.join(allRoutes), 'outputType': 'JSON'}
    response = requests.get(baseUrls['locations'], params=payload)
    routes = json.loads(response.text)['ctatt']['route']

    myTrains = []
    for route in routes:
        routeName = fullRouteNames[route['@name']]
        # A route with no trains may omit 'train' or carry an empty value.
        trains = route.get('train') or []
        # Presumably a lone train arrives as a bare dict rather than a
        # one-element list (the original isinstance guard points to this);
        # wrap it so the train is not lost.
        if isinstance(trains, dict):
            trains = [trains]
        for train in trains:
            if not isinstance(train, dict):
                continue
            lat = float(train['lat'])
            lon = float(train['lon'])
            # Compare as numbers: the API delivers coordinates as strings, so
            # comparing the raw value against the int 0 never matched.
            if lat != 0 and lon != 0:
                myTrains.append({'route': routeName, 'lat': lat, 'lon': lon})
    return myTrains
def getMaxLatLon(key, scanPeriod = 30, interval = 15):
    """Poll train positions and track the bounding box they cover.

    Args:
        key: API key passed through to getAllTrainLocations.
        scanPeriod: how long to keep polling, in minutes.
        interval: pause between polls, in seconds.

    Returns:
        A dict with keys 'minLon', 'maxLon', 'minLat', 'maxLat'.  If no train
        was seen, the values are the initial sentinels (180, -180, 90, -90).
    """
    # Sentinels sit at the opposite end of each valid range so that the first
    # real reading always replaces them (longitude spans -180..180).
    maxLat = -90.0
    maxLon = -180.0
    minLat = 90.0
    minLon = 180.0

    endTime = datetime.now() + timedelta(minutes=scanPeriod)
    while datetime.now() < endTime:
        print("Checking trains at " + str(datetime.now()))
        try:
            currentTrains = getAllTrainLocations(key)
        except (TypeError, KeyError, ValueError) as e:
            # A malformed or partial payload should not end the scan; report
            # it and treat this poll as empty instead of reusing old data.
            print("Oops there was an error at " + str(datetime.now()))
            print(e)
            print(str({'minLon':minLon, 'maxLon':maxLon, 'minLat':minLat, 'maxLat':maxLat}))
            currentTrains = []

        for t in currentTrains:
            lon = float(t['lon'])
            lat = float(t['lat'])
            if lon == 0 or lat == 0:
                continue
            if lon < minLon:
                minLon = lon
                print("New Minimum Longitude: " + t['route'] + " line train at " + str(minLon))
            if lon > maxLon:
                maxLon = lon
                print("New Maximum Longitude: " + t['route'] + " line train at " + str(maxLon))
            if lat < minLat:
                minLat = lat
                print("New Minimum Latitude: " + t['route'] + " line train at " + str(minLat))
            if lat > maxLat:
                maxLat = lat
                print("New Maximum Latitude: " + t['route'] + " line train at " + str(maxLat))
        sleep(interval)
    return {'minLon':minLon, 'maxLon':maxLon, 'minLat':minLat, 'maxLat':maxLat}
def getRoutePoints(key, scanPeriod = 30, interval = 15):
    """Poll train positions and collect every distinct point that was seen.

    Args:
        key: API key passed through to getAllTrainLocations.
        scanPeriod: how long to keep polling, in minutes.
        interval: pause between polls, in seconds.

    Returns:
        A list of dicts with keys 'route', 'lon', 'lat', in first-seen order
        and without duplicates.
    """
    endTime = datetime.now() + timedelta(minutes=scanPeriod)
    pointsList = []
    # Hashable twin of pointsList so the duplicate check stays O(1) per point.
    seen = set()

    while datetime.now() < endTime:
        print("Checking trains at " + str(datetime.now()))
        try:
            currentTrains = getAllTrainLocations(key)
        except (TypeError, KeyError, ValueError) as e:
            # The handler used to print min/max variables that do not exist in
            # this function, which turned any API error into a NameError.
            print("Oops there was an error at " + str(datetime.now()))
            print(e)
            currentTrains = []

        for t in currentTrains:
            lon = float(t['lon'])
            lat = float(t['lat'])
            if lon == 0 or lat == 0:
                continue
            point = {'route':t['route'], 'lon':lon, 'lat':lat}
            identity = (t['route'], lon, lat)
            if identity in seen:
                print("Point " + str(point) + " is already in list")
            else:
                seen.add(identity)
                pointsList.append(point)
                print("Adding point to list: " + str(point))
        sleep(interval)
    return pointsList
def getArrivals(key, stopId):
    """Print a table of upcoming arrivals for one station.

    Args:
        key: API key sent as the `key` query parameter.
        stopId: station identifier sent as the `mapid` query parameter.

    Prints one row per predicted arrival: direction text, arrival clock time
    and the remaining time as MM:SS.  Returns None.
    """
    payload = {'key': key, 'mapid': stopId, 'outputType':'JSON'}
    r = requests.get(baseUrls['arrivals'], params = payload)
    # NOTE(review): assumes 'eta' may be absent or empty when nothing is
    # predicted; treat that as "no rows" rather than a KeyError.
    etas = r.json()['ctatt'].get('eta') or []

    colString = "{:<30}{:<20}{:<20}"
    print (colString.format("Direction", "Arrival Time", "Time From Now"))
    print ("------")

    # One reference instant so every row is measured against the same clock.
    # NOTE(review): arrT is parsed as a naive timestamp and compared with the
    # local clock -- confirm the machine runs in the transit system's zone.
    now = datetime.now()
    for t in etas:
        arrivalTime = datetime.strptime(t['arrT'], "%Y-%m-%dT%H:%M:%S")
        direction = t['stpDe']
        arrTime = str(arrivalTime.time())
        # Slicing str(timedelta) broke for negative or hour-plus deltas;
        # format from whole seconds and clamp overdue trains to 00:00.
        remaining = max(int((arrivalTime - now).total_seconds()), 0)
        minutes, seconds = divmod(remaining, 60)
        timeFromNow = "{:02d}:{:02d}".format(minutes, seconds)
        print (colString.format(direction, arrTime, timeFromNow))
def getLocations(key, rt):
    """Print a table of the trains currently running on the given route(s).

    Args:
        key: API key sent as the `key` query parameter.
        rt: route code(s) sent as the `rt` query parameter (converted with
            str()).

    Prints a header per route followed by one row per train with its run
    number, destination and next station.  Returns None.
    """
    # Let requests build and escape the query string, as the other helpers do.
    payload = {'key': key, 'rt': str(rt), 'outputType': 'JSON'}
    r = requests.get(baseUrls['locations'], params=payload)
    routes = r.json()['ctatt']['route']
    # NOTE(review): presumably a single requested route can come back as a
    # bare dict instead of a list -- confirm against the API; wrapping is
    # harmless either way.
    if isinstance(routes, dict):
        routes = [routes]

    colString = "{:<10}{:<20}{:<30}"
    for route in routes:
        print ("=== " + route['@name'] + " Line ===")
        print (colString.format("Ride #", "Destination", "Next Station"))
        # Iterate the trains, not the route dict itself (which only yields its
        # key names and never fills the table announced by the header).
        trains = route.get('train') or []
        if isinstance(trains, dict):
            trains = [trains]
        for t in trains:
            print (colString.format(t['rn'], t['destNm'], t['nextStaNm']))
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import pygame | |
import pickle | |
import humanize | |
from os import path | |
from ctafunctions import * | |
from arcgisfunctions import * | |
from time import sleep | |
class Map:
    """A pygame surface plus the lat/lon window that is projected onto it.

    Attributes:
        extents: dict with keys 'minLon', 'maxLon', 'minLat', 'maxLat'.
        mHeight: surface height in pixels.
        mWidth: surface width in pixels.
        surf: the pygame.Surface that points are drawn on.
    """

    def __init__(self, extents, mHeight, mWidth):
        self.extents = extents
        self.mHeight = mHeight
        self.mWidth = mWidth
        self.surf = pygame.Surface((mWidth, mHeight))

    def getMapPoint(self, lat, lon):
        """Convert a latitude/longitude pair into (x, y) surface pixels.

        The mapping is linear; y is flipped so that larger latitudes land
        nearer the top.  Points outside the extents are not rejected and
        simply map to coordinates outside the surface.
        """
        x = int((lon - self.extents['minLon']) * (self.mWidth) / (self.extents['maxLon'] - self.extents['minLon']) + 0)
        y = int((lat - self.extents['minLat']) * (-self.mHeight) / (self.extents['maxLat'] - self.extents['minLat']) + self.mHeight)
        return (x, y)

    def getMapPoints(self, inList):
        """Convert a sequence of (lon, lat) pairs into a list of (x, y) pixels."""
        # Input order is lon first, lat second.
        return [self.getMapPoint(lat=i[1], lon=i[0]) for i in inList]

    def drawPoint(self, lat, lon, color=(127,127,127), radius = 0, width = 0):
        """Draw a circle on self.surf at the given latitude/longitude."""
        point = self.getMapPoint(lat=lat, lon=lon)
        if point is not None:
            center = (int(point[0]), int(point[1]))
            pygame.draw.circle(self.surf, color, center, radius, width)
        else:
            # Only reachable if getMapPoint is changed to reject points.
            print (f"Could not draw point at {lat},{lon}")
def main():
    """Render the CTA map once: water, freeways, tracks and live trains.

    Reads the API key from 'cta.keys', the map window from 'extents.pkl' (or a
    built-in default) and the track points from 'routepoints.pkl' (rescanning
    when asked to or when the file is missing), draws everything into a pygame
    window and optionally saves a PNG snapshot under 'mapImages/'.
    """
    from os import makedirs

    with open('cta.keys', "r") as keyfile:
        # readline() keeps the trailing newline, which would otherwise be sent
        # as part of the API key.
        key = keyfile.readline().strip()

    # NOTE(review): pickle.load executes arbitrary code from the file; only
    # use .pkl files this program wrote itself.
    if path.exists("extents.pkl"):
        with open('extents.pkl', 'rb') as f:
            extents = pickle.load(f)
    else:
        extents = {'minLon':-87.90422, 'maxLon':-87.60586, 'minLat':41.70, 'maxLat':42.08}

    #figure out map aspect ratio
    diffLon = extents['maxLon'] - extents['minLon']
    diffLat = extents['maxLat'] - extents['minLat']
    mHeight = 800
    lakeWidth = 100
    mWidth = round(mHeight * (diffLon/diffLat))
    padding = 30

    RESCAN = False
    EXPORT = True
    SCANTIME = 60
    UPDATE_SECONDS=10

    background = (20,20,20)
    keyColor = (0,255,0)
    waterColor = (32, 171, 199)
    freewayColor = (140,140,140)

    # Scan when explicitly requested OR when there is no cached file to load;
    # the previous "RESCAN and exists" test crashed on a missing cache.
    if RESCAN or not path.exists("routepoints.pkl"):
        rp = getRoutePoints(key, scanPeriod=SCANTIME)
        with open("routepoints.pkl", "wb") as f:
            pickle.dump(rp, f, 0)
    else:
        with open("routepoints.pkl", "rb") as f:
            rp = pickle.load(f)

    pygame.init()
    screen = pygame.display.set_mode((mWidth+padding*2+lakeWidth,mHeight+padding*2))
    pygame.display.set_caption('CTA Train Map')
    screen.fill((33,33,33))

    myMap = Map(extents, mHeight, mWidth)
    # keyColor is treated as transparent so lower layers show through.
    myMap.surf.set_colorkey(keyColor)
    myMap.surf.fill(keyColor)

    # Water layer: lake rings first, then river polygons.
    lake = pygame.Surface((mWidth+lakeWidth, mHeight))
    lake.fill(background)
    lake.set_colorkey(keyColor)
    for ring in getLakeGeometry():
        pygame.draw.polygon(lake,waterColor, myMap.getMapPoints(ring))
    for river in getRiverList():
        for part in river:
            pygame.draw.polygon(lake,waterColor,myMap.getMapPoints(part))
    screen.blit(lake, (padding, padding))

    # Freeway layer: one dot per (lon, lat) sample.
    freeways = pygame.Surface((mWidth, mHeight))
    freeways.fill(keyColor)
    freeways.set_colorkey(keyColor)
    for fw in getFreeways():
        point = myMap.getMapPoint(lat = float(fw[1]), lon=float(fw[0]))
        pygame.draw.circle(freeways, freewayColor, point, 0)
    screen.blit(freeways, (padding, padding))

    #draw tracks from GPS lat/lon data
    for t in rp:
        if t is not None:
            myMap.drawPoint(lon=t['lon'], lat=t['lat'], radius=2, color=routeColors[t['route']]['track'])
    screen.blit(myMap.surf, (padding, padding))
    pygame.display.flip()

    #draw currentTrainPositions
    for train in getAllTrainLocations(key):
        if train is not None:
            myMap.drawPoint(lon=train['lon'], lat=train['lat'], radius=6, color=routeColors[train['route']]['train'], width=2)
    screen.blit(myMap.surf, (padding, padding))
    pygame.display.flip()

    sleep(UPDATE_SECONDS)

    n = datetime.now()
    dateString = '-'.join([str(n.year),str(n.month),str(n.day),str(n.hour),str(n.minute),str(n.second)])
    if EXPORT:
        # image.save does not create missing directories.
        makedirs("mapImages", exist_ok=True)
        pygame.image.save(screen, "mapImages/TrainMap-" + dateString + ".png")
    sleep(1)


if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment