-
-
Save richardingham/f2117b9dc7504d6e1942 to your computer and use it in GitHub Desktop.
Control sequence for two-step experiments.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from octopus.transport.basic import serial, tcp | |
from octopus.manufacturer import vapourtec, knauer, thalesnano, vici, mt | |
from octopus.data.manipulation import Max, Min | |
from octopus.sequence.control import StateMonitor | |
from octopus.sequence.util import Looping, Dependent | |
from octopus.image import source, tracker | |
from octopus.runtime import * | |
# Instrument connections. The short names are used throughout the rest
# of the script: r (Vapourtec R2/R4), h (H-Cube), v (multi-position
# valve), k (Knauer pump), t (webcam blob tracker), ir (IR spectrometer).
r = vapourtec.R2R4(tcp("192.168.15.156", 9002))
h = thalesnano.HCube(tcp("192.168.15.156", 9004))
v = vici.MultiValve(tcp("192.168.15.153", 9006))
# NOTE(review): same host and port as `r` above - confirm that the
# Knauer pump really is reachable on 192.168.15.156:9002.
k = knauer.K120(tcp("192.168.15.156", 9002))
t = tracker.SingleBlobTracker(source.cv_webcam(0))
ir = mt.ICIR(tcp("192.168.15.242", 8124), stream_names=["product"])

# Experiment title and identifier.
title("Two-step Hydration and Hydrogenation Experiments")
id("hydrog-twostep-reservoir")
#
# Experiment Parameters
#

# IR response thresholds: collection starts above ir_start_trigger and
# the collect monitor requires the response to stay above ir_low_trigger.
ir_start_trigger = 200
ir_low_trigger = 170

# Reservoir height thresholds, compared against t.height.
# (min_height_trigger and max_height_trigger are not referenced
# elsewhere in this script as shown.)
min_height_trigger = 70
start_height_trigger = 100
max_height_trigger = 360

# Flow rates. (knauer_flow_rate is not referenced elsewhere in this
# script as shown.)
knauer_flow_rate = 100
vapourtec_flow_rate = 100

# Volumes. Wait times are derived below as (volume / flow rate) * 60,
# so these are presumably in the flow rates' volume unit per minute
# - TODO confirm units.
column_dead_volume = 3000
hcube_dead_volume = 3000
connecting_tube_vol = 1000

# Recipient of SMS notifications.
sms_phone_no = "447941123456"
#
# Logic for SMS notification
#

# NOTE(review): `sms` is not imported explicitly; presumably it is
# provided by `from octopus.runtime import *` - confirm.
smsnotifier = sms.ClockworkSMS("enter-api-key-here")


def sms_notify(msg):
    """Send `msg` to `sms_phone_no`, prefixed with the experiment name."""
    smsnotifier.notify(sms_phone_no, "2 step hydrog experiment: %s" % msg)
#
# Define parameters to be displayed on remote monitoring interface.
#

ui(
    # Each entry is one chart; every chart uses a "maxtime" of 30 * 60.
    traces=[
        {
            "title": "Vapourtec Pressures",
            "unit": "",
            "traces": [r.pressure, r.pump2.pressure],
            "maxtime": 30 * 60,
        },
        {
            "title": "H-Cube Pressures",
            "unit": "",
            "traces": [h.inlet_pressure, h.system_pressure],
            "maxtime": 30 * 60,
        },
        {
            "title": "IR output",
            "unit": "mAU",
            "traces": [ir.product],
            "maxtime": 30 * 60,
        },
    ],
    # Individual instrument properties shown alongside the charts.
    properties=[
        # Vapourtec
        r.power,
        r.pump2.target,
        r.heater3.temp,
        r.output,
        # Reservoir tracker
        t.height,
        t.image,
        # Knauer pump
        k.rate,
        # H-Cube
        h.hydrogen_mode,
        h.system_pressure_target,
        h.system_pressure,
        h.column_temperature_target,
        h.column_temperature,
        h.state,
        h.message,
        # Sample valve
        v.position,
    ],
)
#
# Logic to discard output when parameters are not within acceptable levels.
#

class StateFlip(Looping, Dependent):
    """Run one step when any test starts failing and another when all pass again.

    The set of tests is checked on every iteration. When the monitor is
    not triggered and at least one test is falsy, the "fail" step is run
    and the monitor becomes triggered. When it is triggered and every
    test is truthy again, the "pass" step is run and the monitor is
    un-triggered.

    `pass_fn` and `fail_fn` may each be either a callable returning a
    step, or a step object itself (anything with `reset()` and `run()`).
    `None` means "do nothing" for that transition.
    """

    interval = 0.5

    def __init__(self, pass_fn=None, fail_fn=None):
        Looping.__init__(self)

        # The name `set` is used in this script for the sequence step
        # that assigns a value, so fetch the builtin type explicitly.
        # Depending on the implementation of Python,
        # __builtins__ can be a module or its dict.
        try:
            b_set = __builtins__.set
        except AttributeError:
            b_set = __builtins__["set"]

        self.tests = b_set()
        self.pass_fn = pass_fn
        self.fail_fn = fail_fn
        self._steps = []
        self._triggered = False

    def add(self, test):
        """Add `test` to the set of conditions being monitored."""
        self.tests.add(test)

    def remove(self, test):
        """Stop monitoring `test`; does nothing if it was never added."""
        self.tests.discard(test)

    def _run_step(self, fn):
        """Resolve `fn` to a step, (re)start it and remember it for cancel."""
        if fn is None:
            return

        # Accept either a factory returning a step or a step object.
        step = fn() if callable(fn) else fn
        step.reset()
        step.run()

        # Track each step once. Identity is used rather than `in`
        # because the objects involved may overload comparison operators.
        if not any(known is step for known in self._steps):
            self._steps.append(step)

    def _iterate(self):
        passing = all(self.tests)

        if (not self._triggered) and (not passing):
            # Transition OK -> not OK.
            self._run_step(self.fail_fn)
            self._triggered = True
        elif self._triggered and passing:
            # Transition not OK -> OK.
            self._run_step(self.pass_fn)
            self._triggered = False

    def _cancel(self):
        from twisted.internet import defer

        Looping._cancel(self)

        pending = []
        for step in self._steps:
            # Best effort: a step that cannot be cancelled (for example
            # because it already finished) must not prevent the
            # remaining steps from being cancelled.
            try:
                pending.append(step.cancel())
            except Exception:
                pass

        return defer.gatherResults(pending)
#
# State Monitor Definition
#

# Switches the Vapourtec output between "collect" and "waste" depending
# on whether all of the tests added below currently hold.
# NOTE(review): pass_fn / fail_fn receive sequence objects here rather
# than functions - confirm that StateFlip handles a step being passed
# directly.
collect_monitor = StateFlip(
    pass_fn=sequence(
        set(r.output, "collect"),
        log("Vapourtec parameters OK"),
    ),
    fail_fn=sequence(
        set(r.output, "waste"),
        log("Vapourtec parameters not OK"),
    ),
)

# Collection should resume as soon as IR response is OK.
collect_monitor.add(ir.product > ir_low_trigger)

# Temperature and pressure must have been OK for one column volume for
# collection to be allowed.
collect_monitor.add(
    Min(r.heater3.temp, column_dead_volume / vapourtec_flow_rate * 60) > 95
)
collect_monitor.add(
    Max(r.pump2.airlock, column_dead_volume / vapourtec_flow_rate * 60) < 10000
)
# The Vapourtec R2 publishes an "airlock" parameter which goes very high
# if there is a drop in pressure - usually this indicates an air bubble
# in the pump.
vapourtec_monitor = StateMonitor()
vapourtec_monitor.add(r.pump2.airlock < 10000)
# Step to run when the airlock test fails: log, notify by SMS, then
# wait five minutes.
vapourtec_monitor.step = sequence(
    log("Airlock problem in R2"),
    call(sms_notify, "Airlock problem in R2"),
    wait("5m"),
)

# Same pattern for the H-Cube: the test is that system pressure stays
# at or above 4.
hcube_monitor = StateMonitor()
hcube_monitor.add(h.system_pressure >= 4)
hcube_monitor.step = sequence(
    log("Pressure loss at H-Cube"),
    call(sms_notify, "Pressure loss at H-Cube"),
    wait("5m"),
)
#
# Generate the control sequence for a hydrogenation reaction with
# a particular set of conditions.
#

# Conditions requested by the most recent reaction() call. This is
# updated when the sequence is *built*, not when it runs, so reaction()
# calls must be made in the order in which they will be executed.
_current_state = [25, None, 200]  # temp, pressure, rate


def reaction(name, pressure, temp, rate, valve_pos):
    """Build the step sequence for one hydrogenation run.

    The conditions are compared with those of the previous reaction()
    call (held in `_current_state`) so that only the parameters that
    actually differ are changed, with the appropriate waits.

    Parameters:
        name: label used in log messages and passed to log_output().
        pressure: H-Cube system pressure target, or the string "full"
            to select "full" hydrogen mode.
        temp: H-Cube column temperature target.
        rate: Knauer pump target flow rate.
        valve_pos: valve position used while the sample is collected;
            the valve is returned to position 1 afterwards.

    Returns:
        A sequence step that prepares the conditions, waits, and
        collects a sample for ten minutes.
    """
    global _current_state

    old_state, _current_state = _current_state, [temp, pressure, rate]

    # Determine which parameters need to change
    change_t = (old_state[0] != temp)
    change_p = (old_state[1] != pressure)
    change_r = (old_state[2] != rate)

    # A pressure change requires the H-Cube to be switched off and on again.
    if change_p:
        # Define a sequence to carry out the pressure change
        if pressure == "full":
            change_h2 = sequence(
                set(h.system_pressure_target, 0),
                set(h.hydrogen_mode, "full"),
            )
            pressure_wait = (h.state == "running")
        else:
            change_h2 = sequence(
                set(h.hydrogen_mode, "controlled"),
                set(h.system_pressure_target, pressure),
            )
            pressure_wait = (h.system_pressure >= pressure)

        # Temperature and rate are set in the same step, since the
        # H-Cube is being restarted anyway.
        change_step = sequence(
            log("Switch off hydrogen to change pressure"),
            call(h.stop_keep_hydrogen),
            log("Change pressure to %s ..." % pressure),
            change_h2,
            log("Change temp to %s" % temp),
            set(h.column_temperature_target, temp),
            log("Change rate to %s" % rate),
            set(k.target, rate),
            log("Starting H-Cube"),
            # Wait for both one dead volume and the pressure condition.
            parallel(
                wait(((hcube_dead_volume / rate) + 1) * 60),
                sequence(
                    call(h.start_hydrogenation),
                    do_if(
                        h.state != "running",
                        log("Please check that the H-cube is running OK"),
                    ),
                    wait_until(pressure_wait),
                ),
            ),
        )

    else:
        # Changing rate is easy - just wait one column volume
        if change_r:
            a = sequence(
                log("Change rate to %s" % rate),
                set(k.target, rate),
                log("Wait one dead volume"),
                wait(((hcube_dead_volume / rate) + 1) * 60),
            )
        else:
            a = log("Rate not changed")

        # Alter temperature - need to wait until temperature has
        # changed before waiting a column volume.
        if change_t:
            if temp > old_state[0]:
                # Wait to heat up
                temp_wait = h.column_temperature >= temp
            else:
                # Wait to cool down
                temp_wait = h.column_temperature < temp + 5

            b = sequence(
                log("Change temp to %s" % temp),
                set(h.column_temperature_target, temp),
                log("Waiting for temp. stability"),
                wait_until(temp_wait),
            )
        else:
            b = log("Temp not changed")

        # These two can be carried out in parallel
        # to minimise waiting.
        change_step = parallel(a, b)

    return sequence(
        log("Checking volume of reservoir before next run"),
        do_if(t.height < start_height_trigger, sequence(
            log("Stopping H-cube until reservoir refills"),
            call(h.stop_keep_hydrogen),
            call(hcube_monitor.pause),  # Stopping pump will trigger error.
            set(k.power, "off"),
            wait_until(t.height > start_height_trigger),
            log("Restarting"),
            set(k.power, "on"),
            wait("30s"),
            call(hcube_monitor.resume),
        )),
        log_output(name),
        log("Preparing reaction #%s" % name),
        change_step,
        log("Starting reaction #%s" % name),
        # One H-Cube dead volume (plus a minute) at the new rate. The
        # original referenced an undefined name `dead_volume` here.
        wait(((hcube_dead_volume / rate) + 1) * 60),
        log("Taking a sample now"),
        set(v.position, valve_pos),
        wait("10m"),
        log("Collection complete"),
        # Move valve back to waste
        set(v.position, 1),
    )
#
# Experiment control sequence
#

# A wait timer that will be used in the sequence
# Defining it here as a variable allows it to be
# cancelled from a different part of the sequence.
wait_for_reagent_to_be_used_up = wait("12h")

# Start-up: configure the Vapourtec, switch it on, and hold until
# heater 3 reads above 95.
setup = sequence(
    log("Starting up..."),
    set(r.pressure_limit, 25000),
    set(r.pump2.target, 200),
    set(r.heater3.target, 100),
    set(r.power, "on"),
    wait_until(r.heater3.temp > 95),
)
# Main reaction sequence: three branches run in parallel.
#
# The monitors are attached via lambdas because `rxn` does not exist
# yet while its own argument list is being evaluated; the lambda defers
# the lookup of `rxn` until the step actually runs.
rxn = parallel(
    # Branch 1: collect intermediate from the Vapourtec, then shut the
    # Vapourtec down once the reagent has been flushed through.
    sequence(
        # Waiting for conversion to intermediate
        wait_until(ir.product > ir_start_trigger),
        log("Collecting intermediate"),
        set(r.output, "collect"),
        call(lambda: rxn.dependents.add(collect_monitor)),

        # Flush vapourtec reactor
        wait_until(r.pump2.input == "solvent"),
        wait(
            (column_dead_volume / vapourtec_flow_rate * 60)
            + (connecting_tube_vol / vapourtec_flow_rate * 60)
        ),
        cancel(collect_monitor),
        cancel(vapourtec_monitor),
        set(r.output, "waste"),
        set(r.heater3.target, 25),
        wait("20m"),
        set(r.power, "off"),
    ),

    # Branch 2: inject reagent until the timer expires or is cancelled
    # by branch 3, then switch back to solvent.
    sequence(
        log("Injecting reagent"),
        set(r.pump2.input, "reagent"),
        wait_for_reagent_to_be_used_up,
        log("Stop injecting reagent"),
        set(r.pump2.input, "solvent"),
    ),

    # Branch 3: run the hydrogenation reactions from the reservoir.
    sequence(
        # Wait for reservoir to fill up
        wait_until(t.height > start_height_trigger),
        set(v.position, 1),
        log("Starting knauer pump"),
        # NOTE(review): this sets k.rate whereas reaction() sets
        # k.target - confirm which property is intended here.
        set(k.rate, 100),
        set(k.power, "on"),
        wait("30s"),
        call(lambda: rxn.dependents.add(hcube_monitor)),

        ## First Run:
        reaction(1 , "full", 100, 100, 2),
        reaction(2 , 20    , 100, 100, 3),
        reaction(3 , 20    ,  40, 200, 4),
        reaction(4 , "full",  40, 100, 5),
        reaction(5 , 20    ,  40, 200, 6),
        reaction(6 , 20    ,  40, 100, 7),
        reaction(7 , 20    , 100, 200, 8),
        reaction(8 , "full",  40, 200, 9),

        ## Second Run:
        #reaction(9 , 20    ,  40, 100, 2),
        #reaction(10, "full", 100, 100, 3),
        #reaction(11, "full",  40, 100, 4),
        #reaction(12, "full", 100, 200, 5),
        #reaction(13, "full", 100, 200, 6),
        #reaction(14, 20    , 100, 100, 7),
        #reaction(15, "full",  40, 200, 8),
        #reaction(16, 20    , 100, 200, 9),

        log("Complete, releasing hydrogen"),
        call(h.stop_release_hydrogen),
        cancel(wait_for_reagent_to_be_used_up),
        log("Flushing"),
        set(k.target, 200),
        wait("20m"),
        set(k.power, "off"),
    ),
)
# The airlock monitor is attached to the main reaction from the start.
rxn.dependents.add(vapourtec_monitor)

# Run the start-up sequence, then the main reaction.
run(sequence(setup, rxn))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment