Skip to content

Instantly share code, notes, and snippets.

@marcusmueller
Last active December 6, 2015 12:25
Show Gist options
  • Save marcusmueller/6b6e823dcbefe0121213 to your computer and use it in GitHub Desktop.
Save marcusmueller/6b6e823dcbefe0121213 to your computer and use it in GitHub Desktop.
Minimal test for delay tagging
#!/usr/bin/env python2
##################################################
# GNU Radio Python Flow Graph
# Title: Tag Debug
# Generated: Sun Dec 6 13:07:15 2015
##################################################
from gnuradio import blocks
from gnuradio import eng_notation
from gnuradio import gr
from gnuradio.eng_option import eng_option
from gnuradio.filter import firdes
from optparse import OptionParser
import pmt
class tag_debug(gr.top_block):
def __init__(self):
gr.top_block.__init__(self, "Tag Debug")
##################################################
# Blocks
##################################################
self.vector_sink = blocks.vector_sink_f(1)
self.ref_sink = blocks.vector_sink_f(1)
self.blocks_tags_strobe_0 = blocks.tags_strobe(gr.sizeof_float*1, pmt.intern("TEST"), 100, pmt.intern("strobe"))
self.blocks_head_0 = blocks.head(gr.sizeof_float*1, 10**5)
self.blocks_delay_0 = blocks.delay(gr.sizeof_float*1, 100)
##################################################
# Connections
##################################################
self.connect((self.blocks_delay_0, 0), (self.blocks_head_0, 0))
self.connect((self.blocks_head_0, 0), (self.vector_sink, 0))
self.connect((self.blocks_tags_strobe_0, 0), (self.blocks_delay_0, 0))
self.connect((self.blocks_tags_strobe_0, 0), (self.ref_sink, 0))
if __name__ == '__main__':
parser = OptionParser(option_class=eng_option, usage="%prog: [options]")
(options, args) = parser.parse_args()
tb = tag_debug()
tb.start()
tb.wait()
lastoffset = 0
ok_count = 0
for tag in tb.ref_sink.tags():
newoffset = tag.offset
if not newoffset == lastoffset + 100:
print "pre-delay error: this offset %d, last offset %d" % (newoffset , lastoffset)
else:
ok_count += 1
lastoffset = newoffset
print "OK: %d of %d" % (ok_count , len(tb.ref_sink.tags()))
print "="*10
lastoffset = 100
ok_count = 0
for tag in tb.vector_sink.tags():
newoffset = tag.offset
if not newoffset == lastoffset + 100:
print "post-delay error: this offset %d, last offset %d" % (newoffset , lastoffset)
else:
ok_count += 1
lastoffset = newoffset
print "OK: %d of %d" % (ok_count , len(tb.vector_sink.tags()))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment