Skip to content

Instantly share code, notes, and snippets.

@BriFuture
Created March 14, 2019 10:54
Show Gist options
  • Save BriFuture/4b3f747cfb91d480d0f1e14629f86450 to your computer and use it in GitHub Desktop.
Save BriFuture/4b3f747cfb91d480d0f1e14629f86450 to your computer and use it in GitHub Desktop.
模拟MCU通过网络与上位机交互的小程序,使用装饰器动态添加多种数据
import logging
# Module-level logger named "compass"; the handlers below log through it.
logger = logging.getLogger("compass")
from simulator import McuSimulator
# Single shared simulator instance: the decorators below register callbacks
# on it, and the TCP handler attaches the client connection to it.
simulator = McuSimulator()
# data protocol: nmea0183
@simulator.route_cmd("PING")
def cmd_none(msg: bytes):
    """Reply to a ``PING`` command with the fixed ``#ID?*hh`` frame.

    ``msg`` is the raw message that triggered this callback; its content
    is not inspected.
    """
    reply = b'#ID?*hh'
    simulator.send(reply)
# Current simulated readings, keyed '<sensor><axis>' and all starting at 0.0.
# Presumably mag/acc/ang stand for magnetometer, accelerometer and angle --
# TODO confirm against the consumer of these frames.
data = {
    sensor + axis: 0.0
    for sensor in ('mag', 'acc', 'ang')
    for axis in ('x', 'y', 'z')
}
@simulator.route_data("ICM")
def send_data():
    """Send ``CYCLE_ICM`` simulated data frames to the client.

    The number of frames is read from ``simulator.data["CYCLE_ICM"]``
    (default 0, i.e. nothing is sent).  The counter is only decremented
    locally, so the stored value is left untouched and every invocation
    sends the full count again.

    Each frame perturbs the module-level ``data`` readings in place and
    sends them as ``$data,<9 values with 3 decimals>,1*ff``; a 50 ms pause
    follows every frame.
    """
    # Imported here because this script never imports ``time`` or ``random``
    # at module level; without these the first call raised NameError.
    import time
    from random import random

    # Symmetric random walk: each step is in [-scale/2, +scale/2).
    centred_steps = (
        ('magx', 1), ('magy', 2), ('magz', 3),
        ('accx', 0.1), ('accy', 0.2), ('accz', 0.3),
    )
    # Monotonic drift: each step is in [0, scale).
    forward_steps = (('angx', 0.5), ('angy', 0.8), ('angz', 1.2))

    remaining = simulator.data.get("CYCLE_ICM", 0)
    while remaining > 0:
        for key, scale in centred_steps:
            data[key] += (random() - 0.5) * scale
        for key, scale in forward_steps:
            data[key] += random() * scale

        resp = '$data,{:.3f},{:.3f},{:.3f},{:.3f},{:.3f},{:.3f},{:.3f},{:.3f},{:.3f},1*ff'.format(
            data['magx'], data['magy'], data['magz'],
            data['accx'], data['accy'], data['accz'],
            data['angx'], data['angy'], data['angz']
        )
        # Sample frame kept from the original source for reference:
        # '$ICMDATA,-25396.1,1144.3,41025.3,-0.28,3.449,9.136,0.01,0.035,0.001,16.44*63'
        simulator.send(resp.encode('ascii'))
        remaining -= 1
        time.sleep(0.05)
from socketserver import BaseRequestHandler, ThreadingTCPServer as Server
from threading import Thread
class CompassHandler(BaseRequestHandler):
    """Connect one TCP client to the shared module-level ``simulator``.

    For each connection the handler points ``simulator.con`` at the client
    socket, starts ``simulator.serve_forever`` in a daemon thread, and
    forwards everything received from the client into ``simulator.recv``.
    """

    def handle(self):
        """Serve a single client connection until it closes or the simulator stops."""
        # Imported here because ``datetime`` is used below but is never
        # imported at module level in this script (it raised NameError).
        from datetime import datetime

        logger.info("{} ' Got connection from: ', {} ".format(datetime.now(), self.client_address))
        # Replies produced by the simulator are written to this connection.
        simulator.con = self.request

        def run_simulator():
            try:
                simulator.serve_forever()
            except KeyboardInterrupt:
                logger.info("Simulator Interrupting.")

        worker = Thread(target=run_simulator)
        # Daemon thread: it must not keep the process alive on shutdown.
        worker.daemon = True
        worker.start()

        while simulator.running:
            msg = self.request.recv(1024)
            # The empty message is still queued: the simulator treats it as
            # the signal to stop.
            simulator.recv.put(msg)
            if not msg:
                # An empty read means the peer closed the connection.  Every
                # further recv() would return b'' immediately, so stop here
                # instead of flooding the queue in a tight loop.
                break
# TCP port the simulator server listens on.
PORT = 8811
# Bind address handed to the server: all interfaces, on PORT.
addr = '0.0.0.0', PORT
def main():
    """Run the TCP server on ``addr`` until interrupted, then exit with status 0.

    Ctrl-C (``KeyboardInterrupt``) stops the simulator loop by clearing
    ``simulator.running``.  Any other exception (for example a failure to
    bind the address) propagates to the caller; previously ``sys.exit(0)``
    inside ``finally`` replaced it and reported success.
    """
    import sys

    try:
        logger.info('Listening on{0}'.format(addr))
        # The context manager closes the listening socket on every exit path.
        with Server(addr, CompassHandler) as server:
            server.serve_forever()
    except KeyboardInterrupt:
        simulator.running = False
    finally:
        logger.info('Interupting...')
    # Reached only after a clean stop or Ctrl-C; errors have already propagated.
    sys.exit(0)


if __name__ == '__main__':
    main()
# -*- coding: utf-8 -*-
"""Mcu Simulator
Author: BriFuture
"""
from socket import socket
from queue import Queue
import time
import logging
# Module-level logger named "compass"; used when processing received messages.
logger = logging.getLogger("compass")
class McuSimulator(object):
    """Simulated MCU that answers commands and streams data over a socket.

    Received messages are taken from the ``recv`` queue and dispatched to
    callbacks registered with ``route_cmd``; callbacks registered with
    ``route_data`` are invoked on every pass of ``serve_forever``.

    Before using the ``send`` method to send a message to the client,
    make sure ``con`` is set.
    """

    def __init__(self, con: socket = None):
        """Create a simulator.

        ``con``: socket.socket used by ``send``; it may be assigned later.
        """
        self.con = con
        # Raw messages received from the client, filled by the network side.
        self.recv = Queue()
        # Free-form state shared with the registered callbacks.
        self.data = {}
        # "<flag><header>" -> command route description.
        self._routes = {}
        # "<flag><header>" -> data route description.
        self._datas = {}
        # Cleared to make ``serve_forever`` return.
        self.running = True

    def route_cmd(self, header: str, cmd_flag="#", end_flag="*"):
        """Return a decorator that registers a callback for a known command.

        When a received message starts with ``cmd_flag + header`` the
        callback is invoked with the raw message.  For example
        ``#PITCH?*ff`` matches the header ``PITCH`` with flag ``#``.
        ``end_flag`` is stored with the route but not used for matching.

        Raises ``ValueError`` if the same flag/header pair is registered twice.
        """
        def decorator(f):
            key = cmd_flag + header
            if key in self._routes:
                raise ValueError(
                    "command route {!r} is already registered".format(key)
                )
            self._routes[key] = {
                'method': f,
                'start_flag': cmd_flag,
                'end_flag': end_flag
            }
            return f
        return decorator

    def route_data(self, header: str, data_flag="$", end_flag="*"):
        """Return a decorator that registers a data-producing callback.

        Registered callbacks take no arguments and are invoked on every
        pass of ``serve_forever``.  Registering the same flag/header pair
        again replaces the previous callback.

        Note: use ``time.sleep`` inside the callback to slow the message
        sending.
        """
        def decorator(f):
            key = data_flag + header
            self._datas[key] = {
                "method": f,
                "start_flag": data_flag,
                "end_flag": end_flag
            }
            # Hand the function back so the decorated name stays callable;
            # without this the decorator rebinds it to None.
            return f
        return decorator

    def serve_forever(self):
        """Process queued messages and run data callbacks until stopped.

        The loop ends when ``running`` is cleared or when an empty message
        (the peer closed the connection) is taken from the queue.
        """
        while self.running:
            if not self.recv.empty():
                msg = self.recv.get()
                if len(msg) == 0:
                    # End of connection: stop right away rather than
                    # dispatching the empty message and streaming data to
                    # a peer that is gone.
                    self.running = False
                    break
                self._process_cmd(msg)
            self._response()
            time.sleep(0.05)

    def _process_cmd(self, msg: bytes):
        """Invoke the first registered command callback whose key prefixes ``msg``."""
        logger.info("processing msg: {}".format(msg))
        for key, route in self._routes.items():
            if msg.startswith(key.encode()):
                route["method"](msg)
                break

    def _response(self):
        """Invoke every registered data callback, pausing 50 ms after each."""
        for item in self._datas.values():
            item["method"]()
            time.sleep(0.05)

    def is_set_cmd(self, cmd):
        """Return True if ``cmd`` (bytes) contains ``=``, False otherwise."""
        return b"=" in cmd

    def send(self, msg):
        """Send ``msg`` (bytes) to the client, terminated with CRLF.

        ``con`` must have been set to a connected socket beforehand.
        """
        # logger.debug("Send Msg: {}".format(msg))
        # sendall: a plain send() may transmit only part of the buffer.
        self.con.sendall(msg + b'\r\n')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment