Skip to content

Instantly share code, notes, and snippets.

@y-ookuma
Last active November 14, 2021 13:32
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save y-ookuma/628639cf7a3d6c2b1098b0081b60e641 to your computer and use it in GitHub Desktop.
Save y-ookuma/628639cf7a3d6c2b1098b0081b60e641 to your computer and use it in GitHub Desktop.
from socket import *
import time as t
import datetime as dt
import pandas as pd
import xmltodict,json,os
from multiprocessing import Process,Manager
## UDP受信クラス
class udprecv():
def __init__(self):
SrcIP = "" # 受信元IP
SrcPort = 16520 # 受信元ポート番号
self.SrcAddr = (SrcIP, SrcPort) # アドレスをtupleに格納
self.BUFSIZE = 512 # バッファサイズ指定
self.udpServSock = socket(AF_INET, SOCK_DGRAM) # ソケット作成
self.udpServSock.bind(self.SrcAddr) # 受信元アドレスでバインド
manager = Manager() # 共有メモリ
self.queue=manager.list() # 空のリストを定義
def recv(self,debug=False,debug_sec=10):
if debug:
start=t.time()
debug_list=[]
while True: # 常に受信待ち
ccm, addr = self.udpServSock.recvfrom(self.BUFSIZE) # 受信
# print(ccm.decode(), addr) # 受信データと送信アドレス表示
# p.self.save_df(ccm)
p = Process(target=self.save_df, args=(debug,ccm,self.queue)) # マルチプロセス化でDB処理などを実行する
p.start()
if p.join(2) is None:
p.terminate()
if debug:
end=t.time()
print(ccm.decode(), addr) # 受信データと送信アドレス表示
debug_list.append(ccm)
print("Main process ID:",os.getppid())
if end-start>=debug_sec:
print("debug_time:",end-start,"ExecCount:",len(debug_list),"//",len(debug_list)/(end-start),"count/sec")
break;
# あなたのDB保存処理を記述すること
def save_df(self,debug,ccm,queue):
#"ここにDB保存処理を書くと良い。以下は適当にサンプルを書いてみた。"
dictionary = xmltodict.parse(ccm) # xmlを辞書型へ変換
json_string = json.dumps(dictionary) # json形式のstring
json_string = json_string.replace('@', '').replace('#', '') # 「#や@」 をreplace
json_object = json.loads(json_string) # Stringを再度json形式で読み込む
# ↓↓↓ influxDB用整形
measurement = json_object["UECS"]["DATA"]["type"].split(".")[0]
measurement += "_" + json_object["UECS"]["DATA"]["room"]
measurement += "_" + json_object["UECS"]["DATA"]["region"]
measurement += "_" + json_object["UECS"]["DATA"]["order"]
datetime = pd.to_datetime(dt.datetime.now())
val = float(json_object["UECS"]["DATA"]["text"])*1.0
priority = json_object["UECS"]["DATA"]["priority"]
json_body = [{"measurement": measurement,
"tags": {"priority": priority,"cloud": ""},
"time": datetime,
"precision": "s",
"fields": {"value": val}
}]
# ↑↑↑ influxDB用整形
#リスト化することによりdataframe化して、DBのような操作が可能になる。
list_body = [datetime,measurement,val,priority]
if len(self.queue)>=1000:
self.queue.pop(0) # 1000件以上なら一番左の要素を削除
self.queue.append(list_body)
if debug:
print("process ID:",os.getppid(),"Queue:",len(self.queue),json_body)
## UDP送信クラス
class udpsend():
def __init__(self):
# get ip address
ip = socket(AF_INET, SOCK_DGRAM)
ip.connect(("8.8.8.8", 80))
self.ip_add = ip.getsockname()[0]
ip.close()
SrcIP = self.ip_add # 送信元IP
SrcPort = 16520 # 送信元ポート番号
self.SrcAddr = (SrcIP,SrcPort) # アドレスをtupleに格納
DstIP = "255.255.255.255" # 宛先IP
DstPort = 16520 # 宛先ポート番号
self.DstAddr = (DstIP,DstPort) # アドレスをtupleに格納
self.udpClntSock = socket(AF_INET, SOCK_DGRAM) # ソケット作成
self.udpClntSock.bind(self.SrcAddr) # 送信元アドレスでバインド
self.udpClntSock.setsockopt(SOL_SOCKET, SO_BROADCAST, 1) #
def send(self,type="NoName",room=0,region=0,order=0,priority=30,val=None):
ccm = "<?xml version=\"1.0\"?><UECS ver=\"1.00-E10\">"
ccm += "<DATA type=\"%s\"" % type
ccm += " room=\"%s\"" % room
ccm += " region=\"%s\"" % region
ccm += " order=\"%s\"" % order
ccm += " priority=\"%s\">" % priority
ccm += "%s" % val
ccm += "</DATA><IP>%s</IP>" % self.ip_add
ccm += "</UECS>"
print(ccm)
ccm = ccm.encode('utf-8') # バイナリに変換
self.udpClntSock.sendto(ccm, self.DstAddr) # 宛先アドレスに送信
#受信
udp = udprecv() # クラス呼び出し
udp.recv(debug=True) # 関数実行
#送信
#udp = udpsend() # クラス呼び出し
#udp.send(type="test",room="9",region="8",order="7",priority="6",val="5") # 関数実行
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment