Skip to content

Instantly share code, notes, and snippets.

@StuartLittlefair
Created May 8, 2017 12:01
Show Gist options
  • Save StuartLittlefair/81b27f5cfb7d257d1a9394368edf6b68 to your computer and use it in GitHub Desktop.
Save StuartLittlefair/81b27f5cfb7d257d1a9394368edf6b68 to your computer and use it in GitHub Desktop.
Autoscheduling for ultracam runs
import numpy as np
from trm import observing
from astropy.coordinates import SkyCoord, EarthLocation
from astropy import units as u
from astropy.time import Time
from astroplan import Observer, ObservingBlock, FixedTarget
from astroplan.scheduling import (Schedule, Transitioner, TransitionBlock,
PriorityScheduler, SequentialScheduler)
from astroplan.constraints import (AirmassConstraint, AtNightConstraint, Constraint,
MoonSeparationConstraint, MoonIlluminationConstraint)
# make a phase constraint
class PhaseConstraint(Constraint):
def __init__(self, t0, P, kind='barycentric',
unit='mjd',
min=None, max=None,
boolean_constraint=True):
self.min = min if min is not None else 0.0
self.max = max if max is not None else 1.0
# recast on range 0 to 1
self.min -= np.floor(self.min).astype(int)
self.max -= np.floor(self.max).astype(int)
self.kind = kind
self.unit = unit
self.t0 = t0
self.P = P
self.boolean_constraint = boolean_constraint
def compute_constraint(self, times, observer, targets):
if self.kind not in ['barycentric', 'heliocentric', 'utc']:
raise ValueError('Unrecognised timescale')
if self.kind == 'barycentric':
time_in_right_scale = times.tdb + times.light_travel_time(
targets, kind='barycentric', location=observer.location
)
elif self.kind == 'heliocentric':
time_in_right_scale = times + times.light_travel_time(
targets, kind='heliocentric', location=observer.location
)
else:
time_in_right_scale = times
values = (time_in_right_scale.jd if self.unit == 'jd' else
time_in_right_scale.mjd)
phase = (values - self.t0)/self.P
frac_phase = phase - np.floor(phase).astype(int)
mask = np.where(self.max > self.min,
(frac_phase > self.min) & (frac_phase < self.max),
(frac_phase > self.min) | (frac_phase < self.max))
return mask
def read_trm_files(target_file, info_file, range_file):
peinfo = {}
count = 0
name = None
with open(target_file) as tf:
for line in tf:
if not line.startswith('#') and not line.isspace():
count += 1
if count == 1:
name = line.strip()
elif count == 2:
rastr, decstr = line.split()
elif count == 3:
try:
eph = observing.Ephemeris(line)
peinfo[name] = {'ra': rastr, 'dec': decstr,
'ephemeris': eph}
except:
peinfo[name] = {'ra': rastr, 'dec': decstr,
'ephemeris': None}
finally:
count = 0
count = 0
with open(info_file) as inf:
for line in inf:
if not line.startswith('#') and not line.isspace():
count += 1
if count == 1:
name = line.strip()
elif count == 2:
fields = line.split()
priority, lf, dur, nvisits = (
int(fields[0])+1, float(fields[1]),
float(fields[2]), int(fields[3])
)
peinfo[name].update(
{'moon': lf, 'duration': dur, 'visits': nvisits, 'pri': priority}
)
count = 0
prinfo = {}
count = 0
with open(range_file) as rf:
for line in rf:
if line.startswith('#') or line.isspace():
if count:
if name in peinfo:
prinfo[name] = pr
else:
print(name, 'not found in position/ephemeris file and will be skipped.')
count = 0
else:
count += 1
if count == 1:
name = line.strip()
pr = observing.Prange(name)
elif count > 1:
pr.add(line)
return peinfo, prinfo
def create_observing_blocks(target_file, info_file, range_file):
peinfo, prinfo = read_trm_files(target_file, info_file, range_file)
blocks = []
for name in peinfo:
if 'duration' not in peinfo[name]:
print('{} not found in info file and will be skipped'.format(name))
continue
try:
coo = SkyCoord(
peinfo[name]['ra'],
peinfo[name]['dec'],
unit=(u.hour, u.deg)
)
target = FixedTarget(coo, name=name)
# add moon constraint
mic = MoonIlluminationConstraint(max=peinfo[name]['moon'])
# if there's an ephemeris and no specified, duration use that
if ('ephemeris' in peinfo[name] and peinfo[name]['ephemeris'] is not None and
peinfo[name]['duration'] == 0.0):
# add OBs for the each phase constraint
for rng in prinfo[name].prange:
# rng = prinfo[name].prange[0]
minval, maxval, _, _, _ = rng
eph = peinfo[name]['ephemeris']
t0, p = eph.coeff
assert eph.time in ['BMJD', 'HMJD', 'HJD',
'BJD']
if eph.time == 'BMJD':
kind = 'barycentric'
unit = 'mjd'
elif eph.time == 'HMJD':
kind = 'heliocentric'
unit = 'mjd'
elif eph.time == 'HJD':
kind = 'heliocentric'
unit = 'jd'
else:
kind = 'barycentric'
unit = 'jd'
pc = PhaseConstraint(t0, p,
min=minval,
max=maxval,
kind=kind,
unit=unit)
# phase constraint defines the allowed
# observable phases. if this is equal to the duration
# we have numerical precision issues and the block
# is probably never scheduled. Here we shorten
# the duration by a minute to allow scheduling
block = ObservingBlock(
target,
p*(maxval-minval)*u.d - 1*u.minute,
peinfo[name]['pri'],
{'acquisition': name},
[mic, pc]
)
blocks.extend([block]*peinfo[name]['visits'])
else:
# add OBs with no phase constraint
# add moon constraint
block = ObservingBlock(
target,
peinfo[name]['duration']*u.min,
peinfo[name]['pri'],
{'acquisition': name},
[mic])
blocks.extend([block]*peinfo[name]['visits'])
except Exception as err:
print(name)
print(peinfo[name])
print(prinfo)
print(str(err))
raise
return blocks
def write_switch_file(schedule):
obs_blocks = [b for b in schedule.scheduled_blocks if isinstance(b, ObservingBlock)]
transitions = [b for b in schedule.scheduled_blocks if isinstance(b, TransitionBlock)]
switch_times = [transition.start_time for transition in transitions]
switch_times.insert(0, obs_blocks[0].start_time)
switch_times.append(obs_blocks[-1].end_time)
names = [b.target.name for b in obs_blocks]
names.append('None')
durations = [transition.duration.to(u.min).value for transition in transitions]
durations.insert(0, 0)
durations.append(10)
for n, t, d in zip(names, switch_times, durations):
tstring = t.iso.split()[1]
tstring_fields = tstring.split(':')
tstring = ':'.join(tstring_fields[:-1])
print('{} | {} | {}'.format(
n, tstring, int(d)
))
if __name__ == "__main__":
import argparse
import datetime as dt
TNT = EarthLocation(lat=18.574, lon=98.482, height=2449.)
WHT = EarthLocation(lat=28.76063889, lon=-17.88163889, height=2332.)
VLT = EarthLocation(lat=-24.6250000, lon=-70.40275000, height=2635.)
NTT = EarthLocation(lat=-29.2589167, lon=-70.73375000, height=2375.)
scopes = dict(TNT=TNT, WHT=WHT, VLT=VLT, NTT=NTT)
parser = argparse.ArgumentParser(
formatter_class=argparse.ArgumentDefaultsHelpFormatter
)
# positional
parser.add_argument(
'stardata',
help='general file containing positions and ephemerides. It should have the same format of my C++ ephemeris file.')
parser.add_argument('telescope', help='Telescope name, e.g. WHT, VLT')
parser.add_argument('info',
help='general info on stars (priority, lunar limits, etc')
parser.add_argument(
'pranges',
help='file of stars to include and phase ranges of interest. Each entry should start with the name of the ' +
'star on its own on a line, and then follow it with the phase ranges in the form:\np1 p2 col lw\nwhere ' +
'p1 p2 is the range of phase to indicate, col is the pgplot colour index (2 = red, etc), lw is the ' +
'pgplot line width. It is OK to have p1 > p2; the programme will map it into a positive range.')
parser.add_argument('date', help='date at start of night, e.g. "01 Aug 2012"')
parser.add_argument('-s', '--scheduler', default='priority',
help='Scheduler to use. Options are sequential or priority. ' +
'Sequential scheduling runs through the night, scheduling the best OB at any time. ' +
'Priority scheduling schedules the OBs in priority order')
args = parser.parse_args()
if args.scheduler not in ['priority', 'sequential']:
raise ValueError('Unrecognised scheduler')
# get noon before and after
midnight_on_night_starting = Time(dt.datetime.strptime(args.date, "%d %b %Y"))
noon_before = midnight_on_night_starting + 12*u.hour
noon_after = midnight_on_night_starting + 36*u.hour
global_constraints = [AirmassConstraint(max=2.2, boolean_constraint=False),
AtNightConstraint.twilight_nautical(),
MoonSeparationConstraint(30*u.deg)]
# create observing blocks for each target (xN if multiple visits OK)
blocks = create_observing_blocks(args.stardata, args.info, args.pranges)
# create a transitioner
slew_rate = 1.0*u.deg/u.second
transition_components = dict(
acquisition=dict(default=10*u.minute),
filter=dict(default=10*u.minute)
)
transitioner = Transitioner(slew_rate, transition_components)
observer = Observer(location=scopes[args.telescope])
pscheduler = PriorityScheduler(constraints=global_constraints,
observer=observer,
transitioner=transitioner)
sscheduler = SequentialScheduler(constraints=global_constraints,
observer=observer,
transitioner=transitioner)
schedule = Schedule(noon_before, noon_after)
if args.scheduler == 'priority':
pscheduler(blocks, schedule)
elif args.scheduler == 'sequential':
sscheduler(blocks, schedule)
print('')
write_switch_file(schedule)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment