Skip to content

Instantly share code, notes, and snippets.

@JohnGoertz
Last active January 23, 2022 23:18
Show Gist options
  • Save JohnGoertz/71f8e8a2fd5aee056983b75609a97d5a to your computer and use it in GitHub Desktop.
Save JohnGoertz/71f8e8a2fd5aee056983b75609a97d5a to your computer and use it in GitHub Desktop.
Wrappers and utility functions for working with Opentrons. Adds volume tracking for wells, enables navigation between wells through arithmetic operations, supports retrieval of the wells in a rectangular region of a Labware, and fixes the `new_tip='always'` behavior.
import warnings

import numpy as np
def flatten_list(lst):
    """Recursively flatten nested lists / numpy arrays into one flat list.

    Parameters
    ----------
    lst: list or np.ndarray
        Possibly nested sequence. Only ``list`` and ``np.ndarray`` elements are
        treated as nested containers; anything else is kept as a leaf.

    Returns
    -------
    list or np.ndarray
        ``lst`` itself when it is empty or contains no nested containers,
        otherwise a new flat ``list`` of the leaves in their original order.

    Examples
    --------
    >>> flatten_list([[1, 2], [[3], 4]])
    [1, 2, 3, 4]
    """
    nested_types = (list, np.ndarray)
    # Empty input has nothing to flatten (and nothing to index into).
    if len(lst) == 0:
        return lst
    # Inspect every element, not just the first, so mixed nesting is handled.
    if not any(isinstance(el, nested_types) for el in lst):
        return lst
    flat = []
    for el in lst:
        if isinstance(el, nested_types):
            flat.extend(flatten_list(list(el)))
        else:
            flat.append(el)
    return flat
def dilute(lgCi, lgCf, Vf):
    """Determine transferred and diluent volumes for a serial dilution.

    The dilution runs from ``10**lgCi`` through each of ``10**lgCf`` in order,
    leaving a final volume ``Vf`` in every well.

    Parameters
    ----------
    lgCi: float
        Log10 initial concentration.
    lgCf: list of float
        Log10 final concentration of each well, in dilution order.
    Vf: float or list of float
        Final volume for each well after dilution. A scalar is applied to every well.

    Returns
    -------
    Vt: np.ndarray of float
        Volumes of source material to be transferred into or between each well.
    Vd: np.ndarray of float
        Volumes of diluent to be added to each well prior to serial dilution.
    Vi: np.ndarray of float
        Intermediate volumes for each well after dilution prior to aspiration.

    Raises
    ------
    ValueError
        If ``Vf`` is a sequence whose length differs from that of ``lgCf``.

    Examples
    --------
    >>> dilute(9, [8, 7, 6, 5], 30)
    (array([3.333, 3.33 , 3.3  , 3.   ]),
     array([29.997, 29.97 , 29.7  , 27.   ]),
     array([33.33, 33.3 , 33.  , 30.  ]))

    Make 25x stocks from 10 uM source for specified nM final concentrations

    >>> source = np.log10(1e4)
    >>> dilutions = [np.log10(d*25) for d in [200, 100, 50, 25, 10]]
    >>> volumes = [20+12.9*2] + [20+12.9*3 for _ in range(3)] + [20+12.9*2]
    >>> dilute(source, dilutions, volumes)
    (array([49.72625, 53.6525 , 48.605  , 38.51   , 18.32   ]),
     array([49.72625, 53.6525 , 48.605  , 38.51   , 27.48   ]),
     array([ 99.4525, 107.305 ,  97.21  ,  77.02  ,  45.8   ]))
    """
    # Final concentration for each well
    lgCf = np.atleast_1d(np.asarray(lgCf, dtype=float))
    n = len(lgCf)

    # Final volume for each well. Always build a float array: deriving the dtype
    # from `lgCf` would truncate a fractional volume when `lgCf` holds integers.
    Vf = np.asarray(Vf, dtype=float)
    if Vf.ndim == 0:
        Vf = np.full(n, float(Vf))
    elif Vf.shape != (n,):
        raise ValueError(
            f'Vf must be a scalar or have one entry per final concentration '
            f'(expected {n}, got shape {Vf.shape})')

    # Transfer, diluent, intermediate volumes
    Vt, Vd, Vi = [np.zeros(n, dtype=float) for _ in range(3)]
    if n == 0:
        return Vt, Vd, Vi

    # Concentration of "preceding" well
    pre = np.hstack([lgCi, lgCf[:-1]])
    # Fraction of each well's intermediate volume that comes from the preceding well
    fraction = 10 ** (lgCf - pre)

    # Nothing is aspirated from the last well, so its intermediate volume is its final volume
    Vi[-1] = Vf[-1]
    # Work backwards: each well must hold its own final volume plus whatever the next well takes
    for i in range(n - 1, -1, -1):
        Vt[i] = Vi[i] * fraction[i]
        Vd[i] = Vi[i] - Vt[i]
        if i > 0:
            Vi[i - 1] = Vf[i - 1] + Vt[i]
    return Vt, Vd, Vi
class Wrapper:
    """Mixin for wrapping opentrons base class. Stores reference to base object as `_`.

    Any attribute not found on the wrapper itself is looked up on the wrapped
    object, so the wrapper can be used wherever the wrapped object's attributes
    and methods are needed.
    """

    def __init__(self, obj):
        self._ = obj

    def __getattr__(self, name):
        # __getattr__ only runs when normal lookup fails. If `_` itself is the
        # missing attribute (instance created without __init__, e.g. by
        # copy/pickle), delegating would re-enter this method forever.
        if name == '_':
            raise AttributeError(name)
        return getattr(self._, name)

    def __repr__(self):
        return str(self._)
class Location(Wrapper):
    """Wrapped location that remembers the object it was derived from.

    The wrapped location is stored by ``Wrapper`` as ``_``; ``parent`` is kept
    alongside it and the ``volume`` attribute is read from / written to that
    parent, so volume changes made through a location land on the parent.
    """

    def __init__(self, location, parent):
        super().__init__(location)
        self.parent = parent

    @property
    def volume(self):
        """Volume currently recorded on ``parent``."""
        owner = self.parent
        return owner.volume

    @volume.setter
    def volume(self, value):
        # Plain delegation: the parent's own setter does any validation.
        setattr(self.parent, 'volume', value)
class Well(Wrapper):
    """Component of parent labware with volume tracking. Arithmetic operations allow navigation to neighboring wells.

    ``+`` / ``-`` move right / left by columns, ``*`` / ``/`` move down / up by rows.
    Volume is specified in microliters.
    See Labware wrapper for usage examples.
    """

    # Negative balances smaller than this (in μL) are treated as floating-point
    # round-off from repeated transfers and clamped to zero.
    _VOLUME_TOLERANCE = 1e-9

    def __init__(self, labware, parent, name, volume=0, log=False):
        """Wrap a well object.

        Parameters
        ----------
        labware:
            Object to wrap; attribute access falls through to it.
        parent:
            Container that is indexed by well name (e.g. ``'B3'``) when navigating.
        name: str
            Well name: a single row letter followed by the column number.
        volume: float
            Initial tracked volume in μL.
        log: bool
            If True, print the well every time its volume changes.
        """
        super().__init__(labware)
        self._volume = volume
        self._parent = parent
        self._name = name
        self.log = log

    def __add__(self, val):
        """Return well of parent labware `val` columns to the right: B2+1=B3"""
        assert type(val) is int
        return self.parent[f'{self.row}{self.col + val}']

    def __radd__(self, val):
        """Return well of parent labware `val` columns to the right: B2+1=B3"""
        return self.__add__(val)

    def __sub__(self, val):
        """Return well of parent labware `val` columns to the left: B2-1=B1"""
        assert type(val) is int
        return self.parent[f'{self.row}{self.col - val}']

    def __mul__(self, val):
        """Return well of parent labware `val` rows down: B2*1=C2"""
        assert type(val) is int
        return self.parent[f'{chr(ord(self.row) + val)}{self.col}']

    def __rmul__(self, val):
        """Return well of parent labware `val` rows down: B2*1=C2"""
        return self.__mul__(val)

    def __truediv__(self, val):
        """Return well of parent labware `val` rows up: B2/1=A2"""
        assert type(val) is int
        return self.parent[f'{chr(ord(self.row) - val)}{self.col}']

    def __repr__(self):
        return str(self._) + f' with {np.around(self.volume, 1)} μL'

    @property
    def volume(self):
        """Tracked volume of the well in μL."""
        return self._volume

    @volume.setter
    def volume(self, value):
        # Splitting a transfer into n equal parts can leave a balance like -1e-14
        # when the well is emptied; that is round-off, not an over-draw.
        if -self._VOLUME_TOLERANCE < value < 0:
            value = 0
        # Emptying a well to exactly zero is legitimate; only a negative balance
        # means more was requested than the well holds.
        assert value >= 0, 'Requested action exceeds volume in well ' + str(self)
        self._volume = value
        if self.log:
            print(self)

    @property
    def parent(self):
        return self._parent

    @property
    def name(self):
        return self._name

    @property
    def row(self):
        """Row letter, i.e. the first character of the well name."""
        return self._name[0]

    @property
    def col(self):
        """Column number (1-based), i.e. the integer after the row letter."""
        return int(self._name[1:])

    @property
    def well_number(self):
        """1-based index of the well counting along rows (A1=1, A2=2, ...)."""
        # Uses the same name source as `row`/`col` so all three always agree.
        row = ord(self.row) - ord('A')
        number = row * len(self.parent.columns()) + self.col
        return number

    def bottom(self, *args, **kwargs):
        """Return the wrapped object's ``bottom(...)`` as a Location tied to this well."""
        return Location(self._.bottom(*args, **kwargs), self)

    def set_volume(self, volume):
        """Arbitrarily specify volume of well."""
        self.volume = volume
        return self

    def height(self, volume=None):
        """Estimate the liquid height for `volume` μL (default: the tracked volume).

        Uses per-labware calibration tables keyed on the parent's ``load_name``;
        any other labware falls back to treating the well as a cylinder of
        ``self.diameter``. Warnings are issued outside the calibrated range.
        """
        labware = self.parent._.load_name
        volume = self.volume if volume is None else volume
        if labware == 'opentrons_24_tuberack_nest_2ml_snapcap':
            v = np.array([100, 150, 200, 300, 400, 500, 700, 1000, 2000])
            h = np.array([4., 5., 6., 7.5, 9., 11, 14, 19, 34.5])
            height = np.interp(volume, v, h)
            if volume < v.min():
                warnings.warn(f'Height is uncalibrated for volumes beneath {v.min()} μL.')
            if volume > v.max():
                height = 0.0158858 * volume + 2.85463  # from linear regression disregarding 100 and 150 uL
        elif labware == 'opentrons_15_tuberack_falcon_15ml_conical':
            # Calibration points were recorded in mL; volumes everywhere else are μL,
            # so convert before interpolating or comparing.
            v = np.array([0.5, 1, 2, 3, 4, 5, 6, 7, 8]) * 1000
            h = np.array([14.6, 19.7, 26.3, 34.0, 41.3, 48.1, 55.3, 61.3, 67.6])
            height = volume * 6.875 * 1e-3 + 13.325  # linear fit, used from 1000 μL upwards
            if volume < 1000:
                # Below 1000 μL the linear fit is not used; interpolate the table instead.
                height = np.interp(volume, v, h)
                if volume < v.min():
                    warnings.warn(f'Height is uncalibrated for volumes beneath {v.min()} μL.')
                else:
                    warnings.warn('Height is poorly calibrated for volumes beneath 1000 μL.')
        elif labware == 'agilenttubestrip_96_wellplate_200ul':
            v = np.array([250, 200, 150, 100, 75, 50, 25, 10])
            h = 20.76 + np.array([-3.78, -6.04, -8.38, -10.72, -12.22, -13.95, -16.23, -17.92])
            # Tables are stored largest-volume-first; np.interp needs ascending x.
            height = np.interp(volume, v[::-1], h[::-1])
            if volume < v.min():
                warnings.warn(f'Height is uncalibrated for volumes beneath {v.min()} μL.')
            if volume > v.max():
                warnings.warn(f'Height is uncalibrated for volumes above {v.max()} μL.')
        elif (labware == 'opentrons_10_tuberack_falcon_4x50ml_6x15ml_conical') \
                and (self.col in [3, 4]):
            # Only columns 3 and 4 use this table; other columns of this rack
            # fall through to the cylinder estimate below.
            v = np.array([50, 40, 30, 20, 10, 5]) * 1000
            h = 114 - np.array([14.3, 32.6, 50.6, 68.7, 87.0, 96.0])
            height = np.interp(volume, v[::-1], h[::-1])
            if volume < v.min():
                warnings.warn(f'Height is uncalibrated for volumes beneath {v.min()} μL.')
            if volume > v.max():
                warnings.warn(f'Height is uncalibrated for volumes above {v.max()} μL.')
        else:
            # Cylinder approximation: volume / cross-sectional area.
            # NOTE(review): assumes `diameter` is set (circular well) and in mm - confirm.
            height = volume / (np.pi * (self.diameter / 2) ** 2)
        return height

    def level(self, z=0, volume=None):
        """Return a Location offset `z` from the estimated liquid level.

        The estimated height is divided by 1.1 before the offset is applied.
        NOTE(review): presumably a safety margin keeping the tip below the surface - confirm.
        """
        return self.bottom(z=self.height(volume) / 1.1 + z)
class Labware(Wrapper):
    """Plate or some such. Something with wells.

    Every well of the wrapped labware is wrapped in a volume-tracking ``Well``
    and can be retrieved by name with ``labware['A1']``.

    Examples
    --------
    >>> P11 = Labware(protocol.load_labware('waters_96_wellplate_700ul', '4', label='P11')).set_volume(500)
    >>> P11.range('A1','B2', by='row')
    [[P11['A1'], P11['A2']], [P11['B1'], P11['B2']]]
    >>> P11.range('A1', r=2, c=3)
    [[P11['A1'], P11['A2'], P11['A3']], [P11['B1'], P11['B2'], P11['B3']]]
    >>> P11.range('A1','B2', by='col')
    [[P11['A1'], P11['B1']], [P11['A2'], P11['B2']]]
    >>> P11.range_flat('A1','B2')
    [P11['A1'], P11['A2'], P11['B1'], P11['B2']]
    >>> P11['A1'] + 5
    P11['A6']
    """

    def __init__(self, labware):
        super().__init__(labware)
        self._wells = {well.well_name: Well(well, parent=self, name=well.well_name) for well in self._.wells()}

    def __getitem__(self, key):
        """Return the wrapped Well with the given name, e.g. ``'A1'``."""
        return self._wells[key]

    def wells(self):
        """Wrapped wells, in the order the wrapped labware's ``wells()`` lists them."""
        return [self[well.well_name] for well in self._.wells()]

    def wells_by_name(self):
        """Mapping of well name to wrapped Well (the internal dict, not a copy)."""
        return self._wells

    def rows(self):
        """Wrapped wells grouped as the wrapped labware's ``rows()`` groups them."""
        return [[self[well.well_name] for well in row] for row in self._.rows()]

    def columns(self):
        """Wrapped wells grouped as the wrapped labware's ``columns()`` groups them."""
        return [[self[well.well_name] for well in col] for col in self._.columns()]

    def set_volume(self, volume):
        """Set the tracked volume of every well to `volume`; returns self for chaining."""
        for well in self.wells():
            well.set_volume(volume)
        return self

    def range(self, start_well, end_well=None, r=None, c=None, by='row'):
        """Get all wells in a rectangular region of labware, e.g. A2 through D8.

        Specify upper-left corner as `start_well`, then either the bottom-right corner as `end_well` or a certain
        number of rows `r` and columns `c`.

        Returns a list-of-lists of wells, where each inner list represents one row (`by='row'`) or one column (`by='col'`).

        Raises
        ------
        AssertionError
            Unless exactly one of `end_well` or the pair (`r`, `c`) is given.
        ValueError
            If `by` is neither ``'row'`` nor ``'col'``.
        """
        has_end = end_well is not None
        has_r = r is not None
        has_c = c is not None
        # Exactly one way of describing the region: an end corner, or BOTH r and c.
        assert (has_end and not (has_r or has_c)) or (not has_end and has_r and has_c), \
            'Specify either end_well or rows and columns, not both'
        if by not in ('row', 'col'):
            raise ValueError(f"by must be 'row' or 'col', got {by!r}")

        start_row = ord(start_well[0])
        start_col = int(start_well[1:])
        if has_end:
            end_row = ord(end_well[0])
            end_col = int(end_well[1:])
        else:
            end_row = start_row + r - 1
            end_col = start_col + c - 1

        row_letters = [chr(code) for code in range(start_row, end_row + 1)]
        col_numbers = list(range(start_col, end_col + 1))
        if by == 'row':
            return [[self[f'{row}{col}'] for col in col_numbers] for row in row_letters]
        return [[self[f'{row}{col}'] for row in row_letters] for col in col_numbers]

    def range_flat(self, *args, **kwargs):
        """Behaves as `Labware.range`, but returns a flattened list rather than a nested list."""
        lst_of_lsts = self.range(*args, **kwargs)
        return [well for lst in lst_of_lsts for well in lst]
class Pipette(Wrapper):
    """Refactors default high-level pipette behavior.

    Specifying `new_tip='always'` uses a new tip *per aspiration* from the source well, rather than simply once per
    destination well. Critical for avoiding contamination of stocks.

    Aspirate/dispense update well volume appropriately. Adds 'slow' option, useful for wells with silicone caps.

    Initialize with `max_volume` as an integer and the api protocol. E.g., if you define your protocol as
    `def run(protocol: protocol_api.ProtocolContext)`, you should pass `protocol=protocol` when initializing.

    Examples
    --------
    p200 = Pipette(protocol.load_instrument('p300_single_gen2', 'right', tip_racks=[tiprack_200]),
                   max_volume=200, protocol=protocol)
    """

    def __init__(self, pipette, max_volume, protocol):
        super().__init__(pipette)
        self.max_volume = max_volume
        self.protocol = protocol

    def _slow_action(self, action, volume, well, rate):
        """Run `action` (wrapped aspirate/dispense) at `well`, then retract slowly to the well top."""
        top = well.parent.top() if isinstance(well, Location) else well.top()
        self.move_to(top)
        action(volume, well._, rate)
        self.protocol.max_speeds['Z'] = 10
        try:
            self.move_to(top)
        finally:
            # Never leave the protocol stuck at the slow Z limit if the move fails.
            self.protocol.max_speeds['Z'] = 400

    def aspirate(self, volume, well, rate=1, slow=False, touch_tip=False):
        """Aspirates from specified well, updating well volume.

        'Slow' moves to top of well quickly, descends to aspirate location quickly, then slowly retracts. Useful for
        wells with silicone caps.
        """
        # Book-keeping first: an over-draw is rejected before the robot moves.
        well.volume -= volume
        if slow:
            self._slow_action(self._.aspirate, volume, well, rate)
        else:
            self._.aspirate(volume, well._, rate)
        if touch_tip:
            self.touch_tip()

    def dispense(self, volume, well, rate=1, slow=False, touch_tip=False):
        """Dispenses into specified well, updating well volume.

        'Slow' moves to top of well quickly, descends to dispense location quickly, then slowly retracts. Useful for
        wells with silicone caps.
        """
        well.volume += volume
        if slow:
            self._slow_action(self._.dispense, volume, well, rate)
        else:
            self._.dispense(volume, well._, rate)
        if touch_tip:
            self.touch_tip()

    def mix(self, repetitions, volume, location, rate=1):
        """Mix at `location` using the wrapped pipette; tracked volumes are not changed."""
        self._.mix(repetitions, volume, location._, rate)

    def one_to_one(self, volume, source, dest, rate=1, slow=False, new_tip='always', touch_tip=False):
        """Transfer `volume` from one source to one destination.

        Volumes above `max_volume` are split into equal sub-transfers.
        `new_tip='always'` uses a fresh tip for every aspiration, `'once'` a single tip for the whole transfer;
        any other value leaves tip handling to the caller.
        `slow` may be True, 'source', 'dest' or 'both'.
        """
        assert all(type(arg) is not list for arg in [volume, source, dest])
        if volume > self.max_volume:
            n = int(np.ceil(volume / self.max_volume))
            single_tip = new_tip == 'once'
            if single_tip:
                self.pick_up_tip()
                new_tip = False
            for _ in range(n):
                self.one_to_one(volume / n, source, dest, rate, slow, new_tip, touch_tip)
            if single_tip:
                # The tip picked up above belongs to this call; release it so the
                # next transfer can pick up a fresh one.
                self.drop_tip()
        else:
            manage_tip = new_tip in ['always', 'once']
            if manage_tip:
                self.pick_up_tip()
            self.aspirate(volume, source, rate, slow in ['source', 'both', True], touch_tip)
            self.dispense(volume, dest, rate, slow in ['dest', 'both', True], touch_tip)
            if manage_tip:
                self.drop_tip()

    def one_to_many(self, volume, source, dests, rate=1, slow=False, new_tip='once', touch_tip=False):
        """Transfer `volume` from one source into each of `dests`.

        When everything fits in the pipette and `new_tip` is not 'always', the total is aspirated once and
        dispensed across the destinations. Otherwise the destinations are split into groups that fit
        (one tip per group for `new_tip='once'`) or served by individual one-to-one transfers.
        """
        max_v = self.max_volume
        n_dests = len(dests)
        if n_dests == 0:
            return
        tot_v = volume * n_dests
        if tot_v > max_v and n_dests > 1:
            # As many destinations per aspiration as the pipette can hold (at least one).
            grp_sz = int(np.floor(max_v / volume)) if max_v > volume else 1
            n_grps = int(np.ceil(n_dests / grp_sz))
            for grp in np.array_split(dests, n_grps):
                if new_tip == 'once':
                    self.pick_up_tip()
                    self.one_to_many(volume, source, list(grp), rate, slow, 'never', touch_tip)
                    self.drop_tip()
                else:
                    self.one_to_many(volume, source, list(grp), rate, slow, new_tip, touch_tip)
            return

        # Remember that this call owns the tip; `new_tip` handed on to one_to_one
        # must not decide whether the tip is dropped at the end.
        single_tip = new_tip == 'once'
        if single_tip:
            self.pick_up_tip()
        if volume > max_v or new_tip == 'always':
            per_transfer_tip = False if single_tip else new_tip
            for dest in dests:
                self.one_to_one(volume, source, dest, rate, slow, per_transfer_tip, touch_tip)
        else:
            self.aspirate(tot_v, source, rate, slow in ['source', 'both', True], touch_tip)
            for dest in dests:
                self.dispense(volume, dest, rate, slow in ['dest', 'both', True], touch_tip)
        if single_tip:
            self.drop_tip()

    def transfer(self, volume, source, dest, **kwargs):
        """Dispatch to one_to_one / one_to_many depending on which arguments are lists or arrays.

        Lists given together are paired element-wise and must have equal lengths. A single source with a list
        of destinations (and scalar volume) uses `one_to_many`; everything else is a series of `one_to_one`
        transfers. A list of volumes with a single source and single destination is rejected by `one_to_one`.
        Extra keyword arguments are passed through.
        """
        v_is_l = type(volume) in [list, np.ndarray]
        s_is_l = type(source) in [list, np.ndarray]
        d_is_l = type(dest) in [list, np.ndarray]
        if v_is_l and s_is_l and d_is_l:
            assert len(volume) == len(dest) and len(volume) == len(source)
            for vol, src, dst in zip(volume, source, dest):
                self.one_to_one(vol, src, dst, **kwargs)
        elif v_is_l and s_is_l:
            assert len(volume) == len(source)
            for vol, src in zip(volume, source):
                self.one_to_one(vol, src, dest, **kwargs)
        elif v_is_l and d_is_l:
            assert len(volume) == len(dest)
            for vol, dst in zip(volume, dest):
                self.one_to_one(vol, source, dst, **kwargs)
        elif s_is_l and d_is_l:
            assert len(source) == len(dest)
            for src, dst in zip(source, dest):
                self.one_to_one(volume, src, dst, **kwargs)
        elif d_is_l:
            self.one_to_many(volume, source, dest, **kwargs)
        elif s_is_l:
            for src in source:
                self.one_to_one(volume, src, dest, **kwargs)
        else:
            self.one_to_one(volume, source, dest, **kwargs)

    def distribute_and_mix(self, source, dests, dist_volume, mix_volume, n=3, z=2, rate=1, slow=False, new_tip='once'):
        """Aspirate once, dispense `dist_volume` into each destination, then mix each in reverse order.

        Dispensing and mixing happen `z` above the bottom of each destination, and the tip is touched after
        each mix. A tip is picked up and dropped only when `new_tip == 'once'`. The total is aspirated in a
        single draw and is not checked against `max_volume`.
        NOTE(review): mixing always uses rate=5 regardless of `rate` - confirm this is intended.
        """
        if new_tip == 'once':
            self.pick_up_tip()
        dests = dests if type(dests) is list else [dests]
        self.aspirate(dist_volume * len(dests), source, rate, slow in ['source', 'both', True])
        for dest in dests:
            self.dispense(dist_volume, dest.bottom(z=z), rate, slow in ['dest', 'both', True])
        for dest in dests[::-1]:
            self.mix(n, mix_volume, dest.bottom(z=z), rate=5)
            self.touch_tip()
        if new_tip == 'once':
            self.drop_tip()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment