@RobbieClarken
Last active September 23, 2023 20:19
EPICS over WebSockets with Flask

Installation:

pip install Flask Flask-Sockets gunicorn pyepics

Deployment:

gunicorn -k flask_sockets.worker index:app
<!DOCTYPE html>
<html>
<head>
  <script>
    document.addEventListener('DOMContentLoaded', function() {
      var socket = new WebSocket('ws://127.0.0.1:8000/monitor')
      socket.onmessage = function(event) {
        var data = JSON.parse(event.data)
        document.getElementById('beam-current').innerHTML = data['value']
      }
    })
  </script>
</head>
<body>
  <p>Beam Current: <span id='beam-current'></span></p>
</body>
</html>
#-*- coding: utf-8 -*-

from flask import Flask, render_template
from flask_sockets import Sockets
from geventwebsocket.exceptions import WebSocketError
from epics import PV
import json

app = Flask(__name__)
sockets = Sockets(app)
ws_conns = []

@app.route('/')
def index():
    return render_template('index.html')

@sockets.route('/monitor')
def echo_socket(ws):
    ws_conns.append(ws)
    while True:
        message = ws.receive()
        if message is None:
            break
    ws_conns.remove(ws)
    ws.close()

def callback(pvname, value, **kws):
    message = json.dumps({'pvname': pvname, 'value': value})
    for ws in ws_conns:
        try:
            ws.send(message)
        except WebSocketError:
            pass

pv = PV('SR11BCM01:CURRENT_MONITOR', callback=callback)
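
The callback above ignores `WebSocketError`, so a socket whose send has failed stays in `ws_conns` and is tried again on every update. Below is a minimal sketch of one alternative that drops a connection after a failed send. `broadcast` and `FakeSocket` are hypothetical names used only for illustration, and the sketch catches a generic `Exception` where the gist catches `WebSocketError`, so that it runs without gevent installed.

```python
import json


def broadcast(connections, pvname, value):
    """Send one PV update to every connection; drop those whose send fails."""
    message = json.dumps({'pvname': pvname, 'value': value})
    # Iterate over a copy so that removing an entry is safe mid-loop.
    for ws in list(connections):
        try:
            ws.send(message)
        except Exception:  # the gist catches WebSocketError here
            connections.remove(ws)
    return message


class FakeSocket:
    """Hypothetical stand-in for a WebSocket, used only to exercise broadcast()."""

    def __init__(self, fail=False):
        self.fail = fail
        self.sent = []

    def send(self, message):
        if self.fail:
            raise RuntimeError('connection closed')
        self.sent.append(message)


good, bad = FakeSocket(), FakeSocket(fail=True)
conns = [good, bad]
broadcast(conns, 'SR11BCM01:CURRENT_MONITOR', 200.1)
```

If this were combined with `echo_socket` as written, the `ws_conns.remove(ws)` call there would need to tolerate a connection that has already been removed.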
@dleacock
Any idea whether this would work with multiple PVs? I've been trying it with shutter PVs and not having any luck. I can read them fine using pyepics, but I can't get them to work in the callback.
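
One possible approach to the multiple-PV case, offered as a sketch rather than a tested fix: create one `PV` per name, pass the same callback to each, and keep the resulting objects referenced, as the gist does with `pv`. Each message already carries `pvname`, so the page can tell updates apart. `monitor_pvs`, `pv_factory`, `FakePV` and the shutter PV names below are hypothetical. In a real deployment `pv_factory` would be `epics.PV`, which the stand-in only imitates.

```python
def monitor_pvs(pv_names, callback, pv_factory):
    """Create one monitored PV per name, all sharing a single callback.

    pv_factory is assumed to accept (name, callback=...) like epics.PV.
    """
    return {name: pv_factory(name, callback=callback) for name in pv_names}


class FakePV:
    """Hypothetical stand-in for epics.PV so the sketch runs without EPICS."""

    def __init__(self, pvname, callback=None):
        self.pvname = pvname
        self.callback = callback

    def simulate_update(self, value):
        # pyepics passes pvname and value to callbacks as keyword arguments.
        self.callback(pvname=self.pvname, value=value)


latest = {}


def callback(pvname=None, value=None, **kws):
    # Record the most recent value seen for each PV name.
    latest[pvname] = value


pvs = monitor_pvs(['EXAMPLE:SHUTTER01', 'EXAMPLE:SHUTTER02'], callback, FakePV)
pvs['EXAMPLE:SHUTTER01'].simulate_update(1)
pvs['EXAMPLE:SHUTTER02'].simulate_update(0)
```

On the page, `data['pvname']` could then select which element to update, instead of always writing to `beam-current`.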