Skip to content

Instantly share code, notes, and snippets.

@jdiez17
Last active September 5, 2020 16:04
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jdiez17/ed1b9016ce26db602298 to your computer and use it in GitHub Desktop.
Save jdiez17/ed1b9016ce26db602298 to your computer and use it in GitHub Desktop.
from gnuradio import analog
from gnuradio import blocks
from gnuradio import eng_notation
from gnuradio import filter
from gnuradio import gr
from gnuradio.eng_option import eng_option
from gnuradio.filter import firdes
from optparse import OptionParser
import osmosdr
import numpy
def sublistExists(list, sublist):
for i in range(len(list)-len(sublist)+1):
if sublist == list[i:i+len(sublist)]:
return True
return False
DOORBELL_ID = [0, 1, 0, 0, 0, 1, 0, 0, 1, 1, 0, 1, 0, 1, 0, 1, 1, 1]
ZERO_PULSE = 200 # microseconds
ONE_PULSE = 600 # microseconds
SAMPLE_RATE = 50000 # Hz
SEQUENCE = [1, 0, 1, 0, 1, 0]
END_OF_MSG = [1, 1, 1]
PACKET_LEN = 18
SAMPLES_ZERO_PULSE = SAMPLE_RATE * (ZERO_PULSE/1000000.)
SAMPLES_ONE_PULSE = SAMPLE_RATE * (ONE_PULSE/1000000.)
class pwm_demod_py(gr.sync_block):
def __init__(self):
self.state = 0
self.count = 0
self.symbols = []
gr.sync_block.__init__(self,
name="pwm_demod_py",
in_sig=[numpy.uint8],
out_sig=[])
def packet_handle(self, packet):
if len(packet) == PACKET_LEN:
# send message to homeassistant here
pass
def process_chunk(self, chunk, eom=END_OF_MSG):
for sample in chunk:
sample = int(sample)
if sample == self.state:
self.count += 1
else:
# Transition to 0: previous state was 1, try to decode symbol.
# Only emit symbol if count is over threshold
if sample == 0 and self.count > SAMPLES_ZERO_PULSE / 2:
symbol = 1 if self.count > SAMPLES_ONE_PULSE - 10 else 0
self.symbols.append(symbol)
# End of packet detector;
# yield symbols if packet trailer detected or long pause
if sublistExists(self.symbols, END_OF_MSG) or \
(sample == 1 and self.count > SAMPLES_ONE_PULSE + 10):
self.packet_handle(self.symbols)
self.symbols = []
self.count = 1
self.state = sample
def work(self, input_items, output_items):
in0 = input_items[0]
self.process_chunk(in0)
return len(in0)
class top_block(gr.top_block):
def __init__(self):
gr.top_block.__init__(self)
##################################################
# Variables
##################################################
self.threshold = threshold = 0.5
self.samp_rate = samp_rate = 0.25e6
self.gain = gain = 1
self.freq = freq = 434.92e6
self.rational_resampler_xxx_0 = filter.rational_resampler_ccc(
interpolation=1,
decimation=5,
taps=None,
fractional_bw=None,
)
self.osmosdr_source_0 = osmosdr.source( args="numchan=" + str(1) + " " + "" )
self.osmosdr_source_0.set_sample_rate(samp_rate)
self.osmosdr_source_0.set_center_freq(freq, 0)
self.osmosdr_source_0.set_freq_corr(0, 0)
self.osmosdr_source_0.set_dc_offset_mode(0, 0)
self.osmosdr_source_0.set_iq_balance_mode(0, 0)
self.osmosdr_source_0.set_gain_mode(False, 0)
self.osmosdr_source_0.set_gain(0, 0)
self.osmosdr_source_0.set_if_gain(24, 0)
self.osmosdr_source_0.set_bb_gain(38, 0)
self.osmosdr_source_0.set_antenna("", 0)
self.osmosdr_source_0.set_bandwidth(0, 0)
self.low_pass_filter_0 = filter.fir_filter_ccf(1, firdes.low_pass(
1, 1e6, 100e3, 1e3, firdes.WIN_HAMMING, 6.76))
self.blocks_threshold_ff_0 = blocks.threshold_ff(threshold, threshold, 0)
self.blocks_multiply_xx_0 = blocks.multiply_vcc(1)
self.blocks_multiply_const_vxx_0 = blocks.multiply_const_vcc((gain, ))
self.blocks_float_to_char_0 = blocks.float_to_char(1, 1)
self.blocks_complex_to_mag_squared_0 = blocks.complex_to_mag_squared(1)
self.pwm_demod = pwm_demod_py()
##################################################
# Connections
##################################################
self.connect((self.blocks_complex_to_mag_squared_0, 0), (self.blocks_threshold_ff_0, 0))
self.connect((self.blocks_multiply_const_vxx_0, 0), (self.rational_resampler_xxx_0, 0))
self.connect((self.blocks_threshold_ff_0, 0), (self.blocks_float_to_char_0, 0))
self.connect((self.low_pass_filter_0, 0), (self.blocks_multiply_const_vxx_0, 0))
self.connect((self.osmosdr_source_0, 0), (self.low_pass_filter_0, 0))
self.connect((self.rational_resampler_xxx_0, 0), (self.blocks_complex_to_mag_squared_0, 0))
self.connect((self.blocks_float_to_char_0, 0), (self.pwm_demod, 0))
def get_threshold(self):
return self.threshold
def set_threshold(self, threshold):
self.threshold = threshold
self._threshold_slider.set_value(self.threshold)
self._threshold_text_box.set_value(self.threshold)
self.blocks_threshold_ff_0.set_hi(self.threshold)
self.blocks_threshold_ff_0.set_lo(self.threshold)
def get_samp_rate(self):
return self.samp_rate
def set_samp_rate(self, samp_rate):
self.samp_rate = samp_rate
self.analog_sig_source_x_0.set_sampling_freq(self.samp_rate)
self.blocks_throttle_0.set_sample_rate(self.samp_rate)
self.osmosdr_source_0.set_sample_rate(self.samp_rate)
def get_gain(self):
return self.gain
def set_gain(self, gain):
self.gain = gain
self._gain_slider.set_value(self.gain)
self._gain_text_box.set_value(self.gain)
self.blocks_multiply_const_vxx_0.set_k((self.gain, ))
def get_freq(self):
return self.freq
def set_freq(self, freq):
self.freq = freq
self._freq_slider.set_value(self.freq)
self._freq_text_box.set_value(self.freq)
self.osmosdr_source_0.set_center_freq(self.freq, 0)
if __name__ == '__main__':
parser = OptionParser(option_class=eng_option, usage="%prog: [options]")
(options, args) = parser.parse_args()
tb = top_block()
tb.run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment