Skip to content

Instantly share code, notes, and snippets.

@eqs
Created September 27, 2019 15:48
Show Gist options
  • Save eqs/5c15c467273682b30e6f9857b519a2ae to your computer and use it in GitHub Desktop.
Save eqs/5c15c467273682b30e6f9857b519a2ae to your computer and use it in GitHub Desktop.
Video Stream
# -*- coding: utf-8 -*-
"""
Created on 09/28/19 00:28:12
@author: Satoshi Murashige
"""
import time
import numpy as np
import cv2
from video_stream import VideoStream
video = VideoStream('m3v1mp4.mp4')
video.start()
time.sleep(1.0)
while video.more():
frame = video.read()
cv2.imshow('frame', frame)
key = cv2.waitKey(1)
if key == ord('q'):
break
video.release()
# -*- coding: utf-8 -*-
"""
Created on 09/28/19 00:20:10
@author: Satoshi Murashige
"""
import cv2
from threading import Thread
from queue import Queue
class VideoStream:
def __init__(self, video, queue_size=1024):
self.stream = cv2.VideoCapture(video)
self.stopped = False
self.queue = Queue(maxsize=queue_size)
def start(self):
t = Thread(target=self.update, args=())
t.daemon = True
t.start()
return self
def update(self):
while True:
# If the thread indicator var is set, stop the thread
if self.stopped:
return
# Ensure the queue has room in it
if not self.queue.full():
# Read the next frame
ok, frame = self.stream.read()
# If `ok` is `False`, the we have reached the end of stream
if not ok:
self.release()
return
# Add the frame to the queue
self.queue.put(frame)
def read(self):
# Return next frame in the queue
return self.queue.get()
def more(self):
return self.queue.qsize() > 0
def release(self):
# Indicate that the thread should be stopped
self.stopped = True
# -*- coding: utf-8 -*-
"""
Created on 09/28/19 00:28:12
@author: Satoshi Murashige
"""
import time
import numpy as np
import cv2
from video_stream import VideoStream
video = VideoStream('m3v1mp4.mp4')
video.start()
time.sleep(1.0)
while video.more():
frame = video.read()
cv2.imshow('frame', frame)
key = cv2.waitKey(1)
if key == ord('q'):
break
video.release()
# -*- coding: utf-8 -*-
"""
Created on 09/28/19 00:20:10
@author: Satoshi Murashige
"""
import cv2
from threading import Thread
from queue import Queue
class VideoStream:
def __init__(self, video, queue_size=1024):
self.stream = cv2.VideoCapture(video)
self.stopped = False
self.queue = Queue(maxsize=queue_size)
def start(self):
t = Thread(target=self.update, args=())
t.daemon = True
t.start()
return self
def update(self):
while True:
# If the thread indicator var is set, stop the thread
if self.stopped:
return
# Ensure the queue has room in it
if not self.queue.full():
# Read the next frame
ok, frame = self.stream.read()
# If `ok` is `False`, the we have reached the end of stream
if not ok:
self.release()
return
# Add the frame to the queue
self.queue.put(frame)
def read(self):
# Return next frame in the queue
return self.queue.get()
def more(self):
return self.queue.qsize() > 0
def release(self):
# Indicate that the thread should be stopped
self.stopped = True
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment