Created
November 26, 2018 11:49
-
-
Save jsm222/2b7db15eaebe69111793359c9a585823 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import yaml | |
import ipaddress | |
import requests | |
import argparse | |
from getpass import getpass | |
from concurrent.futures import ThreadPoolExecutor, wait, as_completed | |
class NeutronSecRuleTest:
    """Exercise bulk creation of Neutron security group rules.

    Two modes are supported:

    * ``login()`` followed by ``runTest()`` posts many rules concurrently
      to the Neutron REST API.
    * ``make_heat_template()`` writes a Heat template that declares the same
      rules, for manual testing through Heat (no network access needed).

    Every generated rule is an IPv4 egress rule for TCP port 80 whose
    ``remote_ip_prefix`` is the base address plus the rule index.
    """

    # ID of the security group under test; set by createOrFindSecGroup().
    sec_group_id = None
    # Public network (Neutron) endpoint URL; set by login().
    neutron_url = None

    def __init__(self, test_ipaddr):
        """Store the base address from which rule prefixes are derived.

        :param test_ipaddr: dotted-quad IPv4 address, as text or as
            ASCII-encoded bytes.
        :raises ipaddress.AddressValueError: if the address is not valid.
        """
        # The original code always called .decode('ascii'), which only works
        # for byte strings (the native str of Python 2). Text input is passed
        # through unchanged so the class also works on Python 3.
        if isinstance(test_ipaddr, bytes):
            test_ipaddr = test_ipaddr.decode('ascii')
        self.test_ipaddr = ipaddress.IPv4Address(test_ipaddr)

    def login(self, auth_url, username, password, project_name, domain_id):
        """Request a project-scoped token and locate the network endpoint.

        Posts password credentials to ``<auth_url>/auth/tokens``. On success
        the token is stored in ``self.headers`` and the public endpoint of
        the ``network`` catalog entry in ``self.neutron_url``.

        :returns: True when a token was issued and a public network endpoint
            was found in the returned catalog, False otherwise.
        """
        req = {
            "auth": {
                "identity": {
                    "methods": [
                        "password"
                    ],
                    "password": {
                        "user": {
                            "name": username,
                            "domain": {
                                "id": domain_id
                            },
                            "password": password
                        }
                    }
                },
                "scope": {
                    "project": {
                        "name": project_name,
                        "domain": {
                            "id": domain_id
                        }
                    }
                }
            }
        }
        rsp = requests.post("{0}/auth/tokens".format(auth_url), json=req)
        # Any error status is a failed login, not only 401; without this a
        # 400/404/500 reply crashed below on the missing token header.
        if not rsp.ok:
            try:
                print(rsp.json())
            except ValueError:
                # Error body was not JSON; show it verbatim instead.
                print(rsp.text)
            return False
        body = rsp.json()
        self.headers = {"X-Auth-Token": rsp.headers["X-Subject-Token"]}
        # As in the original, the last matching public endpoint wins.
        for service in body["token"]["catalog"]:
            if service["type"] != "network":
                continue
            for endpoint in service["endpoints"]:
                if endpoint["interface"] == "public":
                    self.neutron_url = endpoint["url"]
        if self.neutron_url is None:
            # Without an endpoint every later request would target "None/...".
            print("No public network endpoint found in the service catalog")
            return False
        return True

    def rule_to_heat_yaml(self, rule):
        """Convert a Neutron rule request body into a Heat resource dict.

        The ``security_group_id`` property is replaced by a ``security_group``
        reference to the template resource named ``sec_group``. *rule* itself
        is not modified.
        """
        props = dict(rule["security_group_rule"])
        props["security_group"] = dict(get_resource="sec_group")
        props.pop("security_group_id")
        return dict(type="OS::Neutron::SecurityGroupRule", properties=props)

    def test_rule_factory(self, remote_ip_prefix):
        """Build the request body for one egress TCP/80 rule.

        :param remote_ip_prefix: destination address; converted with ``str()``.
        """
        prefix = str(remote_ip_prefix)
        return {"security_group_rule": {"direction": "egress",
                                        "port_range_min": 80,
                                        "ethertype": "IPv4",
                                        "port_range_max": 80,
                                        "protocol": "tcp",
                                        "security_group_id": self.sec_group_id,
                                        "remote_ip_prefix": prefix,
                                        "description": "egressto{0}port{1}".format(prefix, "80")
                                        }
                }

    def createOrFindSecGroup(self, sec_group_name):
        """Look up a security group by name, creating it when none exists.

        Stores the group ID in ``self.sec_group_id``. Requires a prior
        successful ``login()``.

        :returns: True when a group ID was obtained.
        :raises requests.HTTPError: if Neutron answers with an error status.
        """
        groups_url = "{0}/v2.0/security-groups".format(self.neutron_url)
        # Pass the name via params so requests URL-encodes it; the original
        # pasted it into the query string verbatim, which breaks for names
        # containing '&', '#', spaces and similar characters.
        rsp = requests.get(groups_url, params={"name": sec_group_name}, headers=self.headers)
        rsp.raise_for_status()
        sec_groups = rsp.json()["security_groups"]
        if sec_groups:
            self.sec_group_id = sec_groups[0]["id"]
        else:
            created = requests.post(groups_url, headers=self.headers,
                                    json={"security_group": {"name": sec_group_name}})
            created.raise_for_status()
            self.sec_group_id = created.json()["security_group"]["id"]
        return self.sec_group_id is not None

    def runTest(self, amount_of_rules, sec_group_name):
        """Post *amount_of_rules* rules concurrently and print each response.

        One worker thread is used per rule, so all requests are submitted at
        once. Does nothing when the security group cannot be obtained or when
        *amount_of_rules* is not positive.
        """
        if not self.createOrFindSecGroup(sec_group_name):
            return
        if amount_of_rules <= 0:
            # ThreadPoolExecutor raises ValueError for max_workers <= 0.
            return
        rules = [self.test_rule_factory(self.test_ipaddr + offset)
                 for offset in range(amount_of_rules)]
        rules_url = "{0}/v2.0/security-group-rules".format(self.neutron_url)
        # The context manager shuts the pool down even if a request raises.
        with ThreadPoolExecutor(amount_of_rules) as pool:
            futures = [pool.submit(requests.post, rules_url, json=rule, headers=self.headers)
                       for rule in rules]
            for done in as_completed(futures):
                # print(...) with one argument behaves the same on Python 2 and 3.
                print(done.result())

    def make_heat_template(self, file_out, sec_group_name, amount_of_rules):
        """Write a Heat template with one security group and its rules.

        The template contains a ``sec_group`` resource plus resources
        ``Rule0`` .. ``Rule<amount_of_rules - 1>``.

        :param file_out: path of the YAML file to (over)write.
        """
        template = {
            "heat_template_version": "2015-04-30",
            "resources": dict(sec_group=dict(
                type="OS::Neutron::SecurityGroup",
                properties=dict(description=sec_group_name, name=sec_group_name))),
        }
        for index in range(amount_of_rules):
            rule = self.test_rule_factory(self.test_ipaddr + index)
            template["resources"]["Rule{0}".format(index)] = self.rule_to_heat_yaml(rule)
        # Build the whole template first so a failure above does not leave a
        # truncated output file behind.
        with open(file_out, "w") as out:
            out.write(yaml.dump(template))
def build_parser():
    """Build the command-line parser with its two sub-commands.

    ``concurrent`` posts rules directly to Neutron and needs Keystone
    credentials; ``heat`` only writes a Heat template to a file.
    """
    parser = argparse.ArgumentParser(description='Test bulk creation of Neutron security group rules')
    subparsers = parser.add_subparsers(help='Choose subcommand', dest="command")
    # On Python 3 sub-commands are optional by default; running without one
    # used to crash later with AttributeError on args.base_ip. Making the
    # sub-command required yields a normal usage error instead.
    subparsers.required = True
    parser_a = subparsers.add_parser('concurrent', help='concurrent is for posting directly to neutron')
    parser_b = subparsers.add_parser('heat', help='heat for generating a heat template for manual testing')
    parser_a.add_argument('--auth-url', dest='auth_url', action='store',
                          help='Your keystone auth url', required=True)
    parser_a.add_argument('--username', dest='username', action='store',
                          help='Your keystone username', required=True)
    parser_a.add_argument('--project-name', dest='project_name', action='store',
                          help='Your keystone project name', required=True)
    parser_a.add_argument('--domain-id', dest='domain_id', action='store',
                          help='Your keystone projects domain id', required=True)
    # Options shared by both sub-commands.
    for p in [parser_a, parser_b]:
        p.add_argument('--base-ip', dest='base_ip', action='store',
                       help='Base ip for security group rule defaults to 192.168.0.1', required=False, default="192.168.0.1")
        p.add_argument('--security-group-name', dest='sec_group_name', action='store',
                       help='Your test security group name defaults to NeutronSecurityGroupRuleTest', required=False, default="NeutronSecurityGroupRuleTest")
        p.add_argument('--rules-amount', dest='amount_of_rules', action='store', type=int,
                       help='Amount of security group rules to try to create defaults to 200', required=False, default=200)
    parser_b.add_argument('--file-out', dest='file_out', action='store',
                          help='File name to output test heat template for manual heat test', required=True)
    return parser


def main(argv=None):
    """Parse *argv* (default: ``sys.argv[1:]``) and run the chosen sub-command.

    :returns: process exit status; 1 when the Keystone login fails, else 0.
    """
    args = build_parser().parse_args(argv)
    nsrt = NeutronSecRuleTest(args.base_ip)
    if args.command == "concurrent":
        password = getpass("Enter your keystone password:")
        if not nsrt.login(args.auth_url, args.username, password, args.project_name, args.domain_id):
            return 1
        nsrt.runTest(args.amount_of_rules, args.sec_group_name)
    else:
        nsrt.make_heat_template(args.file_out, args.sec_group_name, args.amount_of_rules)
    return 0


# Only act when executed as a script, so importing this module has no side effects.
if __name__ == "__main__":
    raise SystemExit(main())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment