-
-
Save itzexor/d77b9449a8b4c8f1467d865efa8c05e0 to your computer and use it in GitHub Desktop.
Revisions
-
itzexor revised this gist
Apr 7, 2020 . 1 changed file with 25 additions and 19 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -1,7 +1,9 @@ #!/usr/bin/python3 import asyncio import time from evdev import InputDevice, UInput from evdev.ecodes import * from types import SimpleNamespace # NEC protocol # scancode button name @@ -34,9 +36,6 @@ # delay in seconds between "keepalive" 0x0 scancode generation KEEPALIVE_DELAY = 1 KEY_TO_SCAN = { KEY_MUTE: 0x3818, KEY_VOLUMEUP: 0x380e, @@ -47,7 +46,7 @@ KEY_H: 0x3813 # source } async def listener(state): # grab ir device and add a fake ir device to forward events to # this will probably break if any other input devices are ever present on boot dev = InputDevice("/dev/input/event0") @@ -57,30 +56,37 @@ async def listener(queue): async for ev in dev.async_read_loop(): if ev.type == EV_KEY and ev.code in KEY_TO_SCAN: # 0 = release, 1 = down, 2 = repeat if ev.value != 0: await state.tx_queue.put(KEY_TO_SCAN[ev.code]) elif ev.type != EV_MSC: udev.write_event(ev) # keep the device awake or something, reduces max send latency ~10x async def keepalive_gen(state): while True: await asyncio.sleep(KEEPALIVE_DELAY) if time.monotonic() >= state.tx_time + KEEPALIVE_DELAY: await state.tx_queue.put(0) await state.tx_event.wait() state.tx_event.clear() async def sender(state): while True: scancode = f"nec:{await state.tx_queue.get():#x}" await asyncio.create_subprocess_exec("ir-ctl", "-S", scancode) state.tx_time = time.monotonic() state.tx_event.set() async def main(): coroutines = [listener, keepalive_gen, sender] state = SimpleNamespace(tx_queue=asyncio.Queue(), tx_time=0, tx_event=asyncio.Event()) q = asyncio.Queue() await asyncio.gather(*(asyncio.create_task(c(state)) for c in coroutines)) if __name__ == "__main__": asyncio.run(main()) -
itzexor revised this gist
Apr 5, 2020 . 1 changed file with 15 additions and 24 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -31,9 +31,12 @@ # 0x381e right # 0x3854 ok # delay in seconds between "keepalive" 0x0 scancode generation KEEPALIVE_DELAY = 1 # delay in seconds between consequtive keypresses (when many are queued) TRANSMIT_DELAY = 0.2 KEY_TO_SCAN = { KEY_MUTE: 0x3818, KEY_VOLUMEUP: 0x380e, @@ -44,7 +47,7 @@ KEY_H: 0x3813 # source } async def listener(queue): # grab ir device and add a fake ir device to forward events to # this will probably break if any other input devices are ever present on boot dev = InputDevice("/dev/input/event0") @@ -56,40 +59,28 @@ async def listener(queue, send_trigger): # 0 = release, 1 = down, 2 = repeat if ev.value == 1 or ev.value == 2: await queue.put(KEY_TO_SCAN[ev.code]) elif ev.type != EV_MSC: udev.write_event(ev) # keep the device awake or something, reduces max send latency ~10x async def keepalive_gen(queue): while True: #fixme await queue.put(0) await asyncio.sleep(KEEPALIVE_DELAY) async def sender(queue): while True: #fixme code = await queue.get() await asyncio.create_subprocess_exec("ir-ctl", "-S", f"nec:{code:#x}") if queue.qsize() > 0: await asyncio.sleep(TRANSMIT_DELAY) async def main(): queue = asyncio.Queue() await asyncio.gather(asyncio.create_task(listener(queue)), asyncio.create_task(keepalive_gen(queue)), asyncio.create_task(sender(queue))) if __name__ == "__main__": asyncio.run(main()) -
itzexor revised this gist
Apr 5, 2020 . 1 changed file with 73 additions and 10 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -1,32 +1,95 @@ #!/usr/bin/python3 import asyncio from evdev import InputDevice, UInput from evdev.ecodes import * # NEC protocol # scancode button name # 0x3834 mode [pic/sound] # 0x3811 closed captioning # 0x380d sleep # 0x3857 pic shape # 0x3801 1 # 0x3802 2 # 0x3803 3 # 0x3804 4 # 0x3805 5 # 0x3806 6 # 0x3807 7 # 0x3808 8 # 0x3809 9 # 0x3800 0 # 0x3870 previous channel # 0x384b . # 0x381a sap # 0x383c back # 0x3817 menu # 0x380c info # 0x384e up # 0x384f down # 0x381f left # 0x381e right # 0x3854 ok # delay in seconds between keepalive 0x0 scancode sending KEEPALIVE_DELAY = 1 KEY_TO_SCAN = { KEY_MUTE: 0x3818, KEY_VOLUMEUP: 0x380e, KEY_VOLUMEDOWN: 0x380f, KEY_PAGEUP: 0x380a, # channel up KEY_PAGEDOWN: 0x380b, # channel down KEY_SLEEP: 0x3812, # power KEY_H: 0x3813 # source } async def listener(queue, send_trigger): # grab ir device and add a fake ir device to forward events to # this will probably break if any other input devices are ever present on boot dev = InputDevice("/dev/input/event0") dev.grab() udev = UInput.from_device(dev, name="filtered-ir-device") async for ev in dev.async_read_loop(): if ev.type == EV_KEY and ev.code in KEY_TO_SCAN: # 0 = release, 1 = down, 2 = repeat if ev.value == 1 or ev.value == 2: await queue.put(KEY_TO_SCAN[ev.code]) send_trigger.set() elif ev.type != EV_MSC: udev.write_event(ev) # keep the device awake or something, reduces max send latency ~10x async def keepalive_timer(send_trigger): while True: #fixme await asyncio.sleep(KEEPALIVE_DELAY) send_trigger.set() async def sender(queue, send_trigger): while True: #fixme send_trigger.clear() if queue.empty(): code = 0 wait = True else: code = await queue.get() wait = False await asyncio.create_subprocess_exec("ir-ctl", "-S", f"nec:{code:#x}") if wait: await send_trigger.wait() async def main(): queue = asyncio.Queue() send_trigger = asyncio.Event() await asyncio.gather(asyncio.create_task(listener(queue, send_trigger)), asyncio.create_task(keepalive_timer(send_trigger)), asyncio.create_task(sender(queue, send_trigger))) if __name__ == "__main__": asyncio.run(main()) -
itzexor created this gist
Apr 5, 2020 .There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,32 @@ #!/usr/bin/python3 from evdev import InputDevice, categorize, UInput from evdev.ecodes import * from subprocess import Popen, run # NEC protocol KEY_TO_SCAN = { KEY_MUTE: 0x3818, KEY_VOLUMEUP: 0x380e, KEY_VOLUMEDOWN: 0x380f, KEY_PAGEUP: 0x380a, # channel up KEY_PAGEDOWN: 0x380b, # channel down KEY_SLEEP: 0x3812, # power toggle KEY_H: 0x3813 # source toggle } def main(): run(["/usr/bin/ir-keytable", "-a", "/etc/rc_maps.cfg", "-s", "rc0"]) run(["/usr/bin/ir-keytable", "-D", "500", "-P", "200"]) dev = InputDevice("/dev/input/event0") dev.grab() udev = UInput.from_device(dev, name="filtered-ir-device") for ev in dev.async_read_loop(): if ev.type == EV_KEY and ev.code in KEY_TO_SCAN: # 0 = release, 1 = down, 2 = repeat if ev.value == 1 or ev.value == 2: Popen(["/usr/bin/ir-ctl", "--scancode", f"nec:{KEY_TO_SCAN[ev.code]:#x}"]) elif ev.type != EV_MSC: udev.write_event(ev) if __name__ == "__main__": main()