Last active
December 6, 2015 12:25
-
-
Save marcusmueller/6b6e823dcbefe0121213 to your computer and use it in GitHub Desktop.
Minimal test for delay tagging
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python2 | |
################################################## | |
# GNU Radio Python Flow Graph | |
# Title: Tag Debug | |
# Generated: Sun Dec 6 13:07:15 2015 | |
################################################## | |
from gnuradio import blocks | |
from gnuradio import eng_notation | |
from gnuradio import gr | |
from gnuradio.eng_option import eng_option | |
from gnuradio.filter import firdes | |
from optparse import OptionParser | |
import pmt | |
class tag_debug(gr.top_block): | |
def __init__(self): | |
gr.top_block.__init__(self, "Tag Debug") | |
################################################## | |
# Blocks | |
################################################## | |
self.vector_sink = blocks.vector_sink_f(1) | |
self.ref_sink = blocks.vector_sink_f(1) | |
self.blocks_tags_strobe_0 = blocks.tags_strobe(gr.sizeof_float*1, pmt.intern("TEST"), 100, pmt.intern("strobe")) | |
self.blocks_head_0 = blocks.head(gr.sizeof_float*1, 10**5) | |
self.blocks_delay_0 = blocks.delay(gr.sizeof_float*1, 100) | |
################################################## | |
# Connections | |
################################################## | |
self.connect((self.blocks_delay_0, 0), (self.blocks_head_0, 0)) | |
self.connect((self.blocks_head_0, 0), (self.vector_sink, 0)) | |
self.connect((self.blocks_tags_strobe_0, 0), (self.blocks_delay_0, 0)) | |
self.connect((self.blocks_tags_strobe_0, 0), (self.ref_sink, 0)) | |
if __name__ == '__main__': | |
parser = OptionParser(option_class=eng_option, usage="%prog: [options]") | |
(options, args) = parser.parse_args() | |
tb = tag_debug() | |
tb.start() | |
tb.wait() | |
lastoffset = 0 | |
ok_count = 0 | |
for tag in tb.ref_sink.tags(): | |
newoffset = tag.offset | |
if not newoffset == lastoffset + 100: | |
print "pre-delay error: this offset %d, last offset %d" % (newoffset , lastoffset) | |
else: | |
ok_count += 1 | |
lastoffset = newoffset | |
print "OK: %d of %d" % (ok_count , len(tb.ref_sink.tags())) | |
print "="*10 | |
lastoffset = 100 | |
ok_count = 0 | |
for tag in tb.vector_sink.tags(): | |
newoffset = tag.offset | |
if not newoffset == lastoffset + 100: | |
print "post-delay error: this offset %d, last offset %d" % (newoffset , lastoffset) | |
else: | |
ok_count += 1 | |
lastoffset = newoffset | |
print "OK: %d of %d" % (ok_count , len(tb.vector_sink.tags())) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment