-
-
Save ysuito/73fd12ea78d7b517ccd1538c9e1f9fed to your computer and use it in GitHub Desktop.
Smart Plow
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import cv2 | |
import numpy as np | |
import serial | |
from pywttr import Wttr | |
import datetime, time, urllib, json | |
from PIL import Image, ImageDraw, ImageFont | |
# Device indices that are probed for a usable camera at start-up.
cameras = list(range(3))

# Positions (within the list of successfully opened captures) of the camera
# that gets face detection and of the camera that drives the shooter.
in_camera, shooter_camera = 0, 1

city = "Nagaoka"  # Default Nagaoka City.
com = "COM4"  # Set COM Port

# Every captured frame is resized to WIDTH x HEIGHT pixels before processing.
WIDTH, HEIGHT = 480, 360

# Pixel distance used both as the aim offset and as the half-width of the
# "no steering needed" band in the shooter control.
HORIZONAL_THRESHOLD = 80

# Black filler tile with the same geometry as a resized camera frame.
dummy_img = np.zeros((HEIGHT, WIDTH, 3), dtype=np.uint8)
class Cameras():
    """Open a set of capture devices and keep the ones that deliver a frame.

    Attributes:
        caps: The ``cv2.VideoCapture`` objects whose probe read succeeded,
            in the same relative order as the indices passed in.
    """

    def __init__(self, cams) -> None:
        """Probe every device index in ``cams`` with one test read.

        Args:
            cams: Iterable of device indices handed to ``cv2.VideoCapture``.
        """
        self.caps = []
        for n in cams:
            cap = cv2.VideoCapture(n)
            ret, _ = cap.read()
            if ret:
                self.caps.append(cap)
            else:
                # A device that yields no frame is dropped; release its
                # handle right away instead of leaking it.
                cap.release()

    def release(self) -> None:
        """Release every capture that is still held and forget it."""
        for cap in self.caps:
            cap.release()
        self.caps = []
class FaceDetector():
    """Haar-cascade face detector with a "no face seen" alarm overlay.

    Attributes:
        cascade_path: Path of the cascade XML file that was loaded.
        cascade: The loaded ``cv2.CascadeClassifier``.
        timeout: Seconds without any detected face after which the
            "DEAD MAN" overlay is drawn.
        last_time: ``time.monotonic()`` reading taken at construction and
            refreshed whenever at least one face is detected.
    """

    def __init__(self, cascade_path="./data/haarcascade_frontalface_default.xml", timeout=10) -> None:
        """Load the cascade and start the "last seen" timer.

        Args:
            cascade_path: Cascade XML file to load.
            timeout: Alarm delay in seconds.

        Raises:
            OSError: If the cascade could not be loaded. Without this check
                the failure would only surface on the first ``detect`` call.
        """
        self.cascade_path = cascade_path
        self.cascade = cv2.CascadeClassifier(self.cascade_path)
        if self.cascade.empty():
            raise OSError(f"could not load cascade file: {self.cascade_path}")
        self.timeout = timeout
        # Monotonic clock: the elapsed-time check must not be affected by
        # wall-clock adjustments (NTP sync, manual changes, DST).
        self.last_time = time.monotonic()

    def detect(self, img):
        """Return an annotated copy of ``img``; the input is left untouched.

        Every detected face is outlined. If no face has been detected for
        more than ``timeout`` seconds, "DEAD MAN" is written on the copy.
        """
        img = np.copy(img)
        facerects = self.cascade.detectMultiScale(img, scaleFactor=1.1, minNeighbors=2, minSize=(30, 30))
        if len(facerects) > 0:
            for (x, y, w, h) in facerects:
                cv2.rectangle(img, (int(x), int(y)), (int(x + w), int(y + h)), (255, 204, 0), thickness=2)
            self.last_time = time.monotonic()
        if time.monotonic() - self.last_time > self.timeout:
            cv2.putText(
                img,                       # image the text is drawn on
                "DEAD MAN",                # text
                (70, 180),                 # bottom-left corner of the text
                cv2.FONT_HERSHEY_SIMPLEX,  # font family
                2,                         # font scale
                (0, 0, 255),               # font color
                3)                         # stroke thickness
        return img
class BodyDetector():
    """Haar-cascade full-body detector that overlays a warning on a hit.

    Attributes:
        cascade_path: Path of the cascade XML file that was loaded.
        cascade: The loaded ``cv2.CascadeClassifier``.
    """

    def __init__(self, cascade_path="./data/haarcascade_fullbody.xml") -> None:
        """Load the cascade.

        Args:
            cascade_path: Cascade XML file to load.

        Raises:
            OSError: If the cascade could not be loaded. Without this check
                the failure would only surface on the first ``detect`` call.
        """
        self.cascade_path = cascade_path
        self.cascade = cv2.CascadeClassifier(self.cascade_path)
        if self.cascade.empty():
            raise OSError(f"could not load cascade file: {self.cascade_path}")

    def detect(self, img):
        """Return an annotated copy of ``img``; the input is left untouched.

        When at least one body is detected, each one is outlined and
        "WARNING" is written on the copy.
        """
        img = np.copy(img)
        bodyrects = self.cascade.detectMultiScale(img, scaleFactor=1.1, minNeighbors=2, minSize=(30, 30))
        if len(bodyrects) > 0:
            for (x, y, w, h) in bodyrects:
                cv2.rectangle(img, (int(x), int(y)), (int(x + w), int(y + h)), (255, 204, 0), thickness=2)
            cv2.putText(
                img,                       # image the text is drawn on
                "WARNING",                 # text
                (70, 180),                 # bottom-left corner of the text
                cv2.FONT_HERSHEY_SIMPLEX,  # font family
                2,                         # font scale
                (0, 204, 255),             # font color
                3)                         # stroke thickness
        return img
class Weather():
    """Snapshot of the current weather for one city, fetched through pywttr.

    Attributes:
        city: Location name passed to ``Wttr``.
        desc: Weather description text; empty until ``current`` has run.
        temp: Value of the report's ``temp_c`` field; empty until fetched.
        pressure: Value of the report's ``pressure`` field; empty until fetched.
    """

    def __init__(self, city) -> None:
        """Remember the city and start with empty readings."""
        self.city = city
        self.desc = self.temp = self.pressure = ""

    def current(self):
        """Fetch the English report and store its first current-condition entry.

        Any exception raised by the lookup propagates to the caller.
        """
        report = Wttr(self.city).en()
        self.pressure = report.current_condition[0].pressure
        self.temp = report.current_condition[0].temp_c
        description = report.current_condition[0].weather_desc[0]
        self.desc = description.value
class Shooter():
    """Steers the shooter towards the largest red object seen by the camera.

    Commands are sent as byte strings through ``micon.write``.
    """

    def __init__(self, micon) -> None:
        """Store the device object; it must provide ``write(bytes)``."""
        self.micon = micon

    def detect_pylons(self, img):
        """Return bounding boxes ``(x, y, w, h)`` of red regions in a BGR frame.

        Only boxes wider and taller than 10 px, and smaller than the full
        frame in both directions, are kept.
        """
        hsv = cv2.cvtColor(img, cv2.COLOR_BGR2HSV)
        # Two hue bands are combined: 0-10 and 170-179, both with
        # saturation and value of at least 128.
        low_band = cv2.inRange(hsv, np.array([0, 128, 128]), np.array([10, 255, 255]))
        high_band = cv2.inRange(hsv, np.array([170, 128, 128]), np.array([179, 255, 255]))
        # bitwise_or instead of "+": adding uint8 masks would wrap around
        # wherever both masks are set.
        mask = cv2.bitwise_or(low_band, high_band)
        masked_img = cv2.bitwise_and(img, img, mask=mask)
        # Find contours of whatever survived the mask.
        gray = cv2.cvtColor(masked_img, cv2.COLOR_BGR2GRAY)
        _, binary = cv2.threshold(gray, 0, 255, cv2.THRESH_OTSU)
        # findContours returns (contours, hierarchy) on OpenCV 4 but
        # (image, contours, hierarchy) on OpenCV 3; [-2] is the contour list
        # in both layouts.
        contours = cv2.findContours(binary, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_NONE)[-2]
        boxes = (cv2.boundingRect(contour) for contour in contours)
        return [(x, y, w, h) for (x, y, w, h) in boxes if 10 < w < WIDTH and 10 < h < HEIGHT]

    def stop(self):
        """Send the stop command."""
        self.micon.write(b'shooter=stop\n')

    def left(self):
        """Send the turn-left command."""
        self.micon.write(b'shooter=left\n')

    def right(self):
        """Send the turn-right command."""
        self.micon.write(b'shooter=right\n')

    def auto_control(self, img):
        """Pick the largest pylon, send a steering command and annotate ``img``.

        Exactly one of ``left``/``right``/``stop`` is sent per call; ``stop``
        is sent when no pylon is found or when the pylon centre lies within
        ``HORIZONAL_THRESHOLD`` pixels of the aim line. The guide lines, the
        pylon box and the direction arrow are drawn directly onto ``img``,
        which is also returned.
        """
        pylons = self.detect_pylons(img)
        if not pylons:
            self.stop()
            return img

        # Target = candidate with the largest bounding-box area.
        (x, y, w, h) = max(pylons, key=lambda box: box[2] * box[3])
        cv2.rectangle(img, (x, y), (x + w, y + h), (0, 255, 0), 2)

        # The thrown snow blocks the view, so the aim line is offset to the
        # left of the frame centre (snow lands slightly right of the pylon).
        target_x = int(WIDTH / 2) - HORIZONAL_THRESHOLD
        left_limit = target_x - HORIZONAL_THRESHOLD
        right_limit = target_x + HORIZONAL_THRESHOLD
        cv2.line(img, (target_x, 0), (target_x, HEIGHT), (0, 255, 0), 2)
        cv2.line(img, (left_limit, 0), (left_limit, HEIGHT), (0, 0, 255), 2)
        cv2.line(img, (right_limit, 0), (right_limit, HEIGHT), (0, 0, 255), 2)

        # Steer until the pylon centre is between the two limit lines.
        pylon_x_center = x + w / 2
        mid_y = int(HEIGHT / 2)
        if pylon_x_center > right_limit:
            print('Turn to right...')
            self.right()
            cv2.arrowedLine(img, (WIDTH - int(WIDTH / 6), mid_y), (WIDTH - 10, mid_y), (0, 255, 0), thickness=4)
        elif pylon_x_center < left_limit:
            print('Turn to left...')
            self.left()
            cv2.arrowedLine(img, (int(WIDTH / 6), mid_y), (10, mid_y), (0, 255, 0), thickness=4)
        else:
            self.stop()
        return img
class ESP32():
    """Line-oriented wrapper around the serial link to the microcontroller.

    Attributes:
        ser: The underlying ``serial.Serial`` object.
    """

    def __init__(self, com, band_width=115200) -> None:
        """Open the serial port.

        Args:
            com: Port name passed to ``serial.Serial``.
            band_width: Passed as the second positional argument of
                ``serial.Serial`` (the baud rate).
        """
        self.ser = serial.Serial(com, band_width)

    def read(self):
        """Read one line and return it decoded, with trailing whitespace removed.

        Returns:
            The decoded text, or ``False`` when the bytes are not valid
            Shift_JIS. Errors raised by the port itself are not caught here.
        """
        raw = self.ser.readline()
        try:
            return raw.decode('shift_jis').rstrip()
        except UnicodeDecodeError:
            # Only a decoding failure is expected here; anything else
            # (including KeyboardInterrupt) must not be swallowed.
            print('serial read error')
            return False

    def write(self, val):
        """Write the bytes ``val`` to the port."""
        self.ser.write(val)

    def close(self):
        """Close the underlying port."""
        self.ser.close()
def draw_frame(imgs, data):
    """Tile the camera frames together with a text panel into one image.

    Args:
        imgs: Sequence of frames, each expected to be ``HEIGHT`` x ``WIDTH``
            with 3 channels so that they can be stacked with the panel.
        data: Sequence of entries where item ``[0]`` is the text of a line
            and item ``[1]`` the PIL font to draw it with.

    Returns:
        A single array. With no frame it is the text panel alone; with one
        frame it is ``[frame | panel]``; with more frames it is a grid two
        tiles wide whose last cell is the panel, with a black tile filling
        the gap when the number of frames is even.
    """
    note = Image.new("RGB", (WIDTH, HEIGHT), (0, 0, 0))
    draw = ImageDraw.Draw(note)
    x = 20
    y = 20
    for entry in data:
        # The panel ends up in an image shown by OpenCV, so this colour
        # tuple is interpreted as BGR on screen.
        draw.text((x, y), entry[0], fill=(0, 204, 255), font=entry[1])
        y += 50
    panel = np.asarray(note)

    tiles = list(imgs)
    if not tiles:
        # Previously an empty array was returned here, which cannot be displayed.
        return panel
    if len(tiles) == 1:
        return np.hstack([tiles[0], panel])

    # Keep the panel in the bottom-right cell of the two-column grid.
    if len(tiles) % 2 == 0:
        tiles.append(dummy_img)
    tiles.append(panel)
    rows = [np.hstack(tiles[i:i + 2]) for i in range(0, len(tiles), 2)]
    return np.vstack(rows)
def main():
    """Run the capture / detect / display loop until 'q' is pressed.

    Each cycle reads one comma-separated sensor line from the serial link,
    grabs a frame from every opened camera, applies face detection to the
    ``in_camera`` frame and shooter control plus body detection to the
    ``shooter_camera`` frame, and shows all frames next to a text panel.
    Cameras, the serial port and the window are released on the way out,
    also when an exception ends the loop.
    """
    print('pg started')
    weather = Weather(city)
    weather.current()
    print('got weather')
    cams = Cameras(cameras)
    print('cam prepaired')

    micon = None
    try:
        micon = ESP32(com, 115200)
        shooter = Shooter(micon)
        face_detector = FaceDetector()
        body_detector = BodyDetector()

        # Load the fonts once instead of five times per frame.
        font_path = 'C:/Windows/Fonts/meiryob.ttc'
        font_large = ImageFont.truetype(font_path, 40)
        font_small = ImageFont.truetype(font_path, 20)

        while True:
            # Read serial: seven comma-separated fields are expected; only
            # the first three are displayed.
            try:
                acx, acy, acz, _gyx, _gyy, _gyz, _temp = micon.read().split(',')
            except (AttributeError, ValueError, serial.SerialException):
                # AttributeError: read() returned False (undecodable line).
                # ValueError: wrong number of fields.
                # KeyboardInterrupt is deliberately no longer swallowed.
                print('serial parse error')
                continue

            # Take pictures.
            imgs = []
            for i, cap in enumerate(cams.caps):
                ret, img = cap.read()
                if not ret:
                    continue
                img = cv2.resize(img, (WIDTH, HEIGHT))
                if i == in_camera:
                    img = face_detector.detect(img)
                if i == shooter_camera:
                    img = shooter.auto_control(img)
                    # NOTE(review): the source lost its indentation; body
                    # detection is assumed to apply to the shooter camera
                    # only - confirm against the original file.
                    img = body_detector.detect(img)
                imgs.append(img)

            data = [
                [f'時間: {datetime.datetime.now().strftime("%H:%M:%S")}', font_large],
                [f'気温: {weather.temp}℃', font_large],
                [f'気圧: {weather.pressure}hPa', font_large],
                [f'天気: {weather.desc}', font_small],
                [f'acx:{acx},acy:{acy},acz:{acz}', font_small],
            ]
            mergeImg = draw_frame(imgs, data)
            # An empty result (no frame available) cannot be displayed.
            if mergeImg.size:
                cv2.imshow('Video', mergeImg)

            # NOTE(review): placed at loop level so that 'q' leaves the main
            # loop; confirm against the original file's indentation.
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break
            time.sleep(0.1)
    finally:
        for cap in cams.caps:
            cap.release()
        if micon is not None:
            micon.ser.close()
        cv2.destroyAllWindows()
if __name__ == '__main__': | |
main() | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment