log_monitor
Created September 26, 2024 14:46
Watches Kubernetes pods matching a label selector, tails their logs for ERROR lines carrying a TransactionID, writes each match to a per-transaction log file, and bulk-inserts matches into PostgreSQL.

C# implementation:
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.IO;
using System.Text.RegularExpressions;
using System.Threading;
using System.Threading.Tasks;
using k8s;
using k8s.Models;
using Npgsql;
class Program
{
    private static readonly string labelSelector = "app=myapp";                // Label selector for filtering pods
    private static readonly string logPattern = @"ERROR.*TransactionID=(\d+)"; // Regex to capture transaction ID
    private static readonly string namespaceName = "default";                  // Namespace to monitor
    private static readonly string logDirectory = "./logs/";                   // Directory to store log files
    private static readonly int batchSize = 100;     // Number of log entries before inserting into the database
    private static readonly int flushInterval = 10;  // Time interval in seconds to flush logs into the database

    private static readonly ConcurrentDictionary<string, object> logLocks = new ConcurrentDictionary<string, object>(); // Lock per transaction ID
    private static readonly BlockingCollection<LogEntry> logBuffer = new BlockingCollection<LogEntry>(); // Log buffer for bulk inserts
    private static readonly CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
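
    // Example of a line the pattern above is meant to match (hypothetical
    // log format; adjust logPattern to your application's actual output):
    //   2024-09-26T14:46:00Z ERROR payment failed TransactionID=12345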

    static async Task Main(string[] args)
    {
        // Ensure the log directory exists
        Directory.CreateDirectory(logDirectory);

        // Request cancellation on Ctrl+C instead of killing the process,
        // so the database streamer can drain and exit cleanly.
        Console.CancelKeyPress += (sender, eventArgs) =>
        {
            Console.WriteLine("Shutting down...");
            eventArgs.Cancel = true; // keep the process alive while we wind down
            cancellationTokenSource.Cancel();
        };

        // Start the database streaming task
        Task dbStreamingTask = Task.Run(() => DatabaseStreamer(cancellationTokenSource.Token));

        // Configure Kubernetes client (assumes the program runs inside the cluster)
        var config = KubernetesClientConfiguration.InClusterConfig();
        var client = new Kubernetes(config);

        // Watch for pods matching the label selector
        WatchPods(client, namespaceName);

        await dbStreamingTask; // Await the database streaming task to finish
    }

    // Structure to hold log entry data
    public class LogEntry
    {
        public DateTime Timestamp { get; set; }
        public string TransactionId { get; set; }
        public string Message { get; set; }
    }

    // Database streamer: collects log entries and inserts them in bulk to PostgreSQL
    static async Task DatabaseStreamer(CancellationToken cancellationToken)
    {
        var connectionString = "Host=your_db_host;Port=5432;Username=your_db_user;Password=your_db_password;Database=your_db_name";
        using var connection = new NpgsqlConnection(connectionString);
        await connection.OpenAsync();
        var lastFlushTime = DateTime.Now;

        while (!cancellationToken.IsCancellationRequested)
        {
            try
            {
                if (logBuffer.Count >= batchSize || (DateTime.Now - lastFlushTime).TotalSeconds >= flushInterval)
                {
                    var entriesToInsert = new List<LogEntry>();
                    while (logBuffer.TryTake(out LogEntry entry))
                    {
                        entriesToInsert.Add(entry);
                    }

                    if (entriesToInsert.Count > 0)
                    {
                        // Wrap the batch in a transaction so a partial failure rolls back cleanly
                        await using var transaction = await connection.BeginTransactionAsync();
                        using (var cmd = new NpgsqlCommand())
                        {
                            cmd.Connection = connection;
                            cmd.CommandText = "INSERT INTO log_entries (timestamp, transaction_id, message) VALUES (@timestamp, @transaction_id, @message)";
                            // TimestampTz, not Timestamp: entries carry UTC timestamps, and
                            // Npgsql 6+ refuses to write a Kind=Utc DateTime as plain timestamp.
                            cmd.Parameters.Add(new NpgsqlParameter("@timestamp", NpgsqlTypes.NpgsqlDbType.TimestampTz));
                            cmd.Parameters.Add(new NpgsqlParameter("@transaction_id", NpgsqlTypes.NpgsqlDbType.Varchar));
                            cmd.Parameters.Add(new NpgsqlParameter("@message", NpgsqlTypes.NpgsqlDbType.Text));

                            foreach (var entry in entriesToInsert)
                            {
                                cmd.Parameters["@timestamp"].Value = entry.Timestamp;
                                cmd.Parameters["@transaction_id"].Value = entry.TransactionId;
                                cmd.Parameters["@message"].Value = entry.Message;
                                await cmd.ExecuteNonQueryAsync();
                            }
                        }
                        await transaction.CommitAsync();
                        lastFlushTime = DateTime.Now;
                    }
                }
            }
            catch (Exception e)
            {
                Console.WriteLine($"Error inserting log entries into the database: {e.Message}");
            }

            try
            {
                await Task.Delay(1000, cancellationToken); // Wait 1 second before the next check
            }
            catch (TaskCanceledException)
            {
                break; // Cancellation requested; exit the loop
            }
        }
    }
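
    // Note: at high volume, PostgreSQL's COPY protocol (NpgsqlConnection.BeginBinaryImport
    // in Npgsql) is typically much faster than row-by-row INSERTs inside a transaction;
    // the loop above trades throughput for simplicity.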

    // Watch for pod events and start log watchers for running pods
    static void WatchPods(Kubernetes client, string namespaceName)
    {
        var podList = client.ListNamespacedPod(namespaceName, labelSelector: labelSelector);
        foreach (var pod in podList.Items)
        {
            if (pod.Status.Phase == "Running")
            {
                _ = StartLogWatcher(client, pod.Metadata.Name, namespaceName);
            }
        }

        // Set up a watch to monitor for changes in the pod list. The original
        // gist called a WatchNamespacedPodAsync method that the KubernetesClient
        // library does not expose; the documented pattern is a list call with
        // watch: true plus the Watch<T, L> extension (exact method surface
        // varies across client versions).
        var podListResponse = client.ListNamespacedPodWithHttpMessagesAsync(
            namespaceName, labelSelector: labelSelector, watch: true);

        podListResponse.Watch<V1Pod, V1PodList>((eventType, pod) =>
        {
            if (eventType == WatchEventType.Added || eventType == WatchEventType.Modified)
            {
                if (pod.Status.Phase == "Running")
                {
                    // Note: unlike the Python version below, this does not track
                    // active watchers, so repeated Modified events can start duplicates.
                    _ = StartLogWatcher(client, pod.Metadata.Name, namespaceName);
                }
            }
            else if (eventType == WatchEventType.Deleted)
            {
                // Optionally handle pod deletion
                Console.WriteLine($"Pod {pod.Metadata.Name} deleted");
            }
        });
    }

    // Start a log watcher for a specific pod
    static async Task StartLogWatcher(Kubernetes client, string podName, string namespaceName)
    {
        var regex = new Regex(logPattern);
        Console.WriteLine($"Starting log watcher for pod: {podName}");
        try
        {
            using var stream = await client.ReadNamespacedPodLogAsync(podName, namespaceName, follow: true);
            using var reader = new StreamReader(stream);
            while (!reader.EndOfStream)
            {
                var line = await reader.ReadLineAsync();
                if (string.IsNullOrEmpty(line))
                    continue;

                var timestamp = DateTime.UtcNow;
                var match = regex.Match(line);
                if (match.Success)
                {
                    var transactionId = match.Groups[1].Value;
                    // Write to transaction-specific log file
                    WriteLogEntry(line, transactionId);
                    // Add to buffer for database insertion
                    logBuffer.Add(new LogEntry
                    {
                        Timestamp = timestamp,
                        TransactionId = transactionId,
                        Message = line
                    });
                }
            }
        }
        catch (Exception e)
        {
            // Returning Task (instead of the original async void) and catching
            // here keeps one failed log stream from crashing the whole process.
            Console.WriteLine($"Error watching logs for pod {podName}: {e.Message}");
        }
    }

    // Write log entry to a transaction-specific file
    static void WriteLogEntry(string logEntry, string transactionId)
    {
        var lockObj = logLocks.GetOrAdd(transactionId, _ => new object());
        lock (lockObj)
        {
            var filePath = Path.Combine(logDirectory, $"transaction_{transactionId}.log");
            using (var writer = new StreamWriter(filePath, append: true))
            {
                writer.WriteLine(logEntry);
            }
        }
    }
}
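
Both programs INSERT into a log_entries table the gist never defines (and they disagree on columns: the C# version writes message, the Python version writes field1/field2 parsed from JSON logs). A minimal setup sketch in Python/psycopg2 follows; the schema is an assumption inferred from the C# INSERT statement, with timestamptz because both programs record UTC timestamps.

import psycopg2

# Hypothetical schema inferred from the INSERT statements in this gist; adjust
# columns (e.g. add field1/field2 for the Python variant) to your actual logs.
DDL = """
CREATE TABLE IF NOT EXISTS log_entries (
    id             bigserial   PRIMARY KEY,
    timestamp      timestamptz NOT NULL,
    transaction_id varchar(64) NOT NULL,
    message        text
);
CREATE INDEX IF NOT EXISTS log_entries_txn_idx ON log_entries (transaction_id);
"""

conn = psycopg2.connect(host='your_db_host', port=5432, dbname='your_db_name',
                        user='your_db_user', password='your_db_password')
with conn, conn.cursor() as cur:  # the connection context manager commits on success
    cur.execute(DDL)
conn.close()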
Python implementation of the same monitor:
from kubernetes import client, config, watch
from datetime import datetime, timezone  # timezone is needed for aware UTC timestamps below
from dateutil import parser
import threading
import re
import time
import psycopg2
import psycopg2.extras  # For execute_batch
import json
from collections import defaultdict

# Load in-cluster Kubernetes configuration
config.load_incluster_config()

# Initialize the CoreV1Api to interact with Kubernetes core resources
v1 = client.CoreV1Api()

# Configuration variables
LABEL_SELECTOR = "app=myapp"  # The label selector to filter pods (change as needed)
LOG_PATTERN = r"ERROR.*TransactionID=(\d+)"  # Regex pattern with a capturing group for Transaction ID
NAMESPACE = "default"  # The Kubernetes namespace to monitor (change as needed)

# Thread-safe log file writer setup
log_locks = defaultdict(threading.Lock)  # Dictionary to store locks for each transaction ID

# Database configuration
DB_HOST = 'your_db_host'
DB_PORT = '5432'
DB_NAME = 'your_db_name'
DB_USER = 'your_db_user'
DB_PASSWORD = 'your_db_password'
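
# Note: in a real deployment these credentials would normally come from
# environment variables or a mounted Secret rather than being hardcoded
# (e.g. os.environ.get('DB_HOST', 'localhost') after importing os).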


def write_log_entry(entry, transaction_id):
    """
    Writes a log entry to a transaction-specific log file in a thread-safe manner.

    Args:
        entry (str): The log entry to write to the file.
        transaction_id (str): The transaction ID associated with the log entry.
    """
    # Get the lock for the transaction ID
    lock = log_locks[transaction_id]
    with lock:
        log_file = f"transaction_{transaction_id}.log"
        with open(log_file, 'a') as f:
            f.write(entry + '\n')


def parse_log_entry(log_entry):
    """
    Parses a log entry string and extracts fields to insert into the database.

    Args:
        log_entry (str): The log message string.

    Returns:
        dict: A dictionary containing the extracted fields, or None on parse failure.
    """
    try:
        # Assuming the log message is a JSON string
        data = json.loads(log_entry)
        # Extract the fields you need; adjust the keys based on your log structure
        return {
            'timestamp': data.get('timestamp'),  # Caller falls back to the stream timestamp if absent
            'transaction_id': data.get('transaction_id'),
            'field1': data.get('field1'),
            'field2': data.get('field2'),
            # Add more fields as needed
        }
    except json.JSONDecodeError:
        print(f"Error parsing log entry: {log_entry}")
        return None
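
# Example: a structured line such as
#   {"timestamp": "2024-09-26T14:46:00Z", "transaction_id": "42",
#    "field1": "checkout", "field2": "declined"}
# yields the corresponding dict. A plain-text line like
#   "ERROR payment failed TransactionID=42"
# matches LOG_PATTERN but is not JSON, so parse_log_entry returns None and the
# watcher below only writes it to the transaction file, not to the database.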


class DatabaseStreamer:
    """
    Collects log entries and bulk inserts them into a PostgreSQL database
    when a specified row count or time interval is reached.
    """

    def __init__(self, batch_size=100, flush_interval=10):
        """
        Initializes the DatabaseStreamer.

        Args:
            batch_size (int): The number of log entries to collect before inserting into the database.
            flush_interval (int): The time interval (in seconds) to wait before inserting collected log entries.
        """
        self.batch_size = batch_size
        self.flush_interval = flush_interval
        self.buffer = []
        # RLock, not Lock: add_entry() calls flush() while already holding the
        # lock, which would deadlock with a non-reentrant lock.
        self.buffer_lock = threading.RLock()
        self.last_flush_time = time.time()
        self.stop_event = threading.Event()

        # Initialize the database connection before starting the flush thread,
        # so run() never touches self.cursor before it exists.
        try:
            self.conn = psycopg2.connect(
                host=DB_HOST,
                port=DB_PORT,
                dbname=DB_NAME,
                user=DB_USER,
                password=DB_PASSWORD
            )
            self.conn.autocommit = False  # Disable autocommit for transaction control
            self.cursor = self.conn.cursor()
        except Exception as e:
            print(f"Error connecting to the database: {e}")
            raise

        self.thread = threading.Thread(target=self.run, daemon=True)
        self.thread.start()

    def add_entry(self, entry):
        """
        Adds a log entry to the buffer.

        Args:
            entry (dict): The log entry to add, as a dictionary of field values.
        """
        with self.buffer_lock:
            self.buffer.append(entry)
            if len(self.buffer) >= self.batch_size:
                self.flush()

    def flush(self):
        """
        Inserts the collected log entries into the database and clears the buffer.
        """
        with self.buffer_lock:
            if self.buffer:
                try:
                    # Prepare the insert query; adjust column names as needed
                    insert_query = """
                        INSERT INTO log_entries (timestamp, transaction_id, field1, field2)
                        VALUES (%s, %s, %s, %s)
                    """
                    # Extract values from buffer
                    values = [
                        (entry['timestamp'], entry['transaction_id'], entry['field1'], entry['field2'])
                        for entry in self.buffer
                    ]
                    # Execute batch insert
                    psycopg2.extras.execute_batch(self.cursor, insert_query, values)
                    self.conn.commit()
                    self.buffer.clear()
                    self.last_flush_time = time.time()
                except Exception as e:
                    print(f"Error inserting into database: {e}")
                    self.conn.rollback()

    def run(self):
        """
        Continuously checks if the flush interval has passed and flushes the buffer if needed.
        """
        while not self.stop_event.is_set():
            time.sleep(1)
            if time.time() - self.last_flush_time >= self.flush_interval:
                self.flush()

    def close(self):
        """
        Stops the streamer and closes the database connection.
        """
        self.stop_event.set()
        self.thread.join()
        self.flush()  # Flush any remaining entries
        self.cursor.close()
        self.conn.close()


# Initialize the DatabaseStreamer
database_streamer = DatabaseStreamer(batch_size=100, flush_interval=10)


class PodLogWatcher(threading.Thread):
    """
    A thread that watches the logs of a single Kubernetes pod and filters log entries
    based on a specified pattern. It also streams matching entries to a database and
    writes them to transaction-specific log files.
    """

    def __init__(self, namespace, pod_name, stop_event):
        """
        Initializes the PodLogWatcher thread.

        Args:
            namespace (str): The namespace of the pod to watch.
            pod_name (str): The name of the pod to watch.
            stop_event (threading.Event): An event to signal the thread to stop.
        """
        super().__init__()
        self.namespace = namespace
        self.pod_name = pod_name
        self.stop_event = stop_event
        self.daemon = True  # Set as a daemon thread so it exits when the main program does
        self.last_timestamp = None  # Last log timestamp seen (timezone-aware), used to resume streaming

    def run(self):
        """
        The main method of the thread that continuously streams and processes the pod's logs.
        """
        while not self.stop_event.is_set():
            try:
                # Prepare arguments for the log streaming API call
                kwargs = {
                    'name': self.pod_name,
                    'namespace': self.namespace,
                    'follow': True,  # Stream logs in real-time
                    '_preload_content': False,  # Do not preload the content; stream it
                    'timestamps': True,  # Include timestamps in the log entries
                }
                # Resume near the last seen timestamp to avoid missing logs.
                # The original gist passed since_time, but read_namespaced_pod_log
                # in the Python client does not expose that parameter, so derive
                # an approximate since_seconds instead.
                if self.last_timestamp:
                    elapsed = (datetime.now(timezone.utc) - self.last_timestamp).total_seconds()
                    kwargs['since_seconds'] = max(1, int(elapsed))
                else:
                    # Start from (roughly) now to avoid processing old logs
                    kwargs['since_seconds'] = 1

                # Begin streaming the pod's logs
                stream = watch.Watch().stream(
                    v1.read_namespaced_pod_log,
                    **kwargs
                )

                # Process each line from the log stream
                for line in stream:
                    if self.stop_event.is_set():
                        break  # Exit the loop if a stop event is set
                    # Watch.stream() usually yields str for pod logs; handle bytes too
                    decoded_line = line.decode('utf-8') if isinstance(line, bytes) else line
                    decoded_line = decoded_line.rstrip()
                    if not decoded_line:
                        continue

                    # Split the line into timestamp and log message
                    if ' ' in decoded_line:
                        timestamp_str, log_msg = decoded_line.split(' ', 1)
                        # Convert the timestamp string to an aware datetime object
                        self.last_timestamp = parser.isoparse(timestamp_str)
                    else:
                        # If no timestamp, use the current time (timezone-aware,
                        # so arithmetic against parsed timestamps stays valid)
                        log_msg = decoded_line
                        self.last_timestamp = datetime.now(timezone.utc)

                    # Check if the log message matches the specified pattern
                    match = re.search(LOG_PATTERN, log_msg)
                    if match:
                        # Extract the transaction ID from the regex capturing group
                        transaction_id = match.group(1)
                        # Write the matching log entry to the transaction-specific file
                        write_log_entry(decoded_line, transaction_id)
                        # Parse the log message and add to database streamer
                        parsed_entry = parse_log_entry(log_msg)
                        if parsed_entry:
                            # Use the timestamp from the log if present, else the stream timestamp
                            parsed_entry['timestamp'] = parsed_entry.get('timestamp') or self.last_timestamp
                            parsed_entry['transaction_id'] = transaction_id  # Ensure transaction ID is included
                            database_streamer.add_entry(parsed_entry)
            except Exception as e:
                # Handle exceptions and retry after a delay
                print(f"Error watching logs for pod {self.pod_name}: {e}")
                time.sleep(5)  # Wait before retrying to avoid rapid failure loops


def watch_pods(namespace):
    """
    Watches for pods matching the label selector in the specified namespace
    and manages log watchers for them.

    Args:
        namespace (str): The Kubernetes namespace to monitor for pods.
    """
    pod_watchers = {}  # Dictionary to keep track of active PodLogWatcher threads

    # Get the initial list of pods matching the label selector
    try:
        pods = v1.list_namespaced_pod(
            namespace=namespace,
            label_selector=LABEL_SELECTOR
        ).items
        # Start log watchers for existing pods that are running
        for pod in pods:
            pod_name = pod.metadata.name
            if pod.status.phase == 'Running' and pod_name not in pod_watchers:
                print(f"Starting log watch for existing pod {pod_name}")
                stop_event = threading.Event()
                watcher = PodLogWatcher(namespace, pod_name, stop_event)
                watcher.start()
                pod_watchers[pod_name] = (watcher, stop_event)
    except Exception as e:
        print(f"Error listing initial pods: {e}")
        # Proceed to watch for pod events even if initial listing fails
    # Continuously watch for pod events (ADDED, MODIFIED, DELETED)
    while True:
        try:
            w = watch.Watch()
            for event in w.stream(
                v1.list_namespaced_pod,
                namespace=namespace,
                label_selector=LABEL_SELECTOR
            ):
                pod = event['object']
                pod_name = pod.metadata.name
                event_type = event['type']

                if event_type == 'ADDED':
                    # Start a log watcher for newly added pods that are running
                    if pod.status.phase == 'Running' and pod_name not in pod_watchers:
                        print(f"Starting log watch for pod {pod_name}")
                        stop_event = threading.Event()
                        watcher = PodLogWatcher(namespace, pod_name, stop_event)
                        watcher.start()
                        pod_watchers[pod_name] = (watcher, stop_event)
                elif event_type == 'MODIFIED':
                    # Handle state changes in existing pods
                    if pod.status.phase == 'Running' and pod_name not in pod_watchers:
                        # Pod transitioned to Running state; start log watcher
                        print(f"Pod {pod_name} is now Running, starting log watch")
                        stop_event = threading.Event()
                        watcher = PodLogWatcher(namespace, pod_name, stop_event)
                        watcher.start()
                        pod_watchers[pod_name] = (watcher, stop_event)
                    elif pod.status.phase != 'Running' and pod_name in pod_watchers:
                        # Pod is no longer Running; stop log watcher
                        print(f"Pod {pod_name} is no longer Running, stopping log watch")
                        watcher, stop_event = pod_watchers.pop(pod_name)
                        stop_event.set()
                elif event_type == 'DELETED':
                    # Stop the log watcher for deleted pods
                    print(f"Pod {pod_name} deleted")
                    if pod_name in pod_watchers:
                        watcher, stop_event = pod_watchers.pop(pod_name)
                        stop_event.set()
        except Exception as e:
            # Handle exceptions and retry after a delay
            print(f"Error watching pods: {e}")
            time.sleep(5)  # Wait before retrying to avoid rapid failure loops


if __name__ == "__main__":
    try:
        # Start the pod watcher in the specified namespace
        watch_pods(NAMESPACE)
    except KeyboardInterrupt:
        # Handle graceful shutdown on interrupt signal
        print("Shutting down gracefully...")
    finally:
        # Close the database streamer
        database_streamer.close()
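
The Python version assumes it runs in-cluster (load_incluster_config), which also requires the pod's ServiceAccount to have RBAC permission to get/list/watch pods and to read pods/log. For local iteration against a kubeconfig, a common pattern (a sketch, not part of the gist) is to fall back when in-cluster config is unavailable:

from kubernetes import config

# Prefer in-cluster config; fall back to ~/.kube/config for local runs.
try:
    config.load_incluster_config()
except config.ConfigException:
    config.load_kube_config()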