# Source: GitHub gist jamesallman/b7a460bac407137657876c71b4a55ed5
# Author: @jamesallman
# Created: May 16, 2019 15:35
import json
import sys
from pprint import pprint

import requests
class FGT(object):
    """Session-based client for the FGT REST APIs.

    Gives access to:
      . Monitor API
      . CMDB API

    A session is started by logging in to the FGT. All subsequent calls reuse
    that session's cookies and CSRF token.
    """

    def __init__(self, host, verify=False, verbose=False):
        """Create a client for ``host``.

        host    -- host name or IP address; requests go to ``https://<host>``.
        verify  -- TLS certificate verification setting, forwarded to every
                   request made through this client.
        verbose -- when true, print request/response diagnostics.
        """
        self.host = host
        self.url_prefix = 'https://' + self.host
        self.verify = verify
        self.verbose = verbose
        self.session = requests.session()  # single session for all requests

    def update_csrf(self):
        """Copy the ``ccsrftoken`` cookie into the session's X-CSRFTOKEN header."""
        for cookie in self.session.cookies:
            if cookie.name == 'ccsrftoken':
                # The first and last characters of the cookie value are
                # dropped; presumably they are wrapping quotes -- TODO confirm.
                csrftoken = cookie.value[1:-1]
                self.session.headers.update({'X-CSRFTOKEN': csrftoken})

    def login(self, username, password):
        """Log in through ``/logincheck`` and pick up the CSRF token.

        Prints 'LOGIN success' or 'LOGIN fail'. The login is considered failed
        when the response text contains the word 'error'.

        Returns True on success, False on failure.
        """
        url = self.url_prefix + '/logincheck'
        res = self.session.post(
            url,
            data={'username': username, 'secretkey': password},
            verify=self.verify,
        )
        succeeded = 'error' not in res.text
        print('LOGIN success' if succeeded else 'LOGIN fail')

        # Update session's csrftoken
        self.update_csrf()
        return succeeded

    def logout(self):
        """End the session by posting to ``/logout``."""
        url = self.url_prefix + '/logout'
        # Pass the same verify setting as login, otherwise logout fails on
        # devices whose certificate cannot be verified.
        self.session.post(url, verify=self.verify)
        print('LOGOUT')

    @staticmethod
    def _encode_body(data):
        """Serialize a request payload to JSON text; None means "no body"."""
        return None if data is None else json.dumps(data)

    def _send(self, method, url_postfix, params, body):
        """Issue one request, refresh the CSRF token, validate the response."""
        url = self.url_prefix + url_postfix
        res = self.session.request(
            method, url, params=params, data=body, verify=self.verify
        )
        self.update_csrf()  # update session's csrf
        return self.check_response(res)

    def get(self, url_postfix, params=None, data=None):
        """GET ``url_postfix``; ``data`` is handed to requests unmodified."""
        return self._send('GET', url_postfix, params, data)

    def post(self, url_postfix, params=None, data=None):
        """POST ``data`` (JSON-encoded) to ``url_postfix``."""
        return self._send('POST', url_postfix, params, self._encode_body(data))

    def put(self, url_postfix, params=None, data=None):
        """PUT ``data`` (JSON-encoded) to ``url_postfix``."""
        return self._send('PUT', url_postfix, params, self._encode_body(data))

    def delete(self, url_postfix, params=None, data=None):
        """DELETE ``url_postfix``, sending ``data`` (JSON-encoded) if given."""
        return self._send('DELETE', url_postfix, params, self._encode_body(data))

    def check_response(self, response):
        """Validate ``response`` and return its decoded JSON body.

        For HTTP 200 the JSON body must be non-empty and, for each of the
        following keys that is present, satisfy:
          . 'status'      == 'success'
          . 'http_status' == 200
          . 'http_method' == the method of the originating request
          . 'results'     is non-empty
        Any violation raises RuntimeError.

        For any other HTTP status the decoded JSON body is returned as-is so
        the caller can inspect the error.

        Raises ValueError (re-raised from ``response.json()``) when the body
        is not valid JSON.
        """
        if self.verbose:
            print('{0} {1}'.format(response.request.method,
                                   response.request.url))

        if response.status_code != 200:
            try:
                res = response.json()
            except ValueError:
                if self.verbose:
                    print('Fail with status: {0}'.format(response.status_code))
                    print(response.text)
                raise
            if self.verbose:
                print('Fail with status: {0}'.format(response.status_code))
                print(response.text)
            return res

        # Success code, now check json response
        try:
            res = response.json()
        except ValueError:
            if self.verbose:
                print('Fail invalid JSON response')
                print(response.text)
            raise

        if not res:
            if self.verbose:
                print('JSON data is empty')
                print(response.text)
            raise RuntimeError('JSON data is empty')

        if 'status' in res and res['status'] != 'success':
            # .get(): error payloads are not guaranteed to carry an 'error'
            # key, and a KeyError here would mask the real failure.
            message = 'JSON error {0}\n{1}'.format(res.get('error'), res)
            if self.verbose:
                print(message)
            raise RuntimeError(message)

        if 'http_status' in res and res['http_status'] != 200:
            message = 'JSON error {0}\n{1}'.format(res.get('error'), res)
            if self.verbose:
                print(message)
            raise RuntimeError(message)

        if 'http_method' in res and res['http_method'] != response.request.method:
            message = 'Incorrect METHOD request {0}, response {1}'.format(
                response.request.method, res['http_method'])
            if self.verbose:
                print(message)
            raise RuntimeError(message)

        if 'results' in res and not res['results']:
            if self.verbose:
                print('Results is empty')
            raise RuntimeError('Results is empty')

        # All pass
        if self.verbose:
            print('Succeed with status: {0}'.format(response.status_code))
            pprint(res)
        return res
if __name__ == '__main__':
    # Demo: log in to a FGT, set the passphrase of the TEST-WIFI VAP through
    # the CMDB API, print the decoded response, and always log out afterwards.
    ip = '192.168.0.100'
    username = 'admin'
    password = 'fortinet'

    fgt = FGT(ip)
    try:
        fgt.login(username, password)
        res = fgt.put('/api/v2/cmdb/wireless-controller/vap/TEST-WIFI', data={
            'passphrase': 'password',
        })
        pprint(res)
    except Exception as exc:
        # Report the failure with its message, not just its type; Ctrl-C and
        # SystemExit are deliberately left to propagate.
        print('{0}: {1}'.format(type(exc).__name__, exc))
    finally:
        fgt.logout()
# (GitHub page footer, not part of the script: "Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment")