Last active
November 3, 2017 05:10
-
-
Save mathfur/31e5f248650db36825d9f69e1d6159e4 to your computer and use it in GitHub Desktop.
raspberry_pi_home_system
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
# Hardware setup. This is best effort so the script can still be loaded on a
# machine without the Raspberry Pi libraries; in that case GPIO, bus and
# seven_segment stay undefined.
try:
    import RPi.GPIO as GPIO
    import smbus
    from Adafruit_LED_Backpack import SevenSegment

    bus = smbus.SMBus(1)
    seven_segment = SevenSegment.SevenSegment(address=0x70)
    seven_segment.begin()
except Exception as e:
    # `except Exception` (instead of a bare except) lets KeyboardInterrupt and
    # SystemExit propagate, and the cause is reported instead of being hidden.
    print("failed to import GPIO")
    print(repr(e))
import datetime | |
import time | |
import os | |
import random | |
import subprocess | |
# Module-level BoundaryManager instance used by global_receive_input;
# it is assigned in the __main__ block.
bm = None
# Placeholder: 1/0 raises ZeroDivisionError as soon as this line runs, so it
# has to be replaced with the server host name used by post_to_server.
host_name = 1/0 # TODO: rewrite this
# Seconds passed to set_timer for the "7h_to_morning" timer.
# NOTE(review): 5 looks like a test value; the trailing comment suggests
# 7*3600 (7 hours) is the intended setting -- confirm.
sec_for_moving_moter = 5 #7*3600
max_for_moter_count = 6 # maximum number of times the on_for_moment motor pulse is executed
# Print only when debug mode is enabled.
def print_d(e, debug_mode):
    """Write ``e`` to stdout when ``debug_mode`` is truthy; otherwise do nothing."""
    if not debug_mode:
        return
    print(e)
def global_receive_input(gpio_num):
    """GPIO edge callback: forward the pin number to the global BoundaryManager.

    BoundaryManager registers this function with GPIO.add_event_detect from
    inside its constructor, i.e. before the module-level ``bm`` has been
    assigned. An edge detected in that window is reported and dropped
    instead of failing on ``None.receive_input``.

    Args:
        gpio_num: pin number supplied by the GPIO library.
    """
    manager = bm
    if manager is None:
        print("ignored input before bm is ready: %s" % gpio_num)
        return
    manager.receive_input(gpio_num)
def post_to_server(task_name, start_time, finish_time):
    """Report a finished task to the server with an HTTP GET issued by curl.

    The request content is "<task_name>,<start>,<finish>" with both
    timestamps formatted as %Y-%m-%dT%H:%M:%S.

    Args:
        task_name: name of the measured task.
        start_time: datetime at which the measurement started.
        finish_time: datetime at which the measurement finished.
    """
    print(">>post_to_server(%s, %s, %s)" % (task_name, start_time, finish_time))
    timestamp_format = "%Y-%m-%dT%H:%M:%S"
    start_str = start_time.strftime(timestamp_format)
    finish_str = finish_time.strftime(timestamp_format)
    content = "%s,%s,%s" % (task_name, start_str, finish_str)
    url = "https://%s/base/s_notify/post_vals_by_get?tag=raspberry_pi&content=%s" % (host_name, content)
    print(">>curl '%s'" % url)
    # Pass the URL as a single argument rather than through a shell string, so
    # characters such as quotes or '&' in the URL can never be interpreted by
    # the shell.
    try:
        subprocess.call(["curl", url])
    except OSError as e:
        # curl is missing or not executable; reporting stays best effort,
        # just as the ignored os.system exit status was before.
        print("failed to run curl: %s" % e)
def show_to_7segment_led(sec):
    """Show an elapsed time given in seconds as MM:SS on the 7-segment display.

    Args:
        sec: elapsed seconds (int or float).
    """
    print(">>show_to_7segment_led(%s)" % sec)
    seconds_part = int(sec % 60)
    minutes_part = int(sec / 60)
    # Positions 0-3: minute tens, minute ones, second tens, second ones.
    # NOTE(review): for sec >= 6000 the first value is 10 or more -- confirm
    # how set_digit treats a value that is not a single digit.
    digits = (
        int(minutes_part / 10),
        int(minutes_part % 10),
        int(seconds_part / 10),
        int(seconds_part % 10),
    )
    seven_segment.clear()
    for position, digit in enumerate(digits):
        seven_segment.set_digit(position, digit)
    seven_segment.set_colon(1)
    seven_segment.write_display()
    time.sleep(0.1)
def get_now_info(now=None):
    """Return (wday, wnum, day, hour) for the date that should be displayed.

    From 15:00 onwards the weekday, week number and day describe tomorrow;
    before that they describe today. ``hour`` is always the current hour.

    Args:
        now: datetime to evaluate; defaults to datetime.datetime.now().

    Returns:
        Tuple ``(wday, wnum, day, hour)``:
            wday: weekday of the target date, 0 = Monday.
            wnum: zero-based index of the 7-day block of the month that
                contains the target day (days 1-7 -> 0, 8-14 -> 1, ...).
            day: day of the month of the target date.
            hour: hour of ``now`` (0-23).
    """
    # Read the clock once so all fields describe the same instant.
    if now is None:
        now = datetime.datetime.now()
    hour = now.hour

    # Real date arithmetic keeps the day valid across month ends (the last day
    # of a month rolls over to day 1 instead of becoming e.g. day 32).
    target = now
    if 15 <= hour:
        target = now + datetime.timedelta(days=1)

    wday = target.weekday()
    day = target.day
    # Floor division keeps wnum an integer on Python 3 as well.
    wnum = (day - 1) // 7
    return (wday, wnum, day, hour)
class BoundaryManager:
    """Boundary between the hardware (GPIO pins, timers) and the StateManager.

    It collects button clicks and expired timers as events in a FIFO queue
    and hands one event per step() to the state manager stored in ``sm``.
    With ``debug_mode`` enabled no GPIO call is made; pin values are only
    recorded in ``pin_vals``.
    """

    # Input buttons: logical name -> GPIO number (BCM numbering).
    name__gpio_num__pairs_for_input = {
        # Motor
        "start_btn": 14,
        "stop_btn": 15,
        # Tasks
        "start_measuring_btn_for_rest": 22,
        "start_measuring_btn_for_bath": 23,
        "start_measuring_btn_for_sentaku": 24,
        "finish_measuring_btn": 25,
        "reset_measuring_btn": 8,
    }

    # Outputs: logical name -> GPIO number (BCM numbering).
    name__gpio_num__pairs_for_output = {
        "m1": 17,  # motor 1
        "m2": 27,  # motor 2
        "led1": 4,
        # Garbage collection LEDs
        "led_for_burnable": 5,
        "led_for_plastic": 6,
        "led_for_unburnable": 13,
        "led_for_recycle": 19,
    }

    def __init__(self, debug_mode=False):
        """Create the manager and, unless in debug mode, configure the pins.

        Args:
            debug_mode: when true, skip all GPIO access and print diagnostics.
        """
        self.debug_mode = debug_mode
        self.run_at_later = {}  # {timer name: datetime at which it expires}
        self.event_queue = []  # [(event_name, event_val)], consumed FIFO
        self.pin_vals = {}  # {output name: last value written}
        self.bm = None  # not used by this class; kept for compatibility
        # StateManager driven by step(); the caller assigns it after
        # construction and before the first step().
        self.sm = None
        self.setup_GPIO()

    def setup_GPIO(self):
        """Configure input pins with rising-edge callbacks, and output pins."""
        if self.debug_mode:
            return
        GPIO.setmode(GPIO.BCM)
        # Input pins
        for gpio_num in self.name__gpio_num__pairs_for_input.values():
            GPIO.setup(gpio_num, GPIO.IN)
            GPIO.add_event_detect(gpio_num, GPIO.RISING, callback=global_receive_input, bouncetime=300)
        # Output pins
        for gpio_num in self.name__gpio_num__pairs_for_output.values():
            GPIO.setup(gpio_num, GPIO.OUT)

    def set_timer(self, name, sec):
        """Start (or restart) the timer ``name`` to expire ``sec`` seconds from now."""
        # Assigning replaces any timer already registered under this name.
        self.run_at_later[name] = datetime.datetime.now() + datetime.timedelta(seconds=sec)

    def step(self):
        """Queue expired timers as "timeout" events and step the state manager.

        At most one event is taken from the queue per call; when the queue is
        empty the state manager is stepped with ``(None, None)``.
        """
        self.print_inspect()

        # Timeout handling. Iterate over a snapshot because expired timers are
        # removed inside the loop (mutating a dict while iterating its live
        # view raises RuntimeError on Python 3).
        now = datetime.datetime.now()
        for name, expires_at in list(self.run_at_later.items()):
            if expires_at <= now:
                print_d("timeouted", self.debug_mode)
                self.event_queue.append(("timeout", name))
                self.run_at_later.pop(name, None)
            else:
                print_d("not timeout", self.debug_mode)

        # Step the StateManager as well.
        if self.event_queue:
            event = self.event_queue.pop(0)
        else:
            event = (None, None)
        self.sm.step(event)
        self.print_inspect()

    def print_inspect(self):
        """Print the internal state; does nothing unless in debug mode."""
        if not self.debug_mode:
            return
        print("===============================================S")
        print("event_queue: %s" % self.event_queue)
        print("sm.s1: %s" % self.sm.s1)
        print("sm.s2: %s" % self.sm.s2)
        print("self.pin_vals: %s" % self.pin_vals)
        print("len(run_at_later): %s" % len(self.run_at_later))
        print("run_at_later: %s" % self.run_at_later)
        print("===============================================E")

    def on(self, gpio_num):
        """Set the output identified by ``gpio_num`` to True."""
        self.change_pin_val(gpio_num, True)

    def off(self, gpio_num):
        """Set the output identified by ``gpio_num`` to False."""
        self.change_pin_val(gpio_num, False)

    def change_pin_val(self, gpio_num, val):
        """Record ``val`` for an output and, outside debug mode, drive the pin.

        Args:
            gpio_num: despite its name this is used as a key of
                name__gpio_num__pairs_for_output (e.g. "led1").
            val: boolean level to output.
        """
        self.pin_vals[gpio_num] = val
        if self.debug_mode:
            return
        # Output only when a pin is mapped to this name. Compare against None
        # so that a pin numbered 0 would not be mistaken for "no pin".
        pin = self.name__gpio_num__pairs_for_output.get(gpio_num)
        if pin is not None:
            GPIO.output(pin, val)

    def on_for_moment(self, gpio_num):
        """Pulse an output on for 0.2 s (used for the motors)."""
        self.on(gpio_num)
        # Original author's estimate: through the gearbox this is roughly 1/24
        # of a turn(?); the motor itself makes about 20 turns.
        time.sleep(0.2)
        self.off(gpio_num)

    def receive_input(self, gpio_num):
        """Queue a "click" event for the given input pin number."""
        print("receive input: %s" % gpio_num)
        self.event_queue.append(("click", gpio_num))
class StateManager:
    """Runs the state machines of the home system, one step per event.

    State variables:
        s1: curtain motor  ("start" -> "neterutoki" -> "alerting" -> "start")
        s2: heart-beat LED ("start" -> "on" <-> "off")
        s3: task timing    ("start" <-> "measuring")

    All hardware access goes through the boundary manager given to the
    constructor (``self.bm``).
    """

    def __init__(self, boundary_manager, debug_mode=False):
        """Initialise every state machine in its "start" state.

        Args:
            boundary_manager: object providing on/off/on_for_moment/set_timer
                and the name__gpio_num__pairs_for_input mapping.
            debug_mode: when true, print_d diagnostics are printed.
        """
        self.s1 = "start"
        self.s1_tmp_for_moter_min = None  # minute of the last motor pulse
        self.s1_tmp_for_moter_count = 0  # motor pulses done while alerting
        self.s2 = "start"
        self.s3 = "start"
        self.s3_task_name = None
        self.s3_start_time = None
        self.debug_mode = debug_mode
        self.bm = boundary_manager

    def _is_click(self, event_name, event_val, button_name):
        """Return True when the event is a click on the button ``button_name``.

        The event value may be the button name itself or its GPIO number.
        The mapping is read from ``self.bm`` rather than from a module-level
        global, so this class works with whatever boundary manager it was
        constructed with.
        """
        if event_name != "click":
            return False
        pin = self.bm.name__gpio_num__pairs_for_input[button_name]
        return event_val in (button_name, pin)

    def step(self, event=None):
        """Feed one event, a ``(name, value)`` tuple or None, to every machine."""
        event_name, event_val = event if event else (None, None)
        if event_name:
            print(">>StateManager.step(%s, %s)" % (event_name, event_val))
        self.execute_for_heart_beat(event_name, event_val)
        self.execute_measuring_time_of_task(event_name, event_val)
        self.execute_showing_collection_day(event_name, event_val)
        self.execute_for_curtain(event_name, event_val)

    def execute_for_heart_beat(self, event_name, event_val):
        """Blink "led1": toggled by the 1-second heart_beat_on/off timers."""
        next_state = None
        if self.s2 == "start":
            next_state = "on"
        elif self.s2 == "on":
            if event_name == "timeout" and event_val == "heart_beat_off":
                next_state = "off"
        elif self.s2 == "off":
            if event_name == "timeout" and event_val == "heart_beat_on":
                next_state = "on"

        # Actions performed on a transition.
        if next_state is not None and self.s2 != next_state:
            print_d("s2: change state: %s -> %s" % (self.s2, next_state), self.debug_mode)
            if next_state == "on":
                self.bm.on("led1")  # turn the LED on
                self.bm.set_timer("heart_beat_off", 1)
            elif next_state == "off":
                self.bm.off("led1")  # turn the LED off
                self.bm.set_timer("heart_beat_on", 1)
            self.s2 = next_state
        else:
            print_d("s2: NOT change state: %s -> %s" % (self.s2, next_state), self.debug_mode)

    def execute_measuring_time_of_task(self, event_name, event_val):
        """Measure how long a task (rest / bath / sentaku) takes.

        A start button begins the measurement; the finish button posts the
        result with post_to_server; the reset button discards it.
        """
        next_state = None
        if self.s3 == "start":
            for task_name in ("rest", "bath", "sentaku"):
                button = "start_measuring_btn_for_" + task_name
                if self._is_click(event_name, event_val, button):
                    self.s3_start_time = datetime.datetime.now()
                    self.s3_task_name = task_name
                    next_state = "measuring"
                    break
        elif self.s3 == "measuring":
            if self._is_click(event_name, event_val, "finish_measuring_btn"):
                # The task was completed.
                post_to_server(self.s3_task_name, self.s3_start_time, datetime.datetime.now())
                self.s3_start_time = None
                next_state = "start"
            elif self._is_click(event_name, event_val, "reset_measuring_btn"):
                # The task was aborted.
                self.s3_start_time = None
                next_state = "start"

            # Show the elapsed time on the 7-segment LED (0 once the
            # measurement has just been finished or reset).
            # NOTE(review): the source lost its indentation; this display
            # update is assumed to belong to the "measuring" state only --
            # confirm against the original file.
            if self.s3_start_time:
                diff_sec = (datetime.datetime.now() - self.s3_start_time).total_seconds()
            else:
                diff_sec = 0
            show_to_7segment_led(diff_sec)

        # Transition bookkeeping (no per-state entry actions for s3).
        if next_state is not None and self.s3 != next_state:
            print_d("s3: change state: %s -> %s" % (self.s3, next_state), self.debug_mode)
            self.s3 = next_state
        else:
            print_d("s3: NOT change state: %s -> %s" % (self.s3, next_state), self.debug_mode)

    def execute_for_curtain(self, event_name, event_val):
        """Drive the motors some time after the start button was pressed.

        start_btn arms the "7h_to_morning" timer; when it expires the machine
        enters "alerting" and pulses both motors at minutes divisible by 10,
        at most max_for_moter_count times, until stop_btn is pressed.
        """
        next_state = None
        if self.s1 == "start":
            if self._is_click(event_name, event_val, "start_btn"):
                self.bm.set_timer("7h_to_morning", sec_for_moving_moter)
                next_state = "neterutoki"
        elif self.s1 == "neterutoki":
            if event_name == "timeout" and event_val == "7h_to_morning":
                next_state = "alerting"
            elif self._is_click(event_name, event_val, "start_btn"):
                # Pressing start again restarts the timer; the state is kept.
                self.bm.set_timer("7h_to_morning", sec_for_moving_moter)
                next_state = "neterutoki"
        elif self.s1 == "alerting":
            if self._is_click(event_name, event_val, "stop_btn"):
                next_state = "start"

        # Actions performed on a transition.
        if next_state is not None and self.s1 != next_state:
            if next_state == "start":
                # Make sure both motors are off.
                self.bm.off("m1")
                self.bm.off("m2")
            print("s1: change state: %s -> %s" % (self.s1, next_state))
            self.s1 = next_state
        else:
            print_d("s1: NOT change state: %s -> %s" % (self.s1, next_state), self.debug_mode)

        # Actions that depend on the current state.
        if self.s1 == "start":
            self.s1_tmp_for_moter_min = None
            self.s1_tmp_for_moter_count = 0
        elif self.s1 == "alerting":
            if self.s1_tmp_for_moter_count < max_for_moter_count:
                minute = datetime.datetime.now().minute
                # Pulse once per minute divisible by 10. The previous minute
                # is compared with != only: a truthiness test would treat a
                # stored minute 0 as "not pulsed yet" and pulse on every step
                # during minute :00.
                if minute % 10 == 0 and self.s1_tmp_for_moter_min != minute:
                    self.s1_tmp_for_moter_min = minute
                    self.s1_tmp_for_moter_count = self.s1_tmp_for_moter_count + 1
                    print(">> moving moter")
                    # Move the motors.
                    self.bm.on_for_moment("m1")
                    self.bm.on_for_moment("m2")

    def execute_showing_collection_day(self, event_name, event_val):
        """Light the LEDs for the garbage categories of the collection day.

        The date comes from get_now_info(). LEDs are only lit when the hour
        is at most 9 or at least 15 (morning or evening).
        """
        (wday, wnum, _day, hour) = get_now_info()
        self.bm.off("led_for_burnable")
        self.bm.off("led_for_plastic")
        self.bm.off("led_for_unburnable")
        self.bm.off("led_for_recycle")

        if hour <= 9 or 15 <= hour:
            # Each category is tested independently. In an if/elif chain the
            # recycle case (wday == 1) could never be reached, because
            # wday 1 is already matched by the burnable case.
            # NOTE(review): this assumes two LEDs may be lit on the same
            # day -- confirm against the real collection schedule.
            if wday in [1, 4]:  # burnable garbage
                self.bm.on("led_for_burnable")
            if wday == 2:  # plastic garbage
                self.bm.on("led_for_plastic")
            if wday == 0 and wnum in [1, 3]:  # non-burnable garbage
                self.bm.on("led_for_unburnable")
            if wday == 1 and wnum in [0, 2]:  # recyclable garbage
                self.bm.on("led_for_recycle")

        # Sleep briefly so the LEDs stay lit for a moment.
        # NOTE(review): placed at method level (runs every step); the lost
        # indentation leaves open whether it was inside the branch above.
        time.sleep(0.001)
if __name__ == '__main__':
    # Wire the two managers together: the BoundaryManager feeds events to the
    # StateManager, which in turn drives the outputs through the
    # BoundaryManager.
    bm = BoundaryManager()
    sm = StateManager(bm)
    bm.sm = sm
    print("__in_loop__")
    try:
        while True:
            time.sleep(0.01)
            bm.step()
    except KeyboardInterrupt:
        # Ctrl-C is the normal way to stop the loop.
        pass
    finally:
        # Release the pins configured by BoundaryManager.setup_GPIO.
        GPIO.cleanup()
    print("__END__")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment