Last active
April 8, 2024 16:20
-
-
Save rxseger/994cf98066b31ff9aeb1d97cef9eaa05 to your computer and use it in GitHub Desktop.
Script using RTL-SDR with IR to power off system when RTL remote power button pressed
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python
# Quick & dirty script to power down the computer when receiving
# a "power button" IR signal from the RTL remote over an RTL-SDR dongle
# via rpc_ir librtlsdr: https://github.com/rxseger/librtlsdr/pull/1
# Intended for use with a Raspberry Pi, which lacks a built-in power button
# Usage:
#   rtl_rpcd -I 1235
#   python nec-pwr.py
# Host and port of the IR data stream that main() connects to.
IR_ADDR = "127.0.0.1" # should be local unless you want to shut down a system other than the one that receives the IR
IR_PORT = 1235 # same port as in the `rtl_rpcd -I 1235` usage line
# Shell commands executed with os.system().
RESTART_SDR_CMD = "service dump1090-mutability restart"
SHUTDOWN_CMD = "shutdown -h now"
REBOOT_CMD = "reboot"
# File to which log() appends timestamped lines.
LOG_PATH = "/var/log/ir.log"
import socket | |
import re | |
import os | |
import time | |
import datetime | |
import RPi.GPIO as GPIO | |
# Status LED pins. The numbers are physical header positions because
# GPIO.BOARD numbering is selected below; the trailing "G" labels are the
# original author's notes (presumably the matching BCM GPIO numbers - confirm).
LED_G = 29 # G5
LED_R = 31 # G6
LED_Y = 32 # G12
LED_B = 33 # G13
GPIO.setmode(GPIO.BOARD)
# Configure all four LED pins as outputs that start out HIGH.
GPIO.setup([LED_G, LED_R, LED_Y, LED_B], GPIO.OUT, initial=GPIO.HIGH)
# turn off yellow (this script treats writing True/HIGH as "LED off")
GPIO.output(LED_Y, True)
def log(*a):
    """Print a message and append it, timestamped, to LOG_PATH.

    All positional arguments are converted with str() and joined with single
    spaces. The message is echoed to stdout unless the QUIET environment
    variable is set to a non-empty value. It is always appended to LOG_PATH as
    one line: ISO-8601 local timestamp, a tab, the message, a newline.

    Raises IOError/OSError if LOG_PATH cannot be opened or written.
    """
    s = " ".join(map(str, a))
    if not os.getenv("QUIET"):
        # The parenthesised single-argument form is valid both as the
        # Python 2 print statement and as the Python 3 print function.
        print(s)
    ts = datetime.datetime.now().isoformat()
    # open() replaces the Python 2-only file() builtin; the context manager
    # flushes and closes the handle even when write() raises.
    with open(LOG_PATH, "a") as fd:
        fd.write("%s\t%s\n" % (ts, s))
def unpack_pulses(data):
    """Expand run-length encoded IR samples into a string of '0'/'1' digits.

    Each input byte holds the pulse level in its top bit (0x80) and a
    duration in its low seven bits (the original author notes one duration
    unit is 20 usec). For every byte, in order, the level digit is repeated
    `duration` times.

    data: the bytes received from the socket. A Python 2 str works as-is;
          a text string is encoded as Latin-1 so each character is one byte.
    Returns the expanded sample string, e.g. "11100"; empty input gives "".
    """
    if not isinstance(data, (bytes, bytearray)):
        data = data.encode("latin-1")
    runs = []
    # Iterating a bytearray yields integers on both Python 2 and Python 3,
    # whereas indexing bytes gives a 1-char str on 2 and an int on 3 (which
    # made the previous ord() call fail on Python 3).
    for b in bytearray(data):
        level = "1" if b & 0x80 else "0"
        runs.append(level * (b & 0x7f))  # *20 usec
    # One join instead of appending a character at a time.
    return "".join(runs)
SCALE = 100 # times slower


def replay_pulses(groups):
    """Replay a received pulse train on the yellow LED, slowed down by SCALE.

    groups: iterable of strings, each a run of identical '0' or '1' samples.
    For every run the LED pin is written True if the run contains '0' and
    False otherwise, then held for len(run) * 20 usec * SCALE. After the last
    run the pin is written True again.
    """
    for run in groups:
        is_pause = '0' in run
        GPIO.output(LED_Y, is_pause)
        hold_seconds = len(run) * 20e-6 * SCALE
        time.sleep(hold_seconds)
    # Leave the pin in the same state it is given at start-up.
    GPIO.output(LED_Y, True)
# Kept as a module-level name for backward compatibility; nothing in this
# file assigns it any more (the only writer is commented-out code).
last_cmd = None


def measure_pulses(s):
    """Decode one IR frame from a sample string and act on the command.

    s: string of '0'/'1' samples as produced by unpack_pulses().

    The samples are split into runs; the length of each pause (run of '0')
    gives one bit, and decoding stops at the long trailer pause. The last 16
    bits are taken as the command and whatever precedes them as the address.
    The frame is logged, replayed on the LED via replay_pulses(), and the
    power / recall buttons trigger SHUTDOWN_CMD / REBOOT_CMD.

    Returns None. Frames with fewer than 16 decoded bits are ignored.
    """
    # Split by contiguous length of series on/off: 111.... or 000... etc.
    groups = re.findall('0+|1+', s)
    log('')
    log('groups', groups)

    # Measure lengths of pauses - corresponds to bits (alternated with bursts)
    bits = []
    for group in groups:
        if '0' not in group:
            continue  # bursts only separate the pauses
        if len(group) > 185:  # very long (usually 191), stop trailer
            break
        # long (usually 33-34) is 1, anything shorter is 0
        bits.append("1" if len(group) > 20 else "0")
    lengths = "".join(bits)
    log('lengths', lengths)

    if len(lengths) < 8:
        log("repeat")
        return  # some other payload, maybe repeat?

    # match the last 16 bits, and hopefully first 16 but may be truncated so match from the back
    match = re.match('^(.*)([01]{16})$', lengths)
    if not match:
        return
    address_str, command_str = match.groups()
    # The address part is empty when exactly 16 bits were captured;
    # int('', 2) would raise ValueError and take the whole daemon down.
    address = int(address_str, 2) if address_str else 0
    command = int(command_str, 2)

    # if 16-bit command is 8-bits inverted, then is original 8-bit NEC
    if command >> 8 == ~command & 0xff:
        command = command & 0xff

    log("%.2x at %.2x" % (command, address))
    replay_pulses(groups)

    # The RTL remote uses these commands for each button (found by empirical testing with Linux driver):
    # 4d 01001101 power
    # 54 01010100 source
    # 16 00010110 mute
    # 4c 01001100 record
    # 05 00000101 ch+
    # 0c 00001100 time shift
    # 0a 00001010 vol-
    # 40 01000000 fullscreen
    # 1e 00011110 vol+
    # 12 00010010 0
    # 02 00000010 ch-
    # 1c 00011100 recall
    # 09 00001001 1
    # 1d 00011101 2
    # 1f 00011111 3
    # 0d 00001101 4
    # 19 00011001 5
    # 1b 00011011 6
    # 11 00010001 7
    # 15 00010101 8
    # 17 00010111 9
    #
    # NOTE: the value compared below is the low byte of the 16 bits read
    # MSB-first, i.e. the inverted command byte. Assuming standard NEC
    # (LSB-first) transmission, a table code x therefore shows up here as the
    # bit-reversal of ~x: 0x4d maps to itself, and recall (0x1c) becomes 0xc7.

    # power
    if command == 0x4d:
        log("Received IR power signal, shutting down!")
        os.system(SHUTDOWN_CMD)
    # recall
    elif command == 0xc7:
        log("Received IR recall signal, rebooting!")
        os.system(REBOOT_CMD)
#def frombin(s): | |
# if s == '': return 0 | |
# if len(s) != 8: return -1 | |
# return int(s, 2) | |
# | |
#log('len',len(lengths)) | |
#command_inv = frombin(lengths[len(lengths) - 8 - 1:-1]) | |
#command = frombin(lengths[len(lengths) - 8*2-1:len(lengths) - 8-1]) | |
#address_inv = frombin(lengths[len(lengths) - 8*3-1:len(lengths) - 8*2-1]) | |
#address = frombin(lengths[len(lengths) - 8*4-1:len(lengths) - 8*3-1]) | |
#log("%.2x %.2x %.2x %.2x" % (command, command_inv, address_inv, address)) | |
# Integrity check by verifying command is inverted command inverted | |
#if command != ~command_inv & 0xff: | |
# "32-bit NEC (used by Apple and TiVo remotes)" | |
# see https://github.com/torvalds/linux/blob/9256d5a308c95a50c6e85d682492ae1f86a70f9b/drivers/media/rc/img-ir/img-ir-nec.c | |
# command = (command_inv << 8) | command | |
#if address != ~address_inv & 0xff: | |
# address_str = "%.2x" % (address,) | |
#log("%.2x %s" % (command_inv, lengths)) | |
#cmd = "%.2x from %.2x" % (command, address) | |
#log(cmd) | |
#last_cmd = cmd | |
# Start bits are 8 zeros (may be partial) then 8 ones, match the 1's | |
# 0000001111111110110010010011011 | |
# 0* 1{8} (.*) | |
#match = re.match('^0*1{8}(.*)', lengths) | |
#if match is None: | |
# #log('nothing in ',lengths) | |
# return | |
## payload = match.groups()[0] | |
#log("PAYLOAD",payload) | |
#for i in range(0, len(groups), 2): | |
# a = groups[i] | |
# if i + 1 >= len(groups): break | |
# b = groups[i + 1] | |
# | |
# log(a,b) | |
def main():
    """Run forever: (re)start the SDR service, connect to the IR stream, decode.

    Exits via SystemExit when not running as root. Otherwise each cycle runs
    RESTART_SDR_CMD, connects to IR_ADDR:IR_PORT and feeds every received
    chunk through unpack_pulses() and measure_pulses(). When the connection
    cannot be made, is closed by the peer, or fails while reading, the socket
    is closed and the cycle starts over.
    """
    if os.getuid() != 0:
        log("Must run as root to shutdown")
        raise SystemExit

    while True:
        # First, restart the SDR software so it reconnects to rtl_rpcd, opening the device (required for IR)
        log("Running %s" % (RESTART_SDR_CMD,))
        os.system(RESTART_SDR_CMD)

        log("Connecting...")
        s = socket.socket()
        # try/finally rather than `with`: Python 2 sockets are not context
        # managers. Without it one descriptor leaked per reconnect cycle.
        try:
            try:
                s.connect((IR_ADDR, IR_PORT))
            except socket.error as e:
                log("Couldn't connect to %s:%s: %s, waiting..." % (IR_ADDR, IR_PORT, e))
                time.sleep(1)
                continue
            log("Connected to %s:%s" % (IR_ADDR, IR_PORT))

            log("Reading IR data...")
            try:
                while True:
                    data = s.recv(1024)
                    if not data:
                        break  # peer closed the connection
                    # Bytes literal so the comparison also matches on
                    # Python 3, where recv() returns bytes, not str.
                    if data == b'\0':
                        continue  # silence
                    log([data])
                    pulses = unpack_pulses(data)
                    measure_pulses(pulses)
            except socket.error as e:
                log("Exception reading socket data: %s" % (e,))
        finally:
            s.close()


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment