Skip to content

Instantly share code, notes, and snippets.

@Evidlo
Created January 7, 2017 20:11
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Evidlo/2ef54e86a4e7e30f7105b97bb2ebbec1 to your computer and use it in GitHub Desktop.
Save Evidlo/2ef54e86a4e7e30f7105b97bb2ebbec1 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
## Evan Widloski - 2017-01-06
## Asynchronous magstripe reader example
## Requires root and >= python 3.5
# Much taken from: http://python-evdev.readthedocs.io/en/latest/tutorial.html#reading-events-from-multiple-devices-using-asyncio
import asyncio, evdev, json, os
# MSR100 reader vendor/product ids
vendor = 0xc216
product = 0x0180
# JSON file that stores the list of known cards
card_record = './cards.json'

# open every input device, then pick the reader by vendor and product ids
devices = [evdev.InputDevice(path) for path in evdev.list_devices()]
try:
    card_reader = next(
        device for device in devices
        if device.info.vendor == vendor and device.info.product == product
    )
except StopIteration:
    # arguments are ordered to match the productID/vendorID labels in the message
    print('Device not found: productID: {0}, vendorID: {1}'.format(hex(product), hex(vendor)))
    for device in devices:
        device.close()
    # exit with a failure status; quit() is a site helper and reports success
    raise SystemExit(1)

# release the handles of every device that is not the reader; they are
# never read from, so there is no reason to keep them open
for device in devices:
    if device is not card_reader:
        device.close()

# take full control of reader (requires root)
card_reader.grab()
print('Connected to device: productID: {0}, vendorID: {1}'.format(hex(product), hex(vendor)))

# module-level placeholder; listen_cards assigns its own local of the same name
swiped_card = []
# handle device events asynchronously
async def listen_cards(device, recording = False):
    """Read key events from ``device`` and process complete card swipes.

    A swipe starts at a ``KEY_SEMICOLON`` key-down and ends at the next
    ``KEY_ENTER`` key-down; every keycode in between (both ends included) is
    collected into a list.  When ``recording`` is true the finished list is
    appended to the module-level ``cards`` list, otherwise it is looked up in
    ``cards`` and the result is printed.

    Keys that arrive outside a swipe, including a stray ``KEY_ENTER``, are
    ignored.

    If reading from the device raises ``OSError``, every device in the
    module-level ``devices`` list is closed and the module-level event loop
    ``loop`` is stopped.
    """
    listening = False
    # bound up front so a KEY_ENTER seen before any KEY_SEMICOLON cannot
    # hit an unbound local
    swiped_card = []
    try:
        async for event in device.async_read_loop():
            # only get keypress events
            if event.type != evdev.ecodes.EV_KEY:
                continue
            decoded_key = evdev.categorize(event)
            # only get downpress
            if decoded_key.keystate != decoded_key.key_down:
                continue
            keycode = decoded_key.keycode

            # new card reading has begun
            if keycode == 'KEY_SEMICOLON':
                swiped_card = []
                listening = True

            # anything outside a swipe is not card data
            if not listening:
                continue

            swiped_card.append(keycode)

            # card is finished reading
            if keycode == 'KEY_ENTER':
                if recording:
                    cards.append(swiped_card)
                    print('Added new card')
                elif swiped_card in cards:
                    print('Recognized card')
                else:
                    print('Failed to recognize card')
                listening = False
    except OSError:
        print('Connection to reader lost.')
        # use a separate loop variable so the ``device`` parameter is not rebound
        for open_device in devices:
            open_device.close()
        loop.stop()
if __name__ == '__main__':
    # Entry point: load the known cards, run the listener until interrupted
    # or until the reader disconnects, then save (record mode) and clean up.
    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument("-r",action="store_true",help="Record new cards")
    args = parser.parse_args()
    recording = args.r

    # load previously stored cards; only record mode may start without a file
    if os.path.exists(card_record):
        try:
            with open(card_record, 'r') as f:
                cards = json.load(f)
        except ValueError:
            # json decoding errors are ValueError subclasses
            print('{0} seems to be corrupt. Exiting...'.format(card_record))
            raise SystemExit(1)
    elif recording:
        print('{0} not found. Creating...'.format(card_record))
        cards = []
    else:
        print('{0} not found. Run record script to add new cards.'.format(card_record))
        raise SystemExit(1)

    # set up asynchronous bits; create the loop explicitly instead of relying
    # on the deprecated implicit creation by get_event_loop()/ensure_future()
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    loop.create_task(listen_cards(card_reader, recording))

    try:
        loop.run_forever()
    except KeyboardInterrupt:
        print('Caught keyboard interrupt. Freeing reader...')
    finally:
        # runs on Ctrl-C and also when listen_cards stops the loop after the
        # reader disconnects, so recorded cards are saved either way
        if recording:
            print('Saving recorded cards to {0}...'.format(card_record))
            # serialize before opening so a failure cannot leave a truncated file
            serialized = json.dumps(cards)
            with open(card_record, 'w') as f:
                f.write(serialized)

        try:
            card_reader.ungrab()
        except OSError:
            # the reader is already gone or closed; nothing left to release
            pass

        # asyncio.Task.all_tasks() was removed in Python 3.9; fall back to it
        # only on interpreters that predate asyncio.all_tasks() (< 3.7)
        all_tasks = getattr(asyncio, 'all_tasks', None)
        if all_tasks is None:
            all_tasks = asyncio.Task.all_tasks
        pending = list(all_tasks(loop))
        for task in pending:
            task.cancel()
        if pending:
            # let the cancelled tasks unwind so none is destroyed while pending
            loop.run_until_complete(asyncio.gather(*pending, return_exceptions=True))
        loop.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment