Last active
June 22, 2020 20:39
-
-
Save HemaZ/e00e66c7dd3524dd851514f26fda960e to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class AirSim: | |
def parse_lidarData(self, data): | |
# function to create PointCloud2 from Lidar Data | |
# reshape array of floats to array of [X,Y,Z] | |
points = np.array(data.point_cloud, dtype=np.dtype('f4')) | |
points = np.reshape(points, (int(points.shape[0]/3), 3)) | |
header = Header() | |
header.frame_id = "lidar" | |
pcl = pcl2.create_cloud_xyz32(header, points) | |
return pcl | |
def run(self): | |
# Block Main Loop | |
last_time = time.time() # start time of the loop | |
while(True): | |
# timer to limit the frame rate | |
if(time.time() - last_time < self.period): | |
try: | |
time.sleep(self.period-(time.time() - last_time)) | |
except: | |
pass | |
# check for the time limit for the executed command | |
if self.move_command: | |
if self.car: | |
self.car_controls.throttle = self.command_list[0] | |
self.car_controls.steering = self.command_list[1] | |
self.client.setCarControls(self.car_controls) | |
self.alert("Moving Forward ", "INFO") | |
self.car_moved = True | |
self.stop_at = time.time() + self.command_list[2] | |
else: | |
self.alert("Moving To {}".format( | |
self.command_list), "INFO") | |
self.client.moveToPositionAsync( | |
self.command_list[0], self.command_list[1], self.command_list[2], self.command_list[3]) | |
self.move_command = False | |
if(time.time() >= self.stop_at and self.car_moved): | |
self.car_controls.throttle = 0 | |
self.client.setCarControls(self.car_controls) | |
self.car_moved = False | |
self.alert("Stopping the Car ", "INFO") | |
# AirSim Default Cameras names | |
cameras = ["front_center", "front_right", | |
"front_left", "fpv", "back_center"] | |
# Reading front_center camera | |
camera_id = 0 | |
responses = self.client.simGetImages( | |
[airsim.ImageRequest(0, airsim.ImageType.Scene, False, False)]) | |
print('Retrieved images: %d', len(responses)) | |
for response in responses: | |
# get numpy array | |
img1d = np.fromstring( | |
response.image_data_uint8, dtype=np.uint8) | |
# reshape array to 4 channel image array H X W X 4 | |
img_rgb = img1d.reshape(response.height, response.width, 3) | |
# Create Header for the ROS message | |
header = Header() | |
set_timestamp(header, time.time()) | |
header.frame_id = cameras[camera_id] | |
# Convert Numpy array to ROS message using yonoarc_utils.image | |
img_msg = from_ndarray(img_rgb, header) | |
# Publish this message on Block's output port | |
self.publish("cam_{}".format(camera_id), img_msg) | |
camera_id += 1 | |
# Read Semantic Segmentation Image from Camera 0 | |
# Create ROS message and publish it like the previous steps | |
seg_responses = self.client.simGetImages( | |
[airsim.ImageRequest(0, airsim.ImageType.Segmentation, False, False)]) | |
img1d = np.fromstring(seg_responses[0].image_data_uint8, | |
dtype=np.uint8) | |
img_rgb = img1d.reshape( | |
seg_responses[0].height, seg_responses[0].width, 3) | |
header = Header() | |
header.frame_id = "front_center" | |
img_msg = from_ndarray(img_rgb, header) | |
self.publish("sem_segm", img_msg) | |
# Read Lidar Data and convert it to PointCloud2 ROS message | |
lidarData = self.client.getLidarData() | |
if (len(lidarData.point_cloud) < 3): | |
self.alert("\tNo points received from Lidar data", "WARN") | |
else: | |
points = self.parse_lidarData(lidarData) | |
self.publish("lidar", points) | |
# Update Timer | |
last_time = time.time() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment