Last active
November 14, 2021 13:32
-
-
Save y-ookuma/628639cf7a3d6c2b1098b0081b60e641 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from socket import * | |
import time as t | |
import datetime as dt | |
import pandas as pd | |
import xmltodict,json,os | |
from multiprocessing import Process,Manager | |
## UDP受信クラス | |
class udprecv(): | |
def __init__(self): | |
SrcIP = "" # 受信元IP | |
SrcPort = 16520 # 受信元ポート番号 | |
self.SrcAddr = (SrcIP, SrcPort) # アドレスをtupleに格納 | |
self.BUFSIZE = 512 # バッファサイズ指定 | |
self.udpServSock = socket(AF_INET, SOCK_DGRAM) # ソケット作成 | |
self.udpServSock.bind(self.SrcAddr) # 受信元アドレスでバインド | |
manager = Manager() # 共有メモリ | |
self.queue=manager.list() # 空のリストを定義 | |
def recv(self,debug=False,debug_sec=10): | |
if debug: | |
start=t.time() | |
debug_list=[] | |
while True: # 常に受信待ち | |
ccm, addr = self.udpServSock.recvfrom(self.BUFSIZE) # 受信 | |
# print(ccm.decode(), addr) # 受信データと送信アドレス表示 | |
# p.self.save_df(ccm) | |
p = Process(target=self.save_df, args=(debug,ccm,self.queue)) # マルチプロセス化でDB処理などを実行する | |
p.start() | |
if p.join(2) is None: | |
p.terminate() | |
if debug: | |
end=t.time() | |
print(ccm.decode(), addr) # 受信データと送信アドレス表示 | |
debug_list.append(ccm) | |
print("Main process ID:",os.getppid()) | |
if end-start>=debug_sec: | |
print("debug_time:",end-start,"ExecCount:",len(debug_list),"//",len(debug_list)/(end-start),"count/sec") | |
break; | |
# あなたのDB保存処理を記述すること | |
def save_df(self,debug,ccm,queue): | |
#"ここにDB保存処理を書くと良い。以下は適当にサンプルを書いてみた。" | |
dictionary = xmltodict.parse(ccm) # xmlを辞書型へ変換 | |
json_string = json.dumps(dictionary) # json形式のstring | |
json_string = json_string.replace('@', '').replace('#', '') # 「#や@」 をreplace | |
json_object = json.loads(json_string) # Stringを再度json形式で読み込む | |
# ↓↓↓ influxDB用整形 | |
measurement = json_object["UECS"]["DATA"]["type"].split(".")[0] | |
measurement += "_" + json_object["UECS"]["DATA"]["room"] | |
measurement += "_" + json_object["UECS"]["DATA"]["region"] | |
measurement += "_" + json_object["UECS"]["DATA"]["order"] | |
datetime = pd.to_datetime(dt.datetime.now()) | |
val = float(json_object["UECS"]["DATA"]["text"])*1.0 | |
priority = json_object["UECS"]["DATA"]["priority"] | |
json_body = [{"measurement": measurement, | |
"tags": {"priority": priority,"cloud": ""}, | |
"time": datetime, | |
"precision": "s", | |
"fields": {"value": val} | |
}] | |
# ↑↑↑ influxDB用整形 | |
#リスト化することによりdataframe化して、DBのような操作が可能になる。 | |
list_body = [datetime,measurement,val,priority] | |
if len(self.queue)>=1000: | |
self.queue.pop(0) # 1000件以上なら一番左の要素を削除 | |
self.queue.append(list_body) | |
if debug: | |
print("process ID:",os.getppid(),"Queue:",len(self.queue),json_body) | |
## UDP送信クラス | |
class udpsend(): | |
def __init__(self): | |
# get ip address | |
ip = socket(AF_INET, SOCK_DGRAM) | |
ip.connect(("8.8.8.8", 80)) | |
self.ip_add = ip.getsockname()[0] | |
ip.close() | |
SrcIP = self.ip_add # 送信元IP | |
SrcPort = 16520 # 送信元ポート番号 | |
self.SrcAddr = (SrcIP,SrcPort) # アドレスをtupleに格納 | |
DstIP = "255.255.255.255" # 宛先IP | |
DstPort = 16520 # 宛先ポート番号 | |
self.DstAddr = (DstIP,DstPort) # アドレスをtupleに格納 | |
self.udpClntSock = socket(AF_INET, SOCK_DGRAM) # ソケット作成 | |
self.udpClntSock.bind(self.SrcAddr) # 送信元アドレスでバインド | |
self.udpClntSock.setsockopt(SOL_SOCKET, SO_BROADCAST, 1) # | |
def send(self,type="NoName",room=0,region=0,order=0,priority=30,val=None): | |
ccm = "<?xml version=\"1.0\"?><UECS ver=\"1.00-E10\">" | |
ccm += "<DATA type=\"%s\"" % type | |
ccm += " room=\"%s\"" % room | |
ccm += " region=\"%s\"" % region | |
ccm += " order=\"%s\"" % order | |
ccm += " priority=\"%s\">" % priority | |
ccm += "%s" % val | |
ccm += "</DATA><IP>%s</IP>" % self.ip_add | |
ccm += "</UECS>" | |
print(ccm) | |
ccm = ccm.encode('utf-8') # バイナリに変換 | |
self.udpClntSock.sendto(ccm, self.DstAddr) # 宛先アドレスに送信 | |
#受信 | |
udp = udprecv() # クラス呼び出し | |
udp.recv(debug=True) # 関数実行 | |
#送信 | |
#udp = udpsend() # クラス呼び出し | |
#udp.send(type="test",room="9",region="8",order="7",priority="6",val="5") # 関数実行 | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment