Skip to content

Instantly share code, notes, and snippets.

@azidanit
Created March 17, 2022 02:17
Show Gist options
  • Save azidanit/3a156d12514c3e2a92aec8bde5655164 to your computer and use it in GitHub Desktop.
Save azidanit/3a156d12514c3e2a92aec8bde5655164 to your computer and use it in GitHub Desktop.
class ThreadedCamera(object):
def __init__(self, src=0):
self.capture = cv2.VideoCapture(src)
self.capture.set(cv2.CAP_PROP_BUFFERSIZE, 2)
# FPS = 1/X
# X = desired FPS
self.FPS = 1/30
self.FPS_MS = int(self.FPS * 1000)
# Start frame retrieval thread
self.thread = Thread(target=self.update, args=())
self.thread.daemon = True
self.thread.start()
def update(self):
while True:
if self.capture.isOpened():
(self.status, self.frame) = self.capture.read()
time.sleep(self.FPS)
def show_frame(self):
cv2.imshow('frame', self.frame)
cv2.waitKey(self.FPS_MS)
if __name__ == '__main__':
src = 'https://videos3.earthcam.com/fecnetwork/9974.flv/chunklist_w1421640637.m3u8'
threaded_camera = ThreadedCamera(src)
while True:
try:
threaded_camera.show_frame()
except AttributeError:
pass
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment