Skip to content

Instantly share code, notes, and snippets.

@SamCarlberg
Last active December 9, 2016 16:42
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save SamCarlberg/91db7fc8a15377ca2a69553aae69e010 to your computer and use it in GitHub Desktop.
Save SamCarlberg/91db7fc8a15377ca2a69553aae69e010 to your computer and use it in GitHub Desktop.
import cv2
from vision_api import *
class StubPipeline(VisionPipeline):
def __init__(self):
self.output = None
def process(self, image: np.ndarray) -> None:
self.output = image # Sets the 'output'
class MyListener(PipelineListener):
def on_pipeline_ran(pipeline: StubPipeline) -> None:
print(pipeline.output) # Do something with the 'output'
if __name__ == '__main__'
# Set up OpenCV capture and the runner
cap = cv2.VideoCapture(0)
pipeline = StubPipeline()
listener = MyListener()
runner = VisionRunner(cap, pipeline, listener)
thread = VisionThread(runner)
thread.start()
import numpy as np
from threading import Thread
class VisionPipeline:
"""
A vision pipeline runs a series of OpenCV algorithms to extract data from an image.
This is intended to be subclassed; calling process() directly will throw an error
"""
def process(self, image: np.ndarray) -> None:
"""
Processes an image and extracts data from it
:param image: the image to process
:return: None
"""
raise NotImplementedError
class PipelineListener:
"""
A callback type that runs after a pipeline runs. This can be used to do additional
image processing if the pipeline doesn't do everything (e.g. GRIP-generated code).
This is intended to be subclassed; calling on_pipeline_ran directly will throw an error.
"""
def on_pipeline_ran(self, pipeline: VisionPipeline) -> None:
"""
Called when a pipeline finishes running
:param pipeline: the pipeline that ran
:return: None
"""
raise NotImplementedError
class VisionRunner:
"""
A vision runner is a convenient wrapper object to make it easy to run an OpenCV pipeline
from user code. The easiest way to use it is to run it in a VisionThread and use the listener
to take snapshots of the pipelines outputs.
"""
def __init__(self, cap, pipeline: VisionPipeline, listener: PipelineListener):
"""
:param cap: the OpenCV video capture to use to grab images from e.g. cv2.VideoCapture(...)
:param pipeline: the pipeline to run
:param listener: the listener to call after the pipeline runs
"""
self.__cap = cap
self.__pipeline = pipeline
self.__listener = listener
def run_once(self) -> None:
"""
Runs the pipeline once, or not at all if the OpenCV video capture didn't get an image.
:return: None
"""
got_frame, frame = self.__cap.read()
if got_frame:
self.__pipeline.process(frame)
self.__listener.on_pipeline_ran(self.__pipeline)
def run_forever(self) -> None:
"""
Continuously runs the pipeline by processing the most recent images from the OpenCV video capture.
:return: None (does not return unless there was an error)
"""
while True:
self.run_once()
class VisionThread(Thread):
"""
A vision thread is a special daemon thread that runs a pipeline.
"""
def __init__(self, runner: VisionRunner):
Thread.__init__(self, name="Vision Thread", target=runner.run_forever)
self.setDaemon(True)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment