Last active
September 18, 2015 18:32
-
-
Save tritium21/c783f4aa786965d8f3b8 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# -*- coding:utf-8 -*- | |
import argparse | |
import collections | |
import datetime | |
import operator | |
import os.path | |
import struct | |
import sys | |
# #define UT_LINESIZE 32 | |
# #define UT_HOSTSIZE 256 | |
# | |
# struct lastlog | |
# { | |
# #if __WORDSIZE == 64 && defined __WORDSIZE_COMPAT32 | |
# int32_t ll_time; | |
# #else | |
# __time_t ll_time; | |
# #endif | |
# char ll_line[UT_LINESIZE]; | |
# char ll_host[UT_HOSTSIZE]; | |
# }; | |
LLSTRUCT = struct.Struct('=l32s256s') | |
LoginDefs = collections.namedtuple( | |
"LoginDefs", | |
'uid_min uid_max' | |
) | |
User = collections.namedtuple( | |
"User", | |
'user passwd uid gid gecos home shell' | |
) | |
LLRecord = collections.namedtuple( | |
"LLRecord", | |
'time line host' | |
) | |
Result = collections.namedtuple( | |
"Result", | |
'user ll' | |
) | |
def read_login_defs(fname='/etc/login.defs'): | |
if not os.path.exists(fname): | |
return | |
fields = [x.upper() for x in LoginDefs._fields] | |
d = dict.fromkeys(LoginDefs._fields) | |
with open(fname) as f: | |
for line in f: | |
for field in fields: | |
if line.startswith(field): | |
d[field.lower()] = int(line.split()[1].strip()) | |
return LoginDefs(**d) | |
def read_passwd(fname='/etc/passwd'): | |
with open(fname) as f: | |
_users = (User(*s.strip().split(':')) for s in f) | |
_users = [x._replace(uid=int(x.uid)) for x in _users] | |
return _users | |
def dictify(iterable, key=None): | |
key = key if key is not None else operator.attrgetter('uid') | |
return {key(x): x for x in iterable} | |
def map_user_uid(iterable): | |
uid = operator.attrgetter('uid') | |
user = operator.attrgetter('user') | |
return {user(x): uid(x) for x in iterable} | |
def parse_lastlog_record(data): | |
time, line, host = data | |
time = datetime.datetime.fromtimestamp(time) if time else None | |
line = line.strip('\x00') | |
host = host.strip('\x00') | |
return LLRecord(time, line, host) | |
def get_lastlog_record(fp, uid): | |
seekto = LLSTRUCT.size * uid | |
fp.seek(seekto, 0) | |
bytes = fp.read(LLSTRUCT.size) | |
if not bytes: | |
return | |
data = LLSTRUCT.unpack(bytes) | |
return parse_lastlog_record(data) | |
def days_ago(days): | |
delta = datetime.timedelta(days=days) | |
now = datetime.datetime.now() | |
dt = now - delta | |
return dt.date() | |
def parse_lastlog( | |
passwd_path='/etc/passwd', | |
lastlog_path='/var/log/lastlog', | |
logindefs_path='/var/log/lastlog', | |
include_system=True, | |
uids=None, | |
before=None, | |
since=None, | |
): | |
passwd = read_passwd(passwd_path) | |
users = dictify(passwd) | |
useruid = map_user_uid(passwd) | |
if uids: | |
oldusers = users | |
users = {} | |
for uid in uids: | |
try: | |
uid = int(uid) | |
except ValueError: | |
uid = useruid[uid] | |
users[uid] = oldusers[uid] | |
del oldusers | |
if not include_system: | |
logindefs = read_login_defs(logindefs_path) | |
else: | |
logindefs = None | |
if before: | |
before = days_ago(int(before)) | |
if since: | |
since = days_ago(int(since)) | |
with open(lastlog_path, 'rb') as f: | |
for uid, user in sorted(users.items()): | |
uid = int(uid) | |
if ( | |
logindefs | |
and not (logindefs.uid_min <= uid <= logindefs.uid_max) | |
): | |
continue | |
ll = get_lastlog_record(f, uid) | |
if ll is None: | |
continue | |
if ( | |
before | |
and ( | |
(ll.time and not ll.time.date() <= before) | |
or not ll.time | |
) | |
): | |
continue | |
if ( | |
since | |
and ( | |
(ll.time and not ll.time.date() >= since) | |
or not ll.time | |
) | |
): | |
continue | |
yield Result(user, ll) | |
def format_result(result): | |
fmt = "{user:<17}{line:<9}{host:<17}{time}" | |
data = dict( | |
user=result.user.user, | |
time=( | |
result.ll.time | |
if result.ll.time | |
else '** Never **' | |
), | |
line=result.ll.line, | |
host=result.ll.host[:16], | |
) | |
return fmt.format(**data) | |
def build_parser(): | |
passwd_def = '/etc/passwd' | |
lastlog_def = '/var/log/lastlog' | |
login_def = '/etc/login.defs' | |
parser = argparse.ArgumentParser( | |
description='lastlog reimplementation', | |
formatter_class=( | |
lambda prog: argparse.HelpFormatter(prog, max_help_position=27) | |
), | |
) | |
parser.add_argument( | |
'--passwd', | |
metavar='PATH', | |
default=passwd_def, | |
dest='passwd_path', | |
help='location of passwd file. Default: {0!r}'.format(passwd_def), | |
) | |
parser.add_argument( | |
'--lastlog', | |
metavar='PATH', | |
default=lastlog_def, | |
dest='lastlog_path', | |
help='location of lastlog file. Default: {0!r}'.format(lastlog_def), | |
) | |
parser.add_argument( | |
'--logindefs', | |
metavar='PATH', | |
default=login_def, | |
dest='logindefs_path', | |
help='location of login.defs. Default: {0!r}'.format(login_def), | |
) | |
parser.add_argument( | |
'--no-system', | |
dest='include_system', | |
action='store_false', | |
help="Exclude system users from output. Default: 'False'" | |
) | |
parser.add_argument( | |
'-u', '--user', | |
metavar='LOGIN', | |
dest='uids', | |
action='append', | |
default=None, | |
help="Show only users/uids" | |
) | |
parser.add_argument( | |
'-b', '--before', | |
metavar='DAYS', | |
type=int, | |
default=None, | |
help="Show only logins older than DAYS" | |
) | |
parser.add_argument( | |
'-t', '--time', | |
dest='since', | |
metavar='DAYS', | |
type=int, | |
default=None, | |
help="Show only logins more recent than DAYS" | |
) | |
return parser | |
def main(argv): | |
parser = build_parser() | |
args = parser.parse_args(argv) | |
logs = parse_lastlog(**vars(args)) | |
print "Username Port From Latest" | |
for log in logs: | |
print format_result(log) | |
if __name__ == '__main__': | |
sys.exit(main(sys.argv[1:])) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment