Last active
March 2, 2021 16:30
-
-
Save shreyb/16cf6bb3952f81df61700777cc7841b6 to your computer and use it in GitHub Desktop.
This is a little script to parse the VOMS logs at Fermilab (notice the regexes look for fnal.gov VOMS servers) to grab all the production users.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import argparse | |
from collections import Counter, defaultdict, namedtuple | |
from datetime import datetime, timedelta | |
from functools import partial | |
import gzip | |
import pathlib | |
import re | |
# Regexes — written as raw strings: sequences like '\w', '\d', '\:' are not
# valid Python string escapes, and non-raw patterns trigger
# DeprecationWarning/SyntaxWarning on modern CPython (a future SyntaxError).
# The compiled regex semantics are unchanged.
#
# Matches a vomsd log line recording issuance of a /Role=Production FQAN.
# Groups: (1) the log timestamp, (2) the PID of the vomsd process.
pid_production_line_regex = re.compile(
    r'^(\w{3} \w{3}(?: ){1,2}\d{1,2} \d{2}\:\d{2}\:\d{2} \d{4})\:voms\d\.fnal\.gov'
    r'\:vomsd\[(\d+)\].+Issued FQAN: .+\/Role=Production\/Capability=NULL"$')
# Matches the corresponding "Received request from" line and extracts the
# requester's certificate CN. Groups: (1) timestamp, (2) PID,
# (3) the CN with any leading 'UID:' prefix stripped.
production_user_regex = re.compile(
    r'^(\w{3} \w{3}(?: ){1,2}\d{1,2} \d{2}\:\d{2}\:\d{2} \d{4})\:voms\d\.fnal\.gov'
    r'\:vomsd\[(\d+)\].+Received request from\:.+/CN=(?:UID\:)?(.+)\,.+$')

# Other settings
# Maximum gap between an "Issued FQAN" line and a "Received request" line for
# the two to be attributed to the same VOMS transaction (matched by PID).
PID_WINDOW = timedelta(minutes=5)
def dict_sorter_service_cert(d):
    """Split a {subject: count} mapping into user and service-cert entries.

    Keys matching a hostname-like shape (at least two dot-separated trailing
    components, e.g. 'node.fnal.gov') are treated as service certificates;
    everything else is treated as a human user.

    Returns a list of (key, count) tuples: users first, then service certs,
    each group sorted lexicographically.
    """
    # Raw string: '\.' and '\w' are invalid Python string escapes in a
    # non-raw literal (DeprecationWarning on modern CPython); the regex
    # itself is unchanged.
    hostname_regex = re.compile(r'^.+\.\w+\.\w+$')
    service_certs = []
    users = []
    for key, value in d.items():
        bucket = service_certs if hostname_regex.match(key) else users
        bucket.append((key, value))
    return sorted(users) + sorted(service_certs)
def main():
    """Parse VOMS log files, tally Production-role requesters, and print
    '<subject> <count>' lines (users first, then service certs).

    The matching is two-pass per file: first collect the PIDs/timestamps of
    'Issued FQAN ... /Role=Production' lines, then re-read the file and keep
    any 'Received request from' CN whose PID matches and whose timestamp
    falls within PID_WINDOW of an issuance by that PID.
    """
    parser = argparse.ArgumentParser('Parse VOMS log files to find Production role'
                                     ' requests and tally them')
    parser.add_argument('filenames', type=str, help='Filenames of VOMS logfiles'
                        ' to parse', nargs='+')
    args = parser.parse_args()
    production_users = []  # CNs, one entry per matched transaction, across all files
    for filename in args.filenames:
        filepath = pathlib.Path(filename)
        # Handle archived files as well as live files: .gz files are opened
        # in binary mode, so each line must be decoded from utf-8 bytes.
        _open_func = open
        _str_func = str
        if filepath.suffix == '.gz':
            _open_func = partial(gzip.open, mode='rb')
            _str_func = partial(str, encoding='utf-8')
        with _open_func(filepath) as f:
            # Get PIDs and corresponding timestamps for all Production FQAN lines.
            # NOTE(review): '%c' strptime is locale-dependent; this assumes the
            # C/POSIX locale matches the log's 'Www Mmm dd HH:MM:SS YYYY' form
            # — confirm in the deployment environment.
            production_pids = [(match.group(2),
                                datetime.strptime(match.group(1), '%c'))
                               for line in f
                               for match in [pid_production_line_regex.match(_str_func(line))]
                               if match]
            # pid -> list of issuance timestamps (a PID can recur in the log)
            production_pids_dict = defaultdict(list)
            for pid, timestamp in production_pids:
                production_pids_dict[pid].append(timestamp)
            # Rewind for the second pass over the same file.
            f.seek(0)
            # Get all possible CNs that correspond to PIDs above
            Candidate = namedtuple('Candidate', ['cn', 'pid', 'timestamp'])
            possible_prod_users = [Candidate(
                match.group(3),
                match.group(2),
                datetime.strptime(match.group(1), '%c'))
                for line in f
                for match in [production_user_regex.match(_str_func(line))]
                if match and (match.group(2) in production_pids_dict)
            ]
            # Check CN candidates to see if timestamps match up within PID_WINDOW.
            # NOTE(review): a candidate is appended once per issuance timestamp
            # that falls in the window — presumably at most one per transaction,
            # but a busy PID could be double-counted; verify against real logs.
            for candidate in possible_prod_users:
                possible_datetimes = production_pids_dict[candidate.pid]
                for dt in possible_datetimes:
                    time_interval = (candidate.timestamp - dt).total_seconds()
                    if abs(time_interval) < abs(PID_WINDOW.total_seconds()):
                        production_users.append(candidate.cn)
    # Tally per-CN occurrences across every file, then print users before
    # service certs, each group alphabetized.
    production_users_count = Counter(production_users)
    final_list = dict_sorter_service_cert(production_users_count)
    for user, count in final_list:
        print(f'{user} {count}')
# Run the parser only when executed as a script, not when imported.
if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment