# Control sequence for two-step hydrogenation with reservoir.
# Source: GitHub gist richardingham/31f6f8efa47771c2ed02.
from octopus.transport.basic import serial, tcp | |
from octopus.manufacturer import vapourtec, knauer, thalesnano, vici, mt | |
from octopus.data.manipulation import Max, Min | |
from octopus.sequence.control import StateMonitor | |
from octopus.sequence.util import Looping, Dependent | |
from octopus.image import source, tracker | |
from octopus.notifier import sms | |
from octopus.runtime import * | |
# Experiment identity.  `title` and `id` are not defined in this file;
# presumably they come from `from octopus.runtime import *` -- confirm.
title("Two-step Hydration and Hydrogenation")
id("hydrog-twostep")

# Instrument connections, one object per device.
# r: Vapourtec R2R4 (TCP)
r = vapourtec.R2R4(
    tcp("192.168.15.156", 9002)
)
# k: Knauer K120 pump (serial)
k = knauer.K120(
    serial("/dev/ttyUSB0", baudrate=9600)
)
# h: ThalesNano H-Cube (serial)
h = thalesnano.HCube(
    serial("/dev/ttyUSB1", baudrate=9600)
)
# v: VICI multi-position valve (serial)
v = vici.MultiValve(
    serial("/dev/ttyUSB2", baudrate=9600)
)
# t: single-blob tracker on webcam 0; its `height` is compared against the
# *_height_trigger parameters elsewhere in this script.
t = tracker.SingleBlobTracker(
    source.cv_webcam(0)
)
# ir: Mettler-Toledo iC IR (TCP) exposing one stream named "product".
ir = mt.ICIR(
    tcp("192.168.15.3", 8124),
    stream_names=["product"],
)
#
# Experiment Parameters
#

# Thresholds compared against the `ir.product` stream.
# Collection of intermediate starts once the signal rises above the start
# trigger; the Vapourtec monitor fails while it is not above the low trigger.
ir_start_trigger = 200
ir_low_trigger = 170

# Thresholds compared against the tracker's `height` reading
# (presumably pixels in the webcam image -- confirm).
min_height_trigger = 70      # below this, hydrogenation is wound down
start_height_trigger = 100   # above this, the Knauer pump is started
max_height_trigger = 360     # not referenced in the visible script

# Flow rates and volumes.  Durations are computed throughout this script as
# `volume / flow_rate * 60`, so presumably rates are in uL/min, volumes in uL
# and the result is in seconds -- confirm against the instrument drivers.
knauer_flow_rate = 100
vapourtec_flow_rate = 100
column_dead_volume = 3000
hcube_dead_volume = 3000
connecting_tube_vol = 1000

# Destination number for SMS alerts.
sms_phone_no = "447941123456"
#
# Logic for SMS notification
#

# SMS client.  The API key below is a placeholder and must be replaced
# before notifications can be expected to work.
smsnotifier = sms.ClockworkSMS("enter-api-key-here")


def sms_notify(msg):
    """Send `msg` to `sms_phone_no`, prefixed with the experiment name."""
    text = "2 step hydrog experiment: %s" % msg
    smsnotifier.notify(sms_phone_no, text)
#
# Logic to discard output when parameters are not within acceptable levels.
#

class StateFlip(Looping, Dependent):
    """Two-state monitor that runs a step whenever its tests change outcome.

    The monitor starts in the "passing" state.  On each iteration:

    * if it is passing and at least one test is falsy, the fail handler is
      run and the monitor becomes "triggered";
    * if it is triggered and every test is truthy again, the pass handler is
      run and the monitor returns to "passing".

    Handlers (`pass_fn`, `fail_fn`) may be:

    * a callable returning a step (or None), which is invoked on each flip;
    * a step object itself (anything with `reset()` and `run()`), which is
      reset and re-run on each flip -- this is how the monitors in this
      script are constructed;
    * None, in which case the flip happens silently.
    """

    # Polling period used by `Looping` (presumably seconds -- confirm).
    interval = 0.5

    def __init__(self, pass_fn=None, fail_fn=None):
        # NOTE(review): only Looping.__init__ is called, as in the original;
        # confirm whether Dependent needs initialising as well.
        Looping.__init__(self)

        # `set` is shadowed in this script by the step factory used as
        # `set(property, value)`, so fetch the builtin set type explicitly.
        # Depending on the implementation of Python,
        # __builtins__ can be a module or its dict.
        try:
            b_set = __builtins__.set
        except AttributeError:
            b_set = __builtins__["set"]

        self.tests = b_set()
        self.pass_fn = pass_fn
        self.fail_fn = fail_fn
        self._steps = []         # every step started, so _cancel can stop them
        self._triggered = False  # True while the tests are failing

    def add(self, test):
        """Register `test`; the monitor passes only while all tests are truthy."""
        self.tests.add(test)

    def remove(self, test):
        """Unregister `test`; does nothing if it was never added."""
        self.tests.discard(test)

    def _run_handler(self, handler):
        """Resolve `handler` to a step, then reset, run and remember it."""
        if handler is None:
            return

        # A callable is treated as a step factory; anything else is assumed
        # to be the step itself.
        step = handler() if callable(handler) else handler
        if step is None:
            return

        step.reset()
        step.run()
        self._steps.append(step)

    def _iterate(self):
        """Check the tests once and fire the matching handler on a flip."""
        passing = all(self.tests)

        if not self._triggered and not passing:
            # passing -> failing
            self._run_handler(self.fail_fn)
            self._triggered = True
        elif self._triggered and passing:
            # failing -> passing
            self._run_handler(self.pass_fn)
            self._triggered = False

    def _cancel(self):
        """Stop looping and cancel every step this monitor has started.

        Returns the Deferred produced by `defer.gatherResults` over the
        results of the individual `cancel()` calls.
        """
        from twisted.internet import defer

        Looping._cancel(self)

        pending = []
        for step in self._steps:
            try:
                pending.append(step.cancel())
            except Exception:
                # Deliberately best-effort: a step that cannot be cancelled
                # must not prevent the remaining steps from being cancelled.
                pass

        return defer.gatherResults(pending)
#
# State Monitor Definition
#

# Reusable valve-switching steps.  `set(property, value)` is the step
# factory in scope in this script, not the builtin set type.
output_valve_to_waste = set(v.position, 1)
output_valve_to_collect = set(v.position, 2)
intermediate_valve_to_waste = set(r.output, "waste")
intermediate_valve_to_collect = set(r.output, "collect")

# Look-back windows: time for one dead volume to pass at the given flow rate.
_hcube_window = hcube_dead_volume / knauer_flow_rate * 60
_column_window = column_dead_volume / vapourtec_flow_rate * 60

# H-Cube output: collect while the test passes, divert to waste otherwise.
h_cube_monitor = StateFlip(
    pass_fn=sequence(
        output_valve_to_collect,
        log("H-Cube parameters OK"),
    ),
    fail_fn=sequence(
        output_valve_to_waste,
        log("H-Cube parameters not OK"),
    ),
)
h_cube_monitor.add(
    Min(h.column_temperature, _hcube_window) >= 95
)

# Vapourtec output: collect intermediate while all tests pass, divert to
# waste otherwise.
vapourtec_monitor = StateFlip(
    pass_fn=sequence(
        intermediate_valve_to_collect,
        log("Vapourtec parameters OK"),
    ),
    fail_fn=sequence(
        intermediate_valve_to_waste,
        log("Vapourtec parameters not OK"),
    ),
)
vapourtec_monitor.add(
    Min(r.heater3.temp, _column_window) > 95
)
vapourtec_monitor.add(
    Max(r.pump2.airlock, _column_window) < 10
)
vapourtec_monitor.add(
    ir.product > ir_low_trigger
)

# System-wide monitor: log and send an SMS, then wait five minutes.
system_monitor = StateMonitor()
system_monitor.add(h.system_pressure >= 4)
system_monitor.add(r.pump2.airlock < 10)
system_monitor.step = sequence(
    log("Loss of pressure in system"),
    call(sms_notify, "Loss of pressure in system"),
    wait("5m"),
)
#
# Experiment control sequence
#

# Hydrogenation phase.  It waits for pump 2 to be switched back to solvent,
# flushes the column and connecting tube, then shuts down both halves of the
# rig in parallel.
hydrog = sequence(
    wait_until(r.pump2.input == "solvent"),

    log("Washing column"),
    cancel(vapourtec_monitor),
    # One column dead volume plus one connecting-tube volume at the
    # Vapourtec flow rate.
    wait(
        (column_dead_volume / vapourtec_flow_rate * 60)
        + (connecting_tube_vol / vapourtec_flow_rate * 60)
    ),

    parallel(
        # Vapourtec side: divert to waste, then power down after a minute.
        sequence(
            set(r.output, "waste"),
            wait("1m"),
            set(r.power, "off"),
        ),
        # H-Cube side: keep going until the tracked height falls below the
        # minimum, then stop hydrogenation and the Knauer pump.
        sequence(
            log("Using up remaining intermediate"),
            wait_until(t.height < min_height_trigger),
            cancel(h_cube_monitor),
            log("End of hydrogenation"),
            call(h.stop_release_hydrogen),
            set(k.power, "off"),
        ),
    ),
)

# Attach the monitors as dependents of the hydrogenation phase.
for _dependent in (h_cube_monitor, vapourtec_monitor, system_monitor):
    hydrog.dependents.add(_dependent)
# Top-level experiment: start-up, then reagent injection in parallel with
# intermediate collection and the hand-over to the hydrogenation phase.
rxn = sequence(
    log("Starting up..."),
    set(r.heater3.target, 100),
    # Use the shared parameter rather than a literal 100: every dead-volume
    # wait in this script is derived from vapourtec_flow_rate, so the pump
    # must run at that rate for the timings to hold.
    # (Assumes pump2.target is the pump's flow-rate setpoint -- confirm.)
    set(r.pump2.target, vapourtec_flow_rate),
    set(h.hydrogen_mode, "full"),
    set(h.column_temperature_target, 100),
    set(r.power, "on"),
    wait_until(r.heater3.temp > 95),

    parallel(
        # Feed reagent for two hours, then switch pump 2 back to solvent.
        # The switch back is what `hydrog` waits for before washing.
        sequence(
            log("Injecting reagent"),
            set(r.pump2.input, "reagent"),
            wait("2h"),
            log("Stop injecting reagent"),
            set(r.pump2.input, "solvent"),
        ),
        # Collect intermediate once the IR signal is high enough, start the
        # Knauer pump once the tracked height is high enough, bring the
        # H-Cube to a stable state, then run the hydrogenation phase.
        sequence(
            log("Waiting for conversion to intermediate"),
            wait_until(ir.product > ir_start_trigger),

            log("Collecting intermediate"),
            set(r.output, "collect"),
            wait_until(t.height > start_height_trigger),

            log("Starting knauer pump"),
            set(k.rate, knauer_flow_rate),
            set(k.power, "on"),
            wait("30s"),

            log("Starting hydrogenation"),
            call(h.start_hydrogenation),

            log("Waiting for temperature"),
            wait_until(h.column_temperature >= 99),

            log("Waiting for stability"),
            wait_until(h.message == "Stable."),

            hydrog,
        ),
    ),
)

run(rxn)
# End of script. (The GitHub sign-up / sign-in footer that followed is not part of the gist.)