Skip to content

Instantly share code, notes, and snippets.

@CLTanuki
Created March 16, 2017 16:46
Show Gist options
  • Save CLTanuki/c970f84441b5efc5b6eef70da6025d13 to your computer and use it in GitHub Desktop.
Save CLTanuki/c970f84441b5efc5b6eef70da6025d13 to your computer and use it in GitHub Desktop.
import argparse
import logging
import struct
import crcmod.predefined
import ctypes
import atexit
from binascii import unhexlify, hexlify
__all__ = ['HIDException', 'DeviceInfo', 'Device', 'enumerate']

# Shared-library names tried in order; the first one that loads is used.
library_paths = (
    'libhidapi-libusb.so',
    'libhidapi-libusb.so.0',
)

# Handle to the loaded hidapi shared library (set by the loop below).
hidapi = None
for lib in library_paths:
    try:
        hidapi = ctypes.cdll.LoadLibrary(lib)
        break
    except OSError:
        # Not available under this name; fall through to the next candidate.
        continue

if hidapi is None:
    # Fail with a clear message instead of an AttributeError on None below.
    raise ImportError(
        'Unable to load the hidapi library; tried: {}'.format(
            ', '.join(library_paths)))

# Initialise the library now and make sure it is torn down at interpreter exit.
hidapi.hid_init()
atexit.register(hidapi.hid_exit)
class HIDException(Exception):
    """Error raised by this module when a HID operation cannot be carried out.

    ``Device`` raises it when a device cannot be opened, when a call is made
    on a closed device, and when a hidapi call returns -1.
    """
class DeviceInfo(ctypes.Structure):
    """ctypes structure for one entry of the linked list ``hid_enumerate`` returns."""

    def as_dict(self):
        """Return every field except the ``next`` link as a plain dict.

        Keys appear in the order the fields are declared in ``_fields_``.
        """
        return {
            field[0]: getattr(self, field[0], None)
            for field in self._fields_
            if field[0] != 'next'
        }


# The layout is assigned after the class statement because the ``next`` field
# is a pointer to DeviceInfo itself, which must already exist to be referenced.
DeviceInfo._fields_ = [
    ('path', ctypes.c_char_p),
    ('vendor_id', ctypes.c_ushort),
    ('product_id', ctypes.c_ushort),
    ('serial_number', ctypes.c_wchar_p),
    ('release_number', ctypes.c_ushort),
    ('manufacturer_string', ctypes.c_wchar_p),
    ('product_string', ctypes.c_wchar_p),
    ('usage_page', ctypes.c_ushort),
    ('usage', ctypes.c_ushort),
    ('interface_number', ctypes.c_int),
    ('next', ctypes.POINTER(DeviceInfo)),
]
# hid_enumerate returns the head of a linked list of DeviceInfo records.
hidapi.hid_enumerate.restype = ctypes.POINTER(DeviceInfo)


def enumerate(vid=0, pid=0):
    """Return a list of dicts, one per HID device reported by hidapi.

    Note that this module-level function intentionally keeps its public name
    even though it shadows the builtin ``enumerate`` inside this module.

    Args:
        vid: vendor id passed straight to ``hid_enumerate``.
        pid: product id passed straight to ``hid_enumerate``.
            NOTE(review): the 0 defaults presumably act as wildcards in
            hidapi -- confirm against the hidapi documentation.

    Returns:
        A list of ``DeviceInfo.as_dict()`` results, in list order.
    """
    head = hidapi.hid_enumerate(vid, pid)
    devices = []
    try:
        node = head
        # A NULL ctypes pointer is falsy, which terminates the walk.
        while node:
            devices.append(node.contents.as_dict())
            node = node.contents.next
    finally:
        # Always hand the native list back, even if converting a record fails.
        hidapi.hid_free_enumeration(head)
    return devices
class Device(object):
    """An opened HID device, wrapping the hidapi calls that operate on it.

    The native handle is kept in a private attribute as a ``ctypes.c_void_p``
    while the device is open and as ``None`` once it is closed.
    """

    def __init__(self, vid=None, pid=None, serial=None, path=None):
        """Open a device by path, by serial number, or by vid/pid.

        ``path`` takes precedence over ``serial``, which takes precedence
        over a plain ``vid``/``pid`` pair.

        Args:
            vid: vendor id (used with ``serial`` or ``pid``).
            pid: product id (used with ``serial`` or ``vid``).
            serial: serial number string of the device to open.
            path: device path handed to ``hid_open_path``
                (presumably bytes, like ``DeviceInfo.path`` -- confirm).

        Raises:
            ValueError: if none of path, serial or vid+pid was given.
            HIDException: if hidapi could not open the device.
        """
        self.__dev = None
        if not (path or serial or (vid and pid)):
            raise ValueError('Specify vid/pid or path.')

        # These functions return pointers. ctypes assumes a C int return type
        # by default, which would truncate the handle on 64-bit platforms.
        hidapi.hid_open.restype = ctypes.c_void_p
        hidapi.hid_open_path.restype = ctypes.c_void_p
        hidapi.hid_error.restype = ctypes.c_wchar_p

        if path:
            print('Opening with path {}.'.format(path))
            handle = hidapi.hid_open_path(path)
        elif serial:
            print('Opening with serial {}.'.format(serial))
            serial = ctypes.create_unicode_buffer(serial)
            handle = hidapi.hid_open(vid, pid, serial)
        else:
            print('Opening with vid {}/pid {}.'.format(vid, pid))
            handle = hidapi.hid_open(vid, pid, None)

        # Check the handle BEFORE using it; a c_void_p restype yields None
        # for a NULL pointer.
        if not handle:
            raise HIDException('Unable to open device.')
        # Wrap the address so it is passed back to C as a full-width pointer.
        self.__dev = ctypes.c_void_p(handle)
        print('{} by {}.'.format(self.product, self.manufacturer))

    def __hidcall(self, function, *args, **kwargs):
        """Call a hidapi function and turn a -1 result into HIDException.

        Raises:
            HIDException: if the device is closed or the call returns -1.
        """
        if not self.__dev:
            raise HIDException('Device closed.')
        ret = function(*args, **kwargs)
        if ret == -1:
            err = hidapi.hid_error(self.__dev)
            if not err:
                # hid_error gave no message; say at least which call failed.
                err = '{} failed.'.format(
                    getattr(function, '__name__', 'hidapi call'))
            raise HIDException(err)
        return ret

    def __readstring(self, function, max_length=255):
        """Run a hidapi string getter and return the resulting text."""
        buf = ctypes.create_unicode_buffer(max_length)
        self.__hidcall(function, self.__dev, buf, max_length)
        return buf.value

    def write(self, data):
        """Send ``data`` with ``hid_write`` and return the call's result."""
        return self.__hidcall(hidapi.hid_write, self.__dev, data, len(data))

    def read(self, size, timeout=None):
        """Read up to ``size`` bytes from the device.

        Args:
            size: capacity of the receive buffer.
            timeout: when not None, passed to ``hid_read_timeout``
                (presumably milliseconds -- confirm against hidapi);
                when None, plain ``hid_read`` is used.

        Returns:
            The received bytes, cut to the length hidapi reported.
        """
        data = ctypes.create_string_buffer(size)
        if timeout is None:
            size = self.__hidcall(hidapi.hid_read, self.__dev, data, size)
        else:
            size = self.__hidcall(
                hidapi.hid_read_timeout, self.__dev, data, size, timeout)
        return data.raw[:size]

    def send_feature_report(self, data):
        """Send ``data`` as a feature report and return the call's result."""
        return self.__hidcall(hidapi.hid_send_feature_report,
                              self.__dev, data, len(data))

    def get_feature_report(self, report_id, size):
        """Fetch a feature report into a ``size``-byte buffer.

        Args:
            report_id: id of the report to read (0..255).
            size: capacity of the receive buffer.

        Returns:
            The received bytes, cut to the length hidapi reported.
        """
        data = ctypes.create_string_buffer(size)
        # Pass the id of the report to be read. A char buffer element must be
        # assigned a one-byte bytes object on Python 3, not a str.
        data[0] = bytes((report_id,))
        size = self.__hidcall(
            hidapi.hid_get_feature_report, self.__dev, data, size)
        print(data.raw, len(data))
        return data.raw[:size]

    def close(self):
        """Close the device if it is open; safe to call more than once."""
        if self.__dev:
            hidapi.hid_close(self.__dev)
            self.__dev = None

    @property
    def nonblocking(self):
        """Last value set through this property, or 0 if never set."""
        return getattr(self, '_nonblocking', 0)

    @nonblocking.setter
    def nonblocking(self, value):
        self.__hidcall(hidapi.hid_set_nonblocking, self.__dev, value)
        setattr(self, '_nonblocking', value)

    @property
    def manufacturer(self):
        """Manufacturer string reported by the device."""
        return self.__readstring(hidapi.hid_get_manufacturer_string)

    @property
    def product(self):
        """Product string reported by the device."""
        return self.__readstring(hidapi.hid_get_product_string)

    @property
    def serial(self):
        """Serial number string reported by the device."""
        return self.__readstring(hidapi.hid_get_serial_number_string)

    def get_indexed_string(self, index, max_length=255):
        """Return the device string stored at ``index``."""
        buf = ctypes.create_unicode_buffer(max_length)
        self.__hidcall(hidapi.hid_get_indexed_string,
                       self.__dev, index, buf, max_length)
        return buf.value
class Command(object):
    """Base class for a command framed with the ``ViVOtech2`` header and sent over HID.

    Subclasses set ``code`` and ``sub_command`` (byte strings of hex digits)
    and may set ``value`` (also hex digits) as the command payload.
    """
    # Command settings
    version = b'ViVOtech2\0'
    vid = 0x1d5f
    pid = 0x0100
    code = None
    sub_command = None
    value = b''
    package = None
    processed = False
    # Template CRC object; never updated directly, only cloned via .new().
    crc = crcmod.Crc(poly=0x11021, initCrc=0xFFFF, rev=False, xorOut=0x0000)

    def __init__(self, path, verbose=False, log_name='__main__'):
        """Open the device at ``path`` and set up logging.

        Args:
            path: device path handed to ``Device``.
            verbose: when true, debug logging is enabled on the module logger
                (it is also enabled whenever the module runs as a script).
            log_name: accepted for compatibility; currently not used.
        """
        self.device = Device(path=path)
        self._logger = logging.getLogger(__name__)
        if verbose or __name__ == '__main__':
            # The logger is shared by every instance; attach a handler only
            # once so messages are not emitted once per created command.
            if not self._logger.handlers:
                self._logger.addHandler(logging.StreamHandler())
            self._logger.setLevel(logging.DEBUG)

    def prepare(self):
        """Build ``self.package``: the binary command body followed by its CRC."""
        value = unhexlify(str(self.data, 'utf-8'))
        self._logger.debug('PREPARING [raw_package]: {}'.format(value))
        # Start from a fresh CRC each time. Updating the shared class-level
        # object would carry state over from previously prepared commands.
        engine = self.crc.new()
        engine.update(self.version + value)
        crc = bytearray(engine.digest())
        # The digest bytes are appended in reversed order.
        crc.reverse()
        self.package = value + crc

    @property
    def data(self):
        """Hex-digit form of code, sub-command, 16-bit payload length and payload.

        Returns None once ``processed`` is true.
        """
        if self.processed:
            return None
        data_len = hexlify(struct.pack('>H', len(self.value)))
        data = b''.join([self.code, self.sub_command, data_len, self.value])
        self._logger.debug('PREPARING [data]: {}'.format(data))
        return data

    def process(self):
        """Prepare the package and send it.

        Returns None; the device's reply is only written to the debug log.
        """
        self.prepare()
        self._logger.debug('PROCESSING [package]: {}'.format(self.package))
        self.send()

    def send(self):
        """Pad and frame ``self.package``, write it, and log the reply."""
        self._logger.debug(self.package)
        # Zero-pad the body to 53 bytes; with the 11-byte header added below
        # (1 byte + the 10-byte version tag) the frame is 64 bytes long.
        self.package += b'\x00' * (53 - len(self.package))
        self.package = b'\x01' + self.version + self.package
        self.device.write(self.package)
        res = self.device.read(128, timeout=5000)
        self._logger.debug(res.decode('unicode_escape').encode('utf-8'))
        # Skip the 11 leading bytes (same length as the header sent above).
        self._logger.debug(hexlify(res[11:]))
class Ping(Command):
    """Command with code ``18`` and sub-command ``01``."""
    code = b'18'
    sub_command = b'01'

    def process(self):
        """Run the command and return a 2-tuple taken from the result.

        Returns ``(None, None)`` when the base ``process`` yields a falsy
        result; otherwise the result's entries under keys 1 and 2.
        """
        res = super().process()
        if not res:
            return None, None
        return res.get(1), res.get(2)
class SetConfigurationDefaults(Command):
    """Command with code ``04`` and sub-command ``09``; it carries no payload."""
    code = b'04'
    sub_command = b'09'
class SetPoll(Command):
    """Command with code ``01`` / sub-command ``01`` whose payload selects a poll mode."""
    code = b'01'
    sub_command = b'01'
    # Human-readable mode names mapped to the payload sent to the reader.
    shortcuts = {'auto': b'00',
                 'demand': b'01'}

    def __init__(self, mode='auto', path=None, verbose=False,
                 log_name='__main__'):
        """Select the poll mode and open the device.

        Args:
            mode: one of the keys of ``shortcuts``.
            path: device path, forwarded to ``Command.__init__``.
            verbose: forwarded to ``Command.__init__``.
            log_name: forwarded to ``Command.__init__``.

        Raises:
            ValueError: if ``mode`` is not a key of ``shortcuts``.
        """
        try:
            self.value = self.shortcuts[mode]
        except KeyError:
            # An unknown mode used to leave value as None and fail later.
            raise ValueError(
                'Unknown poll mode {!r}; expected one of {}.'.format(
                    mode, sorted(self.shortcuts))) from None
        # The base initialiser requires the device path; calling it without
        # arguments always raised TypeError.
        super().__init__(path, verbose=verbose, log_name=log_name)
class GetTransactionResult(Command):
    """Command with code ``03`` and sub-command ``00``."""
    code = b'03'
    sub_command = b'00'

    def process(self):
        """Run the command and return a pair extracted from its result.

        The pair is the result's entries under keys 1 and 2, or
        ``(None, None)`` when the base ``process`` yields a falsy result.
        """
        outcome = super().process()
        first, second = (outcome.get(1), outcome.get(2)) if outcome else (None, None)
        return first, second
def main():
    """Ping every attached HID device whose product string is 'ViVOpay Kiosk II'."""
    parser = argparse.ArgumentParser(description='VivoPay Kiosk II communication.')
    parser.add_argument('--path', metavar='-P', type=str, default='/dev/ACM0', help='port name')
    # The option is still parsed so --help and argument validation work, but
    # devices are discovered through enumeration, not through this value.
    parser.parse_args()

    for dev in enumerate():
        if dev['product_string'] != 'ViVOpay Kiosk II':
            continue
        cmd = None
        try:
            cmd = Ping(dev['path'])
            cmd.process()
            print(dev)
        finally:
            # cmd stays None when opening the device failed.
            if cmd is not None:
                cmd.device.close()


# A second entry-point guard used to follow here and call Ping() without the
# required device path, which always raised TypeError; the loop in main()
# already pings every matching device.
if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment