Skip to content

Instantly share code, notes, and snippets.

@AndBondStyle
Created January 13, 2024 22:44
Show Gist options
  • Save AndBondStyle/76170571d28412a8ca2f8afbe06c48c1 to your computer and use it in GitHub Desktop.
Save AndBondStyle/76170571d28412a8ca2f8afbe06c48c1 to your computer and use it in GitHub Desktop.
import functools
import select
import time

from odrive.pyfibre.fibre.libfibre import *
tasks = []  # pending work items, each a tuple: (when, callback, *args)
now = time.perf_counter


# Deliberately minimal polling "event loop".
def spin():
    """Run the tasks in ``tasks`` that are due, in ten consecutive passes.

    Each pass walks a snapshot of the queue, so a callback may append new
    tasks while it runs; those are picked up by the following pass. A task
    ``(when, callback, *args)`` is due once ``when <= now()`` and is then
    invoked as ``callback(*args)``. Tasks that are not due stay queued in
    their original order.

    A task is removed from the queue *before* its callback is invoked. If
    the callback raises, the exception propagates to the caller, but neither
    that task nor the ones already run in this pass are left behind to be
    executed a second time by the next ``spin()``.

    Callbacks are expected to only append to ``tasks``; removing entries
    from inside a callback would invalidate the position bookkeeping.
    """
    for _ in range(10):
        # Number of snapshot entries inspected so far that were not due.
        # They remain at the front of ``tasks``, so the entry currently
        # being inspected always sits at index ``kept``.
        kept = 0
        for when, callback, *args in tasks.copy():
            if when > now():
                kept += 1
                continue
            del tasks[kept]
            callback(*args)
# "Coroutine" that watches a file descriptor for read/write readiness.
def watch_fd(fd, mode, callback, ctx):
    """Poll ``fd`` once and re-queue this watcher on ``tasks``.

    ``mode`` is ``"read"`` or ``"write"``. The descriptor is checked with a
    zero-timeout ``select.select`` call, so this never blocks. When the
    descriptor is ready, a trace line is printed and ``callback`` is queued
    to be called as ``callback(ctx, 1)`` for read or ``callback(ctx, 4)``
    for write.

    Whether or not the descriptor was ready, the watcher then appends itself
    to ``tasks`` again with the same arguments, so it keeps polling on every
    later pass. Any other ``mode`` value only re-queues the watcher.
    """
    no_wait = 0  # select() timeout of zero: poll and return immediately
    if mode == "read":
        readable = select.select([fd], [], [], no_wait)[0]
        if readable:
            print(">>> read", fd)
            tasks.append((now(), callback, ctx, 1))
    elif mode == "write":
        writable = select.select([], [fd], [], no_wait)[1]
        if writable:
            print(">>> write", fd)
            tasks.append((now(), callback, ctx, 4))
    # Re-arm unconditionally: the watcher stays active after firing.
    rearm = (now(), watch_fd, fd, mode, callback, ctx)
    tasks.append(rearm)
# Debug decorator.
def logcall(func):
    """Return a wrapper that prints a ``[call]`` trace line, then calls ``func``.

    The trace line contains the function name followed by the positional
    arguments, and then any keyword arguments rendered as ``name=value``.
    The wrapper returns whatever ``func`` returns.

    ``functools.wraps`` copies ``__name__``, ``__doc__`` and related metadata
    onto the wrapper, so decorated functions keep their own identity instead
    of all being reported as ``deco``.
    """
    @functools.wraps(func)
    def deco(*args, **kwargs):
        named = (f"{key}={value!r}" for key, value in kwargs.items())
        print("[call]", func.__name__, *args, *named)
        return func(*args, **kwargs)
    return deco
# =========================
# libfibre callbacks impl
# =========================
@logcall
def _post(callback, ctx):
    """Queue ``callback(ctx)`` on ``tasks`` and return 0.

    The entry is stamped with the current time, so it is already due when
    ``spin()`` next inspects the queue. Installed as the event loop's
    ``post`` hook.
    """
    entry = (now(), callback, ctx)
    tasks.append(entry)
    return 0
@logcall
def _register_event(event_fd, events, callback, ctx):
    """Start polling ``event_fd`` for the readiness kinds set in ``events``.

    ``events`` is a bit mask. Bit value 1 queues a ``watch_fd`` task in
    ``"read"`` mode and bit value 4 queues one in ``"write"`` mode; both may
    be set. Any other bit within the low 32 bits trips the assertion.
    Returns 0.

    NOTE(review): 1 and 4 presumably correspond to EPOLLIN / EPOLLOUT —
    confirm against the libfibre headers.
    """
    unsupported = events & 0xfffffffa  # every bit except 1 and 4
    assert not unsupported
    for bit, mode in ((1, "read"), (4, "write")):
        if events & bit:
            tasks.append((now(), watch_fd, event_fd, mode, callback, ctx))
    return 0
@logcall
def _deregister_event(event_fd):
    """Stub for the event loop's ``deregister_event`` hook.

    Ignores ``event_fd`` and returns 0; nothing is removed from ``tasks``.

    NOTE(review): a ``watch_fd`` task queued by ``_register_event`` keeps
    re-queuing itself after this call, so the descriptor is still polled —
    confirm that is acceptable for this experiment.
    """
    return 0
@logcall
def _call_later(delay, callback, ctx):
    """Stub for the event loop's ``call_later`` hook.

    Ignores all three arguments and returns 0; nothing is added to
    ``tasks``.

    NOTE(review): ``callback`` is therefore never invoked. Presumably
    libfibre expects it to run after ``delay`` and expects a timer handle
    back — confirm the contract before relying on timers.
    """
    return 0
@logcall
def _cancel_timer(timer_id):
    """Stub for the event loop's ``cancel_timer`` hook.

    Ignores ``timer_id`` and returns 0. There is nothing to cancel because
    ``_call_later`` never schedules anything.
    """
    return 0
@logcall
def _on_found_object(ctx, obj, intf):
    """Discovery callback stub: does nothing itself.

    The only visible effect is the ``[call]`` trace printed by ``@logcall``.
    Wrapped as ``c_on_found_object`` and handed to
    ``libfibre_start_discovery``.
    """
    pass
@logcall
def _on_lost_object(ctx, obj):
    """Discovery callback stub: does nothing itself.

    The only visible effect is the ``[call]`` trace printed by ``@logcall``.
    Wrapped as ``c_on_lost_object`` and handed to
    ``libfibre_start_discovery``.
    """
    pass
@logcall
def _on_discovery_stopped(ctx, result):
    """Discovery callback stub: does nothing itself.

    The only visible effect is the ``[call]`` trace printed by ``@logcall``.
    Wrapped as ``c_on_discovery_stopped`` and handed to
    ``libfibre_start_discovery``.
    """
    pass
@logcall
def _on_attribute_added(ctx, attr, name, name_length, subintf, subintf_name, subintf_name_length):
    """Callback stub: does nothing beyond the ``@logcall`` trace line.

    Wrapped as ``c_on_attribute_added``; nothing visible in this file passes
    that wrapper to libfibre.
    """
    pass
@logcall
def _on_attribute_removed(ctx, attr):
    """Callback stub: does nothing beyond the ``@logcall`` trace line.

    Wrapped as ``c_on_attribute_removed``; nothing visible in this file
    passes that wrapper to libfibre.
    """
    pass
@logcall
def _on_function_added(ctx, func, name, name_length, input_names, input_codecs, output_names, output_codecs):
    """Callback stub: does nothing beyond the ``@logcall`` trace line.

    Wrapped as ``c_on_function_added``; nothing visible in this file passes
    that wrapper to libfibre.
    """
    pass
@logcall
def _on_function_removed(ctx, func):
    """Callback stub: does nothing beyond the ``@logcall`` trace line.

    Wrapped as ``c_on_function_removed``; nothing visible in this file
    passes that wrapper to libfibre.
    """
    pass
@logcall
def _on_call_completed(ctx, status, tx_end, rx_end, tx_buf, tx_len, rx_buf, rx_len):
    """Callback stub: does nothing beyond the ``@logcall`` trace line.

    Wrapped as ``c_on_call_completed``; nothing visible in this file passes
    that wrapper to libfibre.
    """
    pass
# Wrap each Python callback in its libfibre signature type (presumably ctypes
# function prototypes — confirm in the libfibre module). Every wrapper is bound
# to a module-level name, so it stays referenced for the life of the script.
event_loop = LibFibreEventLoop()

# Event-loop hooks: create the wrapper and install it on the struct in one step.
c_post = event_loop.post = PostSignature(_post)
c_register_event = event_loop.register_event = RegisterEventSignature(_register_event)
c_deregister_event = event_loop.deregister_event = DeregisterEventSignature(_deregister_event)
c_call_later = event_loop.call_later = CallLaterSignature(_call_later)
c_cancel_timer = event_loop.cancel_timer = CancelTimerSignature(_cancel_timer)

# Discovery callbacks, passed to libfibre_start_discovery by the entrypoint.
c_on_found_object = OnFoundObjectSignature(_on_found_object)
c_on_lost_object = OnLostObjectSignature(_on_lost_object)
c_on_discovery_stopped = OnStoppedSignature(_on_discovery_stopped)

# Remaining callbacks: wrapped here, not yet passed to libfibre in this file.
c_on_attribute_added = OnAttributeAddedSignature(_on_attribute_added)
c_on_attribute_removed = OnAttributeRemovedSignature(_on_attribute_removed)
c_on_function_added = OnFunctionAddedSignature(_on_function_added)
c_on_function_removed = OnFunctionRemovedSignature(_on_function_removed)
c_on_call_completed = OnCallCompletedSignature(_on_call_completed)
# entrypoint
# Open a libfibre context that is driven by the hand-rolled event loop struct.
ctx = c_void_p(libfibre_open(event_loop))
# A NULL pointer is falsy; presumably that signals libfibre_open failed.
assert ctx
# Run whatever tasks were queued while opening the context.
spin()
# Channel filter string passed to libfibre_open_domain: USB devices matched by
# vendor/product ID and interface class (presumably the ODrive IDs — confirm).
path = 'usb:idVendor=0x1209,idProduct=0x0D32,bInterfaceClass=0,bInterfaceSubClass=1,bInterfaceProtocol=0'
# The path is handed over as ASCII bytes together with its length.
buf = path.encode('ascii')
domain_handle = libfibre_open_domain(ctx, buf, len(buf))
print(domain_handle)
spin()
# Passed by reference; presumably filled in by libfibre_start_discovery.
discovery_handle = c_void_p(0)
# Passed as the last argument of libfibre_start_discovery; presumably it comes
# back as the ctx argument of the discovery callbacks — confirm.
discovery_id = 0
libfibre_start_discovery(domain_handle, byref(discovery_handle), c_on_found_object, c_on_lost_object, c_on_discovery_stopped, discovery_id)
# "event loop": run forever, draining due tasks roughly once per millisecond.
while True:
    spin()
    # Short sleep so the polling loop does not spin at full CPU.
    time.sleep(0.001)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment