Skip to content

Instantly share code, notes, and snippets.

@yarogniew
Last active July 24, 2024 14:44
Show Gist options
  • Select an option

  • Save yarogniew/28d0f7450b08e57f3248aaee71975ecd to your computer and use it in GitHub Desktop.

Select an option

Save yarogniew/28d0f7450b08e57f3248aaee71975ecd to your computer and use it in GitHub Desktop.
Program w micropython dla ESP-8266 z mikrofalowym czujnikiem ruchu RCWL-0516. Sprawdza ruch i jeśli nastąpił po czasie dłuższym niż MIN_ELAPSED_SINCE_STOP, wysyła info na serwer flask o intruzie.
import machine
import utime
import uasyncio
import urequests
# Adres URL serwera Flask
url = "SERVER_FLASK_ADDRESS" # na przykład http://192.168.1.222:8000/czujnik_ruchu
DEV_NAME = "czujnik_ruchu_studio1"
# Piny wejściowe i wyjściowe
motion_pin = machine.Pin(12, machine.Pin.IN)
led_pin = machine.Pin(14, machine.Pin.OUT)
# Czas ostatniego wykrytego ruchu
last_motion_time = 0
# Minimalny czas między kolejnymi wykryciami ruchu (w sekundach)
MIN_MOTION_INTERVAL = 10
# Minimalny czas elapsed_since_stop do wysłania informacji na serwer (w sekundach)
MIN_ELAPSED_SINCE_STOP = 60
# Flaga wskazująca, czy funkcja f_start() została już wywołana
start_triggered = False
# Funkcja formatująca czas na godz:min:sekund
def format_time(seconds):
hours, remainder = divmod(seconds, 3600)
minutes, seconds = divmod(remainder, 60)
return f"{int(hours):02d}:{int(minutes):02d}:{int(seconds):02d}"
# Funkcja uruchamiana przy wykryciu ruchu
def f_start():
global start_triggered, last_motion_time
print("Ruch wykryty! Wywołanie funkcji f_start()")
# Wypisz czas od ostatniego wywołania funkcji f_stop()
elapsed_since_stop = utime.time() - last_motion_time
formatted_time = format_time(elapsed_since_stop)
print(f"Upłynęło {formatted_time} od ostatniego zakończenia ruchu.")
print(f"To jest {elapsed_since_stop} sekund.")
# ================== SEND DATA TO SERVER =========================
if elapsed_since_stop > MIN_ELAPSED_SINCE_STOP:
send_intruder_data(formatted_time) # wysyłka do serwera
led_pin.on()
start_triggered = True
last_motion_time = utime.time()
# Funkcja uruchamiana po ustaniu ruchu
def f_stop():
global start_triggered, last_motion_time
led_pin.off()
print("Ruch zakończony! Wywołanie funkcji f_stop()")
# Tutaj dodaj kod, który ma zostać wykonany po ustaniu ruchu
# np. akcje w momencie zakończenia ruchu
start_triggered = False
last_motion_time = utime.time()
# Funkcja obsługująca czujnik ruchu
async def handle_motion_sensor():
global last_motion_time
while True:
if motion_pin.value() == 1:
# Wykryto ruch
if not start_triggered:
f_start() # Wywołaj funkcję tylko jeśli nie została jeszcze uruchomiona
# Oczekuj na ustanie ruchu lub upływ MIN_MOTION_INTERVAL
while motion_pin.value() == 1 or (utime.time() - last_motion_time) < MIN_MOTION_INTERVAL:
# elapsed_seconds = utime.time() - last_motion_time
# print(f"Upłynęło {int(elapsed_seconds)} sekund od rozpoczęcia ruchu.")
await uasyncio.sleep(1)
# Wyłącz diodę LED po upływie MIN_MOTION_INTERVAL od ostatniego ruchu
f_stop()
else:
# Brak ruchu
if start_triggered:
elapsed_seconds = utime.time() - last_motion_time
print(f"Upłynęło {int(elapsed_seconds)} sekund od zakończenia ruchu do teraz.")
await uasyncio.sleep(1)
# Oczekuj na następną iterację (wysoka częstotliwość próbkowania czujnika)
await uasyncio.sleep(0.1)
# Funkcja wysyłająca dane o intruzie do serwera
def send_intruder_data(_time_difference):
data = {
'device_name': DEV_NAME,
'data': {
'time_difference': _time_difference
}
}
print(f'\nWysyłka do serwera: {data}')
try:
# headers = {'Content-Type': 'application/json'}
response = urequests.post(url, json=data)
if response.status_code == 200:
data = response.json()
print(f"\nResponse get from server {data}")
else:
print("Failed to send data. Status code:", response.status_code)
print("Server response:", response.text)
except Exception as e:
print("Error:", str(e))
# Uruchomienie asynchronicznej pętli programu
uasyncio.run(handle_motion_sensor())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment