Created
April 1, 2023 08:15
-
-
Save zuku/042333e0c27f043c436cf4c958dd7b62 to your computer and use it in GitHub Desktop.
M5StickCのMicroPythonでUDPブロードキャストの送受信を行うサンプル
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import wifiCfg, _thread, socket | |
wifiCfg.autoConnect() | |
class Broadcast: | |
""" | |
M5StickC PlusのMicroPython環境でブロードキャストの送信と受信を行うクラス | |
動作環境 | |
* M5StickC Plus 1.1 | |
* UIFlow_StickC_Plus v1.11.3 | |
* MicroPython 1.12 | |
* sys.implementation.version = (1, 12, 0) | |
* sys.version = '3.4.0' | |
MicroPython専用のため通常のPython (CPython)では動作しない | |
受信側と送信側は同じネットワーク内で動作する必要がある | |
パケットを取りこぼす場合がある(大量のブロードキャストパケットが送信された場合など) | |
Examples | |
-------- | |
# 受信側 | |
r = Broadcast() | |
# 受信待機開始(受信したらprint) | |
r.startWaiting(print) | |
# 送信側 | |
# 受信側とは別端末で実行 | |
s = Broadcast() | |
s.sendMessage("Hello!") | |
""" | |
PORT = 47890 | |
BUFFER_SIZE = 1024 | |
TIMEOUT = 10 | |
def __init__(self): | |
if not wifiCfg.wlan_sta.isconnected(): | |
raise Exception("Network is not connected") | |
ip_address, subnetmask, _, _ = wifiCfg.wlan_sta.ifconfig() | |
self._broadcast_address = self._getBroadcastAddress(ip_address, subnetmask) | |
self._continue_to_receive = False | |
self._is_waiting_for_receiving = False | |
def _aton(self, ip_string): | |
a1, a2, a3, a4 = ip_string.split(".") | |
return (int(a1) << 24) | (int(a2) << 16) | (int(a3) << 8) | int(a4) | |
def _ntoa(self, packed_ip): | |
return "{:d}.{:d}.{:d}.{:d}".format( | |
(packed_ip >> 24) & 0xFF, | |
(packed_ip >> 16) & 0xFF, | |
(packed_ip >> 8) & 0xFF, | |
packed_ip & 0xFF | |
) | |
def _getBroadcastAddress(self, ip_address, subnetmask): | |
return self._ntoa(self._aton(ip_address) | (~self._aton(subnetmask) & 0xFFFFFFFF)) | |
def _receivingLoop(self, callback): | |
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) | |
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) | |
sock.settimeout(self.TIMEOUT) | |
sock.bind((self._broadcast_address, self.PORT)) | |
try: | |
self._continue_to_receive = True | |
self._is_waiting_for_receiving = True | |
while self._continue_to_receive: | |
try: | |
data = sock.recv(self.BUFFER_SIZE) | |
callback(data) | |
except OSError as e: | |
if not "ETIMEDOUT" in str(e): | |
raise e | |
finally: | |
self._is_waiting_for_receiving = False | |
sock.close() | |
def startWaiting(self, callback): | |
""" | |
受信の待機を開始する | |
Parameters | |
---------- | |
callback : function | |
データ受信時に呼び出されるコールバック関数 | |
関数の第1引数にはbytesで受信したデータが渡される | |
""" | |
if self._is_waiting_for_receiving: | |
raise Exception("receiving loop already started") | |
_thread.start_new_thread(self._receivingLoop, (callback,)) | |
def stopWaiting(self): | |
""" | |
受信の待機を終了する | |
startWaiting()で開始した受信待機を終了する | |
受信待機は即座には終了せず最長でTIMEOUT秒数後に終了する | |
""" | |
self._continue_to_receive = False | |
def isWaitingForReceiving(self): | |
""" | |
受信待機中かを判定する | |
Returns | |
------- | |
bool | |
Trueの場合は受信待機中 | |
""" | |
return self._is_waiting_for_receiving | |
def sendMessage(self, message): | |
""" | |
ブロードキャストメッセージを送信する | |
Parameters | |
---------- | |
message : str | |
送信するメッセージ | |
Returns | |
-------- | |
int | |
送信したメッセージのバイト数 | |
""" | |
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) | |
return sock.sendto(message.encode("utf-8"), (self._broadcast_address, self.PORT)) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment