Created
July 17, 2018 10:56
-
-
Save siers/ded0a4158c900495f04c3ad965f4a544 to your computer and use it in GitHub Desktop.
recursive chown uid/guid mapping for migrating ownership to LDAP
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# recursive chown uid/gid mapping for migrating files' owners to
# the new LDAP ids by matching users' duplicate names in /etc/passwd and | |
# treating the upper ones (from `getent passwd` list) as the real ones | |
# Created around January, 2017 | |
# raitis.veinbahs.lv | |
# This file may be freely used, modified, copied and repurposed. Public domain. | |
# References: | |
# 1: https://www.cyberciti.biz/faq/understanding-etcpasswd-file-format/ | |
# 2: http://stackoverflow.com/a/2295368 | |
import sys | |
import os | |
from itertools import groupby | |
from subprocess import Popen, PIPE | |
def usage():
    """Print a short description of the script and its arguments to stdout.

    The script looks for the literal words ``verbose`` and ``really`` in
    ``sys.argv`` (see ``run_wrap``); without ``really`` nothing is changed.
    """
    print('Usage: ./redecorate [verbose] [really]')
    print("1. `chown`s user's home directory to his new/current uid.")
    print("2. Recursively `chown` files in /var/ and /etc/ from old uid/gids to the new.")
    print("Pass `verbose` to print every planned chown and `really` to apply it.")
    print("NOTE: Redecorate home is meant to be tongue-in-cheek.")
def assert_success(proc):
    """Terminate the script if *proc* finished with a non-zero return code.

    *proc* is a finished ``Popen``-like object exposing ``returncode``,
    ``args`` and a readable binary ``stderr``.  On failure the return code,
    the command and the captured stderr text are printed, then the script
    exits with the child's return code.  Returns ``None`` on success.
    """
    if proc.returncode == 0:
        return
    print('Failed to call(%s): %s' % (proc.returncode, proc.args))
    if proc.stderr is not None:
        # Decode so the operator sees the message itself rather than a
        # b'...' repr; undecodable bytes must not mask the real failure.
        print(proc.stderr.read().decode('utf-8', errors='replace'))
    # sys.exit instead of the `exit` helper, which only exists when the
    # `site` module has been loaded.
    sys.exit(proc.returncode)
def perhaps_run(*args, **kwargs):
    """Run a command to completion, capturing stdout and stderr.

    All arguments are forwarded to ``subprocess.Popen``.  The finished
    process object is returned regardless of its return code; its ``stdout``
    and ``stderr`` attributes are in-memory streams holding the complete
    captured output, so callers can still ``.read()`` them.
    """
    import io  # local import: only needed to re-wrap the captured output

    proc = Popen(*args, stdout=PIPE, stderr=PIPE, **kwargs)
    # communicate() drains both pipes while waiting.  A bare wait() blocks
    # forever once the child fills an OS pipe buffer (e.g. a long
    # `getent passwd` listing), because nobody is reading the other end.
    out, err = proc.communicate()

    def _as_stream(data):
        # Text-mode kwargs make communicate() return str instead of bytes.
        return io.StringIO(data) if isinstance(data, str) else io.BytesIO(data)

    proc.stdout = _as_stream(out)
    proc.stderr = _as_stream(err)
    return proc
def run(*args, **kwargs):
    """Run a command and insist that it succeeds.

    Arguments are passed straight to ``perhaps_run``.  The finished process
    is handed to ``assert_success``, which ends the script on a non-zero
    return code; otherwise the process object is returned.
    """
    finished = perhaps_run(*args, **kwargs)
    assert_success(finished)
    return finished
def call(args):
    """Invoke the callable of a ``(label, callable)`` pair and return its result.

    *args* must unpack into exactly two items; the first (the label) is
    ignored here.
    """
    _label, thunk = args
    return thunk()
def exec(args):
    """Run a command via ``perhaps_run`` and echo its stderr to our stderr.

    *args* is unpacked into ``perhaps_run``'s positional arguments.  The
    return code is not checked and nothing is returned.

    NOTE(review): this name shadows the builtin ``exec`` for the whole module.
    """
    finished = perhaps_run(*args)
    captured = finished.stderr.read()
    sys.stderr.write(captured.decode('utf-8'))
def proc_read(proc, pipename='stdout'):
    """Read one of *proc*'s pipes to the end and return it as UTF-8 text.

    *pipename* is the attribute name of the pipe on *proc* (``'stdout'`` by
    default, e.g. ``'stderr'`` for the error stream).
    """
    pipe = getattr(proc, pipename)
    raw = pipe.read()
    return raw.decode('utf-8')
# | |
def find_home(user):
    """Return the home-directory field of a colon-split passwd entry.

    *user* is one entry as produced by ``parse_getent``; the home directory
    is the sixth field (index 5) of the passwd format.
    """
    home_field = 5
    return user[home_field]
def only_ipa(users, min_id=1000000000):
    """Return the entries whose numeric id is strictly greater than *min_id*.

    *users* is a list of colon-split passwd or group entries; the id is read
    from the third field (index 2).  The default threshold is the cut-off
    this script has always used to recognise directory-provided accounts;
    pass a different *min_id* when the directory uses another id range.
    """
    return [user for user in users if int(user[2]) > min_id]
def parse_getent(blob):
    """Split passwd/group style text into one list of fields per line.

    Lines are separated on ``'\\n'``, empty lines are dropped, and every
    remaining line is split on ``':'``.
    """
    records = []
    for line in blob.split('\n'):
        if line:
            records.append(line.split(':'))
    return records
# | |
def walk_fullpaths_with_root(top):
    """Yield *top* itself, then the full path of every entry beneath it.

    The tree is walked bottom-up (``topdown=False``); within each directory
    its files are yielded before its subdirectories.
    """
    yield top
    for parent, subdirs, filenames in os.walk(top, topdown=False):
        for entry in filenames:
            yield os.path.join(parent, entry)
        for entry in subdirs:
            yield os.path.join(parent, entry)
# most important function of all | |
def redecorate_walk(top, uidmap, gidmap):
    """Yield a deferred ``os.lchown`` for every path under *top* that needs it.

    Each path from ``walk_fullpaths_with_root(top)`` is ``lstat``ed; its
    owner and group are looked up in *uidmap* / *gidmap* (old id -> new id).
    Paths whose owner or group is mapped yield a pair
    ``(['os.lchown', path, newuid, newgid], thunk)`` where calling ``thunk()``
    performs the chown.  Nothing is changed by the generator itself.
    """
    for path in walk_fullpaths_with_root(top):
        try:
            info = os.lstat(path)
        except FileNotFoundError:
            # The entry disappeared after it was listed; skip it.
            continue

        # -1 makes os.lchown leave that id as it is.
        target_uid = uidmap[info.st_uid] if info.st_uid in uidmap else -1
        target_gid = gidmap[info.st_gid] if info.st_gid in gidmap else -1
        if target_uid == -1 and target_gid == -1:
            continue

        chown_args = [path, target_uid, target_gid]
        # Reference #2: bind this iteration's arguments as a default value so
        # every thunk keeps its own path rather than the loop's latest one.
        yield (['os.lchown'] + chown_args, lambda bound=chown_args: os.lchown(*bound))
def run_wrap(args, executer):
    """Optionally print and optionally execute one planned command.

    Behaviour is driven by bare words in ``sys.argv``:

    * ``verbose`` - print the command.  For a tuple (the ``(label, thunk)``
      pairs produced by ``redecorate_walk``) only the label is printed;
      anything else is printed item by item.
    * ``really`` - hand *args* to *executer*.  Without it this is a dry run.
    """
    verbose = 'verbose' in sys.argv
    really = 'really' in sys.argv
    if verbose:
        # isinstance also recognises tuple subclasses such as namedtuples.
        if isinstance(args, tuple):
            print(args[0])
        else:
            print(*args)
    if really:
        executer(args)
# usual functional primitives | |
def groupsort(l, key):
    """Sort *l* by *key* and return ``[(key_value, [items, ...]), ...]``.

    Sorting first guarantees ``groupby`` sees equal keys next to each other;
    items keep their original relative order within a group.
    """
    ordered = sorted(l, key=key)
    grouped = []
    for group_key, members in groupby(ordered, key):
        grouped.append((group_key, list(members)))
    return grouped
def mapmap(f, ls):
    """Lazily apply *f* to every element of every iterable in *ls*.

    Returns a ``map`` object whose items are themselves ``map`` objects.
    """
    def map_inner(inner):
        return map(f, inner)

    return map(map_inner, ls)
# getent_service_name -> [from_getent, from_files] | |
def oldnew_tuple(service):
    """Collect the entries of *service* from ``getent`` and from ``/etc``.

    *service* is a getent database name that is also a file under ``/etc``
    (the script uses ``'passwd'`` and ``'group'``).  Returns
    ``[from_getent, from_files]``: the ``getent`` entries that pass
    ``only_ipa``, and every entry of ``/etc/<service>``.  Both commands go
    through ``run``, so a failing command ends the script.
    """
    getent_text = proc_read(run(['getent', service]))
    from_getent = only_ipa(parse_getent(getent_text))

    local_path = '/etc/%s' % service
    file_text = proc_read(run(['cat', local_path]))
    from_files = parse_getent(file_text)

    return [from_getent, from_files]
# *pline* here is a single passwd pline | |
# [pline] -> [pline] -> [(username, {old: pline, new: pline})] | |
def doubles(new, old):
    """Pair entries that exist exactly once in *new* and once in *old*.

    *new* and *old* are lists of colon-split passwd/group entries; entries
    are matched on their first field (the name).  Returns
    ``[(name, {'new': entry, 'old': entry}), ...]`` sorted by name.

    A name is only reported when it has one entry from each source.  Two
    entries from the same source (e.g. a name duplicated in the local file)
    are not a match: they would give a dict without ``'new'`` or ``'old'``
    and break consumers that index both keys.
    """
    tagged_by_name = {}
    for tag, entries in (('new', new), ('old', old)):
        for entry in entries:
            tagged_by_name.setdefault(entry[0], []).append((tag, entry))

    matched = []
    for name in sorted(tagged_by_name):
        tagged = tagged_by_name[name]
        tags = {tag for tag, _entry in tagged}
        if len(tagged) == 2 and tags == {'new', 'old'}:
            # make old/new keys of a dict
            matched.append((name, dict(tagged)))
    return matched
# [(entry, {old: pline, new: pline})] -> [(id, id)] | |
def substitutions(doubles):
    """Yield an ``(old_id, new_id)`` pair for every matched entry.

    *doubles* is the output of ``doubles()``: ``(name, {'old': entry,
    'new': entry})`` pairs.  The ids are taken from the third field
    (index 2) of each entry and are yielded as found (strings when the
    entries come from ``parse_getent``).
    """
    for _name, pair in doubles:
        old_id = pair['old'][2]
        new_id = pair['new'][2]
        yield (old_id, new_id)
# --- script body -----------------------------------------------------------
# Collect the passwd entries from both sources; the first element holds the
# high-id accounts reported by `getent`.
users = oldnew_tuple('passwd')
ipa_users = users[0]
if not ipa_users:
    print('No high-id users detected!')
    # sys.exit rather than the `exit` helper, which only exists when the
    # `site` module has been loaded.
    sys.exit(1)

# Match same-named entries between the two sources, for users and groups.
users = doubles(*users)
groups = doubles(*oldnew_tuple('group'))


def makemap(doubles):
    """Build an ``{old_id: new_id}`` dict of ints from matched entries."""
    return dict(mapmap(int, list(substitutions(doubles))))


uidmap = makemap(users)
gidmap = makemap(groups)

# Plan every chown for the system trees before applying any of them, then
# apply (or just print) them according to the command-line words handled by
# run_wrap.
cmds = list(redecorate_walk('/var', uidmap, gidmap))
cmds += list(redecorate_walk('/etc', uidmap, gidmap))
for cmd in cmds:
    run_wrap(cmd, call)

# Same again for the home directory of every high-id account.
cmds = [cmd for user in ipa_users for cmd in redecorate_walk(find_home(user), uidmap, gidmap)]
for cmd in cmds:
    run_wrap(cmd, call)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment