Skip to content

Instantly share code, notes, and snippets.

@alex-eri

alex-eri/qtchat.py

Created May 29, 2020
Embed
What would you like to do?
from asyncqt import QEventLoop, asyncSlot, asyncClose
import sys
import asyncio
from PySide2.QtWidgets import (
QApplication, QWidget, QLabel, QLineEdit, QTextEdit, QPushButton,
QVBoxLayout)
from PySide2.QtCore import Signal
class ClientProtocol(asyncio.Protocol):
    """asyncio protocol that forwards received text to the chat window.

    Incoming bytes are decoded as UTF-8 and passed to the window by emitting
    its ``speak`` signal.
    """

    def __init__(self, chat):
        """Remember the window to notify and prepare the stream decoder.

        Args:
            chat: Object exposing a ``speak`` attribute whose ``emit(str)``
                is called with each decoded piece of text.
        """
        # Local import so the class does not rely on a file-level import.
        import codecs

        self.window = chat
        # Set by connection_made(); defined here so the attribute always exists.
        self.transport = None
        # A byte stream may split a multi-byte character across two
        # data_received() calls.  An incremental decoder keeps the partial
        # tail until the rest arrives instead of raising UnicodeDecodeError;
        # invalid bytes become U+FFFD rather than raising out of the callback.
        self._decoder = codecs.getincrementaldecoder("utf-8")(errors="replace")

    def data_received(self, data: bytes):
        """Decode a chunk of received bytes and emit the resulting text.

        Args:
            data: Raw bytes delivered by the transport.
        """
        decoded = self._decoder.decode(data)
        # The chunk may contain only the start of a character; emit nothing
        # until there is complete text to show.
        if decoded:
            self.window.speak.emit(decoded)

    def connection_made(self, transport):
        """Store the transport handed over when the connection is established."""
        self.transport = transport

    def connection_lost(self, exception):
        """Called when the connection is closed; intentionally does nothing."""
class MainWindow(QWidget):
    """Minimal chat window: a text log above a single-line input field.

    Text typed into the input is written to the network transport when
    Return is pressed; text emitted through ``speak`` is appended to the log.
    """

    # Emitted with text to add to the log; connected to the log's append().
    speak = Signal(str)
    # Filled in by set_transport() once the connection attempt has succeeded.
    __transport = None
    __protocol = None

    def __init__(self):
        """Build the widgets and wire up the signals."""
        super().__init__()
        self.setLayout(QVBoxLayout())

        self.editResponse = QTextEdit('', self)
        self.layout().addWidget(self.editResponse)

        self.editSend = QLineEdit('', self)
        self.layout().addWidget(self.editSend)

        self.editSend.returnPressed.connect(self.on_editSend)
        self.speak.connect(self.editResponse.append)

    @asyncSlot()
    async def on_editSend(self):
        """Send the contents of the input field and echo them to the log.

        Does nothing while there is no usable transport, so the typed text
        stays in the input field instead of being discarded unsent.
        """
        transport = self.__transport
        if transport is None or transport.is_closing():
            return

        text = self.editSend.text()
        transport.write(text.encode())
        self.editSend.setText('')
        self.editResponse.append('>' + text)

    def set_transport(self, task):
        """Done-callback for the connection task: store its result.

        Args:
            task: Finished task/future whose result is a
                ``(transport, protocol)`` pair.

        A cancelled task is ignored.  A failed task is reported in the log
        instead of letting ``task.result()`` raise inside the callback.
        """
        if task.cancelled():
            return

        error = task.exception()
        if error is not None:
            self.editResponse.append('Connection failed: {}'.format(error))
            return

        self.__transport, self.__protocol = task.result()
if __name__ == '__main__':
    # Qt application plus the asyncio event loop that runs on top of it.
    app = QApplication(sys.argv)
    loop = QEventLoop(app)
    asyncio.set_event_loop(loop)

    mainWindow = MainWindow()

    def protocol_factory():
        """Create the protocol instance bound to the main window."""
        return ClientProtocol(mainWindow)

    # Start connecting to the chat server; the window receives the
    # resulting (transport, protocol) pair through its done-callback.
    task = loop.create_task(
        loop.create_connection(protocol_factory, "127.0.0.1", 8888)
    )
    task.add_done_callback(mainWindow.set_transport)

    mainWindow.show()

    with loop:
        exit_code = loop.run_forever()
        sys.exit(exit_code)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment