Skip to content

Instantly share code, notes, and snippets.

@zuku
Created April 1, 2023 08:15
Show Gist options
  • Save zuku/042333e0c27f043c436cf4c958dd7b62 to your computer and use it in GitHub Desktop.
Save zuku/042333e0c27f043c436cf4c958dd7b62 to your computer and use it in GitHub Desktop.
M5StickCのMicroPythonでUDPブロードキャストの送受信を行うサンプル
import wifiCfg, _thread, socket
wifiCfg.autoConnect()
class Broadcast:
"""
M5StickC PlusのMicroPython環境でブロードキャストの送信と受信を行うクラス
動作環境
* M5StickC Plus 1.1
* UIFlow_StickC_Plus v1.11.3
* MicroPython 1.12
* sys.implementation.version = (1, 12, 0)
* sys.version = '3.4.0'
MicroPython専用のため通常のPython (CPython)では動作しない
受信側と送信側は同じネットワーク内で動作する必要がある
パケットを取りこぼす場合がある(大量のブロードキャストパケットが送信された場合など)
Examples
--------
# 受信側
r = Broadcast()
# 受信待機開始(受信したらprint)
r.startWaiting(print)
# 送信側
# 受信側とは別端末で実行
s = Broadcast()
s.sendMessage("Hello!")
"""
PORT = 47890
BUFFER_SIZE = 1024
TIMEOUT = 10
def __init__(self):
if not wifiCfg.wlan_sta.isconnected():
raise Exception("Network is not connected")
ip_address, subnetmask, _, _ = wifiCfg.wlan_sta.ifconfig()
self._broadcast_address = self._getBroadcastAddress(ip_address, subnetmask)
self._continue_to_receive = False
self._is_waiting_for_receiving = False
def _aton(self, ip_string):
a1, a2, a3, a4 = ip_string.split(".")
return (int(a1) << 24) | (int(a2) << 16) | (int(a3) << 8) | int(a4)
def _ntoa(self, packed_ip):
return "{:d}.{:d}.{:d}.{:d}".format(
(packed_ip >> 24) & 0xFF,
(packed_ip >> 16) & 0xFF,
(packed_ip >> 8) & 0xFF,
packed_ip & 0xFF
)
def _getBroadcastAddress(self, ip_address, subnetmask):
return self._ntoa(self._aton(ip_address) | (~self._aton(subnetmask) & 0xFFFFFFFF))
def _receivingLoop(self, callback):
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.settimeout(self.TIMEOUT)
sock.bind((self._broadcast_address, self.PORT))
try:
self._continue_to_receive = True
self._is_waiting_for_receiving = True
while self._continue_to_receive:
try:
data = sock.recv(self.BUFFER_SIZE)
callback(data)
except OSError as e:
if not "ETIMEDOUT" in str(e):
raise e
finally:
self._is_waiting_for_receiving = False
sock.close()
def startWaiting(self, callback):
"""
受信の待機を開始する
Parameters
----------
callback : function
データ受信時に呼び出されるコールバック関数
関数の第1引数にはbytesで受信したデータが渡される
"""
if self._is_waiting_for_receiving:
raise Exception("receiving loop already started")
_thread.start_new_thread(self._receivingLoop, (callback,))
def stopWaiting(self):
"""
受信の待機を終了する
startWaiting()で開始した受信待機を終了する
受信待機は即座には終了せず最長でTIMEOUT秒数後に終了する
"""
self._continue_to_receive = False
def isWaitingForReceiving(self):
"""
受信待機中かを判定する
Returns
-------
bool
Trueの場合は受信待機中
"""
return self._is_waiting_for_receiving
def sendMessage(self, message):
"""
ブロードキャストメッセージを送信する
Parameters
----------
message : str
送信するメッセージ
Returns
--------
int
送信したメッセージのバイト数
"""
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
return sock.sendto(message.encode("utf-8"), (self._broadcast_address, self.PORT))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment