Skip to content

Instantly share code, notes, and snippets.

@CJ-Wright
Forked from chiahaoliu/customized_per_step.py
Last active July 16, 2018 20:36
Show Gist options
  • Save CJ-Wright/458c9455b930cde9acdb72120ae14538 to your computer and use it in GitHub Desktop.
Save CJ-Wright/458c9455b930cde9acdb72120ae14538 to your computer and use it in GitHub Desktop.
customized per_step to collect light and dark frames at each motor point
# ACQ SIDE
# ======================================================
import bluesky.plans as bp
import bluesky.plan_stubs as bps
from bluesky.utils import Msg, short_uid as _short_uid
from bluesky.preprocessors import subs_wrapper
from bluesky.simulators import summarize_plan
from bluesky.callbacks import LiveTable
from xpdacq.xpdacq_conf import xpd_configuration
from xpdacq.xpdacq import glbl, CustomizedRunEngine
from xpdacq.beamtime import _configure_area_det
xrun = CustomizedRunEngine()
save_kwargs = {}
shutter = xpd_configuration['shutter']
sh_open = glbl['shutter_conf']['open']
sh_close = glbl['shutter_conf']['close']
sample_motor = spinner_goniohead.X
sub_sample_dict = {
-4.0: {
'name': 'sample1',
'exposure': .1,
'wait': 5},
-6.0: {
'name': 'sample2',
'exposure': .1,
'wait': 5},
-8.0: {'name': 'sample3',
'exposure': .1,
'wait': 5}
}
bt.samples['Trevor_July_1']['sub_samples'] = sub_sample_dict
heater_dict = {(4, 30): 1, (30, 80): 2, (80, 600): 3}
# vendor blusky nd_per_step
def light_dark_nd_step(detectors, step, pos_cache):
"""
Inner loop of an N-dimensional step scan
This is the default function for ``per_step`` param`` in ND plans.
Parameters
----------
detectors : iterable
devices to read
step : dict
mapping motors to positions in this step
pos_cache : dict
mapping motors to their last-set positions
"""
def move():
yield Msg('checkpoint')
grp = _short_uid('set')
for motor, pos in step.items():
if pos == pos_cache[motor]:
# This step does not move this motor.
continue
if motor == cryostat1:
for (low, hi), heater_pos in heater_dict.items():
if low < pos <= hi:
yield Msg('set', cryostat1.heater, heater_pos, group=grp)
yield Msg('set', motor, pos, group=grp)
pos_cache[motor] = pos
yield Msg('wait', None, group=grp)
motors = step.keys()
yield from bps.abs_set(shutter, sh_close, wait=True)
yield from move()
yield from bps.trigger_and_read(list(detectors) + list(motors) +
[shutter], name='dark')
yield from bps.abs_set(shutter, sh_open, wait=True)
yield from bps.trigger_and_read(list(detectors) + list(motors) +
[shutter])
yield from bps.abs_set(shutter, sh_close, wait=True)
inner_dict = sub_sample_dict.get(sample_motor.get().user_readback,
{'exposure': .1, 'wait': 1})
_configure_area_det(inner_dict['exposure'])
yield from bps.sleep(inner_dict['wait'])
plan = bp.grid_scan([pe1c, shutter, Det_1_Z, cryostat1, sample_motor],
cryostat, 5, 500, 499,
Det_1_Z, 1227, 1727, 2, True,
sample_motor, -4, -8, 3, True,
per_step=light_dark_nd_step)
xrun(2, plan, saxs_waxs_conf={1227.0: 'PDF',
1727.0: 'XRD'},
subs=[LiveTable([pe1c, shutter, Det_1_Z, cryostat1, sample_motor])])
# AN SIDE
# =================================================================================
save_kwargs['string'] = (''
'{base_folder}/{folder_prefix}/'
'{analysis_stage}/'
'{human_timestamp}_'
'[temp_{raw_event[data][cryostat_T]:1.2f}'
'{raw_descriptor[data_keys][cryostat_T][units]}]_'
'[dx_{raw_event[data][diff_x]:1.3f}'
'{raw_descriptor[data_keys][diff_x][units]}]_'
'[dy_{raw_event[data][diff_y]:1.3f}'
'{raw_descriptor[data_keys][diff_y][units]}]_'
'[z_{raw_event[data][Det_1_Z]:1.3f}'
'{raw_descriptor[data_keys][Det_1_Z][units]}]_'
'[gx_{raw_event[data][spinner_goniohead_X]:1.3f}'
'{raw_descriptor[data_keys][spinner_goniohead_X][units]}]_'
'{raw_start[uid]:.6}_'
'{raw_event[seq_num]:04d}{ext}')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment