Skip to content

Instantly share code, notes, and snippets.

@aeroaks
Last active November 4, 2022 09:48
Show Gist options
  • Save aeroaks/733cd638d7ca7bdcd279b80fd3f22431 to your computer and use it in GitHub Desktop.
Save aeroaks/733cd638d7ca7bdcd279b80fd3f22431 to your computer and use it in GitHub Desktop.
Websocket server with Qt4 GUI using python-websocket-server; it receives messages from clients, which are printed by the GUI using slots
# -*- coding: utf-8 -*-
# Form implementation generated from reading ui file 'websocket.ui'
#
# Created by: PyQt4 UI code generator 4.11.4
#
# WARNING! All changes made in this file will be lost!
from PyQt4 import QtCore, QtGui
def _fromUtf8(s):
    """Return *s* unchanged; used when QtCore.QString is not available."""
    return s


# Prefer the binding's own converter when QtCore exposes QString; if the
# attribute lookup fails, the identity fallback defined above stays in place.
try:
    _fromUtf8 = QtCore.QString.fromUtf8
except AttributeError:
    pass
# Pick the _translate variant once, depending on whether QApplication
# exposes the UnicodeUTF8 encoding constant.
try:
    _encoding = QtGui.QApplication.UnicodeUTF8
except AttributeError:
    def _translate(context, text, disambig):
        """Translate *text* in *context* without an explicit encoding."""
        return QtGui.QApplication.translate(context, text, disambig)
else:
    def _translate(context, text, disambig):
        """Translate *text* in *context*, passing the UTF-8 encoding flag."""
        return QtGui.QApplication.translate(context, text, disambig, _encoding)
class Ui_Dialog(object):
    """Builds the widget tree of the server dialog.

    Layout: a grid holding one vertical column with the start button, the
    received-message list, and a horizontal row with the Pass/Fail buttons.
    """

    def setupUi(self, Dialog):
        """Create all widgets and layouts as children of *Dialog*.

        The created objects are stored on ``self`` as ``gridLayout``,
        ``verticalLayout``, ``horizontalLayout``, ``startBtn``, ``rxList``,
        ``passBtn`` and ``failBtn``.
        """
        def named(obj, name):
            # Give the Qt object its objectName and hand it back.
            obj.setObjectName(_fromUtf8(name))
            return obj

        named(Dialog, "Dialog")
        Dialog.resize(430, 247)
        Dialog.setMinimumSize(QtCore.QSize(280, 200))

        self.gridLayout = named(QtGui.QGridLayout(Dialog), "gridLayout")
        self.verticalLayout = named(QtGui.QVBoxLayout(), "verticalLayout")
        column = self.verticalLayout

        self.startBtn = named(QtGui.QPushButton(Dialog), "startBtn")
        column.addWidget(self.startBtn)

        self.rxList = named(QtGui.QListWidget(Dialog), "rxList")
        column.addWidget(self.rxList)

        self.horizontalLayout = named(QtGui.QHBoxLayout(), "horizontalLayout")
        button_row = self.horizontalLayout

        self.passBtn = named(QtGui.QPushButton(Dialog), "passBtn")
        button_row.addWidget(self.passBtn)

        self.failBtn = named(QtGui.QPushButton(Dialog), "failBtn")
        button_row.addWidget(self.failBtn)

        column.addLayout(button_row)
        self.gridLayout.addLayout(column, 0, 0, 1, 1)

        self.retranslateUi(Dialog)
        QtCore.QMetaObject.connectSlotsByName(Dialog)

    def retranslateUi(self, Dialog):
        """Set the window title and the button captions."""
        Dialog.setWindowTitle(_translate("Dialog", "Websocket Server (Fake)", None))
        captions = (
            (self.startBtn, "Start Server"),
            (self.passBtn, "Pass"),
            (self.failBtn, "Fail"),
        )
        for button, caption in captions:
            button.setText(_translate("Dialog", caption, None))
from websocket_server import WebsocketServer
import datetime
import random
import json
import sys
from PyQt4 import QtGui, QtCore
from wsGui import Ui_Dialog
class WebSocket(QtCore.QThread):
    """Thread that runs the websocket server and reports messages to the GUI.

    ``run`` registers the three callbacks below on ``server`` and then
    blocks in ``server.run_forever()``. Each received message is announced
    through ``GotMsg``; the reply to the client is held back until
    ``awaitingResult`` is cleared, at which point ``result`` is sent.
    """

    # Emitted as ``[client, message]`` for every message a client sends.
    GotMsg = QtCore.pyqtSignal(list)
    # Created once, when the class body executes, and shared by all instances.
    server = WebsocketServer(9000, '127.0.0.2')
    # True while client_msg must keep waiting for a verdict.
    awaitingResult = True
    # Payload sent to the client once awaitingResult has been cleared.
    result = 'fail'
    # Seconds to sleep between two checks of awaitingResult.
    _POLL_INTERVAL = 0.01

    def new_client(self, client, server):
        """Tell every connected client that a new client has joined."""
        self.server.send_message_to_all(json.dumps({u'result': u'new'}))

    def client_msg(self, client, server, message):
        """Forward *message* to the GUI, then reply once a verdict is set.

        Emits ``GotMsg`` with ``[client, message]``, waits until
        ``awaitingResult`` is False, sends ``result`` to *client* and re-arms
        ``awaitingResult`` for the next message.
        """
        import time

        self.GotMsg.emit([client, message])
        # Sleep between polls instead of spinning: a bare ``pass`` loop keeps
        # a CPU core busy for as long as nobody has supplied a verdict.
        while self.awaitingResult:
            time.sleep(self._POLL_INTERVAL)
        self.server.send_message(client, self.result)
        self.awaitingResult = True

    # Called for every client disconnecting
    def client_left(self, client, server):
        """Close the server socket and log which client disconnected."""
        # NOTE(review): this closes the listening socket on the first
        # disconnect, which looks like one-shot behaviour - confirm intended.
        self.server.server_close()
        print("Client(%d) disconnected" % client['id'])

    def run(self):
        """Register the callbacks and serve until the server stops."""
        self.server.set_fn_new_client(self.new_client)
        self.server.set_fn_message_received(self.client_msg)
        self.server.set_fn_client_left(self.client_left)
        self.server.run_forever()
class ServerGui(QtGui.QDialog, Ui_Dialog):
    """Dialog that starts the websocket thread, logs messages and sends verdicts.

    Received messages are appended to ``rxList``. Clicking Pass or Fail
    stores the JSON verdict on the websocket thread and clears its
    ``awaitingResult`` flag so the pending reply can go out.
    """

    # Emitted with the JSON verdict string whenever Pass or Fail is clicked.
    sendResult = QtCore.pyqtSignal(str)

    def __init__(self, ws = None):
        """Build the UI, create the websocket thread and wire the signals.

        ``ws`` is accepted for call compatibility; it is not used.
        """
        # exec_() method deletes the object instance on close
        # select as 'modal' in designer to freeze background window
        super().__init__()
        self.setupUi(self)
        self.WebSocketThread = WebSocket()
        self.WebSocketThread.moveToThread(self.WebSocketThread)
        # Stores lines of log messages. It is deliberately not installed on
        # rxList: rxList is a QListWidget, which manages its own items and
        # does not allow its model to be replaced.
        self.LogModel = QtGui.QStandardItemModel()
        self.startBtn.clicked.connect(self.startServer)
        self.passBtn.clicked.connect(self.txResultPass)
        self.failBtn.clicked.connect(self.txResultFail)
        self.WebSocketThread.GotMsg.connect(self.msgPass)

    def startServer(self):
        """Start the websocket thread."""
        self.WebSocketThread.start()
        print('started server')

    def _deliverResult(self, outcome):
        """Publish the verdict *outcome* and hand it to the websocket thread."""
        payload = json.dumps({u'result': outcome})
        self.sendResult.emit(payload)
        worker = self.WebSocketThread
        # Store the payload before clearing the flag so the waiting handler
        # never sends a stale result. A verdict clicked while no message is
        # pending is consumed by the next message that arrives.
        worker.result = payload
        worker.awaitingResult = False
        print('GUI', payload)

    def txResultPass(self):
        """Send the 'pass' verdict."""
        self._deliverResult(u'pass')

    def txResultFail(self):
        """Send the 'fail' verdict."""
        self._deliverResult(u'fail')

    def closeEvent(self, *args, **kwargs):
        """Stop the websocket thread before the dialog closes."""
        worker = self.WebSocketThread
        # Release a handler that may still be waiting for a verdict.
        worker.awaitingResult = False
        # shutdown() blocks until the serve loop exits, so only call it when
        # the thread (and therefore the loop) has actually been started.
        if worker.isRunning():
            worker.server.shutdown()
            worker.wait(2000)
        super().closeEvent(*args, **kwargs)

    def msgPass(self, msgList):
        """Append a received ``[client, message]`` pair to the message list."""
        print(msgList)
        # The item constructors take text, not a Python list, so format the
        # pair first; fall back to str() if the payload has another shape.
        try:
            client, message = msgList
            text = "Client {0}: {1}".format(client['id'], message)
        except (TypeError, ValueError, KeyError):
            text = str(msgList)
        self.LogModel.appendRow([QtGui.QStandardItem(text)])
        self.rxList.addItem(text)
if __name__ == '__main__':
    # Script entry point: show the server dialog and run the Qt event loop
    # until it finishes, then exit with the loop's return code.
    app = QtGui.QApplication(sys.argv)
    main = ServerGui()
    main.show()
    exit_code = app.exec_()
    sys.exit(exit_code)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment