Skip to content

Instantly share code, notes, and snippets.

@AlexArcPy
Created August 7, 2017 10:45
Show Gist options
  • Save AlexArcPy/78232f105ac11f6cdc5857dc6c77b14d to your computer and use it in GitHub Desktop.
Save AlexArcPy/78232f105ac11f6cdc5857dc6c77b14d to your computer and use it in GitHub Desktop.
Generate a video street view stream based on a route polyline feature class using Mapillary API
'''
Generate a street-view style video along a route polyline:

* generate a list of (x, y) coordinates for points placed along the route
polyline feature class at a set interval (using arcpy);
* for each point, download from the Mapillary API the closest image, preferring
one that looks towards the next point of the route (using requests);
* merge the downloaded images into an output AVI video file (using opencv).
'''
import os
import json
import requests
import arcpy
import cv2
# --- Input data -------------------------------------------------------
# Polyline feature class holding the route to follow.
ROUTE = r'C:\GIS\Temp\ArcGISHomeFolder\scratch.gdb\MapillaryRoute'
# Well-known ID of the spatial reference the route points are projected to
# before querying the API (4326 is WGS 84 longitude/latitude).
WKID = 4326

# --- Output -----------------------------------------------------------
# Folder the downloaded images are written to and the video frames read from.
IMAGES_FOLDER = r'C:\GIS\temp\RouteVision'

# --- Mapillary API ----------------------------------------------------
# Client id sent with every image search; replace the placeholder.
CLIENT_ID = 'YOUR_CLIENT_ID'
# Image search endpoint with the client id already in the query string.
_IMAGES_URL_TEMPLATE = "https://a.mapillary.com/v3/images?client_id={client_id}"
IMAGES_URL = _IMAGES_URL_TEMPLATE.format(client_id=CLIENT_ID)
#----------------------------------------------------------------------
def get_points_along_line(input_line_feature, interval, number_of_points=None):
    """Return points positioned along a polyline at a regular spacing.

    The first point sits at the start of the line (distance 0), further
    points follow every `interval` units while the distance is smaller than
    the line length, and the end point of the line is always appended last.

    Args:
        input_line_feature: polyline geometry exposing a `length` attribute
            and a `positionAlongLine(distance)` method.
        interval: spacing between consecutive points, in the same units as
            `input_line_feature.length`.
        number_of_points: optional; when truthy, `interval` is ignored and
            replaced by `length / number_of_points`.

    Returns:
        List of the objects returned by `positionAlongLine`, ordered from
        the start of the line to its end.

    Raises:
        ValueError: if the line has a positive length but the effective
            interval is zero or negative (the walk would never terminate).
    """
    line_length = input_line_feature.length
    if number_of_points:
        interval = line_length / float(number_of_points)

    # Only a line that would actually be walked needs a valid step; a
    # zero-length line still yields its single end point below.
    if line_length > 0 and interval <= 0:
        raise ValueError(
            "interval must be positive, got {0!r}".format(interval))

    out_pnts = []
    step = 0
    distance = 0
    while distance < line_length:
        out_pnts.append(input_line_feature.positionAlongLine(distance))
        step += 1
        # Multiply instead of accumulating so floating point error does not
        # build up and produce a near-duplicate point just before the end.
        distance = step * interval

    out_pnts.append(input_line_feature.positionAlongLine(line_length))
    return out_pnts
#----------------------------------------------------------------------
def get_coords_along_line(interval=50):
    """Return (x, y) pairs for points placed along the route line.

    Reads the shape of the first feature found in ROUTE, places points
    along it with get_points_along_line and projects every point to the
    spatial reference identified by WKID.

    Args:
        interval: spacing between points, in the units of the line
            geometry's `length`.

    Returns:
        List of (X, Y) tuples taken from the first point of each projected
        point geometry, ordered from the start of the route to its end.

    Raises:
        IndexError: if ROUTE contains no features (the exception type the
            previous list-indexing implementation raised).
    """
    # Use the cursor as a context manager so it is closed as soon as the
    # first row has been read, and stop after that row instead of loading
    # every feature.
    with arcpy.da.SearchCursor(ROUTE, 'SHAPE@') as cursor:
        first_row = next(iter(cursor), None)
    if first_row is None:
        raise IndexError('No features found in {0}'.format(ROUTE))
    line_geometry = first_row[0]

    # Build the target spatial reference once and project each point once.
    target_sr = arcpy.SpatialReference(WKID)
    coords = []
    for pnt in get_points_along_line(line_geometry, interval):
        projected = pnt.projectAs(target_sr).firstPoint
        coords.append((projected.X, projected.Y))
    return coords
#----------------------------------------------------------------------
def get_closest_feature(point, next_point, max_radius=1000, timeout=30):
    """Return a GeoJSON feature for the closest image to the input point.

    Queries IMAGES_URL with a search radius of 10 and, while the response
    contains no features, retries with a growing radius (10, 20, 40, 70,
    110, ... - the increment itself grows by 10 each round).

    Args:
        point: value sent as the `closeto` query parameter.
        next_point: value sent as the `lookat` query parameter; requests
            omits the parameter when this is None.
        max_radius: largest search radius to try before giving up, so the
            search terminates where no imagery exists at all.
        timeout: seconds to wait for each HTTP request.

    Returns:
        The first element of the response's `features` list, or None when
        the request fails, the response cannot be interpreted, or no image
        is found within `max_radius`.
    """
    search_radius = 10
    search_step = 10
    while True:
        try:
            # NOTE(review): requests encodes a tuple/list value as repeated
            # query keys (closeto=x&closeto=y) - confirm the API accepts
            # that form rather than a single "x,y" value.
            res = requests.get(
                IMAGES_URL,
                params={
                    'closeto': point,
                    'lookat': next_point,
                    'radius': search_radius
                },
                timeout=timeout)
            res.raise_for_status()
            features = res.json()['features']
        except (requests.RequestException, ValueError, KeyError) as err:
            # ValueError: body is not JSON; KeyError: no 'features' member.
            print("Could not read image. Reason: {0}".format(err))
            return None

        if features:
            return features[0]

        next_radius = search_radius + search_step
        if next_radius > max_radius:
            print("Could not find image within {0}m, giving up".format(
                max_radius))
            return None

        print(
            "Could not find image in radius of {0}m, will search with radius of {1}m".
            format(search_radius, next_radius))
        search_radius = next_radius
        search_step += 10
#----------------------------------------------------------------------
def get_image_from_key(key):
    """Fetch the 1024px thumbnail for an image key.

    Returns the `requests` response object of the GET request; the image
    bytes are available to the caller through its `content` attribute.
    """
    thumbnail_template = 'https://d1cuyjsrcm0gby.cloudfront.net/{key}/thumb-1024.jpg'
    thumbnail_url = thumbnail_template.format(key=key)
    response = requests.get(thumbnail_url)
    return response
#----------------------------------------------------------------------
def save_images_from_coords(points_coords):
    """Download and save an image for every (x, y) pair in the point list.

    For each point the closest image is looked up with get_closest_feature
    (looking towards the following point; the last point has none) and the
    image is downloaded with get_image_from_key into IMAGES_FOLDER.

    Files are named 1.jpg, 2.jpg, ... in the order they are saved. The
    numbering stays contiguous even when a point yields no image, so that a
    consumer reading frames by consecutive number does not hit gaps.

    Args:
        points_coords: indexable sequence of (x, y) pairs along the route.
    """
    if not os.path.isdir(IMAGES_FOLDER):
        os.makedirs(IMAGES_FOLDER)

    total_points = len(points_coords)
    saved_count = 0
    # Enumerating from 1 makes idx the position of the *next* point.
    for idx, point in enumerate(points_coords, 1):
        next_point = points_coords[idx] if idx < total_points else None

        closest_feature = get_closest_feature(point, next_point)
        if not closest_feature:
            continue

        key = closest_feature['properties']['key']
        image_data = get_image_from_key(key)
        if not image_data.ok:
            # Do not store an HTTP error body as if it were a JPEG.
            print("Could not download image {0} (HTTP status {1})".format(
                key, image_data.status_code))
            continue

        saved_count += 1
        image_path = os.path.join(IMAGES_FOLDER,
                                  '{0}.jpg'.format(saved_count))
        with open(image_path, 'wb') as f:
            f.write(image_data.content)
#----------------------------------------------------------------------
def images_folder_to_video(input_folder, output_avi_path, fps=2):
    """Create a video file joining the numbered images of a folder.

    Only files named `<number>.jpg` are used, in ascending numeric order;
    other files and gaps in the numbering are ignored. Images that cannot
    be read are skipped with a message.

    Args:
        input_folder: folder containing the numbered .jpg frames.
        output_avi_path: path of the video file to write (MJPG codec).
        fps: frames per second of the output video.

    Raises:
        IndexError: if the folder holds no readable numbered .jpg image
            (the exception type the previous implementation raised for an
            empty folder).
    """
    numbered_files = []
    for name in os.listdir(input_folder):
        stem, ext = os.path.splitext(name)
        if ext.lower() == '.jpg' and stem.isdigit():
            numbered_files.append((int(stem), name))
    numbered_files.sort()

    out = None
    frame_size = None
    try:
        for _, name in numbered_files:
            img = cv2.imread(os.path.join(input_folder, name))
            if img is None:
                print("Could not read image {0}, skipping".format(name))
                continue

            if out is None:
                # The first readable frame defines the video frame size.
                height, width = img.shape[:2]
                frame_size = (width, height)
                fourcc = cv2.VideoWriter_fourcc('M', 'J', 'P', 'G')
                out = cv2.VideoWriter(output_avi_path, fourcc, fps,
                                      frame_size)
            elif (img.shape[1], img.shape[0]) != frame_size:
                # Every frame must match the size given to the writer, so
                # bring differently sized images to it (the aspect ratio
                # is not preserved).
                img = cv2.resize(img, frame_size)

            out.write(img)
    finally:
        # Release the writer even if a frame fails, so the file is closed.
        if out is not None:
            out.release()

    if out is None:
        raise IndexError(
            'No readable numbered .jpg images found in {0}'.format(
                input_folder))
if __name__ == '__main__':
    # Pipeline: sample the route, download one image per sample point,
    # then join the downloaded images into a video.
    output_video = r'C:\GIS\Temp\output.avi'

    points_coords = get_coords_along_line()
    save_images_from_coords(points_coords)
    images_folder_to_video(IMAGES_FOLDER, output_avi_path=output_video)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment