Skip to content

Instantly share code, notes, and snippets.

@sheeley
Created May 27, 2014 17:36
Show Gist options
  • Save sheeley/1971d372928d1786ab99 to your computer and use it in GitHub Desktop.
Save sheeley/1971d372928d1786ab99 to your computer and use it in GitHub Desktop.
class RFID(object):
def __init__(self):
self.initial_rospec_clear = False
def clear_rospec(self, proto):
if not self.initial_rospec_clear:
logger.info('rfid clearing rospec')
self.initial_rospec_clear = True
return proto.stopPolitely()
def tag_seen_callback(self, llrpMsg):
"""Function to run each time the reader reports seeing tags."""
tags = llrpMsg.msgdict['RO_ACCESS_REPORT']['TagReportData']
if tags:
now = ceil(time())
for tag in tags:
tag['lastSeen'] = now
tag_id = tag['EPC-96']
original_tag = self.all_tags.get(tag_id)
if not original_tag:
original_tag = {}
if not original_tag.get('firstSeen'):
tag['firstSeen'] = now
original_tag.update(tag)
self.all_tags[tag_id] = original_tag
smokesignal.emit(RFID, {
'host': self.rfid_config['host'],
'tags': self.all_tags,
'type': RFID
})
def start_rfid(self):
logger.info('running rfid')
config = self.rfid_config.copy()
hosts = config.get('host')
if isinstance(hosts, basestring):
hosts = [hosts]
port = config.get('port')
del config['port']
del config['host']
self.fac = LLRPClientFactory(**config)
self.fac.addTagReportCallback(self.tag_seen_callback)
self.fac.addStateCallback(LLRPClient.STATE_CONNECTED, self.clear_rospec)
for host in hosts:
reactor.connectTCP(host, port, self.fac, timeout=3)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment