Skip to content

Instantly share code, notes, and snippets.

@HemaZ
Last active June 22, 2020 20:39
Show Gist options
  • Save HemaZ/e00e66c7dd3524dd851514f26fda960e to your computer and use it in GitHub Desktop.
Save HemaZ/e00e66c7dd3524dd851514f26fda960e to your computer and use it in GitHub Desktop.
class AirSim:
def parse_lidarData(self, data):
# function to create PointCloud2 from Lidar Data
# reshape array of floats to array of [X,Y,Z]
points = np.array(data.point_cloud, dtype=np.dtype('f4'))
points = np.reshape(points, (int(points.shape[0]/3), 3))
header = Header()
header.frame_id = "lidar"
pcl = pcl2.create_cloud_xyz32(header, points)
return pcl
def run(self):
# Block Main Loop
last_time = time.time() # start time of the loop
while(True):
# timer to limit the frame rate
if(time.time() - last_time < self.period):
try:
time.sleep(self.period-(time.time() - last_time))
except:
pass
# check for the time limit for the executed command
if self.move_command:
if self.car:
self.car_controls.throttle = self.command_list[0]
self.car_controls.steering = self.command_list[1]
self.client.setCarControls(self.car_controls)
self.alert("Moving Forward ", "INFO")
self.car_moved = True
self.stop_at = time.time() + self.command_list[2]
else:
self.alert("Moving To {}".format(
self.command_list), "INFO")
self.client.moveToPositionAsync(
self.command_list[0], self.command_list[1], self.command_list[2], self.command_list[3])
self.move_command = False
if(time.time() >= self.stop_at and self.car_moved):
self.car_controls.throttle = 0
self.client.setCarControls(self.car_controls)
self.car_moved = False
self.alert("Stopping the Car ", "INFO")
# AirSim Default Cameras names
cameras = ["front_center", "front_right",
"front_left", "fpv", "back_center"]
# Reading front_center camera
camera_id = 0
responses = self.client.simGetImages(
[airsim.ImageRequest(0, airsim.ImageType.Scene, False, False)])
print('Retrieved images: %d', len(responses))
for response in responses:
# get numpy array
img1d = np.fromstring(
response.image_data_uint8, dtype=np.uint8)
# reshape array to 4 channel image array H X W X 4
img_rgb = img1d.reshape(response.height, response.width, 3)
# Create Header for the ROS message
header = Header()
set_timestamp(header, time.time())
header.frame_id = cameras[camera_id]
# Convert Numpy array to ROS message using yonoarc_utils.image
img_msg = from_ndarray(img_rgb, header)
# Publish this message on Block's output port
self.publish("cam_{}".format(camera_id), img_msg)
camera_id += 1
# Read Semantic Segmentation Image from Camera 0
# Create ROS message and publish it like the previous steps
seg_responses = self.client.simGetImages(
[airsim.ImageRequest(0, airsim.ImageType.Segmentation, False, False)])
img1d = np.fromstring(seg_responses[0].image_data_uint8,
dtype=np.uint8)
img_rgb = img1d.reshape(
seg_responses[0].height, seg_responses[0].width, 3)
header = Header()
header.frame_id = "front_center"
img_msg = from_ndarray(img_rgb, header)
self.publish("sem_segm", img_msg)
# Read Lidar Data and convert it to PointCloud2 ROS message
lidarData = self.client.getLidarData()
if (len(lidarData.point_cloud) < 3):
self.alert("\tNo points received from Lidar data", "WARN")
else:
points = self.parse_lidarData(lidarData)
self.publish("lidar", points)
# Update Timer
last_time = time.time()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment