Skip to content

Instantly share code, notes, and snippets.

@daniestevez
Created February 28, 2023 17:21
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save daniestevez/f472b2d9ab41c7561b8f17ee44a54989 to your computer and use it in GitHub Desktop.
Save daniestevez/f472b2d9ab41c7561b8f17ee44a54989 to your computer and use it in GitHub Desktop.
Test script for GNU Radio bug: Setting processor affinity of hier_block2 with message ports segfaults
#!/usr/bin/env python3
from gnuradio import gr, blocks
import pmt
import argparse
import signal
import sys
class my_hier_block(gr.hier_block2):
    """Hierarchical block with no stream ports and one message output.

    The block wraps a ``blocks.message_strobe`` and forwards the strobe's
    ``strobe`` message port to the hier block's own ``pdus`` output port.
    """

    def __init__(self):
        # Both stream signatures are (0, 0, 0): the block exposes no
        # streaming inputs or outputs, only a message port.
        stream_in = gr.io_signature(0, 0, 0)
        stream_out = gr.io_signature(0, 0, 0)
        gr.hier_block2.__init__(self, "my_hier_block", stream_in, stream_out)

        # Hier-level message output that the internal strobe feeds.
        self.message_port_register_hier_out('pdus')

        # Strobe emitting PMT_NIL; 1000 is the period argument
        # (milliseconds according to the GNU Radio message_strobe API).
        self.strobe = blocks.message_strobe(pmt.PMT_NIL, 1000)

        strobe_port = (self.strobe, 'strobe')
        hier_port = (self, 'pdus')
        self.msg_connect(strobe_port, hier_port)
class my_flowgraph(gr.top_block):
    """Top block connecting ``my_hier_block`` to a message debug sink.

    Args:
        set_affinity: When truthy, ``set_processor_affinity([0])`` is
            called on the hier block before its message port is connected;
            otherwise the affinity is left untouched. A status line is
            printed in either case.
    """

    def __init__(self, set_affinity):
        gr.top_block.__init__(self, 'my_flowgraph', catch_exceptions=True)

        self.my_hier = my_hier_block()

        # The affinity call is made before any connection is set up; keep
        # this ordering, since the script exists to exercise that call.
        if not set_affinity:
            print('Not setting affinity of a hier_block2')
        else:
            print('Trying to set affinity of a hier_block2')
            self.my_hier.set_processor_affinity([0])
            print('Affinty successfully set')

        self.message_debug = blocks.message_debug(True)

        # Route the hier block's 'pdus' output to the debug block's
        # 'print' message input.
        hier_out = (self.my_hier, 'pdus')
        debug_in = (self.message_debug, 'print')
        self.msg_connect(hier_out, debug_in)
def parse_args(argv=None):
    """Parse the command-line options of the test script.

    Args:
        argv: Optional sequence of argument strings to parse. When left as
            ``None`` (the default), argparse reads the process command
            line, which is what the original zero-argument call did.

    Returns:
        An ``argparse.Namespace`` whose boolean ``set_affinity`` attribute
        is ``True`` only when ``--set_affinity`` was given.
    """
    parser = argparse.ArgumentParser(
        description='Test script for setting the processor affinity of a '
                    'hier_block2 with message ports.')
    parser.add_argument(
        '--set_affinity', action='store_true',
        help='set the processor affinity of the hier_block2 before '
             'connecting it')
    # Passing argv through lets callers and tests supply their own
    # arguments instead of depending on the global sys.argv.
    return parser.parse_args(argv)
def main(top_block_cls=my_flowgraph):
    """Build the flowgraph from the command-line options and run it.

    Args:
        top_block_cls: Class instantiated with the parsed ``set_affinity``
            flag as its only argument. The instance must provide
            ``start()``, ``stop()`` and ``wait()``.

    The function blocks in ``wait()`` after starting the flowgraph. On
    SIGINT or SIGTERM the flowgraph is stopped, waited on, and the process
    exits with status 0.
    """
    options = parse_args()
    tb = top_block_cls(options.set_affinity)

    def _stop_and_exit(signum=None, stack_frame=None):
        # Shut the flowgraph down before leaving the interpreter.
        tb.stop()
        tb.wait()
        sys.exit(0)

    # Same handler for both termination signals, registered in the
    # original order (SIGINT first, then SIGTERM).
    for signum in (signal.SIGINT, signal.SIGTERM):
        signal.signal(signum, _stop_and_exit)

    tb.start()
    tb.wait()
# Run the test flowgraph only when executed as a script, not on import.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment