Skip to content

Instantly share code, notes, and snippets.

@jsm222
Created November 26, 2018 11:49
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jsm222/2b7db15eaebe69111793359c9a585823 to your computer and use it in GitHub Desktop.
Save jsm222/2b7db15eaebe69111793359c9a585823 to your computer and use it in GitHub Desktop.
import yaml
import ipaddress
import requests
import argparse
from getpass import getpass
from concurrent.futures import ThreadPoolExecutor, wait, as_completed
class NeutronSecRuleTest:
    """Exercise bulk creation of Neutron security group rules.

    Two modes are supported:

    * ``runTest`` logs nothing itself but posts many rules concurrently to
      the Neutron REST API (requires a prior successful ``login``).
    * ``make_heat_template`` writes a Heat template describing the same
      rules so the bulk creation can be tried manually through Heat.

    Every generated rule is an IPv4 egress rule for TCP port 80 whose
    ``remote_ip_prefix`` is ``test_ipaddr`` plus a running offset.
    """

    # Class-level defaults: the heat path reads ``sec_group_id`` without ever
    # calling createOrFindSecGroup(), and login() may not find an endpoint.
    sec_group_id = None
    neutron_url = None

    def __init__(self, test_ipaddr):
        """Store the base address from which rule prefixes are derived.

        :param test_ipaddr: dotted-quad IPv4 address, as text or as an
            ASCII byte string (e.g. a Python 2 ``str`` from ``sys.argv``).
        :raises ipaddress.AddressValueError: if the value is not a valid
            IPv4 address.
        """
        # Only byte strings need decoding; text has no usable ``decode`` on
        # Python 3, so calling it unconditionally breaks there.
        if isinstance(test_ipaddr, bytes):
            test_ipaddr = test_ipaddr.decode('ascii')
        self.test_ipaddr = ipaddress.IPv4Address(test_ipaddr)

    def login(self, auth_url, username, password, project_name, domain_id):
        """Request a project-scoped Keystone token and locate Neutron.

        On success ``self.headers`` carries the ``X-Auth-Token`` header and
        ``self.neutron_url`` the public ``network`` endpoint taken from the
        token's service catalog.

        :param auth_url: Keystone base URL; ``/auth/tokens`` is appended.
        :param username: Keystone user name.
        :param password: Keystone password.
        :param project_name: project to scope the token to.
        :param domain_id: domain id used for both the user and the project.
        :returns: ``True`` when a token was issued and a public network
            endpoint was found, ``False`` otherwise.
        """
        req = {
            "auth": {
                "identity": {
                    "methods": [
                        "password"
                    ],
                    "password": {
                        "user": {
                            "name": username,
                            "domain": {
                                "id": domain_id
                            },
                            "password": password
                        }
                    }
                },
                "scope": {
                    "project": {
                        "name": project_name,
                        "domain": {
                            "id": domain_id
                        }
                    }
                }
            }
        }
        rsp = requests.post("{0}/auth/tokens".format(auth_url), json=req)
        if rsp.status_code == 401:
            print(rsp.json())
            return False
        if not rsp.ok:
            # Any other error status would otherwise surface as a confusing
            # KeyError / JSON decode error further down.
            print("Keystone token request failed: HTTP {0}: {1}".format(
                rsp.status_code, rsp.text))
            return False

        body = rsp.json()
        self.headers = {"X-Auth-Token": rsp.headers["X-Subject-Token"]}
        for service in body["token"]["catalog"]:
            if service["type"] != "network":
                continue
            for endpoint in service["endpoints"]:
                if endpoint["interface"] == "public":
                    self.neutron_url = endpoint["url"]

        if self.neutron_url is None:
            # Without an endpoint every later request would be sent to the
            # literal URL "None/v2.0/...".
            print("No public network endpoint found in the service catalog")
            return False
        return True

    def rule_to_heat_yaml(self, rule):
        """Convert a Neutron rule request body into a Heat resource dict.

        The ``security_group_id`` entry is replaced by a ``security_group``
        property referencing the template's ``sec_group`` resource. The
        input ``rule`` is not modified.

        :param rule: dict shaped like the result of ``test_rule_factory``.
        :returns: dict with ``type`` and ``properties`` keys.
        """
        properties = dict(rule["security_group_rule"])
        properties["security_group"] = dict(get_resource="sec_group")
        properties.pop("security_group_id")
        return dict(type="OS::Neutron::SecurityGroupRule", properties=properties)

    def test_rule_factory(self, remote_ip_prefix):
        """Build the request body for one egress TCP/80 IPv4 rule.

        :param remote_ip_prefix: address (or anything ``str()``-able) used
            as the rule's ``remote_ip_prefix``.
        :returns: dict with a single ``security_group_rule`` key.
        """
        prefix = str(remote_ip_prefix)
        return {
            "security_group_rule": {
                "direction": "egress",
                "port_range_min": 80,
                "ethertype": "IPv4",
                "port_range_max": 80,
                "protocol": "tcp",
                "security_group_id": self.sec_group_id,
                "remote_ip_prefix": prefix,
                "description": "egressto{0}port{1}".format(prefix, "80"),
            }
        }

    def createOrFindSecGroup(self, sec_group_name):
        """Look up a security group by name, creating it if none exists.

        Stores the id of the first match (or of the newly created group) in
        ``self.sec_group_id``.

        :param sec_group_name: name of the security group.
        :returns: ``True`` when ``self.sec_group_id`` is set afterwards.
        """
        groups_url = "{0}/v2.0/security-groups".format(self.neutron_url)
        # Pass the name via ``params`` so requests URL-encodes it; names with
        # spaces, '&' or '#' would corrupt a hand-built query string.
        rsp = requests.get(groups_url, params={"name": sec_group_name},
                           headers=self.headers)
        sec_groups = rsp.json()["security_groups"]
        if not sec_groups:
            created = requests.post(
                groups_url, headers=self.headers,
                json={"security_group": {"name": sec_group_name}})
            self.sec_group_id = created.json()["security_group"]["id"]
        else:
            self.sec_group_id = sec_groups[0]["id"]
        return self.sec_group_id is not None

    def runTest(self, amount_of_rules, sec_group_name):
        """Post ``amount_of_rules`` rules to Neutron concurrently.

        One worker thread is used per rule so that all requests are in
        flight together; each response object is printed as it completes.

        :param amount_of_rules: number of rules to create; nothing is
            posted when it is zero or negative.
        :param sec_group_name: security group to attach the rules to
            (created when missing).
        """
        if not self.createOrFindSecGroup(sec_group_name):
            return
        rules = [self.test_rule_factory(self.test_ipaddr + offset)
                 for offset in range(amount_of_rules)]
        if not rules:
            # ThreadPoolExecutor rejects max_workers <= 0 with ValueError.
            return

        rules_url = "{0}/v2.0/security-group-rules".format(self.neutron_url)
        # The context manager shuts the pool down even if a request raises.
        with ThreadPoolExecutor(max_workers=len(rules)) as pool:
            futures = [
                pool.submit(requests.post, rules_url, json=rule,
                            headers=self.headers)
                for rule in rules
            ]
            for future in as_completed(futures):
                print(future.result())

    def make_heat_template(self, file_out, sec_group_name, amount_of_rules):
        """Write a Heat template with one security group and its rules.

        The template contains a ``sec_group`` resource plus resources
        ``Rule0`` .. ``Rule<amount_of_rules - 1>``.

        :param file_out: path of the YAML file to write (overwritten).
        :param sec_group_name: used as both name and description of the
            security group resource.
        :param amount_of_rules: number of rule resources to generate.
        """
        template = {
            "heat_template_version": "2015-04-30",
            "resources": {
                "sec_group": {
                    "type": "OS::Neutron::SecurityGroup",
                    "properties": {
                        "description": sec_group_name,
                        "name": sec_group_name,
                    },
                },
            },
        }
        for index in range(amount_of_rules):
            rule = self.test_rule_factory(self.test_ipaddr + index)
            template["resources"]["Rule{0}".format(index)] = \
                self.rule_to_heat_yaml(rule)

        # Build the whole document first so a failure while generating rules
        # does not leave a truncated output file behind.
        with open(file_out, "w") as out:
            out.write(yaml.dump(template))
def build_parser():
    """Build the command-line parser.

    Two subcommands are defined: ``concurrent`` (post rules directly to
    Neutron; needs Keystone credentials) and ``heat`` (write a Heat
    template; needs ``--file-out``). Both share ``--base-ip``,
    ``--security-group-name`` and ``--rules-amount``.

    :returns: the configured ``argparse.ArgumentParser``.
    """
    parser = argparse.ArgumentParser(description='Test bulk creation of Neutron security group rules')
    subparsers = parser.add_subparsers(help='Choose subcommand', dest="command")
    # Python 3 treats subcommands as optional unless told otherwise; without
    # this, running with no subcommand fails later with an AttributeError
    # instead of a usage message.
    subparsers.required = True
    parser_a = subparsers.add_parser('concurrent', help='concurrent is for posting directly to neutron')
    parser_b = subparsers.add_parser('heat', help='heat for generating a heat template for manual testing')

    parser_a.add_argument('--auth-url', dest='auth_url', action='store',
                          help='Your keystone auth url', required=True)
    parser_a.add_argument('--username', dest='username', action='store',
                          help='Your keystone username', required=True)
    parser_a.add_argument('--project-name', dest='project_name', action='store',
                          help='Your keystone project name', required=True)
    parser_a.add_argument('--domain-id', dest='domain_id', action='store',
                          help='Your keystone projects domain id', required=True)

    # Options shared by both subcommands.
    for p in [parser_a, parser_b]:
        p.add_argument('--base-ip', dest='base_ip', action='store',
                       help='Base ip for security group rule defaults to 192.168.0.1',
                       required=False, default="192.168.0.1")
        p.add_argument('--security-group-name', dest='sec_group_name', action='store',
                       help='Your test security group name defaults to NeutronSecurityGroupRuleTest',
                       required=False, default="NeutronSecurityGroupRuleTest")
        p.add_argument('--rules-amount', dest='amount_of_rules', action='store', type=int,
                       help='Amount of security group rules to try to create defaults to 200',
                       required=False, default=200)

    parser_b.add_argument('--file-out', dest='file_out', action='store',
                          help='File name to output test heat template for manual heat test',
                          required=True)
    return parser


def main(argv=None):
    """Run the selected subcommand.

    :param argv: argument list to parse; ``None`` means ``sys.argv[1:]``.
    :returns: process exit status: 0 on success, 1 when the Keystone
        login fails.
    """
    args = build_parser().parse_args(argv)
    nsrt = NeutronSecRuleTest(args.base_ip)
    if args.command == "concurrent":
        # Prompt instead of taking the password as an option so it does not
        # end up in the shell history or process list.
        password = getpass("Enter your keystone password:")
        if not nsrt.login(args.auth_url, args.username, password,
                          args.project_name, args.domain_id):
            return 1
        nsrt.runTest(args.amount_of_rules, args.sec_group_name)
    else:
        nsrt.make_heat_template(args.file_out, args.sec_group_name, args.amount_of_rules)
    return 0


# Guarded so that importing this module does not parse sys.argv.
if __name__ == "__main__":
    raise SystemExit(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment