Skip to content

Instantly share code, notes, and snippets.

Created May 21, 2012 18:56
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 2 You must be signed in to fork a gist
  • Save anonymous/2763957 to your computer and use it in GitHub Desktop.
Save anonymous/2763957 to your computer and use it in GitHub Desktop.
IEEE 802.15.4 packet sink
import Numeric
from gnuradio import gr, gru
import crc16
import gnuradio.gr.gr_threading as _threading
import struct
import cogra
# cannot be imported due to
#Traceback (most recent call last):
# File "/home/wishi_t/source/gnuradio/students/marius/thesis/flowgraphs/ieee802154TX.py", line 11, in <module>
# from cogra import ieee_802_15_4_pktget
# File "/usr/local/lib/python2.7/site-packages/cogra/ieee_802_15_4_pktget.py", line 8, in <module>
# import cogra
# File "/usr/local/lib/python2.7/site-packages/cogra/cogra.py", line 26, in <module>
# _cogra = swig_import_helper()
# File "/usr/local/lib/python2.7/site-packages/cogra/cogra.py", line 22, in swig_import_helper
# _mod = imp.load_module('_cogra', fp, pathname, description)
#ImportError: libgruel-3.4.1git.so.0: cannot open shared object file: No such file or directory
MAX_PKT_SIZE = 128
class ieee802_15_4_demod_pkts(gr.hier_block2):
"""
@summary: 802_15_4 demodulator that is a GNU Radio sink.
The input is complex baseband. When packets are demodulated, they are passed to the
app via the callback.
"""
def __init__(self, *args, **kwargs):
"""
@summary: Hierarchical block to throw demod'ed bits into
packets
The input is the a demodulated float signal at baseband.
Demodulated packets are sent to the handler.
@param callback: function of two args: ok, payload
@type callback: ok: bool; payload: string
@param threshold: detect access_code with up to threshold bits wrong (-1 -> use default)
@type threshold: int
"""
try:
self.callback = kwargs.pop('callback')
self.threshold = kwargs.pop('threshold')
self.chan_num = kwargs.pop('channel')
except KeyError:
pass
gr.hier_block2.__init__(self, "ieee802_15_4_demod_pkts",
gr.io_signature(1, 1, gr.sizeof_gr_complex), # Input
gr.io_signature(0, 0, 0)) # Output
self._rcvd_pktq = gr.msg_queue() # holds packets from the PHY
self._packet_sink = ieee_802_15_4_frame_sink(self._rcvd_pktq, self.threshold)
self.connect(self, self._packet_sink)
self._watcher = _queue_watcher_thread(self._rcvd_pktq, self.callback, self.chan_num)
def carrier_sensed(self):
"""
@summary: return True if we detect carrier.
"""
return self._packet_sink.carrier_sensed()
class _queue_watcher_thread(_threading.Thread):
def __init__(self, rcvd_pktq, callback, chan_num):
_threading.Thread.__init__(self)
self.setDaemon(1)
self.rcvd_pktq = rcvd_pktq
self.callback = callback
self.chan_num = chan_num
self.prev_crc = -1
self.keep_running = True
self.start()
def run(self):
while self.keep_running:
print "802_15_4_pkt: waiting for packet"
msg = self.rcvd_pktq.delete_head()
ok = 0
payload = msg.to_string()
print "received packet "
if len(payload) > 2:
crc = crc16.CRC16()
else:
print "too small:", len(payload)
continue
# Calculate CRC skipping over LQI and CRC
crc.update(payload[1:-2])
crc_check = crc.intchecksum()
print "checksum: %s, received: %s" % (crc_check,
str(ord(payload[-2]) + ord(payload[-1]) * 256))
ok = (crc_check == ord(payload[-2]) + ord(payload[-1]) * 256)
msg_payload = payload
if self.prev_crc != crc_check:
self.prev_crc = crc_check
if self.callback:
self.callback(ok, msg_payload, self.chan_num)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment