Created
April 19, 2021 19:21
-
-
Save soulslicer/0ccd811a47d27e19f42e148657c72723 to your computer and use it in GitHub Desktop.
carla_car.py
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# Copyright (c) 2019 Computer Vision Center (CVC) at the Universitat Autonoma de | |
# Barcelona (UAB). | |
# | |
# This work is licensed under the terms of the MIT license. | |
# For a copy, see <https://opensource.org/licenses/MIT>. | |
# Allows controlling a vehicle with a keyboard. For a simpler and more | |
# documented example, please take a look at tutorial.py. | |
""" | |
Welcome to CARLA manual control. | |
Use ARROWS or WASD keys for control. | |
W : throttle | |
S : brake | |
AD : steer | |
Q : toggle reverse | |
Space : hand-brake | |
P : toggle autopilot | |
M : toggle manual transmission | |
,/. : gear up/down | |
TAB : change sensor position | |
` : next sensor | |
[1-9] : change to sensor [1-9] | |
C : change weather (Shift+C reverse) | |
Backspace : change vehicle | |
R : toggle recording images to disk | |
CTRL + R : toggle recording of simulation (replacing any previous) | |
CTRL + P : start replaying last recorded simulation | |
CTRL + + : increments the start time of the replay by 1 second (+SHIFT = 10 seconds) | |
CTRL + - : decrements the start time of the replay by 1 second (+SHIFT = 10 seconds) | |
F1 : toggle HUD | |
H/? : toggle help | |
ESC : quit | |
""" | |
from __future__ import print_function | |
# ============================================================================== | |
# -- find carla module --------------------------------------------------------- | |
# ============================================================================== | |
import glob | |
import os | |
import sys | |
# ============================================================================== | |
# -- imports ------------------------------------------------------------------- | |
# ============================================================================== | |
import carla | |
from carla import ColorConverter as cc | |
import argparse | |
import collections | |
import datetime | |
import logging | |
import math | |
import random | |
import re | |
import weakref | |
try: | |
import pygame | |
from pygame.locals import KMOD_CTRL | |
from pygame.locals import KMOD_SHIFT | |
from pygame.locals import K_0 | |
from pygame.locals import K_9 | |
from pygame.locals import K_BACKQUOTE | |
from pygame.locals import K_BACKSPACE | |
from pygame.locals import K_COMMA | |
from pygame.locals import K_DOWN | |
from pygame.locals import K_ESCAPE | |
from pygame.locals import K_F1 | |
from pygame.locals import K_LEFT | |
from pygame.locals import K_PERIOD | |
from pygame.locals import K_RIGHT | |
from pygame.locals import K_SLASH | |
from pygame.locals import K_SPACE | |
from pygame.locals import K_TAB | |
from pygame.locals import K_UP | |
from pygame.locals import K_a | |
from pygame.locals import K_c | |
from pygame.locals import K_d | |
from pygame.locals import K_h | |
from pygame.locals import K_m | |
from pygame.locals import K_p | |
from pygame.locals import K_q | |
from pygame.locals import K_r | |
from pygame.locals import K_s | |
from pygame.locals import K_w | |
from pygame.locals import K_MINUS | |
from pygame.locals import K_EQUALS | |
except ImportError: | |
raise RuntimeError('cannot import pygame, make sure pygame package is installed') | |
try: | |
import numpy as np | |
except ImportError: | |
raise RuntimeError('cannot import numpy, make sure numpy package is installed') | |
# ============================================================================== | |
# -- Global functions ---------------------------------------------------------- | |
# ============================================================================== | |
def find_weather_presets(): | |
rgx = re.compile('.+?(?:(?<=[a-z])(?=[A-Z])|(?<=[A-Z])(?=[A-Z][a-z])|$)') | |
name = lambda x: ' '.join(m.group(0) for m in rgx.finditer(x)) | |
presets = [x for x in dir(carla.WeatherParameters) if re.match('[A-Z].+', x)] | |
return [(getattr(carla.WeatherParameters, x), name(x)) for x in presets] | |
def get_actor_display_name(actor, truncate=250): | |
name = ' '.join(actor.type_id.replace('_', '.').title().split('.')[1:]) | |
return (name[:truncate - 1] + u'\u2026') if len(name) > truncate else name | |
# ============================================================================== | |
# -- World --------------------------------------------------------------------- | |
# ============================================================================== | |
class World(object): | |
def __init__(self, carla_world, hud, args): | |
self.world = carla_world | |
self.actor_role_name = args.rolename | |
self.map = self.world.get_map() | |
self.hud = hud | |
self.player = None | |
self.collision_sensor = None | |
self.lane_invasion_sensor = None | |
self.gnss_sensor = None | |
self.camera_manager = None | |
self._weather_presets = find_weather_presets() | |
self._weather_index = 0 | |
self._actor_filter = args.filter | |
self._gamma = args.gamma | |
self.restart() | |
self.world.on_tick(hud.on_world_tick) | |
self.recording_enabled = False | |
self.recording_start = 0 | |
def restart(self): | |
# Keep same camera config if the camera manager exists. | |
cam_index = self.camera_manager.index if self.camera_manager is not None else 0 | |
cam_pos_index = self.camera_manager.transform_index if self.camera_manager is not None else 0 | |
# Get a random blueprint. | |
self._actor_filter = "vehicle.tesla*" | |
blueprint = random.choice(self.world.get_blueprint_library().filter(self._actor_filter)) | |
# kawasaki.ninja | |
blueprint.set_attribute('role_name', self.actor_role_name) | |
if blueprint.has_attribute('color'): | |
color = random.choice(blueprint.get_attribute('color').recommended_values) | |
blueprint.set_attribute('color', color) | |
if blueprint.has_attribute('driver_id'): | |
driver_id = random.choice(blueprint.get_attribute('driver_id').recommended_values) | |
blueprint.set_attribute('driver_id', driver_id) | |
if blueprint.has_attribute('is_invincible'): | |
blueprint.set_attribute('is_invincible', 'true') | |
# Spawn the player. | |
if self.player is not None: | |
spawn_point = self.player.get_transform() | |
spawn_point.location.z += 2.0 | |
spawn_point.rotation.roll = 0.0 | |
spawn_point.rotation.pitch = 0.0 | |
self.destroy() | |
self.player = self.world.try_spawn_actor(blueprint, spawn_point) | |
while self.player is None: | |
spawn_points = self.map.get_spawn_points() | |
spawn_point = random.choice(spawn_points) if spawn_points else carla.Transform() | |
self.player = self.world.try_spawn_actor(blueprint, spawn_point) | |
# Set up the sensors. | |
self.collision_sensor = CollisionSensor(self.player, self.hud) | |
self.lane_invasion_sensor = LaneInvasionSensor(self.player, self.hud) | |
self.gnss_sensor = GnssSensor(self.player) | |
self.camera_manager = CameraManager(self.player, self.hud, self._gamma) | |
self.camera_manager.transform_index = cam_pos_index | |
self.camera_manager.set_sensor(cam_index, notify=False) | |
actor_type = get_actor_display_name(self.player) | |
self.hud.notification(actor_type) | |
def next_weather(self, reverse=False): | |
self._weather_index += -1 if reverse else 1 | |
self._weather_index %= len(self._weather_presets) | |
preset = self._weather_presets[self._weather_index] | |
self.hud.notification('Weather: %s' % preset[1]) | |
self.player.get_world().set_weather(preset[0]) | |
def tick(self, clock): | |
self.hud.tick(self, clock) | |
def render(self, display): | |
self.camera_manager.render(display) | |
self.hud.render(display) | |
def destroy_sensors(self): | |
self.camera_manager.sensor.destroy() | |
self.camera_manager.sensor = None | |
self.camera_manager.index = None | |
def destroy(self): | |
actors = [ | |
self.camera_manager.sensor, | |
self.collision_sensor.sensor, | |
self.lane_invasion_sensor.sensor, | |
self.gnss_sensor.sensor, | |
self.player] | |
for actor in actors: | |
if actor is not None: | |
actor.destroy() | |
# ============================================================================== | |
# -- KeyboardControl ----------------------------------------------------------- | |
# ============================================================================== | |
import evdev | |
from evdev import ecodes, InputDevice | |
class DualControl(object): | |
def __init__(self, world, start_in_autopilot): | |
self._autopilot_enabled = start_in_autopilot | |
if isinstance(world.player, carla.Vehicle): | |
self._control = carla.VehicleControl() | |
world.player.set_autopilot(self._autopilot_enabled) | |
elif isinstance(world.player, carla.Walker): | |
self._control = carla.WalkerControl() | |
self._autopilot_enabled = False | |
self._rotation = world.player.get_transform().rotation | |
else: | |
raise NotImplementedError("Actor type not supported") | |
self._steer_cache = 0.0 | |
world.hud.notification("Press 'H' or '?' for help.", seconds=4.0) | |
# initialize steering wheel | |
pygame.joystick.init() | |
joystick_count = pygame.joystick.get_count() | |
if joystick_count > 1: | |
raise ValueError("Please Connect Just One Joystick") | |
self._joystick = pygame.joystick.Joystick(0) | |
self._joystick.init() | |
self._steer_idx = 0 | |
self._throttle_idx = 2 | |
self._brake_idx = 3 | |
self._reverse_idx = 0 | |
self._handbrake_idx = 4 | |
self.first_run = True | |
self.set_feedback(30000) | |
def set_feedback(self, val): | |
device = evdev.list_devices()[0] | |
evtdev = InputDevice(device) | |
#val = 50000 # val \in [0,65535] | |
evtdev.write(ecodes.EV_FF, ecodes.FF_AUTOCENTER, val) | |
def parse_events(self, world, clock): | |
for event in pygame.event.get(): | |
if event.type == pygame.QUIT: | |
return True | |
elif event.type == pygame.JOYBUTTONDOWN: | |
if event.button == 0: | |
world.restart() | |
elif event.button == 1: | |
world.hud.toggle_info() | |
elif event.button == 2: | |
world.camera_manager.toggle_camera() | |
elif event.button == 3: | |
world.next_weather() | |
elif event.button == self._reverse_idx: | |
self._control.gear = 1 if self._control.reverse else -1 | |
elif event.button == 23: | |
world.camera_manager.next_sensor() | |
elif event.type == pygame.KEYUP: | |
if self._is_quit_shortcut(event.key): | |
return True | |
elif event.key == K_BACKSPACE: | |
world.restart() | |
elif event.key == K_F1: | |
world.hud.toggle_info() | |
elif event.key == K_h or (event.key == K_SLASH and pygame.key.get_mods() & KMOD_SHIFT): | |
world.hud.help.toggle() | |
elif event.key == K_TAB: | |
world.camera_manager.toggle_camera() | |
elif event.key == K_c and pygame.key.get_mods() & KMOD_SHIFT: | |
world.next_weather(reverse=True) | |
elif event.key == K_c: | |
world.next_weather() | |
elif event.key == K_BACKQUOTE: | |
world.camera_manager.next_sensor() | |
elif event.key > K_0 and event.key <= K_9: | |
world.camera_manager.set_sensor(event.key - 1 - K_0) | |
elif event.key == K_r: | |
world.camera_manager.toggle_recording() | |
if isinstance(self._control, carla.VehicleControl): | |
if event.key == K_q: | |
self._control.gear = 1 if self._control.reverse else -1 | |
elif event.key == K_m: | |
self._control.manual_gear_shift = not self._control.manual_gear_shift | |
self._control.gear = world.player.get_control().gear | |
world.hud.notification('%s Transmission' % | |
('Manual' if self._control.manual_gear_shift else 'Automatic')) | |
elif self._control.manual_gear_shift and event.key == K_COMMA: | |
self._control.gear = max(-1, self._control.gear - 1) | |
elif self._control.manual_gear_shift and event.key == K_PERIOD: | |
self._control.gear = self._control.gear + 1 | |
elif event.key == K_p: | |
self._autopilot_enabled = not self._autopilot_enabled | |
world.player.set_autopilot(self._autopilot_enabled) | |
world.hud.notification('Autopilot %s' % ('On' if self._autopilot_enabled else 'Off')) | |
if not self._autopilot_enabled: | |
if isinstance(self._control, carla.VehicleControl): | |
#self._parse_vehicle_keys(pygame.key.get_pressed(), clock.get_time()) | |
self._parse_vehicle_wheel() | |
#self._control.reverse = self._control.gear < 0 | |
elif isinstance(self._control, carla.WalkerControl): | |
self._parse_walker_keys(pygame.key.get_pressed(), clock.get_time()) | |
print(self._control) | |
world.player.apply_control(self._control) | |
def _parse_vehicle_keys(self, keys, milliseconds): | |
self._control.throttle = 1.0 if keys[K_UP] or keys[K_w] else 0.0 | |
steer_increment = 5e-4 * milliseconds | |
if keys[K_LEFT] or keys[K_a]: | |
self._steer_cache -= steer_increment | |
elif keys[K_RIGHT] or keys[K_d]: | |
self._steer_cache += steer_increment | |
else: | |
self._steer_cache = 0.0 | |
self._steer_cache = min(0.7, max(-0.7, self._steer_cache)) | |
self._control.steer = round(self._steer_cache, 1) | |
self._control.brake = 1.0 if keys[K_DOWN] or keys[K_s] else 0.0 | |
self._control.hand_brake = keys[K_SPACE] | |
def _parse_vehicle_wheel(self): | |
numAxes = self._joystick.get_numaxes() | |
jsInputs = [float(self._joystick.get_axis(i)) for i in range(numAxes)] | |
# print (jsInputs) | |
jsButtons = [float(self._joystick.get_button(i)) for i in | |
range(self._joystick.get_numbuttons())] | |
fPress = bool(jsButtons[14]) | |
rPress = bool(jsButtons[15]) | |
# Custom function to map range of inputs [1, -1] to outputs [0, 1] i.e 1 from inputs means nothing is pressed | |
# For the steering, it seems fine as it is | |
K1 = 1.0 # 0.55 | |
steerCmd = K1 * math.tan(0.5 * jsInputs[self._steer_idx]) | |
K2 = 1.6 # 1.6 | |
throttleCmd = K2 + (2.05 * math.log10( | |
-0.7 * jsInputs[self._throttle_idx] + 1.4) - 1.2) / 0.92 | |
if throttleCmd <= 0: | |
throttleCmd = 0 | |
elif throttleCmd > 1: | |
throttleCmd = 1 | |
brakeCmd = 1.6 + (2.05 * math.log10( | |
-0.7 * jsInputs[self._brake_idx] + 1.4) - 1.2) / 0.92 | |
if brakeCmd <= 0: | |
brakeCmd = 0 | |
elif brakeCmd > 1: | |
brakeCmd = 1 | |
if round(throttleCmd,6) == 0.621264 or round(brakeCmd,6) == 0.621264: | |
if self.first_run: | |
throttleCmd = 0 | |
brakeCmd = 0 | |
print("Press to Continue") | |
else: | |
self.first_run = False | |
if fPress == 0 and rPress == 0: | |
brakeCmd = 0 | |
throttleCmd = 0 | |
self._control.reverse = False | |
elif fPress == 1 and rPress == 0: | |
self._control.reverse = False | |
elif fPress == 0 and rPress == 1: | |
self._control.reverse = True | |
self._control.steer = steerCmd | |
self._control.brake = brakeCmd | |
self._control.throttle = throttleCmd | |
#toggle = jsButtons[self._reverse_idx] | |
self._control.hand_brake = bool(jsButtons[self._handbrake_idx]) | |
def _parse_walker_keys(self, keys, milliseconds): | |
self._control.speed = 0.0 | |
if keys[K_DOWN] or keys[K_s]: | |
self._control.speed = 0.0 | |
if keys[K_LEFT] or keys[K_a]: | |
self._control.speed = .01 | |
self._rotation.yaw -= 0.08 * milliseconds | |
if keys[K_RIGHT] or keys[K_d]: | |
self._control.speed = .01 | |
self._rotation.yaw += 0.08 * milliseconds | |
if keys[K_UP] or keys[K_w]: | |
self._control.speed = 5.556 if pygame.key.get_mods() & KMOD_SHIFT else 2.778 | |
self._control.jump = keys[K_SPACE] | |
self._rotation.yaw = round(self._rotation.yaw, 1) | |
self._control.direction = self._rotation.get_forward_vector() | |
@staticmethod | |
def _is_quit_shortcut(key): | |
return (key == K_ESCAPE) or (key == K_q and pygame.key.get_mods() & KMOD_CTRL) | |
class KeyboardControl(object): | |
def __init__(self, world, start_in_autopilot): | |
self._autopilot_enabled = start_in_autopilot | |
if isinstance(world.player, carla.Vehicle): | |
self._control = carla.VehicleControl() | |
world.player.set_autopilot(self._autopilot_enabled) | |
elif isinstance(world.player, carla.Walker): | |
self._control = carla.WalkerControl() | |
self._autopilot_enabled = False | |
self._rotation = world.player.get_transform().rotation | |
else: | |
raise NotImplementedError("Actor type not supported") | |
self._steer_cache = 0.0 | |
world.hud.notification("Press 'H' or '?' for help.", seconds=4.0) | |
def parse_events(self, client, world, clock): | |
for event in pygame.event.get(): | |
if event.type == pygame.QUIT: | |
return True | |
elif event.type == pygame.KEYUP: | |
if self._is_quit_shortcut(event.key): | |
return True | |
elif event.key == K_BACKSPACE: | |
world.restart() | |
elif event.key == K_F1: | |
world.hud.toggle_info() | |
elif event.key == K_h or (event.key == K_SLASH and pygame.key.get_mods() & KMOD_SHIFT): | |
world.hud.help.toggle() | |
elif event.key == K_TAB: | |
world.camera_manager.toggle_camera() | |
elif event.key == K_c and pygame.key.get_mods() & KMOD_SHIFT: | |
world.next_weather(reverse=True) | |
elif event.key == K_c: | |
world.next_weather() | |
elif event.key == K_BACKQUOTE: | |
world.camera_manager.next_sensor() | |
elif event.key > K_0 and event.key <= K_9: | |
world.camera_manager.set_sensor(event.key - 1 - K_0) | |
elif event.key == K_r and not (pygame.key.get_mods() & KMOD_CTRL): | |
world.camera_manager.toggle_recording() | |
elif event.key == K_r and (pygame.key.get_mods() & KMOD_CTRL): | |
if (world.recording_enabled): | |
client.stop_recorder() | |
world.recording_enabled = False | |
world.hud.notification("Recorder is OFF") | |
else: | |
client.start_recorder("manual_recording.rec") | |
world.recording_enabled = True | |
world.hud.notification("Recorder is ON") | |
elif event.key == K_p and (pygame.key.get_mods() & KMOD_CTRL): | |
# stop recorder | |
client.stop_recorder() | |
world.recording_enabled = False | |
# work around to fix camera at start of replaying | |
currentIndex = world.camera_manager.index | |
world.destroy_sensors() | |
# disable autopilot | |
self._autopilot_enabled = False | |
world.player.set_autopilot(self._autopilot_enabled) | |
world.hud.notification("Replaying file 'manual_recording.rec'") | |
# replayer | |
client.replay_file("manual_recording.rec", world.recording_start, 0, 0) | |
world.camera_manager.set_sensor(currentIndex) | |
elif event.key == K_MINUS and (pygame.key.get_mods() & KMOD_CTRL): | |
if pygame.key.get_mods() & KMOD_SHIFT: | |
world.recording_start -= 10 | |
else: | |
world.recording_start -= 1 | |
world.hud.notification("Recording start time is %d" % (world.recording_start)) | |
elif event.key == K_EQUALS and (pygame.key.get_mods() & KMOD_CTRL): | |
if pygame.key.get_mods() & KMOD_SHIFT: | |
world.recording_start += 10 | |
else: | |
world.recording_start += 1 | |
world.hud.notification("Recording start time is %d" % (world.recording_start)) | |
if isinstance(self._control, carla.VehicleControl): | |
if event.key == K_q: | |
self._control.gear = 1 if self._control.reverse else -1 | |
elif event.key == K_m: | |
self._control.manual_gear_shift = not self._control.manual_gear_shift | |
self._control.gear = world.player.get_control().gear | |
world.hud.notification('%s Transmission' % | |
('Manual' if self._control.manual_gear_shift else 'Automatic')) | |
elif self._control.manual_gear_shift and event.key == K_COMMA: | |
self._control.gear = max(-1, self._control.gear - 1) | |
elif self._control.manual_gear_shift and event.key == K_PERIOD: | |
self._control.gear = self._control.gear + 1 | |
elif event.key == K_p and not (pygame.key.get_mods() & KMOD_CTRL): | |
self._autopilot_enabled = not self._autopilot_enabled | |
world.player.set_autopilot(self._autopilot_enabled) | |
world.hud.notification('Autopilot %s' % ('On' if self._autopilot_enabled else 'Off')) | |
if not self._autopilot_enabled: | |
if isinstance(self._control, carla.VehicleControl): | |
self._parse_vehicle_keys(pygame.key.get_pressed(), clock.get_time()) | |
self._control.reverse = self._control.gear < 0 | |
elif isinstance(self._control, carla.WalkerControl): | |
self._parse_walker_keys(pygame.key.get_pressed(), clock.get_time()) | |
print(self._control) | |
world.player.apply_control(self._control) | |
def _parse_vehicle_keys(self, keys, milliseconds): | |
self._control.throttle = 1.0 if keys[K_UP] or keys[K_w] else 0.0 | |
steer_increment = 5e-4 * milliseconds | |
if keys[K_LEFT] or keys[K_a]: | |
if self._steer_cache > 0: | |
self._steer_cache = 0 | |
else: | |
self._steer_cache -= steer_increment | |
elif keys[K_RIGHT] or keys[K_d]: | |
if self._steer_cache < 0: | |
self._steer_cache = 0 | |
else: | |
self._steer_cache += steer_increment | |
else: | |
self._steer_cache = 0.0 | |
self._steer_cache = min(0.7, max(-0.7, self._steer_cache)) | |
self._control.steer = round(self._steer_cache, 1) | |
self._control.brake = 1.0 if keys[K_DOWN] or keys[K_s] else 0.0 | |
self._control.hand_brake = keys[K_SPACE] | |
def _parse_walker_keys(self, keys, milliseconds): | |
self._control.speed = 0.0 | |
if keys[K_DOWN] or keys[K_s]: | |
self._control.speed = 0.0 | |
if keys[K_LEFT] or keys[K_a]: | |
self._control.speed = .01 | |
self._rotation.yaw -= 0.08 * milliseconds | |
if keys[K_RIGHT] or keys[K_d]: | |
self._control.speed = .01 | |
self._rotation.yaw += 0.08 * milliseconds | |
if keys[K_UP] or keys[K_w]: | |
self._control.speed = 3.333 if pygame.key.get_mods() & KMOD_SHIFT else 2.778 | |
self._control.jump = keys[K_SPACE] | |
self._rotation.yaw = round(self._rotation.yaw, 1) | |
self._control.direction = self._rotation.get_forward_vector() | |
@staticmethod | |
def _is_quit_shortcut(key): | |
return (key == K_ESCAPE) or (key == K_q and pygame.key.get_mods() & KMOD_CTRL) | |
# ============================================================================== | |
# -- HUD ----------------------------------------------------------------------- | |
# ============================================================================== | |
class HUD(object): | |
def __init__(self, width, height): | |
self.dim = (width, height) | |
font = pygame.font.Font(pygame.font.get_default_font(), 20) | |
fonts = [x for x in pygame.font.get_fonts() if 'mono' in x] | |
default_font = 'ubuntumono' | |
mono = default_font if default_font in fonts else fonts[0] | |
mono = pygame.font.match_font(mono) | |
self._font_mono = pygame.font.Font(mono, 14) | |
self._notifications = FadingText(font, (width, 40), (0, height - 40)) | |
self.help = HelpText(pygame.font.Font(mono, 24), width, height) | |
self.server_fps = 0 | |
self.frame = 0 | |
self.simulation_time = 0 | |
self._show_info = True | |
self._info_text = [] | |
self._server_clock = pygame.time.Clock() | |
def on_world_tick(self, timestamp): | |
self._server_clock.tick() | |
self.server_fps = self._server_clock.get_fps() | |
self.frame = timestamp.frame | |
self.simulation_time = timestamp.elapsed_seconds | |
def tick(self, world, clock): | |
self._notifications.tick(world, clock) | |
if not self._show_info: | |
return | |
t = world.player.get_transform() | |
v = world.player.get_velocity() | |
c = world.player.get_control() | |
heading = 'N' if abs(t.rotation.yaw) < 89.5 else '' | |
heading += 'S' if abs(t.rotation.yaw) > 90.5 else '' | |
heading += 'E' if 179.5 > t.rotation.yaw > 0.5 else '' | |
heading += 'W' if -0.5 > t.rotation.yaw > -179.5 else '' | |
colhist = world.collision_sensor.get_collision_history() | |
collision = [colhist[x + self.frame - 200] for x in range(0, 200)] | |
max_col = max(1.0, max(collision)) | |
collision = [x / max_col for x in collision] | |
vehicles = world.world.get_actors().filter('vehicle.*') | |
self._info_text = [ | |
'Server: % 16.0f FPS' % self.server_fps, | |
'Client: % 16.0f FPS' % clock.get_fps(), | |
'', | |
'Vehicle: % 20s' % get_actor_display_name(world.player, truncate=20), | |
'Map: % 20s' % world.map.name, | |
'Simulation time: % 12s' % datetime.timedelta(seconds=int(self.simulation_time)), | |
'', | |
'Speed: % 15.0f km/h' % (3.6 * math.sqrt(v.x**2 + v.y**2 + v.z**2)), | |
u'Heading:% 16.0f\N{DEGREE SIGN} % 2s' % (t.rotation.yaw, heading), | |
'Location:% 20s' % ('(% 5.1f, % 5.1f)' % (t.location.x, t.location.y)), | |
'GNSS:% 24s' % ('(% 2.6f, % 3.6f)' % (world.gnss_sensor.lat, world.gnss_sensor.lon)), | |
'Height: % 18.0f m' % t.location.z, | |
''] | |
if isinstance(c, carla.VehicleControl): | |
self._info_text += [ | |
('Throttle:', c.throttle, 0.0, 1.0), | |
('Steer:', c.steer, -1.0, 1.0), | |
('Brake:', c.brake, 0.0, 1.0), | |
('Reverse:', c.reverse), | |
('Hand brake:', c.hand_brake), | |
('Manual:', c.manual_gear_shift), | |
'Gear: %s' % {-1: 'R', 0: 'N'}.get(c.gear, c.gear)] | |
elif isinstance(c, carla.WalkerControl): | |
self._info_text += [ | |
('Speed:', c.speed, 0.0, 5.556), | |
('Jump:', c.jump)] | |
self._info_text += [ | |
'', | |
'Collision:', | |
collision, | |
'', | |
'Number of vehicles: % 8d' % len(vehicles)] | |
if len(vehicles) > 1: | |
self._info_text += ['Nearby vehicles:'] | |
distance = lambda l: math.sqrt((l.x - t.location.x)**2 + (l.y - t.location.y)**2 + (l.z - t.location.z)**2) | |
vehicles = [(distance(x.get_location()), x) for x in vehicles if x.id != world.player.id] | |
for d, vehicle in sorted(vehicles): | |
if d > 200.0: | |
break | |
vehicle_type = get_actor_display_name(vehicle, truncate=22) | |
self._info_text.append('% 4dm %s' % (d, vehicle_type)) | |
def toggle_info(self): | |
self._show_info = not self._show_info | |
def notification(self, text, seconds=2.0): | |
self._notifications.set_text(text, seconds=seconds) | |
def error(self, text): | |
self._notifications.set_text('Error: %s' % text, (255, 0, 0)) | |
def render(self, display): | |
if self._show_info: | |
info_surface = pygame.Surface((220, self.dim[1])) | |
info_surface.set_alpha(100) | |
display.blit(info_surface, (0, 0)) | |
v_offset = 4 | |
bar_h_offset = 100 | |
bar_width = 106 | |
for item in self._info_text: | |
if v_offset + 18 > self.dim[1]: | |
break | |
if isinstance(item, list): | |
if len(item) > 1: | |
points = [(x + 8, v_offset + 8 + (1.0 - y) * 30) for x, y in enumerate(item)] | |
pygame.draw.lines(display, (255, 136, 0), False, points, 2) | |
item = None | |
v_offset += 18 | |
elif isinstance(item, tuple): | |
if isinstance(item[1], bool): | |
rect = pygame.Rect((bar_h_offset, v_offset + 8), (6, 6)) | |
pygame.draw.rect(display, (255, 255, 255), rect, 0 if item[1] else 1) | |
else: | |
rect_border = pygame.Rect((bar_h_offset, v_offset + 8), (bar_width, 6)) | |
pygame.draw.rect(display, (255, 255, 255), rect_border, 1) | |
f = (item[1] - item[2]) / (item[3] - item[2]) | |
if item[2] < 0.0: | |
rect = pygame.Rect((bar_h_offset + f * (bar_width - 6), v_offset + 8), (6, 6)) | |
else: | |
rect = pygame.Rect((bar_h_offset, v_offset + 8), (f * bar_width, 6)) | |
pygame.draw.rect(display, (255, 255, 255), rect) | |
item = item[0] | |
if item: # At this point has to be a str. | |
surface = self._font_mono.render(item, True, (255, 255, 255)) | |
display.blit(surface, (8, v_offset)) | |
v_offset += 18 | |
self._notifications.render(display) | |
self.help.render(display) | |
# ============================================================================== | |
# -- FadingText ---------------------------------------------------------------- | |
# ============================================================================== | |
class FadingText(object): | |
def __init__(self, font, dim, pos): | |
self.font = font | |
self.dim = dim | |
self.pos = pos | |
self.seconds_left = 0 | |
self.surface = pygame.Surface(self.dim) | |
def set_text(self, text, color=(255, 255, 255), seconds=2.0): | |
text_texture = self.font.render(text, True, color) | |
self.surface = pygame.Surface(self.dim) | |
self.seconds_left = seconds | |
self.surface.fill((0, 0, 0, 0)) | |
self.surface.blit(text_texture, (10, 11)) | |
def tick(self, _, clock): | |
delta_seconds = 1e-3 * clock.get_time() | |
self.seconds_left = max(0.0, self.seconds_left - delta_seconds) | |
self.surface.set_alpha(500.0 * self.seconds_left) | |
def render(self, display): | |
display.blit(self.surface, self.pos) | |
# ============================================================================== | |
# -- HelpText ------------------------------------------------------------------ | |
# ============================================================================== | |
class HelpText(object): | |
def __init__(self, font, width, height): | |
lines = __doc__.split('\n') | |
self.font = font | |
self.dim = (680, len(lines) * 22 + 12) | |
self.pos = (0.5 * width - 0.5 * self.dim[0], 0.5 * height - 0.5 * self.dim[1]) | |
self.seconds_left = 0 | |
self.surface = pygame.Surface(self.dim) | |
self.surface.fill((0, 0, 0, 0)) | |
for n, line in enumerate(lines): | |
text_texture = self.font.render(line, True, (255, 255, 255)) | |
self.surface.blit(text_texture, (22, n * 22)) | |
self._render = False | |
self.surface.set_alpha(220) | |
def toggle(self): | |
self._render = not self._render | |
def render(self, display): | |
if self._render: | |
display.blit(self.surface, self.pos) | |
# ============================================================================== | |
# -- CollisionSensor ----------------------------------------------------------- | |
# ============================================================================== | |
class CollisionSensor(object): | |
def __init__(self, parent_actor, hud): | |
self.sensor = None | |
self.history = [] | |
self._parent = parent_actor | |
self.hud = hud | |
world = self._parent.get_world() | |
bp = world.get_blueprint_library().find('sensor.other.collision') | |
self.sensor = world.spawn_actor(bp, carla.Transform(), attach_to=self._parent) | |
# We need to pass the lambda a weak reference to self to avoid circular | |
# reference. | |
weak_self = weakref.ref(self) | |
self.sensor.listen(lambda event: CollisionSensor._on_collision(weak_self, event)) | |
def get_collision_history(self): | |
history = collections.defaultdict(int) | |
for frame, intensity in self.history: | |
history[frame] += intensity | |
return history | |
@staticmethod | |
def _on_collision(weak_self, event): | |
self = weak_self() | |
if not self: | |
return | |
actor_type = get_actor_display_name(event.other_actor) | |
self.hud.notification('Collision with %r' % actor_type) | |
impulse = event.normal_impulse | |
intensity = math.sqrt(impulse.x**2 + impulse.y**2 + impulse.z**2) | |
self.history.append((event.frame, intensity)) | |
if len(self.history) > 4000: | |
self.history.pop(0) | |
# ============================================================================== | |
# -- LaneInvasionSensor -------------------------------------------------------- | |
# ============================================================================== | |
class LaneInvasionSensor(object): | |
def __init__(self, parent_actor, hud): | |
self.sensor = None | |
self._parent = parent_actor | |
self.hud = hud | |
world = self._parent.get_world() | |
bp = world.get_blueprint_library().find('sensor.other.lane_invasion') | |
self.sensor = world.spawn_actor(bp, carla.Transform(), attach_to=self._parent) | |
# We need to pass the lambda a weak reference to self to avoid circular | |
# reference. | |
weak_self = weakref.ref(self) | |
self.sensor.listen(lambda event: LaneInvasionSensor._on_invasion(weak_self, event)) | |
@staticmethod | |
def _on_invasion(weak_self, event): | |
self = weak_self() | |
if not self: | |
return | |
lane_types = set(x.type for x in event.crossed_lane_markings) | |
text = ['%r' % str(x).split()[-1] for x in lane_types] | |
self.hud.notification('Crossed line %s' % ' and '.join(text)) | |
# ============================================================================== | |
# -- GnssSensor -------------------------------------------------------- | |
# ============================================================================== | |
class GnssSensor(object): | |
def __init__(self, parent_actor): | |
self.sensor = None | |
self._parent = parent_actor | |
self.lat = 0.0 | |
self.lon = 0.0 | |
world = self._parent.get_world() | |
bp = world.get_blueprint_library().find('sensor.other.gnss') | |
self.sensor = world.spawn_actor(bp, carla.Transform(carla.Location(x=1.0, z=2.8)), attach_to=self._parent) | |
# We need to pass the lambda a weak reference to self to avoid circular | |
# reference. | |
weak_self = weakref.ref(self) | |
self.sensor.listen(lambda event: GnssSensor._on_gnss_event(weak_self, event)) | |
@staticmethod | |
def _on_gnss_event(weak_self, event): | |
self = weak_self() | |
if not self: | |
return | |
self.lat = event.latitude | |
self.lon = event.longitude | |
# ============================================================================== | |
# -- CameraManager ------------------------------------------------------------- | |
# ============================================================================== | |
class CameraManager(object): | |
def __init__(self, parent_actor, hud, gamma_correction): | |
self.sensor = None | |
self.surface = None | |
self._parent = parent_actor | |
self.hud = hud | |
self.recording = False | |
bound_y = 0.5 + self._parent.bounding_box.extent.y | |
Attachment = carla.AttachmentType | |
self._camera_transforms = [ | |
(carla.Transform(carla.Location(x=-5.5, z=2.5), carla.Rotation(pitch=8.0)), Attachment.SpringArm), | |
(carla.Transform(carla.Location(x=1.6, z=1.7)), Attachment.Rigid), | |
(carla.Transform(carla.Location(x=5.5, y=1.5, z=1.5)), Attachment.SpringArm), | |
(carla.Transform(carla.Location(x=-8.0, z=6.0), carla.Rotation(pitch=6.0)), Attachment.SpringArm), | |
(carla.Transform(carla.Location(x=-1, y=-bound_y, z=0.5)), Attachment.Rigid)] | |
self.transform_index = 1 | |
self.sensors = [ | |
['sensor.camera.rgb', cc.Raw, 'Camera RGB', "camera01"], | |
['sensor.camera.depth', cc.Raw, 'Camera Depth (Raw)', "camera01"], | |
['sensor.camera.depth', cc.Depth, 'Camera Depth (Gray Scale)', "camera01"], | |
['sensor.camera.depth', cc.LogarithmicDepth, 'Camera Depth (Logarithmic Gray Scale)', "camera01"], | |
['sensor.camera.semantic_segmentation', cc.Raw, 'Camera Semantic Segmentation (Raw)', "camera01"], | |
['sensor.camera.semantic_segmentation', cc.CityScapesPalette, | |
'Camera Semantic Segmentation (CityScapes Palette)', "camera01"], | |
['sensor.lidar.ray_cast', None, 'Lidar (Ray-Cast)', "camera01"]] | |
world = self._parent.get_world() | |
bp_library = world.get_blueprint_library() | |
for item in self.sensors: | |
bp = bp_library.find(item[0]) | |
if item[0].startswith('sensor.camera'): | |
bp.set_attribute('image_size_x', str(hud.dim[0])) | |
bp.set_attribute('image_size_y', str(hud.dim[1])) | |
if bp.has_attribute('gamma'): | |
bp.set_attribute('gamma', str(gamma_correction)) | |
#bp.set_attribute('role_name', item[3]) | |
elif item[0].startswith('sensor.lidar'): | |
bp.set_attribute('range', '5000') | |
item.append(bp) | |
self.index = None | |
def toggle_camera(self): | |
self.transform_index = (self.transform_index + 1) % len(self._camera_transforms) | |
self.set_sensor(self.index, notify=False, force_respawn=True) | |
def set_sensor(self, index, notify=True, force_respawn=False): | |
pass | |
# index = index % len(self.sensors) | |
# needs_respawn = True if self.index is None else \ | |
# (force_respawn or (self.sensors[index][0] != self.sensors[self.index][0])) | |
# if needs_respawn: | |
# if self.sensor is not None: | |
# self.sensor.destroy() | |
# self.surface = None | |
# self.sensor = self._parent.get_world().spawn_actor( | |
# self.sensors[index][-1], | |
# self._camera_transforms[self.transform_index][0], | |
# attach_to=self._parent, | |
# attachment_type=self._camera_transforms[self.transform_index][1]) | |
# # We need to pass the lambda a weak reference to self to avoid | |
# # circular reference. | |
# weak_self = weakref.ref(self) | |
# self.sensor.listen(lambda image: CameraManager._parse_image(weak_self, image)) | |
# if notify: | |
# self.hud.notification(self.sensors[index][2]) | |
# self.index = index | |
def next_sensor(self): | |
self.set_sensor(self.index + 1) | |
def toggle_recording(self): | |
self.recording = not self.recording | |
self.hud.notification('Recording %s' % ('On' if self.recording else 'Off')) | |
def render(self, display): | |
if self.surface is not None: | |
display.blit(self.surface, (0, 0)) | |
@staticmethod | |
def _parse_image(weak_self, image): | |
self = weak_self() | |
if not self: | |
return | |
if self.sensors[self.index][0].startswith('sensor.lidar'): | |
points = np.frombuffer(image.raw_data, dtype=np.dtype('f4')) | |
points = np.reshape(points, (int(points.shape[0] / 3), 3)) | |
lidar_data = np.array(points[:, :2]) | |
lidar_data *= min(self.hud.dim) / 100.0 | |
lidar_data += (0.5 * self.hud.dim[0], 0.5 * self.hud.dim[1]) | |
lidar_data = np.fabs(lidar_data) # pylint: disable=E1111 | |
lidar_data = lidar_data.astype(np.int32) | |
lidar_data = np.reshape(lidar_data, (-1, 2)) | |
lidar_img_size = (self.hud.dim[0], self.hud.dim[1], 3) | |
lidar_img = np.zeros((lidar_img_size), dtype = int) | |
lidar_img[tuple(lidar_data.T)] = (255, 255, 255) | |
self.surface = pygame.surfarray.make_surface(lidar_img) | |
else: | |
image.convert(self.sensors[self.index][1]) | |
array = np.frombuffer(image.raw_data, dtype=np.dtype("uint8")) | |
array = np.reshape(array, (image.height, image.width, 4)) | |
array = array[:, :, :3] | |
array = array[:, :, ::-1] | |
self.surface = pygame.surfarray.make_surface(array.swapaxes(0, 1)) | |
if self.recording: | |
image.save_to_disk('_out/%08d' % image.frame) | |
# ============================================================================== | |
# -- game_loop() --------------------------------------------------------------- | |
# ============================================================================== | |
def game_loop(args): | |
pygame.init() | |
pygame.font.init() | |
world = None | |
try: | |
client = carla.Client(args.host, args.port) | |
client.set_timeout(2.0) | |
display = pygame.display.set_mode( | |
(args.width, args.height), | |
pygame.HWSURFACE | pygame.DOUBLEBUF) | |
hud = HUD(args.width, args.height) | |
world = World(client.get_world(), hud, args) | |
if args.mode == 0: | |
controller = KeyboardControl(world, args.autopilot) | |
elif args.mode == 1: | |
controller = DualControl(world, args.autopilot) | |
# Remove the hero | |
clock = pygame.time.Clock() | |
while True: | |
clock.tick_busy_loop(60) | |
if args.mode == 0: | |
if controller.parse_events(client, world, clock): return | |
elif args.mode == 1: | |
if controller.parse_events(world, clock): return | |
world.tick(clock) | |
world.render(display) | |
pygame.display.flip() | |
pygame.time.wait(0) | |
finally: | |
if (world and world.recording_enabled): | |
client.stop_recorder() | |
if world is not None: | |
world.destroy() | |
pygame.quit() | |
# ============================================================================== | |
# -- main() -------------------------------------------------------------------- | |
# ============================================================================== | |
def main(): | |
argparser = argparse.ArgumentParser( | |
description='CARLA Manual Control Client') | |
argparser.add_argument( | |
'-v', '--verbose', | |
action='store_true', | |
dest='debug', | |
help='print debug information') | |
argparser.add_argument( | |
'--host', | |
metavar='H', | |
default='127.0.0.1', | |
help='IP of the host server (default: 127.0.0.1)') | |
argparser.add_argument( | |
'-p', '--port', | |
metavar='P', | |
default=2000, | |
type=int, | |
help='TCP port to listen to (default: 2000)') | |
argparser.add_argument( | |
'-a', '--autopilot', | |
action='store_true', | |
help='enable autopilot') | |
argparser.add_argument( | |
'--res', | |
metavar='WIDTHxHEIGHT', | |
default='320x240', | |
help='window resolution (default: 1280x720)') | |
argparser.add_argument( | |
'--filter', | |
metavar='PATTERN', | |
default='vehicle.*', | |
help='actor filter (default: "vehicle.*")') | |
argparser.add_argument( | |
'--rolename', | |
metavar='NAME', | |
default='hero', | |
help='actor role name (default: "hero")') | |
argparser.add_argument( | |
'--gamma', | |
default=2.2, | |
type=float, | |
help='Gamma correction of the camera (default: 2.2)') | |
argparser.add_argument( | |
'-m', '--mode', | |
metavar='M', | |
default=0, | |
type=int, | |
help='0 means Keyboard. 1 means Joystick') | |
args = argparser.parse_args() | |
args.width, args.height = [int(x) for x in args.res.split('x')] | |
log_level = logging.DEBUG if args.debug else logging.INFO | |
logging.basicConfig(format='%(levelname)s: %(message)s', level=log_level) | |
logging.info('listening to server %s:%s', args.host, args.port) | |
print(__doc__) | |
try: | |
game_loop(args) | |
except KeyboardInterrupt: | |
print('\nCancelled by user. Bye!') | |
if __name__ == '__main__': | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment