Created
August 26, 2023 00:45
-
-
Save signalnine/1596185ec661a7fc658174fbbe755b5b to your computer and use it in GitHub Desktop.
Reconstruct a crontab from syslog CRON entries.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import sys
from collections import Counter
from collections import defaultdict
from datetime import datetime
def infer_cron_schedule(timestamps):
    """Guess a five-field cron schedule from the times a job was seen running.

    Args:
        timestamps: Iterable of ``datetime`` objects, one per observed run.
            The input is not modified.

    Returns:
        A cron schedule string (``minute hour day-of-month month
        day-of-week``). ``"* * * * *"`` is returned both for jobs that run
        every minute and as a placeholder when no schedule can be inferred
        (fewer than two runs, or run times that a single cron line cannot
        express).
    """
    placeholder = "* * * * *"

    # Work on a sorted copy so the caller's list is left untouched.
    ordered = sorted(timestamps)
    if len(ordered) < 2:
        # A single observation carries no interval information.
        return placeholder

    # total_seconds() keeps the day component; ``.seconds`` wraps at 24h and
    # would make a daily job look like it has a zero-second interval.
    intervals = [
        int((later - earlier).total_seconds())
        for earlier, later in zip(ordered, ordered[1:])
    ]

    # Every minute; the 70-second threshold leaves slack above 60 seconds.
    if all(gap <= 70 for gap in intervals):
        return "* * * * *"

    # Perfectly regular spacing: every N minutes or every N hours.
    if len(set(intervals)) == 1:
        step_minutes = intervals[0] // 60
        if step_minutes < 60:
            # NOTE: ``*/N`` is anchored at minute 0, so this is approximate
            # for jobs that start at an offset or when N does not divide 60.
            return f"*/{step_minutes} * * * *"

        step_hours, leftover = divmod(intervals[0], 3600)
        # Only whole-hour steps that divide the day evenly land on the same
        # hours every day; daily-or-longer spacing is handled further down.
        if leftover == 0 and step_hours < 24 and 24 % step_hours == 0:
            minute = ordered[0].minute
            if step_hours == 1:
                return f"{minute} * * * *"
            offset = ordered[0].hour % step_hours
            hour_field = (
                f"*/{step_hours}" if offset == 0 else f"{offset}-23/{step_hours}"
            )
            return f"{minute} {hour_field} * * *"

    # Fixed times of day. One cron line fires on every combination of its
    # minute and hour lists, so it is only exact when the observed
    # (hour, minute) pairs form that full grid.
    times_of_day = {(ts.hour, ts.minute) for ts in ordered}
    hours = sorted({hour for hour, _ in times_of_day})
    minutes = sorted({minute for _, minute in times_of_day})
    if len(times_of_day) == len(hours) * len(minutes):
        minute_field = ",".join(str(minute) for minute in minutes)
        hour_field = ",".join(str(hour) for hour in hours)
        return f"{minute_field} {hour_field} * * *"

    # Unable to determine the schedule.
    return placeholder
def parse_syslog_line(line):
    """Extract the user, command and time from one cron ``CMD`` syslog line.

    Expects the traditional layout
    ``Mon DD HH:MM:SS host CRON[pid]: (user) CMD (command)``.

    Args:
        line: A single syslog line.

    Returns:
        A ``(user, command, timestamp)`` tuple, or ``None`` when the line is
        not a cron command entry or cannot be parsed. Syslog lines carry no
        year, so ``timestamp`` has ``strptime``'s default year of 1900.
    """
    # Split on runs of whitespace: syslog pads single-digit days with an
    # extra space ("Aug  5"), which a plain split(" ") turns into an empty
    # field. Limiting the split keeps the message's own spacing intact.
    fields = line.split(None, 5)
    if len(fields) < 6:
        return None
    month, day, clock, _host, tag, message = fields

    if "CRON" not in tag:
        return None

    try:
        timestamp = datetime.strptime(f"{month} {day} {clock}", "%b %d %H:%M:%S")
    except ValueError:
        # Not a syslog timestamp. This also covers "Feb 29", which is
        # invalid in the default year 1900.
        return None

    user = None
    command = None
    remainder = message
    while remainder:
        token, _, remainder = remainder.partition(" ")
        if token.startswith("("):
            # "(root)" -> "root"
            user = token[1:].rstrip(")")
        elif token.startswith("CMD"):
            command = remainder.strip()
            # Drop only the wrapping pair cron adds around the command;
            # parentheses inside the command itself are part of it.
            if command.startswith("(") and command.endswith(")"):
                command = command[1:-1]
            break

    if user and command:
        return user, command, timestamp
    return None
def reconstruct_crontab_from_syslog(syslog_file_path):
    """Print an inferred crontab for every user found in a syslog file.

    Cron command entries are grouped by user and command, a schedule is
    inferred from each command's run times, and the result is printed to
    standard output, one section per user.

    Args:
        syslog_file_path: Path of the syslog file to read.

    Raises:
        OSError: If the file cannot be opened.
    """
    # user -> command -> list of run timestamps
    cron_entries = defaultdict(lambda: defaultdict(list))

    # errors="replace" keeps stray undecodable bytes in the log from aborting
    # the whole run. Iterating the file object streams it line by line
    # instead of loading the entire log into memory.
    with open(syslog_file_path, "r", errors="replace") as log_file:
        for line in log_file:
            parsed = parse_syslog_line(line.strip())
            if not parsed:
                continue
            user, command, timestamp = parsed
            cron_entries[user][command].append(timestamp)

    for user, commands in cron_entries.items():
        print(f"# Crontab for {user}")
        for command, timestamps in commands.items():
            inferred_schedule = infer_cron_schedule(timestamps)
            print(f"{inferred_schedule} {command}")
        print()
if __name__ == "__main__":
    # An optional first command-line argument overrides the default log path.
    log_path = sys.argv[1] if len(sys.argv) > 1 else "/var/log/syslog"
    reconstruct_crontab_from_syslog(log_path)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment