Last active
November 4, 2015 00:38
-
-
Save danielballan/61fd3448f50027423a54 to your computer and use it in GitHub Desktop.
Ophyd with metaclasses
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Example:
In [35]: import metaclass_demo as mcd
In [36]: t = mcd.Thing('asdf', ['val'])
In [39]: t.signal_names
Out[39]: ['stop', 'val']
m
t.base_name t.read t.signal_names t.trigger
t.describe t.read_fields t.stop t.val
In [38]: t.val.
t.val.connected t.val.put t.val.readback_pv
t.val.get t.val.pv
In [40]: t.val.pv
Out[40]: <PV 'asdf.VAL': not connected>
In [38]: t.describe()
Out[38]: {'val': {'source': 'asdf.VAL'}}
""" | |
import functools | |
import sys | |
import re | |
from inspect import Parameter, Signature | |
import epics | |
from ophyd.utils.epics_pvs import raise_if_disconnected | |
from ophyd.utils.errors import DisconnectedError | |
def raise_if_disconnected(fcn):
    """Decorate a method so it raises when its object is not connected.

    The decorated method's ``self`` must expose a truthy/falsy ``connected``
    attribute and a ``pv`` with a ``pvname``.  This variant puts the PV name
    (not the object name) into the error message.

    Raises
    ------
    DisconnectedError
        When ``self.connected`` is falsy at call time.
    """
    @functools.wraps(fcn)
    def guarded(self, *args, **kwargs):
        # Guard clause: refuse before the wrapped method touches the channel.
        if not self.connected:
            message = '{} is not connected'.format(self.pv.pvname)
            raise DisconnectedError(message)
        return fcn(self, *args, **kwargs)

    return guarded
# We could use real EpicsSignals, but to avoid confusing myself I made | |
# dead simple ones. | |
class EpicsSignalLite:
    """Bare-bones stand-in for an EpicsSignal (demo purposes only).

    Wraps exactly one ``epics.PV`` and offers read access to it.
    """

    def __init__(self, pv_name, *args, **kwargs):
        # Any extra positional/keyword arguments go straight to epics.PV.
        channel = epics.PV(pv_name, *args, **kwargs)
        self.pv = channel

    @property
    def connected(self):
        """Connection state as reported by the wrapped PV."""
        return self.pv.connected

    @raise_if_disconnected
    def get(self, *args, **kwargs):
        """Forward the call to ``self.pv.get`` and return its result."""
        value = self.pv.get(*args, **kwargs)
        return value
class EpicsSignalLiteRW(EpicsSignalLite):
    """Bare-bones writable EpicsSignal stand-in (demo purposes only).

    Pairs the PV inherited from the parent class (used for writes) with a
    second PV named ``<pv_name>.RBV`` (used for reads).
    """

    def __init__(self, pv_name, *args, **kwargs):
        super().__init__(pv_name, *args, **kwargs)
        # The readback PV is created without the extra arguments.
        readback_name = '{}.RBV'.format(pv_name)
        self.readback_pv = epics.PV(readback_name)

    @property
    def connected(self):
        """Truthy only when both the write PV and the readback PV connect."""
        setpoint_state = self.pv.connected
        return setpoint_state and self.readback_pv.connected

    @raise_if_disconnected
    def get(self, *args, **kwargs):
        """Forward the call to the readback PV's ``get``."""
        reading = self.readback_pv.get(*args, **kwargs)
        return reading

    @raise_if_disconnected
    def put(self, *args, **kwargs):
        """Forward the call to the writable PV's ``put``."""
        # Limits could potentially be checked here before writing.
        # TODO: Should this return a StatusObj?
        outcome = self.pv.put(*args, **kwargs)
        return outcome
# There are two descriptors, Signal and SignalRW. | |
class Signal:
    """Descriptor representing a single read-only PV.

    Parameters
    ----------
    pv_template : str
        Format string that must contain ``'{base_name}'``; it is filled in
        with the owning instance's ``base_name`` on first access.
    *args, **kwargs
        Stored and passed to ``_class`` when the wrapper object is built.

    Raises
    ------
    ValueError
        If ``pv_template`` does not contain ``'{base_name}'``.

    Notes
    -----
    The owning instance must provide ``base_name`` and ``_templates`` (a
    mapping from pv template to attribute name).
    """
    _class = EpicsSignalLite  # type of object returned by __get__

    def __init__(self, pv_template, *args, **kwargs):
        if '{base_name}' not in pv_template:
            raise ValueError("pv_template must contain '{base_name}'")
        self.pv_template = pv_template
        self.args = args
        self.kwargs = kwargs

    def __get__(self, instance, owner):
        """Return the wrapper object for ``instance``, building it once.

        Access through the class (``instance is None``) returns ``None``.
        """
        if instance is None:
            return
        attr_name = instance._templates[self.pv_template]
        # The cache lives in the instance's own __dict__.  A dict stored on
        # the class would be shared by every instance, handing one
        # instance's wrapper (built from its base_name) to another.
        cache = instance.__dict__.setdefault('_signals', {})
        # The cache is keyed by attribute name, so the lookup must use the
        # attribute name too; testing the formatted PV name never matches
        # and would rebuild the wrapper on every access.
        if attr_name not in cache:
            pv_name = self.pv_template.format(base_name=instance.base_name)
            cache[attr_name] = self._class(pv_name, *self.args,
                                           **self.kwargs)
        return cache[attr_name]
class SignalRW(Signal):
    """Descriptor for a writable PV that also has a readback value."""

    # Only the wrapper type differs from Signal; all descriptor logic is
    # inherited.
    _class = EpicsSignalLiteRW
# Now for the dreaded metaclasses. All classes that use Signal descriptors should | |
# use this SignalMeta metaclass. | |
class SignalMeta(type):
    """
    Creates attributes for Signals by inspecting the class definition.

    The main benefit of the metaclass is that you don't need to write:

        foo = Signal('..pv..', name='foo')

    We can get the name from inspecting the class, so this is sufficient:

        foo = Signal('..pv..')

    Every class created with this metaclass receives:

    ``_templates``
        dict mapping each Signal's pv template to its attribute name.
    ``signal_names``
        list of Signal attribute names.
    ``_signals``
        empty dict intended to hold wrapper objects once they are accessed.
    ``_default_sig_for_set``
        ``inspect.Signature`` with one positional-or-keyword parameter per
        ``SignalRW`` attribute.
    """
    def __new__(cls, name, bases, clsdict):
        clsobj = super().__new__(cls, name, bases, clsdict)

        # Map Signal attribute names to Signal descriptors.  Walk the MRO
        # from the most distant ancestor to the new class so that signals
        # declared on parent classes are included; scanning only ``clsdict``
        # would drop them and make inherited signals fail their
        # ``_templates`` lookup.
        sig_dict = {}
        for klass in reversed(clsobj.__mro__):
            for attr, value in vars(klass).items():
                if isinstance(value, Signal):
                    sig_dict[attr] = value
                else:
                    # A later class replaced the signal with something else.
                    sig_dict.pop(attr, None)

        # Map pv templates to attribute names; this is used by Signal to
        # ascertain the attribute name of each EpicsSignal it creates.
        # NOTE: two attributes sharing one template collapse to one entry.
        clsobj._templates = {sig.pv_template: attr
                             for attr, sig in sig_dict.items()}

        # List Signal attribute names.
        clsobj.signal_names = list(sig_dict)

        # Store EpicsSignal objects (only created once they are accessed).
        clsobj._signals = {}

        # BONUS (overkill?): Generate a default signature for the set method.
        writable_signals = [attr for attr, sig in sig_dict.items()
                            if isinstance(sig, SignalRW)]
        clsobj._default_sig_for_set = Signature(
            [Parameter(sig_name, Parameter.POSITIONAL_OR_KEYWORD)
             for sig_name in writable_signals])
        return clsobj
# The base class uses the 'signal_names' attribute (created by SignalMeta) | |
# to define the bluesky interface automatically. | |
class Base(metaclass=SignalMeta):
    """
    Base class for hardware objects

    This class provides attribute access to one or more Signals, which can be
    a mixture of read-only and writable. All must share the same base_name.

    Parameters
    ----------
    base_name : str
        Stored as ``self.base_name``.
    read_fields : list of str, optional
        Attribute names used by ``read`` and ``describe``.  Defaults to all
        names in ``signal_names``.
    """
    def __init__(self, base_name, read_fields=None):
        self.base_name = base_name
        if read_fields is None:
            # Copy the list: ``signal_names`` is a class attribute, so
            # storing it directly would let a mutation of one instance's
            # ``read_fields`` alter the class and every other instance.
            read_fields = list(self.signal_names)
        self.read_fields = read_fields

    def read(self):
        """Return a dict mapping each read field to its ``get()`` result."""
        # map names ("data keys") to actual values
        return {name: getattr(self, name).get() for name in self.read_fields}

    def describe(self):
        """Return a dict mapping each read field to ``{'source': pvname}``."""
        return {name: {'source': getattr(self, name).pv.pvname}
                for name in self.read_fields}

    def stop(self):
        "to be defined by subclass"
        pass

    def trigger(self):
        "to be defined by subclass"
        pass
# Now, finally, the punchline. I imagine that a hardware object class | |
# could look something like this. Many will require additional special | |
# methods, but their basic interaction with the PVs will be spelled out | |
# plainly and succinctly at the top. | |
class Thing(Base):
    """Example hardware object declaring two writable signals."""

    val = SignalRW('{base_name}.VAL')
    # NOTE: this attribute takes the place of the no-op Base.stop method.
    stop = SignalRW('{base_name}.STOP')
class SettableBase(Base):
    """
    THIS IS EXTRA CREDIT. IT MAY NOT BE WORTH THE COMPLEXITY....

    Base class for hardware objects that can be set

    Its sole job is to supply an auto-generated ``set`` method whose
    signature has one argument per writable signal. If you want a different
    signature, you need to write your own ``set``, and this subclass adds no
    utility.
    """
    def set(self, *args, **kwargs):
        """Bind the arguments to the writable signals and ``put`` each value.

        Returns the list of whatever each ``put`` call returned, in the order
        of the bound arguments.
        """
        bound = self._default_sig_for_set.bind(*args, **kwargs)
        # TODO : Will sig.put return a status object?
        return [getattr(self, name).put(val)
                for name, val in bound.arguments.items()]
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment