Skip to content

Instantly share code, notes, and snippets.

@prerakmody
Last active September 15, 2020 14:19
Show Gist options
  • Star 9 You must be signed in to star a gist
  • Fork 2 You must be signed in to fork a gist
  • Save prerakmody/110f80c3e1d99ac100c481d6428e3c75 to your computer and use it in GitHub Desktop.
import os
import re
import sys
import copy
import struct
import pprint
import traceback
def pcd_read(filename, verbose=0, verbose_steps=0):
    """Parse a .pcd point-cloud file into per-field value lists.

    Parameters
    ----------
    filename : str
        Path to a .pcd file whose DATA section is 'ascii' or 'binary'.
    verbose : int
        When truthy, print header and field summaries.
    verbose_steps : int
        Kept for backward compatibility (previously per-value tracing).

    Returns
    -------
    (data_op, fields_dict) where
        data_op     : {field_name: [values]} - one list per FIELDS entry
        fields_dict : {index: [field_name, field_size, struct_fmt_char]}

    Raises
    ------
    ValueError
        If no DATA line is found, or a (TYPE, SIZE) pair is unsupported.
    """
    # (PCD TYPE, SIZE) -> struct format character, see
    # https://docs.python.org/3/library/struct.html#format-characters
    # (the old code mapped e.g. TYPE 'U' SIZE 4 to the invalid format 'u')
    fmt_map = {
        ('F', 4): 'f', ('F', 8): 'd',
        ('U', 1): 'B', ('U', 2): 'H', ('U', 4): 'I', ('U', 8): 'Q',
        ('I', 1): 'b', ('I', 2): 'h', ('I', 4): 'i', ('I', 8): 'q',
    }
    with open(filename, 'rb') as fp:
        data_ip = fp.read()

    # Step 1 : the header ends at the "DATA <ascii|binary>" line.
    # Use the FIRST match - the binary payload could coincidentally contain
    # the same byte pattern further on.
    match = re.search(rb'\nDATA\s+(\S+)\n', data_ip)
    if match is None:
        raise ValueError('no DATA line found in %s' % filename)

    # Parse the header by keyword rather than fixed line offsets, so extra
    # comment lines do not shift the field lookup.
    header = {}
    for line in data_ip[:match.end(0)].decode('utf-8').split('\n'):
        tokens = line.split()
        if tokens and not line.startswith('#'):
            header[tokens[0].upper()] = tokens[1:]

    field_names = header['FIELDS']
    field_sizes = [int(s) for s in header['SIZE']]
    field_types = [t.upper() for t in header['TYPE']]
    data_count = int(header['POINTS'][0])
    data_type = header['DATA'][0].lower()
    if verbose:
        print(' - field_names : ', field_names)
        print(' - field_size : ', field_sizes)
        print(' - field_type : ', field_types)
        print(' - data_count : ', data_count, ' rows ')
        print(' - viewpoint : ', ' '.join(header.get('VIEWPOINT', [])))
        print(' - data_type : ', data_type)

    # Step 1.1 : accumulate field metadata --> fields_dict
    fields_dict = {}
    for i, field_name in enumerate(field_names):
        fmt = fmt_map.get((field_types[i], field_sizes[i]))
        if fmt is None:
            raise ValueError('unsupported field %s (type=%s, size=%d)'
                             % (field_name, field_types[i], field_sizes[i]))
        fields_dict[i] = [str(field_name), field_sizes[i], fmt]
    if verbose:
        print('\n ----- Fields -----')
        pprint.pprint(fields_dict, indent=4)

    # Step 2 : decode the payload row by row.
    data_op = {fields_dict[i][0]: [] for i in fields_dict}
    data_payload = data_ip[match.end(0):]
    if data_type == 'ascii':
        rows = data_payload.decode('utf-8').split('\n')
        for row in rows[:data_count]:
            vals = row.split()
            if len(vals) < len(fields_dict):
                continue  # skip blank/short lines (e.g. trailing newline)
            for i in fields_dict:
                name, _, fmt = fields_dict[i]
                caster = float if fmt in ('f', 'd') else int
                data_op[name].append(caster(vals[i]))
    elif data_type == 'binary':
        # '=' : native byte order with no alignment padding between fields,
        # matching the old one-format-char-at-a-time unpacking.
        point = struct.Struct('=' + ''.join(fields_dict[i][2]
                                            for i in fields_dict))
        offset = 0
        for _ in range(data_count):
            if offset + point.size > len(data_payload):
                break  # truncated file: keep whatever decoded cleanly
            for i, value in enumerate(point.unpack_from(data_payload, offset)):
                data_op[fields_dict[i][0]].append(value)
            offset += point.size

    if verbose:
        print('\n ----- Output Count -----')
        for i in fields_dict:
            name = fields_dict[i][0]
            print(' - Field : ', name, ' || Data Count : ',
                  len(data_op[name]))
            break
    return data_op, fields_dict
def pcd_write_bin(data_op, fields_dict, data_op_file, fields_required=None,
                  verbose=0):
    """Write selected fields as an interleaved float32 .bin (KITTI-style).

    Rows are written point by point: for each point index, the value of every
    selected field is packed as a 4-byte float, in fields_dict order.

    Parameters
    ----------
    data_op : dict
        {field_name: [values]} as returned by pcd_read; NOT mutated (the old
        code popped unrequested fields out of the caller's dict).
    fields_dict : dict
        {index: [field_name, field_size, fmt_char]} as returned by pcd_read.
    data_op_file : str
        Output path for the .bin payload.
    fields_required : list or None
        Field names to keep; empty/None keeps every field.  Default changed
        from a mutable `[]` to None (behaviorally identical).
    verbose : int
        Unused; kept for backward compatibility.
    """
    if fields_required is None:
        fields_required = []
    # Preserve fields_dict order, optionally filtered to the requested names.
    field_names = [fields_dict[i][0] for i in fields_dict]
    if fields_required:
        field_names = [n for n in field_names if n in fields_required]

    print('\n ----- Writing .bin file -----')
    print(' - Writing these field_names : ', field_names)
    row_count = len(data_op[field_names[0]])
    # Interleave: row0-field0, row0-field1, ..., row1-field0, ...
    values = [data_op[name][row]
              for row in range(row_count)
              for name in field_names]
    payload = struct.pack('%df' % len(values), *values)
    print(' - Writing to ', data_op_file)
    with open(data_op_file, 'wb') as fp:
        fp.write(payload)
if __name__ == "__main__":
    # Convert each .pcd into a KITTI-style float32 .bin with x/y/z/intensity.
    data_pcds = ['data/1523643737.998116000.pcd']
    for data_pcd in data_pcds:
        data_op, fields_dict = pcd_read(data_pcd, verbose=1)
        data_op_file = data_pcd.split('.pcd')[0] + '_modified.bin'
        pcd_write_bin(data_op, fields_dict, data_op_file,
                      fields_required=['x', 'y', 'z', 'intensity'], verbose=1)
        if 0:
            ## Cross-check parsed values against the written .bin payload.
            # Local import: numpy is only needed for this debug path and was
            # never imported by this script (old code would NameError on np).
            import numpy as np
            rows_count = 4
            print('\n ----- Original Data ----- : ', len(data_op['x']))
            for i in range(rows_count):
                print(data_op['x'][i], data_op['y'][i], data_op['z'][i],
                      data_op['intensity'][i])
            data_bin = np.fromfile(data_op_file, dtype=np.float32)
            print('\n ----- .bin data ----- : ', len(data_bin))
            # 4 fields per row, so this slice spans the first rows_count rows
            print(' - ', data_bin[:rows_count * 4])
import re
from struct import *
def read_pcd(filename, limit=200):
    """Minimal demo parser for a binary .pcd with fields x y z rgba.

    Walks the first ``limit`` bytes of the payload (default 200, matching the
    original demo) assuming each point is 16 bytes: x, y, z as 4-byte floats
    followed by 4 raw rgba bytes.

    Returns
    -------
    (header_lines, [X, Y, Z, R, G, B, A]) where X/Y/Z hold float scalars
    (the old code appended the raw 1-tuples that unpack returns) and
    R/G/B/A hold the raw byte values.

    Raises
    ------
    ValueError if no "DATA ..." line is found in the file.
    """
    with open(filename, 'rb') as fp:
        data = fp.read()

    # Step 1 : locate the end of the header ("DATA binary" line).
    for group in re.finditer(rb'[\n]DATA\s(\S*)[\n]', data, flags=0):
        data_header = data[:group.end(0)].decode('utf-8').split('\n')
        print(' - Header : \n', data_header)
        # Step 2 : iterate through the payload in 4-byte steps; `count`
        # cycles 0 (x), 1 (y), 2 (z), 3 (rgba) per point.
        data_payload = data[group.end(0):]
        prev_i = 0
        count = 0
        X, Y, Z, R, G, B, A = [], [], [], [], [], [], []
        for i in range(4, limit, 4):
            if count == 3:
                count = -1
                # rgba occupies the 4 bytes starting at prev_i (the old
                # code read data_payload[i:i+4], i.e. the NEXT point's x).
                R.append(data_payload[prev_i])
                G.append(data_payload[prev_i + 1])
                B.append(data_payload[prev_i + 2])
                A.append(data_payload[prev_i + 3])
            else:
                value = unpack('f', data_payload[prev_i:i])[0]
                if count == 0:
                    X.append(value)
                elif count == 1:
                    Y.append(value)
                elif count == 2:
                    Z.append(value)
            prev_i = i
            count += 1
        return data_header, [X, Y, Z, R, G, B, A]
    raise ValueError('no DATA line found in %s' % filename)
if __name__ == "__main__":
    # Demo: parse a sample binary PCD, keeping the header lines and the
    # per-channel value lists.
    pcd_path = '../boxes2.pcd'
    data_header, data_payload = read_pcd(pcd_path)
"""
Topic : Visualize the KiTTI Dataset (LiDAR)
References
- LiDAR Dataset : www.cvlibs.net/datasets/kitti
- Data Download : www.cvlibs.net/datasets/kitti/raw_data.php
- Blog :
- Notes
- Tested on Python3.5
"""
import os
import h5py # pip3 install h5py
import pykitti # pip3 install pykitti
import vispy # pip3 install vispy
import vispy.scene
import numpy as np
## 1. Convert multiple .bin files to single .h5 format
def data_write_h5(url_hdf5, dataset, dataset_limit=1000):
with h5py.File(url_hdf5, "w") as hf:
for i, each in enumerate(dataset.velo):
if i % 10 == 0:
print (' - Velo File : ', i+1)
points = each[:,[0,1,2]]
colors = []
for pt in points:
colors.append((1, 1, 1, 1))
colors = np.array(colors)
hf.create_dataset('points_{0}'.format('%.3d' % (i)), data=points)
hf.create_dataset('colors_{0}'.format('%.3d' % (i)), data=colors)
if i > dataset_limit:
break
## 2. Plot data from a single .h5 file
def data_plot_h5(url_hdf5, point_size=3):
    """Render LiDAR frames stored in an .h5 file with vispy.

    The '=' and '-' keys step forward/backward through frames.  Relies on a
    module-level ``t`` (current frame index) being set by the caller before
    any key event fires.

    Parameters
    ----------
    url_hdf5 : str - .h5 file holding 'points_*' / 'colors_*' datasets
    point_size : int - marker size passed to the scatter visual
    """
    ## Step1 - Read all frames into parallel lists.
    points = []
    colors = []
    with h5py.File(url_hdf5, "r") as hf:
        for name in hf:
            if 'points' in name:
                points.append(np.array(hf.get(name)))
            if 'colors' in name:
                colors.append(np.array(hf.get(name)))

    ## Step2 - Setup the canvas (index 2 selects the 'fly' camera).
    canvas = vispy.scene.SceneCanvas(keys='interactive', show=True)
    view = canvas.central_widget.add_view()
    view.bgcolor = '#111111'
    view.camera = ['perspective', 'panzoom', 'fly', 'arcball', 'base',
                   'turntable', None][2]
    if 1:
        view.camera.fov = 60
        view.camera.scale_factor = 0.7
        # NOTE(review): the keymap tuple values follow vispy's FlyCamera
        # convention - confirm against the installed vispy version.
        view.camera.keymap['Right'] = (1, 5)
        view.camera.keymap['Left'] = (-1, 5)
        view.camera.keymap['Up'] = (1, 4)
        view.camera.keymap['Down'] = (-1, 4)
    axis = vispy.scene.visuals.XYZAxis(parent=view.scene)
    scatter = vispy.scene.visuals.Markers(parent=view.scene)
    canvas.show()

    ## Step3 - Update function: swap in one (points, colors) frame.
    def update(data):
        matrix, frame_colors = data
        scatter.set_data(matrix, edge_color=None, face_color=frame_colors,
                         size=point_size)

    ## Step4 - Show the first frame.  The old code mixed points[0] with
    ## colors[t], raising NameError when the global t was not yet set.
    update([points[0], colors[0]])

    # Step5 - Handle key events ('=' next frame, '-' previous frame).
    @canvas.events.key_press.connect
    def keypress(e):
        global t
        print(' - File Index : ', t)
        if e._key.name == '=':
            t = min(t + 1, len(points) - 1)
            update([points[t], colors[t]])
        elif e._key.name == '-':
            t = max(t - 1, 0)
            update([points[t], colors[t]])
        else:
            pass
if __name__ == "__main__":
    ## Output .h5 that holds the converted LiDAR frames.
    url_hdf5 = 'lidar_kitti_plot.h5'
    data_write = 0
    if data_write:
        ## Pass 1 (data_write=1): read the .bin files of a KITTI drive and
        ## convert them to a single .h5 file.
        DATA_DIR = os.path.abspath(os.path.join(os.getcwd(), '../data'))
        print(' - Data Dir : ', DATA_DIR)
        basedir = DATA_DIR
        date = '2011_09_26'
        drive = '0001'
        dataset = pykitti.raw(basedir, date, drive)  # frames=range(0, 50, 5)
        print('\n - Read KITTI Dataset')
        data_write_h5(url_hdf5, dataset, dataset_limit=40)
    else:
        ## Pass 2 (data_write=0): plot the 3D LiDAR data.
        ## wasd + arrow keys to move around; (f, c) raise/lower your z.
        ## Note: zooming does not currently work, so scrolling has no effect.
        t = 0
        data_plot_h5(url_hdf5)
        vispy.app.run()
"""
Topic : Visualize the UMichigan Ford Dataset
References
- Umich Dataset : http://robots.engin.umich.edu/SoftwareData/Ford
- Data Download : http://robots.engin.umich.edu/uploads/SoftwareData/Ford/dataset-1-subset.tgz
- Blog :
- Notes
- Tested on Python3.5
"""
## Import Libraries
import os
import h5py # pip3 install h5py
import vispy # pip3 install vispy
import vispy.scene
import numpy as np
import scipy.io as sio
## 1. Read a matlab file
def get_data(filename_mat, verbose=0):
    """Load one U.Mich Ford LiDAR scan from a MATLAB .mat file.

    The file stores a 1x1 struct array under the key 'SCAN'; the first field
    of that struct holds the point data, which is returned transposed.
    NOTE(review): assumes the point array is the struct's FIRST field -
    confirm against the Ford dataset's SCAN layout.

    Parameters
    ----------
    filename_mat : str - path to the .mat file
    verbose : int - print the shape of every struct field when truthy
    """
    mat_contents = sio.loadmat(filename_mat)
    scan_struct = mat_contents['SCAN'][0][0]  # 1x1 struct -> numpy void
    if verbose:
        for field in scan_struct:
            print(field.shape)
    return scan_struct[0].T
## 2. Read a directory containing matlab files and store as hdf5
def write_datum_h5(dirname_mat, filename_h5, file_count_max=100):
    """Convert the .mat scans in a directory into one .h5 file.

    Files are processed in sorted name order; scan j is stored as dataset
    'data_000j'.  At most ``file_count_max`` files are converted - the old
    `if file_count > file_count_max` (checked after incrementing) wrote
    file_count_max + 1 files.

    Returns
    -------
    int : 1 on success, 0 when dirname_mat does not exist.
    """
    if not os.path.exists(dirname_mat):
        return 0
    file_count = 0
    with h5py.File(filename_h5, 'w') as hf:
        for file_id, name in enumerate(sorted(os.listdir(dirname_mat))):
            path_mat = os.path.join(dirname_mat, name)
            print(' - File : ', path_mat)
            hf.create_dataset('data_%.4d' % file_id, data=get_data(path_mat))
            file_count += 1
            if file_count >= file_count_max:
                break
    return 1
## 3. Read a hdf5 file
def read_datum_h5(filename_h5):
    """Load every dataset stored in an .h5 file into a list of arrays."""
    datum = []
    with h5py.File(filename_h5, 'r') as hf:
        for key in hf:
            print(' ->', key)
            datum.append(np.array(hf.get(key)))
    return datum
## Plot U.Mich LiDAR data saved as .h5 file
def plot_data(datum, point_size=3):
    """Render a list of point-cloud frames with vispy.

    The '=' and '-' keys step forward/backward through frames.  Relies on a
    module-level ``t`` (current frame index) set by the caller before any
    key event fires.

    Parameters
    ----------
    datum : list of per-frame point arrays (as read by read_datum_h5)
    point_size : int - marker size passed to the scatter visual
    """
    canvas = vispy.scene.SceneCanvas(keys='interactive', show=True)
    view = canvas.central_widget.add_view()
    # Dark background (removed a dead '#ffffff' assignment that was
    # immediately overwritten).  Index 2 selects the 'fly' camera.
    view.bgcolor = '#111111'
    view.camera = ['perspective', 'panzoom', 'fly', 'arcball', 'base',
                   'turntable', None][2]
    if 1:
        view.camera.fov = 60
        view.camera.scale_factor = 0.7
        # NOTE(review): keymap tuple values follow vispy's FlyCamera
        # convention - confirm against the installed vispy version.
        view.camera.keymap['Right'] = (1, 5)
        view.camera.keymap['Left'] = (-1, 5)
        view.camera.keymap['Up'] = (1, 4)
        view.camera.keymap['Down'] = (-1, 4)
    axis = vispy.scene.visuals.XYZAxis(parent=view.scene)
    scatter = vispy.scene.visuals.Markers(parent=view.scene)
    scatter.set_data(datum[0], size=point_size)

    def update(data):
        # Replace the scatter contents with the given point array.
        scatter.set_data(data, size=point_size)

    @canvas.events.key_press.connect
    def keypress(e):
        print(' - Event : ', e._key.name)
        global t
        print(' - File Index : ', t)
        if e._key.name == '=':
            t = min(t + 1, len(datum) - 1)
            update(datum[t])
        if e._key.name == '-':
            t = max(t - 1, 0)
            update(datum[t])
    canvas.show()
if __name__ == '__main__':
    ## Directory holding the raw .mat scans, and the target .h5 file.
    dirname_umich_scans = './plot_vispy'  # contains the .mat files
    filename_h5 = 'data_umich.h5'
    file_count_max = 1000
    ## data_write=1 : convert all .mat files into one .h5 file.
    ## data_write=0 : plot the 3D LiDAR data from that .h5 file.
    data_write = 1
    if data_write:
        write_datum_h5(dirname_umich_scans, filename_h5, file_count_max)
    else:
        ## wasd + arrow keys to move around; (f, c) raise/lower your z.
        ## Note: zooming does not currently work, so scrolling has no effect.
        t = 0
        datum_3d = read_datum_h5(filename_h5)
        plot_data(datum_3d, point_size=2)
        vispy.app.run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment