-
-
Save ta0kira/0af12c70620c6cf4bd82 to your computer and use it in GitHub Desktop.
A replacement file to fix issue #4 with pyxid (https://github.com/cedrus-opensource/pyxid/issues/4). Replace pyxid/pyxid/internal.py with this file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
from struct import unpack | |
import time | |
from constants import NO_KEY_DETECTED, FOUND_KEY_DOWN, FOUND_KEY_UP, \ | |
KEY_RELEASE_BITMASK, INVALID_PORT_BITS | |
import pyxid | |
class XidConnection(object): | |
def __init__(self, serial_port, baud_rate=115200): | |
self.serial_port = serial_port | |
self.serial_port.baudrate = baud_rate | |
self.__needs_interbyte_delay = True | |
self.__xid_packet_size = 6 | |
self.__default_read_timeout = 0 | |
self.__bytes_in_buffer = 0 | |
self.__response_buffer = '' | |
self.__last_resp_pressed = False | |
self.__last_resp_port = 0 | |
self.__last_resp_key = 0 | |
self.__last_resp_rt = 0 | |
self.__first_valid_packet = -1 | |
self.__using_stim_tracker = False | |
# the set lines cmd on RB-series and other XID response | |
# devices begins with 'ah'. If, however, a StimTracker is | |
# being used, this will be set to 'mh' instead. 'mh' is to be | |
# used for StimTracker only. It has no effect on response devices. | |
self.__set_lines_cmd = 'ah'+chr(0)+chr(0) | |
self.__line_state = 0 | |
def set_using_stim_tracker(self, using_st=True): | |
if using_st: | |
self.__using_stim_tracker = True | |
self.__set_lines_cmd = 'mh'+chr(0)+chr(0) | |
self.__needs_interbyte_delay = False | |
else: | |
self.__using_stim_tracker = False | |
self.__set_lines_cmd = 'ah'+chr(0)+chr(0) | |
self.__needs_interbyte_delay = True | |
def clear_digital_output_lines(self, lines, leave_remaining_lines=False): | |
if lines not in range(0, 256): | |
raise ValueError('lines must be between 0 and 255') | |
local_lines = ~lines | |
if local_lines < 0: | |
local_lines += 256 | |
if leave_remaining_lines: | |
local_lines = local_lines & self.__line_state | |
if self.__using_stim_tracker: | |
self.__set_lines_cmd = 'mh'+chr(local_lines)+chr(0) | |
else: | |
tmp_lines = ~local_lines | |
if tmp_lines < 0: | |
tmp_lines += 256 | |
self.__set_lines_cmd = 'ah'+chr(tmp_lines)+chr(0) | |
self.write(str(self.__set_lines_cmd)) | |
self.__line_state = local_lines | |
def set_digital_output_lines(self, lines, leave_remaining_lines=False): | |
if lines not in range(0, 256): | |
raise ValueError('lines must be between 0 and 255') | |
if leave_remaining_lines: | |
lines |= self.__line_state | |
if self.__using_stim_tracker: | |
self.__set_lines_cmd = 'mh'+chr(lines)+chr(0) | |
else: | |
lines_tmp = ~lines | |
if lines_tmp < 0: | |
lines_tmp += 256 | |
self.__set_lines_cmd = 'ah'+chr(lines_tmp)+chr(0) | |
self.write(str(self.__set_lines_cmd)) | |
self.__line_state = lines | |
def flush_input(self): | |
self.serial_port.flushInput() | |
def flush_output(self): | |
self.serial_port.flushOutput() | |
def open(self): | |
self.serial_port.open() | |
self.flush_input() | |
self.flush_output() | |
def close(self): | |
self.serial_port.close() | |
def send_xid_command(self, command, bytes_expected=0, timeout=0.1): | |
self.write(command) | |
self.serial_port.timeout = timeout | |
response = self.read(bytes_expected) | |
self.serial_port.timeout = self.__default_read_timeout | |
return response | |
def read(self, bytes_to_read): | |
return self.serial_port.read(bytes_to_read) | |
def write(self, command): | |
bytes_written = 0 | |
if self.__needs_interbyte_delay: | |
for char in command: | |
bytes_written += self.serial_port.write(char) | |
time.sleep(0.001) | |
else: | |
bytes_written = self.serial_port.write(command) | |
return bytes_written | |
def check_for_keypress(self): | |
response = self.read(self.__bytes_in_buffer % 6 if (self.__bytes_in_buffer % 6) else 6) | |
response_found = NO_KEY_DETECTED | |
if len(response) > 0: | |
self.__bytes_in_buffer += len(response) | |
self.__response_buffer += response | |
response_found = self.xid_input_found() | |
return response_found | |
def xid_input_found(self): | |
input_found = NO_KEY_DETECTED | |
input_error = False | |
if self.__bytes_in_buffer >= 6: | |
last_byte_index = (self.__bytes_in_buffer // 6) * 6 - self.__xid_packet_size | |
i = 0 | |
while i <= last_byte_index: | |
try: | |
(k, params, time) = unpack('<cBI', | |
self.__response_buffer[ | |
last_byte_index: | |
last_byte_index+6]) | |
except Exception: | |
input_error = True | |
i += 1 | |
continue | |
if (k == 'k' and | |
(params & INVALID_PORT_BITS) == 0 and | |
self.__response_buffer[i+5] == '\x00'): | |
# found a valid XID packet | |
self.__first_valid_packet = i | |
self.__last_resp_pressed = ( | |
params & KEY_RELEASE_BITMASK) == KEY_RELEASE_BITMASK | |
self.__last_resp_port = params & 0x0F; | |
key = ((params & 0xE0) >> 5) | |
if key == 0: | |
key = 8 | |
self.__last_resp_key = key | |
self.__last_resp_rt = time | |
if self.__last_resp_pressed: | |
input_found = FOUND_KEY_DOWN | |
else: | |
input_found = FOUND_KEY_UP | |
i += 1 | |
if input_error: | |
self.__response_buffer = '' | |
self.__bytes_in_buffer = 0 | |
return input_found | |
def get_current_response(self): | |
""" | |
reads the current response data from the object and returns | |
it in a dict. | |
Currently 'time' is reported as 0 until clock drift issues are | |
resolved. | |
""" | |
response_time = (self.__last_resp_rt if pyxid.use_response_pad_timer | |
else 0) | |
response = {'time': response_time, | |
'pressed': self.__last_resp_pressed, | |
'key': self.__last_resp_key, | |
'port': self.__last_resp_port} | |
self.remove_current_response() | |
return response | |
def remove_current_response(self): | |
if self.__first_valid_packet != -1: | |
self.__response_buffer = self.__response_buffer[ | |
self.__first_valid_packet + self.__xid_packet_size:] | |
self.__bytes_in_buffer -= self.__xid_packet_size | |
self.__first_valid_packet = -1 | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment