Created
March 16, 2017 16:46
-
-
Save CLTanuki/c970f84441b5efc5b6eef70da6025d13 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import argparse | |
import logging | |
import struct | |
import crcmod.predefined | |
import ctypes | |
import atexit | |
from binascii import unhexlify, hexlify | |
__all__ = ['HIDException', 'DeviceInfo', 'Device', 'enumerate']

# Handle to the hidapi shared library; set by the loader loop below.
hidapi = None

# Candidate shared-object names, tried in order.
library_paths = (
    'libhidapi-libusb.so',
    'libhidapi-libusb.so.0',
)

for lib in library_paths:
    try:
        hidapi = ctypes.cdll.LoadLibrary(lib)
        break
    except OSError:
        # This candidate is not available; try the next one.
        pass
else:
    # No candidate could be loaded. Fail here with an explicit message
    # instead of an AttributeError on ``None`` a few lines further down.
    raise ImportError(
        'Unable to load any of the following libraries: {}'.format(
            ' '.join(library_paths)))

# Initialise the library once at import time and release it at interpreter exit.
hidapi.hid_init()
atexit.register(hidapi.hid_exit)
class HIDException(Exception):
    """Error raised by this module when a HID operation cannot be performed."""
class DeviceInfo(ctypes.Structure):
    """ctypes structure for one node of the list returned by ``hid_enumerate``."""

    def as_dict(self):
        """Return the structure's fields as a plain dict, omitting ``next``.

        Keys follow the order of ``_fields_``; the ``next`` link pointer is
        left out so the result contains only per-device data.
        """
        return {
            field: getattr(self, field, None)
            for field, _ctype in self._fields_
            if field != 'next'
        }


# The field list is assigned after the class statement because the ``next``
# member is a pointer to DeviceInfo itself, which must already exist.
DeviceInfo._fields_ = [
    ('path', ctypes.c_char_p),
    ('vendor_id', ctypes.c_ushort),
    ('product_id', ctypes.c_ushort),
    ('serial_number', ctypes.c_wchar_p),
    ('release_number', ctypes.c_ushort),
    ('manufacturer_string', ctypes.c_wchar_p),
    ('product_string', ctypes.c_wchar_p),
    ('usage_page', ctypes.c_ushort),
    ('usage', ctypes.c_ushort),
    ('interface_number', ctypes.c_int),
    ('next', ctypes.POINTER(DeviceInfo)),
]
# Declare the prototypes so ctypes marshals the list head as a real pointer
# in both directions.
hidapi.hid_enumerate.argtypes = [ctypes.c_ushort, ctypes.c_ushort]
hidapi.hid_enumerate.restype = ctypes.POINTER(DeviceInfo)
hidapi.hid_free_enumeration.argtypes = [ctypes.POINTER(DeviceInfo)]
hidapi.hid_free_enumeration.restype = None


def enumerate(vid=0, pid=0):
    """Return a list of dicts describing the HID devices found by hidapi.

    Args:
        vid: Vendor id passed to ``hid_enumerate`` (default 0).
        pid: Product id passed to ``hid_enumerate`` (default 0).

    Returns:
        One ``DeviceInfo.as_dict()`` result per node of the native list,
        in list order. Empty when ``hid_enumerate`` returns a null pointer.
    """
    ret = []
    info = hidapi.hid_enumerate(vid, pid)
    try:
        node = info
        while node:
            ret.append(node.contents.as_dict())
            node = node.contents.next
    finally:
        # Release the native list even if converting a node raised.
        hidapi.hid_free_enumeration(info)
    return ret
class Device(object):
    """An open HID device accessed through the hidapi shared library.

    The native handle is kept as a ``ctypes.c_void_p`` so that it is passed
    to every hidapi call as a full-width pointer.
    """

    # Set once the pointer-returning hidapi functions have their restype.
    _prototypes_ready = False

    @classmethod
    def _configure_prototypes(cls):
        """Declare return types of hidapi functions that return pointers.

        Without this ctypes assumes a C ``int`` return value, which cannot
        hold a pointer on platforms where pointers are wider than ``int``.
        """
        if cls._prototypes_ready:
            return
        hidapi.hid_open.restype = ctypes.c_void_p
        hidapi.hid_open_path.restype = ctypes.c_void_p
        hidapi.hid_error.restype = ctypes.c_wchar_p
        cls._prototypes_ready = True

    def __init__(self, vid=None, pid=None, serial=None, path=None):
        """Open a device by path, by serial number, or by vid/pid.

        Args:
            vid: Vendor id, used together with ``pid`` (and ``serial``).
            pid: Product id.
            serial: Serial number string; takes precedence over plain vid/pid.
            path: Device path (bytes, or str which is UTF-8 encoded);
                takes precedence over everything else.

        Raises:
            ValueError: If neither a path, a serial, nor vid and pid are given.
            HIDException: If hidapi could not open the device.
        """
        self._configure_prototypes()
        if path:
            print('Opening with path {}.'.format(path))
            if isinstance(path, str):
                # hid_open_path takes a narrow ``char *``; a str would be
                # marshalled by ctypes as a wide string.
                path = path.encode('utf-8')
            handle = hidapi.hid_open_path(path)
        elif serial:
            print('Opening with serial {}.'.format(serial))
            serial = ctypes.create_unicode_buffer(serial)
            handle = hidapi.hid_open(vid, pid, serial)
        elif vid and pid:
            print('Opening with vid {}/pid {}.'.format(vid, pid))
            handle = hidapi.hid_open(vid, pid, None)
        else:
            raise ValueError('Specify vid/pid or path.')
        self.__dev = ctypes.c_void_p(handle)
        # Check the handle before first use so a failed open is reported as
        # such rather than as a closed device.
        if not self.__dev:
            raise HIDException('Unable to open device.')
        print('{} by {}.'.format(self.product, self.manufacturer))

    def __enter__(self):
        """Return the device itself for use in a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the device when leaving a ``with`` block."""
        self.close()
        return False

    def __hidcall(self, function, *args, **kwargs):
        """Call a hidapi function and turn a -1 result into HIDException.

        Raises:
            HIDException: If the device is closed or the call returns -1.
        """
        if not self.__dev:
            raise HIDException('Device closed.')
        ret = function(*args, **kwargs)
        if ret == -1:
            err = hidapi.hid_error(self.__dev)
            # hid_error may yield a null pointer; keep the exception readable.
            raise HIDException(err or 'HID call failed.')
        return ret

    def __readstring(self, function, max_length=255):
        """Read a wide string through ``function(dev, buffer, max_length)``."""
        buf = ctypes.create_unicode_buffer(max_length)
        self.__hidcall(function, self.__dev, buf, max_length)
        return buf.value

    def write(self, data):
        """Send ``data`` with ``hid_write`` and return the call's result."""
        return self.__hidcall(hidapi.hid_write, self.__dev, data, len(data))

    def read(self, size, timeout=None):
        """Read up to ``size`` bytes from the device.

        Args:
            size: Size of the receive buffer in bytes.
            timeout: Value passed to ``hid_read_timeout``; when ``None``
                plain ``hid_read`` is used instead.

        Returns:
            The bytes received, truncated to the count hidapi reported.
        """
        data = ctypes.create_string_buffer(size)
        if timeout is None:
            count = self.__hidcall(hidapi.hid_read, self.__dev, data, size)
        else:
            count = self.__hidcall(
                hidapi.hid_read_timeout, self.__dev, data, size, timeout)
        return data.raw[:count]

    def send_feature_report(self, data):
        """Send ``data`` with ``hid_send_feature_report``; return its result."""
        return self.__hidcall(hidapi.hid_send_feature_report,
                              self.__dev, data, len(data))

    def get_feature_report(self, report_id, size):
        """Read a feature report of up to ``size`` bytes.

        Args:
            report_id: Id of the report, written into the first buffer byte.
            size: Size of the receive buffer in bytes.

        Returns:
            The bytes received, truncated to the count hidapi reported.
        """
        data = ctypes.create_string_buffer(size)
        # Pass the id of the report to be read. A c_char slot takes a
        # one-byte bytes object, not a str.
        data[0] = bytes((report_id,))
        count = self.__hidcall(
            hidapi.hid_get_feature_report, self.__dev, data, size)
        print(data.raw, len(data))
        return data.raw[:count]

    def close(self):
        """Close the device if it is open; safe to call more than once."""
        if self.__dev:
            hidapi.hid_close(self.__dev)
            self.__dev = ctypes.c_void_p(None)

    @property
    def nonblocking(self):
        """Last value assigned to ``nonblocking`` (0 if never assigned)."""
        return getattr(self, '_nonblocking', 0)

    @nonblocking.setter
    def nonblocking(self, value):
        self.__hidcall(hidapi.hid_set_nonblocking, self.__dev, value)
        setattr(self, '_nonblocking', value)

    @property
    def manufacturer(self):
        """Manufacturer string read from the device."""
        return self.__readstring(hidapi.hid_get_manufacturer_string)

    @property
    def product(self):
        """Product string read from the device."""
        return self.__readstring(hidapi.hid_get_product_string)

    @property
    def serial(self):
        """Serial number string read from the device."""
        return self.__readstring(hidapi.hid_get_serial_number_string)

    def get_indexed_string(self, index, max_length=255):
        """Return the device string at ``index`` via ``hid_get_indexed_string``."""
        buf = ctypes.create_unicode_buffer(max_length)
        self.__hidcall(hidapi.hid_get_indexed_string,
                       self.__dev, index, buf, max_length)
        return buf.value
class Command(object):
    """Base class for a command sent to the reader over HID.

    Subclasses set ``code`` and ``sub_command`` (hex digits as bytes) and may
    set ``value`` (hex digits as bytes). ``process()`` builds the package,
    appends a CRC and writes it to the device.
    """

    # Command settings
    version = b'ViVOtech2\0'  # protocol tag: part of the CRC input and the frame header
    vid = 0x1d5f  # not referenced by the code in this class
    pid = 0x0100  # not referenced by the code in this class
    code = None
    sub_command = None
    value = b''
    package = None
    processed = False
    # Template CRC object (16-bit, polynomial 0x11021, initial value 0xFFFF,
    # not reflected). It is never updated directly; ``prepare`` derives a
    # fresh calculator from it for every package.
    crc = crcmod.Crc(poly=0x11021, initCrc=0xFFFF, rev=False, xorOut=0x0000)

    def __init__(self, path, verbose=False, log_name='__main__'):
        """Open the device at ``path`` and set up logging.

        Args:
            path: Device path handed to ``Device(path=...)``.
            verbose: When true, debug output is sent to a stream handler.
                The same happens whenever this module runs as ``__main__``.
            log_name: Accepted for interface compatibility; currently unused,
                the logger is always named after this module.
        """
        self.device = Device(path=path)
        self._logger = logging.getLogger(__name__)
        if verbose or __name__ == '__main__':
            # The logger is shared by every instance; attach a stream handler
            # only once so messages are not duplicated per command object.
            has_stream = any(isinstance(h, logging.StreamHandler)
                             for h in self._logger.handlers)
            if not has_stream:
                self._logger.addHandler(logging.StreamHandler())
            self._logger.setLevel(logging.DEBUG)

    def prepare(self):
        """Build ``self.package``: the binary payload followed by its CRC.

        The CRC is computed over ``version`` + payload and appended with its
        bytes in reversed order.
        """
        value = unhexlify(self.data)
        self._logger.debug('PREPARING [raw_package]: {}'.format(value))
        # Start from the initial CRC value each time; updating the shared
        # class-level object would carry state over from earlier packages.
        checksum = self.crc.new(self.version + value)
        self.package = value + checksum.digest()[::-1]

    @property
    def data(self):
        """Hex-encoded command: code, sub-command, 2-byte length and value.

        Returns ``None`` once ``processed`` is true.
        """
        if not self.processed:
            data_len = hexlify(struct.pack('>H', len(self.value)))
            data = b''.join([self.code, self.sub_command, data_len, self.value])
            self._logger.debug('PREPARING [data]: {}'.format(data))
        else:
            data = None
        return data

    def process(self):
        """Prepare the package and send it to the device.

        Returns ``None``: ``Ping`` and ``GetTransactionResult`` in this file
        treat a truthy result as a mapping, so the raw response bytes are
        deliberately not forwarded from here.
        """
        self.prepare()
        self._logger.debug('PROCESSING [package]: {}'.format(self.package))
        self.send()

    def send(self):
        """Frame ``self.package``, write it to the device and read the reply.

        The package is zero-padded to 53 bytes and prefixed with a 0x01 byte
        and the ``version`` tag; ``self.package`` is left holding the frame.

        Returns:
            The raw bytes read from the device (up to 128, 5000 timeout).
        """
        self._logger.debug(self.package)
        padded = self.package.ljust(53, b'\x00')
        self.package = b'\x01' + self.version + padded
        self.device.write(self.package)
        res = self.device.read(128, timeout=5000)
        # Log the raw bytes as they are; decoding arbitrary device data as
        # 'unicode_escape' can raise and would abort the exchange.
        self._logger.debug(res)
        self._logger.debug(hexlify(res[11:]))
        return res
class Ping(Command):
    """Command with code ``18`` and sub-command ``01``."""

    code = b'18'
    sub_command = b'01'

    def process(self):
        """Run the command and return a 2-tuple taken from its result.

        Returns ``(None, None)`` when the base ``process()`` yields a falsy
        result; otherwise entries ``1`` and ``2`` of that result via ``get``.
        """
        response = super().process()
        if not response:
            return None, None
        return response.get(1), response.get(2)
class SetConfigurationDefaults(Command):
    """Command with code ``04`` and sub-command ``09``; no value payload."""

    code = b'04'
    sub_command = b'09'
class SetPoll(Command):
    """Command with code ``01`` / sub-command ``01`` carrying a poll mode."""

    code = b'01'
    sub_command = b'01'
    # Mode name -> hex-encoded value byte sent as the command's payload.
    shortcuts = {'auto': b'00',
                 'demand': b'01'}

    def __init__(self, mode='auto', path=None, verbose=False,
                 log_name='__main__'):
        """Select the poll mode and open the device.

        Args:
            mode: Key of ``shortcuts`` (``'auto'`` or ``'demand'``).
            path: Device path forwarded to ``Command.__init__``, which
                requires it; kept after ``mode`` so the original positional
                meaning of the first argument is unchanged.
            verbose: Forwarded to ``Command.__init__``.
            log_name: Forwarded to ``Command.__init__``.

        Raises:
            ValueError: If ``mode`` is not a key of ``shortcuts``.
        """
        try:
            self.value = self.shortcuts[mode]
        except KeyError:
            # An unknown mode would otherwise leave ``value`` as None and
            # fail later while the package is being assembled.
            raise ValueError('Unknown poll mode: {!r}'.format(mode)) from None
        super().__init__(path, verbose=verbose, log_name=log_name)
class GetTransactionResult(Command):
    """Command with code ``03`` and sub-command ``00``."""

    code = b'03'
    sub_command = b'00'

    def process(self):
        """Run the command and return a 2-tuple taken from its result.

        Returns ``(None, None)`` when the base ``process()`` yields a falsy
        result; otherwise entries ``1`` and ``2`` of that result via ``get``.
        """
        response = super().process()
        if not response:
            return None, None
        return response.get(1), response.get(2)
def main():
    """Ping every enumerated HID device whose product string matches.

    For each device reported as 'ViVOpay Kiosk II' a ``Ping`` command is
    opened on its path, processed, and the device description is printed.
    The device is closed afterwards even if the exchange fails.
    """
    parser = argparse.ArgumentParser(description='VivoPay Kiosk II communication.')
    parser.add_argument('--path', metavar='-P', type=str, default='/dev/ACM0', help='port name')
    # NOTE(review): --path is parsed so the command line stays accepted, but
    # its value is not used; devices are located by enumeration below.
    parser.parse_args()

    for dev in enumerate():
        if dev['product_string'] != 'ViVOpay Kiosk II':
            continue
        cmd = None
        try:
            cmd = Ping(dev['path'])
            cmd.process()
            print(dev)
        finally:
            # ``cmd`` stays None when opening the device itself failed.
            if cmd is not None:
                cmd.device.close()


# A second ``__main__`` guard used to call ``Ping()`` without the required
# ``path`` argument and always raised TypeError; the loop in main() already
# performs the ping for every matching device.
if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment