-
-
Save codingadvocate/f732da79ddf6cef4b7a0b6b3679f519f to your computer and use it in GitHub Desktop.
stop/start client connections with loseConnection in ReconnectingClientFactory
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os, sys, traceback | |
import json, time, datetime, psutil | |
from twisted.internet.protocol import ReconnectingClientFactory | |
from twisted.protocols.basic import LineReceiver | |
from twisted.internet import reactor, task, defer, threads | |
from contextlib import suppress | |
class CustomLineReceiverProtocol(LineReceiver): | |
delimiter = b':==:' | |
class ServiceClientProtocol(CustomLineReceiverProtocol): | |
def connectionMade(self): | |
print(' protocol connectionMade') | |
self.factory.connectedClient = self | |
self.factory.clientConnectionMade() | |
def lineReceived(self, line): | |
dataDict = json.loads(line) | |
if dataDict.get('action') == 'healthRequest': | |
self.factory.enterSimulateJob() | |
def connectionLost(self, reason): | |
print(' protocol connectionLost') | |
self.factory.connectedClient = None | |
def constructAndSendData(self, action, content): | |
message = {} | |
message['action'] = action | |
message['content'] = content | |
jsonMessage = json.dumps(message) | |
msg = jsonMessage.encode('utf-8') | |
print(' protocol constructAndSendData: {}'.format(msg)) | |
self.sendLine(msg) | |
class ServiceClientFactory(ReconnectingClientFactory): | |
continueTrying = True | |
def __init__(self): | |
print('factory constructor') | |
self.connectedClient = None | |
self.health = {} | |
self.loopingSystemHealth = task.LoopingCall(self.enterSystemHealthCheck) | |
self.loopingSystemHealth.start(10) | |
self.numPortsChanged = False | |
self.disconnectedOnPurpose = False | |
super().__init__() | |
def buildProtocol(self, addr): | |
print(' factory buildProtocol') | |
self.resetDelay() | |
protocol = ServiceClientProtocol() | |
protocol.factory = self | |
return protocol | |
def clientConnectionLost(self, connector, reason): | |
print(' factory clientConnectionLost: reason: {}'.format(reason)) | |
# if self.disconnectedOnPurpose: | |
# ## Hack to keep reactor alive | |
# print(' factory clientConnectionLost: increasing numPorts') | |
# self.numPorts += 1 | |
# self.numPortsChanged = True | |
# self.disconnectedOnPurpose = False | |
print(' ... simulate client going idle before attempting restart...') | |
time.sleep(5) | |
ReconnectingClientFactory.clientConnectionLost(self, connector, reason) | |
print(' factory clientConnectionLost: end.\n') | |
def clientConnectionMade(self): | |
print(' factory clientConnectionMade: starting numPorts: {}'.format(self.numPorts)) | |
# if self.numPortsChanged : | |
# ## Resetting from hacked value | |
# print(' factory clientConnectionMade: decreasing numPorts') | |
# self.numPorts -= 1 | |
# self.numPortsChanged = False | |
print(' factory clientConnectionMade: finished numPorts: {}'.format(self.numPorts)) | |
print(' ..... pausing for <CTRL><C> test') | |
time.sleep(3) | |
def cleanup(self): | |
print('factory cleanup: calling loseConnection') | |
if self.connectedClient is not None: | |
self.connectedClient.transport.loseConnection() | |
self.disconnectedOnPurpose = True | |
def stopFactory(self): | |
print('stopFactory') | |
self.stopTrying() | |
with suppress(Exception): | |
self.loopingSystemHealth.stop() | |
print('stopFactory end.') | |
def enterSimulateJob(self): | |
print(' factory enterSimulateJob') | |
threadHandle = threads.deferToThread(self.simulateJob) | |
return threadHandle | |
def simulateJob(self): | |
print(' factory simulateJob: starting job') | |
time.sleep(2) | |
self.connectedClient.constructAndSendData('jobResponse', self.health) | |
print(' factory simulateJob: finished job... time to reset the client (diconnect/re-initialize)...') | |
self.cleanup() | |
def enterSystemHealthCheck(self): | |
print(' factory enterSystemHealthCheck') | |
threadHandle = threads.deferToThread(self.getSystemHealth) | |
return threadHandle | |
def getSystemHealth(self): | |
print(' factory getSystemHealth') | |
try: | |
currentTime = time.time() | |
process = psutil.Process(os.getpid()) | |
startTime = process.create_time() | |
self.health = { | |
'processCpuPercent': process.cpu_percent(), | |
'processMemory': process.memory_full_info().uss, | |
'processRunTime': int(currentTime-startTime) | |
} | |
print(' factory getSystemHealth: system health: {}'.format(self.health)) | |
except: | |
exception = traceback.format_exception(sys.exc_info()[0], sys.exc_info()[1], sys.exc_info()[2]) | |
print(' factory getSystemHealth: exception: {}'.format(exception)) | |
if __name__ == '__main__': | |
try: | |
connector = reactor.connectTCP('127.0.0.1', 51841, ServiceClientFactory(), timeout=300) | |
reactor.run() | |
except: | |
stacktrace = traceback.format_exception(sys.exc_info()[0], sys.exc_info()[1], sys.exc_info()[2]) | |
print('clientWrapper exception: {}'.format(stacktrace)) | |
print('exiting') | |
sys.exit(0) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import sys, traceback | |
import json | |
from twisted.internet import reactor, task, defer, threads | |
from twisted.internet.protocol import ServerFactory | |
from twisted.protocols.basic import LineReceiver | |
class CustomLineReceiverProtocol(LineReceiver): | |
delimiter = b':==:' | |
class ServiceListener(CustomLineReceiverProtocol): | |
def connectionMade(self): | |
print(' protocol connectionMade') | |
self.factory.activeClients.append(self) | |
def connectionLost(self, reason): | |
print(' protocol connectionLost') | |
self.factory.removeClient(self) | |
def lineReceived(self, line): | |
print(' protocol lineReceived: {}'.format(line)) | |
def constructAndSendData(self, action): | |
message = {'action': action} | |
jsonMessage = json.dumps(message) | |
msg = jsonMessage.encode('utf-8') | |
print(' protocol constructAndSendData: {}'.format(msg)) | |
self.sendLine(msg) | |
class ServiceFactory(ServerFactory): | |
protocol = ServiceListener | |
def __init__(self): | |
print('factory constructor') | |
super().__init__() | |
self.activeClients = [] | |
self.loopingHealthUpdates = task.LoopingCall(self.enterSystemHealthCheck) | |
self.loopingHealthUpdates.start(15) | |
def removeClient(self, client): | |
print(' factory removeClient') | |
self.activeClients.remove(client) | |
def enterSystemHealthCheck(self): | |
print(' factory enterSystemHealthCheck') | |
threadHandle = threads.deferToThread(self.sendHealthRequest) | |
return threadHandle | |
def sendHealthRequest(self): | |
if len(self.activeClients) <= 0: | |
print(' factory sendHealthRequest: no active clients to talk to') | |
else: | |
for client in self.activeClients: | |
print(' factory sendHealthRequest: requesting from client...') | |
client.constructAndSendData('healthRequest') | |
if __name__ == '__main__': | |
try: | |
reactor.listenTCP(51841, ServiceFactory(), interface='127.0.0.1') | |
reactor.run() | |
except: | |
stacktrace = traceback.format_exception(sys.exc_info()[0], sys.exc_info()[1], sys.exc_info()[2]) | |
print('clientWrapper exception: {}'.format(stacktrace)) | |
print('exiting') | |
sys.exit(0) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment