Skip to content

Instantly share code, notes, and snippets.

@oto313
Created September 7, 2023 15:34
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save oto313/a0d5144ba59a025193f70ceb13ade4f3 to your computer and use it in GitHub Desktop.
Save oto313/a0d5144ba59a025193f70ceb13ade4f3 to your computer and use it in GitHub Desktop.
undistort.py
#!/usr/bin/env python3
import cv2
import depthai as dai
import numpy as np
import subprocess as sp
from os import name as osName
# --- Camera configuration -------------------------------------------------
# Sensor mode, board socket and ISP scaling applied to the ColorCamera node.
camRes = dai.ColorCameraProperties.SensorResolution.THE_12_MP
camSocket = dai.CameraBoardSocket.CAM_A
ispScale = (1, 1)

# --- Output geometry ------------------------------------------------------
# Size the ImageManip node resizes to; also used as the ffplay window size.
width, height = 2016, 1520

# --- Viewer command line --------------------------------------------------
# ffplay reads the stream from stdin ("-i -"); the remaining switches set the
# window size and ask for low-latency, frame-dropping playback.
command = ["ffplay", "-i", "-"]
command += ["-x", str(width), "-y", str(height)]
command += ["-framerate", "60"]
command += ["-fflags", "nobuffer", "-flags", "low_delay"]
command += ["-framedrop"]
command += ["-strict", "experimental"]

if osName == "nt":
    # Running on Windows: launch through the command interpreter.
    command = ["cmd", "/c", *command]
def getMesh(calibData, ispSize):
    """Build a subsampled undistortion mesh for the given image size.

    A full-resolution undistortion map is computed with OpenCV from the
    hard-coded intrinsic matrix and distortion coefficients below, and is
    then sampled every ``meshCellSize`` pixels.

    Args:
        calibData: Accepted for interface compatibility but NOT used; the
            intrinsics and distortion coefficients are hard-coded in this
            function rather than read from the calibration object.
        ispSize: Two-element sequence handed to
            ``cv2.initUndistortRectifyMap`` as the map size; the caller in
            this file passes ``[width, height]``.

    Returns:
        A ``(mesh, meshWidth, meshHeight)`` tuple. ``mesh`` is a flat list of
        2-tuples ``(mapX value, mapY value)`` ordered row by row,
        ``meshWidth`` is the number of points per row and ``meshHeight`` the
        number of rows.
    """
    M1 = np.array([
        [4191.476074, 0.000000, 1083.740967],
        [0.000000, 4195.143066, 807.994873],
        [0.000000, 0.000000, 1.000000],
    ])
    d1 = np.array([
        -5.403945446014404, 32.75205612182617, 0.0007517668418586254,
        0.0002807812998071313, -8.510465621948242, -5.083094596862793,
        31.99566650390625, -6.316490650177002,
        0.0, 0.0, 0.0, 0.0, 0.0, 0.0,
    ])
    R1 = np.identity(3)
    # Normalise the size to a tuple: the caller in this file passes a list.
    mapX, mapY = cv2.initUndistortRectifyMap(
        M1, d1, R1, M1, tuple(ispSize), cv2.CV_32FC1
    )

    meshCellSize = 16
    imgHeight, imgWidth = mapX.shape[0], mapX.shape[1]
    # Same padding rule as before, evaluated once instead of once per row.
    # NOTE(review): the purpose of the two trailing zeros is not visible
    # here -- confirm against the ImageManip warp-mesh requirements.
    padRow = (imgWidth % meshCellSize) % 2 != 0

    mesh0 = []
    # Creates subsampled mesh which will be loaded on to device to undistort
    # the image. Stepping the ranges by meshCellSize visits exactly the
    # pixels the previous "iterate everything, test modulo" loops selected.
    for y in range(0, imgHeight + 1, meshCellSize):
        # y may equal imgHeight (one past the last row); sample the last
        # valid row in that case.
        srcY = min(y, imgHeight - 1)
        rowLeft = []
        # x never reaches imgWidth, so no column clamping is needed.
        for x in range(0, imgWidth, meshCellSize):
            rowLeft.append(mapX[srcY, x])
            rowLeft.append(mapY[srcY, x])
        if padRow:
            rowLeft.append(0)
            rowLeft.append(0)
        mesh0.append(rowLeft)

    mesh0 = np.array(mesh0)
    meshWidth = mesh0.shape[1] // 2
    meshHeight = mesh0.shape[0]
    # reshape yields the same (points, 2) layout as the former in-place
    # resize without depending on the array being unreferenced.
    mesh0 = mesh0.reshape(meshWidth * meshHeight, 2)
    mesh = list(map(tuple, mesh0))
    return mesh, meshWidth, meshHeight
def create_pipeline(calibData):
    """Assemble the DepthAI pipeline used by this script.

    Data flow: ColorCamera ISP output -> ImageManip (resize + warp mesh,
    NV12 output) -> VideoEncoder (H.264 main profile) -> XLinkOut stream
    named "Undistorted".

    Args:
        calibData: Calibration object forwarded unchanged to ``getMesh``.

    Returns:
        The configured ``dai.Pipeline``.
    """
    pipeline = dai.Pipeline()

    # Source: colour camera configured from the module-level settings.
    colorCam = pipeline.create(dai.node.ColorCamera)
    colorCam.setIspScale(ispScale)
    colorCam.setBoardSocket(camSocket)
    colorCam.setResolution(camRes)

    warper = pipeline.create(dai.node.ImageManip)

    encoder = pipeline.create(dai.node.VideoEncoder)
    encoder.setDefaultProfilePreset(
        30, dai.VideoEncoderProperties.Profile.H264_MAIN
    )

    # Resize first, then read the configured size back so the mesh is
    # generated for exactly the dimensions the manip node reports.
    warper.initialConfig.setResize(width, height)
    targetSize = [
        warper.initialConfig.getResizeWidth(),
        warper.initialConfig.getResizeHeight(),
    ]
    mesh, meshWidth, meshHeight = getMesh(calibData, targetSize)
    warper.setWarpMesh(mesh, meshWidth, meshHeight)

    # Output buffer sized as ISP width * height * 3 / 2 bytes.
    ispPixels = colorCam.getIspWidth() * colorCam.getIspHeight()
    warper.setMaxOutputFrameSize(ispPixels * 3 // 2)
    warper.setFrameType(dai.RawImgFrame.Type.NV12)
    colorCam.isp.link(warper.inputImage)

    # Sink: encoded bitstream is sent to the host under this stream name.
    streamOut = pipeline.create(dai.node.XLinkOut)
    streamOut.setStreamName("Undistorted")
    warper.out.link(encoder.input)
    encoder.bitstream.link(streamOut.input)

    return pipeline
# Script entry: connect to the device, start the pipeline and pipe the
# encoded "Undistorted" stream into an ffplay process for display.
with dai.Device() as device:
    calibData = device.readCalibration2()
    pipeline = create_pipeline(calibData)
    device.startPipeline(pipeline)
    # NOTE(review): positional args 4 and False are presumably the queue's
    # max size and blocking flag -- confirm against the DepthAI API docs.
    queue = device.getOutputQueue('Undistorted', 4, False)

    try:
        proc = sp.Popen(command, stdin=sp.PIPE)  # Start the ffplay process
    except OSError:
        # Only launch failures (e.g. executable not found) are reported as
        # "cannot run ffplay"; other exceptions propagate unchanged.
        raise SystemExit("Error: cannot run ffplay!\nTry running: sudo apt install ffmpeg")

    try:
        # Stop as soon as the viewer process has exited.
        while proc.poll() is None:
            for pkt in queue.tryGetAll():
                proc.stdin.write(pkt.getData())
            # NOTE(review): cv2.waitKey only reports key presses while an
            # OpenCV HighGUI window is active, and this script creates none,
            # so in practice exit with Ctrl+C or by closing ffplay.
            if cv2.waitKey(1) == ord('q'):
                break
    except (BrokenPipeError, KeyboardInterrupt):
        # ffplay closed its end of the pipe, or the user pressed Ctrl+C:
        # both are normal ways to end the session.
        pass
    finally:
        # Always release the viewer so no ffplay process is left behind.
        try:
            proc.stdin.close()
        except OSError:
            pass  # pipe already broken; nothing left to flush
        proc.terminate()
        proc.wait()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment