Skip to content

Instantly share code, notes, and snippets.

@ximion
Created November 18, 2020 22:57
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ximion/8457efada8299e20a2d5cc792fe670a1 to your computer and use it in GitHub Desktop.
Save ximion/8457efada8299e20a2d5cc792fe670a1 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
# SPDX-License-Identifier: LGPL-3.0-or-later
'''
MKV to .mat conversion script.
Extracted from our bigger pipeline for Miniscope data analysis
using the MIN1PIPE pipeline by Lu et al.
@author: Matthias Klumpp
'''
import os
import sys
import argparse
import cv2 as cv
import numpy as np
import h5py
import logging as log
from datetime import datetime
from contextlib import contextmanager
from natsort import natsorted
@contextmanager
def open_hdf5_with_matlab_header(fname, **kwargs):
    '''
    Create `fname` as an HDF5 file with a MATLAB 7.3 style header and yield it.

    The file is first created (truncating any existing file) with a 512 byte
    HDF5 userblock, then a 128 byte MATLAB header is written to the start of
    that userblock, and finally the file is reopened in append mode and
    yielded as an `h5py.File`.

    fname:  path of the file to create.
    kwargs: extra keyword arguments passed on to `h5py.File` when the file
            is reopened for writing.

    The yielded file is always closed when the context exits, including when
    the body of the `with` statement raises.
    '''
    # fake MATLAB HDF5 header string
    header = 'MATLAB 7.3 MAT-file, Platform: unknown-any ' \
        + '0.1' + ', Created on: ' \
        + datetime.now().strftime('%a %b %d %H:%M:%S %Y') \
        + ' HDF5 schema 1.00 .'

    # create HDF5 file template with room for the userblock
    with h5py.File(fname, mode='w', userblock_size=512):
        pass

    # Pad with spaces up to 128-12 bytes (the minus 12 is there since the
    # last 12 bytes are special). Padding is done on the encoded bytes so
    # the trailer stays at the right offset even if the text is not ASCII.
    userblock = bytearray(header.encode('utf-8').ljust(128 - 12, b' '))
    # Add 8 nulls (0) and the magic number that MATLAB uses.
    userblock.extend(bytearray.fromhex('00000000 00000000 0002494D'))

    # write MATLAB userblock
    with open(fname, 'r+b') as f:
        f.write(userblock)

    # reopen the file in append-mode, skipping userblock
    hf = h5py.File(fname,
                   mode='a',
                   libver=('earliest', 'v110'),
                   **kwargs)
    try:
        yield hf
        hf.flush()
    finally:
        # close even if the caller's block raised, so the handle is not leaked
        hf.close()
def videos_to_mat(vid_fnames, mat_fname):
    ''' Convert list of videos into a single MATLAB-compatible HDF5 file.

    All videos must share the width, height and (integer-truncated) framerate
    of the first video. Every frame is converted to grayscale, transposed and
    stored as single-precision floats in one in-memory array of shape
    (total_frames, width, height), which is then written to `mat_fname` as
    the dataset 'frame_all'.

    vid_fnames: list of video file names, concatenated in the given order.
    mat_fname:  name of the HDF5/.mat file to create.

    Returns the tuple ((width, height), fps).

    Raises Exception if no videos were passed, a video can not be opened, or
    the number of frames read differs from the number the video reported.
    Prints a message and exits the process with status 2 if a video does not
    match the dimensions or framerate of the first one.
    '''
    if not vid_fnames:
        raise Exception('No videos to analyze have been passed.')

    # get common video properties from the first video
    vreader = cv.VideoCapture(vid_fnames[0])
    try:
        if not vreader.isOpened():
            raise Exception('Unable to read from video file!')
        width = int(vreader.get(cv.CAP_PROP_FRAME_WIDTH))
        height = int(vreader.get(cv.CAP_PROP_FRAME_HEIGHT))
        fps = int(vreader.get(cv.CAP_PROP_FPS))
    finally:
        vreader.release()

    # validate all videos and sum up their reported frame counts
    frames_total = 0
    for v_fname in vid_fnames:
        # open the video being validated (not always the first one)
        vreader = cv.VideoCapture(v_fname)
        try:
            if not vreader.isOpened():
                raise Exception('Unable to read from video file!')
            if width != int(vreader.get(cv.CAP_PROP_FRAME_WIDTH)):
                print('Video {} has invalid width (expected {}px)'.format(v_fname, width))
                sys.exit(2)
            if height != int(vreader.get(cv.CAP_PROP_FRAME_HEIGHT)):
                print('Video {} has invalid height (expected {}px)'.format(v_fname, height))
                sys.exit(2)
            if fps != int(vreader.get(cv.CAP_PROP_FPS)):
                print('Video {} has invalid framerate (expected {}fps)'.format(v_fname, fps))
                sys.exit(2)
            frames_total += int(vreader.get(cv.CAP_PROP_FRAME_COUNT))
        finally:
            vreader.release()

    # read all videos into one preallocated array
    frames_all = np.zeros((frames_total, width, height), dtype=np.single)
    cur_total_frame_n = 0
    for v_fname in vid_fnames:
        vreader = cv.VideoCapture(v_fname)
        try:
            if not vreader.isOpened():
                raise Exception('Unable to read from video file!')
            fmt = int(vreader.get(cv.CAP_PROP_FORMAT))
            expected_frames_n = int(vreader.get(cv.CAP_PROP_FRAME_COUNT))
            log.info('Reading video: {}'.format(v_fname))

            frame_n = 0
            while True:
                ret, mat = vreader.read()
                if not ret:
                    break
                cur_total_frame_n += 1
                frame_n += 1
                # stop before writing past the slots reserved for this video
                if frame_n > expected_frames_n:
                    raise Exception('Read more frames than the expected number ({})'.format(expected_frames_n))

                # we should already have an 8-bit grayscale image, but we convert it just in case
                gray_mat = cv.cvtColor(mat, cv.COLOR_BGR2GRAY)
                # transpose so the frame fits the (width, height) slot
                frames_all[cur_total_frame_n - 1, :, :] = gray_mat.T.astype(np.single)
        finally:
            vreader.release()

        if expected_frames_n != frame_n:
            raise Exception('Read an unexpected amount of frames (expected {}, got {})'.format(expected_frames_n, frame_n))
        log.info('Video file contained {} frames @ {}fps {}x{}px, fmt-{}; now converted to 8bit gray type:single matrices'.format(frame_n, fps, width, height, fmt))

    if frames_total != cur_total_frame_n:
        raise Exception('Read an invalid amount of frames: {} instead of {}'.format(cur_total_frame_n, frames_total))
    log.info('Total number of frames to process: {}'.format(frames_total))

    with open_hdf5_with_matlab_header(mat_fname) as f:
        log.info('Saving raw video data to MATLAB-compatible HDF5 file...')
        # direct assignment is *much* faster than writing to the HDF5 file in chunks
        f.create_dataset('frame_all', data=frames_all)
    log.info('Video conversion for mmap done.')

    return (width, height), fps
def main(options):
    ''' Check the parsed command-line options and run the conversion.

    options: object with the attributes `videos` (list of video file names)
             and `out_fname` (name of the output file).

    Prints a message and exits with status 1 if either attribute is empty;
    otherwise converts the naturally sorted videos into `out_fname`.
    '''
    video_paths = options.videos
    if not video_paths:
        print('No video files set.')
        sys.exit(1)

    if not options.out_fname:
        print('No destination for filename set.')
        sys.exit(1)

    # natural sort, then hand over to the converter
    ordered_videos = natsorted(video_paths)
    videos_to_mat(ordered_videos, options.out_fname)
if __name__ == '__main__':
    # Command-line entry point: one optional output name, one or more videos.
    cli = argparse.ArgumentParser(
        description='Convert MKV video to MATLAB .mat file')
    cli.add_argument(
        '-o', '--output',
        dest='out_fname',
        help='Output filename.')
    cli.add_argument(
        'videos',
        nargs='+',
        help='The Miniscope video files to analyze.')

    # with no explicit argument list, argparse parses sys.argv[1:]
    main(cli.parse_args())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment