Skip to content

Instantly share code, notes, and snippets.

@bmxp
Last active April 15, 2018 07:16
Show Gist options
  • Save bmxp/a954c2496ac1c264483bd51d4e84d171 to your computer and use it in GitHub Desktop.
Save bmxp/a954c2496ac1c264483bd51d4e84d171 to your computer and use it in GitHub Desktop.
lib/connection.py
#!/usr/bin/env python3
#########################################################################
# Copyright 2013 Marcus Popp marcus@popp.mx
#########################################################################
# This file is part of SmartHomeNG. https://github.com/smarthomeNG//
#
# SmartHomeNG is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# SmartHomeNG is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with SmartHomeNG. If not, see <http://www.gnu.org/licenses/>.
#########################################################################
"""
This library is softly on it's way out. In the future network classes for SmartHomeNG
will be implemented trough the network library lib.network, which is still in development.
The following modules use an import lib.connection as of April 2018:
smarthome.py for an object of Connections()
Plugins:
russound, network, visu_websocket, asterisk, knx, squeezebox, nuki, mpd, raumfeld, cli, speech, xbmc, lirc
"""
import logging
import socket
import collections
import threading
import select
import time
import sys
logger = logging.getLogger(__name__)
class Base():
"""
provides same base class for class Connections(), class Server(),
class Stream() and thus also to class Client() which inherits from Stream()
some lookup dicts for protocol family like TCP or UDP flavours and the like for protocol type
"""
_poller = None
_family = {'UDP': socket.AF_INET, 'UDP6': socket.AF_INET6, 'TCP': socket.AF_INET, 'TCP6': socket.AF_INET6}
_type = {'UDP': socket.SOCK_DGRAM, 'UDP6': socket.SOCK_DGRAM, 'TCP': socket.SOCK_STREAM, 'TCP6': socket.SOCK_STREAM}
_monitor = []
def __init__(self, monitor=False):
self._name = self.__class__.__name__
if monitor:
self._monitor.append(self)
def _create_socket(self, flags=None):
family, type, proto, canonname, sockaddr = socket.getaddrinfo(self._host, self._port, family=self._family[self._proto], type=self._type[self._proto])[0]
self.socket = socket.socket(family, type, proto)
return sockaddr
class Connections(Base):
"""
Within SmartHome.py there is one instance of this class
The filenumber of a connection is the key to the contained dicts of
_connections and _servers
Additionally the filenumber is used for wither epoll or kqueue depending
on the environment found for select.
A filenumber of value -1 should be considered as an error valueself.
It was reported that
"""
_connections = {}
_servers = {}
if hasattr(select, 'epoll'):
_ro = select.EPOLLIN | select.EPOLLHUP | select.EPOLLERR
_rw = _ro | select.EPOLLOUT
def __init__(self):
Base.__init__(self)
Base._poller = self
if hasattr(select, 'epoll'):
self._epoll = select.epoll()
elif hasattr(select, 'kqueue'):
self._kqueue = select.kqueue()
def register_server(self, fileno, obj):
if fileno == -1:
logger.error("{} tried to register a server with filenumber == -1".format(obj))
return
self._servers[fileno] = obj
self._connections[fileno] = obj
if hasattr(select, 'epoll'):
self._epoll.register(fileno, self._ro)
elif hasattr(select, 'kqueue'):
event = [
select.kevent(fileno,
filter=select.KQ_FILTER_READ,
flags=select.KQ_EV_ADD)
]
self._kqueue.control(event, 0, 0)
def register_connection(self, fileno, obj):
if fileno == -1:
logger.error("tried to register a connection with filenumber == -1")
return
self._connections[fileno] = obj
if hasattr(select, 'epoll'):
self._epoll.register(fileno, self._ro)
elif hasattr(select, 'kqueue'):
event = [
select.kevent(fileno,
filter=select.KQ_FILTER_READ,
flags=select.KQ_EV_ADD)
]
self._kqueue.control(event, 0, 0)
def unregister_connection(self, fileno):
if fileno == -1:
logger.error("tried to unregister a connection with filenumber == -1")
return
try:
if hasattr(select, 'epoll'):
self._epoll.unregister(fileno)
del(self._connections[fileno])
del(self._servers[fileno])
except Exception as e:
pass
def monitor(self, obj):
self._monitor.append(obj)
def check(self):
for obj in self._monitor:
if not obj.connected:
obj.connect()
def trigger(self, fileno):
if fileno == -1:
logger.error("tried to trigger a connection with filenumber == -1")
return
if self._connections[fileno].outbuffer:
if hasattr(select, 'epoll'):
self._epoll.modify(fileno, self._rw)
elif hasattr(select, 'kqueue'):
event = [
select.kevent(fileno,
filter=select.KQ_FILTER_WRITE,
flags=select.KQ_EV_ADD | KQ_EV_ONESHOT)
]
self._kqueue.control(event, 0, 0)
def poll(self):
time.sleep(0.0000000001) # give epoll.modify a chance
if not self._connections:
time.sleep(1)
return
if -1 in self._connections:
logger.error("fileno -1 was found, this is an error situation, please report to SmartHomeNG team")
del( self._connections[-1])
for fileno in self._connections:
if fileno not in self._servers:
if hasattr(select, 'epoll'):
if self._connections[fileno].outbuffer:
self._epoll.modify(fileno, self._rw)
else:
self._epoll.modify(fileno, self._ro)
elif hasattr(select, 'kqueue'):
event = []
if self._connections[fileno].outbuffer:
event.append(select.kevent(fileno,
filter=select.KQ_FILTER_WRITE,
flags=select.KQ_EV_ADD | KQ_EV_ONESHOT))
else:
event.append(select.kevent(fileno,
filter=select.KQ_FILTER_READ,
flags=select.KQ_EV_ADD))
self._kqueue.control(event, 0, 0)
if hasattr(select, 'epoll'):
for fileno, event in self._epoll.poll(timeout=1):
if fileno in self._servers:
server = self._servers[fileno]
server.handle_connection()
else:
if event & select.EPOLLIN:
try:
con = self._connections[fileno]
con._in()
except Exception as e:
con.close()
continue
if event & select.EPOLLOUT:
try:
con = self._connections[fileno]
con._out()
except Exception as e:
con.close()
continue
if event & (select.EPOLLHUP | select.EPOLLERR):
try:
con = self._connections[fileno]
con.close()
continue
except:
pass
elif hasattr(select, 'kqueue'):
for event in self._kqueue.control(None, 1):
fileno = event.ident
if fileno in self._servers:
server = self._servers[fileno]
server.handle_connection()
else:
if event.filter == select.KQ_FILTER_READ:
try:
con = self._connections[fileno]
con._in()
except Exception as e: # noqa
con.close()
continue
if event.filter == select.KQ_FILTER_WRITE:
try:
con = self._connections[fileno]
con._out()
except Exception as e: # noqa
con.close()
continue
if event.flags & select.KQ_EV_EOF:
try:
con = self._connections[fileno]
con.close()
continue
except:
pass
else:
logger.exception("WARNING: no epoll/kqueue implementation available")
sys.exit(0)
def close(self):
if -1 in self._connections:
logger.error("Connection() close tried to close a filenumber == -1")
for fileno in self._connections:
try:
self._connections[fileno].close()
except:
pass
class Server(Base):
def __init__(self, host, port, proto='TCP'):
Base.__init__(self, monitor=True)
self._host = host
self._port = port
self._proto = proto
self.address = "{}:{}".format(host, port)
self.connected = False
def connect(self):
try:
sockaddr = self._create_socket()
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.socket.bind(sockaddr)
if self._proto.startswith('TCP'):
self.socket.listen(5)
self.socket.setblocking(0)
except Exception as e:
logger.error("{}: problem binding {} ({}): {}".format(self._name, self.address, self._proto, e))
self.close()
else:
self.connected = True
logger.debug("{}: binding to {} ({})".format(self._name, self.address, self._proto))
self._poller.register_server(self.socket.fileno(), self)
def close(self):
self.connected = False
try:
self._poller.unregister_connection(self.socket.fileno())
except:
pass
try:
self.socket.shutdown(socket.SHUT_RDWR)
except:
pass
try:
self.socket.close()
except:
pass
try:
del(self.socket)
except:
pass
def accept(self):
try:
sock, addr = self.socket.accept()
sock.setblocking(0)
addr = "{}:{}".format(addr[0], addr[1])
logger.debug("{}: incoming connection from {} to {}".format(self._name, addr, self.address))
return sock, addr
except:
return None, None
def handle_connection(self):
pass
class Stream(Base):
def __init__(self, sock=None, address=None, monitor=False):
Base.__init__(self, monitor=monitor)
self.connected = False
self.address = address
self.inbuffer = bytearray()
self.outbuffer = collections.deque()
self.__olock = threading.Lock()
self._frame_size_in = 4096
self._frame_size_out = 4096
self.terminator = b'\r\n'
self._balance_open = False
self._balance_close = False
self._close_after_send = False
if sock is not None:
self.socket = sock
self._connected()
def _connected(self):
self._poller.register_connection(self.socket.fileno(), self)
self.connected = True
self.handle_connect()
def _in(self):
max_size = self._frame_size_in
try:
data = self.socket.recv(max_size)
except Exception as e: # noqa
self.close()
return
if data == b'':
self.close()
return
self.inbuffer.extend(data)
while True:
terminator = self.terminator
buffer_len = len(self.inbuffer)
if not terminator:
if not self._balance_open:
break
index = self._is_balanced()
if index:
data = self.inbuffer[:index]
self.inbuffer = self.inbuffer[index:]
self.found_balance(data)
else:
break
elif isinstance(terminator, int):
if buffer_len < terminator:
break
else:
data = self.inbuffer[:terminator]
self.inbuffer = self.inbuffer[terminator:]
self.terminator = 0
self.found_terminator(data)
else:
if terminator not in self.inbuffer:
break
index = self.inbuffer.find(terminator)
data = self.inbuffer[:index]
cut = index + len(terminator)
self.inbuffer = self.inbuffer[cut:]
self.found_terminator(data)
def _is_balanced(self):
stack = []
for index, char in enumerate(self.inbuffer):
if char == self._balance_open:
stack.append(char)
elif char == self._balance_close:
stack.append(char)
if stack.count(self._balance_open) < stack.count(self._balance_close):
logger.warning("{}: unbalanced input!".format(self._name))
logger.close()
return False
if stack.count(self._balance_open) == stack.count(self._balance_close):
return index + 1
return False
def _out(self):
if not self.__olock.acquire(timeout=1):
return
try:
while self.connected:
frame = self.outbuffer.pop()
if not frame:
if frame is None:
self.close()
return
continue # ignore empty frames
sent = self.socket.send(frame)
if sent < len(frame):
self.outbuffer.append(frame[sent:])
except IndexError: # buffer empty
return
except socket.error:
self.outbuffer.append(frame)
except Exception as e: # noqa
logger.exception("{}: {}".format(self._name, e))
self.close()
finally:
if self._close_after_send:
logger.debug("close after send")
self.close()
self.__olock.release()
def balance(self, bopen, bclose):
self._balance_open = ord(bopen)
self._balance_close = ord(bclose)
def close(self):
if self.connected:
logger.debug("{}: closing socket {}".format(self._name, self.address))
self.connected = False
try:
self._poller.unregister_connection(self.socket.fileno())
except:
pass
try:
self.handle_close()
except:
pass
try:
self.socket.shutdown(socket.SHUT_RDWR)
except:
pass
try:
self.socket.close()
except:
pass
try:
del(self.socket)
except:
pass
def discard_buffers(self):
self.inbuffer = bytearray()
self.outbuffer.clear()
def found_terminator(self, data):
pass
def found_balance(self, data):
pass
def handle_close(self):
pass
def handle_connect(self):
pass
def send(self, data, close=False):
self._close_after_send = close
if not self.connected:
return False
frame_size = self._frame_size_out
if len(data) > frame_size:
for i in range(0, len(data), frame_size):
self.outbuffer.appendleft(data[i:i + frame_size])
else:
self.outbuffer.appendleft(data)
self._out()
return True
class Client(Stream):
def __init__(self, host, port, proto='TCP', monitor=False):
Stream.__init__(self, monitor=monitor)
self._host = host
self._port = port
self._proto = proto
self.address = "{}:{}".format(host, port)
self._connection_attempts = 0
self._connection_errorlog = 60
self._connection_lock = threading.Lock()
def connect(self):
self._connection_lock.acquire()
if self.connected:
self._connection_lock.release()
return
try:
sockaddr = self._create_socket()
self.socket.settimeout(2)
self.socket.connect(sockaddr)
self.socket.setblocking(0)
# self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
except Exception as e:
self._connection_attempts -= 1
if self._connection_attempts <= 0:
logger.error("{}: could not connect to {} ({}): {}".format(self._name, self.address, self._proto, e))
self._connection_attempts = self._connection_errorlog
self.close()
else:
logger.debug("{}: connected to {}".format(self._name, self.address))
self._connected()
finally:
self._connection_lock.release()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment