@ysuito
Created January 10, 2023 14:22
Smart Plow
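
"""Smart Plow monitor/controller.

Reads up to three USB cameras. The operator camera runs face detection and
overlays "DEAD MAN" if no face has been seen for 10 seconds; the shooter
camera tracks a red pylon, steers the snow shooter left/right through an
ESP32 on a serial COM port, and overlays "WARNING" when a full body is
detected. All frames are tiled into a 2x2 view together with a status panel
showing the time, wttr.in weather data and accelerometer telemetry.
"""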
import cv2
import numpy as np
import serial
from pywttr import Wttr
import datetime
import time
from PIL import Image, ImageDraw, ImageFont

cameras = list(range(3))  # camera indices passed to cv2.VideoCapture
in_camera = 0             # index of the operator-facing camera
shooter_camera = 1        # index of the camera watching the snow shooter
city = "Nagaoka"          # default city for the weather lookup (Nagaoka City)
com = "COM4"              # serial COM port of the ESP32
WIDTH = 480
HEIGHT = 360
HORIZONTAL_THRESHOLD = 80  # steering dead-band, in pixels
dummy_img = np.zeros((HEIGHT, WIDTH, 3), dtype=np.uint8)  # filler tile for the display grid
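
# Display layout: every frame is resized to WIDTH x HEIGHT and tiled into a
# 2x2 grid together with a status panel; dummy_img fills the empty cell when
# only two cameras deliver frames.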


class Cameras():
    def __init__(self, cams) -> None:
        self.caps = []
        for n in cams:
            cap = cv2.VideoCapture(n)
            # Keep only cameras that actually deliver a frame.
            ret, img = cap.read()
            if ret:
                img = cv2.resize(img, (WIDTH, HEIGHT))
                self.caps.append(cap)


class FaceDetector():
    def __init__(self) -> None:
        self.cascade_path = "./data/haarcascade_frontalface_default.xml"
        self.cascade = cv2.CascadeClassifier(self.cascade_path)
        self.last_time = time.time()

    def detect(self, img):
        img = np.copy(img)
        facerects = self.cascade.detectMultiScale(img, scaleFactor=1.1, minNeighbors=2, minSize=(30, 30))
        if len(facerects) > 0:
            for rect in facerects:
                cv2.rectangle(img, tuple(rect[0:2]), tuple(rect[0:2] + rect[2:4]), (255, 204, 0), thickness=2)
            self.last_time = time.time()
        # No face seen for more than 10 seconds -> dead-man warning.
        if time.time() - self.last_time > 10:
            cv2.putText(
                img,                       # numpy array on which text is written
                "DEAD MAN",                # text
                (70, 180),                 # position at which writing has to start
                cv2.FONT_HERSHEY_SIMPLEX,  # font family
                2,                         # font size
                (0, 0, 255),               # font color
                3)                         # font stroke
        return img


class BodyDetector():
    def __init__(self) -> None:
        self.cascade_path = "./data/haarcascade_fullbody.xml"
        self.cascade = cv2.CascadeClassifier(self.cascade_path)

    def detect(self, img):
        img = np.copy(img)
        bodyrects = self.cascade.detectMultiScale(img, scaleFactor=1.1, minNeighbors=2, minSize=(30, 30))
        if len(bodyrects) > 0:
            for rect in bodyrects:
                cv2.rectangle(img, tuple(rect[0:2]), tuple(rect[0:2] + rect[2:4]), (255, 204, 0), thickness=2)
            cv2.putText(
                img,                       # numpy array on which text is written
                "WARNING",                 # text
                (70, 180),                 # position at which writing has to start
                cv2.FONT_HERSHEY_SIMPLEX,  # font family
                2,                         # font size
                (0, 204, 255),             # font color
                3)                         # font stroke
        return img


class Weather():
    def __init__(self, city) -> None:
        self.city = city
        self.desc = ""
        self.temp = ""
        self.pressure = ""

    def current(self):
        wttr = Wttr(self.city)
        forecast = wttr.en()
        self.pressure = forecast.current_condition[0].pressure
        self.temp = forecast.current_condition[0].temp_c
        self.desc = forecast.current_condition[0].weather_desc[0].value


class Shooter():
    def __init__(self, micon) -> None:
        self.micon = micon

    def detect_pylons(self, img):
        # Extract Red
        hsv = cv2.cvtColor(img, cv2.COLOR_BGR2HSV)
        # Red wraps around hue 0/179 in OpenCV HSV, so two ranges are combined.
        # Red Mask 1
        lower = np.array([0, 128, 128])
        upper = np.array([10, 255, 255])
        mask1 = cv2.inRange(hsv, lower, upper)
        # Red Mask 2
        lower = np.array([170, 128, 128])
        upper = np.array([179, 255, 255])
        mask2 = cv2.inRange(hsv, lower, upper)
        # Red Mask
        mask = mask1 + mask2
        masked_img = cv2.bitwise_and(img, img, mask=mask)
        # Find Contours
        gray = cv2.cvtColor(masked_img, cv2.COLOR_BGR2GRAY)
        ret, binary = cv2.threshold(gray, 0, 255, cv2.THRESH_OTSU)
        (contours, _) = cv2.findContours(binary, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_NONE)
        # Keep bounding boxes that are plausibly a pylon (not tiny noise).
        candidates = []
        for contour in contours:
            (x, y, w, h) = cv2.boundingRect(contour)
            if 10 < w < WIDTH and 10 < h < HEIGHT:
                candidates.append((x, y, w, h))
        return candidates
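
    # Steering logic used by auto_control(): keep the largest red pylon on a
    # target line offset HORIZONTAL_THRESHOLD px to the left of the frame
    # centre (the thrown snow lands slightly to the right of the pylon and
    # would otherwise block the view). When the pylon centre leaves the
    # +/-HORIZONTAL_THRESHOLD band around that line, 'shooter=right' or
    # 'shooter=left' is sent to the ESP32; otherwise 'shooter=stop'.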
    def stop(self):
        self.micon.write(b'shooter=stop\n')

    def left(self):
        self.micon.write(b'shooter=left\n')

    def right(self):
        self.micon.write(b'shooter=right\n')

    def auto_control(self, img):
        pylons = self.detect_pylons(img)
        # Decide Target
        if len(pylons) > 0:
            pylon = max(pylons, key=lambda x: x[2] * x[3])
            (x, y, w, h) = pylon
            # Draw Pylon
            cv2.rectangle(img, (x, y), (x + w, y + h), (0, 255, 0), 2)
            # Draw Lines
            # The thrown snow blocks the view, so the target line is offset to
            # the left of the frame centre (snow lands slightly to the right
            # of the pylon).
            target_x = int(WIDTH / 2) - HORIZONTAL_THRESHOLD
            cv2.line(img, (target_x, 0), (target_x, HEIGHT), (0, 255, 0), 2)
            cv2.line(img, (target_x - HORIZONTAL_THRESHOLD, 0), (target_x - HORIZONTAL_THRESHOLD, HEIGHT), (0, 0, 255), 2)
            cv2.line(img, (target_x + HORIZONTAL_THRESHOLD, 0), (target_x + HORIZONTAL_THRESHOLD, HEIGHT), (0, 0, 255), 2)
            # Exec Control
            pylon_x_center = x + w / 2
            if pylon_x_center > target_x + HORIZONTAL_THRESHOLD:
                print('Turn to right...')
                self.right()
                cv2.arrowedLine(img, (WIDTH - int(WIDTH / 6), int(HEIGHT / 2)), (WIDTH - 10, int(HEIGHT / 2)), (0, 255, 0), thickness=4)
            elif pylon_x_center < target_x - HORIZONTAL_THRESHOLD:
                print('Turn to left...')
                self.left()
                cv2.arrowedLine(img, (int(WIDTH / 6), int(HEIGHT / 2)), (10, int(HEIGHT / 2)), (0, 255, 0), thickness=4)
            else:
                self.stop()
        else:
            self.stop()
        return img


class ESP32():
    def __init__(self, com, baud_rate=115200) -> None:
        self.ser = serial.Serial(com, baud_rate)

    def read(self):
        line = self.ser.readline()
        try:
            line = line.decode('shift_jis').rstrip()
        except UnicodeDecodeError:
            print('serial read error')
            return False
        return line

    def write(self, val):
        self.ser.write(val)
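
# Serial protocol, as used in main() below: the ESP32 streams comma-separated
# telemetry lines of the form "acx,acy,acz,gyx,gyy,gyz,temp" and accepts the
# commands b'shooter=stop\n', b'shooter=left\n' and b'shooter=right\n'.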


def draw_frame(imgs, data):
    mergeImg = np.empty(0)
    note = Image.new("RGB", (WIDTH, HEIGHT), (0, 0, 0))
    draw = ImageDraw.Draw(note)
    x = 20
    y = 20
    for d in data:
        # Text color is given in BGR order because the merged frame is shown with cv2.imshow.
        draw.text((x, y), d[0], fill=(0, 204, 255), font=d[1])
        y = y + 50
    # Tile the camera frames and the status panel into a 2x2 grid.
    if len(imgs) == 3:
        row1 = np.hstack(imgs[:2])
        row2 = np.hstack([imgs[2], note])
        mergeImg = np.vstack([row1, row2])
    elif len(imgs) == 2:
        row1 = np.hstack(imgs[:2])
        row2 = np.hstack([dummy_img, note])
        mergeImg = np.vstack([row1, row2])
    elif len(imgs) == 1:
        mergeImg = np.hstack([imgs[0], note])
    return mergeImg


def main():
    print('program started')
    weather = Weather(city)
    weather.current()
    print('got weather')
    cams = Cameras(cameras)
    print('cameras prepared')
    micon = ESP32(com, 115200)
    shooter = Shooter(micon)
    face_detector = FaceDetector()
    body_detector = BodyDetector()
    while True:
        # read serial
        try:
            acx, acy, acz, gyx, gyy, gyz, temp = micon.read().split(',')
        except:
            print('serial parse error')
            continue
        # take picture
        imgs = []
        for i, cap in enumerate(cams.caps):
            ret, img = cap.read()
            if ret:
                img = cv2.resize(img, (WIDTH, HEIGHT))
                if i == in_camera:
                    img = face_detector.detect(img)
                if i == shooter_camera:
                    img = shooter.auto_control(img)
                    img = body_detector.detect(img)
                imgs.append(img)
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
        # Status panel labels (Japanese): time / temperature / pressure / weather / accelerometer.
        data = [
            [f'時間: {datetime.datetime.now().strftime("%H:%M:%S")}', ImageFont.truetype('C:/Windows/Fonts/meiryob.ttc', 40)],
            [f'気温: {weather.temp}℃', ImageFont.truetype('C:/Windows/Fonts/meiryob.ttc', 40)],
            [f'気圧: {weather.pressure}hPa', ImageFont.truetype('C:/Windows/Fonts/meiryob.ttc', 40)],
            [f'天気: {weather.desc}', ImageFont.truetype('C:/Windows/Fonts/meiryob.ttc', 20)],
            [f'acx:{acx},acy:{acy},acz:{acz}', ImageFont.truetype('C:/Windows/Fonts/meiryob.ttc', 20)],
        ]
        mergeImg = draw_frame(imgs, data)
        cv2.imshow('Video', mergeImg)
        time.sleep(0.1)
    for cap in cams.caps:
        cap.release()
    cv2.destroyAllWindows()


if __name__ == '__main__':
    main()
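
# Usage sketch (assumptions: Windows host with the Meiryo font installed, the
# Haar cascade XML files from OpenCV copied into ./data/, cameras on indices
# 0-2, and the ESP32 attached on COM4; the file name below is only a guess):
#   pip install opencv-python numpy pyserial pywttr pillow
#   python smart_plow.py    # press 'q' in the video window to quit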