Skip to content

Instantly share code, notes, and snippets.

@cmpute
Last active June 5, 2024 05:22
Show Gist options
  • Save cmpute/df3d6325f3781191304279be11976502 to your computer and use it in GitHub Desktop.
Save cmpute/df3d6325f3781191304279be11976502 to your computer and use it in GitHub Desktop.
Data collector in Carla with autopilot ego and random generated vehicle and walkers
#!/usr/bin/env python
"""This script is modified from PythonAPI/examples/generate_traffic.py and PythonAPI/examples/automatic_control.py"""
from __future__ import print_function
import argparse
import collections
import datetime
import glob
import logging
import math
import os
import numpy.random as random
import re
import sys
import time
import weakref
try:
import pygame
from pygame.locals import KMOD_CTRL
from pygame.locals import K_ESCAPE
from pygame.locals import K_q
except ImportError:
raise RuntimeError('cannot import pygame, make sure pygame package is installed')
try:
import numpy as np
except ImportError:
raise RuntimeError(
'cannot import numpy, make sure numpy package is installed')
import carla
from carla import ColorConverter as cc
from agents.navigation.behavior_agent import BehaviorAgent # pylint: disable=import-error
from agents.navigation.basic_agent import BasicAgent # pylint: disable=import-error
from agents.navigation.constant_velocity_agent import ConstantVelocityAgent # pylint: disable=import-error
# ==============================================================================
# -- Global functions ----------------------------------------------------------
# ==============================================================================
def find_weather_presets():
"""Method to find weather presets"""
rgx = re.compile('.+?(?:(?<=[a-z])(?=[A-Z])|(?<=[A-Z])(?=[A-Z][a-z])|$)')
def name(x): return ' '.join(m.group(0) for m in rgx.finditer(x))
presets = [x for x in dir(carla.WeatherParameters) if re.match('[A-Z].+', x)]
return [(getattr(carla.WeatherParameters, x), name(x)) for x in presets]
def get_actor_display_name(actor, truncate=250):
"""Method to get actor display name"""
name = ' '.join(actor.type_id.replace('_', '.').title().split('.')[1:])
return (name[:truncate - 1] + u'\u2026') if len(name) > truncate else name
def get_actor_blueprints(world, filter, generation):
bps = world.get_blueprint_library().filter(filter)
if generation.lower() == "all":
return bps
# If the filter returns only one bp, we assume that this one needed
# and therefore, we ignore the generation
if len(bps) == 1:
return bps
try:
int_generation = int(generation)
# Check if generation is in available generations
if int_generation in [1, 2, 3]:
bps = [x for x in bps if int(x.get_attribute('generation')) == int_generation]
return bps
else:
print(" Warning! Actor Generation is not valid. No actor will be spawned.")
return []
except:
print(" Warning! Actor Generation is not valid. No actor will be spawned.")
return []
# ==============================================================================
# -- Traffic Manager ---------------------------------------------------------------
# ==============================================================================
class TrafficManager(object):
def __init__(self, world, traffic_manager, args):
self.vehicles_list = []
self.walkers_list = []
self.all_id = []
self.client = carla.Client(args.host, args.port)
self.client.set_timeout(10.0)
self.world = world
self.traffic_manager = traffic_manager
traffic_manager.set_global_distance_to_leading_vehicle(2.5)
if args.respawn:
traffic_manager.set_respawn_dormant_vehicles(True)
if args.hybrid:
traffic_manager.set_hybrid_physics_mode(True)
traffic_manager.set_hybrid_physics_radius(70.0)
if args.seed is not None:
traffic_manager.set_random_device_seed(args.seed)
settings = world.get_settings()
if not args.sync:
print("You are currently in asynchronous mode. If this is a traffic simulation, "
"you could experience some issues. If it's not working correctly, switch to synchronous "
"mode by using traffic_manager.set_synchronous_mode(True)")
# if args.no_rendering:
# settings.no_rendering_mode = True
world.apply_settings(settings)
blueprints = get_actor_blueprints(world, args.filterv, args.generationv)
if not blueprints:
raise ValueError("Couldn't find any vehicles with the specified filters")
blueprintsWalkers = get_actor_blueprints(world, args.filterw, args.generationw)
if not blueprintsWalkers:
raise ValueError("Couldn't find any walkers with the specified filters")
if args.safe:
blueprints = [x for x in blueprints if x.get_attribute('base_type') == 'car']
blueprints = sorted(blueprints, key=lambda bp: bp.id)
spawn_points = world.get_map().get_spawn_points()
number_of_spawn_points = len(spawn_points)
if args.number_of_vehicles < number_of_spawn_points:
random.shuffle(spawn_points)
elif args.number_of_vehicles > number_of_spawn_points:
msg = 'requested %d vehicles, but could only find %d spawn points'
logging.warning(msg, args.number_of_vehicles, number_of_spawn_points)
args.number_of_vehicles = number_of_spawn_points
# @todo cannot import these directly.
SpawnActor = carla.command.SpawnActor
SetAutopilot = carla.command.SetAutopilot
FutureActor = carla.command.FutureActor
# --------------
# Spawn vehicles
# --------------
batch = []
for n, transform in enumerate(spawn_points):
if n >= args.number_of_vehicles:
break
blueprint = random.choice(blueprints)
if blueprint.has_attribute('color'):
color = random.choice(blueprint.get_attribute('color').recommended_values)
blueprint.set_attribute('color', color)
if blueprint.has_attribute('driver_id'):
driver_id = random.choice(blueprint.get_attribute('driver_id').recommended_values)
blueprint.set_attribute('driver_id', driver_id)
else:
blueprint.set_attribute('role_name', 'autopilot')
# spawn the cars and set their autopilot and light state all together
batch.append(SpawnActor(blueprint, transform)
.then(SetAutopilot(FutureActor, True, traffic_manager.get_port())))
for response in self.client.apply_batch_sync(batch, False):
if response.error:
logging.error(response.error)
else:
self.vehicles_list.append(response.actor_id)
# Set automatic vehicle lights update if specified
if args.car_lights_on:
all_vehicle_actors = world.get_actors(self.vehicles_list)
for actor in all_vehicle_actors:
traffic_manager.update_vehicle_lights(actor, True)
# -------------
# Spawn Walkers
# -------------
# some settings
percentagePedestriansRunning = 0.0 # how many pedestrians will run
percentagePedestriansCrossing = 0.0 # how many pedestrians will walk through the road
if args.seedw:
world.set_pedestrians_seed(args.seedw)
random.seed(args.seedw)
# 1. take all the random locations to spawn
spawn_points = []
for i in range(args.number_of_walkers):
spawn_point = carla.Transform()
loc = world.get_random_location_from_navigation()
if (loc != None):
spawn_point.location = loc
spawn_points.append(spawn_point)
# 2. we spawn the walker object
batch = []
walker_speed = []
for spawn_point in spawn_points:
walker_bp = random.choice(blueprintsWalkers)
# set as not invincible
if walker_bp.has_attribute('is_invincible'):
walker_bp.set_attribute('is_invincible', 'false')
# set the max speed
if walker_bp.has_attribute('speed'):
if (random.random() > percentagePedestriansRunning):
# walking
walker_speed.append(walker_bp.get_attribute('speed').recommended_values[1])
else:
# running
walker_speed.append(walker_bp.get_attribute('speed').recommended_values[2])
else:
print("Walker has no speed")
walker_speed.append(0.0)
batch.append(SpawnActor(walker_bp, spawn_point))
results = self.client.apply_batch_sync(batch, True)
walker_speed2 = []
for i in range(len(results)):
if results[i].error:
logging.error(results[i].error)
else:
self.walkers_list.append({"id": results[i].actor_id})
walker_speed2.append(walker_speed[i])
walker_speed = walker_speed2
# 3. we spawn the walker controller
batch = []
walker_controller_bp = world.get_blueprint_library().find('controller.ai.walker')
for i in range(len(self.walkers_list)):
batch.append(SpawnActor(walker_controller_bp, carla.Transform(), self.walkers_list[i]["id"]))
results = self.client.apply_batch_sync(batch, True)
for i in range(len(results)):
if results[i].error:
logging.error(results[i].error)
else:
self.walkers_list[i]["con"] = results[i].actor_id
# 4. we put together the walkers and controllers id to get the objects from their id
for i in range(len(self.walkers_list)):
self.all_id.append(self.walkers_list[i]["con"])
self.all_id.append(self.walkers_list[i]["id"])
all_actors = world.get_actors(self.all_id)
# wait for a tick to ensure client receives the last transform of the walkers we have just created
if not args.sync:
world.wait_for_tick()
else:
world.tick()
# 5. initialize each controller and set target to walk to (list is [controler, actor, controller, actor ...])
# set how many pedestrians can cross the road
world.set_pedestrians_cross_factor(percentagePedestriansCrossing)
for i in range(0, len(self.all_id), 2):
# start walker
all_actors[i].start()
# set walk to random point
all_actors[i].go_to_location(world.get_random_location_from_navigation())
# max speed
all_actors[i].set_max_speed(float(walker_speed[int(i/2)]))
print('spawned %d vehicles and %d walkers, press Ctrl+C to exit.' % (len(self.vehicles_list), len(self.walkers_list)))
# Example of how to use Traffic Manager parameters
traffic_manager.global_percentage_speed_difference(30.0)
def destroy(self):
print('\ndestroying %d vehicles' % len(self.vehicles_list))
self.client.apply_batch([carla.command.DestroyActor(x) for x in self.vehicles_list])
# stop walker controllers (list is [controller, actor, controller, actor ...])
all_actors = self.world.get_actors(self.all_id)
for i in range(0, len(self.all_id), 2):
all_actors[i].stop()
print('\ndestroying %d walkers' % len(self.walkers_list))
self.client.apply_batch([carla.command.DestroyActor(x) for x in self.all_id])
time.sleep(0.5)
# ==============================================================================
# -- World ---------------------------------------------------------------
# ==============================================================================
class World(object):
""" Class representing the surrounding environment """
def __init__(self, carla_world, hud, args):
"""Constructor method"""
self._args = args
self.world = carla_world
try:
self.map = self.world.get_map()
except RuntimeError as error:
print('RuntimeError: {}'.format(error))
print(' The server could not send the OpenDRIVE (.xodr) file:')
print(' Make sure it exists, has the same name of your town, and is correct.')
sys.exit(1)
self.player = None
self.collision_sensor = None
self.lane_invasion_sensor = None
self.gnss_sensor = None
self.camera_manager = None
self._weather_presets = find_weather_presets()
self._weather_index = 0
self._actor_filter = args.filter
self._actor_generation = args.generation
self.hud = hud
self.headless = args.headless
if not self.headless:
self.restart(args)
self.world.on_tick(hud.on_world_tick)
else:
self.restart_headless(args)
self.recording_enabled = False
self.recording_start = 0
def spawn_player(self):
# Get a random blueprint.
blueprint_list = get_actor_blueprints(self.world, self._actor_filter, self._actor_generation)
if not blueprint_list:
raise ValueError("Couldn't find any blueprints with the specified filters")
blueprint = random.choice(blueprint_list)
blueprint.set_attribute('role_name', 'hero')
if blueprint.has_attribute('color'):
color = random.choice(blueprint.get_attribute('color').recommended_values)
blueprint.set_attribute('color', color)
# Spawn the player.
if self.player is not None:
spawn_point = self.player.get_transform()
spawn_point.location.z += 2.0
spawn_point.rotation.roll = 0.0
spawn_point.rotation.pitch = 0.0
self.destroy()
self.player = self.world.try_spawn_actor(blueprint, spawn_point)
self.modify_vehicle_physics(self.player)
while self.player is None:
if not self.map.get_spawn_points():
print('There are no spawn points available in your map/town.')
print('Please add some Vehicle Spawn Point to your UE4 scene.')
sys.exit(1)
spawn_points = self.map.get_spawn_points()
spawn_point = random.choice(spawn_points) if spawn_points else carla.Transform()
self.player = self.world.try_spawn_actor(blueprint, spawn_point)
self.modify_vehicle_physics(self.player)
def restart_headless(self, args):
"""Restart the world without HUD"""
# spawn the ego vehicle
self.spawn_player()
if self._args.sync:
self.world.tick()
else:
self.world.wait_for_tick()
# spawn necessary sensors
self.collision_sensor = CollisionSensor(self.player, self.hud)
def restart(self, args):
"""Restart the world"""
# spawn the ego vehicle
self.spawn_player()
if self._args.sync:
self.world.tick()
else:
self.world.wait_for_tick()
# Keep same camera config if the camera manager exists.
cam_index = self.camera_manager.index if self.camera_manager is not None else 0
cam_pos_id = self.camera_manager.transform_index if self.camera_manager is not None else 0
# spawn necessary sensors
self.collision_sensor = CollisionSensor(self.player, self.hud)
self.lane_invasion_sensor = LaneInvasionSensor(self.player, self.hud)
self.gnss_sensor = GnssSensor(self.player)
self.camera_manager = CameraManager(self.player, self.hud)
self.camera_manager.transform_index = cam_pos_id
self.camera_manager.set_sensor(cam_index, notify=False)
actor_type = get_actor_display_name(self.player)
self.hud.notification(actor_type)
def next_weather(self, reverse=False):
"""Get next weather setting"""
self._weather_index += -1 if reverse else 1
self._weather_index %= len(self._weather_presets)
preset = self._weather_presets[self._weather_index]
self.hud.notification('Weather: %s' % preset[1])
self.player.get_world().set_weather(preset[0])
def modify_vehicle_physics(self, actor):
#If actor is not a vehicle, we cannot use the physics control
try:
physics_control = actor.get_physics_control()
physics_control.use_sweep_wheel_collision = True
actor.apply_physics_control(physics_control)
except Exception:
pass
def tick(self, clock):
"""Method for every tick"""
if not self.headless:
self.hud.tick(self, clock)
def render(self, display):
"""Render world"""
self.camera_manager.render(display)
self.hud.render(display)
def destroy_sensors(self):
"""Destroy sensors"""
self.camera_manager.sensor.destroy()
self.camera_manager.sensor = None
self.camera_manager.index = None
def destroy(self):
"""Destroys all actors"""
actors = [
getattr(self.camera_manager, "sensor", None),
getattr(self.collision_sensor, "sensor", None),
getattr(self.lane_invasion_sensor, "sensor", None),
getattr(self.gnss_sensor, "sensor", None),
self.player,
]
for actor in actors:
if actor is not None:
actor.destroy()
# ==============================================================================
# -- DataStorage ---------------------------------------------------------------
# ==============================================================================
class DataStorage(object):
def __init__(self, world):
self.world = world
self.timers = []
self.log_interval = 1
def tick(self):
self.timers.append(time.time())
if self.timers[-1] - self.timers[0] > self.log_interval:
fps = (len(self.timers) - 1) / (self.timers[-1] - self.timers[0])
ego_location = self.world.player.get_location()
colhist = self.world.collision_sensor.get_collision_history()
print("FPS: {:.2f}, ego location: {:.2f}, {:.2f}, collision frames: {}".format(fps, ego_location.x, ego_location.y, len(colhist)))
self.timers.clear()
# ==============================================================================
# -- KeyboardControl -----------------------------------------------------------
# ==============================================================================
class KeyboardControl(object):
def __init__(self, world):
world.hud.notification("Press 'H' or '?' for help.", seconds=4.0)
def parse_events(self):
for event in pygame.event.get():
if event.type == pygame.QUIT:
return True
if event.type == pygame.KEYUP:
if self._is_quit_shortcut(event.key):
return True
@staticmethod
def _is_quit_shortcut(key):
"""Shortcut for quitting"""
return (key == K_ESCAPE) or (key == K_q and pygame.key.get_mods() & KMOD_CTRL)
# ==============================================================================
# -- HUD -----------------------------------------------------------------------
# ==============================================================================
class HUD(object):
"""Class for HUD text"""
def __init__(self, width, height):
"""Constructor method"""
self.dim = (width, height)
font = pygame.font.Font(pygame.font.get_default_font(), 20)
font_name = 'courier' if os.name == 'nt' else 'mono'
fonts = [x for x in pygame.font.get_fonts() if font_name in x]
default_font = 'ubuntumono'
mono = default_font if default_font in fonts else fonts[0]
mono = pygame.font.match_font(mono)
self._font_mono = pygame.font.Font(mono, 12 if os.name == 'nt' else 14)
self._notifications = FadingText(font, (width, 40), (0, height - 40))
self.help = HelpText(pygame.font.Font(mono, 24), width, height)
self.server_fps = 0
self.frame = 0
self.simulation_time = 0
self._show_info = True
self._info_text = []
self._server_clock = pygame.time.Clock()
def on_world_tick(self, timestamp):
"""Gets informations from the world at every tick"""
self._server_clock.tick()
self.server_fps = self._server_clock.get_fps()
self.frame = timestamp.frame_count
self.simulation_time = timestamp.elapsed_seconds
def tick(self, world, clock):
"""HUD method for every tick"""
self._notifications.tick(world, clock)
if not self._show_info:
return
transform = world.player.get_transform()
vel = world.player.get_velocity()
control = world.player.get_control()
heading = 'N' if abs(transform.rotation.yaw) < 89.5 else ''
heading += 'S' if abs(transform.rotation.yaw) > 90.5 else ''
heading += 'E' if 179.5 > transform.rotation.yaw > 0.5 else ''
heading += 'W' if -0.5 > transform.rotation.yaw > -179.5 else ''
colhist = world.collision_sensor.get_collision_history()
collision = [colhist[x + self.frame - 200] for x in range(0, 200)]
max_col = max(1.0, max(collision))
collision = [x / max_col for x in collision]
vehicles = world.world.get_actors().filter('vehicle.*')
self._info_text = [
'Server: % 16.0f FPS' % self.server_fps,
'Client: % 16.0f FPS' % clock.get_fps(),
'',
'Vehicle: % 20s' % get_actor_display_name(world.player, truncate=20),
'Map: % 20s' % world.map.name.split('/')[-1],
'Simulation time: % 12s' % datetime.timedelta(seconds=int(self.simulation_time)),
'',
'Speed: % 15.0f km/h' % (3.6 * math.sqrt(vel.x**2 + vel.y**2 + vel.z**2)),
u'Heading:% 16.0f\N{DEGREE SIGN} % 2s' % (transform.rotation.yaw, heading),
'Location:% 20s' % ('(% 5.1f, % 5.1f)' % (transform.location.x, transform.location.y)),
'GNSS:% 24s' % ('(% 2.6f, % 3.6f)' % (world.gnss_sensor.lat, world.gnss_sensor.lon)),
'Height: % 18.0f m' % transform.location.z,
'']
if isinstance(control, carla.VehicleControl):
self._info_text += [
('Throttle:', control.throttle, 0.0, 1.0),
('Steer:', control.steer, -1.0, 1.0),
('Brake:', control.brake, 0.0, 1.0),
('Reverse:', control.reverse),
('Hand brake:', control.hand_brake),
('Manual:', control.manual_gear_shift),
'Gear: %s' % {-1: 'R', 0: 'N'}.get(control.gear, control.gear)]
elif isinstance(control, carla.WalkerControl):
self._info_text += [
('Speed:', control.speed, 0.0, 5.556),
('Jump:', control.jump)]
self._info_text += [
'',
'Collision:',
collision,
'',
'Number of vehicles: % 8d' % len(vehicles)]
if len(vehicles) > 1:
self._info_text += ['Nearby vehicles:']
def dist(l):
return math.sqrt((l.x - transform.location.x)**2 + (l.y - transform.location.y)
** 2 + (l.z - transform.location.z)**2)
vehicles = [(dist(x.get_location()), x) for x in vehicles if x.id != world.player.id]
for dist, vehicle in sorted(vehicles):
if dist > 200.0:
break
vehicle_type = get_actor_display_name(vehicle, truncate=22)
self._info_text.append('% 4dm %s' % (dist, vehicle_type))
def toggle_info(self):
"""Toggle info on or off"""
self._show_info = not self._show_info
def notification(self, text, seconds=2.0):
"""Notification text"""
self._notifications.set_text(text, seconds=seconds)
def error(self, text):
"""Error text"""
self._notifications.set_text('Error: %s' % text, (255, 0, 0))
def render(self, display):
"""Render for HUD class"""
if self._show_info:
info_surface = pygame.Surface((220, self.dim[1]))
info_surface.set_alpha(100)
display.blit(info_surface, (0, 0))
v_offset = 4
bar_h_offset = 100
bar_width = 106
for item in self._info_text:
if v_offset + 18 > self.dim[1]:
break
if isinstance(item, list):
if len(item) > 1:
points = [(x + 8, v_offset + 8 + (1 - y) * 30) for x, y in enumerate(item)]
pygame.draw.lines(display, (255, 136, 0), False, points, 2)
item = None
v_offset += 18
elif isinstance(item, tuple):
if isinstance(item[1], bool):
rect = pygame.Rect((bar_h_offset, v_offset + 8), (6, 6))
pygame.draw.rect(display, (255, 255, 255), rect, 0 if item[1] else 1)
else:
rect_border = pygame.Rect((bar_h_offset, v_offset + 8), (bar_width, 6))
pygame.draw.rect(display, (255, 255, 255), rect_border, 1)
fig = (item[1] - item[2]) / (item[3] - item[2])
if item[2] < 0.0:
rect = pygame.Rect(
(bar_h_offset + fig * (bar_width - 6), v_offset + 8), (6, 6))
else:
rect = pygame.Rect((bar_h_offset, v_offset + 8), (fig * bar_width, 6))
pygame.draw.rect(display, (255, 255, 255), rect)
item = item[0]
if item: # At this point has to be a str.
surface = self._font_mono.render(item, True, (255, 255, 255))
display.blit(surface, (8, v_offset))
v_offset += 18
self._notifications.render(display)
self.help.render(display)
# ==============================================================================
# -- FadingText ----------------------------------------------------------------
# ==============================================================================
class FadingText(object):
""" Class for fading text """
def __init__(self, font, dim, pos):
"""Constructor method"""
self.font = font
self.dim = dim
self.pos = pos
self.seconds_left = 0
self.surface = pygame.Surface(self.dim)
def set_text(self, text, color=(255, 255, 255), seconds=2.0):
"""Set fading text"""
text_texture = self.font.render(text, True, color)
self.surface = pygame.Surface(self.dim)
self.seconds_left = seconds
self.surface.fill((0, 0, 0, 0))
self.surface.blit(text_texture, (10, 11))
def tick(self, _, clock):
"""Fading text method for every tick"""
delta_seconds = 1e-3 * clock.get_time()
self.seconds_left = max(0.0, self.seconds_left - delta_seconds)
self.surface.set_alpha(500.0 * self.seconds_left)
def render(self, display):
"""Render fading text method"""
display.blit(self.surface, self.pos)
# ==============================================================================
# -- HelpText ------------------------------------------------------------------
# ==============================================================================
class HelpText(object):
""" Helper class for text render"""
def __init__(self, font, width, height):
"""Constructor method"""
lines = __doc__.split('\n')
self.font = font
self.dim = (680, len(lines) * 22 + 12)
self.pos = (0.5 * width - 0.5 * self.dim[0], 0.5 * height - 0.5 * self.dim[1])
self.seconds_left = 0
self.surface = pygame.Surface(self.dim)
self.surface.fill((0, 0, 0, 0))
for i, line in enumerate(lines):
text_texture = self.font.render(line, True, (255, 255, 255))
self.surface.blit(text_texture, (22, i * 22))
self._render = False
self.surface.set_alpha(220)
def toggle(self):
"""Toggle on or off the render help"""
self._render = not self._render
def render(self, display):
"""Render help text method"""
if self._render:
display.blit(self.surface, self.pos)
# ==============================================================================
# -- CollisionSensor -----------------------------------------------------------
# ==============================================================================
class CollisionSensor(object):
""" Class for collision sensors"""
def __init__(self, parent_actor, hud):
"""Constructor method"""
self.sensor = None
self.history = []
self._parent = parent_actor
self.hud = hud
world = self._parent.get_world()
blueprint = world.get_blueprint_library().find('sensor.other.collision')
self.sensor = world.spawn_actor(blueprint, carla.Transform(), attach_to=self._parent)
# We need to pass the lambda a weak reference to
# self to avoid circular reference.
weak_self = weakref.ref(self)
self.sensor.listen(lambda event: CollisionSensor._on_collision(weak_self, event))
def get_collision_history(self):
"""Gets the history of collisions"""
history = collections.defaultdict(int)
for frame, intensity in self.history:
history[frame] += intensity
return history
@staticmethod
def _on_collision(weak_self, event):
"""On collision method"""
self = weak_self()
if not self:
return
actor_type = get_actor_display_name(event.other_actor)
self.hud.notification('Collision with %r' % actor_type)
impulse = event.normal_impulse
intensity = math.sqrt(impulse.x ** 2 + impulse.y ** 2 + impulse.z ** 2)
self.history.append((event.frame, intensity))
if len(self.history) > 4000:
self.history.pop(0)
# ==============================================================================
# -- LaneInvasionSensor --------------------------------------------------------
# ==============================================================================
class LaneInvasionSensor(object):
"""Class for lane invasion sensors"""
def __init__(self, parent_actor, hud):
"""Constructor method"""
self.sensor = None
self._parent = parent_actor
self.hud = hud
world = self._parent.get_world()
bp = world.get_blueprint_library().find('sensor.other.lane_invasion')
self.sensor = world.spawn_actor(bp, carla.Transform(), attach_to=self._parent)
# We need to pass the lambda a weak reference to self to avoid circular
# reference.
weak_self = weakref.ref(self)
self.sensor.listen(lambda event: LaneInvasionSensor._on_invasion(weak_self, event))
@staticmethod
def _on_invasion(weak_self, event):
"""On invasion method"""
self = weak_self()
if not self:
return
lane_types = set(x.type for x in event.crossed_lane_markings)
text = ['%r' % str(x).split()[-1] for x in lane_types]
self.hud.notification('Crossed line %s' % ' and '.join(text))
# ==============================================================================
# -- GnssSensor --------------------------------------------------------
# ==============================================================================
class GnssSensor(object):
""" Class for GNSS sensors"""
def __init__(self, parent_actor):
"""Constructor method"""
self.sensor = None
self._parent = parent_actor
self.lat = 0.0
self.lon = 0.0
world = self._parent.get_world()
blueprint = world.get_blueprint_library().find('sensor.other.gnss')
self.sensor = world.spawn_actor(blueprint, carla.Transform(carla.Location(x=1.0, z=2.8)),
attach_to=self._parent)
# We need to pass the lambda a weak reference to
# self to avoid circular reference.
weak_self = weakref.ref(self)
self.sensor.listen(lambda event: GnssSensor._on_gnss_event(weak_self, event))
@staticmethod
def _on_gnss_event(weak_self, event):
"""GNSS method"""
self = weak_self()
if not self:
return
self.lat = event.latitude
self.lon = event.longitude
# ==============================================================================
# -- CameraManager -------------------------------------------------------------
# ==============================================================================
class CameraManager(object):
""" Class for camera management"""
def __init__(self, parent_actor, hud):
"""Constructor method"""
self.sensor = None
self.surface = None
self._parent = parent_actor
self.hud = hud
self.recording = False
bound_x = 0.5 + self._parent.bounding_box.extent.x
bound_y = 0.5 + self._parent.bounding_box.extent.y
bound_z = 0.5 + self._parent.bounding_box.extent.z
attachment = carla.AttachmentType
self._camera_transforms = [
(carla.Transform(carla.Location(x=-2.0*bound_x, y=+0.0*bound_y, z=2.0*bound_z), carla.Rotation(pitch=8.0)), attachment.SpringArmGhost),
(carla.Transform(carla.Location(x=+0.8*bound_x, y=+0.0*bound_y, z=1.3*bound_z)), attachment.Rigid),
(carla.Transform(carla.Location(x=+1.9*bound_x, y=+1.0*bound_y, z=1.2*bound_z)), attachment.SpringArmGhost),
(carla.Transform(carla.Location(x=-2.8*bound_x, y=+0.0*bound_y, z=4.6*bound_z), carla.Rotation(pitch=6.0)), attachment.SpringArmGhost),
(carla.Transform(carla.Location(x=-1.0, y=-1.0*bound_y, z=0.4*bound_z)), attachment.Rigid)]
self.transform_index = 1
self.sensors = [
['sensor.camera.rgb', cc.Raw, 'Camera RGB'],
['sensor.camera.depth', cc.Raw, 'Camera Depth (Raw)'],
['sensor.camera.depth', cc.Depth, 'Camera Depth (Gray Scale)'],
['sensor.camera.depth', cc.LogarithmicDepth, 'Camera Depth (Logarithmic Gray Scale)'],
['sensor.camera.semantic_segmentation', cc.Raw, 'Camera Semantic Segmentation (Raw)'],
['sensor.camera.semantic_segmentation', cc.CityScapesPalette,
'Camera Semantic Segmentation (CityScapes Palette)'],
['sensor.lidar.ray_cast', None, 'Lidar (Ray-Cast)']]
world = self._parent.get_world()
bp_library = world.get_blueprint_library()
for item in self.sensors:
blp = bp_library.find(item[0])
if item[0].startswith('sensor.camera'):
blp.set_attribute('image_size_x', str(hud.dim[0]))
blp.set_attribute('image_size_y', str(hud.dim[1]))
elif item[0].startswith('sensor.lidar'):
blp.set_attribute('range', '50')
item.append(blp)
self.index = None
def toggle_camera(self):
"""Activate a camera"""
self.transform_index = (self.transform_index + 1) % len(self._camera_transforms)
self.set_sensor(self.index, notify=False, force_respawn=True)
def set_sensor(self, index, notify=True, force_respawn=False):
"""Set a sensor"""
index = index % len(self.sensors)
needs_respawn = True if self.index is None else (
force_respawn or (self.sensors[index][0] != self.sensors[self.index][0]))
if needs_respawn:
if self.sensor is not None:
self.sensor.destroy()
self.surface = None
self.sensor = self._parent.get_world().spawn_actor(
self.sensors[index][-1],
self._camera_transforms[self.transform_index][0],
attach_to=self._parent,
attachment_type=self._camera_transforms[self.transform_index][1])
# We need to pass the lambda a weak reference to
# self to avoid circular reference.
weak_self = weakref.ref(self)
self.sensor.listen(lambda image: CameraManager._parse_image(weak_self, image))
if notify:
self.hud.notification(self.sensors[index][2])
self.index = index
def next_sensor(self):
"""Get the next sensor"""
self.set_sensor(self.index + 1)
def toggle_recording(self):
"""Toggle recording on or off"""
self.recording = not self.recording
self.hud.notification('Recording %s' % ('On' if self.recording else 'Off'))
def render(self, display):
"""Render method"""
if self.surface is not None:
display.blit(self.surface, (0, 0))
@staticmethod
def _parse_image(weak_self, image):
self = weak_self()
if not self:
return
if self.sensors[self.index][0].startswith('sensor.lidar'):
points = np.frombuffer(image.raw_data, dtype=np.dtype('f4'))
points = np.reshape(points, (int(points.shape[0] / 4), 4))
lidar_data = np.array(points[:, :2])
lidar_data *= min(self.hud.dim) / 100.0
lidar_data += (0.5 * self.hud.dim[0], 0.5 * self.hud.dim[1])
lidar_data = np.fabs(lidar_data) # pylint: disable=assignment-from-no-return
lidar_data = lidar_data.astype(np.int32)
lidar_data = np.reshape(lidar_data, (-1, 2))
lidar_img_size = (self.hud.dim[0], self.hud.dim[1], 3)
lidar_img = np.zeros(lidar_img_size)
lidar_img[tuple(lidar_data.T)] = (255, 255, 255)
self.surface = pygame.surfarray.make_surface(lidar_img)
else:
image.convert(self.sensors[self.index][1])
array = np.frombuffer(image.raw_data, dtype=np.dtype("uint8"))
array = np.reshape(array, (image.height, image.width, 4))
array = array[:, :, :3]
array = array[:, :, ::-1]
self.surface = pygame.surfarray.make_surface(array.swapaxes(0, 1))
if self.recording:
image.save_to_disk('_out/%08d' % image.frame)
# ==============================================================================
# -- Game Loop ---------------------------------------------------------
# ==============================================================================
def carla_setup(args):
client = carla.Client(args.host, args.port)
client.set_timeout(10.0)
traffic_manager = client.get_trafficmanager(args.tm_port)
world = client.get_world()
if args.sync:
settings = world.get_settings()
settings.synchronous_mode = True
settings.fixed_delta_seconds = 0.05
world.apply_settings(settings)
traffic_manager.set_synchronous_mode(True)
return client, traffic_manager, world
def agent_setup(args, world):
if args.agent == "Basic":
return BasicAgent(world.player, 30)
agent.follow_speed_limits(True)
elif args.agent == "Constant":
return ConstantVelocityAgent(world.player, 30)
ground_loc = world.world.ground_projection(world.player.get_location(), 5)
if ground_loc:
world.player.set_location(ground_loc.location + carla.Location(z=0.01))
agent.follow_speed_limits(True)
elif args.agent == "Behavior":
return BehaviorAgent(world.player, behavior=args.behavior)
def game_loop(args):
"""
Main loop of the simulation. It handles updating all the HUD information,
ticking the agent and, if needed, the world.
"""
pygame.init()
pygame.font.init()
if args.seed:
random.seed(args.seed)
client, sim_trafficman, sim_world = carla_setup(args)
world = trafficman = None
try:
display = pygame.display.set_mode(
(args.width, args.height),
pygame.HWSURFACE | pygame.DOUBLEBUF)
hud = HUD(args.width, args.height)
world = World(sim_world, hud, args)
controller = KeyboardControl(world)
trafficman = TrafficManager(sim_world, sim_trafficman, args)
datastore = DataStorage(world)
# Set the agent destination
agent = agent_setup(args, world)
spawn_points = world.map.get_spawn_points()
destination = random.choice(spawn_points).location
agent.set_destination(destination)
clock = pygame.time.Clock()
while True:
clock.tick()
if args.sync:
sim_world.tick()
else:
sim_world.wait_for_tick()
if controller.parse_events():
return
world.tick(clock)
world.render(display)
pygame.display.flip()
datastore.tick()
if agent.done():
if args.loop:
agent.set_destination(random.choice(spawn_points).location)
world.hud.notification("Target reached", seconds=4.0)
print("The target has been reached, searching for another target")
else:
print("The target has been reached, stopping the simulation")
break
control = agent.run_step()
control.manual_gear_shift = False
world.player.apply_control(control)
finally:
settings = sim_world.get_settings()
settings.synchronous_mode = False
settings.fixed_delta_seconds = None
sim_world.apply_settings(settings)
sim_trafficman.set_synchronous_mode(True)
if world is not None:
world.destroy()
if trafficman is not None:
trafficman.destroy()
pygame.quit()
def headless_loop(args):
"""
Main loop of the simulation in headless mode.
"""
sim_world = sim_trafficman = None
world = trafficman = None
try:
if args.seed:
random.seed(args.seed)
client, sim_trafficman, sim_world = carla_setup(args)
world = World(sim_world, None, args)
trafficman = TrafficManager(sim_world, sim_trafficman, args)
datastore = DataStorage(world)
# Set the agent destination
agent = agent_setup(args, world)
spawn_points = world.map.get_spawn_points()
destination = random.choice(spawn_points).location
agent.set_destination(destination)
while True:
if args.sync:
sim_world.tick()
else:
sim_world.wait_for_tick()
world.tick(None)
datastore.tick()
if agent.done():
if args.loop:
agent.set_destination(random.choice(spawn_points).location)
print("The target has been reached, searching for another target")
else:
print("The target has been reached, stopping the simulation")
break
control = agent.run_step()
control.manual_gear_shift = False
world.player.apply_control(control)
finally:
settings = sim_world.get_settings()
settings.synchronous_mode = False
settings.fixed_delta_seconds = None
sim_world.apply_settings(settings)
sim_trafficman.set_synchronous_mode(True)
if world is not None:
world.destroy()
if trafficman is not None:
trafficman.destroy()
# ==============================================================================
# -- main() --------------------------------------------------------------
# ==============================================================================
def main():
"""Main method"""
argparser = argparse.ArgumentParser(description='CARLA Automatic Control Client')
common_group = argparser.add_argument_group('common', "Common settings related to Carla")
common_group.add_argument(
'-v', '--verbose',
action='store_true',
dest='debug',
help='Print debug information')
common_group.add_argument(
'--host',
metavar='H',
default='127.0.0.1',
help='IP of the host server (default: 127.0.0.1)')
common_group.add_argument(
'-p', '--port',
metavar='P',
default=2000,
type=int,
help='TCP port to listen to (default: 2000)')
common_group.add_argument(
'-m', '--tm-port',
metavar='P',
default=6000,
type=int,
help='Port to communicate with TM (default: 6000)')
common_group.add_argument(
'--res',
metavar='WIDTHxHEIGHT',
default='1280x720',
help='Window resolution (default: 1280x720)')
common_group.add_argument(
'--sync',
action='store_true',
help='Synchronous mode execution')
common_group.add_argument(
'-x', '--headless',
help='Run the collection script without Pygame window',
action="store_true"
)
common_group.add_argument(
'-s', '--seed',
help='Set seed for repeating executions (default: None)',
default=None,
type=int)
agent_group = argparser.add_argument_group('agent', "Agent settings")
agent_group.add_argument(
'--filter',
metavar='PATTERN',
default='vehicle.*',
help='Actor filter (default: "vehicle.*")')
agent_group.add_argument(
'--generation',
metavar='G',
default='2',
help='restrict to certain actor generation (values: "1","2","All" - default: "2")')
agent_group.add_argument(
'-l', '--loop',
action='store_true',
help='Sets a new random destination upon reaching the previous one (default: False)')
agent_group.add_argument(
"-a", "--agent", type=str,
choices=["Behavior", "Basic", "Constant"],
help="select which agent to run",
default="Behavior")
agent_group.add_argument(
'-b', '--behavior', type=str,
choices=["cautious", "normal", "aggressive"],
help='Choose one of the possible agent behaviors (default: normal) ',
default='normal')
traffic_group = argparser.add_argument_group('traffic', "Traffic manager settings")
traffic_group.add_argument(
'--seedw',
metavar='S',
default=0,
type=int,
help='Set the seed for pedestrians module')
traffic_group.add_argument(
'--car-lights-on',
action='store_true',
default=False,
help='Enable automatic car light management')
traffic_group.add_argument(
'--respawn',
action='store_true',
default=False,
help='Automatically respawn dormant vehicles (only in large maps)')
traffic_group.add_argument(
'-n', '--number-of-vehicles',
metavar='N',
default=30,
type=int,
help='Number of vehicles (default: 30)')
traffic_group.add_argument(
'-w', '--number-of-walkers',
metavar='W',
default=10,
type=int,
help='Number of walkers (default: 10)')
traffic_group.add_argument(
'--safe',
action='store_true',
help='Avoid spawning vehicles prone to accidents')
traffic_group.add_argument(
'--filterv',
metavar='PATTERN',
default='vehicle.*',
help='Filter vehicle model (default: "vehicle.*")')
traffic_group.add_argument(
'--generationv',
metavar='G',
default='All',
help='restrict to certain vehicle generation (values: "1","2","All" - default: "All")')
traffic_group.add_argument(
'--filterw',
metavar='PATTERN',
default='walker.pedestrian.*',
help='Filter pedestrian type (default: "walker.pedestrian.*")')
traffic_group.add_argument(
'--generationw',
metavar='G',
default='2',
help='restrict to certain pedestrian generation (values: "1","2","All" - default: "2")')
traffic_group.add_argument(
'--hybrid',
action='store_true',
help='Activate hybrid mode for Traffic Manager')
args = argparser.parse_args()
args.width, args.height = [int(x) for x in args.res.split('x')]
log_level = logging.DEBUG if args.debug else logging.INFO
logging.basicConfig(format='%(levelname)s: %(message)s', level=log_level)
logging.info('listening to server %s:%s', args.host, args.port)
print(__doc__)
try:
if args.headless:
headless_loop(args)
else:
game_loop(args)
except KeyboardInterrupt:
print('\nCancelled by user. Bye!')
if __name__ == '__main__':
main()
import curses
import os
import argparse
import carla
ACTION_MAP = {
"w": lambda world, ego: carla.command.ApplyImpulse(ego.id, ego.get_transform().get_forward_vector() * ego.get_physics_control().mass * 10),
"s": lambda world, ego: carla.command.ApplyImpulse(ego.id, ego.get_transform().get_forward_vector() * ego.get_physics_control().mass * -10),
"d": lambda world, ego: carla.command.ApplyImpulse(ego.id, ego.get_transform().get_right_vector() * ego.get_physics_control().mass * 10),
"a": lambda world, ego: carla.command.ApplyImpulse(ego.id, ego.get_transform().get_right_vector() * ego.get_physics_control().mass * -10),
"q": lambda world, ego: carla.command.ApplyAngularImpulse(ego.id, ego.get_transform().get_up_vector() * ego.get_physics_control().mass * -2e6),
"e": lambda world, ego: carla.command.ApplyAngularImpulse(ego.id, ego.get_transform().get_up_vector() * ego.get_physics_control().mass * 2e6),
"1": lambda world, ego: carla.command.SetVehicleLightState(ego.id, ego.get_light_state() ^ carla.VehicleLightState.Position),
"2": lambda world, ego: carla.command.SetVehicleLightState(ego.id, ego.get_light_state() ^ carla.VehicleLightState.LowBeam),
"3": lambda world, ego: carla.command.SetVehicleLightState(ego.id, ego.get_light_state() ^ carla.VehicleLightState.HighBeam),
"4": lambda world, ego: carla.command.SetVehicleLightState(ego.id, ego.get_light_state() ^ carla.VehicleLightState.Brake),
"5": lambda world, ego: carla.command.SetVehicleLightState(ego.id, ego.get_light_state() ^ carla.VehicleLightState.RightBlinker),
"6": lambda world, ego: carla.command.SetVehicleLightState(ego.id, ego.get_light_state() ^ carla.VehicleLightState.LeftBlinker),
"7": lambda world, ego: carla.command.SetVehicleLightState(ego.id, ego.get_light_state() ^ carla.VehicleLightState.Reverse),
"8": lambda world, ego: carla.command.SetVehicleLightState(ego.id, ego.get_light_state() ^ carla.VehicleLightState.Fog),
"9": lambda world, ego: carla.command.SetVehicleLightState(ego.id, ego.get_light_state() ^ carla.VehicleLightState.Interior),
"0": lambda world, ego: carla.command.SetVehicleLightState(ego.id, ego.get_light_state() ^ (carla.VehicleLightState.Special1 | carla.VehicleLightState.Special2)),
"-": lambda world, ego: carla.command.SetVehicleLightState(ego.id, carla.VehicleLightState.NONE),
"+": lambda world, ego: carla.command.SetVehicleLightState(ego.id, carla.VehicleLightState.All),
# TODO: support open door, hand brake
}
def window_main(win, client, world, player):
win.nodelay(True)
win.clear()
inst = """Instructions:
[awds]: Apply force pulse in certain direction relative to the hero object.
[qe]: Apply torque pulse in z-axis of the hero object.
[0-9|-+]: Switch vehicle light state"""
inst_lines = inst.count('\n') + 1
win.addstr(0, 0, inst)
win.hline(inst_lines, 0, '-', 30)
win.addstr(inst_lines + 1, 0, "Press key to trigger actions:")
pad_lines = inst_lines + 2
pending_keys = []
last_snapshot = None
def print_pending():
for i, key in enumerate(pending_keys):
win.addstr(pad_lines + i, 0, str(key))
win.clrtobot()
while True:
# process key events
try:
key = win.getkey()
except Exception as e:
key = None
if key:
pending_keys.append(key)
# apply actions
snapshot = world.wait_for_tick() # TODO: change to on_tick with queue
if last_snapshot and last_snapshot.frame < snapshot.frame:
actions = [ACTION_MAP[key](world, player) for key in pending_keys]
client.apply_batch(actions)
pending_keys.clear()
last_snapshot = snapshot
# print status
print_pending()
# TODO: check ego status
def main():
argparser = argparse.ArgumentParser(
description='CARLA Manual Control Client')
argparser.add_argument(
'--host',
metavar='H',
default='127.0.0.1',
help='IP of the host server (default: 127.0.0.1)')
argparser.add_argument(
'-p', '--port',
metavar='P',
default=2000,
type=int,
help='TCP port to listen to (default: 2000)')
argparser.add_argument(
'--rolename',
metavar='NAME',
default='hero',
help='actor role name (default: "hero")')
args = argparser.parse_args()
client = carla.Client(args.host, args.port)
client.set_timeout(60.0)
world = client.get_world()
for actor in world.get_actors():
if actor.attributes.get('role_name') == args.rolename:
player = actor
break
else:
# player = None
raise RuntimeError("can't find an actor with the given role name in the simulation")
curses.wrapper(window_main, client, world, player)
if __name__ == "__main__":
main()
# curses.wrapper(window_main, None, None)
import os, argparse
import carla
import curses
import math
def vec_to_rot(vec: carla.Vector3D):
pitch = math.atan2(vec.z, math.hypot(vec.x, vec.y))
yaw = math.atan2(vec.y, vec.x)
roll = 0
return carla.Rotation(pitch, yaw, roll)
def window_main(win, client, world, args):
world_map = world.get_map()
spectator = world.get_spectator()
win.nodelay(True)
win.clear()
inst = """Instructions:
[a]: Save the current casted point (the yellow point) on the ground
[s]: Save the current nearest waypoint (the cyan point) on the ground
[z]: Add the current casted point to the polyline
[x]: Add the current nearest waypoint to the polyline
[c]: Save and clear the current polyline"""
inst_lines = inst.count('\n') + 1
win.addstr(0, 0, inst)
win.hline(inst_lines, 0, '-', 30)
win.addstr(inst_lines + 1, 0, "Press key to trigger actions:")
saved_points = []
saved_polylines = []
current_polyline = []
def get_marker_size(distance, max_size = 1.0):
return max_size / math.exp(distance / args.ray_length)
def format_location(location: carla.Location):
return f"({location.x:.3f}, {location.y:.3f}, {location.z:.3f})"
def print_total():
return f" ({len(saved_points)} points & {len(saved_polylines)} lines saved in total)"
def print_poly_total():
return f" ({len(current_polyline)} points saved in current polyline)"
while True:
# find casted points from spectator
pose = spectator.get_transform()
direction = pose.get_forward_vector()
casted = world.cast_ray(pose.location, pose.location + direction * args.ray_length)
# draw polyline
if len(current_polyline) > 1:
for a, b in zip(current_polyline[:-1], current_polyline[1:]):
world.debug.draw_line(a, b, color=carla.Color(0, 200, 0), life_time=0.1)
# draw the casted points
if len(casted) > 0:
first = casted[0]
cast_dists = [(p.location - pose.location).length() for p in casted]
hit_text = ["%dhits" % len(casted)] + ["%.2f" % d for d in cast_dists]
text = "{} @ {} | {}".format(first.label, format_location(first.location), ", ".join(hit_text))
if len(current_polyline) > 0:
# print the direction
# TODO: change to use a key to activate the rotation display from last saved point (and draw line)
direction = first.location - current_polyline[-1]
text += f"\n{vec_to_rot(direction)}"
world.debug.draw_point(first.location, color=carla.Color(200, 200, 0), size=get_marker_size(cast_dists[0]), life_time=0.1)
world.debug.draw_string(first.location, text, color=carla.Color(255, 255, 255), draw_shadow=True)
# draw nearby waypoints
waypoint = world_map.get_waypoint(first.location, lane_type=carla.LaneType.Driving)
if waypoint is not None:
world.debug.draw_point(waypoint.transform.location, color=carla.Color(0, 200, 200), size=get_marker_size(cast_dists[0]), life_time=0.1)
world.debug.draw_string(
waypoint.transform.location,
"{} @ {} | ({}, {}, {:.3f})".format(waypoint.lane_type, format_location(waypoint.transform.location), waypoint.road_id, waypoint.lane_id, waypoint.s),
color=carla.Color(255, 255, 255), draw_shadow=True
)
# draw polyline to the current point
if len(current_polyline) > 0:
world.debug.draw_line(current_polyline[-1], first.location, color=carla.Color(0, 200, 0), life_time=0.1)
else:
first = waypoint = None
# process key events
try:
key = win.getkey()
except Exception as e:
key = None
if key == 'a' or key == 'z':
if first is not None:
loc = first.location
if key == 'a':
saved_points.append(loc)
win.addstr(inst_lines + 2, 0, f"Saved {format_location(loc)}." + print_total())
else: # key == 'z'
current_polyline.append(loc)
win.addstr(inst_lines + 2, 0, f"Saved {format_location(loc)} to polyline." + print_poly_total())
else:
win.addstr(inst_lines + 2, 0, f"No point selected")
elif key == 's' or key == 'x':
if waypoint is not None:
loc = waypoint.transform.location
if key == 's':
saved_points.append(loc)
win.addstr(inst_lines + 2, 0, f"Saved {format_location(loc)}." + print_total())
else: # key == 'x'
current_polyline.append(loc)
win.addstr(inst_lines + 2, 0, f"Saved {format_location(loc)} to polyline." + print_poly_total())
else:
win.addstr(inst_lines + 2, 0, f"No waypoint selected")
elif key == 'c':
saved_polylines.append(current_polyline)
win.addstr(inst_lines + 2, 0, f"Saved polyline with {len(current_polyline)} points")
current_polyline = []
else:
pass
win.clrtobot()
world.wait_for_tick()
def main():
argparser = argparse.ArgumentParser(
description='Viewpoint visualization')
argparser.add_argument(
'--host',
metavar='H',
default='127.0.0.1',
help='IP of the host server (default: 127.0.0.1)')
argparser.add_argument(
'-p', '--port',
metavar='P',
default=2000,
type=int,
help='TCP port to listen to (default: 2000)')
argparser.add_argument(
'-l', '--ray-length',
metavar='L',
default=10.0,
type=float,
help='Length of ray-casting'
)
args = argparser.parse_args()
client = carla.Client(args.host, args.port)
client.set_timeout(60.0)
world = client.get_world()
curses.wrapper(window_main, client, world, args)
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment