Skip to content

Instantly share code, notes, and snippets.

@CFSworks
Last active December 23, 2022 19:58
Show Gist options
  • Save CFSworks/697c63511d05a1e18d8668a01705bd80 to your computer and use it in GitHub Desktop.
Save CFSworks/697c63511d05a1e18d8668a01705bd80 to your computer and use it in GitHub Desktop.
Badger (ORION) RF modulation
options:
parameters:
author: CFSworks
category: '[GRC Hier Blocks]'
cmake_opt: ''
comment: ''
copyright: ''
description: ''
gen_cmake: 'On'
gen_linking: dynamic
generate_options: no_gui
hier_block_src_path: '.:'
id: orion
max_nouts: '0'
output_language: python
placement: (0,0)
qt_qss_theme: ''
realtime_scheduling: ''
run: 'True'
run_command: '{python} -u {filename}'
run_options: run
sizing_mode: fixed
thread_safe_setters: ''
title: Orion RF decoder
window_size: ''
states:
bus_sink: false
bus_source: false
bus_structure: null
coordinate: [8, 8]
rotation: 0
state: enabled
blocks:
- name: decim
id: variable
parameters:
comment: ''
value: '2'
states:
bus_sink: false
bus_source: false
bus_structure: null
coordinate: [184, 84.0]
rotation: 0
state: true
- name: fsk_deviation_hz
id: variable
parameters:
comment: ''
value: 100e3
states:
bus_sink: false
bus_source: false
bus_structure: null
coordinate: [288, 20.0]
rotation: 0
state: true
- name: samp_rate
id: variable
parameters:
comment: ''
value: 1e6
states:
bus_sink: false
bus_source: false
bus_structure: null
coordinate: [184, 12]
rotation: 0
state: enabled
# PLL frequency detector. In the connections list it is fed by
# low_pass_filter_0 and feeds fir_filter_xxx_0.
- name: analog_pll_freqdet_cf_0
id: analog_pll_freqdet_cf
parameters:
affinity: ''
alias: ''
comment: ''
# max_freq/min_freq are +/-1.2 x fsk_deviation_hz, divided by the decimated
# sample rate (samp_rate/decim) and multiplied by 3.1415*2 (approximately
# 2*pi) -- presumably radians per sample; confirm against the block docs.
max_freq: fsk_deviation_hz*1.2/(samp_rate/decim)*(3.1415*2)
maxoutbuf: '0'
min_freq: -fsk_deviation_hz*1.2/(samp_rate/decim)*(3.1415*2)
minoutbuf: '0'
w: '0.5'
states:
bus_sink: false
bus_source: false
bus_structure: null
coordinate: [400, 324.0]
rotation: 0
state: true
- name: blocks_file_source_0
id: blocks_file_source
parameters:
affinity: ''
alias: ''
begin_tag: pmt.PMT_NIL
comment: ''
file: /tmp/baseband.iq
length: '0'
maxoutbuf: '0'
minoutbuf: '0'
offset: '0'
repeat: 'False'
type: byte
vlen: '1'
states:
bus_sink: false
bus_source: false
bus_structure: null
coordinate: [592, 148.0]
rotation: 180
state: enabled
- name: blocks_interleaved_char_to_complex_0
id: blocks_interleaved_char_to_complex
parameters:
affinity: ''
alias: ''
comment: ''
maxoutbuf: '0'
minoutbuf: '0'
vector_input: 'False'
states:
bus_sink: false
bus_source: false
bus_structure: null
coordinate: [400, 180.0]
rotation: 180
state: enabled
# Scales the complex stream by a constant. In the connections list it sits
# between blocks_throttle_0 (which follows the interleaved-char-to-complex
# conversion) and dc_blocker_xx_0.
- name: blocks_multiply_const_vxx_0
id: blocks_multiply_const_vxx
parameters:
affinity: ''
alias: ''
comment: ''
# Presumably normalizes 8-bit sample values to roughly +/-1 -- TODO confirm.
const: 1/128
maxoutbuf: '0'
minoutbuf: '0'
type: complex
vlen: '1'
states:
bus_sink: false
bus_source: false
bus_structure: null
coordinate: [32, 180.0]
rotation: 180
state: enabled
- name: blocks_throttle_0
id: blocks_throttle
parameters:
affinity: ''
alias: ''
comment: ''
ignoretag: 'True'
maxoutbuf: '0'
minoutbuf: '0'
samples_per_second: samp_rate
type: complex
vlen: '1'
states:
bus_sink: false
bus_source: false
bus_structure: null
coordinate: [232, 180.0]
rotation: 180
state: enabled
- name: dc_blocker_xx_0
id: dc_blocker_xx
parameters:
affinity: ''
alias: ''
comment: ''
length: '512'
long_form: 'True'
maxoutbuf: '0'
minoutbuf: '0'
type: cc
states:
bus_sink: false
bus_source: false
bus_structure: null
coordinate: [48, 332.0]
rotation: 0
state: enabled
- name: epy_block_0
id: epy_block
parameters:
_source_code: "import numpy as np\nfrom gnuradio import gr\nimport pmt\n\nPREAMBLE_BITS\
\ = 38\nSYNC_BITS = 10\nPAYLOAD_BITS = 120\nFOOTER_BITS = 16\n\nclass\
\ blk(gr.basic_block):\n \"\"\"Orion receiver\"\"\"\n\n def __init__(self,\
\ samples_per_bit=5, threshold=60):\n gr.sync_block.__init__(\n \
\ self,\n name='Orion receiver',\n in_sig=[np.float32],\n\
\ out_sig=[]\n )\n\n self.samples_per_bit = samples_per_bit\n\
\ self.threshold = threshold\n\n self.message_port_register_out(pmt.intern('msg'))\n\
\n\n def demodulate(self, samples):\n bits = np.zeros(PREAMBLE_BITS\
\ + SYNC_BITS + PAYLOAD_BITS + FOOTER_BITS,\n dtype=np.uint8)\n\
\ stride = 8\n sample_stride = stride * self.samples_per_bit\n\
\ assert len(bits) % stride == 0\n\n strength = np.square(samples\
\ - np.mean(samples))\n i = 0\n\n for bit in range(0, len(bits),\
\ stride):\n alignments = strength[i:i+sample_stride]\n \
\ if len(alignments) < sample_stride: return None, 0\n alignments.shape\
\ = (stride, self.samples_per_bit)\n\n strongest = np.argmax(np.sum(alignments,\
\ axis=0)[:3])\n i += strongest\n\n window = samples[i:i+sample_stride:self.samples_per_bit]\n\
\ bits[bit:bit+stride] = window > np.mean(window)\n\n \
\ i += sample_stride - 1\n\n return bits, i\n\n\n @staticmethod\n\
\ def realign(msg):\n template = np.zeros(SYNC_BITS+PAYLOAD_BITS+FOOTER_BITS,\
\ dtype=np.int8)\n template[:SYNC_BITS] = [-1, -1, -1, -1, 1, 1, 1, 1,\
\ -1, 1]\n template[-FOOTER_BITS:] = -1\n template[-FOOTER_BITS::2]\
\ = 1\n\n corr = np.correlate(msg.astype(np.int8)*2-1, template, mode='valid')\n\
\ best = np.argmax(corr)\n if corr[best] < np.sum(np.abs(template))*0.9:\
\ return None\n return np.packbits(msg[best+SYNC_BITS:][:PAYLOAD_BITS])\n\
\n\n def work(self, input_items, output_items):\n preamble = np.zeros(PREAMBLE_BITS\
\ * self.samples_per_bit)\n preamble[::self.samples_per_bit] = 1\n \
\ preamble[::2*self.samples_per_bit] = -1\n\n detect = np.correlate(input_items[0],\
\ preamble, mode='valid')\n if np.all(detect < self.threshold):\n \
\ # No beginning of preamble detected, throw out whole buffer\n \
\ return len(detect)\n\n first = np.argwhere(detect >= self.threshold)[0][0]\n\
\ begin = np.argmax(detect[:first+len(preamble)])\n\n msg, consume\
\ = self.demodulate(input_items[0][begin:])\n consume += begin\n\n \
\ if msg is not None:\n msg = self.realign(msg)\n\n if\
\ msg is not None:\n send_pmt = pmt.make_u8vector(len(msg), 0)\n\
\ for i,b in enumerate(msg):\n pmt.u8vector_set(send_pmt,\
\ i, int(b))\n self.message_port_pub(pmt.intern('msg'), pmt.cons(pmt.PMT_NIL,\
\ send_pmt))\n\n return consume\n"
affinity: ''
alias: ''
comment: ''
maxoutbuf: '0'
minoutbuf: '0'
samples_per_bit: int(samp_rate//100e3//decim)
threshold: '20'
states:
_io_cache: ('Orion receiver', 'blk', [('samples_per_bit', '5'), ('threshold',
'60')], [('0', 'float', 1)], [('msg', 'message', 1)], 'Orion receiver', ['samples_per_bit',
'threshold'])
bus_sink: false
bus_source: false
bus_structure: null
coordinate: [464, 452.0]
rotation: 180
state: true
- name: epy_block_1
id: epy_block
parameters:
_source_code: "import numpy as np\nfrom gnuradio import gr\nimport pmt\nimport\
\ time\n\n\nclass blk(gr.basic_block):\n def __init__(self, interval=3.0):\n\
\ gr.sync_block.__init__(\n self,\n name='PDU logger',\n\
\ in_sig=[],\n out_sig=[]\n )\n\n self.message_port_register_in(pmt.intern('msg'))\n\
\ self.message_port_register_in(pmt.intern('pdu'))\n self.set_msg_handler(pmt.intern('msg'),\
\ self.handle_msg)\n self.set_msg_handler(pmt.intern('pdu'), self.handle_pdu)\n\
\n self._last_log = 0\n self.interval = interval\n\n\n def\
\ handle_msg(self, msg_pmt):\n now = time.time()\n if now - self._last_log\
\ <= self.interval:\n return\n self._last_log = now\n\n \
\ print(f'{time.ctime()}: {pmt.write_string(msg_pmt)}')\n\n\n def handle_pdu(self,\
\ pdu_pmt):\n now = time.time()\n if now - self._last_log <= self.interval:\n\
\ return\n self._last_log = now\n\n pdu = pmt.cdr(pdu_pmt)\n\
\ data = ' '.join('%02x' % x for x in pmt.u8vector_elements(pdu))\n \
\ print(f'{time.ctime()}: {data}')\n"
affinity: ''
alias: ''
comment: ''
interval: '0.5'
maxoutbuf: '0'
minoutbuf: '0'
states:
_io_cache: ('PDU logger', 'blk', [('interval', '3.0')], [('msg', 'message', 1),
('pdu', 'message', 1)], [], '', ['interval'])
bus_sink: false
bus_source: false
bus_structure: null
coordinate: [632, 560.0]
rotation: 0
state: true
- name: epy_block_2
id: epy_block
parameters:
_source_code: "import numpy as np\nfrom gnuradio import gr\nimport pmt\n\nfrom\
\ functools import reduce\nimport struct\n\nCODEBOOK = [\n 0b010110, # 0\n\
\ 0b001101, # 1\n 0b001110, # 2\n 0b001011, # 3\n 0b011100, # 4\n\
\ 0b011001, # 5\n 0b011010, # 6\n 0b010011, # 7\n 0b101100, # 8\n\
\ 0b100101, # 9\n 0b100110, # a\n 0b100011, # b\n 0b110100, # c\n\
\ 0b110001, # d\n 0b110010, # e\n 0b101001, # f\n]\n\nclass blk(gr.basic_block):\n\
\ \"\"\"Orion decode\"\"\"\n\n def __init__(self):\n gr.sync_block.__init__(\n\
\ self,\n name='Orion decode',\n in_sig=[],\n\
\ out_sig=[]\n )\n\n self.message_port_register_in(pmt.intern('msg'))\n\
\ self.message_port_register_out(pmt.intern('msg_fail'))\n self.message_port_register_out(pmt.intern('consumption'))\n\
\n self.set_msg_handler(pmt.intern('msg'), self.handle_msg)\n\n\n \
\ def handle_msg(self, msg):\n out = self.decode(msg)\n if out:\n\
\ self.message_port_pub(pmt.intern('consumption'), out)\n \
\ else:\n self.message_port_pub(pmt.intern('msg_fail'), msg)\n\n\n\
\ @staticmethod\n def do_crc(x):\n taps = 0x13d65\n i =\
\ 0\n while (1<<i)<x: i += 1\n while i:\n i -= 1\n\
\ x = min(x, x^(taps<<i))\n return x\n\n\n def decode(self,\
\ msg):\n data = pmt.cdr(msg)\n\n if not pmt.is_u8vector(data):\
\ return\n\n ary = np.array(list(pmt.u8vector_elements(data)), dtype=np.uint8)\n\
\ bits = np.unpackbits(ary)\n if len(bits)%6 != 0: return\n \
\ bits.shape = (-1, 6)\n\n codewords = list(np.dot(bits, [32, 16,\
\ 8, 4, 2, 1]))\n try:\n nybbles = [CODEBOOK.index(w) for\
\ w in codewords]\n except ValueError:\n return\n\n \
\ b = bytes(nybbles[x*2]*16 + nybbles[x*2+1] for x in range(len(nybbles)//2))\n\
\n if self.do_crc(reduce(lambda x,y: x<<8 | y, b)) != 0xFFFF: return\n\
\n sender, consumption, _ = struct.unpack('<IIH', b)\n\n out =\
\ pmt.make_dict()\n out = pmt.dict_add(out, pmt.intern('sender'), pmt.from_long(sender))\n\
\ out = pmt.dict_add(out, pmt.intern('consumption'), pmt.from_long(consumption))\n\
\ return out\n"
affinity: ''
alias: ''
comment: ''
maxoutbuf: '0'
minoutbuf: '0'
states:
_io_cache: ('Orion decode', 'blk', [], [('msg', 'message', 1)], [('consumption',
'message', 1), ('msg_fail', 'message', 1)], 'Orion decode', [])
bus_sink: false
bus_source: false
bus_structure: null
coordinate: [368, 560.0]
rotation: 0
state: true
- name: fir_filter_xxx_0
id: fir_filter_xxx
parameters:
affinity: ''
alias: ''
comment: ''
decim: '1'
maxoutbuf: '0'
minoutbuf: '0'
samp_delay: '1'
taps: '[0.225, 0.55, 0.225]'
type: fff
states:
bus_sink: false
bus_source: false
bus_structure: null
coordinate: [608, 332.0]
rotation: 0
state: enabled
# Low-pass FIR filter that also decimates by the `decim` variable. In the
# connections list it sits between dc_blocker_xx_0 and
# analog_pll_freqdet_cf_0.
- name: low_pass_filter_0
id: low_pass_filter
parameters:
affinity: ''
alias: ''
beta: '6.76'
comment: ''
# Cutoff and transition width (below) are given as 200e3 and 50e3, evaluated
# against the samp_rate variable -- presumably in Hz.
cutoff_freq: 200e3
decim: decim
gain: '1'
interp: '1'
maxoutbuf: '0'
minoutbuf: '0'
samp_rate: samp_rate
type: fir_filter_ccf
width: 50e3
win: firdes.WIN_HAMMING
states:
bus_sink: false
bus_source: false
bus_structure: null
coordinate: [216, 292.0]
rotation: 0
state: enabled
connections:
- [analog_pll_freqdet_cf_0, '0', fir_filter_xxx_0, '0']
- [blocks_file_source_0, '0', blocks_interleaved_char_to_complex_0, '0']
- [blocks_interleaved_char_to_complex_0, '0', blocks_throttle_0, '0']
- [blocks_multiply_const_vxx_0, '0', dc_blocker_xx_0, '0']
- [blocks_throttle_0, '0', blocks_multiply_const_vxx_0, '0']
- [dc_blocker_xx_0, '0', low_pass_filter_0, '0']
- [epy_block_0, msg, epy_block_2, msg]
- [epy_block_2, consumption, epy_block_1, msg]
- [epy_block_2, msg_fail, epy_block_1, pdu]
- [fir_filter_xxx_0, '0', epy_block_0, '0']
- [low_pass_filter_0, '0', analog_pll_freqdet_cf_0, '0']
metadata:
file_format: 1
#!/usr/bin/env python
import re
import struct
import binascii
import argparse
import numpy as np
# Symbol table used by encode(): entry N is the 6-symbol codeword emitted for
# the 4-bit value N.  '+' and '-' are the two symbol levels (modulate() maps
# them to +1.0 and -1.0).  Every codeword below contains exactly three '+'
# and three '-'.
CODEBOOK = [
    '-+-++-', # 0
    '--++-+', # 1
    '--+++-', # 2
    '--+-++', # 3
    '-+++--', # 4
    '-++--+', # 5
    '-++-+-', # 6
    '-+--++', # 7
    '+-++--', # 8
    '+--+-+', # 9
    '+--++-', # a
    '+---++', # b
    '++-+--', # c
    '++---+', # d
    '++--+-', # e
    '+-+--+', # f
]
# Framing around the encoded payload; main() concatenates them in the order
# PREAMBLE + SYNC + payload + FOOTER.
# Eight repetitions of '-+' (16 alternating symbols).
PREAMBLE = '-+'*8
# Fixed 10-symbol pattern sent right after the preamble.
SYNC = '----++++-+'
# Eight repetitions of '+-' (16 alternating symbols) sent after the payload.
FOOTER = '+-'*8
def crc16(data: bytes):
    """Return the 16-bit checksum of ``data`` as an int.

    The message bytes, followed by two zero bytes, are pushed through a
    16-bit shift register: the register is shifted left eight times
    (XOR-ing in the generator 0x13d65 whenever a bit falls off the top) and
    the next byte is then XOR-ed into the low bits.  The final register
    value is inverted (XOR 0xFFFF) before being returned.
    """
    POLY = 0x13d65
    register = 0
    # The two trailing zero bytes flush the last message bits through the
    # register so the checksum can finalize.
    for octet in data + b'\0\0':
        for _ in range(8):
            # The register never exceeds 16 bits between steps, so testing
            # bit 15 before the shift is the same as testing bit 16 after it.
            carry = register & 0x8000
            register <<= 1
            if carry:
                register ^= POLY
        register ^= octet
    return register ^ 0xFFFF
def encode(data: bytes):
    """Translate message bytes into a string of '+'/'-' symbols.

    Each byte contributes two CODEBOOK entries: the codeword for its upper
    four bits followed by the codeword for its lower four bits (the usual
    hex digit order).  An empty input yields an empty string.
    """
    codewords = []
    for byte in data:
        upper, lower = divmod(byte, 16)
        codewords.append(CODEBOOK[upper])
        codewords.append(CODEBOOK[lower])
    return ''.join(codewords)
def modulate(syms: str, sym_rate: int = 100_000, samp_rate: int = 1_000_000,
             deviation_hz: float = 100_000):
    """Frequency-modulate a '+'/'-' symbol string into complex baseband.

    Args:
        syms: Symbols to send; '+' selects +deviation_hz and '-' selects
            -deviation_hz.  Any other character raises KeyError.
        sym_rate: Symbols per second (default 100 ksym/s).
        samp_rate: Output samples per second (default 1 Msps).  Must be a
            whole multiple of ``sym_rate``.
        deviation_hz: Frequency offset used for each symbol (default 100 kHz).

    Returns:
        1-D complex numpy array of unit-magnitude samples,
        ``len(syms) * samp_rate // sym_rate`` long (empty for empty input).

    Raises:
        ValueError: if a rate is not positive or ``samp_rate`` is not an
            integer multiple of ``sym_rate``.
    """
    if sym_rate <= 0 or samp_rate <= 0:
        raise ValueError('sym_rate and samp_rate must be positive')
    samples_per_sym, leftover = divmod(samp_rate, sym_rate)
    if leftover:
        raise ValueError('samp_rate must be an integer multiple of sym_rate')

    levels = np.array([{'+': +1., '-': -1.}[s] for s in syms], dtype=float)
    # Hold each symbol for a whole number of output samples.
    held = np.repeat(levels, int(samples_per_sym))
    # Normalized angular frequency (radians per sample) for +/- deviation_hz.
    held *= np.pi*2 * deviation_hz / samp_rate
    # FM: the phase is the running sum of the per-sample frequency.
    return np.exp(1j * np.cumsum(held))
def save(samples: np.ndarray, filename: str):
    """Write complex samples to ``filename`` as interleaved signed 8-bit I/Q.

    Each sample becomes two bytes, I (real part) then Q (imaginary part),
    each scaled by 127 and truncated toward zero.  Values outside the
    representable range are clipped to +/-127 instead of overflowing the
    8-bit cast.

    Args:
        samples: 1-D array of complex samples, nominally with magnitude <= 1.
        filename: Path of the file to create or overwrite.
    """
    # Multiplying by 127 leaves 127*I in the real part; multiplying by -127j
    # rotates the sample so that 127*Q lands in the real part.
    scaled = np.outer(samples, [127, -127j]).real
    # The original variable name suggests this is the layout HackRF tools
    # expect -- not verified here.
    hackrf_format = np.clip(scaled, -127, 127).astype(np.int8)
    with open(filename, 'wb') as f:
        f.write(hackrf_format.tobytes())
def main(argv=None):
    """Command-line entry point: build, print and optionally save a message.

    The message is the 4 sender bytes (given as 8 hex digits), the
    consumption as a little-endian unsigned 32-bit integer, and the crc16()
    of those 8 bytes as a big-endian unsigned 16-bit integer.  It is encoded
    with encode(), wrapped in PREAMBLE + SYNC ... FOOTER, and, when
    ``--output`` is given, modulated and written with save().

    Args:
        argv: Argument list to parse; ``None`` (the default) makes argparse
            read ``sys.argv[1:]``.

    Invalid arguments end the program through argparse's usual usage error
    (SystemExit with status 2).
    """
    def parse_sender(sender):
        # argparse only turns ArgumentTypeError/TypeError/ValueError into a
        # usage message, so a bad value must be reported explicitly.
        if not re.fullmatch('[0-9a-fA-F]{8}', sender):
            raise argparse.ArgumentTypeError(
                f'expected exactly 8 hex digits, got {sender!r}')
        return binascii.unhexlify(sender.encode('ascii'))

    parser = argparse.ArgumentParser()
    parser.add_argument('--verbose', '-v', action='store_true')
    parser.add_argument('--output', '-o', default=None)
    parser.add_argument('sender', type=parse_sender)
    parser.add_argument('consumption', type=int)
    args = parser.parse_args(argv)

    # Explicit check instead of `assert`, which is removed under `python -O`.
    if not 0 <= args.consumption < (1<<32):
        parser.error('consumption must be in the range 0 .. 2**32-1')

    msg = args.sender + struct.pack('<I', args.consumption)
    msg += struct.pack('>H', crc16(msg))
    if args.verbose:
        print('Message (sender, consumption, CRC):')
        print(' ' + ' '.join('%02x'%x for x in msg))

    encoded = encode(msg)
    if args.verbose:
        print('Payload syms:')
        print(' ' + ' '.join(binascii.hexlify(msg).decode('ascii')))
        # One 6-symbol codeword per hex digit of the message.
        print(' ' + ' '.join(encoded[x*6:(x+1)*6] for x in range(len(msg)*2)))

    encoded = PREAMBLE + SYNC + encoded + FOOTER
    if args.verbose:
        print('Full transmission:')
        print(' ' + encoded)

    if args.output:
        modulated = modulate(encoded)
        save(modulated, args.output)


if __name__ == '__main__':
    main()
@b1tninja
Copy link

Saw your issue on rtlamr. It looks like we have different meters, though I was hoping they might be similar; the one I have is a https://fcc.report/FCC-ID/GIF2012WSE. I captured 914.7 MHz +/- 10 MHz (40 M samples at a decimation of 2) for a few hours and then ran it through DC Remove and Power Squelch to get a capture with just the higher-power (-20 dB) samples.

Here's a picture with the FFT "held" so you can kind of see the channels.
https://i.imgur.com/aA4qRMC.jpeg

I don't know if you'd be at all interested in helping adapt your blocks here to decode my meter (it looks like they might be pretty different), but I thought I'd ask.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment