Last active
July 17, 2019 13:01
-
-
Save marcgpuig/8317fdb8ffd2a01f7156417d29d05857 to your computer and use it in GitHub Desktop.
Example of a client code for CARLA simulator that allows you to render the position of each car.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
# Copyright (c) 2017 Computer Vision Center (CVC) at the Universitat Autonoma de | |
# Barcelona (UAB). | |
# | |
# This work is licensed under the terms of the MIT license. | |
# For a copy, see <https://opensource.org/licenses/MIT>. | |
# Keyboard controlling for CARLA. Please refer to client_example.py for a simpler | |
# and more documented example. | |
""" | |
Welcome to CARLA manual control. | |
Use ARROWS or WASD keys for control. | |
W : throttle | |
S : brake | |
AD : steer | |
Q : toggle reverse | |
Space : hand-brake | |
P : toggle autopilot | |
R : restart level | |
STARTING in a moment... | |
""" | |
from __future__ import print_function | |
import argparse | |
import logging | |
import random | |
import time | |
import math | |
import colorsys | |
try: | |
import pygame | |
from pygame.locals import K_DOWN | |
from pygame.locals import K_LEFT | |
from pygame.locals import K_RIGHT | |
from pygame.locals import K_SPACE | |
from pygame.locals import K_UP | |
from pygame.locals import K_a | |
from pygame.locals import K_d | |
from pygame.locals import K_p | |
from pygame.locals import K_q | |
from pygame.locals import K_r | |
from pygame.locals import K_s | |
from pygame.locals import K_w | |
except ImportError: | |
raise RuntimeError('cannot import pygame, make sure pygame package is installed') | |
try: | |
import numpy as np | |
from numpy.linalg import pinv, inv | |
except ImportError: | |
raise RuntimeError('cannot import numpy, make sure numpy package is installed') | |
from carla import image_converter | |
from carla import sensor | |
from carla.client import make_carla_client, VehicleControl | |
from carla.planner.map import CarlaMap | |
from carla.settings import CarlaSettings | |
from carla.tcp import TCPConnectionError | |
from carla.util import print_over_same_line | |
from carla.transform import Transform | |
WINDOW_WIDTH = 800 | |
WINDOW_HEIGHT = 600 | |
MINI_WINDOW_WIDTH = 320 | |
MINI_WINDOW_HEIGHT = 180 | |
WINDOW_WIDTH_HALF = WINDOW_WIDTH / 2 | |
WINDOW_HEIGHT_HALF = WINDOW_HEIGHT / 2 | |
# np.set_printoptions(precision=2, suppress=True) | |
np.set_printoptions(suppress=True) | |
def point_in_canvas(pos): | |
"""Return true if point is in canvas""" | |
if (pos[0] >= 0) and (pos[0] < WINDOW_HEIGHT) and (pos[1] >= 0) and (pos[1] < WINDOW_WIDTH): | |
return True | |
return False | |
def draw_rect(array, pos, size, color=(255, 0, 255)): | |
"""Draws a rect""" | |
point_0 = (pos[0]-size/2, pos[1]-size/2) | |
point_1 = (pos[0]+size/2, pos[1]+size/2) | |
if point_in_canvas(point_0) and point_in_canvas(point_1): | |
for i in range(size): | |
for j in range(size): | |
array[int(point_0[0]+i), int(point_0[1]+j)] = color | |
def rand_color(seed): | |
"""Return random color based on a seed""" | |
random.seed(seed) | |
col = colorsys.hls_to_rgb(random.random(), random.uniform(.2, .8), 1.0) | |
return (int(col[0]*255), int(col[1]*255), int(col[2]*255)) | |
def make_carla_settings(args): | |
"""Make a CarlaSettings object with the settings we need.""" | |
settings = CarlaSettings() | |
settings.set( | |
SynchronousMode=False, | |
SendNonPlayerAgentsInfo=True, | |
NumberOfVehicles=35, | |
NumberOfPedestrians=15, | |
WeatherId=random.choice([1, 3, 7, 8, 14]), | |
QualityLevel=args.quality_level) | |
settings.randomize_seeds() | |
camera0 = sensor.Camera('CameraRGB') | |
camera0.set_image_size(WINDOW_WIDTH, WINDOW_HEIGHT) | |
camera0.set_position(2.0, 0.0, 1.4) | |
camera0.set_rotation(0.0, 0.0, 0.0) | |
settings.add_sensor(camera0) | |
if args.lidar: | |
lidar = sensor.Lidar('Lidar32') | |
lidar.set_position(0, 0, 2.5) | |
lidar.set_rotation(0, 0, 0) | |
lidar.set( | |
Channels=32, | |
Range=50, | |
PointsPerSecond=100000, | |
RotationFrequency=10, | |
UpperFovLimit=10, | |
LowerFovLimit=-30) | |
settings.add_sensor(lidar) | |
# (Intrinsic) K Matrix | |
k = np.identity(3) | |
k[0, 2] = WINDOW_WIDTH_HALF | |
k[1, 2] = WINDOW_HEIGHT_HALF | |
k[0, 0] = k[1, 1] = WINDOW_WIDTH / \ | |
(2.0 * math.tan(90.0 * math.pi / 360.0)) | |
camera_to_car_transform = camera0.get_unreal_transform() | |
# camera_to_car_transform = camera0.get_transform() | |
return settings, k, camera_to_car_transform | |
class Timer(object): | |
def __init__(self): | |
self.step = 0 | |
self._lap_step = 0 | |
self._lap_time = time.time() | |
def tick(self): | |
self.step += 1 | |
def lap(self): | |
self._lap_step = self.step | |
self._lap_time = time.time() | |
def ticks_per_second(self): | |
return float(self.step - self._lap_step) / self.elapsed_seconds_since_lap() | |
def elapsed_seconds_since_lap(self): | |
return time.time() - self._lap_time | |
class CarlaGame(object): | |
def __init__(self, carla_client, args): | |
self.client = carla_client | |
self._carla_settings, self._intrinsic, self._camera_to_car_transform = make_carla_settings(args) | |
self._timer = None | |
self._display = None | |
self._main_image = None | |
self._mini_view_image1 = None | |
self._mini_view_image2 = None | |
self._enable_autopilot = args.autopilot | |
self._lidar_measurement = None | |
self._map_view = None | |
self._is_on_reverse = False | |
self._city_name = args.map_name | |
self._map = CarlaMap(self._city_name, 16.43, 50.0) if self._city_name is not None else None | |
self._map_shape = self._map.map_image.shape if self._city_name is not None else None | |
self._map_view = self._map.get_map(WINDOW_HEIGHT) if self._city_name is not None else None | |
self._position = None | |
self._agent_positions = None | |
self._measurements = None | |
self._extrinsic = None | |
def execute(self): | |
"""Launch the PyGame.""" | |
pygame.init() | |
self._initialize_game() | |
try: | |
while True: | |
for event in pygame.event.get(): | |
if event.type == pygame.QUIT: | |
return | |
self._on_loop() | |
self._on_render() | |
finally: | |
pygame.quit() | |
def _initialize_game(self): | |
if self._city_name is not None: | |
self._display = pygame.display.set_mode( | |
(WINDOW_WIDTH + int((WINDOW_HEIGHT/float(self._map.map_image.shape[0]))*self._map.map_image.shape[1]), WINDOW_HEIGHT), | |
pygame.HWSURFACE | pygame.DOUBLEBUF) | |
else: | |
self._display = pygame.display.set_mode( | |
(WINDOW_WIDTH, WINDOW_HEIGHT), | |
pygame.HWSURFACE | pygame.DOUBLEBUF) | |
logging.debug('pygame started') | |
self._on_new_episode() | |
def _on_new_episode(self): | |
self._carla_settings.randomize_seeds() | |
self._carla_settings.randomize_weather() | |
scene = self.client.load_settings(self._carla_settings) | |
number_of_player_starts = len(scene.player_start_spots) | |
player_start = np.random.randint(number_of_player_starts) | |
print('Starting new episode...') | |
self.client.start_episode(player_start) | |
self._timer = Timer() | |
self._is_on_reverse = False | |
def _on_loop(self): | |
self._timer.tick() | |
measurements, sensor_data = self.client.read_data() | |
# (Extrinsic) Rt Matrix | |
# (Camera) local 3d to world 3d. | |
# Get the transform from the player protobuf transformation. | |
world_transform = Transform( | |
measurements.player_measurements.transform | |
) | |
# Compute the final transformation matrix. | |
self._extrinsic = world_transform * self._camera_to_car_transform | |
self._measurements = measurements | |
self._main_image = sensor_data.get('CameraRGB', None) | |
self._lidar_measurement = sensor_data.get('Lidar32', None) | |
# Print measurements every second. | |
if self._timer.elapsed_seconds_since_lap() > 1.0: | |
if self._city_name is not None: | |
# Function to get car position on map. | |
map_position = self._map.convert_to_pixel([ | |
measurements.player_measurements.transform.location.x, | |
measurements.player_measurements.transform.location.y, | |
measurements.player_measurements.transform.location.z]) | |
# Function to get orientation of the road car is in. | |
lane_orientation = self._map.get_lane_orientation([ | |
measurements.player_measurements.transform.location.x, | |
measurements.player_measurements.transform.location.y, | |
measurements.player_measurements.transform.location.z]) | |
self._print_player_measurements_map( | |
measurements.player_measurements, | |
map_position, | |
lane_orientation) | |
else: | |
self._print_player_measurements(measurements.player_measurements) | |
# Plot position on the map as well. | |
self._timer.lap() | |
control = self._get_keyboard_control(pygame.key.get_pressed()) | |
# Set the player position | |
if self._city_name is not None: | |
self._position = self._map.convert_to_pixel([ | |
measurements.player_measurements.transform.location.x, | |
measurements.player_measurements.transform.location.y, | |
measurements.player_measurements.transform.location.z]) | |
self._agent_positions = measurements.non_player_agents | |
if control is None: | |
self._on_new_episode() | |
elif self._enable_autopilot: | |
self.client.send_control(measurements.player_measurements.autopilot_control) | |
else: | |
self.client.send_control(control) | |
def _get_keyboard_control(self, keys): | |
""" | |
Return a VehicleControl message based on the pressed keys. Return None | |
if a new episode was requested. | |
""" | |
if keys[K_r]: | |
return None | |
control = VehicleControl() | |
if keys[K_LEFT] or keys[K_a]: | |
control.steer = -1.0 | |
if keys[K_RIGHT] or keys[K_d]: | |
control.steer = 1.0 | |
if keys[K_UP] or keys[K_w]: | |
control.throttle = 1.0 | |
if keys[K_DOWN] or keys[K_s]: | |
control.brake = 1.0 | |
if keys[K_SPACE]: | |
control.hand_brake = True | |
if keys[K_q]: | |
self._is_on_reverse = not self._is_on_reverse | |
if keys[K_p]: | |
self._enable_autopilot = not self._enable_autopilot | |
control.reverse = self._is_on_reverse | |
return control | |
def _print_player_measurements_map( | |
self, | |
player_measurements, | |
map_position, | |
lane_orientation): | |
message = 'Step {step} ({fps:.1f} FPS): ' | |
message += 'Map Position ({map_x:.1f},{map_y:.1f}) ' | |
message += 'Lane Orientation ({ori_x:.1f},{ori_y:.1f}) ' | |
message += '{speed:.2f} km/h, ' | |
message += '{other_lane:.0f}% other lane, {offroad:.0f}% off-road' | |
message = message.format( | |
map_x=map_position[0], | |
map_y=map_position[1], | |
ori_x=lane_orientation[0], | |
ori_y=lane_orientation[1], | |
step=self._timer.step, | |
fps=self._timer.ticks_per_second(), | |
speed=player_measurements.forward_speed * 3.6, | |
other_lane=100 * player_measurements.intersection_otherlane, | |
offroad=100 * player_measurements.intersection_offroad) | |
print_over_same_line(message) | |
def _print_player_measurements(self, player_measurements): | |
message = 'Step {step} ({fps:.1f} FPS): ' | |
message += '{speed:.2f} km/h, ' | |
message += '{other_lane:.0f}% other lane, {offroad:.0f}% off-road' | |
message = message.format( | |
step=self._timer.step, | |
fps=self._timer.ticks_per_second(), | |
speed=player_measurements.forward_speed * 3.6, | |
other_lane=100 * player_measurements.intersection_otherlane, | |
offroad=100 * player_measurements.intersection_offroad) | |
print_over_same_line(message) | |
def _on_render(self): | |
if self._main_image is not None: | |
array = image_converter.to_rgb_array(self._main_image) | |
array.setflags(write=1) | |
for agent in self._measurements.non_player_agents: | |
if agent.HasField('vehicle'): | |
pos = agent.vehicle.transform.location | |
pos_vector = np.array([[pos.x], [pos.y], [pos.z], [1.0]]) | |
transformed_3d_pos = np.dot(inv(self._extrinsic.matrix), pos_vector) | |
pos2d = np.dot(self._intrinsic, transformed_3d_pos[:3]) | |
pos2d = np.array([ | |
pos2d[0] / pos2d[2], | |
pos2d[1] / pos2d[2], | |
pos2d[2]]) | |
if pos2d[2] > 0: | |
x_2d = WINDOW_WIDTH - pos2d[0] | |
y_2d = WINDOW_HEIGHT - pos2d[1] | |
draw_rect(array, (y_2d, x_2d), 10, rand_color(agent.id)) | |
surface = pygame.surfarray.make_surface(array.swapaxes(0, 1)) | |
self._display.blit(surface, (0, 0)) | |
if self._lidar_measurement is not None: | |
lidar_data = np.array(self._lidar_measurement.data[:, :2]) | |
lidar_data *= 2.0 | |
lidar_data += 100.0 | |
lidar_data = np.fabs(lidar_data) | |
lidar_data = lidar_data.astype(np.int32) | |
lidar_data = np.reshape(lidar_data, (-1, 2)) | |
#draw lidar | |
lidar_img_size = (200, 200, 3) | |
lidar_img = np.zeros(lidar_img_size) | |
lidar_img[tuple(lidar_data.T)] = (255, 255, 255) | |
surface = pygame.surfarray.make_surface(lidar_img) | |
self._display.blit(surface, (10, 10)) | |
if self._map_view is not None: | |
array = self._map_view | |
array = array[:, :, :3] | |
new_window_width = \ | |
(float(WINDOW_HEIGHT) / float(self._map_shape[0])) * \ | |
float(self._map_shape[1]) | |
surface = pygame.surfarray.make_surface(array.swapaxes(0, 1)) | |
w_pos = int(self._position[0]*(float(WINDOW_HEIGHT)/float(self._map_shape[0]))) | |
h_pos = int(self._position[1] * (new_window_width/float(self._map_shape[1]))) | |
pygame.draw.circle(surface, [255, 0, 0, 255], (w_pos, h_pos), 6, 0) | |
for agent in self._agent_positions: | |
if agent.HasField('vehicle'): | |
agent_position = self._map.convert_to_pixel([ | |
agent.vehicle.transform.location.x, | |
agent.vehicle.transform.location.y, | |
agent.vehicle.transform.location.z]) | |
w_pos = int(agent_position[0]*(float(WINDOW_HEIGHT)/float(self._map_shape[0]))) | |
h_pos = int(agent_position[1] *(new_window_width/float(self._map_shape[1]))) | |
pygame.draw.circle(surface, [255, 0, 255, 255], (w_pos, h_pos), 4, 0) | |
self._display.blit(surface, (WINDOW_WIDTH, 0)) | |
pygame.display.flip() | |
def main(): | |
argparser = argparse.ArgumentParser( | |
description='CARLA Manual Control Client') | |
argparser.add_argument( | |
'-v', '--verbose', | |
action='store_true', | |
dest='debug', | |
help='print debug information') | |
argparser.add_argument( | |
'--host', | |
metavar='H', | |
default='localhost', | |
help='IP of the host server (default: localhost)') | |
argparser.add_argument( | |
'-p', '--port', | |
metavar='P', | |
default=2000, | |
type=int, | |
help='TCP port to listen to (default: 2000)') | |
argparser.add_argument( | |
'-a', '--autopilot', | |
action='store_true', | |
help='enable autopilot') | |
argparser.add_argument( | |
'-l', '--lidar', | |
action='store_true', | |
help='enable Lidar') | |
argparser.add_argument( | |
'-q', '--quality-level', | |
choices=['Low', 'Epic'], | |
type=lambda s: s.title(), | |
default='Epic', | |
help='graphics quality level, a lower level makes the simulation run considerably faster.') | |
argparser.add_argument( | |
'-m', '--map-name', | |
metavar='M', | |
default=None, | |
help='plot the map of the current city (needs to match active map in ' | |
'server, options: Town01 or Town02)') | |
args = argparser.parse_args() | |
log_level = logging.DEBUG if args.debug else logging.INFO | |
logging.basicConfig(format='%(levelname)s: %(message)s', level=log_level) | |
logging.info('listening to server %s:%s', args.host, args.port) | |
print(__doc__) | |
while True: | |
try: | |
with make_carla_client(args.host, args.port) as client: | |
game = CarlaGame(client, args) | |
game.execute() | |
break | |
except TCPConnectionError as error: | |
logging.error(error) | |
time.sleep(1) | |
if __name__ == '__main__': | |
try: | |
main() | |
except KeyboardInterrupt: | |
print('\nCancelled by user. Bye!') |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Thanks for the help, I now have a script to display 2D bounding boxes. I also have implemented the camera recording used in manual_control.py, but I found out that some frames were not recorded in the _out folder. Since the client you linked to is already in sync mode, I do not know how I could make sure that I get all the frames. Would it be better if I recorded compressed images instead?
Thanks again for the help.