Skip to content

Instantly share code, notes, and snippets.

@tritium21
Last active September 18, 2015 18:32
Show Gist options
  • Save tritium21/c783f4aa786965d8f3b8 to your computer and use it in GitHub Desktop.
Save tritium21/c783f4aa786965d8f3b8 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import argparse
import collections
import datetime
import operator
import os.path
import struct
import sys
# #define UT_LINESIZE 32
# #define UT_HOSTSIZE 256
#
# struct lastlog
# {
# #if __WORDSIZE == 64 && defined __WORDSIZE_COMPAT32
# int32_t ll_time;
# #else
# __time_t ll_time;
# #endif
# char ll_line[UT_LINESIZE];
# char ll_host[UT_HOSTSIZE];
# };
# Binary layout of one lastlog record, matching the C struct quoted above:
# a 32-bit signed timestamp, a 32-byte line name and a 256-byte host name,
# packed with standard sizes and no alignment padding ('=').
LLSTRUCT = struct.Struct('=l32s256s')

# The UID range settings read from login.defs.
LoginDefs = collections.namedtuple('LoginDefs', ['uid_min', 'uid_max'])

# One colon-separated passwd entry.
User = collections.namedtuple(
    'User',
    ['user', 'passwd', 'uid', 'gid', 'gecos', 'home', 'shell'],
)

# One decoded lastlog record.
LLRecord = collections.namedtuple('LLRecord', ['time', 'line', 'host'])

# A passwd entry paired with its lastlog record.
Result = collections.namedtuple('Result', ['user', 'll'])
def read_login_defs(fname='/etc/login.defs'):
    """Read the UID_MIN / UID_MAX settings from a login.defs style file.

    :param fname: path of the file to read.
    :returns: a ``LoginDefs`` tuple, or ``None`` when ``fname`` does not
        exist.  A setting that is absent from the file (or whose value is
        not an integer) is left as ``None``.
    """
    if not os.path.exists(fname):
        return None
    # Map the upper-case keyword used in the file to the tuple field name.
    wanted = dict((name.upper(), name) for name in LoginDefs._fields)
    values = dict.fromkeys(LoginDefs._fields)
    with open(fname) as f:
        for line in f:
            # Drop comments, then split into "KEYWORD value ...".
            tokens = line.split('#', 1)[0].split()
            # Compare the whole keyword: a prefix test would also accept
            # unrelated settings that merely start with the same letters,
            # and a keyword without a value must not raise IndexError.
            if len(tokens) < 2 or tokens[0] not in wanted:
                continue
            try:
                values[wanted[tokens[0]]] = int(tokens[1])
            except ValueError:
                # Non-numeric value: treat the setting as not configured.
                continue
    return LoginDefs(**values)
def read_passwd(fname='/etc/passwd'):
    """Parse a passwd file into a list of ``User`` tuples.

    :param fname: path of the passwd file to read.
    :returns: list of ``User`` tuples in file order, with ``uid`` converted
        to ``int``; every other field is kept as a string.

    Blank lines, ``#`` comment lines, lines that do not have exactly the
    seven passwd fields, and entries without a numeric uid (for example
    NIS compat lines such as ``+::::::``) are skipped instead of aborting
    the whole run.
    """
    users = []
    with open(fname) as f:
        for raw in f:
            entry = raw.strip()
            if not entry or entry.startswith('#'):
                continue
            fields = entry.split(':')
            if len(fields) != len(User._fields):
                continue
            user = User(*fields)
            try:
                users.append(user._replace(uid=int(user.uid)))
            except ValueError:
                # No usable uid, so the entry cannot index into lastlog.
                continue
    return users
def dictify(iterable, key=None):
    """Index the items of ``iterable`` by ``key(item)``.

    :param iterable: the items to index.
    :param key: callable producing the dictionary key for an item; when
        omitted, the item's ``uid`` attribute is used.
    :returns: dict mapping each key to its item (a later item replaces an
        earlier one with the same key).
    """
    if key is None:
        key = operator.attrgetter('uid')
    indexed = {}
    for item in iterable:
        indexed[key(item)] = item
    return indexed
def map_user_uid(iterable):
    """Build a ``{user: uid}`` lookup from items that expose ``user`` and
    ``uid`` attributes.

    :param iterable: items with ``user`` and ``uid`` attributes.
    :returns: dict mapping each item's ``user`` to its ``uid``.
    """
    get_name = operator.attrgetter('user')
    get_uid = operator.attrgetter('uid')
    return dict((get_name(entry), get_uid(entry)) for entry in iterable)
def _c_string(raw):
    """Return the text of a NUL-terminated fixed-size C char array."""
    if isinstance(raw, bytes):
        # Everything after the first NUL is padding (or stale data).
        raw = raw.split(b'\x00', 1)[0]
        if not isinstance(raw, str):
            # Python 3: bytes and str are distinct, so decode for display.
            raw = raw.decode('utf-8', 'replace')
        return raw
    return raw.split('\x00', 1)[0]


def parse_lastlog_record(data):
    """Convert an unpacked lastlog record into an ``LLRecord``.

    :param data: ``(time, line, host)`` as produced by unpacking one
        lastlog record; ``line`` and ``host`` are NUL-padded char arrays.
    :returns: ``LLRecord`` whose ``time`` is a local ``datetime`` (or
        ``None`` when the timestamp is zero) and whose ``line`` and
        ``host`` are cut at the first NUL terminator.
    """
    time, line, host = data
    # A zero timestamp is kept as None rather than converted to a date.
    time = datetime.datetime.fromtimestamp(time) if time else None
    return LLRecord(time, _c_string(line), _c_string(host))
def get_lastlog_record(fp, uid):
    """Read and decode the lastlog record belonging to ``uid``.

    The lastlog file is read as an array of fixed-size records indexed by
    uid, so the record lives at offset ``uid * LLSTRUCT.size``.

    :param fp: lastlog file object opened in binary mode.
    :param uid: integer user id.
    :returns: an ``LLRecord``, or ``None`` when the file holds no complete
        record for ``uid`` (offset past end of file, or a truncated
        trailing record).
    """
    fp.seek(LLSTRUCT.size * uid, 0)
    raw = fp.read(LLSTRUCT.size)
    # A short read cannot be unpacked; treat it like a missing record
    # instead of letting struct.error escape.
    if len(raw) < LLSTRUCT.size:
        return None
    return parse_lastlog_record(LLSTRUCT.unpack(raw))
def days_ago(days):
    """Return the local calendar date ``days`` days before now.

    :param days: number of days to go back (passed to ``timedelta``).
    :returns: a ``datetime.date``.
    """
    moment = datetime.datetime.now() - datetime.timedelta(days=days)
    return moment.date()
def parse_lastlog(
    passwd_path='/etc/passwd',
    lastlog_path='/var/log/lastlog',
    logindefs_path='/etc/login.defs',
    include_system=True,
    uids=None,
    before=None,
    since=None,
):
    """Yield a ``Result(user, ll)`` for each selected user with a record.

    :param passwd_path: passwd file used to enumerate users.
    :param lastlog_path: binary lastlog file to read records from.
    :param logindefs_path: login.defs file supplying UID_MIN / UID_MAX;
        only read when ``include_system`` is false.
    :param include_system: when false, users whose uid lies outside the
        configured UID_MIN..UID_MAX range are skipped.  A bound that is
        not configured is treated as unbounded.
    :param uids: optional list of login names and/or numeric uids; when
        given, only those users are reported.
    :param before: only report logins dated at least this many days ago.
    :param since: only report logins dated at most this many days ago.
    :raises KeyError: if an entry of ``uids`` is not present in passwd.

    Users who have never logged in are reported only when neither
    ``before`` nor ``since`` is given.  Results are ordered by uid.
    """
    passwd = read_passwd(passwd_path)
    users = dictify(passwd)
    if uids:
        useruid = map_user_uid(passwd)
        selected = {}
        for wanted in uids:
            try:
                uid = int(wanted)
            except ValueError:
                # Not numeric, so it must be a login name.
                uid = useruid[wanted]
            selected[uid] = users[uid]
        users = selected

    uid_min = uid_max = None
    if not include_system:
        logindefs = read_login_defs(logindefs_path)
        if logindefs is not None:
            uid_min, uid_max = logindefs.uid_min, logindefs.uid_max

    # Test against None (not truthiness) so that 0 days is honoured.
    if before is not None:
        before = days_ago(int(before))
    if since is not None:
        since = days_ago(int(since))

    with open(lastlog_path, 'rb') as f:
        for uid, user in sorted(users.items()):
            if uid_min is not None and uid < uid_min:
                continue
            if uid_max is not None and uid > uid_max:
                continue
            ll = get_lastlog_record(f, uid)
            if ll is None:
                continue
            if before is not None or since is not None:
                # Date filters can only match users that have logged in.
                if not ll.time:
                    continue
                login_date = ll.time.date()
                if before is not None and login_date > before:
                    continue
                if since is not None and login_date < since:
                    continue
            yield Result(user, ll)
def format_result(result):
    """Render one result as a fixed-width output row.

    :param result: object with ``user.user`` and an ``ll`` record exposing
        ``time``, ``line`` and ``host``.
    :returns: the user (17 columns), line (9 columns), host cut to 16
        characters (17 columns) and the login time, or ``** Never **``
        when the record has no time.
    """
    record = result.ll
    return "{user:<17}{line:<9}{host:<17}{time}".format(
        user=result.user.user,
        line=record.line,
        host=record.host[:16],
        time=record.time or '** Never **',
    )
def build_parser():
    """Create the command-line parser for the lastlog reimplementation.

    Options: ``--passwd``, ``--lastlog`` and ``--logindefs`` (file paths),
    ``--no-system``, ``-u/--user`` (repeatable, stored as ``uids``),
    ``-b/--before`` and ``-t/--time`` (integer days; the latter is stored
    as ``since``).

    :returns: the configured ``argparse.ArgumentParser``.
    """
    def wide_help(prog):
        # Let the option column grow to 27 characters before help text
        # is pushed onto its own line.
        return argparse.HelpFormatter(prog, max_help_position=27)

    parser = argparse.ArgumentParser(
        description='lastlog reimplementation',
        formatter_class=wide_help,
    )

    # (flag, destination, default path, help template)
    path_options = (
        ('--passwd', 'passwd_path', '/etc/passwd',
         'location of passwd file. Default: {0!r}'),
        ('--lastlog', 'lastlog_path', '/var/log/lastlog',
         'location of lastlog file. Default: {0!r}'),
        ('--logindefs', 'logindefs_path', '/etc/login.defs',
         'location of login.defs. Default: {0!r}'),
    )
    for flag, dest, default, help_template in path_options:
        parser.add_argument(
            flag,
            metavar='PATH',
            default=default,
            dest=dest,
            help=help_template.format(default),
        )

    parser.add_argument(
        '--no-system',
        dest='include_system',
        action='store_false',
        help="Exclude system users from output. Default: 'False'"
    )
    parser.add_argument(
        '-u', '--user',
        metavar='LOGIN',
        dest='uids',
        action='append',
        default=None,
        help="Show only users/uids"
    )
    parser.add_argument(
        '-b', '--before',
        metavar='DAYS',
        type=int,
        default=None,
        help="Show only logins older than DAYS"
    )
    parser.add_argument(
        '-t', '--time',
        dest='since',
        metavar='DAYS',
        type=int,
        default=None,
        help="Show only logins more recent than DAYS"
    )
    return parser
def main(argv):
    """Command-line entry point: print the lastlog table.

    :param argv: argument list, without the program name.
    :returns: ``None``.
    """
    parser = build_parser()
    args = parser.parse_args(argv)
    logs = parse_lastlog(**vars(args))
    # The header uses the same column widths (17, 9, 17) as the rows
    # produced by format_result, so the titles line up with the data.
    # print() with a single argument behaves the same on Python 2 and 3.
    print("{0:<17}{1:<9}{2:<17}{3}".format(
        'Username', 'Port', 'From', 'Latest'))
    for log in logs:
        print(format_result(log))
# Run as a script: hand the command-line arguments (minus the program
# name) to main() and use its return value as the exit status.
if __name__ == '__main__':
    raise SystemExit(main(sys.argv[1:]))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment