Skip to content

Instantly share code, notes, and snippets.

@kargarisaac
Last active July 2, 2022 14:50
Show Gist options
  • Save kargarisaac/57324b76c57a84cba5c8b9ec7412ba51 to your computer and use it in GitHub Desktop.
Save kargarisaac/57324b76c57a84cba5c8b9ec7412ba51 to your computer and use it in GitHub Desktop.
from __future__ import division
import numpy as np
import random
import gym
from gym import spaces
from gym.utils import seeding
import glob
import os
import torch
import sys
sys.path.append('/home/isaac/software/CARLA_0.9.9/PythonAPI/carla')
try:
sys.path.append(glob.glob('/home/isaac/software/CARLA_0.9.9/PythonAPI/carla/dist/carla-*%d.%d-%s.egg' % (
sys.version_info.major,
sys.version_info.minor,
'win-amd64' if os.name == 'nt' else 'linux-x86_64'))[0])
except IndexError:
pass
from agents.navigation.global_route_planner import GlobalRoutePlanner
from agents.navigation.global_route_planner_dao import GlobalRoutePlannerDAO
from agents.navigation.controller import VehiclePIDController
from agents.navigation.agent import Agent
from agents.navigation.behavior_agent import BehaviorAgent
import matplotlib.pyplot as plt
import carla
import matplotlib.pyplot as plt
class CarlaEnv2(gym.Env):
"""An OpenAI gym wrapper for CARLA simulator."""
def __init__(self, params):
super(CarlaEnv2, self).__init__()
# parameters
self.number_of_vehicles = params['number_of_vehicles']
self.dt = params['dt']
self.max_time_episode = params['max_time_episode']
self.out_lane_thres = params['out_lane_thres']
self.ego_autopilot = params['ego_autopilot']
self.clnt_no = params['clnt_no']
self.obs_size = 256
# Connect to carla server and get world object
print('connecting to Carla server...')
client = carla.Client('localhost', params['port'])
client.set_timeout(2.0)
try:
self.world = client.load_world(params['town'])
except:
pass
self.world = client.get_world()
print('Carla server connected!')
# Set weather
self.world.set_weather(carla.WeatherParameters.ClearNoon)
## added for 0.9.9 and tm autopilot
settings = self.world.get_settings()
settings.synchronous_mode = True # Enables synchronous mode
settings.fixed_delta_seconds = params['dt']
self.world.apply_settings(settings)
# get all spawn points- used for ego starting point. For others we find nearest points to ego
self.all_vehicle_spawn_points = list(
self.world.get_map().get_spawn_points())
random.shuffle(self.all_vehicle_spawn_points)
# Create the ego vehicle blueprint
self.ego_bp = self._create_vehicle_bluepprint(
params['ego_vehicle_filter'], color='49,8,8')
self.ego_bp.set_attribute('role_name', 'hero')
# Collision sensor
self.collision_hist = [] # The collision history
self.collision_hist_l = 1 # collision history length
self.collision_bp = self.world.get_blueprint_library().find('sensor.other.collision')
# Camera sensor
self.camera_img = np.zeros(
(self.obs_size, self.obs_size, 3), dtype=np.uint8)
self.camera_trans = carla.Transform(carla.Location(x=0.8, z=1.7))
self.camera_bp = self.world.get_blueprint_library().find('sensor.camera.rgb')
# Modify the attributes of the blueprint to set image resolution and field of view.
self.camera_bp.set_attribute('image_size_x', str(self.obs_size))
self.camera_bp.set_attribute('image_size_y', str(self.obs_size))
self.camera_bp.set_attribute('fov', '110')
# Set the time in seconds between sensor captures
self.camera_bp.set_attribute('sensor_tick', '0.02')
# Set fixed simulation step for synchronous mode
self.settings = self.world.get_settings()
self.settings.fixed_delta_seconds = self.dt
# Record the time of total steps and resetting steps
self.reset_step = 0
self.total_step = 0
# self.dests = None
dst = self.all_vehicle_spawn_points[1].location
self.dests = [[
dst.x,
dst.y,
dst.z
]]
## traffic manager for 0.9.10
self.tm = client.get_trafficmanager(params['tm_port'])
self.others_behavior_agents = []
# first reset and plan a global rout(insode it)
self.reset()
def reset(self):
# Clear sensor objects
self.collision_sensor = None
# self.lidar_sensor = None
self.camera_sensor = None
# Delete sensors, vehicles and walkers
self._clear_all_actors(['sensor.other.collision', 'sensor.camera.rgb', 'vehicle.*',
'controller.ai.walker', 'walker.*'])
# Disable sync mode
self._set_synchronous_mode(False)
# Spawn the ego vehicle
transform = self.all_vehicle_spawn_points[0]
self._try_spawn_ego_vehicle_at(transform)
## get nearest spawn locations
all_wps = list(
self.world.get_map().get_spawn_points())
near = []
origin = self.ego.get_location()
for wp in all_wps:
dist = np.sqrt((wp.location.x - origin.x)**2 + (wp.location.y - origin.y)**2)
if dist < 100:
near.append(wp)
self.vehicle_spawn_points = near
# Spawn surrounding vehicles
random.shuffle(self.vehicle_spawn_points)
count = self.number_of_vehicles
if count > 0:
for spawn_point in self.vehicle_spawn_points:
# print(count)
if self._try_spawn_random_vehicle_at(spawn_point, number_of_wheels=[4]):
count -= 1
if count <= 0:
break
while count > 0:
if self._try_spawn_random_vehicle_at(random.choice(self.vehicle_spawn_points),
number_of_wheels=[4]):
count -= 1
# Add collision sensor
self.collision_sensor = self.world.spawn_actor(
self.collision_bp, carla.Transform(), attach_to=self.ego)
self.collision_sensor.listen(lambda event: get_collision_hist(event))
def get_collision_hist(event):
impulse = event.normal_impulse
intensity = np.sqrt(
impulse.x ** 2 + impulse.y ** 2 + impulse.z ** 2)
self.collision_hist.append(intensity)
if len(self.collision_hist) > self.collision_hist_l:
self.collision_hist.pop(0)
self.collision_hist = []
# Add camera sensor
self.camera_sensor = self.world.spawn_actor(
self.camera_bp, self.camera_trans, attach_to=self.ego)
self.camera_sensor.listen(lambda data: get_camera_img(data))
def get_camera_img(data):
array = np.frombuffer(data.raw_data, dtype=np.dtype("uint8"))
array = np.reshape(array, (data.height, data.width, 4))
array = array[:, :, :3]
array = array[:, :, ::-1]
self.camera_img = array
# Update timesteps
self.time_step = 0
self.reset_step += 1
# Enable sync mode
self.settings.synchronous_mode = True
self.world.apply_settings(self.settings)
return self._get_obs()
def step(self, action):
# print(self.ego.get_transform())
self.world.tick()
###------- ego controller for wp following --------
# self.ego_behavior_agent.update_information(self.world)
# num_min_waypoints = 21
# # Set new destination when target has been reached
# if len(self.ego_behavior_agent.get_local_planner().waypoints_queue) < num_min_waypoints:
# self.ego_behavior_agent.reroute(self.vehicle_spawn_points)
# speed_limit = self.ego.get_speed_limit()
# self.ego_behavior_agent.get_local_planner().set_speed(speed_limit)
# control = self.ego_behavior_agent.run_step()
# self.ego.apply_control(control)
###-----others control
# for ba in self.others_behavior_agents:
# ba.update_information(self.world)
# num_min_waypoints = 21
# # Set new destination when target has been reached
# if len(ba.get_local_planner().waypoints_queue) < num_min_waypoints:
# ba.reroute(self.vehicle_spawn_points)
# speed_limit = ba.vehicle.get_speed_limit()
# ba.get_local_planner().set_speed(speed_limit)
# control = ba.run_step()
# ba.vehicle.apply_control(control)
return (self._get_obs(), self._terminal())
def _create_vehicle_bluepprint(self, actor_filter, color=None, number_of_wheels=[4]):
"""Create the blueprint for a specific actor type.
Args:
actor_filter: a string indicating the actor type, e.g, 'vehicle.lincoln*'.
Returns:
bp: the blueprint object of carla.
"""
blueprints = self.world.get_blueprint_library().filter(actor_filter)
blueprint_library = []
for nw in number_of_wheels:
blueprint_library = blueprint_library + [x for x in blueprints if
int(x.get_attribute('number_of_wheels')) == nw]
bp = random.choice(blueprint_library)
if bp.has_attribute('color'):
if not color:
color = random.choice(
bp.get_attribute('color').recommended_values)
bp.set_attribute('color', color)
return bp
def _set_synchronous_mode(self, synchronous=True):
"""Set whether to use the synchronous mode.
"""
self.settings.synchronous_mode = synchronous
self.world.apply_settings(self.settings)
def _try_spawn_random_vehicle_at(self, transform, number_of_wheels=[4]):
blueprint = self._create_vehicle_bluepprint(
'vehicle.*', number_of_wheels=number_of_wheels)
blueprint.set_attribute('role_name', 'others')
vehicle = self.world.try_spawn_actor(blueprint, transform)
tm_port = self.tm.get_port()
if vehicle is not None:
# vehicle.set_autopilot(True)
vehicle.set_autopilot(True, tm_port)
# start_point = transform.location
# behaviors = ['cautious', 'normal', 'aggressive']
# behavior_idx = np.random.randint(0, 3)
# behavior_agent = BehaviorAgent(vehicle, behavior=behaviors[behavior_idx]) # cautious, normal, aggressive
# if self.vehicle_spawn_points[0].location != behavior_agent.vehicle.get_location():
# end_point = self.vehicle_spawn_points[0].location
# else:
# end_point = self.vehicle_spawn_points[1].location
# behavior_agent.set_destination(start_point, end_point, clean=True)
# self.others_behavior_agents.append(behavior_agent)
return True
return False
def _try_spawn_ego_vehicle_at(self, transform):
"""Try to spawn the ego vehicle at specific transform.
Args:
transform: the carla transform object.
Returns:
Bool indicating whether the spawn is successful.
"""
vehicle = self.world.try_spawn_actor(self.ego_bp, transform)
tm_port = self.tm.get_port()
if vehicle is not None:
self.ego = vehicle
# start_point = transform.location
# dest_id = 0
# end_point = carla.Location(
# x=self.dests[dest_id][0], y=self.dests[dest_id][1], z=self.dests[dest_id][2])
# self.ego_behavior_agent = BehaviorAgent(self.ego, behavior='normal') # cautious, normal, aggressive
# if self.all_vehicle_spawn_points[0].location != self.ego_behavior_agent.vehicle.get_location():
# destination = self.all_vehicle_spawn_points[0].location
# else:
# destination = self.all_vehicle_spawn_points[1].location
# # if self.ego_autopilot:
# self.ego_behavior_agent.set_destination(start_point, end_point, clean=True)
# self.waypoints = self.ego_behavior_agent._local_planner.waypoints_queue
self.ego.set_autopilot(True, tm_port)
print('ego vehicle is in autolipot mode')
return True
return False
def _set_carla_transform(self, pose):
"""Get a carla tranform object given pose.
Args:
pose: [x, y, yaw].
Returns:
transform: the carla transform object
"""
transform = carla.Transform()
transform.location.x = pose[0]
transform.location.y = pose[1]
transform.rotation.yaw = pose[2]
return transform
def _get_obs(self):
"""Get the observations."""
frontview = self.camera_img
return frontview
def _get_ego_pos(self):
"""Get the ego vehicle pose (x, y)."""
ego_trans = self.ego.get_transform()
ego_x = ego_trans.location.x
ego_y = ego_trans.location.y
return ego_x, ego_y
def _terminal(self):
"""Calculate whether to terminate the current episode."""
# Get ego state
ego_x, ego_y = self._get_ego_pos()
# If collides
if len(self.collision_hist) > 0:
return True
# If reach maximum timestep
if self.time_step > self.max_time_episode:
return True
# If at destination
if self.dests is not None: # If at destination
for dest in self.dests:
if np.sqrt((ego_x - dest[0]) ** 2 + (ego_y - dest[1]) ** 2) < 6:
return True
return False
def _clear_all_actors(self, actor_filters):
"""Clear specific actors."""
for actor_filter in actor_filters:
for actor in self.world.get_actors().filter(actor_filter):
if actor.is_alive:
if actor.type_id == 'controller.ai.walker':
actor.stop()
actor.destroy()
def destroy_sensors(self):
try:
self.collision_sensor.destroy()
self.collision_sensor = None
except:
print('no collision sensor to destroy')
self.collision_sensor = None
try:
self.camera_sensor.destroy()
self.camera_sensor = None
except:
print('no camera sensor to destroy')
self.camera_sensor = None
def main():
# parameters for the gym_carla environment
params = {
'number_of_vehicles': 100,
'dt': 0.05, # time interval between two frames
'ego_vehicle_filter': 'vehicle.lincoln*', # filter for defining ego vehicle
'port': 4000, # connection port
'tm_port': 4050,
'town': 'Town01', # which town to simulate
'max_time_episode': 2000, # maximum timesteps per episode
'out_lane_thres': 2.0, # 2 # threshold for out of lane
'desired_speed': 8, # desired speed (m/s)
'ego_autopilot': False,
'clnt_no': 0,
}
# Set gym-carla environment
env = CarlaEnv2(params=params)
obs = env.reset()
while True:
action = [1.0, 0.0]
obs,done = env.step(action)
print('running ...')
if done:
obs = env.reset()
if __name__ == "__main__":
main()
@cvCaptureFromFile
Copy link

hello

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment