-
-
Save crackwitz/15c3910f243a42dcd9d4a40fcdb24e40 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3 | |
''' | |
always getting the most recent frame of a camera | |
================================================ | |
Usage: | |
------ | |
freshest_camera_frame.py | |
Keys: | |
----- | |
ESC - exit | |
''' | |
# Python 2/3 compatibility | |
from __future__ import print_function | |
import os | |
import sys | |
import time | |
import threading | |
import numpy as np | |
import cv2 as cv | |
# also acts (partly) like a cv.VideoCapture | |
class FreshestFrame(threading.Thread): | |
def __init__(self, capture, name='FreshestFrame'): | |
self.capture = capture | |
assert self.capture.isOpened() | |
# this lets the read() method block until there's a new frame | |
self.cond = threading.Condition() | |
# this allows us to stop the thread gracefully | |
self.running = False | |
# keeping the newest frame around | |
self.frame = None | |
# passing a sequence number allows read() to NOT block | |
# if the currently available one is exactly the one you ask for | |
self.latestnum = 0 | |
# this is just for demo purposes | |
self.callback = None | |
super().__init__(name=name) | |
self.start() | |
def start(self): | |
self.running = True | |
super().start() | |
def release(self, timeout=None): | |
self.running = False | |
self.join(timeout=timeout) | |
self.capture.release() | |
def run(self): | |
counter = 0 | |
while self.running: | |
# block for fresh frame | |
(rv, img) = self.capture.read() | |
assert rv | |
counter += 1 | |
# publish the frame | |
with self.cond: # lock the condition for this operation | |
self.frame = img if rv else None | |
self.latestnum = counter | |
self.cond.notify_all() | |
if self.callback: | |
self.callback(img) | |
def read(self, wait=True, seqnumber=None, timeout=None): | |
# with no arguments (wait=True), it always blocks for a fresh frame | |
# with wait=False it returns the current frame immediately (polling) | |
# with a seqnumber, it blocks until that frame is available (or no wait at all) | |
# with timeout argument, may return an earlier frame; | |
# may even be (0,None) if nothing received yet | |
with self.cond: | |
if wait: | |
if seqnumber is None: | |
seqnumber = self.latestnum+1 | |
if seqnumber < 1: | |
seqnumber = 1 | |
rv = self.cond.wait_for(lambda: self.latestnum >= seqnumber, timeout=timeout) | |
if not rv: | |
return (self.latestnum, self.frame) | |
return (self.latestnum, self.frame) | |
def main(): | |
# these windows belong to the main thread | |
cv.namedWindow("frame") | |
# on win32, imshow from another thread to this DOES work | |
cv.namedWindow("realtime") | |
# open some camera | |
cap = cv.VideoCapture(0) | |
cap.set(cv.CAP_PROP_FPS, 30) | |
# wrap it | |
fresh = FreshestFrame(cap) | |
# a way to watch the camera unthrottled | |
def callback(img): | |
cv.imshow("realtime", img) | |
# main thread owns windows, does waitkey | |
fresh.callback = callback | |
# main loop | |
# get freshest frame, but never the same one twice (cnt increases) | |
# see read() for details | |
cnt = 0 | |
while True: | |
# test that this really takes NO time | |
# (if it does, the camera is actually slower than this loop and we have to wait!) | |
t0 = time.perf_counter() | |
cnt,img = fresh.read(seqnumber=cnt+1) | |
dt = time.perf_counter() - t0 | |
if dt > 0.010: # 10 milliseconds | |
print("NOTICE: read() took {dt:.3f} secs".format(dt=dt)) | |
# let's pretend we need some time to process this frame | |
print("processing {cnt}...".format(cnt=cnt), end=" ", flush=True) | |
cv.imshow("frame", img) | |
# this keeps both imshow windows updated during the wait (in particular the "realtime" one) | |
key = cv.waitKey(200) | |
if key == 27: | |
break | |
print("done!") | |
fresh.release() | |
cv.destroyWindow("frame") | |
cv.destroyWindow("realtime") | |
if __name__ == '__main__': | |
main() |
Thank you very much for making this available. I could successfully implement this for using with 60 FPS with my Baumer industrial camera while also recording frame and TTL inputs timestamps.
However, my camera API actually gives the most recent frame by default and keeps a buffer of max 10 images. Also, I have to get a nparray from this image before it can be displayed and saved with openCV.
So I am wondering if I show use the code like I did (pasting it below), or if I should move the GetNPArray function, video write and all the np array saving etc to a different part of the code.
# Class module for instantiating one or more camera objects as a standalone
# script or by importing on other scripts. Frame acquisition, display and
# saving to file is done in their own threads. Camera also saves frame capture
# and TTL inputs (Line 1) timestamps numpy array on binary format.
import os
import sys
import time
import threading
import numpy as np
from datetime import datetime
import cv2 as cv
from PyQt5.QtWidgets import QMessageBox
import neoapi
def screenText(string, line, frame, size):
cv.putText(img=frame, text=string,
fontFace=cv.FONT_HERSHEY_SIMPLEX,
fontScale=size,
org=(12, 18 + (line - 1) * 19),
color=(0, 0, 0),
thickness=2,
bottomLeftOrigin=False)
cv.putText(img=frame, text=string,
fontFace=cv.FONT_HERSHEY_SIMPLEX,
fontScale=size,
org=(12, 18 + (line - 1) * 19),
color=(255, 0, 0),
thickness=1,
bottomLeftOrigin=False)
def showDialog(text, title, info, icon):
msgBox = QMessageBox()
msgBox.setIcon(icon)
msgBox.setText(text)
msgBox.setInformativeText(info)
msgBox.setWindowTitle(title)
msgBox.setStandardButtons(QMessageBox.Ok | QMessageBox.Cancel)
return msgBox.exec()
# also acts (partly) like a cv.VideoCapture
class FreshestFrame(threading.Thread):
def __init__(self, cam, name='FreshestFrame'):
self.capture = cam
assert self.capture.IsConnected()
# this lets the read() method block until there's a new frame
self.cond = threading.Condition()
# this allows us to stop the thread gracefully
self.running = False
# keeping the newest frame around
self.frame = None
# passing a sequence number allows read() to NOT block
# if the currently available one is exactly the one you ask for
self.latestnum = 0
super().__init__(name=name)
self.start()
def start(self):
self.running = True
super().start()
def release(self, timeout=None):
self.running = False
self.join(timeout=timeout)
self.capture.Disconnect()
def run(self):
counter = 0
while self.running:
# block for fresh frame
img = self.capture.GetImage()
rv = False if img.IsEmpty() else True
assert rv
counter += 1
# publish the frame
with self.cond: # lock the condition for this operation
self.frame = img if rv else None
self.latestnum = counter
self.cond.notify_all()
def read(self, wait=True, seqnumber=None, timeout=None):
# with no arguments (wait=True), it always blocks for a fresh frame
# with wait=False it returns the current frame immediately (polling)
# with a seqnumber, it blocks until that frame is available (or no wait at all)
# with timeout argument, may return an earlier frame;
# may even be (0,None) if nothing received yet
with self.cond:
if wait:
if seqnumber is None:
seqnumber = self.latestnum+1
if seqnumber < 1:
seqnumber = 1
rv = self.cond.wait_for(lambda: self.latestnum >= seqnumber, timeout=timeout)
if not rv:
return (self.latestnum, self.frame)
return (self.latestnum, self.frame)
def main():
## Create variables, arrays and filename strings
camera = neoapi.Cam()
isColor = False
fps = 60
width = 720
height = 540
offx = 0
offy = 0
wd_title = 'Camera acquisition'
wd_title_rt = 'Camera acquisition - RT'
txtsize = 0.6
sub = 'NC-1'
task = 'self-initiated-running'
exp = 'November_pilot'
data_path = 'C:\\Users\\system-\\Downloads\\pyControl\\data'
exp_path = os.path.join(data_path, exp)
frame_ts = []
sync_ts = []
early = True
## Connect camera
while True:
try:
camera.Connect()
break
except:
button = showDialog(text="Camera is not connected!",
info="Connect and press OK to try again.",
title="Error",
icon=QMessageBox.Warning)
while True:
if button == QMessageBox.Ok:
break
continue
## Define acquisition modes
camera.f.AcquisitionFrameRateEnable.value = True
camera.f.AcquisitionMode.SetString('Continuous')
camera.f.ExposureAuto.SetString('Off')
camera.f.ExposureMode.SetString('Timed')
## Set camera features
camera.f.ExposureTime.value = 15000
camera.f.Gain.value = 2
camera.f.Width.value = width
camera.f.Height.value = height
camera.f.OffsetX.value = offx
camera.f.OffsetY.value = offy
camera.f.AcquisitionFrameRate.value = fps
camera.f.PixelFormat.SetString('Mono8')
## Configure Digital I/O parameters
### Line 1, used for receiving synchronization pulses
camera.f.LineSelector.SetString("Line1")
camera.f.LineMode.SetString("Input")
camera.f.LineInverter.value = False
camera.f.LineDebouncerHighTimeAbs.value = 1
camera.f.LineDebouncerLowTimeAbs.value = 1
# Create event handler
class SyncPulseEventCallback():
def sync_event_callback(self, event):
sync_ts.append(event.GetTimestamp())
## Set-up event parameters
sync_callback = SyncPulseEventCallback()
camera.EnableEventCallback(sync_callback.sync_event_callback, "Line1RisingEdge")
## Enabling synchronization and frame events
camera.ClearEvents()
camera.EnableEvent("Line1RisingEdge")
begin_ts = datetime.now()
file_ts = begin_ts.strftime("%Y-%m-%d_%H-%M-%S")
video_fn = sub + "-side-vid-" + file_ts + ".avi"
video_fp = os.path.join(exp_path, video_fn)
frame_fp = video_fp.replace('avi','npy').replace('vid','frame')
sync_fp = frame_fp.replace('frame','sync')
video = cv.VideoWriter(video_fp,
cv.VideoWriter_fourcc(*'XVID'),
fps,
(width, height),
isColor)
# these windows belong to the main thread
cv.namedWindow(wd_title)
cv.resizeWindow(wd_title, width, height)
# wrap it
fresh = FreshestFrame(camera)
# main thread owns windows, does waitkey
# main loop
# get freshest frame, but never the same one twice (cnt increases)
# see read() for details
cnt = 0
while True:
# test that this really takes NO time
# (if it does, the camera is actually slower than this loop and we have to wait!)
t0 = time.perf_counter()
cnt,frame = fresh.read(seqnumber=cnt+1)
dt = time.perf_counter() - t0
if dt > 0.010: # 10 milliseconds
print("NOTICE: read() took {dt:.3f} secs".format(dt=dt))
print("processing {cnt}...".format(cnt=cnt), end=" ", flush=True)
frame_ts.append(frame.GetTimestamp())
frame = frame.GetNPArray()
copy = frame.copy()
video.write(frame)
record_delta = datetime.now() - begin_ts
record_string = '0' + str(record_delta).split('.', 2)[0]
screenText('Recording {} {} {}'.format(exp, sub, task), 1, copy, txtsize)
screenText(record_string, 2, copy, txtsize)
if early:
if record_delta.total_seconds() < 5:
screenText('Checking synchronization input', 3, copy, txtsize)
elif 5 < record_delta.total_seconds() < 10 and len(sync_ts) > 0:
screenText('Synchronization OK! {} pulses received'.format(len(sync_ts)), 3, copy, txtsize)
elif 5 < record_delta.total_seconds() < 10 and len(sync_ts) == 0:
screenText('Sync input absent!', 3, copy, txtsize)
elif record_delta.total_seconds() > 10:
early = False
np.save(frame_fp,frame_ts)
np.save(sync_fp,sync_ts)
cv.imshow("Camera acquisition", copy)
# this keeps both imshow windows updated during the wait (in particular the "realtime" one)
key = cv.waitKey(1)
if key == 27:
break
fresh.release()
video.release()
cv.destroyWindow("Camera acquisition")
if __name__ == '__main__':
main()
it keep freezing, not responding :/
I got the same problem. Are there any solutions? I found that variable "img" is not empty or None, so why imshow cant show it?
Have you tried to integrate these codes into the yolo object detection model?
it keep freezing, not responding :/