Skip to content

Instantly share code, notes, and snippets.

Last active January 9, 2022 17:15
Show Gist options
  • Save CarlosGS/f39b7ba1a79da9fc6a9ad4a4238946c5 to your computer and use it in GitHub Desktop.
Save CarlosGS/f39b7ba1a79da9fc6a9ad4a4238946c5 to your computer and use it in GitHub Desktop.
Connect to an MJPG stream and compute the general direction of motion (like optical flow, but lighter). The script first crops and downsamples the JPG frames using the JPEGtran library, to allow processing on low-power platforms like the Raspberry Pi.
# File:
# Description: Connect to an MJPG stream and compute the
# general direction of motion.
# The script first crops and downsamples the
# JPEG frames using the JPEGtran library,
# to allow processing on low-power platforms
# like the Raspberry Pi.
# Author: Carlos Garcia-Saura (@CarlosGS) - 2018
# Modified by: Author (email) - year
import time
import math
import requests
import cv2
import numpy as np
from jpegtran import JPEGImage
# Fetch a raw JPG frame from an MJPEG stream
def grab_frame(url="http://localhost:8090/?action=snapshot", timeout=10):
    """Fetch one raw JPEG frame from an MJPEG streamer's snapshot URL.

    Args:
        url: Snapshot URL to request.
        timeout: Seconds to wait for the server before giving up, so that a
            stalled stream cannot block the caller forever.

    Returns:
        The response body as bytes.

    Raises:
        requests.RequestException: On connection failure, timeout, or an
            HTTP error status.
    """
    response = requests.get(url, timeout=timeout)
    # Fail here on 4xx/5xx rather than handing an error page to the JPEG decoder.
    response.raise_for_status()
    return response.content
def round_down(num, divisor):
    """Return the largest multiple of a positive *divisor* that is <= *num*.

    The result is always converted to ``int``, even when *num* is a float.
    """
    return int(num - (num % divisor))
# Processes and loads a JPG frame, cropping and scaling down with the specified parameters
def process_frame(blob, zoom=4, newSize=100, quality=95):
    """Crop the centre of a JPEG frame, shrink it, and decode it for OpenCV.

    The crop and the downscale are applied to the JPEG data through the
    jpegtran library before the image is decoded.

    Args:
        blob: Raw JPEG bytes (for example the value returned by grab_frame).
        zoom: The crop is a square whose side is the image width divided by
            this factor.
        newSize: Width and height passed to the downscale step.
        quality: Quality value passed to the downscale step.

    Returns:
        The image decoded by ``cv2.imdecode`` in colour mode.
    """
    img = JPEGImage(blob=blob)  # Load the JPG image with the JPEGtran library

    # Side of the square region cropped around the image centre.
    side = int(img.width / zoom)

    # Top-left corner of the crop. (dimension - side) // 2 is the floor of
    # "half dimension minus half side", which snaps to the same multiple of 16
    # as the unrounded value while staying in integer arithmetic.
    # jpegtran requires indexing in multiples of 16.
    startW = round_down((img.width - side) // 2, 16)
    startH = round_down((img.height - side) // 2, 16)

    # Apply the transformation directly to the JPG data
    cropped = img.crop(startW, startH, side, side)
    data = cropped.downscale(newSize, newSize, quality).data

    # Convert the JPG data into the OpenCV format
    nparr = np.frombuffer(data, np.uint8)
    frame = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
    # Optional: convert to grayscale with
    # frame = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    return frame
def _estimate_rigid_transform(prev, curr):
    """Estimate the 2x3 rotation + uniform scale + translation from prev to curr.

    Uses ``cv2.estimateRigidTransform`` when the installed OpenCV still
    provides it. OpenCV 4 removed that function, so otherwise corners found in
    *prev* are tracked into *curr* and ``cv2.estimateAffinePartial2D`` is
    fitted to the tracked pairs.

    Returns:
        The 2x3 matrix, or None when no transform could be estimated.
    """
    legacy = getattr(cv2, "estimateRigidTransform", None)
    if legacy is not None:
        return legacy(prev, curr, fullAffine=False)

    # Corner detection needs single-channel input; frames may already be
    # grayscale if that option is enabled in process_frame.
    prev_gray = cv2.cvtColor(prev, cv2.COLOR_BGR2GRAY) if prev.ndim == 3 else prev
    curr_gray = cv2.cvtColor(curr, cv2.COLOR_BGR2GRAY) if curr.ndim == 3 else curr

    corners = cv2.goodFeaturesToTrack(
        prev_gray, maxCorners=200, qualityLevel=0.01, minDistance=5
    )
    if corners is None:
        return None

    moved, status, _err = cv2.calcOpticalFlowPyrLK(prev_gray, curr_gray, corners, None)
    if moved is None or status is None:
        return None

    tracked = status.ravel() == 1
    # A 4-degree-of-freedom fit needs at least two point pairs.
    if np.count_nonzero(tracked) < 2:
        return None

    matrix, _inliers = cv2.estimateAffinePartial2D(corners[tracked], moved[tracked])
    return matrix


# Grab reference frame (A)
A = process_frame(grab_frame())
cv2.imshow('A', A)

try:
    while True:
        # Grab latest frame (B)
        B = process_frame(grab_frame())

        # Compute the transformation matrix
        warp_matrix = _estimate_rigid_transform(A, B)

        if warp_matrix is not None:
            # Extract the motion parameters.
            # The first row is (s*cos, -s*sin, tx), so hypot/atan2 separate
            # scale from rotation and, unlike asin, cannot fail when the
            # off-diagonal term exceeds 1. The sign of `rotation` follows
            # warp_matrix[0, 1], as asin(warp_matrix[0, 1]) did.
            scale = math.hypot(warp_matrix[0, 0], warp_matrix[0, 1]) - 1
            rotation = math.atan2(warp_matrix[0, 1], warp_matrix[0, 0])
            tx = warp_matrix[0, 2]
            ty = warp_matrix[1, 2]

            # Useful for debugging
            cv2.imshow('B', B)  # Show the latest frame
            Bt = cv2.warpAffine(A, warp_matrix, (A.shape[1], A.shape[0]))
            # Show the transformed image (the "A" frame transformed to fit the latest frame)
            cv2.imshow('Bt', Bt)
        else:
            print("Couldn't match the two images!")

        # Also useful for debugging: waitKey lets the windows refresh and
        # polls the keyboard; press 'q' to quit.
        key = cv2.waitKey(250) & 0xFF
        if key == ord('q'):
            break
finally:
    # Close the preview windows on normal exit and on errors alike.
    cv2.destroyAllWindows()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment