Skip to content

Instantly share code, notes, and snippets.

@perfecto25
Last active August 31, 2023 05:45
Show Gist options
  • Star 6 You must be signed in to star a gist
  • Fork 6 You must be signed in to fork a gist
  • Save perfecto25/f7a682cb5dca17bdd9c0bd5aa6394bd1 to your computer and use it in GitHub Desktop.
Save perfecto25/f7a682cb5dca17bdd9c0bd5aa6394bd1 to your computer and use it in GitHub Desktop.
saltstack sudo module
# Custom Execution Module - SUDO ACCESS
# tested for Centos 7
import salt
import time
import os
import logging
import re
from datetime import datetime, timedelta
import subprocess
from salt.exceptions import CommandExecutionError
log = logging.getLogger(__name__)

# SMTP relay used for notification emails (alternative: __opts__['master'])
smtp_server = 'mail.company.com'
# recipient of notification emails, set to False to disable email
notify = 'admin@company.com'

sudo_dir = '/etc/sudoers.d'
sudo_file = sudo_dir + '/salt_sudo'

# get path to salt-call command
# subprocess.check_output() returns bytes on Python 3, so the output has to be
# decoded before it can be split; on Python 2 it is already a str.
try:
    _which_out = subprocess.check_output(['which', 'salt-call'])
    if not isinstance(_which_out, str):
        _which_out = _which_out.decode('utf-8', 'replace')
    salt_call = _which_out.split()[0]
except (subprocess.CalledProcessError, OSError, IndexError):
    # 'which' is missing or did not find salt-call: fall back to the bare
    # command name instead of making the whole module fail to import.
    log.warning("could not resolve full path to salt-call, falling back to 'salt-call'")
    salt_call = 'salt-call'
def _clean_sudo_file(filename):
    '''
    remove blanks and duplicates from sudo_file

    The file is rewritten in place. The first occurrence of each entry is
    kept and the original order of entries is preserved. Every entry is
    written with exactly one trailing newline, so an entry that was missing
    its newline can never be fused with the entry that follows it.

    filename: path of the sudoers drop-in file to clean
    '''
    with open(filename) as handle:
        lines = handle.readlines()
    log.debug('contents of %s before cleanup: %s', filename, lines)

    seen = set()
    cleaned = []
    for line in lines:
        # compare entries without their line terminator / trailing blanks so
        # "joe ...\n" and a final "joe ..." count as the same entry
        entry = line.rstrip()
        if not entry or entry in seen:
            continue
        seen.add(entry)
        cleaned.append(entry + '\n')

    with open(filename, 'w') as handle:
        handle.writelines(cleaned)
def _notify(username, whoran, target, comment, return_msg):
    '''
    send email to sysadmins when 'sudo.grant' is executed

    Notification is best effort: any failure while building or sending the
    email is logged and never propagated to the caller.

    username:   account that sudo access was requested for
    whoran:     user who ran 'sudo.grant'
    target:     salt target the command was run against
    comment:    free-text reason supplied by the requester
    return_msg: message returned to the requester, appended to the email
    '''
    if not notify:
        log.warning('Email notifications are disabled for salt-sudo')
        return

    try:
        # get full name of requested user, fall back to the login name when
        # the account has no full name (or no info at all)
        user_info = __salt__['user.info'](username) or {}
        fullname = user_info.get('fullname') or username

        linebr = '-' * 50
        message = (
            'ALERT {timestamp}: \n\n{whoran} has ran "salt {target} sudo.grant {username}" '
            'to request sudo access for: {fullname}\n\nComment: {comment}\n{linebr}\n{msg}'
        ).format(
            whoran=whoran,
            target=target,
            username=username,
            fullname=fullname,
            timestamp=time.strftime("%Y-%m-%d %H:%M"),
            comment=comment,
            msg=return_msg,
            linebr=linebr,
        )

        # positional arguments: recipient, message, subject, sender, server, use_ssl
        __salt__['smtp.send_msg'](
            '{0}'.format(notify),
            message,
            'User Sudoers access requested',
            'saltstack-alert@{0}'.format(target),
            smtp_server,
            False,
        )
    except Exception as e:
        log.exception('Error emailing salt-sudo status: %s', e)
def _check_span_input(span):
'''
checks span input for proper format
'''
ret = {}
span = re.sub("[^a-zA-Z0-9]+", "", str(span).lower())
# get number of days or hours
try:
span_num = int(re.findall(r'\d+', span)[0])
except IndexError:
raise CommandExecutionError("incorrect span input, use day or hour format, ie: span=1d")
except Exception as e:
log.exception("error processing span input: %s" % (str(e)))
raise CommandExecutionError("error processing span input: %s" % (str(e)))
# get hour or day
span_hd = re.findall(r'\D', span)
try:
hd = str(span_hd[0])
except IndexError:
raise CommandExecutionError("incorrect span input, use day or hour format, ie: span=1d")
if not span[0].isdigit():
raise CommandExecutionError("span must start with a digit for # of hours or days, ie: span=1d")
if not span[-1].isalpha() or len(span_hd) > 1:
raise CommandExecutionError("span must end with 'h' or 'd', ie: span=1d")
if not any( [hd.endswith('h'), hd.endswith('d')] ):
raise CommandExecutionError("span must end with 'h' or 'd', ie: span=1d or span=5h")
if hd == 'h' and any([span_num <= 0, span_num > 24]):
raise CommandExecutionError("span in hours must be between 1h and 24h")
if hd == 'd' and any([span_num <= 0, span_num > 365]):
raise CommandExecutionError("span in days must be between 1d and 365d")
if not any([hd, span_num]):
raise CommandExecutionError("incorrect span input, use day or hour format, ie: span=1d")
ret['hd'] = hd
ret['span_num'] = span_num
return ret
def _create_span_cron(hd, span_num, username):
    '''
    sets a cron to auto revoke sudo after a span timelimit

    The job is added to root's crontab under the identifier
    'salt_sudo_<username>'. When it fires it deletes the user's line from the
    sudo file and then removes itself from the crontab.

    hd:       'h' to interpret span_num as hours, anything else as days
    span_num: number of hours or days until revocation
    username: account whose sudo access is to be revoked

    raises: CommandExecutionError when the cron job cannot be scheduled
    '''
    # add a cron to revoke sudo, but also to delete the cron itself after it runs.
    # The match is anchored on the full username followed by whitespace (and the
    # name is regex-escaped) so revoking 'joe' does not also match 'joey'.
    cmd = (
        '{salt_call} file.line "{sudo_file}" match="^{user_re}\\s" mode="delete"; '
        '{salt_call} cron.rm_job "root" {salt_call} identifier="salt_sudo_{username}"'
    ).format(
        salt_call=salt_call,
        sudo_file=sudo_file,
        user_re=re.escape(username),
        username=username)

    if hd == 'h':
        delta = timedelta(hours=span_num)
    else:
        delta = timedelta(days=span_num)
    sched = datetime.now() + delta

    # Day-of-month and month are pinned for hour spans too: with '*' there, a
    # 24h span would produce an entry that already matches the current minute.
    # NOTE: cron has no year field, so a 365d span still matches today's date.
    try:
        __salt__['cron.set_job'](
            'root', sched.minute, sched.hour, sched.day, sched.month, '*',
            cmd, identifier='salt_sudo_{}'.format(username))
    except Exception as e:
        raise CommandExecutionError("Error scheduling revocation cron, %s" % str(e))
def grant(username, comment, span=None, **kwargs):
    '''
    'sudo.grant' - opens up Sudoers access for specified user \n
    usage: salt <target> sudo.grant <username> <comment>
    > salt nycweb01 sudo.grant jsmith 'fixing disk space'
    to limit span of sudo access, you can pass span=<lifetime> parameter (# of days or hours)
    > salt nycweb01 sudo.grant jsmith 'fixing disk space' span=1d (or 1h)

    username: existing account on the target to add to the sudo file
    comment:  reason for the request, included in the notification email
    span:     optional lifetime, ie: 1d or 5h; a revocation cron is scheduled when given
    kwargs:   '__pub_tgt' and '__pub_user' are read when present

    returns: status message for the requester (also returned, not raised,
             when the account does not exist on the target)
    raises:  CommandExecutionError on invalid span, cron scheduling failure
             or failure to write the sudo file
    '''
    # both keys are looked up leniently so a missing one does not abort the
    # grant with a KeyError
    target = kwargs.get('__pub_tgt', 'unknown')
    # get username of user who ran 'sudo.grant'
    whoran = kwargs.get('__pub_user', 'unknown')

    # validate span of sudo access (hours or days) before touching the system
    span_info = _check_span_input(span) if span else None

    # check if user exists on target. This is done before anything is
    # scheduled or written so a bad username leaves no revocation cron behind.
    if not __salt__['user.info'](username):
        return "user account '%s' does not exist on %s." % (username, target)

    # set final output message to user
    return_msg = "User '{0}' has been added to Sudoers group, they can now run root-level commands using 'sudo cmd'\n\nSSH as '{0}@{1}' \n\nTo revoke sudo access, run 'salt {1} sudo.revoke {0}'".format(username, target)

    if span_info:
        # create a cron to revoke sudo. Scheduled before access is granted so
        # a scheduling failure cannot leave behind access that never expires.
        _create_span_cron(span_info['hd'], span_info['span_num'], username)
        unit = 'hours' if span_info['hd'] == 'h' else 'days'
        return_msg = return_msg + '\n\n* {0} has been given sudo access for {1} {2}'.format(username, span_info['span_num'], unit)

    # check if sudoers.d dir and the sudo file exist, create them if not
    if not os.path.isdir(sudo_dir):
        os.mkdir(sudo_dir)
    if not os.path.exists(sudo_file):
        os.mknod(sudo_file)

    # add user to sudoers
    try:
        __salt__['file.line'](sudo_file, '{0} ALL=(ALL) NOPASSWD:ALL\n'.format(username), match='^{0}'.format(username), mode='insert', location='end')
    except Exception as e:
        log.exception("Error adding %s: %s" % (sudo_file, str(e)))
        raise CommandExecutionError("error adding user to saltsudo: %s" % str(e))

    _clean_sudo_file(sudo_file)
    _notify(username, whoran, target, comment, return_msg)
    log.warning('user "{0}" running sudo.grant for {1}'.format(whoran, username))
    return return_msg
def revoke(user=None):
    '''
    sudo.revoke - removes a user from temporary sudo group, disables sudoers access
    usage: salt <target> sudo.revoke joe # removes a user from 'saltsudo' group
    usage: salt <target> sudo.revoke # removes all 'saltsudo' user access on the host

    user: account to revoke; when omitted, every entry in the sudo file and
          every 'salt_sudo' cron job of root is removed

    returns: status message
    raises:  CommandExecutionError when the sudo file cannot be updated
    '''
    if user:
        identifier = 'salt_sudo_{}'.format(user)

        if not os.path.exists(sudo_file):
            __salt__['cron.rm_job']('root', salt_call, identifier=identifier)
            return "user %s has already been removed from sudo access" % user

        # remove user from sudo_file. The match is anchored on the whole
        # username followed by whitespace (name regex-escaped) so revoking
        # 'joe' leaves 'joey' untouched.
        try:
            __salt__['file.line'](sudo_file, match=r'^{0}\s'.format(re.escape(user)), mode='delete')
        except Exception as e:
            log.exception("Error revoking sudo access for user %s: %s" % (user, str(e)))
            raise CommandExecutionError("error revoking sudo access for user %s: %s" % (user, str(e)))

        # remove user from crontab
        __salt__['cron.rm_job']('root', salt_call, identifier=identifier)
        return "user %s has been removed from temporary sudo" % user

    # truncate sudo_file to zero
    try:
        with open(sudo_file, 'w'):
            pass
    except (IOError, OSError) as e:
        # A missing file/directory means there is nothing to revoke. If the
        # file is still there, access was NOT removed and success must not be
        # reported.
        if os.path.exists(sudo_file):
            log.exception("Error truncating %s: %s" % (sudo_file, str(e)))
            raise CommandExecutionError("error removing sudo access, cannot truncate %s: %s" % (sudo_file, str(e)))

    # remove all sudo crons, each one by its own identifier
    crons = __salt__['cron.list_tab']('root')
    for cron in crons.get('crons', []):
        identifier = cron.get('identifier') or ''  # jobs without identifier are skipped
        if 'salt_sudo' in identifier:
            __salt__['cron.rm_job']('root', salt_call, identifier=identifier)
    return "all temporary sudo access has been removed."
def status():
    '''
    returns all users with temporary sudo access
    salt <target> sudo.status

    Users are reported once each, in the order they appear in the sudo file.
    Blank lines and '#' comment lines are ignored.

    returns: status message listing the first field of every entry
    '''
    if not os.path.exists(sudo_file):
        return "no temporary sudo access available."

    tempusers = []
    with open(sudo_file) as f:
        for line in f:
            fields = line.split()
            # skip blank lines (would otherwise raise IndexError) and comments
            if not fields or fields[0].startswith('#'):
                continue
            if fields[0] not in tempusers:  # remove duplicates, keep order
                tempusers.append(fields[0])
    return "the following users have temporary sudo access: %s" % tempusers
@umair1983
Copy link

Hi

How do I deploy this script in SUSE Manager 4?

Thanks

@perfecto25
Copy link
Author

I played with this some and when I first tested after loading the module I received the below error. After some playing with the script (I'm not a python expert) I found that the line checking for salt-call path was failing. Small modification with quotes and it's now working for me.

# python3 /var/cache/salt/minion/extmods/modules/sudo.py Traceback (most recent call last): File "/var/cache/salt/minion/extmods/modules/sudo.py", line 22, in <module> salt_call = ''.join(subprocess.check_output(['which', 'salt-call'])).split()[0] TypeError: sequence item 0: expected str instance, int found
After modifying quotes on 'salt-call = ' variable I was able to move forward with testing the script.
salt_call = ''.join(subprocess.check_output(['which', 'salt-call'])).split()[0]
to
salt_call = ".join(subprocess.check_output(['which', 'salt-call'])).split()[0]"

Yes, this wasn't tested for Py3.

Is it working now with Py3?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment