Created
December 19, 2012 13:27
-
-
Save eblot/4336647 to your computer and use it in GitHub Desktop.
Simple driver for printer device based on pyusb
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# MIT license
# Emmanuel Blot <emmanuel.blot@free.fr>
from base64 import b64decode | |
from util import BitField | |
import errno | |
import os | |
import usb.core | |
import usb.util | |
class PrinterError(Exception):
    """Common base class for the printer-specific errors."""
class PrinterIOError(IOError):
    """I/O failure reported while communicating with the printer.

    Derives from IOError, so instances carry the usual ``errno`` and
    ``strerror`` attributes.
    """

    def __init__(self, errno, strerror):
        """Build the error from an errno code and its description."""
        super(PrinterIOError, self).__init__(errno, strerror)
class PrinterConfigError(PrinterError):
    """Raised when the printer configuration is invalid or unexpected."""
class PrinterOnError(PrinterError):
    """Raised when the printer reports an error condition."""
class PrinterNotReadyError(PrinterError):
    """Raised when the printer is not ready to accept data."""
class PrinterOutOfPaperError(PrinterError):
    """Raised when the printer has run out of paper."""
class Printer(object):
    """USB printer driver built on top of pyusb.

    The USB device is located lazily: every public method runs the
    detection step on first use, so creating an instance performs no I/O.
    """

    # Definitions for a USB Printer
    PRINTER_INTERFACE_CLASS = 0x07
    PRINTER_INTERFACE_SUBCLASS = 0x01
    PRINTER_INTERFACE_PROTOCOL = (
        'reserved', 'unidir', 'bidir', '1284.4'
    )

    # Requests supported with a USB Printer
    GET_DEVICE_ID = 0
    GET_PORT_STATUS = 1
    SOFT_RESET = 2

    def __init__(self):
        """Create a driver with default timeouts and no device attached."""
        self._cleanup()

    # Public API (RPC)

    def get_info(self):
        """Provide the printer characteristics as a dictionary.

        The dictionary holds the lower-cased keys parsed from the device
        ID string plus 'vid', 'pid', 'product' and 'serial'. The result is
        cached after the first successful call.

        Raises PrinterError if no printer is found and PrinterIOError on
        USB failure.
        """
        if not self._device:
            self._detect()
        if not self._info:
            self._info = self._get_device_id()
        return self._info

    def get_status(self):
        """Provide the printer status as a dictionary.

        The keys are 'ready', 'paper_empty' and 'on_error', each mapped to
        a boolean decoded from the port status bits.

        Raises PrinterError if no printer is found and PrinterIOError on
        USB failure.
        """
        if not self._device:
            self._detect()
        bf = self._get_status()
        return {
            'ready': Printer.selected(bf),
            'paper_empty': Printer.paper_empty(bf),
            'on_error': Printer.on_error(bf),
        }

    def soft_reset(self):
        """Request the printer to perform a software reset.

        Raises PrinterError if no printer is found and PrinterIOError on
        USB failure.
        """
        if not self._device:
            self._detect()
        request = usb.util.build_request_type(
            usb.util.CTRL_OUT,
            usb.util.CTRL_TYPE_CLASS,
            usb.util.CTRL_RECIPIENT_OTHER)
        # The original author noted that this request does not seem to be
        # supported by the device it was tried on.
        # NOTE(review): the request targets the OTHER recipient; some
        # devices may expect the INTERFACE recipient instead -- confirm
        # against the printer class specification before changing.
        self._ctrl_transfer_out(request, Printer.SOFT_RESET, 0)

    def write(self, data):
        """Print data.

        The payload is sent to the OUT endpoint in chunks of at most one
        maximum packet size.

        Returns the count of printed bytes.

        Raises PrinterError if no printer is found, PrinterConfigError if
        the printer exposes no OUT endpoint, one of the status errors
        (PrinterOutOfPaperError, PrinterNotReadyError, PrinterOnError) if
        the printer cannot accept data, and PrinterIOError on USB failure.
        """
        # Detect lazily, as the other public methods do; without this a
        # fresh instance would fail on the unset device.
        if not self._device:
            self._detect()
        self._check_status()
        if self._out_ep is None:
            raise PrinterConfigError('Printer has no OUT endpoint')
        offset = 0
        size = len(data)
        try:
            while offset < size:
                write_size = min(self._out_ep.wMaxPacketSize, size - offset)
                # NOTE(review): passing the interface as third argument
                # matches the pyusb API this file was written against;
                # verify against the installed pyusb version.
                length = self._device.write(self._out_ep.bEndpointAddress,
                                            data[offset:offset+write_size],
                                            self._interface,
                                            self.usb_write_timeout)
                if length <= 0:
                    raise PrinterIOError(errno.ENODEV,
                                         "USB bulk write error")
                offset += length
            return offset
        except usb.core.USBError as e:
            # Prefer a specific status error (out of paper, not ready, ...)
            # over the generic USB failure when the printer reports one.
            self._check_status()
            raise PrinterIOError(e.errno, 'UsbError: %s' % str(e))

    # Private implementation

    @staticmethod
    def on_error(status):
        """Tell whether the status reports an error (bit 3 is cleared)."""
        return not bool(status[3])

    @staticmethod
    def selected(status):
        """Tell whether the printer is selected (bit 4 is set)."""
        return bool(status[4])

    @staticmethod
    def paper_empty(status):
        """Tell whether the printer is out of paper (bit 5 is set)."""
        return bool(status[5])

    def _cleanup(self):
        """Reset the timeouts to their defaults and forget the device."""
        self.usb_write_timeout = 1000
        self.usb_read_timeout = 2000
        self._interface = None
        self._config = None
        self._device = None
        self._info = None
        self._in_ep = None
        self._out_ep = None

    def _detect(self):
        """Locate the first connected USB printer and record its endpoints.

        Raises PrinterConfigError if a device has already been detected or
        if the interface exposes more than one endpoint per direction, and
        PrinterError if no printer is connected.
        """
        if self._device is not None:
            raise PrinterConfigError('Already detected')

        def is_printer(device):
            """Match devices whose last interface looks like a printer."""
            for config in device:
                intf = config[(config.bNumInterfaces-1, 0)]
                if Printer.PRINTER_INTERFACE_CLASS != intf.bInterfaceClass:
                    continue
                if (Printer.PRINTER_INTERFACE_SUBCLASS !=
                        intf.bInterfaceSubClass):
                    continue
                # The slice drops the first and last protocol names, so
                # any protocol number above 2 ('bidir') is rejected.
                if intf.bInterfaceProtocol > \
                        len(Printer.PRINTER_INTERFACE_PROTOCOL[1:-1]):
                    continue
                # unidir (1) uses one endpoint, bidir (2) uses two, so the
                # endpoint count must equal the protocol number.
                if intf.bNumEndpoints != intf.bInterfaceProtocol:
                    continue
                return True
            return False

        found = usb.core.find(find_all=True, custom_match=is_printer)
        # The result may be a list or a generator depending on the pyusb
        # version; materialise it so emptiness test and indexing both work.
        printers = list(found or ())
        if not printers:
            raise PrinterError('No printer connected')
        # Only consider the first printer
        printer = printers[0]
        config = printer.get_active_configuration()
        intf = config[(config.bNumInterfaces-1, 0)]
        # unidir uses one endpoint, bidir uses two endpoints.
        # Collect into locals first so that a failure leaves no partial
        # state behind on the instance.
        in_ep = None
        out_ep = None
        for epix in range(intf.bNumEndpoints):
            ep = intf[epix]
            if ep.bEndpointAddress & 0x80:
                if in_ep is not None:
                    raise PrinterConfigError('Multiple IN endpoints')
                in_ep = ep
            else:
                if out_ep is not None:
                    raise PrinterConfigError('Multiple OUT endpoints')
                out_ep = ep
        self._in_ep = in_ep
        self._out_ep = out_ep
        self._interface = intf
        self._config = config
        self._device = printer

    def _get_device_id(self):
        """Query the device ID string and return it as a dictionary.

        The reply is split on ';' into 'key:value' items; keys are
        lower-cased and items without a ':' are ignored. The USB
        descriptor values 'vid', 'pid', 'product' and 'serial' are added
        last and override identically named keys.
        """
        request = usb.util.build_request_type(
            usb.util.CTRL_IN,
            usb.util.CTRL_TYPE_CLASS,
            usb.util.CTRL_RECIPIENT_INTERFACE)
        data = self._ctrl_transfer_in(request, Printer.GET_DEVICE_ID, 256)
        # array.tostring() is missing from recent Python 3 releases, while
        # array.tobytes() is missing from Python 2: use whichever exists.
        if hasattr(data, 'tobytes'):
            raw = data.tobytes()
        else:
            raw = data.tostring()
        # Skip the first two bytes (presumably the length prefix of the
        # device ID string -- TODO confirm against the class specification).
        info = raw[2:]
        if not isinstance(info, str):
            # Python 3 yields bytes; the parsing below works on text.
            info = info.decode('ascii', 'replace')
        values = dict()
        for item in info.split(';'):
            try:
                k, v = item.split(':', 1)
            except ValueError:
                # Not a 'key:value' item (e.g. trailing empty chunk)
                continue
            values[k.lower()] = v
        # update with USB data, so that USB takes over same keys, if any
        # NOTE(review): the (device, length, index) argument order of
        # get_string matches the pyusb API this file was written against;
        # verify against the installed pyusb version.
        values.update({
            'vid': self._device.idVendor,
            'pid': self._device.idProduct,
            'product': usb.util.get_string(
                self._device, 64, self._device.iProduct).encode('utf-8'),
            'serial': usb.util.get_string(
                self._device, 64,
                self._device.iSerialNumber).encode('utf-8'),
        })
        return values

    def _get_status(self):
        """Read the one-byte port status and wrap it in a BitField."""
        request = usb.util.build_request_type(
            usb.util.CTRL_IN,
            usb.util.CTRL_TYPE_CLASS,
            usb.util.CTRL_RECIPIENT_INTERFACE)
        data = self._ctrl_transfer_in(request, Printer.GET_PORT_STATUS, 1)
        return BitField(data[0])

    def _check_status(self):
        """Raise the matching error if the printer cannot accept data.

        Raises PrinterOutOfPaperError, PrinterNotReadyError or
        PrinterOnError, checked in that order.
        """
        status = self._get_status()
        if Printer.paper_empty(status):
            raise PrinterOutOfPaperError("out of paper")
        if not Printer.selected(status):
            raise PrinterNotReadyError("not ready")
        if Printer.on_error(status):
            raise PrinterOnError("general error")

    def _ctrl_transfer_out(self, request, reqcode, value, data=''):
        """Send a control message to the device.

        Raises PrinterIOError on USB failure; the cached device state is
        dropped first when the device has disappeared (ENODEV).
        """
        try:
            return self._device.ctrl_transfer(
                request, reqcode, value,
                self._interface.bInterfaceNumber,
                data, self.usb_write_timeout)
        except usb.core.USBError as e:
            if errno.ENODEV == e.errno:
                self._cleanup()
            raise PrinterIOError(e.errno, 'UsbError: %s' % str(e))

    def _ctrl_transfer_in(self, request, reqcode, length):
        """Request a control message of up to `length` bytes from the device.

        Raises PrinterIOError on USB failure; the cached device state is
        dropped first when the device has disappeared (ENODEV).
        """
        try:
            return self._device.ctrl_transfer(
                request, reqcode, 0,
                self._interface.bInterfaceNumber,
                length, self.usb_read_timeout)
        except usb.core.USBError as e:
            if errno.ENODEV == e.errno:
                self._cleanup()
            raise PrinterIOError(e.errno, 'UsbError: %s' % str(e))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Salut,
Always the same message: could not claim interface 0: Device or resource busy ...
How can I find the GET_DEVICE_ID, GET_PORT_STATUS and SOFT_RESET values for my printer model?