@Marcus10110
Last active March 10, 2020 18:35
Mark's first Saleae High Level Analyzers

Sample Saleae High Level Protocol Analyzers

This requires the Saleae Logic software version 2.2.6 or newer.

The High Level Analyzer API is likely to change dramatically over the next few months. Always check for updated samples and documentation at discuss.saleae.com.

Saleae High Level Analyzers (HLAs) allow you to write Python code that processes a stream of protocol frames and produces a new stream of protocol frames.

An HLA requires two files: an extension.json file and a Python source file.

The extension.json file looks like this, and can describe one or more HLAs.

{
    "version": "0.0.1",
    "apiVersion": "1.0.0",
    "author": "Mark \"cool guy\" Garrison",
    "name": "Marks Utilities",
    "extensions": {
        "Fancy I2C": {
            "type": "HighLevelAnalyzer",
            "entryPoint": "util.I2cHla"
        },
        "Text Messages": {
            "type": "HighLevelAnalyzer",
            "entryPoint": "util.TextMessages"
        }
    }
}

The "extensions" object should contain one key for each HLA class you would like to write. The "entryPoint" value is the Python file name, a dot, and the Python class name (i.e. "fileName.className"), so you can put several HLAs in a single Python file or spread them across separate files.
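As a rough illustration of the "fileName.className" convention, the sketch below parses a trimmed-down extension.json and splits each entry point into its file and class parts. The helper split_entry_point is hypothetical and is not part of the Saleae API; how the Logic software actually resolves entry points is not described here.

```python
import json

# Trimmed copy of the extension.json shown above (metadata keys omitted).
EXTENSION_JSON = """
{
    "name": "Marks Utilities",
    "extensions": {
        "Fancy I2C": {
            "type": "HighLevelAnalyzer",
            "entryPoint": "util.I2cHla"
        },
        "Text Messages": {
            "type": "HighLevelAnalyzer",
            "entryPoint": "util.TextMessages"
        }
    }
}
"""


def split_entry_point(entry_point):
    """Hypothetical helper: split "fileName.className" into its two parts."""
    module_name, _, class_name = entry_point.rpartition(".")
    if not module_name or not class_name:
        raise ValueError("expected 'fileName.className', got %r" % entry_point)
    return module_name, class_name


manifest = json.loads(EXTENSION_JSON)
entry_points = {
    name: split_entry_point(extension["entryPoint"])
    for name, extension in manifest["extensions"].items()
}

for name, (module_name, class_name) in entry_points.items():
    print(f"{name}: class {class_name} in {module_name}.py")
```

Both HLAs here point at the same util.py file, which is the "multiple HLAs in a single Python file" case.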

To write an HLA, you need to provide a single Python class which implements these methods:

class Hla():
  def __init__(self):
    pass

  def get_capabilities(self):
    pass

  def set_settings(self, settings):
    pass

  def decode(self, data):
    return data

This example has no settings, and will simply copy the frames from the input analyzer to the output. It also provides no display template, so the default display template will be used.

def get_capabilities(self): isn't used yet, but will eventually support exposing selectable settings to the user.

def set_settings(self, settings): will eventually accept the user's settings, but for now the provided settings argument will be an empty object.

def set_settings(self, settings): can also optionally return an object that describes how to format the output frames for display in the software. There should be one entry for every frame "type" your HLA produces.

def decode(self, data): is called once for every frame passed to the HLA from the attached analyzer. It can return nothing, a single frame, or an array of frames.
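To make the three possible return values of decode concrete, here is a minimal sketch of an HLA that drops everything except "data" frames, together with a small offline harness. Both DataOnlyHla and run_offline are hypothetical names used only for illustration; the harness is an assumption about how one might exercise an HLA outside the Logic software, not how the software itself calls it.

```python
class DataOnlyHla():
    """Hypothetical HLA: pass "data" frames through, drop everything else."""

    def __init__(self):
        pass

    def get_capabilities(self):
        pass

    def set_settings(self, settings):
        pass

    def decode(self, data):
        if data["type"] != "data":
            return None  # produce nothing for this input frame
        return data      # produce a single output frame


def run_offline(hla, frames):
    """Hypothetical harness: call decode() per frame and flatten the results."""
    hla.set_settings({})
    output = []
    for frame in frames:
        result = hla.decode(frame)
        if result is None:
            continue                 # decode returned nothing
        if isinstance(result, list):
            output.extend(result)    # decode returned several frames
        else:
            output.append(result)    # decode returned one frame
    return output


frames = [
    {"type": "start", "start_time": 0.0010, "end_time": 0.0011, "data": {}},
    {"type": "data", "start_time": 0.0012, "end_time": 0.0020, "data": {"data": 42}},
    {"type": "stop", "start_time": 0.0021, "end_time": 0.0022, "data": {}},
]

decoded = run_offline(DataOnlyHla(), frames)
print(decoded)
```

A harness like this makes it possible to unit test decode logic with hand-written frames before loading the extension into the software.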

Both the input and the output frames share a common shape. Each frame is a Python dictionary with the following layout.

{
  "type": "error/address/data/start/stop/etc...",
  "start_time": 0.0052,
  "end_time": 0.0076,
  "data": {
    ...
  }
}

For output frames, the "type" key is used to locate the correct format string, if format strings are provided by the set_settings function return value.

"start_time" and "end_time" give the time range of the frame, in seconds, relative to the start of the recording. This is still the case even if a trigger is used: these times are not relative to trigger time zero.

The "data" key can contain any number of keys, which can then be accessed in the custom format string for the given frame type.

Example:

# Format strings
def set_settings(self, settings):
  return {
      "result_types": {
          "error": {
              "format": "Error!"
          },
          "i2c": {
              "format": "address: {{data.address}}; data[{{data.count}}]: [ {{data.data}} ]"
          }
      }
  }
# Example frame of type "error":
def decode(self, data):
  return {
    "type": "error",
    "start_time": 0.0052,
    "end_time": 0.0076,
    "data": {}
  }

# Example frame of type "i2c":
def decode(self, data):
  return {
    "type": "i2c",
    "start_time": 0.0052,
    "end_time": 0.0076,
    "data": {
      "address": 17,
      "data": "0x17, 0x23, 0xFA",
      "count": 3
    }
  }
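The sketch below illustrates how a format string and a frame's "data" keys fit together. The render_frame function is a hypothetical stand-in written for this example: the real rendering happens inside the Logic software, and the fallback used here for unknown frame types is an assumption, not the software's actual default template.

```python
import re

# Matches placeholders of the form {{data.some_key}}.
_PLACEHOLDER = re.compile(r"\{\{\s*data\.(\w+)\s*\}\}")


def render_frame(result_types, frame):
    """Hypothetical renderer: fill {{data.key}} placeholders from frame["data"]."""
    entry = result_types.get(frame["type"])
    if entry is None:
        # Assumed fallback for frame types without a format string.
        return str(frame["data"])
    return _PLACEHOLDER.sub(
        lambda match: str(frame["data"].get(match.group(1), "")),
        entry["format"],
    )


result_types = {
    "error": {"format": "Error!"},
    "i2c": {
        "format": "address: {{data.address}}; data[{{data.count}}]: [ {{data.data}} ]"
    },
}

i2c_frame = {
    "type": "i2c",
    "start_time": 0.0052,
    "end_time": 0.0076,
    "data": {"address": 17, "data": "0x17, 0x23, 0xFA", "count": 3},
}
error_frame = {"type": "error", "start_time": 0.0052, "end_time": 0.0076, "data": {}}

print(render_frame(result_types, i2c_frame))    # address: 17; data[3]: [ 0x17, 0x23, 0xFA ]
print(render_frame(result_types, error_frame))  # Error!
```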

Input Frame Types

At launch, we've included HLA support for our Serial, I2C, and SPI analyzers. All of the other analyzers we include with our application cannot be used with HLAs yet, but we will quickly add support.

Serial format

{
  "type": "data",
  "start_time": 0.0052,
  "end_time": 0.0076,
  "data": {
    "value": 42,
    "parity_error": False,
    "framing_error": False,
    "address": False, # only used in Serial MP/MDB mode.
  }
}
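As a small example of consuming this Serial frame shape, the following sketch turns one frame into display text. The function serial_frame_to_text and the "<error>" marker are hypothetical choices made for illustration; only the keys "value", "parity_error", and "framing_error" come from the format above.

```python
def serial_frame_to_text(frame):
    """Return display text for one Serial frame, or None if it is not a data frame."""
    if frame["type"] != "data" or "value" not in frame["data"]:
        return None
    payload = frame["data"]
    if payload.get("parity_error") or payload.get("framing_error"):
        return "<error>"  # hypothetical marker for a corrupted byte
    value = payload["value"]
    # Show printable ASCII as a character and everything else as hex.
    if 0x20 <= value <= 0x7E:
        return chr(value)
    return hex(value)


good_frame = {
    "type": "data",
    "start_time": 0.0052,
    "end_time": 0.0076,
    "data": {"value": 42, "parity_error": False, "framing_error": False, "address": False},
}
bad_frame = {
    "type": "data",
    "start_time": 0.0080,
    "end_time": 0.0090,
    "data": {"value": 42, "parity_error": False, "framing_error": True, "address": False},
}

print(serial_frame_to_text(good_frame))  # *
print(serial_frame_to_text(bad_frame))   # <error>
```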

I2C format

# Start Condition
{
  "type": "start",
  "start_time": 0.0052,
  "end_time": 0.0076,
  "data": {
  }
}
# Stop Condition
{
  "type": "stop",
  "start_time": 0.0052,
  "end_time": 0.0076,
  "data": {
  }
}
# Address Byte
{
  "type": "address",
  "start_time": 0.0052,
  "end_time": 0.0076,
  "data": {
    "address": 42
  }
}
# Data Byte
{
  "type": "data",
  "start_time": 0.0052,
  "end_time": 0.0076,
  "data": {
    "data": 42
  }
}
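The sketch below walks one I2C transaction, from start condition to stop condition, using the four frame types above. Note that the format above shows "address" and "data" as plain integers, while the sample util.py in this gist indexes them with [0] and checks for bytes, so the hypothetical helper first_byte accepts either form rather than assuming one.

```python
def first_byte(value):
    """Hypothetical helper: accept either an int or a bytes-like value."""
    if isinstance(value, (bytes, bytearray)):
        return value[0]
    return int(value)


def summarize_transaction(frames):
    """Collect the address and data bytes of the first transaction in frames."""
    address = None
    payload = []
    for frame in frames:
        if frame["type"] == "address":
            address = first_byte(frame["data"]["address"])
        elif frame["type"] == "data":
            payload.append(first_byte(frame["data"]["data"]))
        elif frame["type"] == "stop":
            break  # the stop condition ends the transaction
    return address, payload


transaction = [
    {"type": "start", "start_time": 0.0010, "end_time": 0.0011, "data": {}},
    {"type": "address", "start_time": 0.0012, "end_time": 0.0020, "data": {"address": 42}},
    {"type": "data", "start_time": 0.0021, "end_time": 0.0029, "data": {"data": 0x17}},
    {"type": "data", "start_time": 0.0030, "end_time": 0.0038, "data": {"data": b"\x23"}},
    {"type": "stop", "start_time": 0.0039, "end_time": 0.0040, "data": {}},
]

address, payload = summarize_transaction(transaction)
print(hex(address), [hex(b) for b in payload])  # 0x2a ['0x17', '0x23']
```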

SPI format

{
  "type": "result",
  "start_time": 0.0052,
  "end_time": 0.0076,
  "data": {
    "miso": 42,
    "mosi": 42
  }
}
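For SPI, each "result" frame carries one word per direction. The sketch below splits a list of frames into two byte streams. It assumes the keys are "miso" and "mosi", as the sample util.py in this gist reads them, and that each word fits in 8 bits; split_spi_streams is a hypothetical helper, not part of the Saleae API.

```python
def split_spi_streams(frames):
    """Collect the MISO and MOSI words of all "result" frames as bytes."""
    miso = bytearray()
    mosi = bytearray()
    for frame in frames:
        if frame["type"] != "result":
            continue
        payload = frame["data"]
        # Assumes 8-bit words; bytearray.append rejects values above 255.
        if "miso" in payload:
            miso.append(payload["miso"])
        if "mosi" in payload:
            mosi.append(payload["mosi"])
    return bytes(miso), bytes(mosi)


spi_frames = [
    {"type": "result", "start_time": 0.0010, "end_time": 0.0018,
     "data": {"miso": 0x48, "mosi": 0x00}},
    {"type": "result", "start_time": 0.0020, "end_time": 0.0028,
     "data": {"miso": 0x69, "mosi": 0x01}},
]

miso_bytes, mosi_bytes = split_spi_streams(spi_frames)
print(miso_bytes, mosi_bytes)  # b'Hi' b'\x00\x01'
```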

Feedback Welcome!

The HLA API is far from complete. We expect to dramatically expand this in the near future, as well as add support for custom measurements for analog and digital channels. Feedback is welcome. Please direct it to discuss.saleae.com.

{
    "version": "0.0.1",
    "apiVersion": "1.0.0",
    "author": "Mark \"cool guy\" Garrison",
    "name": "Marks Utilities",
    "extensions": {
        "Fancy I2C": {
            "type": "HighLevelAnalyzer",
            "entryPoint": "util.I2cHla"
        },
        "Text Messages": {
            "type": "HighLevelAnalyzer",
            "entryPoint": "util.TextMessages"
        }
    }
}
# This HLA only supports the I2C analyzer results, and will produce a single frame for every transaction. (from start condition to stop condition).
# It demonstrates a custom format string, and how to parse frames produced by the I2C analyzer.
class I2cHla():
    temp_frame = None

    def __init__(self):
        pass

    def get_capabilities(self):
        pass

    def set_settings(self, settings):
        return {
            'result_types': {
                'error': {
                    'format': 'Error!'
                },
                "hi2c": {
                    'format': 'address: {{data.address}}; data[{{data.count}}]: [ {{data.data}} ]'
                }
            }
        }

    def decode(self, data):
        # set our frame to an error frame, which will eventually get over-written as we get data.
        if self.temp_frame is None:
            self.temp_frame = {
                "type": "error",
                "start_time": data["start_time"],
                "end_time": data["end_time"],
                "data": {
                    "address": "error",
                    "data": "",
                    "count": 0
                }
            }

        if data["type"] == "start" or (data["type"] == "address" and self.temp_frame["type"] == "error"):
            self.temp_frame = {
                "type": "hi2c",
                "start_time": data["start_time"],
                "data": {
                    "data": "",
                    "count": 0
                }
            }

        if data["type"] == "address":
            address_byte = data["data"]["address"][0]
            self.temp_frame["data"]["address"] = hex(address_byte)

        if data["type"] == "data":
            data_byte = data["data"]["data"][0]
            self.temp_frame["data"]["count"] += 1
            if len(self.temp_frame["data"]["data"]) > 0:
                self.temp_frame["data"]["data"] += ", "
            self.temp_frame["data"]["data"] += hex(data_byte)

        if data["type"] == "stop":
            self.temp_frame["end_time"] = data["end_time"]
            # "end_time": data["end_time"],
            new_frame = self.temp_frame
            self.temp_frame = None
            return new_frame
# This HLA takes a stream of bytes (preferably ascii characters) and combines individual frames into larger frames in an attempt to make text strings easier to read.
# For example, this should make reading serial log messages much easier in the software.
# It supports delimiting on special characters, and after a certain delay is detected between characters.
# It supports the I2C, SPI, and Serial analyzers, although it's most useful for serial port messages.
class TextMessages():
    temp_frame = None

    def __init__(self):
        pass

    def get_capabilities(self):
        pass

    def set_settings(self, settings):
        return {
            'result_types': {
                'error': {
                    'format': 'Error!'
                },
                "message": {
                    'format': '{{data.str}}'
                }
            }
        }

    def clear_stored_message(self, data):
        self.temp_frame = {
            "type": "message",
            "start_time": data["start_time"],
            "end_time": data["end_time"],
            "data": {
                "str": "",
            }
        }

    def append_char(self, char):
        self.temp_frame["data"]["str"] += char

    def have_existing_message(self):
        if self.temp_frame is None:
            return False
        if len(self.temp_frame["data"]["str"]) == 0:
            return False
        return True

    def update_end_time(self, data):
        self.temp_frame["end_time"] = data["end_time"]

    def decode(self, data):
        # All protocols - delimit on special characters
        delimiters = [ "\0", "\n", "\r", " " ]
        # All protocols - delimit on a delay
        maximum_delay = 0.5E-3 # consider frames further apart than this separate messages
        # I2C - delimit on address byte
        # SPI - delimit on Enable toggle. TODO: add support for the SPI analyzer to send Enable/disable frames, or at least a Packet ID to the low level analyzer.

        frame_start = data["start_time"]
        frame_end = data["end_time"]

        char = "unknown error."

        # setup initial result, if not present
        first_frame = False
        if self.temp_frame is None:
            first_frame = True
            self.clear_stored_message(data)

        # handle serial data
        if data["type"] == "data" and "value" in data["data"].keys():
            value = data["data"]["value"]
            char = chr(value)

        # handle I2C address
        if data["type"] == "address":
            value = data["data"]["address"][0]
            # if we have an existing message, send it
            if self.have_existing_message() == True:
                ret = self.temp_frame
                self.clear_stored_message(data)
                self.append_char("address: " + hex(value) + ";")
                return ret
            # append the address to the beginning of the new message
            self.append_char("address: " + hex(value) + ";")
            return None

        # handle I2C data byte
        if data["type"] == "data" and "data" in data["data"].keys() and type(data["data"]["data"]) is bytes:
            value = data["data"]["data"][0]
            char = chr(value)

        # handle I2C start condition
        if data["type"] == "start":
            return

        # handle I2C stop condition
        if data["type"] == "stop":
            if self.have_existing_message() == True:
                ret = self.temp_frame
                self.temp_frame = None
                return ret
            self.temp_frame = None
            return

        # handle SPI byte
        if data["type"] == "result":
            char = ""
            if "miso" in data["data"].keys() and data["data"]["miso"] != 0:
                char += chr(data["data"]["miso"])
            if "mosi" in data["data"].keys() and data["data"]["mosi"] != 0:
                char += chr(data["data"]["mosi"])

        # If we have a timeout event, commit the frame and make sure not to add the new frame after the delay, and add the current character to the next frame.
        if first_frame == False and self.temp_frame is not None:
            if self.temp_frame["end_time"] + maximum_delay < frame_start:
                ret = self.temp_frame
                self.clear_stored_message(data)
                self.append_char(char)
                return ret

        self.append_char(char)
        self.update_end_time(data)

        # if the current character is a delimiter, commit it.
        if char in delimiters:
            ret = self.temp_frame
            # leave the temp_frame blank, so the next frame is the beginning of the next message.
            self.temp_frame = None
            return ret