Skip to content

Instantly share code, notes, and snippets.

@after-the-sunrise
Created January 26, 2014 01:32
Show Gist options
  • Save after-the-sunrise/8626762 to your computer and use it in GitHub Desktop.
Save after-the-sunrise/8626762 to your computer and use it in GitHub Desktop.
import logging
import time
import socket
from os import linesep
from queue import Empty, Queue
from socket import AF_INET, SOCK_STREAM
from threading import Thread, Lock
class socket_handler(object):
    """Line-oriented TCP client backed by three worker threads.

    After ``connect()`` succeeds, a reader thread pushes every received line
    onto an inbound queue, a notifier thread hands those lines to the
    registered listeners, and a writer thread sends the messages queued via
    ``writeMessage()``.

    A listener is any object exposing ``onMessage(msg)`` and ``onClose(msg)``.
    ``onClose`` receives the last message that was delivered, or ``None`` when
    nothing was delivered during the connection.
    """

    def __init__(self, host="localhost", port=12345):
        """Store the endpoint; no connection is made until ``connect()``.

        Args:
            host: Host name or address to connect to.
            port: TCP port to connect to.
        """
        self.__logger = logging.getLogger(self.__class__.__name__)
        self.__lock = Lock()  # guards the socket / thread fields below
        self.__host = host
        self.__port = port
        self.__sock = None
        self.__reader = None
        self.__writer = None
        self.__notifier = None
        self.__inbound = Queue()
        self.__outbound = Queue()
        self.__listeners = list()

    def __read(self, sock):
        """Reader thread body: queue each received line until EOF or error."""
        try:
            self.__logger.info("Reader started.")
            file = sock.makefile(mode='r')
            try:
                for line in file:
                    self.__logger.debug("Read : " + line)
                    self.__inbound.put_nowait(line)
                # Falling out of the loop means EOF (peer closed, or the
                # socket was shut down locally); there is nothing to retry.
            finally:
                file.close()
        except OSError as e:
            self.__logger.info("Reader failure : %s", e)
        finally:
            # Flagging the socket closed makes the other workers stop too.
            sock.close()
            self.__logger.info("Reader terminated.")

    def __write(self, sock):
        """Writer thread body: send queued messages, one per line."""
        try:
            self.__logger.info("Writer started.")
            file = sock.makefile(mode='w')
            try:
                while not sock._closed:
                    try:
                        # The timeout bounds how long a close goes unnoticed.
                        msg = self.__outbound.get(block=True, timeout=1)
                    except Empty:
                        continue
                    if msg is not None:
                        # NOTE(review): a text-mode file already translates
                        # '\n' to os.linesep on write, so on Windows this
                        # yields '\r\r\n' -- confirm what the peer expects.
                        file.write(msg + linesep)
                        self.__logger.debug("Write : " + msg)
                    # Flush only once the backlog is drained to batch sends.
                    if self.__outbound.empty():
                        file.flush()
            finally:
                file.close()
        except OSError as e:
            self.__logger.info("Writer failure : %s", e)
        finally:
            sock.close()
            self.__logger.info("Writer terminated.")

    def __dispatch(self, msg):
        """Deliver ``msg`` to every listener registered right now."""
        # Iterate over a snapshot so listeners may add/remove themselves.
        for listener in tuple(self.__listeners):
            listener.onMessage(msg)

    def __notify(self, sock):
        """Notifier thread body: forward inbound lines to the listeners."""
        # Pre-bind so the onClose() call below is valid even when no message
        # was ever received.
        msg = None
        try:
            self.__logger.info("Notifier started.")
            while not sock._closed:
                try:
                    msg = self.__inbound.get(block=True, timeout=1)
                except Empty:
                    continue
                self.__dispatch(msg)
            # Deliver lines that were queued before the socket was closed.
            while True:
                try:
                    msg = self.__inbound.get_nowait()
                except Empty:
                    break
                self.__dispatch(msg)
        finally:
            sock.close()
            for listener in tuple(self.__listeners):
                listener.onClose(msg)
            self.__logger.info("Notifier terminated.")

    def connect(self):
        """Open the connection and start the worker threads.

        Logs a warning and does nothing when a connection object already
        exists (call ``disconnect()`` first to reconnect).

        Raises:
            Exception: whatever the socket creation or connect raised; the
                failure is logged and the handler stays disconnected.
        """
        with self.__lock:
            if self.__sock is not None:
                self.__logger.warning("Connection already initialized.")
                return
            self.__logger.info("Initializing connection.")
            sock = None
            try:
                sock = socket.socket(AF_INET, SOCK_STREAM)
                sock.settimeout(5)  # applies to the connect attempt only
                sock.connect((self.__host, self.__port))
                # Back to blocking mode: a lingering timeout would abort the
                # reader after five seconds without inbound data.
                sock.settimeout(None)
                self.__logger.info("Initialized connection.")
            except Exception as e:
                if sock is not None:
                    sock.close()
                tmp = "Initialization failure : [{0}:{1}] {2}"
                msg = tmp.format(self.__host, self.__port, str(e))
                self.__logger.info(msg)
                raise
            self.__sock = sock
            self.__reader = Thread(target=self.__read, args=[sock])
            self.__writer = Thread(target=self.__write, args=[sock])
            self.__notifier = Thread(target=self.__notify, args=[sock])
            self.__reader.start()
            self.__writer.start()
            self.__notifier.start()

    def disconnect(self):
        """Close the connection and wait for the worker threads to finish.

        Logs a warning and does nothing when there is no connection.
        """
        with self.__lock:
            if self.__sock is None:
                self.__logger.warning("Connection already closed.")
                return
            self.__logger.info("Terminating connection.")
            try:
                # Wakes a reader blocked in recv(); close() alone does not.
                self.__sock.shutdown(socket.SHUT_RDWR)
            except OSError:
                pass  # already disconnected or fully closed
            self.__sock.close()
            self.__writer.join()
            self.__reader.join()
            self.__notifier.join()
            self.__writer = None
            self.__reader = None
            self.__notifier = None
            self.__sock = None
            self.__logger.info("Terminated connection.")

    def isConnected(self):
        """Return True while a socket exists and has not been closed."""
        with self.__lock:
            return self.__sock is not None and not self.__sock._closed

    def addListener(self, listener):
        """Register ``listener`` for onMessage/onClose callbacks."""
        self.__listeners.append(listener)

    def removeListener(self, listener):
        """Unregister ``listener``; raises ValueError if it is not registered."""
        self.__listeners.remove(listener)

    def writeMessage(self, message):
        """Queue ``message`` for the writer thread (a line ending is appended)."""
        self.__outbound.put_nowait(message)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment