Skip to content

Instantly share code, notes, and snippets.

@crackwitz
Last active August 14, 2019 17:36
Show Gist options
  • Save crackwitz/5f48cd37d36d26e42c595383757ec224 to your computer and use it in GitHub Desktop.
Save crackwitz/5f48cd37d36d26e42c595383757ec224 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
import os
import sys
import time
import numpy as np
import cv2 as cv
import threading
# also acts (partly) like a cv.VideoCapture
class FreshestFrame(threading.Thread):
    """Background reader that always keeps the newest frame of a capture.

    Also acts (partly) like a cv.VideoCapture: it offers ``read()`` and
    ``release()``.  A worker thread calls ``capture.read()`` in a loop and
    publishes each frame together with a running sequence number, so that
    consumers never receive stale, buffered frames.

    The thread is started by the constructor.  When the capture stops
    delivering frames, the worker clears ``running`` and wakes every blocked
    ``read()`` instead of dying with the readers still waiting.
    """

    def __init__(self, capture, name='FreshestFrame'):
        """Wrap *capture* (must already be opened) and start reading from it."""
        self.capture = capture
        assert self.capture.isOpened()

        # this lets the read() method block until there's a new frame
        self.cond = threading.Condition()

        # this allows us to stop the thread gracefully
        self.running = False

        # keeping the newest frame around
        self.frame = None

        # passing a sequence number allows read() to NOT block
        # if the currently available one is exactly the one you ask for
        self.latestnum = 0

        # this is just for demo purposes
        self.callback = None

        super().__init__(name=name)
        self.start()

    def start(self):
        """Mark the reader as running and start the worker thread."""
        self.running = True
        super().start()

    def release(self, timeout=None):
        """Stop the worker, wait for it (up to *timeout*) and release the capture."""
        with self.cond:
            self.running = False
            # wake readers that are still blocked waiting for a frame
            self.cond.notify_all()
        self.join(timeout=timeout)
        self.capture.release()

    def run(self):
        """Worker loop: grab frames and publish each one under the condition."""
        counter = 0
        while self.running:
            # block for fresh frame
            (rv, img) = self.capture.read()

            if not rv:
                # The capture failed or ended.  Stop gracefully and wake up
                # waiting readers; the last good frame stays available.
                with self.cond:
                    self.running = False
                    self.cond.notify_all()
                break

            counter += 1

            # publish the frame
            with self.cond:  # lock the condition for this operation
                self.frame = img
                self.latestnum = counter
                self.cond.notify_all()

            if self.callback:
                self.callback(img)

    def read(self, wait=True, seqnumber=None, timeout=None):
        """Return ``(sequence_number, frame)`` for the newest frame.

        - with no arguments (wait=True), it blocks for a fresh frame
        - with wait=False it returns the current frame immediately (polling)
        - with a seqnumber, it blocks until that frame is available
          (if it even needs to wait)
        - with a timeout argument, it may return an earlier frame;
          may even be (0, None) if nothing was received yet

        Once the reader has stopped (release() or capture failure), a
        blocking call returns immediately with the latest published frame.
        """
        with self.cond:
            if wait:
                if seqnumber is None:
                    seqnumber = self.latestnum + 1
                if seqnumber < 1:
                    seqnumber = 1

                # Whether the wait succeeded, timed out or was cut short by
                # the reader stopping, the answer is the latest known state.
                self.cond.wait_for(
                    lambda: self.latestnum >= seqnumber or not self.running,
                    timeout=timeout)

            return (self.latestnum, self.frame)
class Timecode(threading.Thread):
    """Thread that keeps displaying the current time as a 2-D bit pattern.

    The wall-clock time in milliseconds is Gray-coded into 64 bits and drawn
    as an 8x8 grid of black/white cells, surrounded by a white frame and a
    black quiet zone.  The pattern is shown in an OpenCV window named after
    the thread.  The thread is started by the constructor.
    """

    def __init__(self, name='Timecode', delay=10):
        """Create and start the display thread.

        *delay* is handed to ``cv.waitKey`` between two pattern updates.
        """
        self.delay = delay
        self.timecode = None
        # 64 bits, 8x8 + white border
        self.shape = (10, 10)
        # TODO: orientation marker? upper 8 bits blank?

        super().__init__(name=name)
        self.name = name
        self.start()

    def _model(self):
        """Return the corners of the white rectangle, in cell units (float32)."""
        corners = np.float32([
            [0, 0],  # top left
            [0, 1],  # bottom left, appears to be the order of findContours
            [1, 1],
            [1, 0],
        ])
        # scale in place so the array stays float32
        corners *= self.shape
        return corners

    def _pattern(self):
        """Return ``(timestamp_ms, image)`` for the current instant.

        Image layout, from outside to inside: one cell of black quiet zone,
        one cell of white rectangle, then the 8x8 code cells (255 = bit set,
        bit 0 first, rows filled one after another).
        """
        nbits = 64
        nrows = 8
        ncols = nbits // nrows

        timestamp = int(time.time() * 1e3)
        gray = linear2gray(timestamp)

        cells = np.uint8(tobits(gray, nbits=nbits)) * 255
        cells = cells.reshape((nrows, ncols))

        canvas = np.zeros((4 + nrows, 4 + ncols), dtype=np.uint8)
        canvas[1:-1, 1:-1] = 255  # white rectangle
        canvas[2:-2, 2:-2] = cells  # inner part

        return (timestamp, canvas)

    def start(self):
        """Mark the thread as running and start it."""
        self.running = True
        super().start()

    def release(self, timeout=None):
        """Ask the display loop to stop and wait for the thread (up to *timeout*)."""
        self.running = False
        self.join(timeout=timeout)

    def run(self):
        """Display loop: show a fresh pattern until stopped or ESC is pressed."""
        cv.namedWindow(self.name, cv.WINDOW_NORMAL)

        while self.running:
            # show it now, so it's least jittery (synchronize to camera)
            stamp, pattern = self._pattern()
            self.timecode = stamp

            cv.imshow(self.name, upscale(drawscale, pattern))

            # key code 27 is ESC
            if cv.waitKey(self.delay) == 27:
                self.running = False
def linear2gray(num):
    """Return the reflected-binary Gray code of the integer *num*."""
    shifted = num >> 1
    return num ^ shifted
def gray2linear(num, nbits=64):
    """Decode the Gray-coded integer *num* back to its plain binary value.

    Decoding is a prefix XOR: every output bit is the XOR of all Gray bits
    at that position and above.  It is computed with shifts of 1, 2, 4, ...
    until the shift reaches *nbits*, which is correct for any width.
    (Halving *nbits* repeatedly only covers every bit distance when
    *nbits* is a power of two.)

    *nbits* is the width of the code; *num* must fit into that many bits.
    """
    shift = 1
    while shift < nbits:
        num ^= num >> shift
        shift <<= 1
    return num
def tobits(num, nbits=64):
    """Split the non-negative integer *num* into a list of *nbits* bits.

    The list is little endian: the element at index k is bit k of *num*.
    *num* must fit into *nbits* bits.
    """
    assert num >= 0
    assert num.bit_length() <= nbits

    bits = []
    for position in range(nbits):
        bits.append((num >> position) & 1)
    return bits
def frombits(bits):
    """Assemble an integer from a little-endian sequence of truthy/falsy bits."""
    value = 0
    for position, digit in enumerate(bits):
        if digit:
            value |= 1 << position
    return value
def upscale(factor, im):
    """Enlarge *im* by *factor* using nearest-neighbour interpolation."""
    height, width = im.shape[:2]
    # cv.resize takes the target size as (width, height)
    target_size = (width * factor, height * factor)
    return cv.resize(im, target_size, interpolation=cv.INTER_NEAREST)
def simplify_contour(c):
    """Approximate the closed contour *c* by a polygon.

    The tolerance is 1% of the contour's perimeter.  Returns None when the
    contour is longer than 10 and the approximation changed its perimeter
    by more than 10%; otherwise returns the approximated polygon.
    """
    perimeter = cv.arcLength(c, True)
    approx = cv.approxPolyDP(c, perimeter * 0.01, True)

    if perimeter > 10:
        # length changed noticeably?
        drift = abs(cv.arcLength(approx, True) / perimeter - 1)
        if drift > 0.1:
            return None

    return approx
def refine_corners(contour, image):
    """Refine the points of *contour* with cv.cornerSubPix on *image*.

    The points are converted to float32 first; the refined points are
    returned.
    """
    # Terminate on accuracy only: the iteration-count flag is switched off,
    # so only the epsilon (0.1) takes part in the criteria type.
    criteria = (cv.TERM_CRITERIA_EPS, 10, 0.1)
    corners = contour.astype(np.float32)
    return cv.cornerSubPix(image, corners, (5, 5), (1, 1), criteria=criteria)
def contour_sense(contour):
    """Return the summed turning angle (radians) of a closed contour.

    Each edge is normalized to unit length; the 2-D cross product of two
    successive edges is then the sine of the turn between them, and the
    arcsines are summed.  The sign of the sum tells the winding sense
    (positive -> clockwise, per the original author's convention).

    Because arcsin is used, each individual turn is folded into
    [-pi/2, pi/2]; a square therefore yields exactly +/- 2*pi.

    *contour* is any array that reshapes to (N, 2) points.
    """
    points = contour.reshape((-1, 2)).astype(np.float32)

    # unit vectors from each point to its successor
    edges = np.roll(points, -1, axis=0) - points
    edges /= np.linalg.norm(edges, axis=1).reshape((-1, 1))
    following = np.roll(edges, -1, axis=0)

    # Explicit 2-D cross product (z component); np.cross on 2-D vectors is
    # deprecated in NumPy 2.0.
    sines = edges[:, 0] * following[:, 1] - edges[:, 1] * following[:, 0]

    # Rounding may push a value slightly outside [-1, 1], which would make
    # arcsin return NaN.
    np.clip(sines, -1.0, 1.0, out=sines)

    return np.arcsin(sines).sum()
def find_quads(monochrome, mask, minarea=100**2):
    """Find quadrilateral contours in *mask*, largest first.

    A contour is kept when
      * its simplified polygon has exactly four corners,
      * its winding sense passes ``contour_sense(...) < -pi``, and
      * the area of the corner-refined quad is at least *minarea*.

    *monochrome* is the grayscale image used for sub-pixel corner
    refinement; *mask* is the binary image the contours are extracted from.

    Returns a list of ``(raw_contour, refined_quad)`` tuples sorted by
    descending area of the refined quad.
    """
    # findContours returns (contours, hierarchy) in OpenCV 4 but
    # (image, contours, hierarchy) in OpenCV 3; [-2] picks the contours
    # in both cases.
    raw_contours = cv.findContours(mask, cv.RETR_LIST, cv.CHAIN_APPROX_SIMPLE)[-2]

    found = []
    for raw in raw_contours:
        simplified = simplify_contour(raw)
        if simplified is None or len(simplified) != 4:
            continue

        if not (contour_sense(simplified) < -np.pi):
            continue

        refined = refine_corners(simplified, monochrome)
        area = cv.contourArea(refined)
        if area >= minarea:
            found.append((area, raw, refined))

    # stable sort, largest area first
    found.sort(key=lambda entry: -entry[0])

    return [(raw, refined) for (_area, raw, refined) in found]
def rotate_topleft(contour):
    """Rotate the points of *contour* so the one nearest the origin is first.

    *contour* is indexed as (point, 1, coordinate); the cyclic order of the
    points is kept.
    """
    # distance of every point from the image origin
    origin_distance = np.linalg.norm(contour, axis=(1, 2))
    first = np.argmin(origin_distance)
    return np.roll(contour, -first, axis=0)
def fixn(value, shift):
    """Convert *value* to fixed point with *shift* fractional bits.

    The value is multiplied by ``2**shift`` and rounded to the nearest
    integer, which is the form OpenCV drawing functions expect together
    with their ``shift=`` argument.

    - tuple / list: converted element-wise, same container type returned
    - numpy array:  returned as a tuple of Python ints
    - scalar (int, float or NumPy scalar): returned as a Python int

    Raises TypeError for any other type.
    """
    factor = 1 << shift

    if isinstance(value, (tuple, list)):
        return type(value)(int(round(v * factor)) for v in value)

    if isinstance(value, np.ndarray):
        # The ``np.int`` alias was removed in NumPy 1.24; the builtin int is
        # the same dtype.  tolist() yields plain Python ints.
        return tuple(np.round(value * factor).astype(int).tolist())

    if isinstance(value, (int, float, np.integer, np.floating)):
        return int(round(value * factor))

    raise TypeError(f"fixn() cannot convert {type(value).__name__}")
# Optional FreeType text renderer.  It is only referenced by disabled code,
# so a missing cv.freetype module or a missing font file (the path below
# only exists on Windows) must not keep the script from starting.
# ft2 stays None when the renderer is unavailable.
_FT2_FONT_PATH = "C:\\Windows\\Fonts\\times.ttf"
ft2 = None
if hasattr(cv, "freetype") and os.path.isfile(_FT2_FONT_PATH):
    ft2 = cv.freetype.createFreeType2()
    ft2.loadFontData(_FT2_FONT_PATH, 0)
#ft2.loadFontData("C:\\Windows\\Fonts\\consola.ttf", 0)
def centeredText(im, text, origin, fontScale, color, thickness, background=None, *args, **kwargs):
    """Draw *text* onto *im*, centered on the point *origin* = (x, y).

    The text is rendered with the Hershey simplex font.  When *background*
    is given, a filled rectangle of that color with 10 px of padding is
    drawn behind the text first.  Extra positional and keyword arguments
    are accepted but ignored.
    """
    fontFace = cv.FONT_HERSHEY_SIMPLEX
    (text_w, text_h), _baseline = cv.getTextSize(text, fontFace, fontScale, thickness)

    center_x, center_y = origin
    half_w = text_w / 2
    half_h = text_h / 2

    if background is not None:
        # rectangle as (x, y, width, height), in fixed point (4 fractional bits)
        box = (center_x - half_w - 10, center_y - half_h - 10, text_w + 20, text_h + 20)
        cv.rectangle(
            im,
            fixn(box, 4),
            color=background,
            thickness=cv.FILLED,
            lineType=cv.LINE_AA,
            shift=4)

    # putText anchors the text at its bottom-left corner
    anchor = fixn((center_x - half_w, center_y + half_h), 0)
    cv.putText(im, text, anchor, fontFace, fontScale, color, thickness)

    # NOTE: rendering through the FreeType object `ft2` (ft2.putText) was
    # tried as an alternative and is left disabled.
# ----------------------------------------------------------------------
# Main script: show a timecode on screen, watch it through the camera,
# decode it from the camera image and report the age of what is seen.
# ----------------------------------------------------------------------

# pixels per timecode cell, both for display and for the rectified model
drawscale = 32

# parse the camera index first, so a bad argument fails before any thread runs
device = int(sys.argv[1]) if len(sys.argv) >= 2 else 0

timecode = Timecode()
cap = cv.VideoCapture(device)

try:
    tc_model = timecode._model()
    tch, tcw = timecode.shape

    cap.set(cv.CAP_PROP_FPS, 30)
    capw = int(cap.get(cv.CAP_PROP_FRAME_WIDTH))
    caph = int(cap.get(cv.CAP_PROP_FRAME_HEIGHT))

    # from here on `cap` is the freshest-frame wrapper around the capture
    cap = FreshestFrame(cap)

    cv.namedWindow("camera", cv.WINDOW_NORMAL)  # resizable
    cv.resizeWindow("camera", capw, caph)

    maxwidgets = 2
    alpha = 0.02  # weight of a new sample in the running means
    meanval = [0.0] * maxwidgets
    meanerr = [0.0] * maxwidgets

    # debug aid: show the rectified view of every detected quad
    show_rectified = False

    # Centers of all timecode cells in rectified (model) coordinates.
    # These never change, so they are computed once.
    rx = np.arange(tcw) + 0.5
    ry = np.arange(tch) + 0.5
    (mgx, mgy) = np.meshgrid(rx * drawscale, ry * drawscale)
    coords = np.dstack([mgx, mgy]).reshape((-1, 1, 2))

    while True:
        if not timecode.running: break
        if not cap.running: break

        rv, im = cap.read()
        now = time.time()

        # rv is the frame sequence number; 0 / None means there is no frame
        if not rv or im is None: break

        frameh, framew = im.shape[:2]

        # image analysis...
        monochrome = cv.cvtColor(im, cv.COLOR_BGR2GRAY)
        th_effective, mask = cv.threshold(monochrome, 0, 255, cv.THRESH_BINARY | cv.THRESH_OTSU)
        cv.imshow("thresholded", mask)

        quads = find_quads(monochrome, mask)[:maxwidgets]

        print(f"{len(quads)} quads")

        for index, (rawquad, quad) in enumerate(quads):
            quad = rotate_topleft(quad)
            quadf = np.float32(quad)

            # outline of the quad
            for p, q in zip(quad, np.roll(quad, -1, axis=0)):
                cv.line(im,
                    fixn(p[0], 4), fixn(q[0], 4),
                    color=(255, 0, 255),
                    thickness=2,
                    lineType=cv.LINE_AA,
                    shift=4
                )

            # mark the first two corners to make the orientation visible
            cv.circle(im, fixn(quadf[0, 0], 4), fixn(10, 4), (255, 255, 0), thickness=2, lineType=cv.LINE_AA, shift=4)
            cv.circle(im, fixn(quadf[1, 0], 4), fixn(10, 4), (0, 255, 255), thickness=2, lineType=cv.LINE_AA, shift=4)

            # homography from camera image to the rectified timecode
            H = cv.getPerspectiveTransform(quadf, tc_model * drawscale)

            if show_rectified:
                straight = cv.warpPerspective(mask, H, (tcw*drawscale, tch*drawscale))
                cv.imshow(f"straight {index}", straight)

            # map the cell centers back into the camera image
            try:
                Hinv = np.linalg.inv(H)
            except np.linalg.LinAlgError:
                # degenerate quad, nothing to sample
                continue

            sample_coords = cv.perspectiveTransform(coords.copy(), Hinv).astype(np.float32)
            sample_coords.shape = (-1, 2)

            for coord in sample_coords:
                cv.circle(im, fixn(coord, 4), fixn(2, 4), (0, 0, 255), thickness=1, lineType=cv.LINE_AA, shift=4)

            # np.int was removed in NumPy 1.24; the builtin int is the same dtype
            sx = sample_coords[:, 0].astype(int)
            sy = sample_coords[:, 1].astype(int)

            # decode only if every sample point lies inside the frame
            if (0 <= sx).all() and (sx < framew).all() and (0 <= sy).all() and (sy < frameh).all():
                sampled = (mask[sy, sx] > 0)
                sampled.shape = (tch, tcw)

                # drop the white border, keep the code cells
                bits = sampled[1:-1, 1:-1].flatten()

                codevalue = frombits(bits)
                codevalue = gray2linear(codevalue)

                # milliseconds -> seconds
                timestamp = codevalue * 1e-3
                delta = timestamp - now

                # exponentially weighted mean of the delay and of its deviation
                err = abs(delta - meanval[index])
                meanval[index] += (delta - meanval[index]) * alpha
                meanerr[index] += (err - meanerr[index]) * alpha

                # an outlier of more than a second restarts the statistics
                if err > 1.0:
                    meanval[index] = meanerr[index] = 0

                print(f"#{index}: {timestamp:.3f} s, delta {delta:+.3f} s")
                print(f"mean {meanval[index]:.3f}, mean err {meanerr[index]:.3f}")

                centeredText(im,
                    f"{meanval[index]:.2f}s",
                    quad.mean(axis=(0, 1)),
                    1.5, (255, 255, 255), 3, background=(0, 201, 106))

        print()

        cv.imshow("camera", im)

        k = cv.waitKey(1)
        if k == 27:  # ESC
            break

finally:
    # Always stop the worker threads; they are not daemon threads and would
    # otherwise keep the process alive after an error.
    timecode.release()
    cap.release()
    cv.destroyAllWindows()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment