Skip to content

Instantly share code, notes, and snippets.

@johnnyutahh
Created April 15, 2015 17:35
Show Gist options
  • Save johnnyutahh/e7754a70ccf7972feae5 to your computer and use it in GitHub Desktop.
Save johnnyutahh/e7754a70ccf7972feae5 to your computer and use it in GitHub Desktop.
# Copied and modified from http://stackoverflow.com/a/12712362/605356
from PyQt4.QtCore import *
from PyQt4.QtGui import *
from PyQt4.QtNetwork import *
import logging, sys
# printf-style template used to log a "name = 'value'" pair; the name is
# right-aligned in a 25-character-wide column.
LOGVARSTR = "%25s = '%s'"
class QtSingleApplication(QApplication):
    """QApplication that detects an already-running instance via a local socket.

    On construction the application tries to connect to a ``QLocalServer``
    named after ``id``.  If the connection succeeds, another instance is
    taken to be running and this one can forward text messages to it with
    ``sendMessage()``.  Otherwise this instance starts listening under that
    name itself and emits ``messageReceived`` for each line sent to it.
    """

    # Emitted once per line of text read from a connected instance.
    # ``type(u'')`` is ``unicode`` on Python 2 and ``str`` on Python 3, so the
    # class body does not raise NameError when imported under Python 3.
    messageReceived = pyqtSignal(type(u''))

    def __init__(self, id, *argv):
        """Create the application and probe for another running instance.

        Args:
            id: Name of the local server shared by all instances of the
                application.
            *argv: Positional arguments forwarded unchanged to
                ``QApplication.__init__``.
        """
        logfunc = logging.info
        logfunc(sys._getframe().f_code.co_name + '()')
        super(QtSingleApplication, self).__init__(*argv)

        self._id = id
        self._activationWindow = None
        self._activateOnMessage = False
        self._outStream = None
        self._inSocket = None
        self._inStream = None
        self._server = None

        # Is there another instance running?
        self._outSocket = QLocalSocket()
        self._outSocket.connectToServer(self._id)
        self._isRunning = self._outSocket.waitForConnected()

        if self._isRunning:
            # Yes, there is: keep the socket open as the outgoing channel.
            self._outStream = QTextStream(self._outSocket)
            self._outStream.setCodec('UTF-8')
        else:
            # No, there isn't, at least not properly.
            # Cleanup any past, crashed server.
            error = self._outSocket.error()
            logfunc(LOGVARSTR % ('self._outSocket.error()', error))
            if error == QLocalSocket.ConnectionRefusedError:
                logfunc('received QLocalSocket.ConnectionRefusedError; '
                        'removing server.')
                self.close()
                QLocalServer.removeServer(self._id)
            self._outSocket = None
            self._server = QLocalServer()
            # listen() reports failure through its return value; surface it
            # instead of silently running without a working server.
            if not self._server.listen(self._id):
                logging.warning("QLocalServer.listen('%s') failed: %s",
                                self._id, self._server.errorString())
            self._server.newConnection.connect(self._onNewConnection)
        logfunc(sys._getframe().f_code.co_name + '(): returning')

    def close(self):
        """Disconnect the inbound and outbound sockets and close the server.

        Each of the three is skipped when it has not been created.
        """
        logfunc = logging.info
        logfunc(sys._getframe().f_code.co_name + '()')
        if self._inSocket:
            self._inSocket.disconnectFromServer()
        if self._outSocket:
            self._outSocket.disconnectFromServer()
        if self._server:
            self._server.close()
        logfunc(sys._getframe().f_code.co_name + '(): returning')

    def isRunning(self):
        """Return whether the constructor managed to connect to another instance."""
        return self._isRunning

    def id(self):
        """Return the server name this application was constructed with."""
        return self._id

    def activationWindow(self):
        """Return the window registered with ``setActivationWindow()``, or None."""
        return self._activationWindow

    def setActivationWindow(self, activationWindow, activateOnMessage = True):
        """Register the window to bring forward in ``activateWindow()``.

        Args:
            activationWindow: Window to restore, raise and activate.
            activateOnMessage: When true, the window is activated each time
                another instance connects.
        """
        self._activationWindow = activationWindow
        self._activateOnMessage = activateOnMessage

    def activateWindow(self):
        """Un-minimize, raise and activate the registered window, if any."""
        window = self._activationWindow
        if not window:
            return
        # Clear only the minimized flag; other window-state bits are kept.
        window.setWindowState(window.windowState() & ~Qt.WindowMinimized)
        window.raise_()
        window.activateWindow()

    def sendMessage(self, msg):
        """Send ``msg`` as one line to the instance connected to at start-up.

        Returns:
            False when there is no outgoing stream (no other instance was
            found); otherwise the result of ``waitForBytesWritten()`` on the
            outgoing socket.
        """
        if not self._outStream:
            return False
        # The receiver reads line by line, so terminate the message.
        self._outStream << msg << '\n'
        self._outStream.flush()
        return self._outSocket.waitForBytesWritten()

    def _onNewConnection(self):
        """Switch the inbound channel to the newest pending connection."""
        previous = self._inSocket
        if previous:
            previous.readyRead.disconnect(self._onReadyRead)
            # Sockets handed out by nextPendingConnection() are children of
            # the server and would otherwise pile up, one per connection,
            # until the server is destroyed.  Drop the stream that wraps the
            # old socket first, then schedule the socket for deletion.
            self._inStream = None
            previous.deleteLater()
        self._inSocket = self._server.nextPendingConnection()
        if not self._inSocket:
            return
        self._inStream = QTextStream(self._inSocket)
        self._inStream.setCodec('UTF-8')
        self._inSocket.readyRead.connect(self._onReadyRead)
        if self._activateOnMessage:
            self.activateWindow()

    def _onReadyRead(self):
        """Emit ``messageReceived`` for each line available on the inbound stream.

        Reading stops at the first empty result from ``readLine()``, so an
        empty line also ends the current batch.
        """
        while True:
            msg = self._inStream.readLine()
            if not msg:
                break
            self.messageReceived.emit(msg)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment