Skip to content

Instantly share code, notes, and snippets.

@crackwitz
Last active February 22, 2024 08:21
Show Gist options
  • Star 37 You must be signed in to star a gist
  • Fork 11 You must be signed in to fork a gist
  • Save crackwitz/15c3910f243a42dcd9d4a40fcdb24e40 to your computer and use it in GitHub Desktop.
Save crackwitz/15c3910f243a42dcd9d4a40fcdb24e40 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
'''
always getting the most recent frame of a camera
================================================
Usage:
------
freshest_camera_frame.py
Keys:
-----
ESC - exit
'''
# Python 2/3 compatibility
from __future__ import print_function
import os
import sys
import time
import threading
import numpy as np
import cv2 as cv
# also acts (partly) like a cv.VideoCapture
class FreshestFrame(threading.Thread):
def __init__(self, capture, name='FreshestFrame'):
self.capture = capture
assert self.capture.isOpened()
# this lets the read() method block until there's a new frame
self.cond = threading.Condition()
# this allows us to stop the thread gracefully
self.running = False
# keeping the newest frame around
self.frame = None
# passing a sequence number allows read() to NOT block
# if the currently available one is exactly the one you ask for
self.latestnum = 0
# this is just for demo purposes
self.callback = None
super().__init__(name=name)
self.start()
def start(self):
self.running = True
super().start()
def release(self, timeout=None):
self.running = False
self.join(timeout=timeout)
self.capture.release()
def run(self):
counter = 0
while self.running:
# block for fresh frame
(rv, img) = self.capture.read()
assert rv
counter += 1
# publish the frame
with self.cond: # lock the condition for this operation
self.frame = img if rv else None
self.latestnum = counter
self.cond.notify_all()
if self.callback:
self.callback(img)
def read(self, wait=True, seqnumber=None, timeout=None):
# with no arguments (wait=True), it always blocks for a fresh frame
# with wait=False it returns the current frame immediately (polling)
# with a seqnumber, it blocks until that frame is available (or no wait at all)
# with timeout argument, may return an earlier frame;
# may even be (0,None) if nothing received yet
with self.cond:
if wait:
if seqnumber is None:
seqnumber = self.latestnum+1
if seqnumber < 1:
seqnumber = 1
rv = self.cond.wait_for(lambda: self.latestnum >= seqnumber, timeout=timeout)
if not rv:
return (self.latestnum, self.frame)
return (self.latestnum, self.frame)
def main():
# these windows belong to the main thread
cv.namedWindow("frame")
# on win32, imshow from another thread to this DOES work
cv.namedWindow("realtime")
# open some camera
cap = cv.VideoCapture(0)
cap.set(cv.CAP_PROP_FPS, 30)
# wrap it
fresh = FreshestFrame(cap)
# a way to watch the camera unthrottled
def callback(img):
cv.imshow("realtime", img)
# main thread owns windows, does waitkey
fresh.callback = callback
# main loop
# get freshest frame, but never the same one twice (cnt increases)
# see read() for details
cnt = 0
while True:
# test that this really takes NO time
# (if it does, the camera is actually slower than this loop and we have to wait!)
t0 = time.perf_counter()
cnt,img = fresh.read(seqnumber=cnt+1)
dt = time.perf_counter() - t0
if dt > 0.010: # 10 milliseconds
print("NOTICE: read() took {dt:.3f} secs".format(dt=dt))
# let's pretend we need some time to process this frame
print("processing {cnt}...".format(cnt=cnt), end=" ", flush=True)
cv.imshow("frame", img)
# this keeps both imshow windows updated during the wait (in particular the "realtime" one)
key = cv.waitKey(200)
if key == 27:
break
print("done!")
fresh.release()
cv.destroyWindow("frame")
cv.destroyWindow("realtime")
if __name__ == '__main__':
main()
@dzulfiamien
Copy link

thanks for this solution, it helps me a lot!

@ahgsql
Copy link

ahgsql commented May 9, 2022

it keep freezing, not responding :/

@jonpedros
Copy link

Thank you very much for making this available. I could successfully implement this for using with 60 FPS with my Baumer industrial camera while also recording frame and TTL inputs timestamps.

However, my camera API actually gives the most recent frame by default and keeps a buffer of max 10 images. Also, I have to get a nparray from this image before it can be displayed and saved with openCV.

So I am wondering if I show use the code like I did (pasting it below), or if I should move the GetNPArray function, video write and all the np array saving etc to a different part of the code.

# Class module for instantiating one or more camera objects as a standalone
# script or by importing on other scripts. Frame acquisition, display and
# saving to file is done in their own threads. Camera also saves frame capture
# and TTL inputs (Line 1) timestamps numpy array on binary format.

import os
import sys
import time
import threading
import numpy as np
from datetime import datetime
import cv2 as cv
from PyQt5.QtWidgets import QMessageBox
import neoapi

def screenText(string, line, frame, size):
    cv.putText(img=frame, text=string,
                fontFace=cv.FONT_HERSHEY_SIMPLEX,
                fontScale=size,
                org=(12, 18 + (line - 1) * 19), 
                color=(0, 0, 0),
                thickness=2,
                bottomLeftOrigin=False)
    cv.putText(img=frame, text=string,
                fontFace=cv.FONT_HERSHEY_SIMPLEX,
                fontScale=size,
                org=(12, 18 + (line - 1) * 19), 
                color=(255, 0, 0),
                thickness=1, 
                bottomLeftOrigin=False)
    
def showDialog(text, title, info, icon):
    msgBox = QMessageBox()
    msgBox.setIcon(icon)
    msgBox.setText(text)
    msgBox.setInformativeText(info)
    msgBox.setWindowTitle(title)
    msgBox.setStandardButtons(QMessageBox.Ok | QMessageBox.Cancel)
    return msgBox.exec()

# also acts (partly) like a cv.VideoCapture
class FreshestFrame(threading.Thread):

    def __init__(self, cam, name='FreshestFrame'):
        self.capture = cam
        assert self.capture.IsConnected()

        # this lets the read() method block until there's a new frame
        self.cond = threading.Condition()

        # this allows us to stop the thread gracefully
        self.running = False

        # keeping the newest frame around
        self.frame = None

        # passing a sequence number allows read() to NOT block
        # if the currently available one is exactly the one you ask for
        self.latestnum = 0

        super().__init__(name=name)
        self.start()

    def start(self):
        self.running = True
        super().start()

    def release(self, timeout=None):
        self.running = False
        self.join(timeout=timeout)
        self.capture.Disconnect()

    def run(self):
        counter = 0
        while self.running:
            # block for fresh frame
            img = self.capture.GetImage()
            rv = False if img.IsEmpty() else True
            assert rv
            counter += 1

            # publish the frame
            with self.cond: # lock the condition for this operation
                self.frame = img if rv else None
                self.latestnum = counter
                self.cond.notify_all()

    def read(self, wait=True, seqnumber=None, timeout=None):
        # with no arguments (wait=True), it always blocks for a fresh frame
        # with wait=False it returns the current frame immediately (polling)
        # with a seqnumber, it blocks until that frame is available (or no wait at all)
        # with timeout argument, may return an earlier frame;
        #   may even be (0,None) if nothing received yet

        with self.cond:
            if wait:
                if seqnumber is None:
                    seqnumber = self.latestnum+1
                if seqnumber < 1:
                    seqnumber = 1
                
                rv = self.cond.wait_for(lambda: self.latestnum >= seqnumber, timeout=timeout)
                if not rv:
                    return (self.latestnum, self.frame)

            return (self.latestnum, self.frame)

def main():
    ## Create variables, arrays and filename strings
    camera = neoapi.Cam()
    isColor = False
    fps = 60
    width = 720
    height = 540
    offx = 0
    offy = 0

    wd_title = 'Camera acquisition'
    wd_title_rt = 'Camera acquisition - RT'
    txtsize = 0.6

    sub = 'NC-1'
    task = 'self-initiated-running'
    exp = 'November_pilot'
    
    data_path = 'C:\\Users\\system-\\Downloads\\pyControl\\data'
    exp_path = os.path.join(data_path, exp)
    
    frame_ts = []
    sync_ts = []
    early = True

    ## Connect camera
    while True:
        try:
            camera.Connect()
            break
        except:
            button = showDialog(text="Camera is not connected!",
                                info="Connect and press OK to try again.",
                                title="Error",
                                icon=QMessageBox.Warning)
            while True:
                if button == QMessageBox.Ok:
                    break
            continue

    ## Define acquisition modes
    camera.f.AcquisitionFrameRateEnable.value = True
    camera.f.AcquisitionMode.SetString('Continuous')
    camera.f.ExposureAuto.SetString('Off')
    camera.f.ExposureMode.SetString('Timed')

    ## Set camera features
    camera.f.ExposureTime.value = 15000
    camera.f.Gain.value = 2
    camera.f.Width.value = width
    camera.f.Height.value = height
    camera.f.OffsetX.value = offx
    camera.f.OffsetY.value = offy
    camera.f.AcquisitionFrameRate.value = fps
    camera.f.PixelFormat.SetString('Mono8')

    ## Configure Digital I/O parameters
    ### Line 1, used for receiving synchronization pulses
    camera.f.LineSelector.SetString("Line1")
    camera.f.LineMode.SetString("Input")
    camera.f.LineInverter.value = False
    camera.f.LineDebouncerHighTimeAbs.value = 1
    camera.f.LineDebouncerLowTimeAbs.value = 1

    # Create event handler
    class SyncPulseEventCallback():
        def sync_event_callback(self, event):
            sync_ts.append(event.GetTimestamp())
    
    ## Set-up event parameters
    sync_callback = SyncPulseEventCallback()
    camera.EnableEventCallback(sync_callback.sync_event_callback, "Line1RisingEdge")

    ## Enabling synchronization and frame events
    camera.ClearEvents()
    camera.EnableEvent("Line1RisingEdge")

    begin_ts = datetime.now()
    file_ts = begin_ts.strftime("%Y-%m-%d_%H-%M-%S")
    video_fn = sub + "-side-vid-" + file_ts + ".avi"
    video_fp = os.path.join(exp_path, video_fn)
    frame_fp = video_fp.replace('avi','npy').replace('vid','frame')
    sync_fp = frame_fp.replace('frame','sync')
    video = cv.VideoWriter(video_fp,
                           cv.VideoWriter_fourcc(*'XVID'), 
                           fps,
                           (width, height), 
                           isColor)
    
    # these windows belong to the main thread
    cv.namedWindow(wd_title)
    cv.resizeWindow(wd_title, width, height)

    # wrap it
    fresh = FreshestFrame(camera)

	# main thread owns windows, does waitkey

    # main loop
    # get freshest frame, but never the same one twice (cnt increases)
    # see read() for details
    cnt = 0
    while True:
        # test that this really takes NO time
        # (if it does, the camera is actually slower than this loop and we have to wait!)
        t0 = time.perf_counter()
        cnt,frame = fresh.read(seqnumber=cnt+1)
        dt = time.perf_counter() - t0
        if dt > 0.010: # 10 milliseconds
            print("NOTICE: read() took {dt:.3f} secs".format(dt=dt))
        print("processing {cnt}...".format(cnt=cnt), end=" ", flush=True)
        frame_ts.append(frame.GetTimestamp())
        frame = frame.GetNPArray()
        copy = frame.copy()
        video.write(frame)
        record_delta = datetime.now() - begin_ts
        record_string = '0' + str(record_delta).split('.', 2)[0]
        screenText('Recording   {}   {}   {}'.format(exp, sub, task), 1, copy, txtsize)
        screenText(record_string, 2, copy, txtsize)
        if early:
            if record_delta.total_seconds() < 5:
                screenText('Checking synchronization input', 3, copy, txtsize)
            elif 5 < record_delta.total_seconds() < 10 and len(sync_ts) > 0:
                screenText('Synchronization OK! {} pulses received'.format(len(sync_ts)), 3, copy, txtsize)
            elif 5 < record_delta.total_seconds() < 10 and len(sync_ts) == 0:
                screenText('Sync input absent!', 3, copy, txtsize)
            elif record_delta.total_seconds() > 10:
                early = False
        np.save(frame_fp,frame_ts)
        np.save(sync_fp,sync_ts)

        cv.imshow("Camera acquisition", copy)
        # this keeps both imshow windows updated during the wait (in particular the "realtime" one)
        key = cv.waitKey(1)
        if key == 27:
            break

    fresh.release()
    video.release()

    cv.destroyWindow("Camera acquisition")

if __name__ == '__main__':
	main()

@pietras125
Copy link

it keep freezing, not responding :/

I got the same problem. Are there any solutions? I found that variable "img" is not empty or None, so why imshow cant show it?

@suleymanVR
Copy link

Have you tried to integrate these codes into the yolo object detection model?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment