Skip to content

Instantly share code, notes, and snippets.

@helena-intel
Last active April 28, 2021 12:52
Show Gist options
  • Save helena-intel/f862a27e29398f5961fd663d43bd7e6c to your computer and use it in GitHub Desktop.
Save helena-intel/f862a27e29398f5961fd663d43bd7e6c to your computer and use it in GitHub Desktop.
204-vision-worker-safety
Display the source blob
Display the rendered blob
Raw
{"cells": [{"cell_type": "markdown", "metadata": {}, "source": "<a id=\"top\"></a>\n# Safety Gear Detection Sample Application"}, {"id": "0369b1cb", "cell_type": "markdown", "source": "## Preparation\n\nInstall the requirements and download the files that are necessary for running this notebook.\n\n**NOTE:** installation may take a while. It is recommended to restart the Jupyter kernel after installing the packages. Choose *Kernel->Restart Kernel* in Jupyter Notebook or Lab, or *Runtime->Restart runtime* in Google Colab.", "metadata": {}}, {"id": "2696281f", "cell_type": "code", "metadata": {}, "execution_count": null, "source": "# Install or upgrade required Python packages. Install specific versions of some packages to ensure compatibility.\n!pip install openvino-dev matplotlib opencv-python-headless==4.2.0.32 numpy==1.17.3 ipython", "outputs": []}, {"id": "3a79c6ad", "cell_type": "code", "metadata": {}, "execution_count": null, "source": "# Download image and model files\nimport os\nimport pip\nimport urllib.parse\nimport urllib.request\nfrom pathlib import Path\n\nurls = ['https://raw.githubusercontent.com/helena-intel/openvino_notebooks/safety_gear/notebooks/204-vision-worker-safety/labels.txt', 'https://raw.githubusercontent.com/helena-intel/openvino_notebooks/safety_gear/notebooks/204-vision-worker-safety/models/mobilenet-ssd.bin', 'https://raw.githubusercontent.com/helena-intel/openvino_notebooks/safety_gear/notebooks/204-vision-worker-safety/models/mobilenet-ssd.xml', 'https://raw.githubusercontent.com/helena-intel/openvino_notebooks/safety_gear/notebooks/204-vision-worker-safety/media/worker_zone_detection_small.mp4', 'https://raw.githubusercontent.com/helena-intel/openvino_notebooks/safety_gear/notebooks/204-vision-worker-safety/media/safety-gear-image.jpg']\n\nfor url in urls:\n save_path = Path(url).relative_to(fr\"https:/raw.githubusercontent.com/helena-intel/openvino_notebooks/safety_gear/notebooks/204-vision-worker-safety\")\n os.makedirs(save_path.parent, exist_ok=True)\n safe_url = urllib.parse.quote(url, safe=\":/\")\n\n urllib.request.urlretrieve(safe_url, save_path.as_posix())", "outputs": []}, {"cell_type": "markdown", "metadata": {}, "source": "## Introduction\n\nThis sample application demonstrates how a smart video IoT solution may be created using Intel\u00ae hardware and software tools to perform safety gear detection. This solution detects any number of objects within a video frame looking specifically for people, safety vests, and hardhats. "}, {"cell_type": "code", "execution_count": null, "metadata": {"tags": []}, "outputs": [], "source": "import colorsys\nimport os\nimport random\nimport time\nimport urllib\nfrom pathlib import Path\n\nimport cv2\nimport matplotlib.pyplot as plt\nimport numpy as np\nfrom IPython.display import (\n HTML,\n FileLink,\n Pretty,\n ProgressBar,\n Video,\n clear_output,\n display,\n)\nfrom openvino.inference_engine import IECore"}, {"cell_type": "markdown", "metadata": {"id": "contained-office"}, "source": "### Settings"}, {"cell_type": "code", "execution_count": null, "metadata": {"id": "amber-lithuania", "tags": []}, "outputs": [], "source": "DEVICE = \"CPU\"\nMODEL_FILE = \"models/mobilenet-ssd.xml\"\nMODEL_FILE_PERSON = \"models/person-detection-retail-0013.xml\"\nLABELS_FILE = \"labels.txt\"\nmodel_name = os.path.basename(MODEL_FILE)\nmodel_name_person = os.path.basename(MODEL_FILE_PERSON)\nmodel_xml_path = Path(MODEL_FILE).with_suffix(\".xml\")\nmodel_xml_path_person = Path(MODEL_FILE_PERSON).with_suffix(\".xml\")"}, {"cell_type": "markdown", "metadata": {}, "source": "### Functions"}, {"cell_type": "code", "execution_count": null, "metadata": {"tags": []}, "outputs": [], "source": "def load_image(path: str):\n \"\"\"\n Loads an image from `path` and returns it as BGR numpy array. `path`\n should point to an image file, either a local filename or an url.\n \"\"\"\n if path.startswith(\"http\"):\n # Set User-Agent to Mozilla because some websites block\n # requests with User-Agent Python\n request = urllib.request.Request(\n path, headers={\"User-Agent\": \"Mozilla/5.0\"}\n )\n response = urllib.request.urlopen(request)\n array = np.asarray(bytearray(response.read()), dtype=\"uint8\")\n image = cv2.imdecode(array, -1) # Loads the image as BGR\n else:\n image = cv2.imread(path)\n return image"}, {"cell_type": "code", "execution_count": null, "metadata": {"tags": []}, "outputs": [], "source": "class ColorPalette:\n def __init__(self, n, rng=None):\n assert n > 0\n\n if rng is None:\n rng = random.Random(0xACE)\n\n candidates_num = 100\n hsv_colors = [(1.0, 1.0, 1.0)]\n for _ in range(1, n):\n colors_candidates = [\n (rng.random(), rng.uniform(0.8, 1.0), rng.uniform(0.5, 1.0))\n for _ in range(candidates_num)\n ]\n min_distances = [\n self.min_distance(hsv_colors, c) for c in colors_candidates\n ]\n arg_max = np.argmax(min_distances)\n hsv_colors.append(colors_candidates[arg_max])\n\n self.palette = [self.hsv2rgb(*hsv) for hsv in hsv_colors]\n\n @staticmethod\n def dist(c1, c2):\n dh = min(abs(c1[0] - c2[0]), 1 - abs(c1[0] - c2[0])) * 2\n ds = abs(c1[1] - c2[1])\n dv = abs(c1[2] - c2[2])\n return dh * dh + ds * ds + dv * dv\n\n @classmethod\n def min_distance(cls, colors_set, color_candidate):\n distances = [cls.dist(o, color_candidate) for o in colors_set]\n return np.min(distances)\n\n @staticmethod\n def hsv2rgb(h, s, v):\n return tuple(round(c * 255) for c in colorsys.hsv_to_rgb(h, s, v))\n\n def __getitem__(self, n):\n return self.palette[n % len(self.palette)]\n\n def __len__(self):\n return len(self.palette)"}, {"cell_type": "code", "execution_count": null, "metadata": {"tags": []}, "outputs": [], "source": "def convert_result_to_image(resized_image, result, labeldict):\n inf_results = result[0][0]\n colors = ((255, 0, 0), (0, 255, 0), (0, 0, 255), (0, 0, 255))\n\n resized_image_rgb = cv2.cvtColor(resized_image, cv2.COLOR_BGR2RGB)\n\n for number, proposal in enumerate(inf_results):\n if proposal[2] > 0.5:\n ih, iw = resized_image.shape[:-1]\n label = np.int(proposal[1])\n labelname = labeldict[label]\n\n xmin = np.int(iw * proposal[3])\n ymin = max(10, np.int(ih * proposal[4]))\n xmax = np.int(iw * proposal[5])\n ymax = np.int(ih * proposal[6])\n\n resized_image_rgb = cv2.rectangle(\n resized_image_rgb,\n (xmin, ymin),\n (xmax, ymax),\n colors[label - 1],\n 3,\n )\n cv2.putText(\n resized_image_rgb,\n f\"{labelname} {proposal[2]:.2f}\",\n (xmin, ymin - 10),\n cv2.FONT_HERSHEY_SIMPLEX,\n 0.8,\n colors[label - 1],\n 1,\n cv2.LINE_AA,\n )\n\n result_image_rgb = cv2.resize(resized_image_rgb, (image.shape[:2][::-1]))\n return result_image_rgb"}, {"cell_type": "markdown", "metadata": {"id": "sensitive-wagner"}, "source": "## Load model and get model information\n\nLoad the model in Inference Engine with `ie.read_network` and load it to the specified device with `ie.load_network`"}, {"cell_type": "code", "execution_count": null, "metadata": {"id": "complete-brother", "tags": []}, "outputs": [], "source": "ie = IECore()\nnet = ie.read_network(\n str(model_xml_path),\n str(model_xml_path.with_suffix(\".bin\")),\n)\n\nexec_net = ie.load_network(network=net, device_name=DEVICE)\n\ninput_key = list(exec_net.input_info)[0]\noutput_key = list(exec_net.outputs.keys())[0]\n\nnetwork_input_shape = exec_net.input_info[input_key].tensor_desc.dims\n(network_image_height, network_image_width) = network_input_shape[2:]"}, {"cell_type": "markdown", "metadata": {}, "source": "## Safety Gear Detection on a Single Image"}, {"cell_type": "code", "execution_count": null, "metadata": {"colab": {"base_uri": "https://localhost:8080/"}, "id": "central-psychology", "outputId": "d864ee96-3fbd-488d-da1a-88e730f34aad", "tags": []}, "outputs": [], "source": "image = load_image(\"media/safety-gear-image.jpg\")\n# resize to input shape for network\nresized_image = cv2.resize(image, (network_image_width, network_image_height))\n\n# reshape image to network input shape NCHW\ninput_image = np.expand_dims(np.transpose(resized_image, (2, 0, 1)), 0)\nplt.imshow(image[:, :, (2, 1, 0)])"}, {"cell_type": "markdown", "metadata": {"id": "taken-spanking"}, "source": "### Do inference on image\n\nDo the inference, convert the result to an image, and resize it to the original image shape"}, {"cell_type": "code", "execution_count": null, "metadata": {"id": "banner-kruger", "tags": []}, "outputs": [], "source": "result = exec_net.infer(inputs={input_key: input_image})[output_key]"}, {"cell_type": "markdown", "metadata": {}, "source": "### Display result"}, {"cell_type": "code", "execution_count": null, "metadata": {"tags": []}, "outputs": [], "source": "labels = open(LABELS_FILE).read().splitlines()\nlabeldict = {i + 1: labelname for i, labelname in enumerate(labels)}\n\nresult_image_rgb = convert_result_to_image(image, result, labeldict)\nplt.figure(figsize=(12, 6))\nplt.imshow(result_image_rgb)"}, {"cell_type": "markdown", "metadata": {}, "source": "## Safety Gear Detection on Video"}, {"cell_type": "code", "execution_count": null, "metadata": {"colab": {"base_uri": "https://localhost:8080/"}, "id": "terminal-dividend", "outputId": "87f5ada0-8caf-49c3-fe54-626e2b1967f3", "tags": []}, "outputs": [], "source": "VIDEO_FILE = \"media/Safety_Full_Hat_and_Vest.mp4\"\n# worker_zone video source: https://github.com/intel-iot-devkit/sample-videos\nVIDEO_FILE = \"media/worker_zone_detection_small.mp4\"\n# Number of video frames to process. Set to 0 to process all frames.\nNUM_FRAMES = 240\n# Scale the output video sides with a factor of SCALE_OUTPUT\n# If the original video has a resolution of 1920x1080, a factor\n# of 0.5 results in an output video of 960x540\n# Set to 1 to keep the original resolution\nSCALE_OUTPUT = 0.5\n# Create Path objects for the input video and the resulting video\nvideo_path = Path(VIDEO_FILE)\nresult_video_path = video_path.with_name(f\"{video_path.stem}_result.mp4\")"}, {"cell_type": "code", "execution_count": null, "metadata": {"tags": []}, "outputs": [], "source": "cap = cv2.VideoCapture(str(video_path))\nret, image = cap.read()\nif not ret:\n raise ValueError(f\"The video at {video_path} cannot be read.\")\nFPS = cap.get(cv2.CAP_PROP_FPS)\ninput_frame_height, input_frame_width = image.shape[:2]\n# The format to use for video encoding. VP90 is slow,\n# but it works on most systems.\n# Try the THEO encoding if you have FFMPEG installed.\n# FOURCC = cv2.VideoWriter_fourcc(*\"VP90\")\nFOURCC = cv2.VideoWriter_fourcc(*\"vp09\")\n\ncap.release()\nprint(\n f\"The input video has a frame width of {input_frame_width}, \"\n f\"frame height of {input_frame_height} and runs at {FPS} fps\"\n)"}, {"cell_type": "markdown", "metadata": {}, "source": "### Inference loop"}, {"cell_type": "code", "execution_count": null, "metadata": {"tags": []}, "outputs": [], "source": "frame_nr = 1\ntarget_frame_width = int(input_frame_width * SCALE_OUTPUT)\ntarget_frame_height = int(input_frame_height * SCALE_OUTPUT)\nstart_time = time.perf_counter()\ntotal_inference_duration = 0\n\ncap = cv2.VideoCapture(str(video_path))\nout_video = cv2.VideoWriter(\n str(result_video_path),\n FOURCC,\n FPS,\n (target_frame_width, target_frame_height),\n)\n\ntotal_frames = (\n cap.get(cv2.CAP_PROP_FRAME_COUNT) if NUM_FRAMES == 0 else NUM_FRAMES\n)\nprogress_bar = ProgressBar(total=total_frames)\nprogress_bar.display()\n\ntry:\n while cap.isOpened():\n ret, image = cap.read()\n if not ret:\n cap.release()\n break\n\n if frame_nr == total_frames:\n break\n\n # Prepare frame for inference\n # resize to input shape for network\n resized_image = cv2.resize(\n image, (network_image_height, network_image_width)\n )\n # reshape image to network input shape NCHW\n input_image = np.expand_dims(np.transpose(resized_image, (2, 0, 1)), 0)\n\n # Do inference\n inference_start_time = time.perf_counter()\n result = exec_net.infer(inputs={input_key: input_image})[output_key]\n inference_stop_time = time.perf_counter()\n inference_duration = inference_stop_time - inference_start_time\n total_inference_duration += inference_duration\n\n if frame_nr % 10 == 0:\n clear_output(wait=True)\n progress_bar.display()\n display(\n Pretty(\n f\"Processed frame {frame_nr}. \"\n f\"Inference time: {inference_duration:.2f} seconds \"\n f\"({1/inference_duration:.2f} FPS)\"\n )\n )\n\n # Transform network result to RGB image\n result_frame = convert_result_to_image(image, result, labeldict)[\n :, :, (2, 1, 0)\n ]\n # Resize to original image shape\n result_frame = cv2.resize(\n result_frame, (target_frame_width, target_frame_height)\n )\n # Save frame to video\n out_video.write(result_frame)\n\n frame_nr = frame_nr + 1\n progress_bar.progress = frame_nr\n progress_bar.update()\n\nexcept KeyboardInterrupt:\n print(\"Processing interrupted.\")\nfinally:\n out_video.release()\n cap.release()\n end_time = time.perf_counter()\n duration = end_time - start_time\n clear_output()\n print(f\"Safety Gear Detection Video saved to '{str(result_video_path)}'.\")\n print(\n f\"Processed {frame_nr} frames in {duration:.2f} seconds. \"\n f\"Total FPS (including video processing): {frame_nr/duration:.2f}.\"\n f\"Inference FPS: {frame_nr/total_inference_duration:.2f} \"\n )"}, {"cell_type": "markdown", "metadata": {"execution": {"iopub.execute_input": "2021-04-16T13:38:56.065237Z", "iopub.status.busy": "2021-04-16T13:38:56.065237Z", "iopub.status.idle": "2021-04-16T13:38:56.085468Z", "shell.execute_reply": "2021-04-16T13:38:56.085468Z", "shell.execute_reply.started": "2021-04-16T13:38:56.065237Z"}}, "source": "### Display or download video with results"}, {"cell_type": "code", "execution_count": null, "metadata": {"tags": []}, "outputs": [], "source": "# TODO: embed=True doesn't work well for large videos\nvideo = Video(result_video_path, width=800, embed=True)\nif not result_video_path.exists():\n plt.imshow(result_frame)\n raise ValueError(\n \"OpenCV was unable to write the video file. Showing one video frame.\"\n )\nelse:\n print(\n \"Showing Safety Gear Detection video saved at\\n\"\n f\"{result_video_path.resolve()}\"\n )\n print(\n \"If you cannot see the video in your browser, please click on the \"\n \"following link to download the video \"\n )\n video_link = FileLink(result_video_path)\n video_link.html_link_str = \"<a href='%s' download>%s</a>\"\n display(HTML(video_link._repr_html_()))\n display(video)"}, {"cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": ""}], "metadata": {"kernelspec": {"display_name": "Python 3", "language": "python", "name": "python3"}, "language_info": {"codemirror_mode": {"name": "ipython", "version": 3}, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.6.8"}, "nbTranslate": {"displayLangs": ["*"], "hotkey": "alt-t", "langInMainMenu": true, "sourceLang": "en", "targetLang": "fr", "useGoogleTranslate": true}, "toc": {"base_numbering": 1, "nav_menu": {}, "number_sections": true, "sideBar": true, "skip_h1_title": true, "title_cell": "Table of Contents", "title_sidebar": "Contents", "toc_cell": false, "toc_position": {"height": "calc(100% - 180px)", "left": "10px", "top": "150px", "width": "251.4px"}, "toc_section_display": true, "toc_window_display": true}}, "nbformat": 4, "nbformat_minor": 5}
libpython3.7-dev
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment