Skip to content

Instantly share code, notes, and snippets.

@takaswie
Last active September 17, 2020 03:41
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save takaswie/838dea858f430aecef26fabd38f0663e to your computer and use it in GitHub Desktop.
Save takaswie/838dea858f430aecef26fabd38f0663e to your computer and use it in GitHub Desktop.
Avid Adrenaline asynchronous protocol
#!/usr/bin/env python3
# SPDX-License-Identifier: GPL-3.0-or-later
# Copyright (c) 2020 Takashi Sakamoto
import gi
gi.require_version('GLib', '2.0')
gi.require_version('Hinawa', '3.0')
from gi.repository import GLib, Hinawa
from threading import Thread, RLock, Condition
# Address on the node to which command frames are written.
CMD_OFFSET = 0xfffff0e00000
# Start of the address range reserved for receiving response frames.
RESP_OFFSET = 0xfffff0e00800
FRAME_SIZE = 0x800 # Approximately.
# Timeout of a transaction, in milliseconds.
TIMEOUT_MS = 100

node = Hinawa.FwNode.new()
node.open('/dev/fw1')

# The event source of the node is attached to a private context whose main
# loop runs in a separate thread.
ctx = GLib.MainContext.new()
src = node.create_source()
src.attach(ctx)
dispatcher = GLib.MainLoop.new(ctx, False)

# Daemon thread: if the main thread raises before dispatcher.quit() is
# reached, the interpreter must not wait forever on the running main loop.
th = Thread(target=dispatcher.run, daemon=True)
th.start()

# Spin until the main loop reports that it has started.
while not dispatcher.is_running():
    pass
class AvidProto(Hinawa.FwResp):
    """Command/response helper for the Avid Adrenaline asynchronous protocol.

    A command frame is sent to CMD_OFFSET on the node with a block write
    request. The response is taken from the frame delivered to
    do_requested2() for the address range reserved at RESP_OFFSET.
    """

    def __init__(self):
        super().__init__()
        # Guards resp_frame, which is stored by do_requested2() and consumed
        # by transaction(). It exists for the whole lifetime of the object so
        # that do_requested2() can never observe a missing condition.
        self.cv = Condition(RLock())
        self.node = None
        self.resp_frame = None

    def bind(self, node):
        """Reserve the response address range on ``node`` and remember it.

        Args:
            node: The node handed to reserve() and used by later
                transaction() calls.
        """
        self.reserve(node, RESP_OFFSET, FRAME_SIZE)
        self.node = node

    def unbind(self):
        """Release the reserved address range and forget the node."""
        self.release()
        self.node = None

    def transaction(self, cmd):
        """Send a command frame and return the frame received in response.

        Args:
            cmd: Content of the command frame, written to CMD_OFFSET.

        Returns:
            bytearray: The frame stored by do_requested2().

        Raises:
            IOError: No response frame was stored within TIMEOUT_MS.
        """
        # Drop any stale frame so that only a frame stored from now on is
        # returned.
        with self.cv:
            self.resp_frame = None

        # The lock is deliberately not held while sending: do_requested2()
        # needs it to store the response, which presumably can happen before
        # transaction_sync() returns.
        req = Hinawa.FwReq.new()
        req.transaction_sync(self.node, Hinawa.FwTcode.WRITE_BLOCK_REQUEST,
                             CMD_OFFSET, len(cmd), cmd, TIMEOUT_MS)

        with self.cv:
            # Condition.wait_for() takes seconds; TIMEOUT_MS is milliseconds.
            arrived = self.cv.wait_for(lambda: self.resp_frame is not None,
                                       timeout=TIMEOUT_MS / 1000)
            if not arrived:
                raise IOError(
                    'no response frame within {} ms'.format(TIMEOUT_MS))
            resp_frame = self.resp_frame
            self.resp_frame = None
        return resp_frame

    def do_requested2(self, rcode, offset, src, dst, card, generation, frame, length):
        """Store the received frame and wake up the waiter in transaction().

        NOTE(review): the first parameter is named ``rcode`` here, but
        libhinawa appears to pass the transaction code of the request in
        this position -- confirm against the Hinawa.FwResp documentation.

        Returns:
            Hinawa.FwRcode.COMPLETE, unconditionally.
        """
        with self.cv:
            # Keep a private copy of the frame for transaction().
            self.resp_frame = bytearray(frame)
            self.cv.notify()
        return Hinawa.FwRcode.COMPLETE
# Always stop the dispatcher and join its thread, even when binding or the
# transaction fails; otherwise the process would be left with a running
# main loop.
try:
    proto = AvidProto()
    proto.bind(node)
    try:
        # For example.
        resp = proto.transaction([0x0b, 0x0b, 0x00, 0x06, 0x40, 0x06, 0x00, 0x00, 0x00, 0x01])
        # Expected: 0b 0b 00 04 40 06 00 00
        for i, b in enumerate(resp):
            print('{:2d}: {:02x}'.format(i, b))
    finally:
        # Release the reserved address range once it has been bound.
        proto.unbind()
finally:
    dispatcher.quit()
    th.join()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment