Created
March 14, 2019 10:54
-
-
Save BriFuture/4b3f747cfb91d480d0f1e14629f86450 to your computer and use it in GitHub Desktop.
模拟MCU通过网络与上位机交互的小程序,使用装饰器动态添加多种数据
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import logging | |
logger = logging.getLogger("compass") | |
from simulator import McuSimulator | |
simulator = McuSimulator() | |
# data protocol: nmea0183 | |
@simulator.route_cmd("PING") | |
def cmd_none(msg: bytes): | |
simulator.send( b'#ID?*hh' ) | |
data= { | |
'magx': 0.0, | |
'magy': 0.0, | |
'magz': 0.0, | |
'accx': 0.0, | |
'accy': 0.0, | |
'accz': 0.0, | |
'angx': 0.0, | |
'angy': 0.0, | |
'angz': 0.0, | |
} | |
@simulator.route_data("ICM") | |
def send_data(): | |
count = simulator.data.get("CYCLE_ICM", 0) | |
while count > 0: | |
data['magx'] += (random()-0.5) * 1 | |
data['magy'] += (random()-0.5) * 2 | |
data['magz'] += (random()-0.5) * 3 | |
data['accx'] += (random()-0.5) * 0.1 | |
data['accy'] += (random()-0.5) * 0.2 | |
data['accz'] += (random()-0.5) * 0.3 | |
data['angx'] += random() * 0.5 | |
data['angy'] += random() * 0.8 | |
data['angz'] += random() * 1.2 | |
resp = '$data,{:.3f},{:.3f},{:.3f},{:.3f},{:.3f},{:.3f},{:.3f},{:.3f},{:.3f},1*ff'.format( | |
data['magx'], data['magy'], data['magz'], | |
data['accx'], data['accy'], data['accz'], | |
data['angx'], data['angy'], data['angz'] | |
) | |
# resp = '$ICMDATA,-25396.1,1144.3,41025.3,-0.28,3.449,9.136,0.01,0.035,0.001,16.44*63' | |
simulator.send(resp.encode('ascii')) | |
count -= 1 | |
time.sleep(0.05) | |
from socketserver import BaseRequestHandler, ThreadingTCPServer as Server | |
from threading import Thread | |
class CompassHandler( BaseRequestHandler ): | |
def handle( self ): | |
logger.info( "{} ' Got connection from: ', {} ".format(datetime.now(), self.client_address )) | |
global simulator | |
simulator.con = self.request | |
def run_simulator(): | |
try: | |
simulator.serve_forever() | |
except KeyboardInterrupt: | |
logger.info("Simulator Interrupting.") | |
t = Thread(target=run_simulator) | |
t.daemon = True | |
t.start() | |
while simulator.running: | |
msg = self.request.recv( 1024 ) | |
simulator.recv.put(msg) | |
PORT = 8811 | |
addr = ('0.0.0.0', PORT) | |
def main(): | |
global addr | |
try: | |
logger.info('Listening on{0}'.format(addr)) | |
server = Server( addr, CompassHandler ) | |
server.serve_forever() | |
except KeyboardInterrupt: | |
simulator.running = False | |
finally: | |
logger.info('Interupting...') | |
import sys | |
sys.exit(0) | |
if __name__ == '__main__': | |
main() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
"""Mcu Simulator | |
Author: BriFuture | |
""" | |
from socket import socket | |
from queue import Queue | |
import time | |
import logging | |
logger = logging.getLogger("compass") | |
class McuSimulator(object): | |
"""Before using ``send`` method to send message to client, | |
make sure ``con`` is set. | |
""" | |
def __init__(self, con:socket = None): | |
"""``con``: socket.socket | |
""" | |
self.con = con | |
self.recv = Queue() | |
self.data = {} | |
self._routes = {} | |
self._datas = {} | |
self.running = True | |
def route_cmd(self, header: str, cmd_flag="#", end_flag="*"): | |
"""add a known cmd to dict, | |
if header matchs when message recieved, | |
the corresponding callback will be invoked. | |
For example: | |
``#PITCH?*ff`` matches the header ``PITCH`` with flag ``#`` | |
""" | |
def decorator(f): | |
h = cmd_flag + header | |
if h in self._routes: | |
raise ValueError | |
self._routes[h] = { | |
'method': f, | |
'start_flag': cmd_flag, | |
'end_flag': end_flag | |
} | |
return f | |
return decorator | |
def route_data(self, header: str, data_flag="$", end_flag="*"): | |
"""add a known data response callback to dict, | |
the corresponding callback will be invoked everytime it could be. | |
Note: user time.sleep to slow the message sending. | |
""" | |
def decorator(f): | |
d = data_flag + header | |
self._datas[d] = { | |
"method": f, | |
"start_flag": data_flag, | |
"end_flag": end_flag | |
} | |
pass | |
return decorator | |
def serve_forever(self): | |
while self.running: | |
if not self.recv.empty(): | |
msg = self.recv.get() | |
if len(msg) == 0: | |
self.running = False | |
self._process_cmd(msg) | |
self._response() | |
time.sleep(0.05) | |
def _process_cmd(self, msg: bytes): | |
logger.info("processing msg: {}".format(msg)) | |
processor = None | |
for header, route in self._routes.items(): | |
if msg.startswith(header.encode()): | |
processor = route["method"] | |
break | |
if processor is not None: | |
processor(msg) | |
def _response(self): | |
for _, item in self._datas.items(): | |
item["method"]() | |
time.sleep(0.05) | |
def is_set_cmd(self, cmd): | |
get = cmd.find(b"=") | |
if get > -1: | |
return True | |
return False | |
def send(self, msg): | |
# logger.debug("Send Msg: {}".format(msg)) | |
self.con.send(msg + b'\r\n') |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment