Skip to content

Instantly share code, notes, and snippets.

@siers
Created July 17, 2018 10:56
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save siers/ded0a4158c900495f04c3ad965f4a544 to your computer and use it in GitHub Desktop.
Save siers/ded0a4158c900495f04c3ad965f4a544 to your computer and use it in GitHub Desktop.
recursive chown uid/gid mapping for migrating ownership to LDAP
#!/usr/bin/env python
# Recursive chown with uid/gid mapping, for migrating files' owners to
# the new LDAP ids: entries with the same name in /etc/passwd and in the
# `getent passwd` list are paired, and the high-id (getent) one is treated as the real one
# Created around January, 2017
# raitis.veinbahs.lv
# This file may be freely used, modified, copied and repurposed. Public domain.
# References:
# 1: https://www.cyberciti.biz/faq/understanding-etcpasswd-file-format/
# 2: http://stackoverflow.com/a/2295368
import sys
import os
from itertools import groupby
from subprocess import Popen, PIPE
def usage():
    """Print a short description of the script and the words it accepts.

    The script looks for the bare words ``verbose`` and ``really`` in
    ``sys.argv``; without ``really`` no ownership is changed.
    """
    print('Usage: ./redecorate [verbose] [really]')
    print("1. `chown`s user's home directory to his new/current uid.")
    print("2. Recursively `chown` files in /var and /etc from old uid/gids to the new.")
    print("   verbose: print every chown that is (or would be) performed.")
    print("   really:  actually perform the chowns; without it this is a dry run.")
    print("NOTE: Redecorate home is meant to be tongue-in-cheek.")
def assert_success(proc):
    """Terminate the script if *proc* finished with a non-zero exit status.

    proc: a finished process object exposing ``returncode``, ``args`` and
        a readable bytes ``stderr`` stream.

    On success nothing happens and None is returned. On failure the exit
    status, the command and the child's stderr text are written to our
    stderr, and SystemExit is raised with the child's exit status.
    """
    if proc.returncode == 0:
        return
    sys.stderr.write('Failed to call(%s): %s\n' % (proc.returncode, proc.args))
    # The pipe yields bytes; decode so the message is readable text rather
    # than a b'...' repr. 'replace' keeps undecodable output from masking
    # the original failure with a UnicodeDecodeError.
    sys.stderr.write(proc.stderr.read().decode('utf-8', 'replace'))
    # sys.exit is always available; the bare exit() helper is only
    # installed by the site module.
    sys.exit(proc.returncode)
def perhaps_run(*args, **kwargs):
    """Run a command to completion, capturing its stdout and stderr.

    All arguments are forwarded to ``subprocess.Popen``; stdout and stderr
    are always captured through pipes. The exit status is not checked.

    Returns the finished Popen object. Its ``stdout`` and ``stderr``
    attributes are replaced by in-memory streams holding the captured
    output, so callers can still call ``.read()`` on them.
    """
    import io  # local import so this block does not depend on file-level changes

    proc = Popen(*args, stdout=PIPE, stderr=PIPE, **kwargs)
    # communicate() drains both pipes while waiting. A bare wait() can
    # deadlock once the child has filled an OS pipe buffer and blocks on
    # write while we block on wait.
    out, err = proc.communicate()
    # communicate() closes the real pipes; hand the captured data back
    # through file-like objects of the matching type (bytes by default,
    # str if the caller asked Popen for text mode).
    proc.stdout = (io.StringIO if isinstance(out, str) else io.BytesIO)(out)
    proc.stderr = (io.StringIO if isinstance(err, str) else io.BytesIO)(err)
    return proc
def run(*args, **kwargs):
    """Run a command via perhaps_run and insist that it succeeded.

    Arguments are passed through to perhaps_run unchanged. The finished
    process is handed to assert_success before being returned.
    """
    finished = perhaps_run(*args, **kwargs)
    assert_success(finished)
    return finished
def call(args):
    """Invoke the zero-argument callable stored in a (label, callable) pair.

    The label is ignored here; whatever the callable returns is returned.
    """
    _label, thunk = args
    return thunk()
def exec(args):
    """Run a command and echo whatever it wrote on stderr to our stderr.

    args is unpacked into perhaps_run's positional arguments. The exit
    status is ignored and nothing is returned.

    NOTE: this name shadows the builtin ``exec`` inside this module.
    """
    finished = perhaps_run(*args)
    captured = finished.stderr.read()
    sys.stderr.write(captured.decode('utf-8'))
def proc_read(proc, pipename='stdout'):
    """Read one of a process's streams to the end and return it as text.

    proc: object carrying the stream as an attribute.
    pipename: name of that attribute ('stdout' by default).

    The bytes read are decoded as UTF-8.
    """
    pipe = getattr(proc, pipename)
    raw = pipe.read()
    return raw.decode('utf-8')
#
def find_home(user):
    """Return the home-directory field of a split passwd entry."""
    # Index 5 is the sixth colon-separated column of a passwd line
    # (see reference 1 in the file header).
    home_field = 5
    return user[home_field]
def only_ipa(users, min_id=1000000000):
    """Keep only the entries whose numeric id is above *min_id*.

    users: split passwd/group lines; field 2 holds the numeric id as text.
    min_id: exclusive lower bound for an id to be kept. The default is the
        threshold that used to be hard-coded here; pass another value if
        the directory's id range starts elsewhere.

    Returns a new list with the kept entries in their original order.
    """
    return [entry for entry in users if int(entry[2]) > min_id]
def parse_getent(blob):
    """Split colon-separated text into one list of fields per line.

    blob: text with one entry per line, fields separated by ':'.

    Empty lines are dropped; every other line becomes a list of strings.
    """
    entries = []
    for line in blob.split('\n'):
        if line:
            entries.append(line.split(':'))
    return entries
#
def walk_fullpaths_with_root(top):
    """Yield *top* itself, then the full path of everything beneath it.

    The tree is walked bottom-up (``topdown=False``). Within each visited
    directory the file names are yielded before the subdirectory names.
    """
    yield top
    for parent, subdirs, filenames in os.walk(top, topdown=False):
        for entry in filenames:
            yield os.path.join(parent, entry)
        for entry in subdirs:
            yield os.path.join(parent, entry)
# most important function of all
def redecorate_walk(top, uidmap, gidmap):
    """Yield a deferred ``os.lchown`` for every path at or under *top* that needs one.

    top: directory (or file) where the walk starts; it is checked as well.
    uidmap: dict mapping an old uid to its new uid.
    gidmap: dict mapping an old gid to its new gid.

    Each path is lstat'ed. Paths whose lstat raises FileNotFoundError are
    skipped, as are paths whose owner and group both have no mapping.
    For the rest a ``(label, thunk)`` pair is yielded: label is
    ``['os.lchown', path, newuid, newgid]`` and thunk is a zero-argument
    callable performing that lchown. An id of -1 stands for "no mapping
    found", which os.lchown treats as "leave unchanged".
    """
    def deferred_lchown(path, uid, gid):
        # Bind the values per call so that every thunk keeps its own
        # arguments instead of seeing the loop's latest ones (Reference #2).
        return lambda: os.lchown(path, uid, gid)

    for path in walk_fullpaths_with_root(top):
        try:
            info = os.lstat(path)
        except FileNotFoundError:
            continue
        newuid = uidmap.get(info.st_uid, -1)
        newgid = gidmap.get(info.st_gid, -1)
        if newuid == -1 and newgid == -1:
            continue
        yield (['os.lchown', path, newuid, newgid], deferred_lchown(path, newuid, newgid))
def run_wrap(args, executer):
    """Optionally print a command and optionally execute it.

    args: the command. A tuple is treated as a (label, ...) pair and only
        its first element is printed; anything else is printed element by
        element.
    executer: callable invoked with *args* to actually perform the command.

    Printing happens only if the word ``verbose`` is among ``sys.argv``;
    execution happens only if the word ``really`` is. With neither word
    present this function does nothing.
    """
    verbose = 'verbose' in sys.argv
    really = 'really' in sys.argv
    if verbose:
        # isinstance also recognises tuple subclasses such as namedtuples,
        # which an exact type comparison would send down the wrong branch.
        if isinstance(args, tuple):
            print(args[0])
        else:
            print(*args)
    if really:
        executer(args)
# usual functional primitives
def groupsort(l, key):
    """Sort *l* by *key* and return its groups as [(key_value, [items...]), ...]."""
    ordered = sorted(l, key=key)
    return [(k, list(members)) for k, members in groupby(ordered, key)]


def mapmap(f, ls):
    """Lazily apply *f* to every element of every inner iterable of *ls*.

    Returns a map object whose items are themselves map objects.
    """
    return map(lambda inner: map(f, inner), ls)
# getent_service_name -> [from_getent, from_files]
def oldnew_tuple(service):
    """Collect the entries of one database both from getent and from /etc.

    service: database name as accepted by ``getent`` and found as a file
        under /etc (the script uses 'passwd' and 'group').

    Returns ``[new, old]``: *new* is the ``getent <service>`` output parsed
    by parse_getent and filtered by only_ipa; *old* is the parsed content
    of ``/etc/<service>``. Both commands go through run().
    """
    getent_text = proc_read(run(['getent', service]))
    new = only_ipa(parse_getent(getent_text))
    file_text = proc_read(run(['cat', '/etc/%s' % service]))
    old = parse_getent(file_text)
    return [new, old]
# *pline* here is a single split passwd (or group) line
# [pline] -> [pline] -> [(username, {old: pline, new: pline})]
def doubles(new, old):
    """Pair up entries that exist under the same name in both lists.

    new: split lines regarded as the new entries.
    old: split lines regarded as the old entries.
    Field 0 of every line is the name used for matching.

    Returns ``[(name, {'new': pline, 'old': pline}), ...]`` sorted by name.
    A name is included only if it occurs exactly once in *new* and exactly
    once in *old*. Names duplicated within a single list are skipped, so
    every returned dict is guaranteed to carry both keys.
    """
    def entry_name(tagged):
        return tagged[1][0]

    tagged_entries = [('new', pline) for pline in new] + [('old', pline) for pline in old]
    paired = []
    for name, group in groupby(sorted(tagged_entries, key=entry_name), key=entry_name):
        members = list(group)
        # Two members alone is not enough: both could come from the same
        # list, which would leave the dict without its 'old' or 'new' key.
        if len(members) == 2 and {tag for tag, _ in members} == {'new', 'old'}:
            paired.append((name, dict(members)))
    return paired
# [(entry, {old: pline, new: pline})] -> [(id, id)]
def substitutions(doubles):
    """Yield an ``(old_id, new_id)`` pair for every paired entry.

    doubles: iterable of ``(name, {'old': pline, 'new': pline})``.
    The ids are taken from field 2 of each line and yielded as found
    (no conversion is applied here).
    """
    id_field = 2
    for _name, entries in doubles:
        old_id = entries['old'][id_field]
        new_id = entries['new'][id_field]
        yield (old_id, new_id)
# Script body: build the old -> new id maps from the passwd and group
# databases, then remap ownership under /var, /etc and the users' homes.
users = oldnew_tuple('passwd')
ipa_users = users[0]
if not ipa_users:
    print('No high-id users detected!')
    # sys.exit is always available; the bare exit() helper is only
    # installed by the site module.
    sys.exit(1)
users = doubles(*users)
groups = doubles(*oldnew_tuple('group'))


def makemap(doubles):
    """Build an ``{old_id: new_id}`` dict of ints from paired entries."""
    return {int(old_id): int(new_id) for old_id, new_id in substitutions(doubles)}


uidmap = makemap(users)
gidmap = makemap(groups)
# Each batch is collected in full before any of it is applied, so every
# lstat in a batch sees the ownership as it was before that batch ran.
cmds = list(redecorate_walk('/var', uidmap, gidmap))
cmds += list(redecorate_walk('/etc', uidmap, gidmap))
for cmd in cmds:
    run_wrap(cmd, call)
cmds = [cmd for user in ipa_users for cmd in redecorate_walk(find_home(user), uidmap, gidmap)]
for cmd in cmds:
    run_wrap(cmd, call)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment