Skip to content

Instantly share code, notes, and snippets.

@kemusiro
Last active June 12, 2022 13:10
Show Gist options
  • Save kemusiro/0172d64feeef4f003736378e5865151d to your computer and use it in GitHub Desktop.
Save kemusiro/0172d64feeef4f003736378e5865151d to your computer and use it in GitHub Desktop.
Raspberry Pi PicoからESP32のATコマンドモードを使ってWi-Fi接続する。
import uasyncio as asyncio
from machine import UART
# English summary of the note below: sample that connects a Raspberry Pi Pico
# to a web server through the Wi-Fi of an ESP32-DevKitC. The two boards are
# linked by UART:
#   Raspberry Pi Pico side: UART1 (TX: GP4, RX: GP5)
#   ESP32-DevKitC side:     UART1 (TX: GPIO17, RX: GPIO16)
'''
Raspberry Pi PicoからESP32-DevKitCのWi-Fi経由でWebサーバに接続するサンプル
Raspberry PiとESP32の間はUARTで接続する。
Raspberry Pi Pico側: UART1 (TX: GP4, RX: GP5)
ESP32-DevKitC側: UART1 (TX: GPIO17, RX: GPIO16)
'''
# Wi-Fi credentials substituted into the AT+CWJAP command by sender().
# The values here are placeholders.
WIFISSID = "xxxxxxxx"
WIFIPASSWD = "xxxxxxxx"
# Address and port substituted into the AT+CIPSTART command by sender().
TARGET_IP = "192.168.0.1"
TARGET_PORT = 8000
# UART handle shared by sender() and receiver(); assigned in main().
uart = None
# Handshake flags shared between the sending coroutines and receiver().
# command_sent: a command is outstanding and receiver() should look for its
# acknowledgement line.
command_sent = False
# command_accepted: receiver() saw the acknowledgement; cleared again by
# send_command() once it has noticed it.
command_accepted = False
# cipsend_sent: send_cipsend() is waiting for the '>' prompt.
cipsend_sent = False
# cipsend_accepted: receiver() saw the '>' prompt; cleared again by
# send_cipsend() once it has noticed it.
cipsend_accepted = False
async def send_command(swriter, command):
    '''
    Transmit one command line and wait until receiver() acknowledges it.

    swriter: stream writer the command is written to.
    command: command text; CR LF is appended before it is written.
    '''
    global command_sent, command_accepted

    banner = "--- SEND: " + command
    print(banner)
    # Tell receiver() that the next acknowledgement belongs to this command.
    command_sent = True
    swriter.write(command + '\r\n')
    await swriter.drain()
    # Poll in 100 ms steps until receiver() raises the acknowledgement flag.
    while True:
        if command_accepted:
            break
        await asyncio.sleep_ms(100)
    # Consume the acknowledgement so the next call waits for a fresh one.
    command_accepted = False
async def send_cipsend(swriter, message):
    '''
    Send a message on link 0 with the AT+CIPSEND command.

    The command announcing the payload size is sent first, then this
    coroutine waits for receiver() to report the '>' prompt, and finally the
    message itself is written through send_command().

    swriter: stream writer used for both the command and the payload.
    message: text to transmit; send_command() appends CR LF to it.
    '''
    global cipsend_sent
    global cipsend_accepted
    # Arm prompt detection before the command goes out so receiver() cannot
    # miss a '>' that arrives right after the command's acknowledgement.
    cipsend_sent = True
    # AT+CIPSEND takes the payload size in bytes, so measure the UTF-8
    # encoding rather than the character count; the extra 2 bytes are the
    # CR LF that send_command() appends.
    payload_size = len(message.encode('utf-8')) + 2
    await send_command(swriter, 'AT+CIPSEND=0,{}'.format(payload_size))
    # Poll in 100 ms steps until receiver() has seen the '>' prompt.
    while not cipsend_accepted:
        await asyncio.sleep_ms(100)
    cipsend_accepted = False
    await send_command(swriter, message)
async def sender():
    '''
    UART transmit coroutine.

    Wraps the shared UART in a stream writer, issues the AT command sequence
    below one command at a time (each call waits for its acknowledgement),
    sends a single HTTP GET request, and then idles forever.
    '''
    swriter = asyncio.StreamWriter(uart, {})
    # Send the messages in the order required by the ESP32 AT command set.
    await send_command(swriter, 'AT+RST')
    await send_command(swriter, 'AT+CWMODE=1')
    await send_command(swriter, 'AT+CWJAP="{}","{}"'.format(WIFISSID, WIFIPASSWD))
    await send_command(swriter, 'AT+CIPMUX=1')
    await send_command(swriter, 'AT+CIPSTART=0,"TCP","{}",{}'.format(TARGET_IP, TARGET_PORT))
    # send_command() appends one more CR LF, which supplies the blank line
    # that terminates the HTTP request header.
    await send_cipsend(swriter, 'GET / HTTP/1.0\r\n')
    # Closing the link was left disabled in the original code:
    # await send_command(swriter, 'AT+CIPCLOSE')
    while True:
        # The sleep must be awaited; otherwise this loop never yields and
        # no other task (including receiver()) can run.
        await asyncio.sleep(1)
async def receiver():
    '''
    UART receive coroutine.

    Reads the UART one byte at a time, prints every completed line, and
    raises the handshake flags the sending coroutines are polling.
    Received data may be dropped when the baud rate is high.
    '''
    global command_sent, command_accepted
    global cipsend_sent, cipsend_accepted

    reader = asyncio.StreamReader(uart)
    # Lines that count as the acknowledgement of an outstanding command.
    ack_lines = (b'OK\r\n', b'ready\r\n')
    pending = b''
    while True:
        byte = await reader.read(1)
        pending += byte
        if byte == b'\n':
            # A full line has arrived: show it and check for an acknowledgement.
            print(pending)
            if command_sent and pending in ack_lines:
                command_sent = False
                command_accepted = True
            # Start collecting the next line.
            pending = b''
            continue
        if cipsend_sent and byte == b'>':
            # Prompt seen while send_cipsend() is waiting for it.
            print(pending)
            pending = b''
            cipsend_sent = False
            cipsend_accepted = True
async def main():
    '''
    Open UART1 at 115200 baud, start the sender and receiver tasks, and keep
    the event loop alive.
    '''
    global command_sent, uart

    uart = UART(1, 115200)
    # The command flag is raised before any task starts.
    command_sent = True
    # Schedule the transmit task first, then the receive task.
    for worker in (sender, receiver):
        asyncio.create_task(worker())
    # Nothing else to do here; sleep in one-second steps forever.
    while True:
        await asyncio.sleep(1)
def test():
    '''
    Run main() until it is interrupted, then install a fresh event loop.
    '''
    try:
        try:
            asyncio.run(main())
        except KeyboardInterrupt:
            # Ctrl-C ends the run; report it.
            print('Interrupted')
    finally:
        # Executed on every exit path so a later run starts with a new loop.
        asyncio.new_event_loop()


test()
@kemusiro
Copy link
Author

CIPSENDモードから抜ける方法がわからない。'+++'を送ってもうまくいかない。検討中。

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment