Skip to content

Instantly share code, notes, and snippets.

Last active November 4, 2015 00:38
Show Gist options
  • Save danielballan/61fd3448f50027423a54 to your computer and use it in GitHub Desktop.
Save danielballan/61fd3448f50027423a54 to your computer and use it in GitHub Desktop.
Ophyd with metaclasses
In [35]: import metaclass_demo as mcd
In [36]: t = mcd.Thing('asdf', ['val'])
In [39]: t.signal_names
Out[39]: ['stop', 'val']
t.base_name t.signal_names t.trigger
t.describe t.read_fields t.stop t.val
In [38]: t.val.
t.val.connected t.val.put t.val.readback_pv
t.val.get t.val.pv
In [40]: t.val.pv
Out[40]: <PV 'asdf.VAL': not connected>
In [38]: t.describe()
Out[38]: {'val': {'source': 'asdf.VAL'}}
import functools
import sys
import re
from inspect import Parameter, Signature
import epics
from ophyd.utils.epics_pvs import raise_if_disconnected
from ophyd.utils.errors import DisconnectedError
def raise_if_disconnected(fcn):
'''Decorator to catch attempted access to disconnected EPICS channels.'''
# differs from implementation in because it gives pvname, not
# object name
def wrapper(self, *args, **kwargs):
if self.connected:
return fcn(self, *args, **kwargs)
raise DisconnectedError('{} is not connected'.format(
return wrapper
# We could use real EpicsSignals, but to avoid confusing myself I made
# dead simple ones.
class EpicsSignalLite:
"""A simplified EpicsSignal for demo purposes only
This is a wrapper around a single, read-only epics.PV.
def __init__(self, pv_name, *args, **kwargs):
self.pv = epics.PV(pv_name, *args, **kwargs)
def connected(self):
return self.pv.connected
def get(self, *args, **kwargs):
# pass through
return self.pv.get(*args, **kwargs)
class EpicsSignalLiteRW(EpicsSignalLite):
"""A simplified EpicsSignal for demo purposes only
This is a wrapper around a writeable PV and its readback value.
def __init__(self, pv_name, *args, **kwargs):
super().__init__(pv_name, *args, **kwargs)
self.readback_pv = epics.PV('{}.RBV'.format(pv_name))
def connected(self):
return self.pv.connected and self.readback_pv.connected
def get(self, *args, **kwargs):
# pass through to the readback PV
return self.readback_pv.get(*args, **kwargs)
def put(self, *args, **kwargs):
# pass through to the writeable PV -- potentially could check limits
# TODO: Should this return a StatusObj?
return self.pv.put(*args, **kwargs)
# There are two descriptors, Signal and SignalRW.
class Signal:
"A descriptor representing a single read-only PV"
_class = EpicsSignalLite # type of object returned by __get__
def __init__(self, pv_template, *args, **kwargs):
if '{base_name}' not in pv_template:
raise ValueError("pv_template must contain '{base_name}'")
self.pv_template = pv_template
self.args = args
self.kwargs = kwargs
def __get__(self, instance, owner):
if instance is None:
pv_name = self.pv_template.format(base_name=instance.base_name)
attr_name = instance._templates[self.pv_template]
if pv_name not in instance._signals:
instance._signals[attr_name] = self._class(pv_name, *self.args,
return instance._signals[attr_name]
class SignalRW(Signal):
"A descriptor representing a writeable PV with a readback value."
_class = EpicsSignalLiteRW
# Now for the dreaded metaclasses. All classes that use Signal descriptors should
# use this SignalMeta metaclass.
class SignalMeta(type):
Creates attributes for Signals by inspecting class definition
The main benefit of the metaclass is that you don't need to write:
foo = Signal('..pv..', name='foo')
We can get the name from inspecting the class, so this is sufficient:
foo = Signal('..pv..')
def __new__(cls, name, bases, clsdict):
clsobj = super().__new__(cls, name, bases, clsdict)
# maps Signal attribute names to Signal classes
sig_dict = {k: v for k, v in clsdict.items() if isinstance(v, Signal)}
# maps pv templates to attribute names; this is used by Signal to
# ascertain the attribute name of each EpicsSignal it creates
clsobj._templates = {v.pv_template: k for k, v in sig_dict.items()}
# List Signal attribute names.
clsobj.signal_names = list(sig_dict.keys())
# Store EpicsSignal objects (only created once they are accessed)
clsobj._signals = {}
# BONUS (overkill?): Generate a default signature for the set method.
writable_signals = [k for k, v in sig_dict.items()
if isinstance(v, SignalRW)]
signature = Signature([Parameter(name, Parameter.POSITIONAL_OR_KEYWORD)
for name in writable_signals])
clsobj._default_sig_for_set = signature
return clsobj
# The base class uses the 'signal_names' attribute (created by SignalMeta)
# to define the bluesky interface automatically.
class Base(metaclass=SignalMeta):
Base class for hardware objects
This class provides attribute access to one or more Signals, which can be
a mixture of read-only and writable. All must share the same base_name.
def __init__(self, base_name, read_fields=None):
self.base_name = base_name
if read_fields is None:
read_fields = self.signal_names
self.read_fields = read_fields
def read(self):
# map names ("data keys") to actual values
return {name: getattr(self, name).get() for name in self.read_fields}
def describe(self):
return {name: {'source': getattr(self, name).pv.pvname}
for name in self.read_fields}
def stop(self):
"to be defined by subclass"
def trigger(self):
"to be defined by subclass"
# Now, finally, the punchline. I imagine that a hardware object class
# could look something like this. Many will require additional special
# methods, but their basic interaction with the PVs will be spelled out
# plainly and succinctly at the top.
class Thing(Base):
"Example of a hardware object"
val = SignalRW('{base_name}.VAL')
stop = SignalRW('{base_name}.STOP')
class SettableBase(Base):
Base class for hardware objects that can be set
The entire purpose of this base class is to provide an auto-generated set
method with a signature that provides one argument per writable signal.
If you want a different signature, you
need to write you own set, and this subclass adds no utility.
def set(self, *args, **kwargs):
bound = self._default_sig_for_set.bind(*args, **kwargs)
status_objs = []
for name, val in bound.arguments.items():
sig = getattr(self, name)
# TODO : Will sig.put return a status object?
return status_objs
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment