Skip to content

Instantly share code, notes, and snippets.

@devanlai
Created April 12, 2020 18:28
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save devanlai/4f56c11e24efcfeefa73cc7a9a48d7fe to your computer and use it in GitHub Desktop.
Save devanlai/4f56c11e24efcfeefa73cc7a9a48d7fe to your computer and use it in GitHub Desktop.
WIP pure-python sigrok decoder host
__all__ = ["Decoder", "OUTPUT_ANN", "OUTPUT_PYTHON", "OUTPUT_BINARY", "OUTPUT_META", "SRD_CONF_SAMPLERATE"]
from .decoder import Decoder
from .constants import OUTPUT_ANN, OUTPUT_PYTHON, OUTPUT_BINARY, OUTPUT_META, SRD_CONF_SAMPLERATE
__all__ = ["OUTPUT_ANN", "OUTPUT_PYTHON", "OUTPUT_BINARY", "OUTPUT_META", "SRD_CONF_SAMPLERATE"]
OUTPUT_ANN, OUTPUT_PYTHON, OUTPUT_BINARY, OUTPUT_META = range(4)
SRD_CONF_SAMPLERATE = 10000
__all__ = ["Decoder"]
from array import array
from typing import Any, Optional, Sequence, Tuple
from queue import Queue
from .constants import *
class BaseDecoder(object):
def __init__(self) -> None:
pass
def put(self, startsample:int, endsample:int, output_id:int, data:Any) -> None:
print("Put", startsample, endsample, data)
def register(self, output_type:int, proto_id:str = None, meta:Tuple[int, str, str] = None) -> int:
pass
def has_channel(self, index: int) -> bool:
return self._channel_map.get(index) is not None
def start(self) -> None:
raise NotImplementedError
def reset(self) -> None:
raise NotImplementedError
def metadata(self, key:int, value:Any) -> None:
pass
class RawDecoder(BaseDecoder):
def __init__(self) -> None:
pass
def _initialize(self, options: dict, channel_map: dict, pin_initializer: Optional[int]) -> None:
self._chunk = None
self._sample_offset = 0
self._sample_base = 0
self._queue = Queue()
self._channel_map = channel_map
self._initial_pins = pin_initializer
self._prev_pins = pin_initializer
self._num_channels = 0
if hasattr(self, "channels"):
self._num_channels += len(self.channels)
if hasattr(self, "optional_channels"):
self._num_channels += len(self.optional_channels)
self.options = options
self.matched = None
self.samplenum = 0
def _reset(self) -> None:
# TODO: handle the input queue
self._chunk = None
self._sample_offset = 0
self._sample_base = 0
self._prev_pins = self._initial_pins
self.matched = None
self.samplenum = 0
def _lookup_pin(self, channel: int) -> int:
return self._channel_map.get(channel)
def _lookup_channel(self, channel:int) -> dict:
if hasattr(self, "channels") and channel < len(self.channels):
return self.channels[channel]
else:
return self.optional_channels[channel]
def wait(self, conditions:Sequence[dict] = None) -> Tuple[int, ...]:
if not conditions:
conditions = [{"skip": 1}]
# TODO: handle case where we cross chunk boundaries
matched = [False for cond in conditions]
initial_samplenum = self.samplenum
while not any(matched):
if self._chunk is None:
# Fetch the next chunk
self._sample_base = self.samplenum
self._sample_offset = 0
next_chunk = self._queue.get()
if next_chunk is None:
# Received end-of-data signal; terminate this thread
self._release_chunk()
raise EOFError
self._chunk = next_chunk
if self.samplenum == 0 and self._initial_pins is None:
self._prev_pins = self._chunk[0]
# Evaluate conditions
current_pins = self._chunk[self._sample_offset]
for i,product in enumerate(conditions):
matched[i] = False
for channel, pin_condition in product.items():
if channel == 'skip':
skip_count = pin_condition
if self.samplenum + 1 < initial_samplenum + skip_count:
# Have not waited for enough samples
break
elif 0 <= channel < self._num_channels:
pin_index = self._lookup_pin(channel)
if pin_index is None:
raise ValueError("optional channel '{0.id}' is not mapped".format(self._lookup_channel(channel)))
bitmask = (1 << pin_index)
if pin_condition == "l":
if (current_pins & bitmask) != 0:
break
elif pin_condition == "h":
if (current_pins & bitmask) != bitmask:
break
elif pin_condition == "r":
if (self._prev_pins & bitmask) != 0 or (current_pins & bitmask) != bitmask:
break
elif pin_condition == "f":
if (self._prev_pins & bitmask) != bitmask or (current_pins & bitmask) != 0:
break
elif pin_condition == "e":
if ((self._prev_pins ^ current_pins) & bitmask) == 0:
break
elif pin_condition == "s":
if ((self._prev_pins ^ current_pins) & bitmask) == bitmask:
break
else:
raise ValueError("Invalid pin state condition '{}'".format(pin_condition))
else:
raise ValueError("Invalid channel index {:d}".format(channel))
else:
matched[i] = True
# Advance to the next sample
self._prev_pins = current_pins
self._sample_offset += 1
self.samplenum += 1
if self._sample_offset >= len(self._chunk):
# Done with this chunk
self._chunk = None
self._release_chunk()
# Matched at least one condition
self.matched = tuple(matched)
channel_values = []
for i in range(self._num_channels):
pin_index = self._lookup_pin(i)
if pin_index is not None:
value = 1 if (current_pins & (1 << pin_index)) else 0
else:
value = None
channel_values.append(value)
return tuple(channel_values)
def decode(self) -> None:
raise NotImplementedError
def _feed_chunk(self, data: array) -> None:
self._queue.put(data)
def _release_chunk(self) -> None:
self._queue.task_done()
def _signal_end(self) -> None:
self._queue.put(None)
Decoder = RawDecoder
class StackedDecoder(BaseDecoder):
def decode(self, startsample:int, endsample:int, data:Any) -> None:
raise NotImplementedError
#!/usr/bin/python3
import array
import configparser
import importlib
import os
import sys
import threading
import zipfile
from sigrokdecode.constants import *
def parse_samplerate(s):
if s.endswith("MHz"):
return int(s[:-3]) * 1000000
elif s.endwith("kHz"):
return int(s[:-3]) * 1000
else:
return int(s)
def load_sr(f):
z = zipfile.ZipFile(f)
sr = {}
with z.open("version", "r") as version_file:
sr["version"] = int(version_file.read().decode("ascii"))
with z.open("metadata", "r") as meta_file:
config = configparser.ConfigParser()
config.read_string(meta_file.read().decode("ascii"), source=meta_file.name)
sr["metadata"] = config
sr["devices"] = []
for section in sr["metadata"].sections():
if section.startswith("device"):
device = {}
device["driver"] = config[section].get("driver")
device["base_filename"] = config[section]["capturefile"]
device["samplerate_hz"] = parse_samplerate(config[section]["samplerate"])
device["channel_count"] = int(config[section]["total probes"])
device["unit_size"] = int(config[section]["unitsize"])
device["channel_names"] = {}
for i in range(1, device["channel_count"]+1):
digital_probe_opt_name = "probe{:d}".format(i)
if config.has_option(section, digital_probe_opt_name):
device["channel_names"][i] = config[section][digital_probe_opt_name]
analog_probe_opt_name = "analog{:d}".format(i)
if config.has_option(section, analog_probe_opt_name):
device["channel_names"][i] = config[section][analog_probe_opt_name]
sr["devices"].append(device)
typecodes = { 1: "B", 2: "H", 4: "L", 8: "Q" }
for device in sr["devices"]:
data_filenames = sorted(fname for fname in z.namelist() if fname.startswith(device["base_filename"]))
chunks = []
for filename in data_filenames:
with z.open(filename, "r") as data_file:
chunks.append(data_file.read())
device["data"] = array.array(typecodes[device["unit_size"]], b"".join(chunks))
return sr
def load_decoder_class(name, srd_dir):
path = os.path.join(srd_dir, name, "pd.py")
spec = importlib.util.spec_from_file_location("pd", path)
module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module)
return module.Decoder
def run_decoder(decoder, metadata):
for key, value in metadata.items():
decoder.metadata(key, value)
print("starting decoder")
decoder.start()
try:
decoder.decode()
except EOFError:
# TODO: cleanup decoder
pass
print("exiting decoder")
def spawn_decoder_thread(decoder_class, metadata, options, channel_map, pin_initializer):
decoder = decoder_class()
decoder._initialize(options, channel_map, pin_initializer)
# TODO: add decoder instance to list of decoders pending input
thread = threading.Thread(target=run_decoder, args=(decoder, metadata))
thread.start()
return thread, decoder
if __name__ == "__main__":
import argparse
parser = argparse.ArgumentParser()
parser.add_argument("input", type=argparse.FileType("rb"))
parser.add_argument("decoder")
args = parser.parse_args()
srd_dir_var = os.environ.get("SIGROKDECODE_DIR", "~/.local/share/libsigrokdecode/decoders")
srd_dir = os.path.expanduser(os.path.expandvars(srd_dir_var))
sys.path.append(srd_dir)
sr = load_sr(args.input)
decoder_class = load_decoder_class(args.decoder, srd_dir)
options = {}
for option in decoder_class.options:
options[option["id"]] = option["default"]
metadata = {
SRD_CONF_SAMPLERATE: sr["devices"][0]["samplerate_hz"]
}
pin_initializer = None
channel_map = { 0: 0 }
thread, decoder = spawn_decoder_thread(decoder_class, metadata, options, channel_map, pin_initializer)
print("Feeding data")
for device in sr["devices"]:
# Feed in artificial 1k chunks
i = 0
while (i * 1024) < len(device["data"]):
chunk = device["data"][i*1024:(i+1)*1024]
decoder._feed_chunk(chunk)
i += 1
# Signal the end of input
decoder._signal_end()
print("Signalled end")
# Wait for the thread to finish processing input
thread.join()
print("Done")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment