Last active
June 12, 2022 13:10
-
-
Save kemusiro/0172d64feeef4f003736378e5865151d to your computer and use it in GitHub Desktop.
Raspberry Pi PicoからESP32のATコマンドモードを使ってWi-Fi接続する。
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import uasyncio as asyncio | |
from machine import UART | |
'''
Sample: connect from a Raspberry Pi Pico to a web server over the Wi-Fi
of an ESP32-DevKitC.

The Raspberry Pi Pico and the ESP32 are linked by UART.
    Raspberry Pi Pico side: UART1 (TX: GP4, RX: GP5)
    ESP32-DevKitC side:     UART1 (TX: GPIO17, RX: GPIO16)
'''

# Wi-Fi credentials and the TCP endpoint to contact.
# The values below are placeholders; replace them before running.
WIFISSID = "xxxxxxxx"
WIFIPASSWD = "xxxxxxxx"
TARGET_IP = "192.168.0.1"
TARGET_PORT = 8000

# UART handle shared by the coroutines; assigned in main().
uart = None

# Handshake flags shared between the sending and receiving coroutines.
#   *_sent     -- raised by the sending side while a reply is expected.
#   *_accepted -- raised by the receiving side once that reply arrived.
command_sent = command_accepted = False
cipsend_sent = cipsend_accepted = False
async def send_command(swriter, command):
    '''
    Transmit one command line and wait until it has been accepted.

    swriter: stream writer wrapping the UART.
    command: command text without the line terminator; CR LF is appended.

    The coroutine raises the module-level ``command_sent`` flag, writes
    the line, then polls ``command_accepted`` every 100 ms. The flag is
    cleared again before returning so the next command starts clean.
    '''
    global command_sent, command_accepted

    print("--- SEND: " + command)
    # Announce that a reply is expected before anything goes on the wire.
    command_sent = True
    swriter.write(command + '\r\n')
    await swriter.drain()

    # Poll until another coroutine reports that the command was accepted.
    while True:
        if command_accepted:
            break
        await asyncio.sleep_ms(100)
    command_accepted = False
async def send_cipsend(swriter, message):
    '''
    Send a message on link 0 using the AT+CIPSEND command.

    swriter: stream writer wrapping the UART.
    message: text payload (str) without the trailing CR LF.

    The length announced to the module is the UTF-8 byte length of
    ``message`` plus 2, because send_command() appends CR LF to whatever
    it transmits. After AT+CIPSEND has been accepted, the coroutine polls
    ``cipsend_accepted`` every 100 ms and only then transmits the payload.
    '''
    global cipsend_sent
    global cipsend_accepted

    # AT+CIPSEND expects a byte count, not a character count; the two
    # differ as soon as the payload contains non-ASCII characters.
    payload_size = len(message.encode()) + 2

    cipsend_sent = True
    await send_command(swriter, 'AT+CIPSEND=0,{}'.format(payload_size))

    # Wait until the data prompt has been reported before sending data.
    while not cipsend_accepted:
        await asyncio.sleep_ms(100)
    cipsend_accepted = False

    await send_command(swriter, message)
async def sender():
    '''
    Message-sending coroutine for the UART link.

    Issues the AT command sequence in order (reset, station mode, join
    the access point, enable multiple connections, open TCP link 0 to
    TARGET_IP:TARGET_PORT), sends one HTTP GET request line, and then
    idles forever.
    '''
    swriter = asyncio.StreamWriter(uart, {})

    # Send the messages in the order required by the ESP32 AT commands.
    await send_command(swriter, 'AT+RST')
    await send_command(swriter, 'AT+CWMODE=1')
    await send_command(swriter, 'AT+CWJAP="{}","{}"'.format(WIFISSID, WIFIPASSWD))
    await send_command(swriter, 'AT+CIPMUX=1')
    await send_command(swriter, 'AT+CIPSTART=0,"TCP","{}",{}'.format(TARGET_IP, TARGET_PORT))
    await send_cipsend(swriter, 'GET / HTTP/1.0\r\n')
    # Closing the link was left disabled in the original:
    # await send_command(swriter, 'AT+CIPCLOSE')

    while True:
        # The sleep must be awaited. Without the await this loop never
        # yields and no other task (including the receiver) can run.
        await asyncio.sleep(1)


async def receiver():
    '''
    Message-receiving coroutine for the UART link.

    Reads the UART one byte at a time, prints every completed line, and
    raises the handshake flags the sending side is polling:

    * a line equal to ``OK``, ``ready`` or ``SEND OK`` while
      ``command_sent`` is set marks the pending command as accepted;
    * a ``>`` byte while ``cipsend_sent`` is set marks the CIPSEND data
      prompt as received.

    Received data may be dropped when the baud rate is high.
    '''
    global command_sent
    global command_accepted
    global cipsend_sent
    global cipsend_accepted

    # Replies that complete a pending command. ``SEND OK`` is what the
    # module reports after a CIPSEND payload; without it the payload
    # transmission would never be acknowledged.
    accept_lines = (b'OK\r\n', b'ready\r\n', b'SEND OK\r\n')

    sreader = asyncio.StreamReader(uart)
    line = b''
    while True:
        c = await sreader.read(1)
        line += c
        if c == b'\n':
            print(line)
            if command_sent and line in accept_lines:
                command_sent = False
                command_accepted = True
            # Start collecting the next line regardless of its content.
            line = b''
        elif cipsend_sent and c == b'>':
            # The data prompt is not newline-terminated, so it is
            # detected on the byte itself.
            print(line)
            line = b''
            cipsend_sent = False
            cipsend_accepted = True
async def main():
    '''
    Open the UART, start the sender and receiver tasks, and keep the
    event loop alive.
    '''
    global command_sent
    global uart

    uart = UART(1, 115200)
    # NOTE(review): the flag is raised before any command is written;
    # presumably so the receiver reacts to an OK/ready line that arrives
    # before the first command -- confirm.
    command_sent = True

    # Sender first, then receiver, matching the original start order.
    for coroutine in (sender, receiver):
        asyncio.create_task(coroutine())

    # Nothing else to do here; sleep in a loop so the tasks keep running.
    while True:
        await asyncio.sleep(1)
def test():
    '''
    Run main() until it is interrupted from the keyboard, then reset the
    asyncio event loop.
    '''
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print('Interrupted')
    finally:
        # Always executed, whether main() was interrupted or failed.
        asyncio.new_event_loop()


# Start the sample as soon as the script is loaded.
test()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
CIPSENDモードから抜ける方法がわからない。'+++'を送ってもうまくいかない。検討中。