Created
December 25, 2020 11:59
-
-
Save unkcpz/da0774b054b656e0f82b22e0ad7d4d5e to your computer and use it in GitHub Desktop.
Script which reproduces the memory leak issue.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
########################################################################### | |
# Copyright (c), The AiiDA team. All rights reserved. # | |
# This file is part of the AiiDA code. # | |
# # | |
# The code is hosted on GitHub at https://github.com/aiidateam/aiida-core # | |
# For further information on the license, see the LICENSE.txt file # | |
# For further information please visit http://www.aiida.net # | |
########################################################################### | |
# pylint: disable=no-name-in-module | |
"""Tests to run with a running daemon.""" | |
import subprocess | |
import sys | |
import time | |
from aiida.common import exceptions | |
from aiida.engine import run, submit | |
from aiida.engine.daemon.client import get_daemon_client | |
from aiida.engine.persistence import ObjectLoader | |
from aiida.manage.caching import enable_caching | |
from aiida.orm import CalcJobNode, load_node, Int, Str, List, Dict, load_code | |
from aiida.plugins import CalculationFactory, WorkflowFactory | |
from workchains import ( | |
NestedWorkChain, DynamicNonDbInput, DynamicDbInput, DynamicMixedInput, ListEcho, CalcFunctionRunnerWorkChain, | |
WorkFunctionRunnerWorkChain, NestedInputNamespace, SerializeWorkChain, ArithmeticAddBaseWorkChain | |
) | |
# Labels of pre-configured codes this script loads via `load_code`; they are
# presumably set up by the surrounding test environment — TODO confirm.
CODENAME_ADD = 'add@localhost'
CODENAME_DOUBLER = 'doubler'
# Upper bound on how long `main` polls for all submitted processes to finish.
TIMEOUTSECS = 4 * 60  # 4 minutes
NUMBER_CALCULATIONS = 10  # Number of calculations to submit
NUMBER_WORKCHAINS = 20  # Number of workchains to submit
def print_daemon_log():
    """Print the contents of the daemon log file to stdout.

    Best-effort: if the ``cat`` command fails, a note is printed instead of
    raising.
    """
    daemon_client = get_daemon_client()
    daemon_log = daemon_client.daemon_log_file
    print(f"Output of 'cat {daemon_log}':")
    try:
        # Decode the captured bytes so the log prints as readable text
        # instead of a single ``b'...'`` literal.
        print(subprocess.check_output(
            ['cat', daemon_log],
            stderr=subprocess.STDOUT,
        ).decode('utf-8', errors='replace'))
    except subprocess.CalledProcessError as exception:
        print(f'Note: the command failed, message: {exception}')
def jobs_have_finished(pks):
    """Check whether all jobs with the given pks have terminated.

    :param pks: iterable of process node pks to check.
    :return: ``True`` only when every node is terminated.
    """
    # Load each node exactly once; the original loaded every pk twice
    # (once for the flag list and once for the node list).
    nodes = [load_node(pk) for pk in pks]
    num_finished = sum(1 for node in nodes if node.is_terminated)
    for node in nodes:
        if not node.is_terminated:
            print(f'not terminated: {node.pk} [{node.process_state}]')
    print(f'{num_finished}/{len(nodes)} finished')
    return num_finished == len(nodes)
def print_report(pk):
    """Print the process report for the given pk via ``verdi process report``.

    Best-effort: if the command fails, a note is printed instead of raising.
    """
    print(f"Output of 'verdi process report {pk}':")
    try:
        # Decode the captured bytes so the report prints as readable text
        # instead of a single ``b'...'`` literal.
        print(subprocess.check_output(
            ['verdi', 'process', 'report', str(pk)],
            stderr=subprocess.STDOUT,
        ).decode('utf-8', errors='replace'))
    except subprocess.CalledProcessError as exception:
        print(f'Note: the command failed, message: {exception}')
def validate_process_functions(expected_results):
    """Validate the calcfunction and workfunction results.

    :param expected_results: mapping of pk -> expected ``result`` output value.
    :return: ``True`` if every process finished ok with the expected result.
    """
    valid = True
    for pk, expected_result in expected_results.items():
        calc = load_node(pk)
        if not calc.is_finished_ok:
            print(f'Calc<{pk}> not finished ok: process_state<{calc.process_state}> exit_status<{calc.exit_status}>')
            print_report(pk)
            valid = False
        try:
            actual_result = calc.outputs.result
        except exceptions.NotExistent:
            print(f'Could not retrieve `result` output for process<{pk}>')
            print_report(pk)
            valid = False
            # Nothing to compare; the original code fell through here with
            # `actual_result` unbound (UnboundLocalError on first iteration).
            continue
        if actual_result != expected_result:
            print(f'* UNEXPECTED VALUE {actual_result} for calc pk={pk}: I expected {expected_result}')
            valid = False
    return valid
def validate_calculations(expected_results):
    """Validate the calculations against their expected output dictionaries.

    :param expected_results: mapping of pk -> expected ``output_parameters`` dict.
    :return: ``True`` if every calculation finished ok with the expected outputs.
    """
    valid = True
    for pk, expected_dict in expected_results.items():
        calc = load_node(pk)
        if not calc.is_finished_ok:
            print(f'Calc<{pk}> not finished ok: process_state<{calc.process_state}> exit_status<{calc.exit_status}>')
            print_report(pk)
            valid = False
        try:
            actual_dict = calc.outputs.output_parameters.get_dict()
        except exceptions.NotExistent:
            print(f'Could not retrieve `output_parameters` node for Calculation<{pk}>')
            print_report(pk)
            valid = False
            # Skip the comparison: the original kept `actual_dict` from the
            # previous iteration here, producing a misleading mismatch report.
            continue
        try:
            actual_dict['retrieved_temporary_files'] = dict(actual_dict['retrieved_temporary_files'])
        except KeyError:
            # If the retrieval fails we simply pass as the following check of the actual value will fail anyway
            pass
        if actual_dict != expected_dict:
            print(f'* UNEXPECTED VALUE {actual_dict} for calc pk={pk}: I expected {expected_dict}')
            valid = False
    return valid
def validate_workchains(expected_results):
    """Validate that each workchain finished ok with the expected ``output``.

    :param expected_results: mapping of pk -> expected ``output`` value.
    :return: ``True`` when every workchain checks out.
    """
    valid = True
    for pk, expected_value in expected_results.items():
        # Retrieval failure means the remaining checks cannot run: skip ahead.
        try:
            workchain = load_node(pk)
            actual_value = workchain.outputs.output
        except (exceptions.NotExistent, AttributeError) as error:
            print(
                f'* UNABLE TO RETRIEVE VALUE for workchain pk={pk}: '
                f'I expected {expected_value}, I got {type(error)}: {error}'
            )
            valid = False
            continue
        # An unsuccessful workchain makes the value comparison meaningless.
        if not workchain.is_finished_ok:
            print(
                f'Calculation<{pk}> not finished ok: '
                f'process_state<{workchain.process_state}> exit_status<{workchain.exit_status}>'
            )
            print_report(pk)
            valid = False
            continue
        if actual_value != expected_value:
            print(
                f'* UNEXPECTED VALUE {actual_value}, type {type(actual_value)} '
                f'for workchain pk={pk}: I expected {expected_value}, type {type(expected_value)}'
            )
            valid = False
    return valid
def validate_cached(cached_calcs):
    """Check that the calculations created with caching are indeed cached.

    :param cached_calcs: iterable of calculation nodes expected to be cached.
    :return: ``True`` when every node finished ok, has a valid hash, and (for
        calculation jobs) carries the same raw input files as its original.
    """
    valid = True
    for node in cached_calcs:
        if not node.is_finished_ok:
            print(
                f'Cached calculation<{node.pk}> not finished ok: '
                f'process_state<{node.process_state}> exit_status<{node.exit_status}>'
            )
            print_report(node.pk)
            valid = False
        # The caching machinery stores both the source pk and the node hash
        # in extras; either one missing or mismatched invalidates the cache.
        if '_aiida_cached_from' not in node.extras or node.get_hash() != node.get_extra('_aiida_hash'):
            print(f'Cached calculation<{node.pk}> has invalid hash')
            print_report(node.pk)
            valid = False
        if isinstance(node, CalcJobNode):
            source = load_node(node.get_extra('_aiida_cached_from'))
            original_files = source.list_object_names()
            cached_files = node.list_object_names()
            if not cached_files:
                print(f'Cached calculation <{node.pk}> does not have any raw inputs files')
                print_report(node.pk)
                valid = False
            if not original_files:
                print(
                    f'Original calculation <{source.pk}> does not have any raw inputs files after being cached from.'
                )
                valid = False
            if set(original_files) != set(cached_files):
                print(
                    f'different raw input files [{set(original_files)}] vs [{set(cached_files)}] '
                    f'for original<{source.pk}> and cached<{node.pk}> calculation'
                )
                valid = False
    return valid
def launch_calculation(code, counter, inputval):
    """Submit one calculation to the daemon through the Process layer.

    :return: tuple of the submitted node and its expected result dict.
    """
    process_class, process_inputs, expected = create_calculation_process(code=code, inputval=inputval)
    node = submit(process_class, **process_inputs)
    print(f'[{counter}] launched calculation {node.uuid}, pk={node.pk}')
    return node, expected
def run_calculation(code, counter, inputval):
    """Run one calculation in-process through the Process layer.

    :return: tuple of the finished node and its expected result dict.
    """
    process_class, process_inputs, expected = create_calculation_process(code=code, inputval=inputval)
    _, node = run.get_node(process_class, **process_inputs)
    print(f'[{counter}] ran calculation {node.uuid}, pk={node.pk}')
    return node, expected
def create_calculation_process(code, inputval):
    """Create the process class and inputs for submitting/running a calculation.

    :param code: the code node to run the calculation with.
    :param inputval: integer value to be doubled by the template calculation.
    :return: tuple of (process class, inputs dict, expected result dict).
    """
    process_class = CalculationFactory('templatereplacer')
    template_dict = {
        # Non-zero sleep so that AiiDA can observe some calculations in a
        # queued state; kept at 1 second to not slow the tests down.
        # 'cmdline_params': ["{}".format(counter % 3)], # Sleep time
        'cmdline_params': ['1'],
        'input_file_template': '{value}',  # File just contains the value to double
        'input_file_name': 'value_to_double.txt',
        'output_file_name': 'output.txt',
        'retrieve_temporary_files': ['triple_value.tmp'],
    }
    job_options = {
        'resources': {'num_machines': 1},
        'max_wallclock_seconds': 5 * 60,
        'withmpi': False,
        'parser_name': 'templatereplacer.doubler',
    }
    expected_result = {
        'value': 2 * inputval,
        'retrieved_temporary_files': {'triple_value.tmp': str(inputval * 3)},
    }
    inputs = {
        'code': code,
        'parameters': Dict(dict={'value': inputval}),
        'template': Dict(dict=template_dict),
        'metadata': {'options': job_options},
    }
    return process_class, inputs, expected_result
def run_arithmetic_add():
    """Run the `ArithmeticAddCalculation` and assert it sums correctly."""
    add_calculation = CalculationFactory('arithmetic.add')
    inputs = {
        'code': load_code(CODENAME_ADD),
        'x': Int(1),
        'y': Int(2),
    }
    # Normal inputs should run just fine
    results, node = run.get_node(add_calculation, **inputs)
    assert node.is_finished_ok, node.exit_status
    assert results['sum'] == 3
def run_base_restart_workchain():
    """Run the `ArithmeticAddBaseWorkChain` a few times for various inputs."""
    code = load_code(CODENAME_ADD)
    inputs = {'add': {'code': code, 'x': Int(1), 'y': Int(2)}}

    # Scenario 1: well-behaved inputs succeed with a single calculation.
    results, node = run.get_node(ArithmeticAddBaseWorkChain, **inputs)
    assert node.is_finished_ok, node.exit_status
    assert len(node.called) == 1
    assert 'sum' in results
    assert results['sum'].value == 3

    # Scenario 2: a negative sum fails the first calculation, but the error
    # handler corrects it so the second calculation finishes successfully.
    inputs['add']['y'] = Int(-4)
    results, node = run.get_node(ArithmeticAddBaseWorkChain, **inputs)
    assert node.is_finished_ok, node.exit_status
    assert len(node.called) == 2
    assert 'sum' in results
    assert results['sum'].value == 5

    # Scenario 3: the silly sanity check aborts the workchain when the sum
    # exceeds 10.
    inputs['add']['y'] = Int(10)
    results, node = run.get_node(ArithmeticAddBaseWorkChain, **inputs)
    assert not node.is_finished_ok, node.process_state
    assert node.exit_status == ArithmeticAddBaseWorkChain.exit_codes.ERROR_TOO_BIG.status, node.exit_status  # pylint: disable=no-member
    assert len(node.called) == 1

    # Scenario 4: overriding a handler's default enabled status works.
    inputs['add']['y'] = Int(1)
    inputs['handler_overrides'] = Dict(dict={'disabled_handler': True})
    results, node = run.get_node(ArithmeticAddBaseWorkChain, **inputs)
    assert not node.is_finished_ok, node.process_state
    assert node.exit_status == ArithmeticAddBaseWorkChain.exit_codes.ERROR_ENABLED_DOOM.status, node.exit_status  # pylint: disable=no-member
    assert len(node.called) == 1
def run_multiply_add_workchain():
    """Run the `MultiplyAddWorkChain` and assert its result."""
    workchain_class = WorkflowFactory('arithmetic.multiply_add')
    inputs = {
        'code': load_code(CODENAME_ADD),
        'x': Int(1),
        'y': Int(2),
        'z': Int(3),
    }
    # Normal inputs should run just fine
    results, node = run.get_node(workchain_class, **inputs)
    assert node.is_finished_ok, node.exit_status
    assert len(node.called) == 2
    assert 'result' in results
    assert results['result'].value == 5
def main(): | |
"""Launch a bunch of calculation jobs and workchains.""" | |
# pylint: disable=too-many-locals,too-many-statements,too-many-branches | |
expected_results_process_functions = {} | |
expected_results_calculations = {} | |
expected_results_workchains = {} | |
code_doubler = load_code(CODENAME_DOUBLER) | |
# Submitting the Calculations the new way directly through the launchers | |
print(f'Submitting {NUMBER_CALCULATIONS} calculations to the daemon') | |
for counter in range(1, NUMBER_CALCULATIONS + 1): | |
inputval = counter | |
calc, expected_result = launch_calculation(code=code_doubler, counter=counter, inputval=inputval) | |
expected_results_calculations[calc.pk] = expected_result | |
# Submitting the Workchains | |
print(f'Submitting {NUMBER_WORKCHAINS} workchains to the daemon') | |
for index in range(NUMBER_WORKCHAINS): | |
inp = Int(index) | |
submit(NestedWorkChain, inp=inp) | |
print("Submitting a workchain with 'submit'.") | |
builder = NestedWorkChain.get_builder() | |
input_val = 4 | |
builder.inp = Int(input_val) | |
proc = submit(builder) | |
expected_results_workchains[proc.pk] = input_val | |
print('Submitting a workchain with a nested input namespace.') | |
value = Int(-12) | |
pk = submit(NestedInputNamespace, foo={'bar': {'baz': value}}).pk | |
print('Submitting a workchain with a dynamic non-db input.') | |
value = [4, 2, 3] | |
pk = submit(DynamicNonDbInput, namespace={'input': value}).pk | |
expected_results_workchains[pk] = value | |
print('Submitting a workchain with a dynamic db input.') | |
value = 9 | |
pk = submit(DynamicDbInput, namespace={'input': Int(value)}).pk | |
expected_results_workchains[pk] = value | |
print('Submitting a workchain with a mixed (db / non-db) dynamic input.') | |
value_non_db = 3 | |
value_db = Int(2) | |
pk = submit(DynamicMixedInput, namespace={'inputs': {'input_non_db': value_non_db, 'input_db': value_db}}).pk | |
expected_results_workchains[pk] = value_non_db + value_db | |
print('Submitting the serializing workchain') | |
pk = submit(SerializeWorkChain, test=Int).pk | |
expected_results_workchains[pk] = ObjectLoader().identify_object(Int) | |
print('Submitting the ListEcho workchain.') | |
list_value = List() | |
list_value.extend([1, 2, 3]) | |
pk = submit(ListEcho, list=list_value).pk | |
expected_results_workchains[pk] = list_value | |
print('Submitting a WorkChain which contains a workfunction.') | |
value = Str('workfunction test string') | |
pk = submit(WorkFunctionRunnerWorkChain, input=value).pk | |
expected_results_workchains[pk] = value | |
print('Submitting a WorkChain which contains a calcfunction.') | |
value = Int(1) | |
pk = submit(CalcFunctionRunnerWorkChain, input=value).pk | |
expected_results_workchains[pk] = Int(2) | |
calculation_pks = sorted(expected_results_calculations.keys()) | |
workchains_pks = sorted(expected_results_workchains.keys()) | |
process_functions_pks = sorted(expected_results_process_functions.keys()) | |
pks = calculation_pks + workchains_pks + process_functions_pks | |
print('Wating for end of execution...') | |
start_time = time.time() | |
exited_with_timeout = True | |
while time.time() - start_time < TIMEOUTSECS: | |
time.sleep(15) # Wait a few seconds | |
# Print some debug info, both for debugging reasons and to avoid | |
# that the test machine is shut down because there is no output | |
print('#' * 78) | |
print(f'####### TIME ELAPSED: {time.time() - start_time} s') | |
print('#' * 78) | |
print("Output of 'verdi process list -a':") | |
try: | |
print(subprocess.check_output( | |
['verdi', 'process', 'list', '-a'], | |
stderr=subprocess.STDOUT, | |
)) | |
except subprocess.CalledProcessError as exception: | |
print(f'Note: the command failed, message: {exception}') | |
print("Output of 'verdi daemon status':") | |
try: | |
print(subprocess.check_output( | |
['verdi', 'daemon', 'status'], | |
stderr=subprocess.STDOUT, | |
)) | |
except subprocess.CalledProcessError as exception: | |
print(f'Note: the command failed, message: {exception}') | |
if jobs_have_finished(pks): | |
print('Calculation terminated its execution') | |
exited_with_timeout = False | |
break | |
if exited_with_timeout: | |
print_daemon_log() | |
print('') | |
print(f'Timeout!! Calculation did not complete after {TIMEOUTSECS} seconds') | |
sys.exit(2) | |
if __name__ == '__main__': | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment