Skip to content

Instantly share code, notes, and snippets.

@mathfur
Last active November 3, 2017 05:10
Show Gist options
  • Save mathfur/31e5f248650db36825d9f69e1d6159e4 to your computer and use it in GitHub Desktop.
Save mathfur/31e5f248650db36825d9f69e1d6159e4 to your computer and use it in GitHub Desktop.
raspberry_pi_home_system
# -*- coding: utf-8 -*-
# Optional hardware setup: the GPIO library, the I2C bus and the 7-segment
# LED backpack at I2C address 0x70. The whole step is best-effort so that the
# module can still be loaded when the hardware libraries are unavailable.
try:
    import RPi.GPIO as GPIO
    import smbus
    from Adafruit_LED_Backpack import SevenSegment
    bus = smbus.SMBus(1)
    seven_segment = SevenSegment.SevenSegment(address=0x70)
    seven_segment.begin()
except Exception as e:
    # Catch Exception rather than everything: Ctrl-C / SystemExit must still
    # propagate. The cause is reported because a failure here is not
    # necessarily an import error (opening the bus or the display can fail too).
    print("failed to import GPIO: %s" % e)
import datetime
import time
import os
import random
import subprocess
# Module-level BoundaryManager used by global_receive_input; it stays None
# until the __main__ block assigns the real instance.
bm = None
# NOTE(review): 1/0 raises ZeroDivisionError as soon as this line runs, so the
# script cannot start until a real host name is written here.
host_name = 1/0 # TODO: rewrite
# Delay in seconds passed to the "7h_to_morning" timer.
sec_for_moving_moter = 5 #7*3600
max_for_moter_count = 6 # maximum number of times on_for_moment is executed
# Print only when running in debug mode.
def print_d(e, debug_mode):
    """Print ``e`` when ``debug_mode`` is truthy; otherwise do nothing."""
    if not debug_mode:
        return
    print(e)
def global_receive_input(gpio_num):
    """GPIO edge callback: forward ``gpio_num`` to the module-level manager.

    The callback is registered while the BoundaryManager is still being
    constructed, i.e. before the module-level ``bm`` has been assigned, so an
    early edge can arrive while ``bm`` is still None. Such an edge is reported
    and dropped instead of raising inside the callback.
    """
    manager = globals().get("bm")
    if manager is None:
        print("receive input ignored (manager not ready): %s" % gpio_num)
        return
    manager.receive_input(gpio_num)
def post_to_server(task_name, start_time, finish_time):
    """Report a finished task to the notification endpoint on ``host_name``.

    The task name and the two timestamps (formatted as YYYY-MM-DDTHH:MM:SS)
    are joined with commas and sent as the ``content`` query parameter of a
    GET request issued through the ``curl`` command.

    Args:
        task_name: name of the task that was measured.
        start_time: datetime at which the task started.
        finish_time: datetime at which the task finished.
    """
    print(">>post_to_server(%s, %s, %s)" % (task_name, start_time, finish_time))
    timestamp_format = "%Y-%m-%dT%H:%M:%S"
    content = "%s,%s,%s" % (
        task_name,
        start_time.strftime(timestamp_format),
        finish_time.strftime(timestamp_format),
    )
    url = "https://%s/base/s_notify/post_vals_by_get?tag=raspberry_pi&content=%s" % (host_name, content)
    print(">>curl '%s'" % url)
    # Pass an argument list instead of a shell string so that nothing in the
    # URL is ever interpreted by a shell.
    try:
        subprocess.call(["curl", url])
    except OSError as e:
        # Stay best-effort (as os.system was): a missing curl binary must not
        # bring down the control loop.
        print("failed to run curl: %s" % e)
def show_to_7segment_led(sec):
    """Show an elapsed time as MM:SS on the 4-digit 7-segment display.

    Args:
        sec: elapsed seconds (int or float). The value is truncated to whole
            seconds and clamped to 00:00..99:59, because each of the four
            positions can only show a single digit.
    """
    print(">>show_to_7segment_led(%s)" % sec)
    max_displayable = 99 * 60 + 59
    whole_seconds = min(max(int(sec), 0), max_displayable)
    minutes, seconds = divmod(whole_seconds, 60)
    digits = divmod(minutes, 10) + divmod(seconds, 10)
    seven_segment.clear()
    for position, digit in enumerate(digits):
        seven_segment.set_digit(position, digit)
    seven_segment.set_colon(1)
    seven_segment.write_display()
    time.sleep(0.1)
def get_now_info(now=None):
    """Return ``(wday, wnum, day, hour)`` for the relevant collection day.

    From 15:00 onwards the weekday, week index and day of month describe
    *tomorrow*; before that they describe today. ``hour`` is always the
    current hour.

    Args:
        now: datetime to evaluate; defaults to ``datetime.datetime.now()``.

    Returns:
        A tuple ``(wday, wnum, day, hour)`` where ``wday`` is 0 for Monday,
        ``wnum`` is the zero-based index of the 7-day span of the month that
        ``day`` falls in, ``day`` is the day of the month and ``hour`` is
        0..23.
    """
    # Sample the clock once so that all fields describe the same instant.
    if now is None:
        now = datetime.datetime.now()
    hour = now.hour
    target = now
    if hour >= 15:
        # Real date arithmetic so that month and year ends roll over
        # correctly (the 31st becomes the 1st, not "day 32").
        target = now + datetime.timedelta(days=1)
    wday = target.weekday()
    day = target.day
    wnum = (day - 1) // 7  # floor division keeps this an int on Python 3 too
    return (wday, wnum, day, hour)
class BoundaryManager:
    """Boundary between the GPIO hardware and the StateManager.

    Holds the pin tables, the named timers and the event queue. Each call to
    ``step`` turns expired timers into "timeout" events and hands exactly one
    queued event (or ``(None, None)``) to ``self.sm``. In debug mode no GPIO
    call is made; pin values are only recorded in ``pin_vals``.
    """

    # Input buttons: logical name -> GPIO number.
    name__gpio_num__pairs_for_input = {
        # Motor
        "start_btn": 14,
        "stop_btn": 15,
        # Tasks
        "start_measuring_btn_for_rest": 22,
        "start_measuring_btn_for_bath": 23,
        "start_measuring_btn_for_sentaku": 24,
        "finish_measuring_btn": 25,
        "reset_measuring_btn": 8,
    }
    # Outputs: logical name -> GPIO number.
    name__gpio_num__pairs_for_output = {
        "m1": 17,  # motor 1
        "m2": 27,  # motor 2
        "led1": 4,
        # Garbage collection
        "led_for_burnable": 5,
        "led_for_plastic": 6,
        "led_for_unburnable": 13,
        "led_for_recycle": 19,
    }

    def __init__(self, debug_mode = False):
        """Create the manager and, unless in debug mode, configure the GPIO pins.

        Args:
            debug_mode: when true, skip all GPIO access and print state dumps.
        """
        self.debug_mode = debug_mode
        self.run_at_later = {}  # {timer name: datetime at which it expires}
        self.event_queue = []  # [(event_name, event_val)]
        self.pin_vals = {}  # {output name: last value written}
        self.bm = None  # unused here; kept so existing attribute access still works
        self.sm = None  # StateManager; the caller must attach it before step()
        self.setup_GPIO()

    def setup_GPIO(self):
        """Configure input pins (with rising-edge callbacks) and output pins."""
        if self.debug_mode:
            return
        GPIO.setmode(GPIO.BCM)
        # Input pins: every button reports through global_receive_input.
        for gpio_num in self.name__gpio_num__pairs_for_input.values():
            GPIO.setup(gpio_num, GPIO.IN)
            GPIO.add_event_detect(gpio_num, GPIO.RISING, callback=global_receive_input, bouncetime=300)
        # Output pins.
        for gpio_num in self.name__gpio_num__pairs_for_output.values():
            GPIO.setup(gpio_num, GPIO.OUT)

    def set_timer(self, name, sec):
        """Arm (or re-arm) the timer ``name`` to expire ``sec`` seconds from now."""
        # Drop an already registered timer of the same name first.
        self.run_at_later.pop(name, None)
        self.run_at_later[name] = datetime.datetime.now() + datetime.timedelta(seconds=sec)

    def step(self):
        """Run one cycle: expire timers, then pass one event to the StateManager."""
        self.print_inspect()
        # Timeout handling. Iterate over a snapshot because expired timers are
        # removed from the dict inside the loop.
        for name, t in list(self.run_at_later.items()):
            now = datetime.datetime.now()
            if t <= now:
                print_d("timeouted", self.debug_mode)
                self.event_queue.append(("timeout", name))
                del self.run_at_later[name]
            else:
                print_d("not timeout", self.debug_mode)
        # Step the StateManager with the oldest pending event, if any.
        if self.event_queue:
            event = self.event_queue.pop(0)
        else:
            event = (None, None)
        self.sm.step(event)
        self.print_inspect()

    # Debug output
    def print_inspect(self):
        """Dump the internal state to stdout (debug mode only)."""
        if not self.debug_mode:
            return
        print("===============================================S")
        print("event_queue: %s" % self.event_queue)
        # getattr keeps the dump usable before a StateManager is attached.
        print("sm.s1: %s" % getattr(self.sm, "s1", None))
        print("sm.s2: %s" % getattr(self.sm, "s2", None))
        print("self.pin_vals: %s" % self.pin_vals)
        print("len(run_at_later): %s" % len(self.run_at_later))
        print("run_at_later: %s" % self.run_at_later)
        print("===============================================E")

    def on(self, gpio_num):
        """Set the output named ``gpio_num`` to True."""
        self.change_pin_val(gpio_num, True)

    def off(self, gpio_num):
        """Set the output named ``gpio_num`` to False."""
        self.change_pin_val(gpio_num, False)

    def change_pin_val(self, gpio_num, val):
        """Record ``val`` for an output and, outside debug mode, drive the pin.

        Args:
            gpio_num: despite its name, the key that is looked up in
                ``name__gpio_num__pairs_for_output`` (e.g. "led1").
            val: value to write.
        """
        self.pin_vals[gpio_num] = val
        if self.debug_mode:
            return
        # Write to the hardware only when a matching output pin exists.
        pin = self.name__gpio_num__pairs_for_output.get(gpio_num)
        if pin is not None:
            GPIO.output(pin, val)

    # For motors: switch on for just a moment.
    def on_for_moment(self, gpio_num):
        """Switch an output on for 0.2 seconds, then off again (blocking)."""
        self.on(gpio_num)
        # Author's note: through the gearbox this is roughly 1/24 of a turn(?);
        # the motor itself makes about 20 turns.
        time.sleep(0.2)
        self.off(gpio_num)

    #==========================================================================
    def receive_input(self, gpio_num):
        """Queue a ("click", gpio_num) event for the next ``step``."""
        print("receive input: %s" % gpio_num)
        self.event_queue.append(("click", gpio_num))
class StateManager:
    """Runs the state machines that are driven by BoundaryManager events.

    Four handlers run on every ``step``: the heart-beat LED (state ``s2``),
    task time measurement (``s3``), the garbage collection day LEDs
    (stateless) and the curtain motor (``s1``). All hardware access goes
    through the boundary manager passed to the constructor.
    """

    def __init__(self, boundary_manager, debug_mode=False):
        """Store the boundary manager and put every machine in "start".

        Args:
            boundary_manager: object providing ``on``, ``off``,
                ``on_for_moment``, ``set_timer`` and
                ``name__gpio_num__pairs_for_input``.
            debug_mode: when true, state transition traces are printed.
        """
        # Curtain machine.
        self.s1 = "start"
        self.s1_tmp_for_moter_min = None  # minute of the last motor pulse
        self.s1_tmp_for_moter_count = 0  # motor pulses since leaving "start"
        # Heart-beat machine.
        self.s2 = "start"
        # Task measuring machine.
        self.s3 = "start"
        self.s3_task_name = None
        self.s3_start_time = None
        self.debug_mode = debug_mode
        self.bm = boundary_manager

    def _is_click(self, event_name, event_val, button_name):
        """Return True if the event is a click on ``button_name``.

        A click may carry either the button's name or its GPIO number.
        """
        if event_name != "click":
            return False
        gpio_num = self.bm.name__gpio_num__pairs_for_input[button_name]
        return event_val in [button_name, gpio_num]

    def step(self, event = None):
        """Feed one ``(event_name, event_val)`` pair (or nothing) to every machine."""
        if event:
            (event_name, event_val) = event
        else:
            (event_name, event_val) = (None, None)
        if event_name:
            print(">>StateManager.step(%s, %s)" % (event_name, event_val))
        self.execute_for_heart_beat(event_name, event_val)
        self.execute_measuring_time_of_task(event_name, event_val)
        self.execute_showing_collection_day(event_name, event_val)
        self.execute_for_curtain(event_name, event_val)

    def execute_for_heart_beat(self, event_name, event_val):
        """Toggle "led1" on each heart-beat timeout (timers of 1 second)."""
        next_state = None
        if self.s2 == "start":
            next_state = "on"
        elif self.s2 == "on":
            if event_name == "timeout" and event_val == "heart_beat_off":
                next_state = "off"
        elif self.s2 == "off":
            if event_name == "timeout" and event_val == "heart_beat_on":
                next_state = "on"
        # Processing on transition.
        if next_state is not None and self.s2 != next_state:
            print_d("s2: change state: %s -> %s" % (self.s2, next_state), self.debug_mode)
            if next_state == "on":
                self.bm.on("led1")  # LED on
                self.bm.set_timer("heart_beat_off", 1)
            elif next_state == "off":
                self.bm.off("led1")  # LED off
                self.bm.set_timer("heart_beat_on", 1)
            self.s2 = next_state
        else:
            print_d("s2: NOT change state: %s -> %s" % (self.s2, next_state), self.debug_mode)

    def execute_measuring_time_of_task(self, event_name, event_val):
        """Measure how long a task takes and show the elapsed time.

        A start button begins measuring; the finish button posts the result
        through ``post_to_server``; the reset button discards it. The elapsed
        time (0 while idle) is sent to the 7-segment display on every call.
        """
        next_state = None
        if self.s3 == "start":
            start_buttons = (
                ("start_measuring_btn_for_rest", "rest"),
                ("start_measuring_btn_for_bath", "bath"),
                ("start_measuring_btn_for_sentaku", "sentaku"),
            )
            for button_name, task_name in start_buttons:
                if self._is_click(event_name, event_val, button_name):
                    self.s3_start_time = datetime.datetime.now()
                    self.s3_task_name = task_name
                    next_state = "measuring"
                    break
        elif self.s3 == "measuring":
            if self._is_click(event_name, event_val, "finish_measuring_btn"):
                # The task was completed.
                post_to_server(self.s3_task_name, self.s3_start_time, datetime.datetime.now())
                self.s3_start_time = None
                next_state = "start"
            elif self._is_click(event_name, event_val, "reset_measuring_btn"):
                # The task was aborted.
                self.s3_start_time = None
                next_state = "start"
        # Show the elapsed time on the 7-segment LED.
        if self.s3_start_time:
            diff_sec = (datetime.datetime.now() - self.s3_start_time).total_seconds()
        else:
            diff_sec = 0
        show_to_7segment_led(diff_sec)
        # Processing on transition (no entry actions for this machine).
        if next_state is not None and self.s3 != next_state:
            print_d("s3: change state: %s -> %s" % (self.s3, next_state), self.debug_mode)
            self.s3 = next_state
        else:
            print_d("s3: NOT change state: %s -> %s" % (self.s3, next_state), self.debug_mode)

    def execute_for_curtain(self, event_name, event_val):
        """Drive the curtain motors after the "7h_to_morning" timer expires.

        States: "start" -> (start button) -> "neterutoki" -> (timeout) ->
        "alerting" -> (stop button) -> "start". While alerting, both motors
        are pulsed once per 10-minute mark, at most ``max_for_moter_count``
        times.
        """
        next_state = None
        if self.s1 == "start":
            if self._is_click(event_name, event_val, "start_btn"):
                # Arm the wake-up timer (the original note says 7 hours, 3600*7).
                self.bm.set_timer("7h_to_morning", sec_for_moving_moter)
                next_state = "neterutoki"
        elif self.s1 == "neterutoki":
            if event_name == "timeout" and event_val == "7h_to_morning":
                next_state = "alerting"
            elif self._is_click(event_name, event_val, "start_btn"):
                # Pressing start again restarts the wake-up timer.
                self.bm.set_timer("7h_to_morning", sec_for_moving_moter)
                next_state = "neterutoki"
        elif self.s1 == "alerting":
            if self._is_click(event_name, event_val, "stop_btn"):
                next_state = "start"
        # Processing on transition.
        if next_state is not None and self.s1 != next_state:
            if next_state == "start":
                # Make sure both motors are off.
                self.bm.off("m1")
                self.bm.off("m2")
            print("s1: change state: %s -> %s" % (self.s1, next_state))
            self.s1 = next_state
        else:
            print_d("s1: NOT change state: %s -> %s" % (self.s1, next_state), self.debug_mode)
        # Processing that depends on the current state.
        if self.s1 == "start":
            self.s1_tmp_for_moter_min = None
            self.s1_tmp_for_moter_count = 0
        elif self.s1 == "alerting":
            if self.s1_tmp_for_moter_count < max_for_moter_count:
                minute = datetime.datetime.now().minute
                # Every 10 minutes rotate by about 1/24 of a turn. Compare with
                # the stored minute directly: a truthiness test would treat
                # minute 0 as "never pulsed" and pulse again on every step.
                if minute % 10 == 0 and self.s1_tmp_for_moter_min != minute:
                    self.s1_tmp_for_moter_min = minute
                    self.s1_tmp_for_moter_count = self.s1_tmp_for_moter_count + 1
                    print(">> moving moter")
                    # Move the motors.
                    self.bm.on_for_moment("m1")
                    self.bm.on_for_moment("m2")

    # Garbage collection day display
    def execute_showing_collection_day(self, event_name, event_val):
        """Light the LEDs for the garbage categories of the relevant day.

        The day comes from ``get_now_info``. LEDs are only lit when the hour
        is 9 or earlier, or 15 or later. ``event_name`` and ``event_val`` are
        unused.
        """
        (wday, wnum, day, hour) = get_now_info()
        self.bm.off("led_for_burnable")
        self.bm.off("led_for_plastic")
        self.bm.off("led_for_unburnable")
        self.bm.off("led_for_recycle")
        # Show only in the morning or from the afternoon on.
        if hour <= 9 or 15 <= hour:
            # Each category is checked independently: a weekday can match
            # more than one category (wday 1 is both burnable and recycle).
            # Burnable garbage
            if wday in [1, 4]:
                self.bm.on("led_for_burnable")
            # Plastic garbage
            if wday == 2:
                self.bm.on("led_for_plastic")
            # Non-burnable garbage
            if wday == 0 and wnum in [1, 3]:
                self.bm.on("led_for_unburnable")
            # Recyclable garbage
            if wday == 1 and wnum in [0, 2]:
                self.bm.on("led_for_recycle")
        # Sleep briefly so that the LEDs stay lit for a moment.
        time.sleep(0.001)
# Entry point: wire the two managers together and run the control loop.
if __name__ == '__main__':
    # Assigned at module level so that global_receive_input can reach it.
    bm = BoundaryManager()
    sm = StateManager(bm)
    bm.sm = sm
    print("__in_loop__")
    try:
        while True:
            time.sleep(0.01)
            bm.step()
    except KeyboardInterrupt:
        # Ctrl-C is the normal way to stop the loop.
        pass
    finally:
        # Release the GPIO channels so that the outputs are not left driven
        # after the program exits.
        GPIO.cleanup()
    print("__END__")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment