Skip to content

Instantly share code, notes, and snippets.

@danielballan
Last active November 4, 2015 00:38
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save danielballan/61fd3448f50027423a54 to your computer and use it in GitHub Desktop.
Save danielballan/61fd3448f50027423a54 to your computer and use it in GitHub Desktop.
Ophyd with metaclasses
"""
Example:
In [35]: import metaclass_demo as mcd
In [36]: t = mcd.Thing('asdf', ['val'])
In [39]: t.signal_names
Out[39]: ['stop', 'val']
In [37]: t.
t.base_name t.read t.signal_names t.trigger
t.describe t.read_fields t.stop t.val
In [38]: t.val.
t.val.connected t.val.put t.val.readback_pv
t.val.get t.val.pv
In [40]: t.val.pv
Out[40]: <PV 'asdf.VAL': not connected>
In [38]: t.describe()
Out[38]: {'val': {'source': 'asdf.VAL'}}
"""
import functools
import sys
import re
from inspect import Parameter, Signature
import epics
from ophyd.utils.epics_pvs import raise_if_disconnected
from ophyd.utils.errors import DisconnectedError
def raise_if_disconnected(fcn):
    """Decorate a method so it refuses to run on a disconnected channel.

    The wrapped method's owner must expose a ``connected`` attribute and a
    ``pv`` attribute with a ``pvname``.  Unlike the implementation in
    utils.py, the error message names the PV rather than the object.

    Raises
    ------
    DisconnectedError
        From the wrapper, when ``self.connected`` is falsy at call time.
    """
    @functools.wraps(fcn)
    def wrapper(self, *args, **kwargs):
        # Guard clause: bail out before touching the channel at all.
        if not self.connected:
            message = '{} is not connected'.format(self.pv.pvname)
            raise DisconnectedError(message)
        return fcn(self, *args, **kwargs)

    return wrapper
# We could use real EpicsSignals, but to avoid confusing myself I made
# dead simple ones.
class EpicsSignalLite:
    """Minimal stand-in for an EpicsSignal (demo purposes only).

    Wraps exactly one ``epics.PV`` and treats it as read-only: the class
    offers ``get`` but no ``put``.
    """

    def __init__(self, pv_name, *args, **kwargs):
        # Any extra arguments are handed to epics.PV untouched.
        self.pv = epics.PV(pv_name, *args, **kwargs)

    @property
    def connected(self):
        """Connection state as reported by the wrapped PV."""
        return self.pv.connected

    @raise_if_disconnected
    def get(self, *args, **kwargs):
        """Return the PV's value, forwarding all arguments to ``pv.get``."""
        channel = self.pv
        return channel.get(*args, **kwargs)
class EpicsSignalLiteRW(EpicsSignalLite):
    """Minimal stand-in for a writable EpicsSignal (demo purposes only).

    Pairs the writable PV (``self.pv``, created by the parent class) with a
    second PV holding its readback value (``self.readback_pv``).
    """

    def __init__(self, pv_name, *args, **kwargs):
        super().__init__(pv_name, *args, **kwargs)
        # The readback PV name is the writable PV name plus '.RBV'.  The
        # extra args/kwargs are only used for the writable PV.
        readback_name = '{}.RBV'.format(pv_name)
        self.readback_pv = epics.PV(readback_name)

    @property
    def connected(self):
        """Truthy only when both the writable and readback PVs are connected."""
        setpoint_up = self.pv.connected
        return setpoint_up and self.readback_pv.connected

    @raise_if_disconnected
    def get(self, *args, **kwargs):
        """Read from the readback PV, forwarding all arguments."""
        readback = self.readback_pv
        return readback.get(*args, **kwargs)

    @raise_if_disconnected
    def put(self, *args, **kwargs):
        """Write to the writable PV, forwarding all arguments."""
        # Potentially could check limits here.
        # TODO: Should this return a StatusObj?
        writable = self.pv
        return writable.put(*args, **kwargs)
# There are two descriptors, Signal and SignalRW.
class Signal:
    """A descriptor representing a single read-only PV.

    Declared as a class attribute on a class that uses ``SignalMeta``.  On
    instance access it formats ``pv_template`` with the instance's
    ``base_name``, builds an object of type ``_class`` for that PV name, and
    caches it so later accesses return the same object.

    Parameters
    ----------
    pv_template : str
        Template for the PV name; must contain the literal ``'{base_name}'``.
    *args, **kwargs
        Stored and passed through to ``_class`` when the signal object is
        created.

    Raises
    ------
    ValueError
        If ``pv_template`` does not contain ``'{base_name}'``.
    """

    _class = EpicsSignalLite  # type of object returned by __get__

    def __init__(self, pv_template, *args, **kwargs):
        if '{base_name}' not in pv_template:
            raise ValueError("pv_template must contain '{base_name}'")
        self.pv_template = pv_template
        self.args = args
        self.kwargs = kwargs

    def __get__(self, instance, owner):
        """Return the signal object for ``instance``, creating it on first use.

        Accessing the attribute on the class itself returns the descriptor.
        """
        if instance is None:
            # Standard descriptor convention: class-level access yields the
            # descriptor so it can be introspected.
            return self
        pv_name = self.pv_template.format(base_name=instance.base_name)
        attr_name = instance._templates[self.pv_template]
        # Keep the cache in the instance's own __dict__: a class-level dict
        # would hand one instance's signal to another instance that has a
        # different base_name.
        cache = vars(instance).setdefault('_signals', {})
        # Look up by the same key that is used for storage; testing for
        # pv_name here would never hit and would rebuild the PV every time.
        if attr_name not in cache:
            cache[attr_name] = self._class(pv_name, *self.args,
                                           **self.kwargs)
        return cache[attr_name]
class SignalRW(Signal):
    """Descriptor for a writeable PV that also has a readback value.

    Differs from ``Signal`` only in the type of object it hands out.
    """

    # Type instantiated by the inherited __get__.
    _class = EpicsSignalLiteRW
# Now for the dreaded metaclasses. All classes that use Signal descriptors should
# use this SignalMeta metaclass.
class SignalMeta(type):
    """
    Creates attributes for Signals by inspecting class definition

    The main benefit of the metaclass is that you don't need to write:

        foo = Signal('..pv..', name='foo')

    We can get the name from inspecting the class, so this is sufficient:

        foo = Signal('..pv..')

    Every class created by this metaclass gets four attributes:

    ``_templates``
        dict mapping each Signal's pv_template to its attribute name.
    ``signal_names``
        list of the Signal attribute names.
    ``_signals``
        empty dict intended to hold EpicsSignal objects.
    ``_default_sig_for_set``
        ``inspect.Signature`` with one positional-or-keyword parameter per
        SignalRW attribute.

    Signals declared on base classes are included, so subclassing a class
    that already declares Signals keeps them visible.
    """

    def __new__(cls, name, bases, clsdict):
        clsobj = super().__new__(cls, name, bases, clsdict)

        # maps Signal attribute names to Signal objects.  Walk the MRO from
        # the most basic class to the new one so inherited Signals are
        # picked up and subclass definitions take precedence.
        sig_dict = {}
        for klass in reversed(clsobj.__mro__):
            for attr, value in vars(klass).items():
                if isinstance(value, Signal):
                    sig_dict[attr] = value
                else:
                    # A non-Signal attribute that overrides an inherited
                    # Signal removes it from the listing.
                    sig_dict.pop(attr, None)

        # maps pv templates to attribute names; this is used by Signal to
        # ascertain the attribute name of each EpicsSignal it creates
        clsobj._templates = {v.pv_template: k for k, v in sig_dict.items()}

        # List Signal attribute names.
        clsobj.signal_names = list(sig_dict)

        # Store EpicsSignal objects (only created once they are accessed)
        clsobj._signals = {}

        # BONUS (overkill?): Generate a default signature for the set method.
        writable_signals = [k for k, v in sig_dict.items()
                            if isinstance(v, SignalRW)]
        # 'sig_name' rather than 'name' so the class-name parameter is not
        # shadowed inside the comprehension.
        signature = Signature(
            [Parameter(sig_name, Parameter.POSITIONAL_OR_KEYWORD)
             for sig_name in writable_signals])
        clsobj._default_sig_for_set = signature
        return clsobj
# The base class uses the 'signal_names' attribute (created by SignalMeta)
# to define the bluesky interface automatically.
class Base(metaclass=SignalMeta):
    """
    Base class for hardware objects

    This class provides attribute access to one or more Signals, which can be
    a mixture of read-only and writable. All must share the same base_name.

    Parameters
    ----------
    base_name : str
        Substituted for ``{base_name}`` in each Signal's PV template.
    read_fields : list of str, optional
        Names of the attributes reported by ``read`` and ``describe``.
        Defaults to a copy of the class's ``signal_names``.
    """

    def __init__(self, base_name, read_fields=None):
        self.base_name = base_name
        if read_fields is None:
            # Copy: signal_names is a class attribute, and handing out the
            # same list object would let one instance's edits to
            # read_fields change it for the class and every other instance.
            read_fields = list(self.signal_names)
        self.read_fields = read_fields

    def read(self):
        """Return a dict mapping each read field to its current value."""
        # map names ("data keys") to actual values
        return {name: getattr(self, name).get() for name in self.read_fields}

    def describe(self):
        """Return a dict mapping each read field to ``{'source': pvname}``."""
        return {name: {'source': getattr(self, name).pv.pvname}
                for name in self.read_fields}

    def stop(self):
        "to be defined by subclass"
        pass

    def trigger(self):
        "to be defined by subclass"
        pass
# Now, finally, the punchline. I imagine that a hardware object class
# could look something like this. Many will require additional special
# methods, but their basic interaction with the PVs will be spelled out
# plainly and succinctly at the top.
class Thing(Base):
    """Example hardware object with two writable signals."""

    # Declaration order is kept as-is: the metaclass builds signal_names
    # and the default set() signature from these attributes.
    val = SignalRW('{base_name}.VAL')
    # NOTE(review): this attribute replaces the stop() method inherited
    # from Base -- confirm that is intended.
    stop = SignalRW('{base_name}.STOP')
class SettableBase(Base):
    """
    THIS IS EXTRA CREDIT. IT MAY NOT BE WORTH THE COMPLEXITY....

    Base class for hardware objects that can be set

    All this class adds is an auto-generated ``set`` method whose signature
    has one argument per writable signal.  If you want a different
    signature, you need to write your own ``set``, and this subclass adds
    no utility.
    """

    def set(self, *args, **kwargs):
        """Put one value to each writable signal named by the arguments.

        Arguments are bound against ``_default_sig_for_set``; each bound
        value is passed to ``put`` on the attribute of the same name.
        Returns the list of whatever each ``put`` returned, in binding
        order.
        """
        call = self._default_sig_for_set.bind(*args, **kwargs)
        # TODO : Will sig.put return a status object?
        return [getattr(self, sig_name).put(target)
                for sig_name, target in call.arguments.items()]
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment