Skip to content

Instantly share code, notes, and snippets.

@kongou-ae
Created April 4, 2014 15:00
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save kongou-ae/9976453 to your computer and use it in GitHub Desktop.
Save kongou-ae/9976453 to your computer and use it in GitHub Desktop.
BaseTutorial
cisco@onepk:~/study-onepk$ cat ../onePK-sdk-1.2.0.173/python/tutorials/BaseTutorial.py
#!/usr/bin/env python
# Copyright (c) 2010-2013 by Cisco Systems, Inc.
#
# THIS SAMPLE CODE IS PROVIDED "AS IS" WITHOUT ANY EXPRESS OR IMPLIED WARRANTY
# BY CISCO SOLELY FOR THE PURPOSE of PROVIDING PROGRAMMING EXAMPLES.
# CISCO SHALL NOT BE HELD LIABLE FOR ANY USE OF THE SAMPLE CODE IN ANY
# APPLICATION.
#
# Redistribution and use of the sample code, with or without
# modification, are permitted provided that the following conditions
# are met:
# Redistributions of source code must retain the above disclaimer.
"""
The goal of the BaseTutorial is to show how to create an application and connect to a network element to obtain a
session handle. The BaseTutorial provides base class functions for reading test properties, parsing command line
properties, and network element connectivity.
@author The onePK Team (onepk-feedback@cisco.com)
"""
from onep.element.NetworkApplication import NetworkApplication
from onep.interfaces import InterfaceStatus
from onep.element import SessionConfig
from onep.interfaces import NetworkInterface
import getopt
import sys
import getpass
import onep.interfaces.InterfaceFilter
import os
import socket
from onep.core.util import tlspinning
# START SNIPPET: createLogger
import logging

# Module-wide logger used by everything in this file.
logger = logging.getLogger('onep:BaseTutorial')
# Root logging is configured at WARNING; this logger is then lowered to INFO
# so that the tutorial's own informational messages are emitted.
logging.basicConfig(level=logging.WARNING)
logger.setLevel(logging.INFO)
# END SNIPPET: createLogger
class BaseTutorial(object):
    """
    Base class shared by the onePK tutorials.

    Holds the connection settings (element address, credentials, transport and
    TLS material), fills them in from the command line, and connects to /
    disconnects from a network element.
    """
    # START SNIPPET: variables
    element_hostname = None   # address or FQDN given with -a / --hostname
    username = None           # prompted for in parse_command_line()
    password = None           # prompted for in parse_command_line()
    transport = "tls"         # -t / --transport; "tipc" (any case) or "2" selects TIPC
    network_element = None    # set by connect()
    session_handle = None     # set by connect()
    root_cert_path = None     # -R / --rootcert
    client_cert_path = None   # -C / --clientcert
    client_key_path = None    # -K / --key
    key_passphrase = None     # not read or written anywhere in this class
    tls_pinning_file = None   # -P / --pinfile / --tlspinningfile
    args = None               # raw argument vector handed to __init__
    # END SNIPPET: variables

    def __init__(self, args):
        """
        Stores the raw argument vector; nothing is parsed until
        parse_command_line() is called.
        @param args: The full argument list including the program name at
        index 0 (for example sys.argv).
        """
        self.args = args

    def _uses_tipc(self):
        """
        Tells whether the configured transport selects TIPC.
        @return True if transport is "tipc" (case-insensitive) or "2".
        """
        return self.transport.lower() == "tipc" or self.transport == "2"

    def connect(self, applicationName):
        """
        Obtains the NetworkApplication instance, sets its name to applicationName,
        gets a network element for element_hostname and connects to it with the
        stored username, password and TLS settings.
        Exits the process with status 1 if no network element can be obtained.
        @param applicationName: The NetworkApplication name is set to this value.
        @return True if a session handle was obtained, else False.
        @throws Exceptions raised by the underlying onePK calls are not caught here.
        """
        # START SNIPPET: init_myapp
        network_application = NetworkApplication.get_instance()
        # END SNIPPET: init_myapp
        # START SNIPPET: name_myapp
        network_application.name = applicationName
        # END SNIPPET: name_myapp
        # START SNIPPET: get_ne_opt1
        self.network_element = network_application.get_network_element(self.element_hostname)
        # END SNIPPET: get_ne_opt1
        if self.network_element is None:
            logger.error("Failed to get network element")
            sys.exit(1)
        logger.info("We have a NetworkElement : %s", self.network_element)
        # START SNIPPET: connect
        if self._uses_tipc():
            session_config = SessionConfig(SessionConfig.SessionTransportMode.TIPC)
        else:
            # default is TLS
            session_config = SessionConfig(SessionConfig.SessionTransportMode.TLS)
        # Set all the TLS properties in session_config (done for either transport,
        # exactly as before).
        session_config.ca_certs = self.root_cert_path
        session_config.keyfile = self.client_key_path
        session_config.certfile = self.client_cert_path
        session_config.set_tls_pinning(self.tls_pinning_file, PinningHandler(self.tls_pinning_file))
        self.session_handle = self.network_element.connect(self.username, self.password, session_config)
        # END SNIPPET: connect
        if self.session_handle is None:
            # START SNIPPET: loggerError
            logger.error("Failed to connect to NetworkElement - ")
            # END SNIPPET: loggerError
            return False
        logger.info("Successful connection to NetworkElement - ")
        return True

    def disconnect(self):
        """
        Disconnects the application from the Network Element if it is connected.
        @return True if no exception was raised, else False (the exception is
        logged, not propagated).
        """
        try:
            if self.network_element.is_connected():
                self.network_element.disconnect()
        except Exception as e:
            logger.error("Failed to disconnect from Network Element")
            logger.error(e)
            return False
        return True

    def get_element_hostname(self):
        """
        Gets the IP address or hostname of the Network Element we are interested in.
        @return The element_hostname.
        """
        return self.element_hostname

    # START SNIPPET: getLogger
    def get_logger(self):
        """
        Gets the logger for this tutorial.
        @return The logger installed with set_logger(), or the module-level
        logger when none has been installed.
        """
        try:
            return self.logger
        except AttributeError:
            # set_logger() was never called; previously this raised.
            return logger
    # END SNIPPET: getLogger

    def get_network_element(self):
        """
        Gets the NetworkElement that is being connected to by the application.
        @return The NetworkElement, or None before connect() has been called.
        """
        return self.network_element

    def get_session_handle(self):
        """
        Gets the session_handle that identifies the connection to this Network Element.
        @return The session_handle, or None if no connection has been made.
        """
        return self.session_handle

    def get_username(self):
        """
        Gets the username on whose behalf the connection will be made.
        @return The stored username.
        """
        return self.username

    def get_password(self):
        """
        Gets the password with which the connection will be made.
        @return The stored password.
        """
        return self.password

    def get_transport(self):
        """
        Gets the transport mode with which the connection will be made.
        @return The stored transport string.
        """
        return self.transport

    def get_usage(self):
        """
        Gets the command line usage text.
        @return The usage string.
        """
        return " Usage: -a <address or FQDN> [-t <transport>] [-C <client cert file>] \n [-K <client private key file>] \n [-R <root certificates file>] \n [-P <tls pinning file>]"

    def parse_command_line(self, skip_login_for_tipc=True):
        """
        Parses the argument vector given to __init__ and, unless login is being
        skipped, prompts for the username and password.
        Exits the process with status 2 on an unknown option and with status 0
        after printing usage for -h.

        Options:
          -a, --hostname <network element address or FQDN>
          -t, --transport <transport type> default is tls
          -C, --clientcert <client certificate file>
          -K, --key <client private key file>
          -R, --rootcert <root certificates file>
          -P, --pinfile, --tlspinningfile <tls pinning file>

        @param skip_login_for_tipc: When True (the default) no credentials are
        requested if the transport is TIPC.
        @return True if parsing the command line succeeds, False otherwise.
        """
        try:
            # Both spellings of the pinning option are registered: the handler
            # below used to look for --pinfile while only --tlspinningfile was
            # known to getopt, so neither actually worked.
            opts, _ = getopt.getopt(
                self.args[1:],
                "ha:t:R:C:K:P:",
                ["hostname=", "transport=", "rootcert=", "clientcert=",
                 "key=", "tlspinningfile=", "pinfile="])
        except getopt.GetoptError as err:
            print(str(err))
            logger.info(self.get_usage())
            sys.exit(2)

        for option, arg in opts:
            if option == '-h':
                logger.info(self.get_usage())
                sys.exit()
            elif option in ("-a", "--hostname"):
                self.element_hostname = arg
            elif option in ("-t", "--transport"):
                self.transport = arg
            elif option in ("-R", "--rootcert"):
                self.root_cert_path = arg
            elif option in ("-C", "--clientcert"):
                self.client_cert_path = arg
            elif option in ("-K", "--key"):
                self.client_key_path = arg
            elif option in ("-P", "--pinfile", "--tlspinningfile"):
                self.tls_pinning_file = arg

        if self.element_hostname is None:
            logger.error(self.get_usage())
            return False

        # Same TIPC test as connect(), so "TIPC" and "2" are treated alike here.
        if not self._uses_tipc() or not skip_login_for_tipc:
            self.username = raw_input('Enter Username : ')
            self.password = getpass.getpass('Enter Password : ')
            # Only meaningful when credentials were actually requested;
            # a TIPC run that skips login must not fail on this check.
            if self.username is None or self.password is None:
                logger.error("Username and password are required.")
                return False
        return True

    def set_element_hostname(self, element_hostname):
        """
        Sets the IP address or hostname of the Network Element.
        @param element_hostname: The value to store.
        """
        self.element_hostname = element_hostname

    def set_logger(self, logger):
        """
        Sets the logger returned by get_logger().
        @param logger: The logger to store on this instance.
        """
        self.logger = logger

    def set_network_element(self, network_element):
        """
        Sets the NetworkElement used by this tutorial.
        @param network_element: The value to store.
        """
        self.network_element = network_element

    def set_password(self, password):
        """
        Sets the password used by connect().
        @param password: The value to store.
        """
        self.password = password

    def set_session_handle(self, session_handle):
        """
        Sets the session handle.
        @param session_handle: The value to store.
        """
        self.session_handle = session_handle

    def set_username(self, username):
        """
        Sets the username used by connect().
        @param username: The value to store.
        """
        self.username = username

    def get_all_interfaces(self):
        """
        Gets the interface list of the network element using a default
        InterfaceFilter.
        @return The interface list, or None if the call raised (the exception
        is logged).
        """
        interfaces = None
        try:
            interfaces = self.network_element.get_interface_list(onep.interfaces.InterfaceFilter())
        except Exception as e:
            logger.error(e)
        return interfaces

    def get_an_interface(self):
        """
        Get an interface which has an IP addr set and which is not the interface
        that the application is connected to. Tunnel interfaces are ignored.
        The first interface that qualifies is passed over and the second one is
        returned. NOTE(review): the reason for skipping the first match is not
        visible here - confirm before changing.
        @return The selected interface, or None if fewer than two qualify or an
        exception was raised (the exception is logged).
        """
        seen_first_match = False
        try:
            interfaces = self.network_element.get_interface_list(onep.interfaces.InterfaceFilter())
            element_address = socket.gethostbyname(self.network_element.host_address)
            for interface in interfaces:
                if interface.interface_type == NetworkInterface.InterfaceTypes.ONEP_IF_TYPE_TUNNEL:
                    continue
                addresses = interface.get_address_list()
                if not addresses or element_address in addresses:
                    continue
                if seen_first_match:
                    return interface
                seen_first_match = True
        except Exception as e:
            logger.error(e)
        return None

    def get_an_up_interface(self):
        """
        Get an interface which has its LINK status as UP.
        @return The first such interface, or None if there is none or an
        exception was raised (the exception is logged).
        """
        try:
            interfaces = self.network_element.get_interface_list(onep.interfaces.InterfaceFilter())
            for interface in interfaces:
                if interface.get_status().link == InterfaceStatus.InterfaceState.ONEP_IF_STATE_OPER_UP:
                    return interface
        except Exception as e:
            logger.error(e)
        return None
class PinningHandler(tlspinning.TLSUnverifiedElementHandler):
    """
    Interactive handler that asks the user, on the console, what to do with a
    certificate that could not be verified.
    """

    def __init__(self, pinning_file):
        """
        @param pinning_file: The TLS pinning file, or a false value when there
        is none; without one the "Accept and Pin" choice is not offered.
        """
        self.pinning_file = pinning_file

    def handle_verify(self, host, hashtype, finger_print, changed):
        """
        Callback to the app to determine whether to add a host to pinning DB.
        Writes a warning containing the host and its fingerprint to stdout and
        keeps prompting until the user gives a recognised answer. An empty
        answer or 'r' rejects; 'o' accepts once; 'p' accepts and pins, but only
        when a pinning file is configured. Answers are case-insensitive.
        @param host: The remote host, shown in the warning.
        @param hashtype: The fingerprint hash type, shown in the warning.
        @param finger_print: The fingerprint sent by the remote host.
        @param changed: True selects the "certificate is different from the one
        previously accepted" banner instead of the "not verified" warning.
        @return A tlspinning.DecisionType value: REJECT, ACCEPT_AND_PIN or
        ACCEPT_ONCE.
        """
        if changed:
            pieces = [
                "\n@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@\n",
                "WARNING: THE CERTIFICATE PRESENTED BY REMOTE HOST '%s'\n IS DIFFERENT FROM THE ONE PREVIOUSLY ACCEPTED" % host,
                "\n@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@",
            ]
        else:
            pieces = ["WARNING: Certificate presented by remote host '%s' is not verified." % host]
        pieces.append("\n\nThe %s fingerprint sent by the remote host(%s) is:\n%s" % (hashtype, host, finger_print))
        pieces.append("\n\nYou MUST verify the certificate on remote host before proceeding! \n")
        pieces.append("\nChoose from following options:")

        prompt = "\nAccept Once (o), Reject (r) (default) :"
        if self.pinning_file:
            prompt = "\nAccept and Pin (p), Accept Once (o), Reject (r) (default) :"

        sys.stdout.write("".join(pieces))
        self.decision = raw_input(prompt)
        while True:
            # An empty answer means the default, which is to reject.
            choice = self.decision.lower() if self.decision else 'r'
            if choice == 'r':
                return tlspinning.DecisionType.REJECT
            if choice == 'p' and self.pinning_file:
                return tlspinning.DecisionType.ACCEPT_AND_PIN
            if choice == 'o':
                return tlspinning.DecisionType.ACCEPT_ONCE
            # Unrecognised answer: ask again.
            self.decision = raw_input(prompt)
if __name__ == '__main__':
    # Script entry point: parse the arguments, connect to the network element,
    # then disconnect again.
    tutorial = BaseTutorial(sys.argv)
    logger.info("Reading arguments...")
    if not tutorial.parse_command_line():
        logger.error("Error in parsing arguments")
        sys.exit(1)
    logger.info("Connecting to Network Element...")
    # Exceptions from connect() are deliberately left to propagate so the
    # traceback is visible.
    connected = tutorial.connect("BaseTutorial")
    tutorial.disconnect()
    if not connected:
        # connect() has already logged the failure; report it through the exit
        # status instead of finishing with "Done." and status 0.
        sys.exit(1)
    logger.info("Done.")
cisco@onepk:~/study-onepk$
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment