Skip to content

Instantly share code, notes, and snippets.

@codingadvocate
Created March 22, 2019 23:23
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save codingadvocate/f732da79ddf6cef4b7a0b6b3679f519f to your computer and use it in GitHub Desktop.
Save codingadvocate/f732da79ddf6cef4b7a0b6b3679f519f to your computer and use it in GitHub Desktop.
stop/start client connections with loseConnection in ReconnectingClientFactory
import os, sys, traceback
import json, time, datetime, psutil
from twisted.internet.protocol import ReconnectingClientFactory
from twisted.protocols.basic import LineReceiver
from twisted.internet import reactor, task, defer, threads
from contextlib import suppress
class CustomLineReceiverProtocol(LineReceiver):
    """LineReceiver whose messages are separated by the ``:==:`` byte marker."""

    # Overrides LineReceiver's default line delimiter.
    delimiter = b':==:'
class ServiceClientProtocol(CustomLineReceiverProtocol):
    """Client-side protocol: answers 'healthRequest' messages via its factory.

    The factory is expected to assign itself to ``self.factory`` before the
    connection is made (see the factory's ``buildProtocol``).
    """

    def connectionMade(self):
        """Register this protocol as the factory's active client."""
        print(' protocol connectionMade')
        self.factory.connectedClient = self
        self.factory.clientConnectionMade()

    def lineReceived(self, line):
        """Decode one delimited JSON message and start a job on 'healthRequest'.

        Undecodable or non-object payloads are reported and ignored; letting
        the exception escape would make Twisted drop the connection.
        """
        try:
            dataDict = json.loads(line)
        except ValueError as exc:
            # Covers json.JSONDecodeError and UnicodeDecodeError (both ValueError).
            print(' protocol lineReceived: ignoring undecodable message: {}'.format(exc))
            return
        if not isinstance(dataDict, dict):
            print(' protocol lineReceived: ignoring non-object message: {}'.format(dataDict))
            return
        if dataDict.get('action') == 'healthRequest':
            self.factory.enterSimulateJob()

    def connectionLost(self, reason):
        """Clear the factory's reference to the active client."""
        print(' protocol connectionLost')
        self.factory.connectedClient = None

    def constructAndSendData(self, action, content):
        """Send ``{'action': action, 'content': content}`` as one JSON line.

        Writes to the transport, so it must be called from the reactor thread.
        """
        message = {'action': action, 'content': content}
        msg = json.dumps(message).encode('utf-8')
        print(' protocol constructAndSendData: {}'.format(msg))
        self.sendLine(msg)
class ServiceClientFactory(ReconnectingClientFactory):
    """Reconnecting client factory that reports process health to the server.

    A LoopingCall refreshes ``self.health`` every 10 seconds in a worker
    thread. When the protocol receives a 'healthRequest', a simulated job runs
    in a worker thread, the health snapshot is sent back, and the connection
    is deliberately dropped so the factory reconnects.
    """

    continueTrying = True

    def __init__(self):
        print('factory constructor')
        self.connectedClient = None
        self.health = {}
        # Keep one psutil handle for the life of the factory: cpu_percent()
        # measures usage since the previous call on the *same* object, so a
        # fresh Process per sample would always report 0.0.
        self._process = psutil.Process(os.getpid())
        self.numPortsChanged = False
        self.disconnectedOnPurpose = False
        super().__init__()
        # Final teardown belongs to reactor shutdown, not to stopFactory()
        # (see stopFactory for why).
        reactor.addSystemEventTrigger('before', 'shutdown', self._shutdown)
        self.loopingSystemHealth = task.LoopingCall(self.enterSystemHealthCheck)
        self.loopingSystemHealth.start(10)

    def buildProtocol(self, addr):
        """Create the protocol for a new connection and reset the retry delay."""
        print(' factory buildProtocol')
        self.resetDelay()
        protocol = ServiceClientProtocol()
        protocol.factory = self
        return protocol

    def clientConnectionLost(self, connector, reason):
        """Log the loss, idle briefly, then let the base class schedule a retry."""
        print(' factory clientConnectionLost: reason: {}'.format(reason))
        print(' ... simulate client going idle before attempting restart...')
        # NOTE: deliberately blocks the reactor thread for the demo; real code
        # should use reactor.callLater instead of sleeping here.
        time.sleep(5)
        ReconnectingClientFactory.clientConnectionLost(self, connector, reason)
        print(' factory clientConnectionLost: end.\n')

    def clientConnectionMade(self):
        """Called by the protocol once a connection is established."""
        print(' factory clientConnectionMade: starting numPorts: {}'.format(self.numPorts))
        print(' factory clientConnectionMade: finished numPorts: {}'.format(self.numPorts))
        print(' ..... pausing for <CTRL><C> test')
        # NOTE: deliberately blocks the reactor thread for the demo.
        time.sleep(3)

    def cleanup(self):
        """Drop the current connection on purpose (reactor thread only)."""
        print('factory cleanup: calling loseConnection')
        if self.connectedClient is not None:
            self.connectedClient.transport.loseConnection()
        self.disconnectedOnPurpose = True

    def stopFactory(self):
        """Hook invoked by Twisted each time the factory becomes unused.

        With a reconnecting factory this happens after every lost connection,
        before the scheduled retry fires. Calling stopTrying() or stopping the
        health loop here would cancel that retry and end the client, so the
        real teardown lives in _shutdown().
        """
        print('stopFactory')
        print('stopFactory end.')

    def _shutdown(self):
        """Stop reconnecting and stop the health loop when the reactor shuts down."""
        self.stopTrying()
        if self.loopingSystemHealth.running:
            self.loopingSystemHealth.stop()

    def enterSimulateJob(self):
        """Run simulateJob in a worker thread; returns the resulting Deferred."""
        print(' factory enterSimulateJob')
        threadHandle = threads.deferToThread(self.simulateJob)
        return threadHandle

    def simulateJob(self):
        """Simulate a blocking job, then report and reset the connection.

        This runs in a worker thread, and Twisted transports may only be
        touched from the reactor thread, so the send/disconnect step is handed
        back to the reactor via callFromThread.
        """
        print(' factory simulateJob: starting job')
        time.sleep(2)
        reactor.callFromThread(self._finishJob)

    def _finishJob(self):
        """Send the job response and drop the connection (reactor thread)."""
        client = self.connectedClient
        if client is None:
            # The connection went away while the job was running.
            print(' factory simulateJob: connection already closed; dropping job response')
            return
        client.constructAndSendData('jobResponse', self.health)
        print(' factory simulateJob: finished job... time to reset the client (diconnect/re-initialize)...')
        self.cleanup()

    def enterSystemHealthCheck(self):
        """Run getSystemHealth in a worker thread; returns the resulting Deferred."""
        print(' factory enterSystemHealthCheck')
        threadHandle = threads.deferToThread(self.getSystemHealth)
        return threadHandle

    def getSystemHealth(self):
        """Refresh ``self.health`` with CPU, memory and run-time of this process.

        Errors are printed rather than raised so a failed sample does not stop
        the LoopingCall that drives this method.
        """
        print(' factory getSystemHealth')
        try:
            currentTime = time.time()
            process = self._process
            startTime = process.create_time()
            self.health = {
                'processCpuPercent': process.cpu_percent(),
                'processMemory': process.memory_full_info().uss,
                'processRunTime': int(currentTime - startTime),
            }
            print(' factory getSystemHealth: system health: {}'.format(self.health))
        except Exception:
            exception = traceback.format_exception(*sys.exc_info())
            print(' factory getSystemHealth: exception: {}'.format(exception))
if __name__ == '__main__':
    # Client entry point: connect to the local service and run the reactor.
    exitCode = 0
    try:
        reactor.connectTCP('127.0.0.1', 51841, ServiceClientFactory(), timeout=300)
        reactor.run()
    except Exception:
        # Catch Exception only, so SystemExit/KeyboardInterrupt still propagate.
        stacktrace = traceback.format_exception(*sys.exc_info())
        print('clientWrapper exception: {}'.format(stacktrace))
        # Report the failure to the caller instead of exiting with success.
        exitCode = 1
    print('exiting')
    sys.exit(exitCode)
import sys, traceback
import json
from twisted.internet import reactor, task, defer, threads
from twisted.internet.protocol import ServerFactory
from twisted.protocols.basic import LineReceiver
class CustomLineReceiverProtocol(LineReceiver):
    """LineReceiver whose messages are separated by the ``:==:`` byte marker."""

    # Overrides LineReceiver's default line delimiter.
    delimiter = b':==:'
class ServiceListener(CustomLineReceiverProtocol):
    """Server-side protocol; one instance per connected client."""

    def connectionMade(self):
        """Add this connection to the factory's list of active clients."""
        print(' protocol connectionMade')
        clients = self.factory.activeClients
        clients.append(self)

    def connectionLost(self, reason):
        """Ask the factory to forget this connection."""
        print(' protocol connectionLost')
        self.factory.removeClient(self)

    def lineReceived(self, line):
        """Print each delimited message received from the client."""
        print(' protocol lineReceived: {}'.format(line))

    def constructAndSendData(self, action):
        """Send ``{'action': action}`` to the client as one JSON line."""
        payload = {'action': action}
        encoded = json.dumps(payload).encode('utf-8')
        print(' protocol constructAndSendData: {}'.format(encoded))
        self.sendLine(encoded)
class ServiceFactory(ServerFactory):
    """Server factory that sends a 'healthRequest' to every client each 15 seconds."""

    protocol = ServiceListener

    def __init__(self):
        print('factory constructor')
        super().__init__()
        self.activeClients = []
        self.loopingHealthUpdates = task.LoopingCall(self.enterSystemHealthCheck)
        self.loopingHealthUpdates.start(15)

    def removeClient(self, client):
        """Forget a client whose connection was lost."""
        print(' factory removeClient')
        self.activeClients.remove(client)

    def enterSystemHealthCheck(self):
        """Send health requests and return a Deferred for the outcome.

        The sends write to client transports, which is only safe on the
        reactor thread, and they do no blocking work, so the call runs here
        rather than in a worker thread. maybeDeferred keeps the Deferred
        return value.
        """
        print(' factory enterSystemHealthCheck')
        return defer.maybeDeferred(self.sendHealthRequest)

    def sendHealthRequest(self):
        """Send a 'healthRequest' to each active client (reactor thread only)."""
        # Iterate a snapshot so a change to activeClients cannot disturb the loop.
        clients = list(self.activeClients)
        if not clients:
            print(' factory sendHealthRequest: no active clients to talk to')
            return
        for client in clients:
            print(' factory sendHealthRequest: requesting from client...')
            client.constructAndSendData('healthRequest')
if __name__ == '__main__':
    # Server entry point: listen on the loopback interface and run the reactor.
    exitCode = 0
    try:
        reactor.listenTCP(51841, ServiceFactory(), interface='127.0.0.1')
        reactor.run()
    except Exception:
        # Catch Exception only, so SystemExit/KeyboardInterrupt still propagate.
        stacktrace = traceback.format_exception(*sys.exc_info())
        # This is the server script; the original label was copied from the client.
        print('serverWrapper exception: {}'.format(stacktrace))
        # Report the failure to the caller instead of exiting with success.
        exitCode = 1
    print('exiting')
    sys.exit(exitCode)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment