Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save sujoyu/ad36b2fef0f0e215d6ee1d093d925e63 to your computer and use it in GitHub Desktop.
Save sujoyu/ad36b2fef0f0e215d6ee1d093d925e63 to your computer and use it in GitHub Desktop.
fixed facerec_from_webcam_multiprocessing.py
import face_recognition
import cv2
from multiprocessing import Process, Manager, cpu_count
import time
import numpy
# This is a little bit complicated (but fast) example of running face recognition on live video from your webcam.
# This example uses multiprocessing.
# PLEASE NOTE: This example requires OpenCV (the `cv2` library) to be installed only to read from your webcam.
# OpenCV is *not* required to use the face_recognition library. It's only required if you want to run this
# specific demo. If you have trouble installing it, try any of the other demos that don't require it instead.
# Get next worker's id
def next_id(current_id, Global):
    """Return the worker id that follows ``current_id`` in the ring.

    The id equal to ``Global.worker_num`` wraps around to 1; every other id
    is simply incremented by one.
    """
    at_last_worker = current_id == Global.worker_num
    return 1 if at_last_worker else current_id + 1
# Get previous worker's id
def prev_id(current_id, Global):
    """Return the worker id that precedes ``current_id`` in the ring.

    Id 1 wraps around to ``Global.worker_num``; every other id is simply
    decremented by one.
    """
    if current_id != 1:
        return current_id - 1
    return Global.worker_num
# A subprocess used to capture frames.
def capture(read_frame_list, Global):
    """Read webcam frames into the shared buffer until asked to exit.

    Args:
        read_frame_list: Shared mapping from slot id to the frame most
            recently captured for that slot.
        Global: Shared namespace. This function reads ``is_exit``,
            ``read_num`` and ``worker_num`` and advances ``buff_num`` after
            every frame it stores. It sets ``is_exit`` itself if the webcam
            cannot be opened.
    """
    # Get a reference to webcam #0 (the default one)
    video_capture = cv2.VideoCapture(0)
    try:
        if not video_capture.isOpened():
            # No frame will ever arrive, so ask every process to stop instead
            # of leaving them all polling forever.
            print("Could not open webcam #0")
            Global.is_exit = True
            return

        # video_capture.set(cv2.CAP_PROP_FRAME_WIDTH, 640)   # Width of the frames in the video stream.
        # video_capture.set(cv2.CAP_PROP_FRAME_HEIGHT, 480)  # Height of the frames in the video stream.
        # video_capture.set(cv2.CAP_PROP_FPS, 30)            # Frame rate.
        print("Width: %d, Height: %d, FPS: %d" % (
            video_capture.get(cv2.CAP_PROP_FRAME_WIDTH),
            video_capture.get(cv2.CAP_PROP_FRAME_HEIGHT),
            video_capture.get(cv2.CAP_PROP_FPS),
        ))

        while not Global.is_exit:
            # The buffer counts as full once the write slot sits right after
            # the slot the next reader is expected to take.
            if Global.buff_num == next_id(Global.read_num, Global):
                time.sleep(0.01)
                continue

            # Grab a single frame of video
            ret, frame = video_capture.read()
            if not ret:
                # A failed grab yields no image; publishing it would hand the
                # worker something it cannot slice. Retry shortly instead.
                time.sleep(0.01)
                continue

            read_frame_list[Global.buff_num] = frame
            Global.buff_num = next_id(Global.buff_num, Global)
    finally:
        # Release webcam, even if the loop above raised
        video_capture.release()
# Worker subprocesses used to process frames.
def _label_faces(frame, known_face_encodings, known_face_names):
    """Detect faces in a BGR ``frame`` and draw a named box around each, in place."""
    # Convert the image from BGR color (which OpenCV uses) to RGB color (which
    # face_recognition uses). The reversed slice is a negative-stride view, which
    # the underlying detector may reject, so copy it into a contiguous array.
    rgb_frame = numpy.ascontiguousarray(frame[:, :, ::-1])

    # Finding the faces and their encodings is where most of the time goes.
    face_locations = face_recognition.face_locations(rgb_frame)
    face_encodings = face_recognition.face_encodings(rgb_frame, face_locations)

    font = cv2.FONT_HERSHEY_DUPLEX
    for (top, right, bottom, left), face_encoding in zip(face_locations, face_encodings):
        # See if the face is a match for the known face(s)
        matches = face_recognition.compare_faces(known_face_encodings, face_encoding)

        # If a match was found in known_face_encodings, just use the first one.
        name = "Unknown"
        if True in matches:
            name = known_face_names[matches.index(True)]

        # Draw a box around the face
        cv2.rectangle(frame, (left, top), (right, bottom), (0, 0, 255), 2)

        # Draw a label with a name below the face
        cv2.rectangle(frame, (left, bottom - 35), (right, bottom), (0, 0, 255), cv2.FILLED)
        cv2.putText(frame, name, (left + 6, bottom - 6), font, 1.0, (255, 255, 255), 1)


def process(worker_id, read_frame_list, write_frame_list, Global):
    """Worker loop: take a frame in turn, annotate its faces, publish it in turn.

    Args:
        worker_id: This worker's id; also the slot it reads from in
            ``read_frame_list`` and writes to in ``write_frame_list``.
        read_frame_list: Shared mapping of slot id to captured frame.
        write_frame_list: Shared mapping of slot id to annotated frame.
        Global: Shared namespace. This function reads ``is_exit``,
            ``buff_num``, ``frame_delay``, ``worker_num``,
            ``known_face_encodings`` and ``known_face_names``, and advances
            ``read_num`` and ``write_num``.

    The function returns as soon as ``Global.is_exit`` becomes true, including
    while it is waiting for its turn to read or write.
    """
    known_face_encodings = Global.known_face_encodings
    known_face_names = Global.known_face_names

    while not Global.is_exit:
        # Wait to read: it must be this worker's turn and the slot it is about
        # to read must sit right behind the capture position.
        while Global.read_num != worker_id or Global.read_num != prev_id(Global.buff_num, Global):
            if Global.is_exit:
                # Without this check a waiting worker would never terminate.
                return
            time.sleep(0.01)

        # Delay to make the video look smoother
        time.sleep(Global.frame_delay)

        # Read a single frame from frame list
        frame_process = read_frame_list[worker_id]

        # Expect next worker to read frame
        Global.read_num = next_id(Global.read_num, Global)

        _label_faces(frame_process, known_face_encodings, known_face_names)

        # Wait to write
        while Global.write_num != worker_id:
            if Global.is_exit:
                return
            time.sleep(0.01)

        # Send frame to global
        write_frame_list[worker_id] = frame_process

        # Expect next worker to write frame
        Global.write_num = next_id(Global.write_num, Global)
if __name__ == '__main__':
    # A single manager process hosts all the shared state.
    manager = Manager()

    # Global variables
    Global = manager.Namespace()
    Global.buff_num = 1
    Global.read_num = 1
    Global.write_num = 1
    Global.frame_delay = 0
    Global.is_exit = False
    read_frame_list = manager.dict()
    write_frame_list = manager.dict()

    # Number of workers (subprocesses used to process frames)
    worker_num = cpu_count()
    Global.worker_num = worker_num

    # Subprocess list; only processes that were actually started go in here,
    # so the cleanup below can join every entry safely.
    p = []

    try:
        # Create a subprocess to capture frames
        capture_proc = Process(target=capture, args=(read_frame_list, Global))
        capture_proc.start()
        p.append(capture_proc)

        # Load a sample picture and learn how to recognize it.
        obama_image = face_recognition.load_image_file("obama.jpg")
        obama_face_encoding = face_recognition.face_encodings(obama_image)[0]

        # Load a second sample picture and learn how to recognize it.
        biden_image = face_recognition.load_image_file("biden.jpg")
        biden_face_encoding = face_recognition.face_encodings(biden_image)[0]

        # Create arrays of known face encodings and their names
        Global.known_face_encodings = [
            obama_face_encoding,
            biden_face_encoding,
        ]
        Global.known_face_names = [
            "Barack Obama",
            "Joe Biden",
        ]

        # Create workers
        for worker_id in range(1, worker_num + 1):
            worker = Process(target=process, args=(worker_id, read_frame_list, write_frame_list, Global))
            worker.start()
            p.append(worker)

        # Start to show video
        last_num = 1
        fps_list = []
        tmp_time = time.time()
        while not Global.is_exit:
            while Global.write_num != last_num:
                last_num = int(Global.write_num)

                # Calculate fps as a moving average over the last 5 * worker_num frames
                delay = time.time() - tmp_time
                tmp_time = time.time()
                fps_list.append(delay)
                if len(fps_list) > 5 * worker_num:
                    fps_list.pop(0)
                fps = len(fps_list) / numpy.sum(fps_list)
                print("fps: %.2f" % fps)

                # Calculate frame delay, in order to make the video look smoother.
                # A larger ratio makes the video smoother but caps the achievable fps;
                # a smaller ratio allows a higher fps at the cost of smoothness.
                # The ratios below were chosen empirically.
                if fps < 6:
                    Global.frame_delay = (1 / fps) * 0.75
                elif fps < 20:
                    Global.frame_delay = (1 / fps) * 0.5
                elif fps < 30:
                    Global.frame_delay = (1 / fps) * 0.25
                else:
                    Global.frame_delay = 0

                # Display the resulting image. Use the snapshot taken above rather than
                # re-reading write_num, which a worker may have advanced in the meantime.
                cv2.imshow('Video', write_frame_list[prev_id(last_num, Global)])

                # Hit 'q' on the keyboard to quit!
                if cv2.waitKey(1) & 0xFF == ord('q'):
                    Global.is_exit = True
                    break

            time.sleep(0.01)
    finally:
        # Quit: tell every subprocess to stop, then give them a shared grace
        # period before forcibly terminating whichever is still blocked.
        Global.is_exit = True
        deadline = time.time() + 2.0
        for proc in p:
            proc.join(timeout=max(0.0, deadline - time.time()))
            if proc.is_alive():
                proc.terminate()
                proc.join()
        cv2.destroyAllWindows()
        # Shut the manager down only after no subprocess can touch shared state.
        manager.shutdown()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment