Last active
August 29, 2015 14:17
-
-
Save chintal/2511459c02a9767deb5d to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
This file is part of qicada | |
See the COPYING, README, and INSTALL files for more information | |
""" | |
from __future__ import print_function | |
import logging | |
import usb | |
import pyudev | |
import serial.tools.list_ports | |
from twisted.internet.abstract import FileDescriptor | |
from twisted.internet.protocol import Protocol | |
from twisted.internet import reactor | |
from subsystems import log | |
# Module-level logger obtained from the project's logging subsystem, with its
# level set to INFO.
logger = log.get_logger(__name__)
logger.setLevel(logging.INFO)

# (vendor id, product id) pairs of the USB devices this module recognises.
supported_devices = [(0x2047, 0x0992)]
def get_vid_pid_from_udev_port(udev_port):
    """
    Read the USB vendor and product ids for a udev port.

    The ids are taken from the ``idVendor`` and ``idProduct`` attributes of
    the port's grandparent udev device and parsed as hexadecimal numbers.

    :param udev_port: udev device object of the port
    :return: ``(vid, pid)`` tuple of integers
    """
    usb_attributes = udev_port.parent.parent.attributes

    def _parse_hex(name):
        # Attribute values may carry surrounding whitespace; strip it first.
        return int('0x' + usb_attributes[name].strip(), 16)

    return _parse_hex('idVendor'), _parse_hex('idProduct')
def is_device_supported_from_udev_port(udev_port):
    """
    Tell whether a udev port belongs to one of the supported USB devices.

    :param udev_port: udev device object of the port
    :return: True if the port's ``(vid, pid)`` pair is listed in
             ``supported_devices``; False otherwise, including when the ids
             cannot be determined for this port
    """
    try:
        vid_pid = get_vid_pid_from_udev_port(udev_port)
    except (AttributeError, KeyError, ValueError):
        # The port has no grandparent device, the grandparent lacks the
        # idVendor/idProduct attributes, or they are not hexadecimal. Such a
        # port cannot be one of ours, so answer the question instead of
        # raising into the caller (this runs for every tty "add" event).
        return False
    return vid_pid in supported_devices
def get_serialno_from_udev_port(udev_port):
    """
    Read the serial number for a udev port.

    :param udev_port: udev device object of the port
    :return: the ``serial`` attribute of the port's grandparent udev device
    """
    usb_device = udev_port.parent.parent
    return usb_device.attributes['serial']
class ConnectedDevices(object):
    """
    List-like registry of the QDeviceInfo objects for connected devices.

    The registry is filled by a full scan when it is created and can be
    updated afterwards either with QDeviceInfo objects directly or from the
    udev port objects delivered with udev events.
    """

    def __init__(self):
        self._devlist = []
        self.rescan()

    def __iter__(self):
        return iter(self._devlist)

    def __len__(self):
        return len(self._devlist)

    def __getitem__(self, y):
        return self._devlist[y]

    def __delitem__(self, i):
        del self._devlist[i]

    def __contains__(self, y):
        """Return True only for a QDeviceInfo that is in the registry."""
        return isinstance(y, QDeviceInfo) and y in self._devlist

    def get_device_by_udev_port(self, udev_port):
        """
        Return the device that owns ``udev_port``.

        :raises LookupError: if no registered device has that port
        """
        for device in self._devlist:
            if udev_port in device.udev_ports:
                return device
        raise LookupError("udev_port not in connected devices")

    def get_device_by_serialno(self, serialno):
        """
        Return the device whose serial number equals ``serialno``.

        :raises LookupError: if no registered device has that serial number
        """
        for device in self._devlist:
            if device.serialno == serialno:
                return device
        raise LookupError("Serial Number not in connected devices")

    @property
    def serialnos(self):
        """Serial numbers of all registered devices, in registry order."""
        return [device.serialno for device in self._devlist]

    def append(self, qdevice):
        """
        Register a device.

        :param qdevice: the QDeviceInfo to add
        :raises TypeError: if ``qdevice`` is not a QDeviceInfo
        :raises ValueError: if a device with the same serial number is
                            already registered
        """
        if not isinstance(qdevice, QDeviceInfo):
            logger.warning("Attempted to add device which isn't a qdevice")
            raise TypeError("Device is not a QDevice object")
        if qdevice.serialno in self.serialnos:
            logger.warning("Attempted to add device already connected : " + qdevice.serialno)
            raise ValueError("Serial Number already connected")
        logger.info("Added Device : " + qdevice.serialno)
        self._devlist.append(qdevice)

    def append_by_udev_port(self, udev_port):
        """
        Register the device behind ``udev_port`` if it is a supported one.

        Unsupported ports are ignored. Errors from :meth:`append` propagate.
        """
        if is_device_supported_from_udev_port(udev_port):
            vid, pid = get_vid_pid_from_udev_port(udev_port)
            serialno = get_serialno_from_udev_port(udev_port)
            self.append(QDeviceInfo(serialno, vid, pid))

    def remove(self, x):
        """
        Remove device ``x`` from the registry.

        :raises ValueError: if ``x`` is not registered
        """
        # Remove first so that the log line is only written when the device
        # was actually taken out of the registry.
        self._devlist.remove(x)
        logger.info("Removed Device : " + x.serialno)

    def remove_by_udev_port(self, udev_port):
        """
        Remove the device that owns ``udev_port``.

        :raises LookupError: if no registered device has that port
        """
        self.remove(self.get_device_by_udev_port(udev_port))

    @property
    def ports(self):
        """Port names of all registered devices, flattened into one list."""
        return [port for device in self._devlist for port in device.ports]

    @property
    def udev_ports(self):
        """udev port objects of all registered devices, flattened."""
        return [port for device in self._devlist for port in device.udev_ports]

    def rescan(self):
        """Replace the registry contents with the result of a full scan."""
        logger.info("Full Rescan")
        self._devlist = get_qdevices()
class QDeviceInfo(object):
    """
    Description of one connected device: its serial number, USB ids and the
    serial ports found for its data streams.

    ``PDS``, ``CDS`` and ``BCI`` hold a SerialPortID once the matching port
    has been found by :meth:`find_ports`, and None otherwise. ``RTCS`` is
    initialised to None and is not assigned anywhere in this class.
    """

    # Maps the name found in the second field of a comports() entry to the
    # attribute that stores the matching port.
    _STREAM_ATTRIBUTES = {
        'Payload Data Stream': 'PDS',
        'Control Data Stream': 'CDS',
        'Backchannel Interface': 'BCI',
    }

    def __init__(self, serialno, vid, pid):
        self.serialno = serialno
        self.PDS = None
        self.CDS = None
        self.BCI = None
        self.RTCS = None
        self._VID = vid
        self._PID = pid
        self.find_ports()

    def find_ports(self):
        """
        Scan the system's serial ports and record those of this device.

        A port belongs to this device when the third field of its
        comports() entry equals :attr:`desc_string`; the second field then
        selects which stream attribute it is stored in.
        """
        description = self.desc_string
        for port in serial.tools.list_ports.comports():
            if port[2] != description:
                continue
            attribute = self._STREAM_ATTRIBUTES.get(port[1])
            if attribute is not None:
                setattr(self, attribute, SerialPortID(port[0]))

    @property
    def has_payload_data_stream(self):
        """True if a Payload Data Stream port has been found."""
        return self.PDS is not None

    @property
    def has_control_data_stream(self):
        """True if a Control Data Stream port has been found."""
        return self.CDS is not None

    @property
    def has_backchannel_interface(self):
        """True if a Backchannel Interface port has been found."""
        return self.BCI is not None

    @property
    def has_realtime_control_stream(self):
        """True if ``RTCS`` has been set."""
        return self.RTCS is not None

    @property
    def desc_string(self):
        """
        String expected in the third field of this device's comports()
        entries, or None when the VID/PID pair is not 0x2047:0x0992.
        """
        if (self._VID, self._PID) == (0x2047, 0x0992):
            return 'USB VID:PID=2047:0992 SNR=' + self.serialno
        return None

    def _found_streams(self):
        """Return the populated port objects in BCI, CDS, PDS order."""
        candidates = (self.BCI, self.CDS, self.PDS)
        return [stream for stream in candidates if stream is not None]

    @property
    def ports(self):
        """Port names of the found streams, in BCI, CDS, PDS order."""
        return [stream.port for stream in self._found_streams()]

    @property
    def udev_ports(self):
        """udev port objects of the found streams, in BCI, CDS, PDS order."""
        return [stream.udev_port for stream in self._found_streams()]

    @property
    def vid(self):
        return self._VID

    @property
    def pid(self):
        return self._PID

    def __repr__(self):
        lines = [
            "QDevice :: ",
            " Serial Number : " + self.serialno,
            " VID:PID : " + hex(self.vid) + ":" + hex(self.pid),
        ]
        if self.has_payload_data_stream:
            lines.append(" Payload Datastream : " + self.PDS.port)
        if self.has_control_data_stream:
            lines.append(" Control Datastream : " + self.CDS.port)
        if self.has_backchannel_interface:
            lines.append(" Backchannel Interface : " + self.BCI.port)
        # Every line, including the last one, is terminated with CRLF.
        return "".join(line + "\r\n" for line in lines)
class SerialPortID(object):
    """
    Pairs a serial port's device file with the udev device object that
    pyudev resolves for it.
    """

    def __init__(self, porthandle):
        # ``port`` keeps the device file as given; ``udev_port`` is the udev
        # device looked up from that same file.
        self.port = porthandle
        self.udev_port = pyudev.Device.from_device_file(pyudev.Context(), self.port)
def _log_usb_string(handle, index, label, emit):
    """
    Read one USB string descriptor, log it and return it.

    :param handle: device handle providing ``getString``
    :param index: string descriptor index; 0 is treated as "no string"
    :param label: name of the descriptor, used in the warning message
    :param emit: logger method used to report the value
    :return: the descriptor string, or None if absent or unreadable
    """
    if index == 0:
        emit('')
        return None
    try:
        value = handle.getString(index, 255)
    except usb.USBError:
        logger.warning("::Insufficient Permissions to read " + label)
        return None
    emit(" --> '{}'".format(value))
    return value


def get_qdevices():
    """
    This function uses pyusb to get all the supported devices presently
    connected. This function is probably not a good fit for continuous
    monitoring, but should be fine to find the devices at a specific
    instance in response to some form of user interaction, such as:
    - Program Init
    - Button Press on something like "Force Re-scan"
    Some other form of kernel level monitoring should be used for
    continuous monitoring.

    Supported devices whose serial number is absent or cannot be read are
    skipped with a warning, since the serial number is what identifies a
    device everywhere else in this module.

    :rtype : List of QDeviceInfo objects
    """
    qdevices = []
    for bus in usb.busses():
        for dev in bus.devices:
            if (dev.idVendor, dev.idProduct) not in supported_devices:
                continue
            # Only devices we care about are opened.
            handle = dev.open()
            logger.info("Found QDevice")
            logger.info("Address :: " + str(bus.dirname) + ":" + str(dev.filename))
            logger.info(" VID:PID :: 0x{:04x}:0x{:04x}".format(dev.idVendor, dev.idProduct))

            logger.debug(" Manufacturer: 0x{:x}".format(dev.iManufacturer))
            _log_usb_string(handle, dev.iManufacturer, "Manufacturer String", logger.debug)

            logger.debug(" Product: 0x{:x}".format(dev.iProduct))
            _log_usb_string(handle, dev.iProduct, "Product String", logger.debug)

            logger.debug(" Serial Number: 0x{:x}".format(dev.iSerialNumber))
            # The serial number is read exactly once, inside the guarded
            # helper, so a read failure cannot abort the whole scan.
            sno = _log_usb_string(handle, dev.iSerialNumber, "Serial Number", logger.info)
            if sno is None:
                logger.warning("Skipping device without a readable Serial Number")
                continue
            qdevices.append(QDeviceInfo(sno, dev.idVendor, dev.idProduct))
    return qdevices
# Module-wide registry of connected devices. Creating it runs a full scan
# (ConnectedDevices.__init__ calls rescan()), so this happens at import time.
connected_devices = ConnectedDevices()
class UdevMonitor(FileDescriptor):
    """
    Twisted file descriptor wrapped around a pyudev.Monitor.

    Events read from the monitor are handed to the attached
    UdevMonitorListener through its ``eventReceived`` method.

    @see: U{http://packages.python.org/pyudev/api/monitor.html}.
    """

    def __init__(self, _reactor, protocol, subsystem=None):
        FileDescriptor.__init__(self, _reactor)

        # Build the netlink monitor, optionally restricted to one subsystem.
        self.monitor = pyudev.Monitor.from_netlink(pyudev.Context())
        if subsystem:
            self.monitor.filter_by(subsystem=subsystem)

        # Attach the protocol that receives the decoded events.
        assert isinstance(protocol, UdevMonitorListener)
        self.protocol = protocol
        protocol.makeConnection(self)

    def fileno(self):
        """
        Expose the underlying monitor's file descriptor.
        """
        return self.monitor.fileno()

    def startReading(self):
        """
        Start the monitor, then register for read availability.
        """
        logger.debug("starting udev monitor fd")
        self.monitor.start()
        FileDescriptor.startReading(self)

    def doRead(self):
        """
        Fetch the pending event from the monitor and pass it to the protocol.
        """
        logger.debug("udev reports event available")
        event = self.monitor.receive_device()
        if not event:
            return
        action, device = event
        self.protocol.eventReceived(action, device)

    def writeSomeData(self, data):
        # The monitor is read-only.
        raise IOError("You can't write to a udev Monitor")
class UdevMonitorListener(Protocol):
    """
    Protocol that keeps ``connected_devices`` in step with udev events of
    the ``tty`` subsystem.
    """

    def __init__(self, _reactor=None):
        """
        :param _reactor: reactor to run the monitor on; the global Twisted
                         reactor is used when None
        """
        self._reactor = reactor if _reactor is None else _reactor
        self.subsystem = 'tty'
        self.monitor = None

    def startListening(self):
        """Create the UdevMonitor for our subsystem and start reading."""
        logger.info("Starting UdevMonitorListener")
        self.monitor = UdevMonitor(self._reactor, self, self.subsystem)
        self.monitor.startReading()

    def eventReceived(self, action, device):
        """
        Handle one udev event.

        :param action: udev action; only ``remove`` and ``add`` are handled
        :param device: udev port object the event refers to
        """
        if action == u'remove':
            if device in connected_devices.udev_ports:
                connected_devices.remove_by_udev_port(device)
            return
        if action != u'add':
            return
        if not is_device_supported_from_udev_port(device):
            return
        if device in connected_devices.udev_ports:
            return

        # One device can expose several ttys, each announced by its own
        # "add" event. The first event registers the device; for the later
        # ones the device is already known by serial number, so re-scan its
        # ports instead of appending again (append() would raise ValueError).
        serialno = get_serialno_from_udev_port(device)
        try:
            known_device = connected_devices.get_device_by_serialno(serialno)
        except LookupError:
            connected_devices.append_by_udev_port(device)
        else:
            known_device.find_ports()
def init(_reactor=None):
    """
    Create a UdevMonitorListener and start it listening.

    :param _reactor: passed through to UdevMonitorListener unchanged
    """
    UdevMonitorListener(_reactor).startListening()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment