-
-
Save richardingham/0a58a291bad2e3b9009f to your computer and use it in GitHub Desktop.
Control sequence for single step hydrolysis
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from octopus.transport.basic import serial, tcp
from octopus.manufacturer import vapourtec, knauer, thalesnano, vici, mt
from octopus.data.manipulation import Max, Min
from octopus.sequence.control import StateMonitor
from octopus.sequence.util import Looping, Dependent
from octopus.image import source, tracker
from octopus.runtime import *
# Experiment title and identifier.
# NOTE(review): `title` and `id` are called with strings, so `id` is not the
# builtin here; both are presumably supplied by `from octopus.runtime import *`.
title("Zirconia Hydrolysis")
id("zirconia-hydrolysis")

# Instrument connections, both over TCP.
# `r` is the Vapourtec R2/R4 unit, `ir` is the Mettler-Toledo IR instrument
# exposing a single stream named "product".
# NOTE(review): addresses and ports are hard-coded - confirm before reuse.
r = vapourtec.R2R4(tcp("192.168.15.156", 9002))
ir = mt.ICIR(tcp("192.168.15.3", 8124), stream_names=["product"])
#
# Experiment Parameters
#

# IR signal thresholds, wrapped with variable(...) and compared against
# `ir.product` by the control sequence and the collection monitor.
ir_start_trigger = variable(100, "start", "start")
ir_low_trigger = variable(80, "low", "low")

# Pump flow rate and column dead volume. Their ratio (times 60) is used as the
# look-back window of the collection monitor.
# NOTE(review): presumably uL/min and uL respectively - confirm units.
vapourtec_flow_rate = 200
column_dead_volume = 5000

# Recipient number for SMS alerts sent by sms_notify().
sms_phone_no = "447941123456"
#
# Logic for SMS notification
#

# NOTE(review): `sms` is not imported by name in this file; presumably it is
# provided by `from octopus.runtime import *` - confirm.
smsnotifier = sms.ClockworkSMS("enter-api-key-here")


def sms_notify(msg):
    """Send `msg` by SMS to `sms_phone_no`, prefixed with the experiment name.

    The prefix matches the experiment title declared at the top of this
    script so that the recipient can tell which experiment raised the alert.
    """
    # A 1-tuple keeps the formatting correct even if `msg` is itself a tuple.
    msg = "Zirconia hydrolysis experiment: %s" % (msg,)
    smsnotifier.notify(sms_phone_no, msg)
#
# Logic to discard output when parameters are not within acceptable levels.
#

# StateFlip is a monitor which observes a set of expressions. If one of these
# tests fails, then fail_fn is called. When all of the expressions pass again,
# pass_fn is called. This sequence runs in a loop.

class StateFlip(Looping, Dependent):
    """Run one step when the tests start passing and another when they stop.

    The monitor starts in the "not OK" state. On each iteration the truth
    value of every registered test is checked:

    * all tests pass while the state is "not OK": ``pass_fn`` is resolved
      and the state becomes "OK";
    * any test fails while the state is "OK": ``fail_fn`` is resolved and
      the state becomes "not OK".

    ``pass_fn`` / ``fail_fn`` may each be ``None`` (nothing to run), a
    zero-argument callable returning a step (or ``None``), or a step object
    used directly. A resolved step is subscribed to this monitor's ``log``,
    reset and run; the previously started step, if any, is cancelled first.
    """

    # NOTE(review): presumably the polling period (seconds) used by Looping.
    interval = 0.5

    def __init__(self, pass_fn=None, fail_fn=None):
        Dependent.__init__(self)
        Looping.__init__(self)

        # This script calls `set(r.output, ...)` elsewhere, so the name `set`
        # does not refer to the builtin type here; fetch the real one.
        # Depending on the implementation of Python,
        # __builtins__ can be a module or its dict.
        try:
            b_set = __builtins__.set
        except AttributeError:
            b_set = __builtins__["set"]

        self.tests = b_set()
        self.pass_fn = pass_fn
        self.fail_fn = fail_fn
        self._step = None   # step most recently started by _iterate()
        self._ok = False    # last observed state: True once all tests passed

    def add(self, test):
        """Register an expression to be observed."""
        self.tests.add(test)

    def remove(self, test):
        """Stop observing an expression; unknown expressions are ignored."""
        self.tests.discard(test)

    @staticmethod
    def _resolve(handler):
        """Turn a pass/fail handler into the step to run (or None)."""
        if handler is None:
            return None
        if callable(handler):
            return handler()
        # Not callable: treat the object itself as the step to run.
        return handler

    def _iterate(self):
        """Check the tests once and react if the overall state has flipped."""
        passing = all(self.tests)
        if passing == self._ok:
            return  # no transition since the last check

        step = self._resolve(self.pass_fn if passing else self.fail_fn)
        self._ok = passing

        if step is None:
            return

        if self._step is not None:
            # Best effort: the previous step may no longer be cancellable.
            try:
                self._step.cancel()
                self._step.log -= self.log
            except Exception:
                pass

        step.log += self.log
        step.reset()
        step.run()
        self._step = step

    def _cancel(self):
        """Stop looping and cancel the step started by the last transition."""
        Looping._cancel(self)

        step = self._step
        if step is None:
            return

        # Best effort on both operations: failing to unsubscribe must not
        # prevent the cancel attempt, and vice versa.
        try:
            step.log -= self.log
        except Exception:
            pass
        try:
            step.cancel()
        except Exception:
            pass
#
# State Monitor Definition
#

# StateFlip calls pass_fn / fail_fn to obtain the step to run, so each handler
# is a zero-argument callable returning a sequence built once up front.
start_collecting = sequence(
    set(r.output, "collect"),
    log("Vapourtec parameters OK"),
)
divert_to_waste = sequence(
    set(r.output, "waste"),
    log("Vapourtec parameters not OK"),
)

collect_monitor = StateFlip(
    pass_fn=lambda: start_collecting,
    fail_fn=lambda: divert_to_waste,
)
# Collection should resume as soon as IR response is OK.
collect_monitor.add(ir.product > ir_low_trigger)

# Temperature and pressure must have been OK for one column volume for
# collection to be allowed. The same look-back window is used for both tests.
# NOTE(review): presumably seconds (volume / flow rate in minutes, times 60).
column_flush_time = column_dead_volume / vapourtec_flow_rate * 60

collect_monitor.add(Min(r.heater3.temp, column_flush_time) > 95)
collect_monitor.add(Max(r.pump2.airlock, column_flush_time) < 10000)
# The Vapourtec R2 publishes an "airlock" parameter which goes very high
# if there is a drop in pressure - usually this indicates an air bubble
# in the pump.
vapourtec_monitor = StateMonitor()
vapourtec_monitor.add(r.pump2.airlock < 10000)

# Step attached to the monitor: log the problem, send an SMS alert, then wait.
vapourtec_monitor.step = sequence(
    log("Airlock problem in R2"),
    call(sms_notify, "Airlock problem in R2"),
    wait("5m"),  # Send SMS max once every 5 min!
)
#
# Experiment control sequence
#

# Reagent injection period; kept as a named step so it can be referenced.
long_wait = wait("8h")

# Switch pump 2 to reagent, hold for the injection period, then switch back
# to solvent.
monitored_sequence = sequence(
    log("Injecting reagent"),
    set(r.pump2.input, "reagent"),
    long_wait,
    log("Stop injecting reagent"),
    set(r.pump2.input, "solvent"),
)

# Both monitors are registered as dependents of the injection sequence.
monitored_sequence.dependents.add(vapourtec_monitor)
monitored_sequence.dependents.add(collect_monitor)
# Full experiment: configure and power on the reactor, wait for the heater
# and the IR signal to reach their start levels, collect during the monitored
# injection, then wait for the IR signal to fall, flush for 20 minutes and
# power off.
rxn = sequence(
    log("Starting up..."),
    set(r.pressure_limit, 25000),
    set(r.heater3.target, 100),
    set(r.pump2.target, vapourtec_flow_rate),
    set(r.power, "on"),
    wait_until(r.heater3.temp > 95),
    wait_until(ir.product > ir_start_trigger),
    set(r.output, "collect"),
    monitored_sequence,
    wait_until(ir.product < ir_low_trigger),
    wait("20m"),
    set(r.power, "off"),
    log("Complete"),
)

run(rxn)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment