Last active December 23, 2022 19:58
Badger (ORION) RF modulation
author: CFSworks
category: '[GRC Hier Blocks]'
cmake_opt: ''
comment: ''
copyright: ''
description: ''
gen_cmake: 'On'
gen_linking: dynamic
generate_options: no_gui
hier_block_src_path: '.:'
id: orion
max_nouts: '0'
output_language: python
placement: (0,0)
qt_qss_theme: ''
realtime_scheduling: ''
run: 'True'
run_command: '{python} -u {filename}'
run_options: run
sizing_mode: fixed
thread_safe_setters: ''
title: Orion RF decoder
window_size: ''
bus_sink: false
bus_source: false
bus_structure: null
coordinate: [8, 8]
rotation: 0
state: enabled
- name: decim
id: variable
comment: ''
value: '2'
bus_sink: false
bus_source: false
bus_structure: null
coordinate: [184, 84.0]
rotation: 0
state: true
- name: fsk_deviation_hz
id: variable
comment: ''
value: 100e3
bus_sink: false
bus_source: false
bus_structure: null
coordinate: [288, 20.0]
rotation: 0
state: true
- name: samp_rate
id: variable
comment: ''
value: 1e6
bus_sink: false
bus_source: false
bus_structure: null
coordinate: [184, 12]
rotation: 0
state: enabled
- name: analog_pll_freqdet_cf_0
id: analog_pll_freqdet_cf
affinity: ''
alias: ''
comment: ''
max_freq: fsk_deviation_hz*1.2/(samp_rate/decim)*(3.1415*2)
maxoutbuf: '0'
min_freq: -fsk_deviation_hz*1.2/(samp_rate/decim)*(3.1415*2)
minoutbuf: '0'
w: '0.5'
bus_sink: false
bus_source: false
bus_structure: null
coordinate: [400, 324.0]
rotation: 0
state: true
- name: blocks_file_source_0
id: blocks_file_source
affinity: ''
alias: ''
begin_tag: pmt.PMT_NIL
comment: ''
file: /tmp/
length: '0'
maxoutbuf: '0'
minoutbuf: '0'
offset: '0'
repeat: 'False'
type: byte
vlen: '1'
bus_sink: false
bus_source: false
bus_structure: null
coordinate: [592, 148.0]
rotation: 180
state: enabled
- name: blocks_interleaved_char_to_complex_0
id: blocks_interleaved_char_to_complex
affinity: ''
alias: ''
comment: ''
maxoutbuf: '0'
minoutbuf: '0'
vector_input: 'False'
bus_sink: false
bus_source: false
bus_structure: null
coordinate: [400, 180.0]
rotation: 180
state: enabled
- name: blocks_multiply_const_vxx_0
id: blocks_multiply_const_vxx
affinity: ''
alias: ''
comment: ''
const: 1/128
maxoutbuf: '0'
minoutbuf: '0'
type: complex
vlen: '1'
bus_sink: false
bus_source: false
bus_structure: null
coordinate: [32, 180.0]
rotation: 180
state: enabled
- name: blocks_throttle_0
id: blocks_throttle
affinity: ''
alias: ''
comment: ''
ignoretag: 'True'
maxoutbuf: '0'
minoutbuf: '0'
samples_per_second: samp_rate
type: complex
vlen: '1'
bus_sink: false
bus_source: false
bus_structure: null
coordinate: [232, 180.0]
rotation: 180
state: enabled
- name: dc_blocker_xx_0
id: dc_blocker_xx
affinity: ''
alias: ''
comment: ''
length: '512'
long_form: 'True'
maxoutbuf: '0'
minoutbuf: '0'
type: cc
bus_sink: false
bus_source: false
bus_structure: null
coordinate: [48, 332.0]
rotation: 0
state: enabled
- name: epy_block_0
id: epy_block
_source_code: "import numpy as np\nfrom gnuradio import gr\nimport pmt\n\nPREAMBLE_BITS\
\ = 38\nSYNC_BITS = 10\nPAYLOAD_BITS = 120\nFOOTER_BITS = 16\n\nclass\
\ blk(gr.basic_block):\n \"\"\"Orion receiver\"\"\"\n\n def __init__(self,\
\ samples_per_bit=5, threshold=60):\n gr.sync_block.__init__(\n \
\ self,\n name='Orion receiver',\n in_sig=[np.float32],\n\
\ out_sig=[]\n )\n\n self.samples_per_bit = samples_per_bit\n\
\ self.threshold = threshold\n\n self.message_port_register_out(pmt.intern('msg'))\n\
\n\n def demodulate(self, samples):\n bits = np.zeros(PREAMBLE_BITS\
\ + SYNC_BITS + PAYLOAD_BITS + FOOTER_BITS,\n dtype=np.uint8)\n\
\ stride = 8\n sample_stride = stride * self.samples_per_bit\n\
\ assert len(bits) % stride == 0\n\n strength = np.square(samples\
\ - np.mean(samples))\n i = 0\n\n for bit in range(0, len(bits),\
\ stride):\n alignments = strength[i:i+sample_stride]\n \
\ if len(alignments) < sample_stride: return None, 0\n alignments.shape\
\ = (stride, self.samples_per_bit)\n\n strongest = np.argmax(np.sum(alignments,\
\ axis=0)[:3])\n i += strongest\n\n window = samples[i:i+sample_stride:self.samples_per_bit]\n\
\ bits[bit:bit+stride] = window > np.mean(window)\n\n \
\ i += sample_stride - 1\n\n return bits, i\n\n\n @staticmethod\n\
\ def realign(msg):\n template = np.zeros(SYNC_BITS+PAYLOAD_BITS+FOOTER_BITS,\
\ dtype=np.int8)\n template[:SYNC_BITS] = [-1, -1, -1, -1, 1, 1, 1, 1,\
\ -1, 1]\n template[-FOOTER_BITS:] = -1\n template[-FOOTER_BITS::2]\
\ = 1\n\n corr = np.correlate(msg.astype(np.int8)*2-1, template, mode='valid')\n\
\ best = np.argmax(corr)\n if corr[best] < np.sum(np.abs(template))*0.9:\
\ return None\n return np.packbits(msg[best+SYNC_BITS:][:PAYLOAD_BITS])\n\
\n\n def work(self, input_items, output_items):\n preamble = np.zeros(PREAMBLE_BITS\
\ * self.samples_per_bit)\n preamble[::self.samples_per_bit] = 1\n \
\ preamble[::2*self.samples_per_bit] = -1\n\n detect = np.correlate(input_items[0],\
\ preamble, mode='valid')\n if np.all(detect < self.threshold):\n \
\ # No beginning of preamble detected, throw out whole buffer\n \
\ return len(detect)\n\n first = np.argwhere(detect >= self.threshold)[0][0]\n\
\ begin = np.argmax(detect[:first+len(preamble)])\n\n msg, consume\
\ = self.demodulate(input_items[0][begin:])\n consume += begin\n\n \
\ if msg is not None:\n msg = self.realign(msg)\n\n if\
\ msg is not None:\n send_pmt = pmt.make_u8vector(len(msg), 0)\n\
\ for i,b in enumerate(msg):\n pmt.u8vector_set(send_pmt,\
\ i, int(b))\n self.message_port_pub(pmt.intern('msg'), pmt.cons(pmt.PMT_NIL,\
\ send_pmt))\n\n return consume\n"
affinity: ''
alias: ''
comment: ''
maxoutbuf: '0'
minoutbuf: '0'
samples_per_bit: int(samp_rate//100e3//decim)
threshold: '20'
_io_cache: ('Orion receiver', 'blk', [('samples_per_bit', '5'), ('threshold',
'60')], [('0', 'float', 1)], [('msg', 'message', 1)], 'Orion receiver', ['samples_per_bit',
'threshold'])
bus_sink: false
bus_source: false
bus_structure: null
coordinate: [464, 452.0]
rotation: 180
state: true
- name: epy_block_1
id: epy_block
_source_code: "import numpy as np\nfrom gnuradio import gr\nimport pmt\nimport\
\ time\n\n\nclass blk(gr.basic_block):\n def __init__(self, interval=3.0):\n\
\ gr.sync_block.__init__(\n self,\n name='PDU logger',\n\
\ in_sig=[],\n out_sig=[]\n )\n\n self.message_port_register_in(pmt.intern('msg'))\n\
\ self.message_port_register_in(pmt.intern('pdu'))\n self.set_msg_handler(pmt.intern('msg'),\
\ self.handle_msg)\n self.set_msg_handler(pmt.intern('pdu'), self.handle_pdu)\n\
\n self._last_log = 0\n self.interval = interval\n\n\n def\
\ handle_msg(self, msg_pmt):\n now = time.time()\n if now - self._last_log\
\ <= self.interval:\n return\n self._last_log = now\n\n \
\ print(f'{time.ctime()}: {pmt.write_string(msg_pmt)}')\n\n\n def handle_pdu(self,\
\ pdu_pmt):\n now = time.time()\n if now - self._last_log <= self.interval:\n\
\ return\n self._last_log = now\n\n pdu = pmt.cdr(pdu_pmt)\n\
\ data = ' '.join('%02x' % x for x in pmt.u8vector_elements(pdu))\n \
\ print(f'{time.ctime()}: {data}')\n"
affinity: ''
alias: ''
comment: ''
interval: '0.5'
maxoutbuf: '0'
minoutbuf: '0'
_io_cache: ('PDU logger', 'blk', [('interval', '3.0')], [('msg', 'message', 1),
('pdu', 'message', 1)], [], '', ['interval'])
bus_sink: false
bus_source: false
bus_structure: null
coordinate: [632, 560.0]
rotation: 0
state: true
- name: epy_block_2
id: epy_block
_source_code: "import numpy as np\nfrom gnuradio import gr\nimport pmt\n\nfrom\
\ functools import reduce\nimport struct\n\nCODEBOOK = [\n 0b010110, # 0\n\
\ 0b001101, # 1\n 0b001110, # 2\n 0b001011, # 3\n 0b011100, # 4\n\
\ 0b011001, # 5\n 0b011010, # 6\n 0b010011, # 7\n 0b101100, # 8\n\
\ 0b100101, # 9\n 0b100110, # a\n 0b100011, # b\n 0b110100, # c\n\
\ 0b110001, # d\n 0b110010, # e\n 0b101001, # f\n]\n\nclass blk(gr.basic_block):\n\
\ \"\"\"Orion decode\"\"\"\n\n def __init__(self):\n gr.sync_block.__init__(\n\
\ self,\n name='Orion decode',\n in_sig=[],\n\
\ out_sig=[]\n )\n\n self.message_port_register_in(pmt.intern('msg'))\n\
\ self.message_port_register_out(pmt.intern('msg_fail'))\n self.message_port_register_out(pmt.intern('consumption'))\n\
\n self.set_msg_handler(pmt.intern('msg'), self.handle_msg)\n\n\n \
\ def handle_msg(self, msg):\n out = self.decode(msg)\n if out:\n\
\ self.message_port_pub(pmt.intern('consumption'), out)\n \
\ else:\n self.message_port_pub(pmt.intern('msg_fail'), msg)\n\n\n\
\ @staticmethod\n def do_crc(x):\n taps = 0x13d65\n i =\
\ 0\n while (1<<i)<x: i += 1\n while i:\n i -= 1\n\
\ x = min(x, x^(taps<<i))\n return x\n\n\n def decode(self,\
\ msg):\n data = pmt.cdr(msg)\n\n if not pmt.is_u8vector(data):\
\ return\n\n ary = np.array(list(pmt.u8vector_elements(data)), dtype=np.uint8)\n\
\ bits = np.unpackbits(ary)\n if len(bits)%6 != 0: return\n \
\ bits.shape = (-1, 6)\n\n codewords = list(np.dot(bits, [32, 16,\
\ 8, 4, 2, 1]))\n try:\n nybbles = [CODEBOOK.index(w) for\
\ w in codewords]\n except ValueError:\n return\n\n \
\ b = bytes(nybbles[x*2]*16 + nybbles[x*2+1] for x in range(len(nybbles)//2))\n\
\n if self.do_crc(reduce(lambda x,y: x<<8 | y, b)) != 0xFFFF: return\n\
\n sender, consumption, _ = struct.unpack('<IIH', b)\n\n out =\
\ pmt.make_dict()\n out = pmt.dict_add(out, pmt.intern('sender'), pmt.from_long(sender))\n\
\ out = pmt.dict_add(out, pmt.intern('consumption'), pmt.from_long(consumption))\n\
\ return out\n"
affinity: ''
alias: ''
comment: ''
maxoutbuf: '0'
minoutbuf: '0'
_io_cache: ('Orion decode', 'blk', [], [('msg', 'message', 1)], [('consumption',
'message', 1), ('msg_fail', 'message', 1)], 'Orion decode', [])
bus_sink: false
bus_source: false
bus_structure: null
coordinate: [368, 560.0]
rotation: 0
state: true
- name: fir_filter_xxx_0
id: fir_filter_xxx
affinity: ''
alias: ''
comment: ''
decim: '1'
maxoutbuf: '0'
minoutbuf: '0'
samp_delay: '1'
taps: '[0.225, 0.55, 0.225]'
type: fff
bus_sink: false
bus_source: false
bus_structure: null
coordinate: [608, 332.0]
rotation: 0
state: enabled
- name: low_pass_filter_0
id: low_pass_filter
affinity: ''
alias: ''
beta: '6.76'
comment: ''
cutoff_freq: 200e3
decim: decim
gain: '1'
interp: '1'
maxoutbuf: '0'
minoutbuf: '0'
samp_rate: samp_rate
type: fir_filter_ccf
width: 50e3
win: firdes.WIN_HAMMING
bus_sink: false
bus_source: false
bus_structure: null
coordinate: [216, 292.0]
rotation: 0
state: enabled
- [analog_pll_freqdet_cf_0, '0', fir_filter_xxx_0, '0']
- [blocks_file_source_0, '0', blocks_interleaved_char_to_complex_0, '0']
- [blocks_interleaved_char_to_complex_0, '0', blocks_throttle_0, '0']
- [blocks_multiply_const_vxx_0, '0', dc_blocker_xx_0, '0']
- [blocks_throttle_0, '0', blocks_multiply_const_vxx_0, '0']
- [dc_blocker_xx_0, '0', low_pass_filter_0, '0']
- [epy_block_0, msg, epy_block_2, msg]
- [epy_block_2, consumption, epy_block_1, msg]
- [epy_block_2, msg_fail, epy_block_1, pdu]
- [fir_filter_xxx_0, '0', epy_block_0, '0']
- [low_pass_filter_0, '0', analog_pll_freqdet_cf_0, '0']
file_format: 1
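
To experiment with the payload decoding outside GNU Radio, the following is a minimal standalone sketch of what the `Orion decode` block in the flowgraph above appears to do: look up each 6-bit codeword in the codebook, pack the resulting nybbles into bytes, and accept the frame only when the GF(2) remainder against the taps `0x13d65` equals `0xFFFF`. The names `poly_mod`, `decode_payload`, and `build_frame` are hypothetical helpers introduced for illustration, and `build_frame` assumes the CRC is appended big-endian, as the encoder script does.

```python
import struct

# Codebook copied from the flowgraph's decode block: list index = nybble value.
CODEBOOK = [
    0b010110, 0b001101, 0b001110, 0b001011,
    0b011100, 0b011001, 0b011010, 0b010011,
    0b101100, 0b100101, 0b100110, 0b100011,
    0b110100, 0b110001, 0b110010, 0b101001,
]
POLY = 0x13d65  # same taps as do_crc in the flowgraph


def poly_mod(x, poly=POLY):
    """Remainder of x divided by poly over GF(2) (bitwise long division)."""
    width = poly.bit_length()
    while x.bit_length() >= width:
        x ^= poly << (x.bit_length() - width)
    return x


def decode_payload(codewords):
    """Return (sender, consumption), or None if a codeword or the CRC is bad."""
    if len(codewords) != 20:
        return None
    try:
        nybbles = [CODEBOOK.index(w) for w in codewords]
    except ValueError:
        return None  # not a valid codeword
    data = bytes(nybbles[i] * 16 + nybbles[i + 1] for i in range(0, 20, 2))
    if poly_mod(int.from_bytes(data, 'big')) != 0xFFFF:
        return None  # CRC mismatch
    sender, consumption, _crc = struct.unpack('<IIH', data)
    return sender, consumption


def build_frame(sender, consumption):
    """Hypothetical test helper: build a valid 20-codeword payload."""
    body = struct.pack('<II', sender, consumption)
    crc = poly_mod(int.from_bytes(body, 'big') << 16) ^ 0xFFFF
    data = body + struct.pack('>H', crc)
    return [CODEBOOK[n] for b in data for n in (b >> 4, b & 0xF)]
```

A frame built with `build_frame` should decode back to the same pair, while changing any single codeword should make `decode_payload` return `None`.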
#!/usr/bin/env python
import re
import struct
import binascii
import argparse

import numpy as np

# 6-symbol codeword for each nybble; '+' and '-' are the two FSK tones
CODEBOOK = [
    '-+-++-', # 0
    '--++-+', # 1
    '--+++-', # 2
    '--+-++', # 3
    '-+++--', # 4
    '-++--+', # 5
    '-++-+-', # 6
    '-+--++', # 7
    '+-++--', # 8
    '+--+-+', # 9
    '+--++-', # a
    '+---++', # b
    '++-+--', # c
    '++---+', # d
    '++--+-', # e
    '+-+--+', # f
]

PREAMBLE = '-+'*8
SYNC = '----++++-+'
FOOTER = '+-'*8

def crc16(data: bytes):
    POLY = 0x13d65
    state = 0
    for b in data + b'\0\0': # 2 null loops to allow CRC16 to finalize
        for x in range(8):
            state <<= 1
            if state&0x10000:
                state ^= POLY
        state ^= b
    return state^0xFFFF


def encode(data: bytes):
    out = ''
    for b in data:
        for nybble in [b>>4, b&0xF]: # high nybble first (typical hex order)
            out += CODEBOOK[nybble]
    return out


def modulate(syms: str):
    syms = np.array([{'+': +1., '-': -1.}[s] for s in syms])
    samp = np.repeat(syms, 10) # 100 Ksyms/sec -> 1 Msps
    samp *= np.pi*2 * 100_000 / 1_000_000 # Norm. ang. freq. for +/- 100KHz
    out = np.e ** (1j*np.cumsum(samp)) # FM modulation
    return out
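
def save(samples: np.ndarray, filename: str):
    # Interleave I and Q as signed 8-bit samples (column 0 = I, column 1 = Q)
    hackrf_format = np.outer(samples, [127, -127j]).real.astype(np.int8)
    with open(filename, 'wb') as f:
        f.write(hackrf_format.tobytes())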

def main():
    parser = argparse.ArgumentParser()
    parser.add_argument('--verbose', '-v', action='store_true')
    parser.add_argument('--output', '-o', default=None)

    def parse_sender(sender):
        # Sender ID is given as exactly 8 hex digits (4 bytes)
        m = re.match('^[0-9a-fA-F]{8}$', sender)
        if not m:
            raise argparse.ArgumentTypeError('sender must be 8 hex digits')
        return binascii.unhexlify(sender.encode('ascii'))

    parser.add_argument('sender', type=parse_sender)
    parser.add_argument('consumption', type=int)
    args = parser.parse_args()

    assert 0 <= args.consumption < (1<<32)

    msg = args.sender + struct.pack('<I', args.consumption)
    msg += struct.pack('>H', crc16(msg))
    if args.verbose:
        print('Message (sender, consumption, CRC):')
        print(' ' + ' '.join('%02x'%x for x in msg))

    encoded = encode(msg)
    if args.verbose:
        print('Payload syms:')
        print(' ' + ' '.join(binascii.hexlify(msg).decode('ascii')))
        print(' ' + ' '.join(encoded[x*6:(x+1)*6] for x in range(len(msg)*2)))

    encoded = PREAMBLE + SYNC + encoded + FOOTER
    if args.verbose:
        print('Full transmission:')
        print(' ' + encoded)

    if args.output:
        modulated = modulate(encoded)
        save(modulated, args.output)


if __name__ == '__main__':
    main()
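
One way to sanity-check the modulator in the script above is to demodulate its output again and compare the recovered symbols with the input. The sketch below repeats the script's continuous-phase FSK scheme (10 samples per symbol, +/- 100 kHz at 1 Msps) and recovers the symbols with a plain quadrature demodulator, which measures the phase step between consecutive samples. The `demodulate` function and the constant names are assumptions made for this illustration; the flowgraph itself uses a PLL frequency detector instead.

```python
import numpy as np

SAMPLES_PER_SYMBOL = 10  # 100 ksym/s at 1 Msps, as in the script's modulate()
DEVIATION = 2 * np.pi * 100_000 / 1_000_000  # rad/sample for +/- 100 kHz


def modulate(syms):
    """Same scheme as the script: continuous-phase 2-FSK."""
    levels = np.array([{'+': 1.0, '-': -1.0}[s] for s in syms])
    freq = np.repeat(levels, SAMPLES_PER_SYMBOL) * DEVIATION
    return np.exp(1j * np.cumsum(freq))


def demodulate(samples):
    """Quadrature demodulation: sign of the average phase step per symbol."""
    step = np.angle(samples[1:] * np.conj(samples[:-1]))
    # The phase starts at zero, so the first sample's phase is its own step.
    step = np.concatenate(([np.angle(samples[0])], step))
    per_symbol = step.reshape(-1, SAMPLES_PER_SYMBOL).mean(axis=1)
    return ''.join('+' if s > 0 else '-' for s in per_symbol)
```

Because the per-sample phase step (about 0.63 rad) stays well below pi, `np.angle` never wraps here, so a noise-free round trip returns the original symbol string.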
Saw your issue on rtlamr. Looks like we have different meters, but I was hoping they might be similar. The one I have is a I captured 914.7 MHz +/- 10 MHz (40 M samples at 2 decimation) for a few hours and then ran it through DC Remove and Power Squelch to get a capture with just the higher-power (-20 dB) samples.

Here's a picture with the FFT "held" so you can kind of see the channels.

Idk if you'd be at all interested in helping adapt your blocks here to decode my meter, looks like they might be pretty different... but thought I'd ask
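
The capture preprocessing described in the comment above (DC removal followed by a power squelch) could be approximated offline along these lines. This is only a rough NumPy sketch: the block-averaged power estimate, the block size of 256, and the interpretation of -20 dB as relative to a full-scale amplitude of 1.0 are all assumptions, and the function names are hypothetical. It is not meant to replicate GNU Radio's own blocks exactly.

```python
import numpy as np


def remove_dc(samples):
    """Subtract the mean (a crude stand-in for a DC-blocking filter)."""
    return samples - np.mean(samples)


def power_squelch(samples, threshold_db=-20.0, block=256):
    """Zero out whole blocks whose mean power is below threshold_db.

    Power is measured in dB relative to an amplitude of 1.0. A trailing
    partial block, if any, is passed through unchanged.
    """
    out = np.array(samples, dtype=np.complex64)  # work on a copy
    for i in range(len(out) // block):
        seg = out[i * block:(i + 1) * block]  # view into out
        power_db = 10 * np.log10(np.mean(np.abs(seg) ** 2) + 1e-12)
        if power_db < threshold_db:
            seg[:] = 0
    return out
```

With low-level noise and one strong burst, only the blocks containing the burst should survive the squelch.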
