Skip to content

Instantly share code, notes, and snippets.

@tuxite
Created March 25, 2015 16:58
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save tuxite/c76811fd8b550e222740 to your computer and use it in GitHub Desktop.
Save tuxite/c76811fd8b550e222740 to your computer and use it in GitHub Desktop.
A simple Wind (speed, direction) NMEA serial simulator with Flask and SocketIO controls
<!DOCTYPE HTML>
<html>
<head>
<title>NMEA Serial Simulator</title>
<!-- Latest compiled and minified CSS -->
<link rel="stylesheet" href="https://maxcdn.bootstrapcdn.com/bootstrap/3.3.2/css/bootstrap.min.css">
<script type="text/javascript" src="https://code.jquery.com/jquery-1.11.2.min.js"></script>
<script type="text/javascript" src="https://cdnjs.cloudflare.com/ajax/libs/socket.io/0.9.16/socket.io.min.js"></script>
<!-- Latest compiled and minified JavaScript -->
<script src="https://maxcdn.bootstrapcdn.com/bootstrap/3.3.2/js/bootstrap.min.js"></script>
<script type="text/javascript" charset="utf-8">
$(document).ready(function () {
var socket = io.connect('http://' + document.domain + ':' + location.port + "/nmea");
socket.on('connect', function () {
socket.emit('client-connect', JSON.stringify("Hello"));
console.log("Connected");
});
socket.on('nmea', function (message) {
console.log("Received nmea", JSON.parse(message));
data = JSON.parse(message);
$("#nmea-sentence").html(data.date + " " + data.string);
});
});
</script>
</head>
<body>
<div id="content" class="container">
<div class="page-header">
<h1>NMEA Serial Simulator <small>using Flask and pySerial</small></h1>
</div>
<h2>Wind Data</h2>
<div id="nmea">
<form class="form-inline" method="POST" action='/nmea'>
<div class="form-group">
<label for="direction">Direction</label>
<input type="number" value="" id="direction" name="direction" class="form-control" step="0.1" min="0" max="359.9">
</div>
<div class="form-group">
<label for="speed">Speed</label>
<input type="number" value="" id="speed" name="speed" class="form-control" step="0.1" min="0">
</div>
<button id="client-connect-btn" type="submit" class="btn btn-default">
<span class="glyphicon glyphicon-upload" aria-hidden="true"></span> Update
</button>
</form>
</div>
<div id="data">
<h3>Sent via the serial interface</h3>
<pre id="nmea-sentence"></pre>
</div>
<h2>Serial Data</h2>
<div id="serial">
<form class="form-inline" method="POST" action='/serial'>
<div class="form-group">
<label for="baudrate">Baud Rate</label>
<select id="baudrate" name="baud" class="form-control">
<option>300</option>
<option>600</option>
<option>1200</option>
<option>2400</option>
<option>4800</option>
<option selected>9600</option>
<option>14400</option>
<option>19200</option>
<option>28800</option>
<option>31250</option>
<option>38400</option>
<option>57600</option>
<option>115200</option>
</select>
</div>
<div class="form-group">
<label for="bits">Bits</label>
<select id="bits" name="bits" class="form-control">
<option>5</option>
<option>6</option>
<option>7</option>
<option selected>8</option>
</select>
</div>
<div class="form-group">
<label for="parity">Parity</label>
<select id="parity" name="parity" class="form-control">
<option value="N" selected>NONE</option>
<option value="E">EVEN</option>
<option value="O">ODD</option>
<option value="M">MARK</option>
<option value="S">SPACE</option>
</select>
</div>
<div class="form-group">
<label for="stop_bit">Stop</label>
<select id="stop_bit" name="stop" class="form-control">
<option selected>1</option>
<option>1.5</option>
<option>2</option>
</select>
</div>
<button id="client-connect-btn" type="submit" class="btn btn-default">
<span class="glyphicon glyphicon-log-in" aria-hidden="true"></span> Connect
</button>
</form>
</div>
</div>
</body>
</html>
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""A simple serial NMEA simulator using Flask and SocketIO."""
import serial
import json
import time
import datetime
import sys
import operator
import threading
import signal
from flask import Flask, render_template, request, make_response
from flask.ext.socketio import SocketIO
app = Flask(__name__)
app.config["SECRET_KEY"] = "your-secret"
app.config["SERVER_NAME"] = "127.0.0.1:5002"
socketio = SocketIO(app)
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Set the port from the command line
PORT = "/dev/ttyS0"
STOPBITS = {
"1": 1,
"1.5": 1.5,
"2": 2
}
PARITY = ['N', 'E', 'O', 'M', 'S']
SPEED = 0
DIRECTION = 0
thread = None
class SerialThread(threading.Thread):
"""Thread class for emitting nmea sentences through the serial interface.
"""
def __init__(self, baud=9600, bits="8", parity="N", stop_bit="1"):
"""Initialization of the serial parameters.
Keyword arguments:
baud -- the speed of the connection. Acceptable values are :
300, 600, 1200, 2400, 4800, 9600, 14400, 19200, 28800, 31250,
38400, 57600, 115200
bits -- the number of bits (integer: 5, 6, 7, 8)
parity -- the parity of the connection. One character: N, E, O, M, S
stop_bit -- the stop bit: 1, 1.5 or 2
"""
threading.Thread.__init__(self)
self.ser = None
self.baud = baud
self.bits = bits
self.parity = parity
self.stop_bit = stop_bit
self._stop = threading.Event()
def stop(self):
"""Set the stop status of the thread."""
self._stop.set()
def stopped(self):
"""Return the status of the thread.
Checked regularly in the loop to stop if needed.
"""
return self._stop.isSet()
def run(self):
"""Loop of the thread.
Connect to the serial port using the parameters. Then, create an
infinite loop with regular check of the status of the thread.
Create the NMEA sentence using the global variables SPEED and
DIRECTION. Send it to the serial interface and emit a socket.io
message with the datetime of the sentence for the clients.
"""
global socketio
self.ser = serial.Serial(port=PORT,
baudrate=self.baud,
parity=self.parity,
stopbits=self.stop_bit,
timeout=10)
while True:
if self.stopped():
break
nmea = "$WIWVM,{0},R,{1},N,A".format(DIRECTION, SPEED)
crc = reduce(operator.xor, [ord(x) for x in nmea[1:]], 0)
nmea += "*{0:02X}\r\n".format(crc)
self.ser.write(nmea)
date = datetime.datetime.utcnow().replace(microsecond=0)
socketio.emit("nmea",
json.dumps({"date": date.isoformat(),
"string": nmea}),
namespace="/nmea")
time.sleep(1)
self.ser.close()
return True
@app.route('/')
def index():
"""Return the main page."""
return render_template('index.html')
@app.route('/nmea', methods=['POST',])
def update():
"""Update the speed and the direction of the wind.
Return nothing (204) if succeed.
"""
global SPEED, DIRECTION
try:
SPEED = float(request.form['speed'])
DIRECTION = float(request.form['direction'])
except ValueError:
logger.warning("NMEA Bad values")
return make_response(("Bad Values", 500))
logger.info("Wind updated: {0}kn {1}deg".format(SPEED, DIRECTION))
return make_response(("", 204))
@app.route('/serial', methods=['POST',])
def connect():
"""Update the parameters of the serial connection.
Close the existing one if any (terminate the associated thread). Then,
create a new thread with these new parameters.
Only one serial thread at a time, normally...
Return nothing (204) if succeed.
"""
global thread
try:
baud = int(request.form['baud'])
bits = int(request.form['bits'])
except ValueError:
logger.warning("Serial connection vad values")
return make_response(("Bad Values", 500))
parity = request.form['parity']
if parity not in PARITY:
logger.warning("Serial connection vad values")
return make_response(("Bad Values", 500))
stop = request.form['stop']
if stop not in STOPBITS:
logger.warning("Serial connection vad values")
return make_response(("Bad Values", 500))
stop = STOPBITS[stop]
logger.info("Serial parameters: {0},{1},{2},{3}".format(baud,
bits,
parity,
stop))
if thread:
thread.stop()
thread = SerialThread(baud, bits, parity, stop)
logger.info("Started the thread")
thread.start()
return make_response(("", 204))
def kill_thread(*args):
"""Properly exit the script."""
global thread
if thread:
logger.info("Stopping the thread")
thread.stop()
exit()
@socketio.on('client-connect', namespace='/nmea')
def namespace_test(message):
"""Log the connection of a client."""
logger.info("SocketIO OK: {0}".format(message))
if __name__ == "__main__":
# Set the port from the command line
if len(sys.argv) > 1:
PORT = sys.argv[1]
signal.signal(signal.SIGINT, kill_thread)
signal.signal(signal.SIGTERM, kill_thread)
socketio.run(app)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment