Skip to content

Instantly share code, notes, and snippets.

@shreyb
Last active March 2, 2021 16:30
Show Gist options
  • Save shreyb/16cf6bb3952f81df61700777cc7841b6 to your computer and use it in GitHub Desktop.
Save shreyb/16cf6bb3952f81df61700777cc7841b6 to your computer and use it in GitHub Desktop.
This is a small script that parses the VOMS logs at Fermilab (note that the regexes only match fnal.gov VOMS servers) and tallies the users who requested the Production role.
import argparse
from collections import Counter, defaultdict, namedtuple
from datetime import datetime, timedelta
from functools import partial
import gzip
import pathlib
import re
# Regexes
# Both patterns share the same line prefix: a ctime-style timestamp
# (group 1), the fnal.gov VOMS server name, and the vomsd PID (group 2).
# Raw strings are used so that regex escapes such as \w and \d are not
# treated as (invalid) Python string escapes.
_LOG_LINE_PREFIX = (
    r'^(\w{3} \w{3}(?: ){1,2}\d{1,2} \d{2}:\d{2}:\d{2} \d{4})'
    r':voms\d\.fnal\.gov:vomsd\[(\d+)\]'
)
# Matches lines reporting an issued FQAN whose role is Production.
pid_production_line_regex = re.compile(
    _LOG_LINE_PREFIX + r'.+Issued FQAN: .+/Role=Production/Capability=NULL"$'
)
# Matches "Received request from:" lines; group 3 is the text following the
# matched "/CN=" (with an optional leading "UID:" dropped), up to a comma.
production_user_regex = re.compile(
    _LOG_LINE_PREFIX + r'.+Received request from:.+/CN=(?:UID:)?(.+),.+$'
)
# Other settings
# Maximum time difference between a "Received request" line and a Production
# FQAN line with the same PID for the two to be treated as the same request.
PID_WINDOW = timedelta(minutes=5)
def dict_sorter_service_cert(d):
    """Return the items of *d* sorted, with users ahead of service certs.

    A key is classified as a service certificate when it looks like a dotted
    hostname, i.e. it ends in at least two ``.word`` components (for example
    ``host.fnal.gov``). Every other key is classified as a user.

    Args:
        d: Mapping whose keys are strings (``main`` passes a ``Counter``
            keyed by CN).

    Returns:
        A list of ``(key, value)`` tuples: all users in ascending order,
        followed by all service certificates in ascending order.
    """
    # Raw string: '\.' and '\w' are regex escapes, not valid Python string
    # escapes, and trigger a warning when written in a normal literal.
    hostname_like = re.compile(r'^.+\.\w+\.\w+$')
    users = []
    service_certs = []
    for item in d.items():
        bucket = service_certs if hostname_like.match(item[0]) else users
        bucket.append(item)
    return sorted(users) + sorted(service_certs)
def _open_log(filepath):
    """Open a VOMS log file for reading as UTF-8 text.

    Files whose suffix is ``.gz`` (archived logs) are decompressed on the fly;
    anything else is opened as a plain text file. Decoding happens once at the
    I/O boundary so callers always iterate over ``str`` lines.
    """
    opener = gzip.open if filepath.suffix == '.gz' else open
    return opener(filepath, mode='rt', encoding='utf-8')


def _parse_timestamp(text):
    """Convert the timestamp captured by the log regexes into a ``datetime``."""
    # NOTE(review): '%c' is the locale's date/time representation, so parsing
    # depends on the process locale matching the one the logs were written in.
    return datetime.strptime(text, '%c')


def _production_users_in_file(filepath):
    """Return the CN of every Production-role request found in one log file.

    A "Received request" line is attributed to the Production role when a
    Production FQAN line with the same vomsd PID exists in the same file and
    the two timestamps differ by less than ``PID_WINDOW``. Each request line
    contributes at most one entry, even if several Production FQAN lines for
    its PID fall inside the window.
    """
    fqan_times_by_pid = defaultdict(list)
    requests = []  # (cn, pid, raw timestamp) in file order
    with _open_log(filepath) as log:
        for line in log:
            # The two patterns are checked independently, in a single pass.
            fqan_match = pid_production_line_regex.match(line)
            if fqan_match:
                fqan_times_by_pid[fqan_match.group(2)].append(
                    _parse_timestamp(fqan_match.group(1)))
            request_match = production_user_regex.match(line)
            if request_match:
                requests.append((request_match.group(3),
                                 request_match.group(2),
                                 request_match.group(1)))

    window = abs(PID_WINDOW)
    users = []
    for cn, pid, raw_timestamp in requests:
        # .get() avoids inserting empty entries into the defaultdict.
        fqan_times = fqan_times_by_pid.get(pid)
        if not fqan_times:
            continue
        # Timestamps are only parsed for requests whose PID is relevant.
        requested_at = _parse_timestamp(raw_timestamp)
        if any(abs(requested_at - issued_at) < window
               for issued_at in fqan_times):
            users.append(cn)
    return users


def main():
    """Tally Production-role requests per user across the given VOMS logs.

    Reads the log filenames from the command line, collects the CN of every
    request matched to a Production FQAN (PIDs are matched per file), and
    prints one ``<cn> <count>`` line per CN, users first and service
    certificates last, each group sorted.
    """
    # The text must be passed as ``description``; the first positional
    # argument of ArgumentParser is ``prog``.
    parser = argparse.ArgumentParser(
        description='Parse VOMS log files to find Production role'
                    ' requests and tally them')
    parser.add_argument('filenames', type=str, nargs='+',
                        help='Filenames of VOMS logfiles to parse')
    args = parser.parse_args()

    production_users = []
    for filename in args.filenames:
        production_users.extend(
            _production_users_in_file(pathlib.Path(filename)))

    tallies = Counter(production_users)
    for user, count in dict_sorter_service_cert(tallies):
        print(f'{user} {count}')
# Run the tally only when executed as a script, not when imported.
if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment