Skip to content

Instantly share code, notes, and snippets.

@chintal
Last active August 29, 2015 14:17
Show Gist options
  • Save chintal/2511459c02a9767deb5d to your computer and use it in GitHub Desktop.
Save chintal/2511459c02a9767deb5d to your computer and use it in GitHub Desktop.
"""
This file is part of qicada
See the COPYING, README, and INSTALL files for more information
"""
from __future__ import print_function
import logging
import usb
import pyudev
import serial.tools.list_ports
from twisted.internet.abstract import FileDescriptor
from twisted.internet.protocol import Protocol
from twisted.internet import reactor
from subsystems import log
# Module-level logger, obtained from the project's `subsystems.log` helper and
# set to INFO for this module.
logger = log.get_logger(__name__)
logger.setLevel(logging.INFO)
# (idVendor, idProduct) pairs of the USB devices this module recognises; both
# the udev-based and the pyusb-based lookups below test membership in it.
supported_devices = [(0x2047, 0x0992)]
def get_vid_pid_from_udev_port(udev_port):
    """
    Read the USB vendor and product ids belonging to a udev port.

    The ``idVendor`` and ``idProduct`` attributes are taken from the
    grandparent of ``udev_port`` and parsed as hexadecimal numbers.

    :param udev_port: object exposing ``parent.parent.attributes``
    :return: ``(vid, pid)`` tuple of integers
    """
    usb_attrs = udev_port.parent.parent.attributes
    vid, pid = [int('0x' + usb_attrs[key].strip(), 16)
                for key in ('idVendor', 'idProduct')]
    return vid, pid
def is_device_supported_from_udev_port(udev_port):
    """
    Tell whether the device behind a udev port is one this module supports.

    :param udev_port: object accepted by ``get_vid_pid_from_udev_port``
    :return: True when its (vid, pid) pair is listed in
             ``supported_devices``, False otherwise
    """
    return get_vid_pid_from_udev_port(udev_port) in supported_devices
def get_serialno_from_udev_port(udev_port):
    """
    Return the ``serial`` attribute found on the grandparent of a udev port.

    :param udev_port: object exposing ``parent.parent.attributes``
    :return: the value stored under the ``'serial'`` key
    """
    usb_device = udev_port.parent.parent
    return usb_device.attributes['serial']
class ConnectedDevices(object):
    """
    List-like registry of the QDeviceInfo objects currently known.

    The registry is filled by a full rescan when it is created and can then
    be updated one device at a time through ``append`` / ``remove`` and their
    ``*_by_udev_port`` variants. Serial numbers are kept unique.
    """

    def __init__(self):
        self._devlist = []
        self.rescan()

    # -- container protocol ------------------------------------------------

    def __iter__(self):
        return iter(self._devlist)

    def __len__(self):
        return len(self._devlist)

    def __getitem__(self, y):
        return self._devlist[y]

    def __delitem__(self, i):
        del self._devlist[i]

    def __contains__(self, y):
        """
        Return True only when ``y`` is a QDeviceInfo held by the registry.

        Always returns a real bool; objects of any other type are reported
        as not contained.
        """
        return isinstance(y, QDeviceInfo) and y in self._devlist

    # -- lookups -----------------------------------------------------------

    def get_device_by_udev_port(self, udev_port):
        """
        Return the device one of whose udev ports equals ``udev_port``.

        :raises LookupError: if no connected device owns the port
        """
        for device in self._devlist:
            if udev_port in device.udev_ports:
                return device
        raise LookupError("udev_port not in connected devices")

    def get_device_by_serialno(self, serialno):
        """
        Return the device whose serial number equals ``serialno``.

        :raises LookupError: if no connected device has that serial number
        """
        for device in self._devlist:
            if device.serialno == serialno:
                return device
        raise LookupError("Serial Number not in connected devices")

    @property
    def serialnos(self):
        """Serial numbers of all registered devices, in registry order."""
        return [device.serialno for device in self._devlist]

    @property
    def ports(self):
        """Flat list of the ``ports`` of every registered device."""
        return [port for device in self._devlist for port in device.ports]

    @property
    def udev_ports(self):
        """Flat list of the ``udev_ports`` of every registered device."""
        return [port
                for device in self._devlist
                for port in device.udev_ports]

    # -- mutation ----------------------------------------------------------

    def append(self, qdevice):
        """
        Register a new device.

        :raises TypeError: if ``qdevice`` is not a QDeviceInfo
        :raises ValueError: if its serial number is already registered
        """
        if not isinstance(qdevice, QDeviceInfo):
            logger.warning("Attempted to add device which isn't a qdevice")
            raise TypeError("Device is not a QDevice object")
        if qdevice.serialno in self.serialnos:
            logger.warning("Attempted to add device already connected : " + qdevice.serialno)
            raise ValueError("Serial Number already connected")
        logger.info("Added Device : " + qdevice.serialno)
        self._devlist.append(qdevice)

    def append_by_udev_port(self, udev_port):
        """
        Build a QDeviceInfo from a udev port and register it.

        Ports belonging to unsupported devices are silently ignored.
        """
        if not is_device_supported_from_udev_port(udev_port):
            return
        vid, pid = get_vid_pid_from_udev_port(udev_port)
        serialno = get_serialno_from_udev_port(udev_port)
        self.append(QDeviceInfo(serialno, vid, pid))

    def remove(self, x):
        """
        Remove device ``x`` from the registry.

        :raises ValueError: if ``x`` is not registered
        """
        serialno = x.serialno
        self._devlist.remove(x)
        # Log only once the removal has actually happened, so a failed
        # removal does not leave a misleading "Removed Device" entry.
        logger.info("Removed Device : " + serialno)

    def remove_by_udev_port(self, udev_port):
        """
        Remove the device that owns ``udev_port``.

        :raises LookupError: if no connected device owns the port
        """
        self.remove(self.get_device_by_udev_port(udev_port))

    def rescan(self):
        """Replace the registry contents with the result of get_qdevices()."""
        logger.info("Full Rescan")
        self._devlist = get_qdevices()
class QDeviceInfo(object):
    """
    Description of one connected device and the serial ports it exposes.

    ``PDS``, ``CDS`` and ``BCI`` hold a SerialPortID (or None) for the
    payload data stream, control data stream and backchannel interface.
    ``RTCS`` is initialised to None and is not assigned anywhere in this
    class.
    """

    # Maps the second field of a pyserial port entry to the attribute that
    # stores the matching SerialPortID.
    _PORT_ATTRIBUTES = {
        'Payload Data Stream': 'PDS',
        'Control Data Stream': 'CDS',
        'Backchannel Interface': 'BCI',
    }

    def __init__(self, serialno, vid, pid):
        self.serialno = serialno
        self.PDS = None
        self.CDS = None
        self.BCI = None
        self.RTCS = None
        self._VID = vid
        self._PID = pid
        self.find_ports()

    def find_ports(self):
        """
        Scan the serial ports reported by pyserial and record ours.

        A port belongs to this device when the third field of its entry
        equals ``desc_string``; the second field selects which stream
        attribute receives the SerialPortID.
        """
        description = self.desc_string
        for port in serial.tools.list_ports.comports():
            if port[2] != description:
                continue
            attribute = self._PORT_ATTRIBUTES.get(port[1])
            if attribute is not None:
                setattr(self, attribute, SerialPortID(port[0]))

    # The has_* properties always return a real bool (never None).

    @property
    def has_payload_data_stream(self):
        return self.PDS is not None

    @property
    def has_control_data_stream(self):
        return self.CDS is not None

    @property
    def has_backchannel_interface(self):
        return self.BCI is not None

    @property
    def has_realtime_control_stream(self):
        return self.RTCS is not None

    @property
    def desc_string(self):
        """
        Hardware description string used to recognise this device's ports.

        :return: the string for VID:PID 2047:0992, or None for any other
                 VID/PID combination
        """
        if self._VID == 0x2047 and self._PID == 0x0992:
            return 'USB VID:PID=2047:0992 SNR=' + self.serialno
        return None

    def _present_streams(self):
        """Return the available SerialPortIDs in BCI, CDS, PDS order."""
        return [stream
                for stream in (self.BCI, self.CDS, self.PDS)
                if stream is not None]

    @property
    def ports(self):
        """Port handles of the available streams, in BCI, CDS, PDS order."""
        return [stream.port for stream in self._present_streams()]

    @property
    def udev_ports(self):
        """udev devices of the available streams, in BCI, CDS, PDS order."""
        return [stream.udev_port for stream in self._present_streams()]

    @property
    def vid(self):
        return self._VID

    @property
    def pid(self):
        return self._PID

    def __repr__(self):
        lines = [
            "QDevice :: ",
            " Serial Number : " + self.serialno,
            " VID:PID : " + hex(self.vid) + ":" + hex(self.pid),
        ]
        if self.has_payload_data_stream:
            lines.append(" Payload Datastream : " + self.PDS.port)
        if self.has_control_data_stream:
            lines.append(" Control Datastream : " + self.CDS.port)
        if self.has_backchannel_interface:
            lines.append(" Backchannel Interface : " + self.BCI.port)
        return "".join(line + "\r\n" for line in lines)
class SerialPortID(object):
    """
    Pairs a serial port device file with its pyudev device object.

    ``port`` keeps the handle that was passed in; ``udev_port`` is the
    pyudev device looked up from that device file.
    """

    def __init__(self, porthandle):
        self.port = porthandle
        udev_context = pyudev.Context()
        self.udev_port = pyudev.Device.from_device_file(udev_context,
                                                        porthandle)
def _log_usb_string(handle, index, label, what, emit):
    """
    Log one USB string descriptor of an opened device and return it.

    :param handle: opened device handle providing ``getString``
    :param index: string descriptor index; 0 is treated as "no string"
    :param label: name used in the debug header line
    :param what: name used in the permission warning
    :param emit: logger method used for the value line
    :return: the descriptor string, or None when the index is 0 or the
             string could not be read
    """
    logger.debug(" {}: 0x{:x}".format(label, index))
    if index == 0:
        emit('')
        return None
    try:
        value = handle.getString(index, 255)
    except usb.USBError:
        logger.warning("::Insufficient Permissions to read " + what)
        return None
    emit(" --> '{}'".format(value))
    return value


def get_qdevices():
    """
    This function uses pyusb to get all the supported devices presently
    connected. This function is probably not a good fit for continuous
    monitoring, but should be fine to find the devices at a specific
    instance in response to some form of user interaction, such as:
    - Program Init
    - Button Press on something like "Force Re-scan"
    Some other form of kernel level monitoring should be used for
    continuous monitoring.

    A supported device whose serial number cannot be read is skipped with
    a warning, since a QDeviceInfo cannot be built without it.

    :rtype : List of QDeviceInfo objects
    """
    qdevices = []
    for bus in usb.busses():
        for dev in bus.devices:
            # Filter first so that unrelated USB devices are never opened.
            if (dev.idVendor, dev.idProduct) not in supported_devices:
                continue
            handle = dev.open()
            logger.info("Found QDevice")
            logger.info("Address :: " + str(bus.dirname) + ":" + str(dev.filename))
            logger.info(" VID:PID :: 0x{:04x}:0x{:04x}".format(dev.idVendor, dev.idProduct))
            _log_usb_string(handle, dev.iManufacturer, "Manufacturer",
                            "Manufacturer String", logger.debug)
            _log_usb_string(handle, dev.iProduct, "Product",
                            "Product String", logger.debug)
            # The serial number is read exactly once, inside the helper's
            # error handling, and reused to build the device entry.
            sno = _log_usb_string(handle, dev.iSerialNumber, "Serial Number",
                                  "Serial Number", logger.info)
            if sno is None:
                logger.warning("Skipping QDevice without a readable Serial Number")
                continue
            qdevices.append(QDeviceInfo(sno, dev.idVendor, dev.idProduct))
    return qdevices
# Module-level registry of connected devices. Creating it here runs a full
# rescan at import time, because ConnectedDevices.__init__ calls rescan().
# UdevMonitorListener.eventReceived below updates this same instance.
connected_devices = ConnectedDevices()
class UdevMonitor(FileDescriptor):
    """
    Twisted FileDescriptor wrapped around a pyudev.Monitor.
    @see: U{http://packages.python.org/pyudev/api/monitor.html}.
    """

    def __init__(self, _reactor, protocol, subsystem=None):
        """
        Create the netlink monitor and attach ``protocol`` to it.

        :param _reactor: reactor handed to FileDescriptor
        :param protocol: UdevMonitorListener that receives the events
        :param subsystem: optional udev subsystem to filter events by
        """
        FileDescriptor.__init__(self, _reactor)

        # Monitor setup, optionally restricted to a single subsystem.
        self.monitor = udev_monitor = pyudev.Monitor.from_netlink(
            pyudev.Context())
        if subsystem:
            udev_monitor.filter_by(subsystem=subsystem)

        # Protocol hookup.
        assert isinstance(protocol, UdevMonitorListener)
        self.protocol = protocol
        protocol.makeConnection(self)

    def fileno(self):
        """
        Expose the file descriptor of the underlying monitor.
        """
        return self.monitor.fileno()

    def startReading(self):
        """
        Start the monitor, then begin waiting for read availability.
        """
        logger.debug("starting udev monitor fd")
        self.monitor.start()
        FileDescriptor.startReading(self)

    def doRead(self):
        """
        Fetch the pending event from the monitor and pass it to the protocol.
        """
        logger.debug("udev reports event available")
        event = self.monitor.receive_device()
        if not event:
            return
        action, device = event
        self.protocol.eventReceived(action, device)

    def writeSomeData(self, data):
        """
        Writing is not supported; always raises IOError.
        """
        raise IOError("You can't write to a udev Monitor")
class UdevMonitorListener(Protocol):
    """
    Protocol that keeps ``connected_devices`` in sync with udev events.

    It listens on the ``tty`` subsystem through a UdevMonitor and adds or
    removes devices as their serial ports appear and disappear.
    """

    def __init__(self, _reactor=None):
        """
        :param _reactor: reactor to use; the global Twisted reactor is used
                         when this is None
        """
        self._reactor = reactor if _reactor is None else _reactor
        self.subsystem = 'tty'
        self.monitor = None

    def startListening(self):
        """Create the UdevMonitor for this listener and start reading."""
        logger.info("Starting UdevMonitorListener")
        self.monitor = UdevMonitor(self._reactor, self, self.subsystem)
        self.monitor.startReading()

    def eventReceived(self, action, device):
        """
        Handle one udev event.

        :param action: udev action; only ``u'remove'`` and ``u'add'`` are
                       acted upon
        :param device: udev device the event refers to
        """
        if action == u'remove':
            if device in connected_devices.udev_ports:
                connected_devices.remove_by_udev_port(device)
            return
        if action != u'add':
            return
        if not is_device_supported_from_udev_port(device):
            return
        if device in connected_devices.udev_ports:
            return
        serialno = get_serialno_from_udev_port(device)
        if serialno in connected_devices.serialnos:
            # A further tty of a device that is already registered: appending
            # again would raise ValueError("Serial Number already connected")
            # inside the reactor's read callback, so refresh the existing
            # entry's ports instead.
            connected_devices.get_device_by_serialno(serialno).find_ports()
        else:
            connected_devices.append_by_udev_port(device)
def init(_reactor=None):
    """
    Create a UdevMonitorListener and start it listening.

    :param _reactor: reactor passed through to UdevMonitorListener
    """
    UdevMonitorListener(_reactor).startListening()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment