Skip to content

Instantly share code, notes, and snippets.

@eliezio
Created May 1, 2020 17:12
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save eliezio/f1083c2c725909a33714473e1bf62796 to your computer and use it in GitHub Desktop.
Save eliezio/f1083c2c725909a33714473e1bf62796 to your computer and use it in GitHub Desktop.
Test SW Upgrade using ncclient
import time
from xml.sax.saxutils import escape

import pytest

import nctest
from basetest import BaseTest
from settings import SWUG_TIMED_TRANSITION_TO
# Namespace URI bound to the "nc" prefix (used for nc:config, nc:filter,
# nc:rpc-reply and nc:data in this module).
NC10_NAMESPACE = "urn:ietf:params:xml:ns:netconf:base:1.0"

# Namespace URI of the software-upgrade elements built and queried below.
SWUG_NAMESPACE = "http://onap.org/pnf-sw-upgrade"

# Prefix -> namespace URI map passed to XPath queries.
NAMESPACES = dict(nc=NC10_NAMESPACE, swug=SWUG_NAMESPACE)
def xml_create_upgrade_package(version, initial_status, action) -> str:
    """Build the ``<nc:config>`` payload that creates one upgrade-package entry.

    Args:
        version: Software version; used both in the ``sw-id-<version>`` id
            and in the ``software-version`` element.
        initial_status: Value written to the ``current-status`` element.
        action: Value written to the ``action`` element.

    Returns:
        The XML document as a string.
    """
    # Escape the interpolated values so that '&', '<' or '>' inside any of
    # them cannot produce malformed XML or change the document structure.
    # str() keeps non-string arguments working as they did with the f-string.
    version = escape(str(version))
    initial_status = escape(str(initial_status))
    action = escape(str(action))
    return f"""<nc:config xmlns:nc="{NC10_NAMESPACE}">
<software-upgrade xmlns="{SWUG_NAMESPACE}">
<upgrade-package>
<id>sw-id-{version}</id>
<current-status>{initial_status}</current-status>
<user-label>trial software update</user-label>
<uri>sftp://127.0.0.1/test_software_2.img</uri>
<action>{action}</action>
<software-version>{version}</software-version>
<user>test_user</user>
<password>test_password</password>
</upgrade-package>
</software-upgrade>
</nc:config>"""
def xml_filter_up(version) -> str:
    """Build the ``<nc:filter>`` that selects one upgrade-package by its id.

    Args:
        version: Software version; the filter matches the package whose id
            is ``sw-id-<version>``.

    Returns:
        The XML filter document as a string.
    """
    # Escape the value so that '&', '<' or '>' in it cannot produce
    # malformed XML; str() keeps non-string arguments working as before.
    version = escape(str(version))
    return f"""<nc:filter xmlns:nc="{NC10_NAMESPACE}">
<software-upgrade xmlns="{SWUG_NAMESPACE}">
<upgrade-package>
<id>sw-id-{version}</id>
</upgrade-package>
</software-upgrade>
</nc:filter>"""
def current_status(reply):
    """Extract the upgrade package's ``current-status`` text from a reply.

    Runs an XPath query on ``reply.data`` for the ``current-status`` element
    of the upgrade package.

    Returns:
        The element's text, or ``None`` when the query matches nothing.
        More than one match is treated as an error (assertion).
    """
    matches = reply.data.xpath(
        "/nc:rpc-reply/nc:data/swug:software-upgrade/swug:upgrade-package/swug:current-status",
        namespaces=NAMESPACES,
    )
    if matches:
        assert len(matches) == 1
        return matches[0].text
    return None
@pytest.mark.skip(reason="too slow")
class TestSwUpgrade(BaseTest):
    """Tests a software-upgrade action end to end over NETCONF."""

    @pytest.mark.parametrize(
        ("version", "initial_state", "action", "final_state"),
        [
            ("2.0.0", "INITIALIZED", "DOWNLOAD_NE_SW", "DOWNLOAD_COMPLETED"),
            ("3.0.0", "INITIALIZED", "DOWNLOAD_NE_SW", "DOWNLOAD_COMPLETED"),
        ],
    )
    def test_edit_config(self, version, initial_state, action, final_state):
        """Commit an upgrade package and wait for it to reach ``final_state``.

        Phase 1 locks both datastores, writes the upgrade package to the
        candidate datastore, commits it and unlocks. Phase 2 polls the
        running datastore until the package's current-status equals
        ``final_state`` or the attempts run out.
        """
        nc = nctest.open_ssh_session()
        try:
            reply = nc.lock("running")
            nctest.check_reply_ok(reply)
            reply = nc.lock("candidate")
            nctest.check_reply_ok(reply)

            config_xml = xml_create_upgrade_package(version, initial_state, action)
            reply = nc.edit_config(target="candidate", config=config_xml)
            nctest.check_reply_ok(reply)
            reply = nc.commit()
            nctest.check_reply_ok(reply)

            reply = nc.unlock("candidate")
            nctest.check_reply_ok(reply)
            reply = nc.unlock("running")
            nctest.check_reply_ok(reply)
        finally:
            # Always close the session, even when a check above fails, so a
            # failing run does not leave the session open. Per RFC 6241 a
            # session's locks are released when the session ends.
            nc.close_session()

        filter_xml = xml_filter_up(version)
        status = None
        # One attempt per iteration, each preceded by a one-second pause, for
        # at most 2 * SWUG_TIMED_TRANSITION_TO attempts.
        for _ in range(2 * SWUG_TIMED_TRANSITION_TO):
            time.sleep(1)
            nc = nctest.open_ssh_session()
            try:
                reply = nc.get_config(source="running", filter=filter_xml)
                nctest.check_reply_data(reply)
                status = current_status(reply)
            finally:
                # Do not leak the polling session when a check fails.
                nc.close_session()
            if status == final_state:
                break
        assert status == final_state
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment