Skip to content

Instantly share code, notes, and snippets.

@Mahyar24
Created August 20, 2023 14:13
Show Gist options
  • Save Mahyar24/e51367e1411ffdd75b97efdd7c361c2c to your computer and use it in GitHub Desktop.
Save Mahyar24/e51367e1411ffdd75b97efdd7c361c2c to your computer and use it in GitHub Desktop.
Detecting motion, and then recording, streaming and sending an SMS.
#! /usr/bin/env python3.10
"""
This module is used for detecting motion, and then recording, streaming and sending an SMS.
Install packages beforehand via: `pip install khayyam pytz requests opencv-python`
FFmpeg is required for streaming, and it should be in your PATH.
Remember to fill in these items:
- streaming link: line 66
- SMS sender and receivers numbers: line 69
- SMS API key: line 78 (I'm using SMS.ir API)
- SMS API link: line 179
GitHub: https://github.com/Mahyar24
Mahyar@Mahyar24.com, Mon 6 May 2019
"""
import json
import logging
import os
import subprocess
import threading
from datetime import datetime, timedelta
from time import struct_time
import cv2
import requests
from khayyam import JalaliDatetime
from pytz import timezone
def tehran_time(*_, **__): # for logging in jalali time and dates.
tehran_dt = JalaliDatetime.now(tz=timezone("Asia/Tehran"))
return struct_time(
(
tehran_dt.year,
tehran_dt.month,
tehran_dt.day,
tehran_dt.hour,
tehran_dt.minute,
tehran_dt.second,
tehran_dt.weekday(),
tehran_dt.dayofyear(),
1,
)
)
logging.raiseExceptions = False
logging.Formatter.converter = tehran_time
logger = logging.getLogger(os.path.basename(__file__))
logger.setLevel(logging.INFO)
logger_file = logging.FileHandler("SpyCam.log")
logger_stream = logging.StreamHandler()
handler_formatter = logging.Formatter(
"%(asctime)s:%(message)s.", datefmt="%Y-%m-%d %H:%M:%S"
)
logger_file.setFormatter(handler_formatter)
logger_stream.setFormatter(handler_formatter)
logger.addHandler(logger_file)
logger.addHandler(logger_stream)
class SpyCam:
# Replace these with your own values:
link = "rtmp://[ip]/[stream]/[eye]?pwd=[password]"
payload = {
# Replace these with your own values:
"lineNumber": 0000000,
"mobiles": [
"000000",
],
"sendDateTime": None,
}
headers = {
# Replace this with your own value:
"X-API-KEY": "YOUR_API_KEY",
"Content-Type": "application/json",
}
def __init__(self, sensitivity=0.05, timeout=60, path=".", codec="XVID", fps=18):
self.sensitivity = sensitivity
self.timeout = timedelta(seconds=timeout)
self.path = path
self.is_recording = False
self.last_seen = datetime.now(tz=timezone("Asia/Tehran"))
self.frame = None
self.fourcc = cv2.VideoWriter_fourcc(*codec)
self.now_output = None
self.now_output_name = ""
self.cam = cv2.VideoCapture(0)
self.width = int(self.cam.get(cv2.CAP_PROP_FRAME_WIDTH))
self.height = int(self.cam.get(cv2.CAP_PROP_FRAME_HEIGHT))
self.fps = float(min(int(self.cam.get(cv2.CAP_PROP_FPS)), int(fps)))
self.resolution = int(self.width * self.height)
self.command = [
"ffmpeg",
"-y",
"-f",
"rawvideo",
"-vcodec",
"rawvideo",
"-pix_fmt",
"bgr24",
"-s",
f"{self.width}x{self.height}",
"-r",
str(self.fps),
"-i",
"-",
"-c:v",
"libx264",
"-pix_fmt",
"yuv420p",
"-preset",
"ultrafast",
"-f",
"flv",
self.link,
]
self.ffmpeg = subprocess.Popen(
self.command,
stdin=subprocess.PIPE,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
)
self.threshold = int(10 / self.sensitivity)
self.timelapse = int(10 / self.sensitivity)
self.subtractor = cv2.createBackgroundSubtractorMOG2(
history=self.timelapse, varThreshold=self.threshold, detectShadows=True
)
def __enter__(self):
return self
def __exit__(self, *args, **kwargs):
self.cam.release()
@staticmethod
def time():
return datetime.now(tz=timezone("Asia/Tehran"))
def output_name(self):
if not self.is_recording:
self.now_output_name = (
JalaliDatetime(self.last_seen).strftime("%Y_%m_%d_%H_%M_%S") + ".avi"
)
return self.now_output_name
def output(self):
if not self.is_recording:
self.now_output = cv2.VideoWriter(
os.path.join(self.path, self.output_name()),
self.fourcc,
self.fps,
(self.width, self.height),
)
return self.now_output
def seeing(self):
_, self.frame = self.cam.read()
grayed = cv2.cvtColor(self.frame, cv2.COLOR_BGR2GRAY)
mask = self.subtractor.apply(grayed)
return mask
def check(self):
mask = self.seeing()
n = cv2.countNonZero(mask)
if (motion := (n / self.resolution)) >= (0.0001 / self.sensitivity):
if motion != 1.0: # at first moment our motion ratio is exactly 1.0!
return True
return False
def _send_sms(self):
try:
resp = requests.post(
# Replace this with your own value:
"https://api.sms.ir/v1/send/bulk",
data=json.dumps(self.payload),
headers=self.headers,
)
resp.raise_for_status()
except requests.exceptions.RequestException:
logger.error(f"SMS failed to send: {resp.text}!r")
else:
logger.warning(f"SMS sent successfully: {resp.text}!r")
def send_sms(self):
time = JalaliDatetime(self.last_seen).strftime("%Y,%m,%d - %H:%M:%S")
self.payload[
"messageText"
] = f"Movement Detected at {time}, Live streaming: {self.link}"
threading.Thread(target=self._send_sms).start()
def record_upload(self):
self.ffmpeg.stdin.write(self.frame.tobytes())
self.output().write(self.frame)
def guard(self):
logger.warning("Start guarding")
while True:
motion = self.check()
if self.is_recording:
if motion:
self.record_upload()
self.last_seen = self.time()
else:
if (self.time() - self.last_seen) >= self.timeout:
logger.warning("Camera Released")
self.output().release()
self.is_recording = False
else:
self.record_upload()
else:
if motion:
logger.warning("Motion Detected")
self.send_sms()
self.last_seen = self.time()
self.record_upload()
self.is_recording = True
if __name__ == "__main__":
with SpyCam(sensitivity=0.05, timeout=30) as cam:
try:
cam.guard()
except KeyboardInterrupt:
logger.warning("Receiving SIGINT Signal; Terminating")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment