Skip to content

Instantly share code, notes, and snippets.

@roryk
Last active January 22, 2016 14:29
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save roryk/4258845bc3389f7548cf to your computer and use it in GitHub Desktop.
Save roryk/4258845bc3389f7548cf to your computer and use it in GitHub Desktop.
import logging
import os
import subprocess
from argparse import ArgumentParser
def find_parallel_environment(queue):
    """Find an SGE/OGE parallel environment for running multicore jobs in specified queue.

    Every parallel environment (PE) listed by ``qconf -spl`` is shown with
    ``qconf -sp``. A PE qualifies when one of its configuration lines passes
    ``_has_parallel_environment`` and the queue lists the PE in its
    ``pe_list``. The queue is checked under two names: the given name with
    its extension replaced by ``.q``, and the bare base name.

    :param queue: queue name; any extension (e.g. ``.q``) is replaced by ``.q``.
    :returns: the preferred qualifying PE name, as chosen by ``_prioritize_pes``.
    :raises ValueError: if no PE qualifies.
    :raises subprocess.CalledProcessError: if ``qconf -spl`` / ``qconf -sp`` fails.
    """
    log = logging.getLogger(__name__)
    base_queue = os.path.splitext(queue)[0]
    queue = base_queue + ".q"
    available_pes = []
    # universal_newlines=True yields text rather than bytes on Python 3
    # (and is harmless on 2.7), so the string handling below works on both.
    pe_names = subprocess.check_output(["qconf", "-spl"], universal_newlines=True).split()
    for name in pe_names:
        pe_config = subprocess.check_output(["qconf", "-sp", name], universal_newlines=True)
        for line in pe_config.splitlines():
            log.debug("%s %s", name, line)
            if _has_parallel_environment(line):
                if _queue_can_access_pe(name, queue) or _queue_can_access_pe(name, base_queue):
                    available_pes.append(name)
    if not available_pes:
        raise ValueError("Could not find an SGE environment configured for parallel execution. "
                         "See %s for SGE setup instructions." %
                         "https://blogs.oracle.com/templedf/entry/configuring_a_new_parallel_environment")
    return _prioritize_pes(available_pes)
def _has_parallel_environment(line):
if line.startswith("allocation_rule"):
if line.find("$pe_slots") >= 0 or line.find("$fill_up") >= 0:
return True
return False
def _prioritize_pes(choices):
"""Prioritize and deprioritize paired environments based on names.
We're looking for multiprocessing friendly environments, so prioritize ones with SMP
in the name and deprioritize those with MPI.
"""
# lower scores = better
ranks = {"smp": -1, "mpi": 1}
sort_choices = []
for n in choices:
# Identify if it fits in any special cases
special_case = False
for k, val in ranks.items():
if n.lower().find(k) >= 0:
sort_choices.append((val, n))
special_case = True
break
if not special_case: # otherwise, no priority/de-priority
sort_choices.append((0, n))
sort_choices.sort()
return sort_choices[0][1]
def _parseSGEConf(data):
"""Handle SGE multiple line output nastiness.
From: https://github.com/clovr/vappio/blob/master/vappio-twisted/vappio_tx/load/sge_queue.py
"""
lines = data.split('\n')
multiline = False
ret = {}
for line in lines:
line = line.strip()
if line:
if not multiline:
key, value = line.split(' ', 1)
value = value.strip().rstrip('\\')
ret[key] = value
else:
# Making use of the fact that the key was created
# in the previous iteration and is stil lin scope
ret[key] += line
multiline = (line[-1] == '\\')
return ret
def _queue_can_access_pe(pe_name, queue):
    """Check if a queue has access to a specific parallel environment, using qconf.

    Reads ``qconf -sq <queue>`` and looks for *pe_name* among the
    whitespace-separated entries of its ``pe_list`` (comparing only the text
    before the first comma of each entry).

    :param pe_name: parallel environment name to look for.
    :param queue: queue name passed to ``qconf -sq``.
    :returns: True if listed. False otherwise, including when ``qconf`` fails,
        cannot be run, its output cannot be parsed, or there is no ``pe_list``.
    """
    log = logging.getLogger(__name__)
    try:
        # universal_newlines=True returns text on Python 3 as well as 2.7.
        output = subprocess.check_output(["qconf", "-sq", queue], universal_newlines=True)
        queue_config = _parseSGEConf(output)
    except (subprocess.CalledProcessError, OSError, ValueError) as err:
        # Callers probe alternative queue names, so a failure here is a
        # "no" answer rather than a fatal error.
        log.debug("Error parsing SGEConfig for queue %s: %s", queue, err)
        return False
    for entry in queue_config.get("pe_list", "").split():
        test_pe_name = entry.split(",")[0].strip()
        log.debug("SGEConfig parsed, checking %s for a match to %s.", test_pe_name, pe_name)
        if test_pe_name == pe_name:
            return True
    return False
if __name__ == "__main__":
    parser = ArgumentParser(
        description="Find an SGE/OGE parallel environment for multicore jobs in a queue.")
    parser.add_argument("queue", help="Name of queue to use.")
    parser.add_argument("-v", "--verbose", action="store_true",
                        help="Log each parallel environment line and queue check.")
    args = parser.parse_args()
    logging.basicConfig(level=logging.DEBUG if args.verbose else logging.WARNING,
                        format="%(message)s")
    try:
        # Previously the result was computed and discarded; report it.
        print(find_parallel_environment(args.queue))
    except ValueError as err:
        # "No suitable PE" is an expected outcome: exit non-zero with the
        # message instead of a traceback.
        parser.exit(1, "%s\n" % err)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment