Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
import os
import re
import sys
import copy
import struct
import pprint
import traceback
def pcd_read(filename, verbose=0, verbose_steps=0):
# Refer this to unpack - https://docs.python.org/3/library/struct.html#format-characters
with open(filename, 'rb') as fp:
data_ip = fp.read()
data_op = {}
data_header = ''
data_payload = ''
matchObj = re.finditer(b'[\n]DATA\s(\S*)[\n]', data_ip, flags = 0)
for group in matchObj:
## Step1 : Get the header
data_header = data_ip[:group.end(0)]
data_header = data_header.decode('utf-8').split('\n')
field_names = data_header[2].split(' ')[1:]
field_size = data_header[3].split(' ')[1:]
field_type = data_header[4].split(' ')[1:]
data_count = int(data_header[9].split(' ')[1])
viewpoint = data_header[8]
data_type = data_header[10].split(' ')[1].lower()
if verbose:
print (' - field_names : ', field_names)
print (' - field_size : ', field_size)
print (' - field_type : ', field_type)
print (' - data_count : ', data_count, ' rows ')
print (' - viewpoint : ', viewpoint)
print (' - data_type : ', data_type)
## Step1.1 : Acumulate data on fields --> fields_dict
fields_dict = {}
for i, field_name in enumerate(field_names):
if int(field_size[i]) == 8:
fields_dict[i] = [str(field_name), int(field_size[i]), 'd']
elif int(field_size[i]) == 2:
fields_dict[i] = [str(field_name), int(field_size[i]), 'H']
else:
fields_dict[i] = [str(field_name), int(field_size[i]), str(field_type[i]).lower()]
if verbose :
print ('\n ----- Fields -----')
pprint.pprint(fields_dict, indent=4)
## Step2 - Iterate through the payload
data_payload = data_ip[group.end(0):]
data_count_tmp = 0
i = 0
for id_ in fields_dict:
data_op[fields_dict[id_][0]] = []
if verbose :
print ('\n ----- Output Skeleton -----')
pprint.pprint(data_op)
if data_type == 'ascii':
data_payload = data_payload.decode('utf-8')
rows = data_payload.split('\n')
while (i < len(rows)):
row_vals = rows[i].split(' ')
for id_ in fields_dict:
field_name = str(fields_dict[id_][0])
field_size = int(fields_dict[id_][1])
field_type = str(fields_dict[id_][2])
if field_type in ['f', 'd']:
tmp_data = float(row_vals[id_])
elif field_type == 'H':
tmp_data = int(row_vals[id_])
data_op[field_name].append(tmp_data)
i += 1
data_count_tmp += 1
if data_count_tmp == data_count:
break
elif data_type == 'binary':
while(i < len(data_payload)):
try:
for id_ in fields_dict:
field_name = str(fields_dict[id_][0])
field_size = int(fields_dict[id_][1])
field_type = str(fields_dict[id_][2])
tmp_data = data_payload[i:i+field_size]
if len(tmp_data) == field_size:
tmp_data = struct.unpack(field_type, tmp_data)[0]
elif len(tmp_data) == 0:
break
if verbose_steps :
print ('data_count_tmp : ',data_count_tmp+1, ' || i : ', i, ' || field : ', field_name,' || data : ', tmp_data)
i += field_size
data_op[field_name].append(tmp_data)
data_count_tmp += 1
if data_count_tmp == data_count:
break
except Exception as e:
print (' - Error : ', e)
traceback.print_exc()
sys.exit(1)
if verbose:
for id_ in fields_dict:
field_name = str(fields_dict[id_][0])
print ('\n ----- Output Count -----')
print (' - Field : ', field_name, ' || Data Count : ', len(data_op[field_name]))
break
return data_op, fields_dict
def pcd_write_bin(data_op, fields_dict, data_op_file, fields_required=[],verbose=0):
data = []
field_names = []
if len(fields_required):
for id_ in fields_dict:
field_name = fields_dict[id_][0]
if field_name not in fields_required:
data_op.pop(field_name)
else:
field_names.append(field_name)
else:
field_names = [fields_dict[id_][0] for id_ in fields_dict]
## Step2 - binary data
print ('\n ----- Writing .bin file -----')
print (' - Writing these field_names : ', field_names)
for i in range(len(data_op[field_names[0]])): # scroll down the rows
for field_name in field_names:
data.append(data_op[field_name][i])
payload = bytes()
payload = payload.join((struct.pack('f', val) for val in data))
print (' - Writing to ', data_op_file)
with open(data_op_file, 'wb') as fp:
fp.write(payload)
if __name__ == "__main__":
data_pcds = ['data/1523643737.998116000.pcd']
for data_pcd in data_pcds:
data_op, fields_dict = pcd_read(data_pcd, verbose=1)
data_op_file = data_pcd.split('.pcd')[0] + '_modified.bin'
pcd_write_bin(data_op, fields_dict, data_op_file, fields_required=['x', 'y', 'z', 'intensity'],verbose=1)
if 0:
## Cross Check Data :
rows_count = 4
print ('\n ----- Original Data ----- : ', len(data_op['x']))
for i in range(rows_count):
print (data_op['x'][i], data_op['y'][i], data_op['z'][i], data_op['intensity'][i])
data_bin = np.fromfile(data_op_file, dtype=np.float32)
print ('\n ----- .bin data ----- : ', len(data_bin))
print (' - ', data_bin[:rows_count * rows_count])
import re
from struct import *
def read_pcd(filename):
with open(filename, 'rb') as fp:
data = fp.read()
data_header = ''
data_payload = ''
## Step1 : Get the header
matchObj = re.finditer(b'[\n]DATA\s(\S*)[\n]', data, flags = 0)
for group in matchObj:
data_header = data[:group.end(0)]
data_header = data_header.decode('utf-8').split('\n')
print (' - Header : \n', data_header) ##- Header : ['# .PCD v0.7 - Point Cloud Data file format', 'VERSION 0.7', 'FIELDS x y z rgba', 'SIZE 4 4 4 4', 'TYPE F F F U', 'COUNT 1 1 1 1', 'WIDTH 0', 'HEIGHT 0', 'VIEWPOINT 0 0 0 1 0 0 0', 'POINTS 212001', 'DATA binary', '']
# Step2 - Iterate through the payload
data_payload = data[group.end(0):]
prev_i = 0
count = 0
X,Y,Z,R,G,B, A = [], [], [], [], [], [], []
for i in range(4,200,4):
if count == 3:
count = -1
R.append(data_payload[i])
G.append(data_payload[i+1])
B.append(data_payload[i+2])
A.append(data_payload[i+3])
else:
if count == 0 : X.append(unpack('f', data_payload[prev_i:i]))
if count == 1 : Y.append(unpack('f', data_payload[prev_i:i]))
if count == 2 : Z.append(unpack('f', data_payload[prev_i:i]))
prev_i = i
count += 1
return data_header, [X,Y,Z,R,G,B,A]
if __name__ == "__main__":
filename = '../boxes2.pcd'
data_header, data_payload = read_pcd(filename)
"""
Topic : Visualize the KiTTI Dataset (LiDAR)
References
- LiDAR Dataset : www.cvlibs.net/datasets/kitti
- Data Download : www.cvlibs.net/datasets/kitti/raw_data.php
- Blog :
- Notes
- Tested on Python3.5
"""
import os
import h5py # pip3 install h5py
import pykitti # pip3 install pykitti
import vispy # pip3 install vispy
import vispy.scene
import numpy as np
## 1. Convert multiple .bin files to single .h5 format
def data_write_h5(url_hdf5, dataset, dataset_limit=1000):
with h5py.File(url_hdf5, "w") as hf:
for i, each in enumerate(dataset.velo):
if i % 10 == 0:
print (' - Velo File : ', i+1)
points = each[:,[0,1,2]]
colors = []
for pt in points:
colors.append((1, 1, 1, 1))
colors = np.array(colors)
hf.create_dataset('points_{0}'.format('%.3d' % (i)), data=points)
hf.create_dataset('colors_{0}'.format('%.3d' % (i)), data=colors)
if i > dataset_limit:
break
## 2. Plot data from a single .h5 file
def data_plot_h5(url_hdf5, point_size=3):
## Step1 - Read Data
points = []
colors = []
with h5py.File(url_hdf5, "r") as hf:
for name in hf:
if 'points' in name:
points.append(np.array(hf.get(name)))
if 'colors' in name:
colors.append(np.array(hf.get(name)))
## Step2 - Setup the canvas
canvas = vispy.scene.SceneCanvas(keys='interactive', show=True)
view = canvas.central_widget.add_view()
view.bgcolor = '#111111'
view.camera = ['perspective', 'panzoom', 'fly', 'arcball', 'base', 'turntable', None][2]
if 1:
view.camera.fov = 60
view.camera.scale_factor = 0.7
view.camera.keymap['Right'] = (1,5)
view.camera.keymap['Left'] = (-1,5)
view.camera.keymap['Up'] = (1,4)
view.camera.keymap['Down'] = (-1,4)
axis = vispy.scene.visuals.XYZAxis(parent=view.scene)
scatter = vispy.scene.visuals.Markers(parent=view.scene)
canvas.show()
## Step3 - Update function
def update(data):
matrix, colors = data
scatter.set_data(matrix
, edge_color=None, face_color=colors
, size=point_size)
## Step4 - Update canvas with first lidar frame
update([points[0], colors[t]])
# Step5 - Handling key events
@canvas.events.key_press.connect
def keypress(e):
global t
print(' - File Index : ', t)
if e._key.name == '=':
t = min(t + 1, len(points) - 1)
update([points[t], colors[t]])
elif e._key.name == '-':
t = max(t - 1, 0)
update([points[t], colors[t]])
else:
pass
# print (' - key : ', e._key.name)
if __name__ == "__main__":
## Define filenames
url_hdf5 = 'lidar_kitti_plot.h5'
data_write = 0
## First read the .bin files from a KITTI drive and convert to .h5 (data_write=1)
if data_write :
DATA_DIR = os.path.abspath(os.path.join(os.getcwd(), '../data'))
print (' - Data Dir : ', DATA_DIR)
basedir = DATA_DIR
date = '2011_09_26'
drive = '0001'
dataset = pykitti.raw(basedir, date, drive)#, frames=range(0, 50, 5))
print ('\n - Read KITTI Dataset')
data_write_h5(url_hdf5, dataset, dataset_limit=40)
## Then plot that 3D LiDAR data (data_write=0)
## You can use wasd and arrow keys to move around. (f,c)-> increase or decrease your z
## Note : Zooming does not currently work, so scrolling wont have an effect
else:
t = 0
data_plot_h5(url_hdf5)
vispy.app.run()
"""
Topic : Visualize the UMichigan Ford Dataset
References
- Umich Dataset : http://robots.engin.umich.edu/SoftwareData/Ford
- Data Download : http://robots.engin.umich.edu/uploads/SoftwareData/Ford/dataset-1-subset.tgz
- Blog :
- Notes
- Tested on Python3.5
"""
## Import Libraries
import os
import h5py # pip3 install h5py
import vispy # pip3 install vispy
import vispy.scene
import numpy as np
import scipy.io as sio
## 1. Read a matlab file
def get_data(filename_mat, verbose = 0):
mat_contents = sio.loadmat(filename_mat)
keys = mat_contents.keys()
data = mat_contents['SCAN']
a = data[0][0]
if verbose:
for each in a:
print (each.shape)
data = a[0].T
return data
## 2. Read a directory containing matlab files and store as hfd5
def write_datum_h5(dirname_mat, filename_h5, file_count_max = 100):
datum = []
file_count = 0
if os.path.exists(dirname_mat):
with h5py.File(filename_h5, 'w') as hf:
for file_id, file in enumerate(sorted(os.listdir(dirname_mat))):
file_tmp = os.path.join(dirname_mat, file)
print (' - File : ', file_tmp)
data = get_data(file_tmp)
hf.create_dataset('data_%.4d' % (file_id) , data=data)
file_count += 1
if file_count > file_count_max:
return 1
return 1
else:
return 0
## 3. Read a h5d5 file
def read_datum_h5(filename_h5):
datum = []
with h5py.File(filename_h5, 'r') as hf:
for node in hf:
print (' ->', node)
datum.append(np.array(hf.get(node)))
return datum
## Plot U.Mich LiDAR data saved as .h5 file
def plot_data(datum, point_size=3):
canvas = vispy.scene.SceneCanvas(keys='interactive', show=True)
view = canvas.central_widget.add_view()
view.bgcolor = '#ffffff'
view.bgcolor = '#111111'
view.camera = ['perspective', 'panzoom', 'fly', 'arcball', 'base', 'turntable', None][2]
if 1:
view.camera.fov = 60
view.camera.scale_factor = 0.7
view.camera.keymap['Right'] = (1,5)
view.camera.keymap['Left'] = (-1,5)
view.camera.keymap['Up'] = (1,4)
view.camera.keymap['Down'] = (-1,4)
axis = vispy.scene.visuals.XYZAxis(parent=view.scene)
scatter = vispy.scene.visuals.Markers(parent=view.scene)
scatter.set_data(datum[0], size=point_size)
def update(data):
scatter.set_data(data, size=point_size)
@canvas.events.key_press.connect
def keypress(e):
print (' - Event : ', e._key.name)
global t
print(' - File Index : ', t)
if e._key.name == '=':
t = min(t + 1, len(datum) - 1)
update(datum[t])
if e._key.name == '-':
t = max(t - 1, 0)
update(datum[t])
canvas.show()
if __name__ == '__main__':
## Define the directory containing the .mat files
dirname_umich_scans = './plot_vispy' #contains the .mat files
filename_h5 = 'data_umich.h5'; file_count_max = 1000
## First convert all .mat files into a .h5 file (data_write=1)
data_write = 1
if data_write:
write_datum_h5(dirname_umich_scans, filename_h5, file_count_max)
## Then plot that 3D LiDAR data (data_write=0)
## You can use wasd and arrow keys to move around. (f,c)-> increase or decrease your z
## Note : Zooming does not currently work, so scrolling wont have an effect
else:
t = 0
datum_3d = read_datum_h5(filename_h5)
plot_data(datum_3d, point_size=2)
vispy.app.run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.