Skip to content

Instantly share code, notes, and snippets.

@sp3c73r2038
Last active May 6, 2022 10:53
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save sp3c73r2038/1c9dfc42a67364db12f1d7cb636d7f16 to your computer and use it in GitHub Desktop.
Save sp3c73r2038/1c9dfc42a67364db12f1d7cb636d7f16 to your computer and use it in GitHub Desktop.
Minimal API client for FortiGate VPN local users
# -*- coding: utf-8 -*-
import logging
import requests
# Module-level logger, named after this module via ``__name__``.
LOGGER = logging.getLogger(__name__)
def join(*args):
    """
    glue URL fragments together with ``/``

    every fragment has its leading and trailing ``/`` characters removed
    before the fragments are joined, so neighbouring fragments never
    produce a doubled slash at their boundary.
    """
    trimmed = [fragment.strip('/') for fragment in args]
    return '/'.join(trimmed)
class FortiAPI:
    """
    a simplified version of a local user/group management API client

    only the ``user/local`` and ``user/group`` endpoints under
    ``/api/v2/cmdb`` are used. the constructor logs in immediately, and the
    CSRF token cookie returned by the server is copied into the session
    headers by ``_handleResp`` after every request.
    """

    def __init__(self, url, username, password, tlsCert=None):
        """
        store the connection settings, create the HTTP session and log in

        :param url: base URL that all request paths are appended to
        :param username: login name posted to ``logincheck``
        :param password: password posted to ``logincheck`` as ``secretkey``
        :param tlsCert: optional value assigned to the session's ``verify``
            attribute; left untouched when ``None``
        :raises RuntimeError: if the login request returns an unexpected status
        """
        self.url = url
        self.username = username
        self.password = password
        self.session = requests.Session()
        if tlsCert is not None:
            LOGGER.info('>> using tls cert: %s', tlsCert)
            self.session.verify = tlsCert
        self.login()

    def _handleResp(self, resp, prompt='request failed', expected=frozenset({200})):
        """
        refresh the CSRF header from the response and validate its status

        :param resp: response object returned by the session
        :param prompt: prefix of the error message raised on failure
        :param expected: collection of acceptable HTTP status codes
        :raises RuntimeError: if the status code is not in ``expected``
        """
        # handle csrf token
        for cookie in resp.cookies:
            if cookie.name != 'ccsrftoken':
                continue
            # Strip surrounding double quotes only when they are present;
            # blind slicing would chop real characters off an unquoted token.
            token = (cookie.value or '').strip('"')
            if token:
                self.session.headers.update({'X-CSRFTOKEN': token})
        if resp.status_code not in expected:
            raise RuntimeError('{}: {}'.format(prompt, resp.text))

    def _handleJSON(self, resp, default=None, single=False):
        """
        extract the ``results`` entry from a JSON response

        :param resp: response object whose body is a JSON object
        :param default: value returned when the body has no ``results`` key
        :param single: when true and ``results`` is a list, return only its
            first element
        :returns: the ``results`` value, its first element, or ``default``
        :raises RuntimeError: if ``single`` is true and ``results`` is an
            empty list
        """
        results = resp.json().get('results', default)
        if single and isinstance(results, list):
            if not results:
                # Report an empty list in the same style as other failures
                # instead of leaking a bare IndexError to the caller.
                raise RuntimeError('response contains an empty results list')
            return results[0]
        return results

    def _userExists(self, name):
        """
        tell whether the local user ``name`` exists

        a 404 answer means the user is absent; any status other than 200 or
        404 is reported as an error instead of being mistaken for absence.

        :raises ValueError: if ``name`` is empty
        :raises RuntimeError: on any status other than 200 or 404
        """
        if not name:
            raise ValueError('name must not be empty')
        url = join(self.url, '/api/v2/cmdb/user/local', name)
        resp = self.session.get(url)
        self._handleResp(resp, prompt='cannot get user',
                         expected=frozenset({200, 404}))
        return resp.status_code == 200

    def login(self):
        """
        post the stored credentials to ``logincheck``

        :raises RuntimeError: if the response status is not 200
        """
        url = join(self.url, 'logincheck')
        resp = self.session.post(url, {
            'username': self.username,
            'secretkey': self.password,
        })
        self._handleResp(resp)

    def getUserSchema(self):
        """
        return the ``results`` of a schema query on the local user endpoint

        :raises RuntimeError: if the request fails
        """
        url = join(self.url, '/api/v2/cmdb/user/local')
        resp = self.session.get(url, params={'action': 'schema'})
        self._handleResp(resp)
        return self._handleJSON(resp)

    def getUsers(self):
        """
        return the ``results`` of listing the local user endpoint

        :raises RuntimeError: if the request fails
        """
        url = join(self.url, '/api/v2/cmdb/user/local')
        resp = self.session.get(url)
        self._handleResp(resp)
        return self._handleJSON(resp)

    def getUser(self, name):
        """
        return the first ``results`` entry for the local user ``name``

        :raises ValueError: if ``name`` is empty
        :raises RuntimeError: if the request fails or yields no entry
        """
        if not name:
            raise ValueError('name must not be empty')
        url = join(self.url, '/api/v2/cmdb/user/local', name)
        resp = self.session.get(url)
        self._handleResp(resp, prompt='cannot get user')
        return self._handleJSON(resp, single=True)

    def getGroups(self):
        """
        return the ``results`` of listing the user group endpoint

        :raises RuntimeError: if the request fails
        """
        url = join(self.url, '/api/v2/cmdb/user/group')
        resp = self.session.get(url)
        self._handleResp(resp)
        return self._handleJSON(resp)

    def getGroup(self, name):
        """
        return the first ``results`` entry for the user group ``name``

        :raises ValueError: if ``name`` is empty
        :raises RuntimeError: if the request fails or yields no entry
        """
        if not name:
            raise ValueError('name must not be empty')
        url = join(self.url, '/api/v2/cmdb/user/group', name)
        resp = self.session.get(url)
        self._handleResp(resp, prompt='cannot get group')
        return self._handleJSON(resp, single=True)

    def addUserToGroup(self, user, group):
        """
        append ``user`` to the member list of ``group``

        nothing is sent when the user is already a member.

        :raises ValueError: if ``user`` or ``group`` is empty
        :raises RuntimeError: if the user or group cannot be fetched, or the
            update request fails
        """
        if not user or not group:
            raise ValueError('user and group must not be empty')
        # check user existence
        self.getUser(user)
        g = self.getGroup(group)
        # Tolerate a group object without a ``member`` key.
        members = g.setdefault('member', [])
        if any(m['name'] == user for m in members):
            LOGGER.info('user %s already in group %s', user, group)
            return
        members.append({
            'name': user,
            'q_origin_key': user,
        })
        url = join(self.url, '/api/v2/cmdb/user/group', group)
        resp = self.session.put(url, json=g)
        self._handleResp(resp, prompt='add user to group failed')

    def removeUserFromGroup(self, user, group):
        """
        drop ``user`` from the member list of ``group``

        nothing is sent when the user is not a member. the remaining member
        entries are sent back unchanged and in their original order.

        :raises ValueError: if ``user`` or ``group`` is empty
        :raises RuntimeError: if the user or group cannot be fetched, or the
            update request fails
        """
        if not user or not group:
            raise ValueError('user and group must not be empty')
        # check user existence
        self.getUser(user)
        g = self.getGroup(group)
        members = g.get('member') or []
        remaining = [m for m in members if m['name'] != user]
        if len(remaining) == len(members):
            LOGGER.info('user %s is already absent from group %s', user, group)
            return
        # Keep the surviving entries as fetched (same approach as
        # addUserToGroup) instead of rebuilding them from names only.
        g['member'] = remaining
        url = join(self.url, '/api/v2/cmdb/user/group', group)
        resp = self.session.put(url, json=g)
        self._handleResp(resp, prompt='remove user from group failed')

    def createUser(self, user, email, password):
        """
        create a local user of type ``password``

        nothing is sent when the user already exists.

        :raises ValueError: if ``user``, ``email`` or ``password`` is empty
        :raises RuntimeError: if the existence check or the creation request
            fails, or the response body's ``status`` is not ``success``
        """
        if not user or not email or not password:
            raise ValueError('user, email and password must not be empty')
        # Only a 404 counts as "absent"; other failures propagate.
        if self._userExists(user):
            LOGGER.info('>> user %s already exists', user)
            return
        url = join(self.url, '/api/v2/cmdb/user/local')
        data = {
            'name': user,
            'email-to': email,
            'passwd': password,
            'type': 'password',
        }
        resp = self.session.post(url, json=data)
        self._handleResp(resp, prompt='create user failed')
        body = resp.json()
        if body.get('status') != 'success':
            raise RuntimeError('create user failed: {}'.format(body))

    def deleteUser(self, user):
        """
        delete the local user ``user``

        nothing is sent when the user is already absent.

        :raises ValueError: if ``user`` is empty
        :raises RuntimeError: if the existence check or the delete request
            fails
        """
        # Only a 404 counts as "absent"; an auth or server error must not be
        # reported as a successful no-op.
        if not self._userExists(user):
            LOGGER.info(">> user %s is already absent", user)
            return
        url = join(self.url, '/api/v2/cmdb/user/local', user)
        resp = self.session.delete(url)
        self._handleResp(resp, prompt='delete user failed')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment