Skip to content

Instantly share code, notes, and snippets.

@thalljiscience
Last active June 7, 2021 23:32
Show Gist options
  • Save thalljiscience/53de670a8422bb3d92f1eea8f05ad4d5 to your computer and use it in GitHub Desktop.
Save thalljiscience/53de670a8422bb3d92f1eea8f05ad4d5 to your computer and use it in GitHub Desktop.
minknow_api python scripts for monitoring and stopping a run
""" Get sequencing run information for a MinION position and run_id"""
import argparse
import logging
import sys
import os
from minknow_api.manager import Manager
from minknow_api.tools import protocols
def parse_args():
"""Build and execute a command line argument for querying run info
Returns:
Parsed arguments to be used when querying run info.
"""
parser = argparse.ArgumentParser(
description="""
Query run info for a running MinKNOW instance.
"""
)
parser.add_argument(
"--host",
default="localhost",
help="IP address of the machine running MinKNOW (defaults to localhost)",
)
parser.add_argument(
"--port",
help="Port to connect to on host (defaults to standard MinKNOW port based on tls setting)",
)
parser.add_argument(
"--no-tls", help="Disable tls connection", default=False, action="store_true"
)
parser.add_argument("--verbose", action="store_true", help="Enable debug logging")
parser.add_argument(
"--position",
help="position on the machine (or MinION serial number) to run the protocol at",
)
parser.add_argument(
"--run_id",
help="The run_id for to query",
)
args = parser.parse_args()
return args
def is_position_selected(position, args):
# First check for name match:
if args.position == position.name:
return True
return False
def main():
os.environ['MINKNOW_TRUSTED_CA'] = "/opt/ont/minknow/conf/rpc-certs/ca.crt"
# Parse arguments:
args = parse_args()
# Specify --verbose on the command line to get extra details about
if args.verbose:
logging.basicConfig(stream=sys.stdout, level=logging.DEBUG)
# Construct a manager using the host + port provided:
manager = Manager(host=args.host, port=args.port, use_tls=not args.no_tls)
# Find positions:
positions = manager.flow_cell_positions()
filtered_positions = list(
filter(lambda pos: is_position_selected(pos, args), positions)
)
# At least one position needs to be selected:
if not filtered_positions:
print(
"No positions selected - specify `--position` or `--flow-cell-id`"
)
sys.exit(1)
for pos in filtered_positions:
# Connect to the sequencing position:
position_connection = pos.connect()
# Check if a flowcell is available for sequencing
flow_cell_info = position_connection.device.get_flow_cell_info()
if not flow_cell_info.has_flow_cell:
print("No flow cell present in position %s" % pos)
sys.exit(1)
# Query run info
print("Getting run info for %s positions" % len(filtered_positions))
for pos in filtered_positions:
# Connect to the sequencing position:
position_connection = pos.connect()
# Now query the run_id:
print("Getting info for %s" % args.run_id)
result = position_connection.protocol.get_run_info(
run_id=args.run_id
)
print("Run state: %s" % result.state)
if not result.end_time.seconds:
print("Protocol is running")
print(result); # Print the entire return stream
# Individual fields can be reported as below
"""
print("Run state: %s" % result.state)
print("Protocol ID: %s" % result.protocol_id)
print("Start Time: %s" % result.start_time)
print("End Time: %s" % result.end_time)
print("Run IDs: %s" % result.acquisition_run_ids)
print("User Info: %s" % result.user_info)
print("Device: %s" % result.device)
print("Flow cell: %s" % result.flow_cell)
print("Meta Info: %s" % result.meta_info)
"""
if __name__ == "__main__":
main()
"""Get run acquisition information for a MinION position and acquisition_run_id"""
import argparse
import logging
import sys
import os
import grpc
from minknow_api.manager import Manager
import minknow_api.statistics_pb2
def parse_args():
"""Build and execute a command line argument for querying run info
Returns:
Parsed arguments to be used when querying run info.
"""
parser = argparse.ArgumentParser(
description="""
Query run info for a running MinKNOW instance.
"""
)
parser.add_argument(
"--host",
default="localhost",
help="IP address of the machine running MinKNOW (defaults to localhost)",
)
parser.add_argument(
"--port",
help="Port to connect to on host (defaults to standard MinKNOW port based on tls setting)",
)
parser.add_argument(
"--no-tls", help="Disable tls connection", default=False, action="store_true"
)
parser.add_argument("--verbose", action="store_true", help="Enable debug logging")
parser.add_argument(
"--position",
help="position on the machine (or MinION serial number) to run the protocol at",
)
parser.add_argument(
"--run_id",
help="The run_id for to query",
)
args = parser.parse_args()
return args
def main():
os.environ['MINKNOW_TRUSTED_CA'] = "/opt/ont/minknow/conf/rpc-certs/ca.crt"
args = parse_args()
# Specify --verbose on the command line to get extra details about
if args.verbose:
logging.basicConfig(stream=sys.stdout, level=logging.DEBUG)
# Construct a manager using the host + port provided:
manager = Manager(host=args.host, port=args.port, use_tls=not args.no_tls)
# Find positions:
positions = manager.flow_cell_positions()
filtered_positions = list(filter(lambda pos: pos.name == args.position, positions))
# At least one position needs to be selected:
if not filtered_positions:
print(
"No positions selected - specify `--position`"
)
sys.exit(1)
for pos in filtered_positions:
# Connect to the sequencing position:
position_connection = pos.connect()
# Check if a flowcell is available for sequencing
flow_cell_info = position_connection.device.get_flow_cell_info()
if not flow_cell_info.has_flow_cell:
print("No flow cell present in position %s" % pos)
sys.exit(1)
# Query run info
print("Getting run info for %s positions" % len(filtered_positions))
for pos in filtered_positions:
# Connect to the sequencing position:
position_connection = pos.connect()
# Now query the run_id:
print("Getting info for %s" % args.run_id)
result = position_connection.acquisition.get_acquisition_info(
run_id=args.run_id
)
print(result)
if __name__ == "__main__":
main()
"""Check and see if there are run_ids on a host and MinION position"""
import argparse
import logging
import sys
import os
from minknow_api.manager import Manager
from minknow_api.tools import protocols
def parse_args():
"""Build and execute a command line argument for listing run_ids
Returns:
Parsed arguments to be used when stopping a protocol.
"""
parser = argparse.ArgumentParser(
description="""
List active protocol runs.
"""
)
parser.add_argument(
"--host",
default="localhost",
help="IP address of the machine running MinKNOW (defaults to localhost)",
)
parser.add_argument(
"--port",
help="Port to connect to on host (defaults to standard MinKNOW port based on tls setting)",
)
parser.add_argument(
"--no-tls", help="Disable tls connection", default=False, action="store_true"
)
parser.add_argument("--verbose", action="store_true", help="Enable debug logging")
parser.add_argument(
"--position",
help="position on the machine (or MinION serial number)",
)
args = parser.parse_args()
return args
def is_position_selected(position, args):
# First check for name match:
if args.position == position.name:
return True
return False
def main():
os.environ['MINKNOW_TRUSTED_CA'] = "/opt/ont/minknow/conf/rpc-certs/ca.crt"
# Parse arguments to be passed to running protocols:
args = parse_args()
# Specify --verbose on the command line to get extra details about
if args.verbose:
logging.basicConfig(stream=sys.stdout, level=logging.DEBUG)
# Construct a manager using the host + port provided:
manager = Manager(host=args.host, port=args.port, use_tls=not args.no_tls)
# Find which positions we are going to list protocols from:
positions = manager.flow_cell_positions()
filtered_positions = list(
filter(lambda pos: is_position_selected(pos, args), positions)
)
# At least one position needs to be selected:
if not filtered_positions:
print(
"No positions selected - specify `--position` or `--flow-cell-id`"
)
sys.exit(1)
protocol_identifiers = {}
for pos in filtered_positions:
# Connect to the sequencing position:
position_connection = pos.connect()
# Check if a flowcell is available for sequencing
flow_cell_info = position_connection.device.get_flow_cell_info()
if not flow_cell_info.has_flow_cell:
print("No flow cell present in position %s" % pos)
sys.exit(1)
# List run ids from the requested postitions:
for pos in filtered_positions:
# Connect to the sequencing position:
position_connection = pos.connect()
result = position_connection.protocol.list_protocol_runs()
print(result)
if __name__ == "__main__":
main()
import argparse
import logging
import sys
import os
from minknow_api.manager import Manager
from minknow_api.tools import protocols
#From acquisition.proto DataAction Enum:
STOP_DEFAULT = 0
STOP_KEEP_ALL_DATA = 1
STOP_FINISH_PROCESSING = 2
def parse_args():
"""Build and execute a command line argument for stopping a protocol
Returns:
Parsed arguments to be used when stopping a protocol.
"""
parser = argparse.ArgumentParser(
description="""
Stop a sequencing protocol in a running MinKNOW instance.
"""
)
parser.add_argument(
"--host",
default="localhost",
help="IP address of the machine running MinKNOW (defaults to localhost)",
)
parser.add_argument(
"--port",
help="Port to connect to on host (defaults to standard MinKNOW port based on tls setting)",
)
parser.add_argument(
"--no-tls", help="Disable tls connection", default=False, action="store_true"
)
parser.add_argument("--verbose", action="store_true", help="Enable debug logging")
parser.add_argument(
"--position",
help="position on the machine (or MinION serial number) to run the protocol at",
)
parser.add_argument(
"--stop_action",
help="Specify stop action (STOP_DEFAULT = FINISH_PROCESSING = finish processing, STOP_KEEP_ALL_DATA = stop basecalling an send unprocessed files to skipped, STOP_FINISH_PROCESSING = keep basecalling until all of the reads have been basecalled"
)
args = parser.parse_args()
return args
def is_position_selected(position, args):
# First check for name match:
if args.position == position.name:
return True
return False
def main():
os.environ['MINKNOW_TRUSTED_CA'] = "/opt/ont/minknow/conf/rpc-certs/ca.crt"
# Parse arguments to be passed to running protocols:
args = parse_args()
# Specify --verbose on the command line to get extra details about
if args.verbose:
logging.basicConfig(stream=sys.stdout, level=logging.DEBUG)
# Construct a manager using the host + port provided:
manager = Manager(host=args.host, port=args.port, use_tls=not args.no_tls)
# Find which positions we are going to stop protocol on:
positions = manager.flow_cell_positions()
filtered_positions = list(
filter(lambda pos: is_position_selected(pos, args), positions)
)
# At least one position needs to be selected:
if not filtered_positions:
print(
"No positions selected - specify `--position` or `--flow-cell-id`"
)
sys.exit(1)
protocol_identifiers = {}
for pos in filtered_positions:
# Connect to the sequencing position:
position_connection = pos.connect()
# Check if a flowcell is available for sequencing
flow_cell_info = position_connection.device.get_flow_cell_info()
if not flow_cell_info.has_flow_cell:
print("No flow cell present in position %s" % pos)
sys.exit(1)
stop_action=STOP_DEFAULT
# Determine stop_action (aleady set to STOP_DEFAULT by default, so just determine if it needs to be overridden
if args.stop_action == "STOP_KEEP_ALL_DATA":
stop_action=STOP_KEEP_ALL_DATA
else:
if args.stop_action == "STOP_FINISH_PROCESSING":
stop_action=STOP_FINISH_PROCESSING
# Stop protocol on the requested postitions:
print("Stopping protocol on %s positions" % len(filtered_positions))
for pos in filtered_positions:
# Connect to the sequencing position:
position_connection = pos.connect()
# Find the protocol identifier for the required protocol:
print("Stopping run on position %s" % (pos.name))
print("Stopping protocol on position %s" % args.position)
result = position_connection.protocol.stop_protocol(
data_action_on_stop=stop_action
)
if __name__ == "__main__":
main()
@thalljiscience
Copy link
Author

The above is a followup to an earlier script to use the Oxford Nanopore minknow_api on an NVIDIA Jetson Xavier device to stop a running sequencing protocol programmatically as part of the creation of a library for automating nanopore sequencing control and monitoring on dedicated NVIDIA Jetson devices from a specialized analysis program on a remote PC.

After learning a bit about gRPC messaging, protobuf compilation and my unfortunately horrendous Python skills, I traced the messaging for stopping a protocol though the auto-compiled code in /usr/local/lib/python3.6/dist-packages/minknow_api/protocol_service.py and /usr/local/lib/python3.6/dist-packages/minknow_api/protocol_pb2.py. All that is really needed to stop a running sequencing protocol is to know the minion device position it is running on. The easiest way I find to create a simple stop_protocol script is to not bother editing the /usr/local/lib/python3.6/dist-packages/minknow_api/tools/protocols.py file to add a "stop_protocol" method after the "start_protocol" method (thus no need to risk corrupting any core api files either), and instead just get a connection object for a minion position and call stop_protocol directly from a small script. It also turns out to be handy to get a connection object and pass it a run_id to get real-time information about a running sequencing run. Above is a simple Python script to stop a protocol without altering any of the installed minknow_api files that bypasses the indirect call to ...tools/protocols.py used with the ONT start_protocol python example. I kept the options to specify host (defaults to localhost), port, secure/insecure and verbose logging. I've had some trouble getting gRPC calls to work from C# on a PC trying to communicate with then python minknow_api on a Jetson. I can get all the protos to compile correctly in C# and compile code with all of the correct syntax, and debug objects in memory with the correct structures apparently created, but I always get TransientFailure on an open gRPC Channel whenever I try to do something as simple as listing flow cell positions, so I am giving up on that temporarily. It seems that it would be safer to have a series of python scripts on the Jetson that can be called through SSH anyway. That way, if the minknow_api is changed, PC-side code will not need to be changed. Only scripts on a any Jetson with updated API code would need modifying.

@juhench
Copy link

juhench commented Jun 4, 2021

Hi! Thanks a lot! This is great!!! I am currently out of the office but will notify my colleague to test. I will report back. Best, J.

@thalljiscience
Copy link
Author

@juhench: In case these also help, I've had to create little Python scripts to allow some real-time run monitoring, so I added them here. I'm new to this gist stuff, and github in general, so I could only figure out how to add them as code files at the top. When I try to embed code in a comment, it always gets arbitrarily segmented in a way that makes no sense to me. I also finally got around to working with that AGX Xavier. I went ahead and flashed the eMMC with Jetpack 4.5.1 (I think that was the number) and it was pretty confusing. I had to actually manually write the boot information to direct booting to the eMMC because after flashing the OS, the AGX only booted into root console - not sure if that is supposed to happen, and SDKManager certainly said nothing about that. Anyway, the scripts added above are for getting a list of run_ids for a MinION, getting comprehensive run info for a run_id, and retrieving active sequencing run live acquisition information. The stream data appears to come back in a json-like format, but not json per se, but looks pretty easy to extract relevant info or turn the whole output into a serializable object structure.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment