-
-
Save yarogniew/28d0f7450b08e57f3248aaee71975ecd to your computer and use it in GitHub Desktop.
Program w micropython dla ESP-8266 z mikrofalowym czujnikiem ruchu RCWL-0516. Sprawdza ruch i jeśli nastąpił po czasie dłuższym niż MIN_ELAPSED_SINCE_STOP, wysyła info na serwer flask o intruzie.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import machine | |
| import utime | |
| import uasyncio | |
| import urequests | |
| # Adres URL serwera Flask | |
| url = "SERVER_FLASK_ADDRESS" # na przykład http://192.168.1.222:8000/czujnik_ruchu | |
| DEV_NAME = "czujnik_ruchu_studio1" | |
| # Piny wejściowe i wyjściowe | |
| motion_pin = machine.Pin(12, machine.Pin.IN) | |
| led_pin = machine.Pin(14, machine.Pin.OUT) | |
| # Czas ostatniego wykrytego ruchu | |
| last_motion_time = 0 | |
| # Minimalny czas między kolejnymi wykryciami ruchu (w sekundach) | |
| MIN_MOTION_INTERVAL = 10 | |
| # Minimalny czas elapsed_since_stop do wysłania informacji na serwer (w sekundach) | |
| MIN_ELAPSED_SINCE_STOP = 60 | |
| # Flaga wskazująca, czy funkcja f_start() została już wywołana | |
| start_triggered = False | |
| # Funkcja formatująca czas na godz:min:sekund | |
| def format_time(seconds): | |
| hours, remainder = divmod(seconds, 3600) | |
| minutes, seconds = divmod(remainder, 60) | |
| return f"{int(hours):02d}:{int(minutes):02d}:{int(seconds):02d}" | |
| # Funkcja uruchamiana przy wykryciu ruchu | |
| def f_start(): | |
| global start_triggered, last_motion_time | |
| print("Ruch wykryty! Wywołanie funkcji f_start()") | |
| # Wypisz czas od ostatniego wywołania funkcji f_stop() | |
| elapsed_since_stop = utime.time() - last_motion_time | |
| formatted_time = format_time(elapsed_since_stop) | |
| print(f"Upłynęło {formatted_time} od ostatniego zakończenia ruchu.") | |
| print(f"To jest {elapsed_since_stop} sekund.") | |
| # ================== SEND DATA TO SERVER ========================= | |
| if elapsed_since_stop > MIN_ELAPSED_SINCE_STOP: | |
| send_intruder_data(formatted_time) # wysyłka do serwera | |
| led_pin.on() | |
| start_triggered = True | |
| last_motion_time = utime.time() | |
| # Funkcja uruchamiana po ustaniu ruchu | |
| def f_stop(): | |
| global start_triggered, last_motion_time | |
| led_pin.off() | |
| print("Ruch zakończony! Wywołanie funkcji f_stop()") | |
| # Tutaj dodaj kod, który ma zostać wykonany po ustaniu ruchu | |
| # np. akcje w momencie zakończenia ruchu | |
| start_triggered = False | |
| last_motion_time = utime.time() | |
| # Funkcja obsługująca czujnik ruchu | |
| async def handle_motion_sensor(): | |
| global last_motion_time | |
| while True: | |
| if motion_pin.value() == 1: | |
| # Wykryto ruch | |
| if not start_triggered: | |
| f_start() # Wywołaj funkcję tylko jeśli nie została jeszcze uruchomiona | |
| # Oczekuj na ustanie ruchu lub upływ MIN_MOTION_INTERVAL | |
| while motion_pin.value() == 1 or (utime.time() - last_motion_time) < MIN_MOTION_INTERVAL: | |
| # elapsed_seconds = utime.time() - last_motion_time | |
| # print(f"Upłynęło {int(elapsed_seconds)} sekund od rozpoczęcia ruchu.") | |
| await uasyncio.sleep(1) | |
| # Wyłącz diodę LED po upływie MIN_MOTION_INTERVAL od ostatniego ruchu | |
| f_stop() | |
| else: | |
| # Brak ruchu | |
| if start_triggered: | |
| elapsed_seconds = utime.time() - last_motion_time | |
| print(f"Upłynęło {int(elapsed_seconds)} sekund od zakończenia ruchu do teraz.") | |
| await uasyncio.sleep(1) | |
| # Oczekuj na następną iterację (wysoka częstotliwość próbkowania czujnika) | |
| await uasyncio.sleep(0.1) | |
| # Funkcja wysyłająca dane o intruzie do serwera | |
| def send_intruder_data(_time_difference): | |
| data = { | |
| 'device_name': DEV_NAME, | |
| 'data': { | |
| 'time_difference': _time_difference | |
| } | |
| } | |
| print(f'\nWysyłka do serwera: {data}') | |
| try: | |
| # headers = {'Content-Type': 'application/json'} | |
| response = urequests.post(url, json=data) | |
| if response.status_code == 200: | |
| data = response.json() | |
| print(f"\nResponse get from server {data}") | |
| else: | |
| print("Failed to send data. Status code:", response.status_code) | |
| print("Server response:", response.text) | |
| except Exception as e: | |
| print("Error:", str(e)) | |
| # Uruchomienie asynchronicznej pętli programu | |
| uasyncio.run(handle_motion_sensor()) | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment