@recalde
Created September 26, 2024 14:46
log_monitor
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.IO;
using System.Text.RegularExpressions;
using System.Threading;
using System.Threading.Tasks;
using k8s;
using k8s.Models;
using Npgsql;
class Program
{
private static readonly string labelSelector = "app=myapp"; // Label selector for filtering pods
private static readonly string logPattern = @"ERROR.*TransactionID=(\d+)"; // Regex to capture transaction ID
private static readonly string namespaceName = "default"; // Namespace to monitor
private static readonly string logDirectory = "./logs/"; // Directory to store log files
private static readonly int batchSize = 100; // Number of log entries before inserting into the database
private static readonly int flushInterval = 10; // Time interval in seconds to flush logs into the database
private static ConcurrentDictionary<string, object> logLocks = new ConcurrentDictionary<string, object>(); // Lock per transaction ID
private static BlockingCollection<LogEntry> logBuffer = new BlockingCollection<LogEntry>(); // Log buffer for bulk inserts
private static readonly CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
static async Task Main(string[] args)
{
// Ensure the log directory exists
Directory.CreateDirectory(logDirectory);
// Start the database streaming task
Task dbStreamingTask = Task.Run(() => DatabaseStreamer(cancellationTokenSource.Token));
// Configure Kubernetes client
var config = KubernetesClientConfiguration.InClusterConfig();
var client = new Kubernetes(config);
// Watch for pods matching the label selector
WatchPods(client, namespaceName);
// Wait for cancellation (Ctrl+C) to stop
Console.CancelKeyPress += (sender, eventArgs) =>
{
Console.WriteLine("Shutting down...");
eventArgs.Cancel = true; // Keep the process alive so the database streamer can finish its final cycle
cancellationTokenSource.Cancel();
};
await dbStreamingTask; // Await the database streaming task to finish
}
// Structure to hold log entry data
public class LogEntry
{
public DateTime Timestamp { get; set; }
public string TransactionId { get; set; }
public string Message { get; set; }
}
// Database streamer: collects log entries and inserts them in bulk to PostgreSQL
static async Task DatabaseStreamer(CancellationToken cancellationToken)
{
var connectionString = "Host=your_db_host;Port=5432;Username=your_db_user;Password=your_db_password;Database=your_db_name";
using var connection = new NpgsqlConnection(connectionString);
await connection.OpenAsync();
var lastFlushTime = DateTime.Now;
while (!cancellationToken.IsCancellationRequested)
{
try
{
if (logBuffer.Count >= batchSize || (DateTime.Now - lastFlushTime).TotalSeconds >= flushInterval)
{
List<LogEntry> entriesToInsert = new List<LogEntry>();
while (logBuffer.TryTake(out LogEntry entry))
{
entriesToInsert.Add(entry);
}
if (entriesToInsert.Count > 0)
{
await using var transaction = await connection.BeginTransactionAsync();
using (var cmd = new NpgsqlCommand())
{
cmd.Connection = connection;
cmd.Transaction = transaction;
cmd.CommandText = "INSERT INTO log_entries (timestamp, transaction_id, message) VALUES (@timestamp, @transaction_id, @message)";
cmd.Parameters.Add(new NpgsqlParameter("@timestamp", NpgsqlTypes.NpgsqlDbType.TimestampTz)); // Entries use DateTime.UtcNow, so timestamptz is the matching type
cmd.Parameters.Add(new NpgsqlParameter("@transaction_id", NpgsqlTypes.NpgsqlDbType.Varchar));
cmd.Parameters.Add(new NpgsqlParameter("@message", NpgsqlTypes.NpgsqlDbType.Text));
foreach (var entry in entriesToInsert)
{
cmd.Parameters["@timestamp"].Value = entry.Timestamp;
cmd.Parameters["@transaction_id"].Value = entry.TransactionId;
cmd.Parameters["@message"].Value = entry.Message;
await cmd.ExecuteNonQueryAsync();
}
}
await transaction.CommitAsync();
lastFlushTime = DateTime.Now;
}
}
}
catch (Exception e)
{
Console.WriteLine($"Error inserting log entries into the database: {e.Message}");
}
try { await Task.Delay(1000, cancellationToken); } catch (TaskCanceledException) { } // Wait 1 second before the next check; swallow the cancellation so shutdown is clean
}
}
// Watch for pod events and start log watchers for running pods
static void WatchPods(Kubernetes client, string namespaceName)
{
var podList = client.ListNamespacedPod(namespaceName, labelSelector: labelSelector);
foreach (var pod in podList.Items)
{
if (pod.Status.Phase == "Running")
{
StartLogWatcher(client, pod.Metadata.Name, namespaceName);
}
}
// Set up a watch to monitor for changes in the pod list
_ = client.WatchNamespacedPodAsync(namespaceName, labelSelector: labelSelector, // Fire-and-forget: the watch callback keeps running in the background
onEvent: (eventType, pod) =>
{
if (eventType == WatchEventType.Added || eventType == WatchEventType.Modified)
{
if (pod.Status.Phase == "Running")
{
StartLogWatcher(client, pod.Metadata.Name, namespaceName);
}
}
else if (eventType == WatchEventType.Deleted)
{
// Optionally handle pod deletion
Console.WriteLine($"Pod {pod.Metadata.Name} deleted");
}
});
}
// Start a log watcher for a specific pod
static async Task StartLogWatcher(Kubernetes client, string podName, string namespaceName)
{
var regex = new Regex(logPattern);
Console.WriteLine($"Starting log watcher for pod: {podName}");
try
{
using var stream = await client.ReadNamespacedPodLogAsync(podName, namespaceName, follow: true);
using var reader = new StreamReader(stream);
while (!reader.EndOfStream)
{
var line = await reader.ReadLineAsync();
if (string.IsNullOrEmpty(line))
continue;
var timestamp = DateTime.UtcNow;
var match = regex.Match(line);
if (match.Success)
{
var transactionId = match.Groups[1].Value;
// Write to transaction-specific log file
WriteLogEntry(line, transactionId);
// Add to buffer for database insertion
logBuffer.Add(new LogEntry
{
Timestamp = timestamp,
TransactionId = transactionId,
Message = line
});
}
}
}
catch (Exception e)
{
Console.WriteLine($"Error watching logs for pod {podName}: {e.Message}");
}
}
// Write log entry to a transaction-specific file
static void WriteLogEntry(string logEntry, string transactionId)
{
var lockObj = logLocks.GetOrAdd(transactionId, _ => new object());
lock (lockObj)
{
var filePath = Path.Combine(logDirectory, $"transaction_{transactionId}.log");
using (var writer = new StreamWriter(filePath, append: true))
{
writer.WriteLine(logEntry);
}
}
}
}
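
The Python implementation below mirrors the C# monitor: it watches pods by label, tails their logs, writes matching lines to per-transaction files, and bulk-inserts them into PostgreSQL. Neither listing defines the log_entries table it writes to; the sketch below shows one possible schema, inferred from the INSERT statements, with column names and types as assumptions (note the Python variant inserts field1/field2 instead of message).

import psycopg2

# Assumed schema for the log_entries table used by both monitors (illustrative only).
DDL = """
CREATE TABLE IF NOT EXISTS log_entries (
    id             bigserial   PRIMARY KEY,
    timestamp      timestamptz NOT NULL,
    transaction_id text        NOT NULL,
    message        text
);
"""

with psycopg2.connect(host='your_db_host', port=5432, dbname='your_db_name',
                      user='your_db_user', password='your_db_password') as conn:
    with conn.cursor() as cur:
        cur.execute(DDL)
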
from kubernetes import client, config, watch
from datetime import datetime
from dateutil import parser
import threading
import re
import time
import psycopg2
import psycopg2.extras # For execute_batch
import json
from collections import defaultdict
# Load in-cluster Kubernetes configuration
config.load_incluster_config()
# Initialize the CoreV1Api to interact with Kubernetes core resources
v1 = client.CoreV1Api()
# Configuration variables
LABEL_SELECTOR = "app=myapp" # The label selector to filter pods (change as needed)
LOG_PATTERN = r"ERROR.*TransactionID=(\d+)" # Regex pattern with a capturing group for Transaction ID
NAMESPACE = "default" # The Kubernetes namespace to monitor (change as needed)
# Thread-safe log file writer setup
log_locks = defaultdict(threading.Lock) # Dictionary to store locks for each transaction ID
# Database configuration
DB_HOST = 'your_db_host'
DB_PORT = '5432'
DB_NAME = 'your_db_name'
DB_USER = 'your_db_user'
DB_PASSWORD = 'your_db_password'
def write_log_entry(entry, transaction_id):
"""
Writes a log entry to a transaction-specific log file in a thread-safe manner.
Args:
entry (str): The log entry to write to the file.
transaction_id (str): The transaction ID associated with the log entry.
"""
# Get the lock for the transaction ID
lock = log_locks[transaction_id]
with lock:
log_file = f"transaction_{transaction_id}.log"
with open(log_file, 'a') as f:
f.write(entry + '\n')
def parse_log_entry(log_entry):
"""
Parses a log entry string and extracts fields to insert into the database.
Args:
log_entry (str): The log message string.
Returns:
dict: A dictionary containing the extracted fields.
"""
try:
# Assuming the log message is a JSON string
data = json.loads(log_entry)
# Extract the fields you need; adjust the keys based on your log structure
return {
'timestamp': data.get('timestamp'), # May be None; the caller falls back to the stream timestamp
'transaction_id': data.get('transaction_id'),
'field1': data.get('field1'),
'field2': data.get('field2'),
# Add more fields as needed
}
except json.JSONDecodeError:
print(f"Error parsing log entry: {log_entry}")
return None
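# Example of the JSON shape parse_log_entry() expects (illustrative only; the real
# keys depend on how the application serializes its log messages):
#
#   {"timestamp": "2024-09-26T14:46:00Z", "transaction_id": "12345",
#    "field1": "...", "field2": "..."}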
class DatabaseStreamer:
"""
Collects log entries and bulk inserts them into a PostgreSQL database
when a specified row count or time interval is reached.
"""
def __init__(self, batch_size=100, flush_interval=10):
"""
Initializes the DatabaseStreamer.
Args:
batch_size (int): The number of log entries to collect before inserting into the database.
flush_interval (int): The time interval (in seconds) to wait before inserting collected log entries.
"""
self.batch_size = batch_size
self.flush_interval = flush_interval
self.buffer = []
self.buffer_lock = threading.RLock() # Reentrant: add_entry() calls flush() while already holding the lock
self.last_flush_time = time.time()
self.stop_event = threading.Event()
self.thread = threading.Thread(target=self.run, daemon=True)
self.thread.start()
# Initialize database connection
try:
self.conn = psycopg2.connect(
host=DB_HOST,
port=DB_PORT,
dbname=DB_NAME,
user=DB_USER,
password=DB_PASSWORD
)
self.conn.autocommit = False # Disable autocommit for transaction control
self.cursor = self.conn.cursor()
except Exception as e:
print(f"Error connecting to the database: {e}")
raise
def add_entry(self, entry):
"""
Adds a log entry to the buffer.
Args:
entry (dict): The log entry to add, as a dictionary of field values.
"""
with self.buffer_lock:
self.buffer.append(entry)
if len(self.buffer) >= self.batch_size:
self.flush()
def flush(self):
"""
Inserts the collected log entries into the database and clears the buffer.
"""
with self.buffer_lock:
if self.buffer:
try:
# Prepare the insert query; adjust column names as needed
insert_query = """
INSERT INTO log_entries (timestamp, transaction_id, field1, field2)
VALUES (%s, %s, %s, %s)
"""
# Extract values from buffer
values = [
(entry['timestamp'], entry['transaction_id'], entry['field1'], entry['field2'])
for entry in self.buffer
]
# Execute batch insert
psycopg2.extras.execute_batch(self.cursor, insert_query, values)
self.conn.commit()
self.buffer.clear()
self.last_flush_time = time.time()
except Exception as e:
print(f"Error inserting into database: {e}")
self.conn.rollback()
def run(self):
"""
Continuously checks if the flush interval has passed and flushes the buffer if needed.
"""
while not self.stop_event.is_set():
time.sleep(1)
if time.time() - self.last_flush_time >= self.flush_interval:
self.flush()
def close(self):
"""
Stops the streamer and closes the database connection.
"""
self.stop_event.set()
self.thread.join()
self.flush() # Flush any remaining entries
self.cursor.close()
self.conn.close()
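# Example usage (sketch): push a couple of hand-built entries through the streamer
# without Kubernetes. Assumes the log_entries table exists; the values are illustrative.
#
#   streamer = DatabaseStreamer(batch_size=2, flush_interval=5)
#   streamer.add_entry({'timestamp': datetime.utcnow(), 'transaction_id': '1',
#                       'field1': 'a', 'field2': 'b'})
#   streamer.add_entry({'timestamp': datetime.utcnow(), 'transaction_id': '2',
#                       'field1': 'c', 'field2': 'd'})
#   streamer.close()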
# Initialize the DatabaseStreamer
database_streamer = DatabaseStreamer(batch_size=100, flush_interval=10)
class PodLogWatcher(threading.Thread):
"""
A thread that watches the logs of a single Kubernetes pod and filters log entries
based on a specified pattern. It also streams matching entries to a database and
writes them to transaction-specific log files.
"""
def __init__(self, namespace, pod_name, stop_event):
"""
Initializes the PodLogWatcher thread.
Args:
namespace (str): The namespace of the pod to watch.
pod_name (str): The name of the pod to watch.
stop_event (threading.Event): An event to signal the thread to stop.
"""
super().__init__()
self.namespace = namespace
self.pod_name = pod_name
self.stop_event = stop_event
self.daemon = True # Set as a daemon thread so it exits when the main program does
self.last_timestamp = None # Keep track of the last log timestamp to resume streaming
def run(self):
"""
The main method of the thread that continuously streams and processes the pod's logs.
"""
while not self.stop_event.is_set():
try:
# Prepare arguments for the log streaming API call
kwargs = {
'name': self.pod_name,
'namespace': self.namespace,
'follow': True, # Stream logs in real-time
'_preload_content': False, # Do not preload the content; stream it
'timestamps': True, # Include timestamps in the log entries
}
# If a last timestamp exists, resume from there to avoid missing logs
if self.last_timestamp:
kwargs['since_time'] = self.last_timestamp.strftime('%Y-%m-%dT%H:%M:%SZ') # isoparse() returns an aware datetime, so avoid appending a second UTC suffix
else:
# Start from the current time to avoid processing old logs
kwargs['since_time'] = datetime.utcnow().isoformat() + 'Z'
# Begin streaming the pod's logs
stream = watch.Watch().stream(
v1.read_namespaced_pod_log,
**kwargs
)
# Process each line from the log stream
for line in stream:
if self.stop_event.is_set():
break # Exit the loop if a stop event is set
# Depending on the client version the stream may yield str or bytes; normalize to str
decoded_line = (line.decode('utf-8') if isinstance(line, bytes) else line).rstrip()
if decoded_line:
# Split the line into timestamp and log message
if ' ' in decoded_line:
timestamp_str, log_msg = decoded_line.split(' ', 1)
# Convert the timestamp string to a datetime object
timestamp = parser.isoparse(timestamp_str)
self.last_timestamp = timestamp
else:
# If no timestamp, use the current time
log_msg = decoded_line
self.last_timestamp = datetime.utcnow()
# Check if the log message matches the specified pattern
match = re.search(LOG_PATTERN, log_msg)
if match:
# Extract the transaction ID from the regex capturing group
transaction_id = match.group(1)
# Write the matching log entry to the transaction-specific file
write_log_entry(decoded_line, transaction_id)
# Parse the log message and add to database streamer
parsed_entry = parse_log_entry(log_msg)
if parsed_entry:
# Use the timestamp from the log if present, otherwise fall back to the stream timestamp
parsed_entry['timestamp'] = parsed_entry.get('timestamp') or self.last_timestamp
parsed_entry['transaction_id'] = transaction_id # Ensure the transaction ID is included
database_streamer.add_entry(parsed_entry)
except Exception as e:
# Handle exceptions and retry after a delay
print(f"Error watching logs for pod {self.pod_name}: {e}")
time.sleep(5) # Wait before retrying to avoid rapid failure loops
def watch_pods(namespace):
"""
Watches for pods matching the label selector in the specified namespace
and manages log watchers for them.
Args:
namespace (str): The Kubernetes namespace to monitor for pods.
"""
pod_watchers = {} # Dictionary to keep track of active PodLogWatcher threads
# Get the initial list of pods matching the label selector
try:
pods = v1.list_namespaced_pod(
namespace=namespace,
label_selector=LABEL_SELECTOR
).items
# Start log watchers for existing pods that are running
for pod in pods:
pod_name = pod.metadata.name
if pod.status.phase == 'Running' and pod_name not in pod_watchers:
print(f"Starting log watch for existing pod {pod_name}")
stop_event = threading.Event()
watcher = PodLogWatcher(namespace, pod_name, stop_event)
watcher.start()
pod_watchers[pod_name] = (watcher, stop_event)
except Exception as e:
print(f"Error listing initial pods: {e}")
# Proceed to watch for pod events even if initial listing fails
# Continuously watch for pod events (ADDED, MODIFIED, DELETED)
while True:
try:
w = watch.Watch()
for event in w.stream(
v1.list_namespaced_pod,
namespace=namespace,
label_selector=LABEL_SELECTOR
):
pod = event['object']
pod_name = pod.metadata.name
event_type = event['type']
if event_type == 'ADDED':
# Start a log watcher for newly added pods that are running
if pod.status.phase == 'Running' and pod_name not in pod_watchers:
print(f"Starting log watch for pod {pod_name}")
stop_event = threading.Event()
watcher = PodLogWatcher(namespace, pod_name, stop_event)
watcher.start()
pod_watchers[pod_name] = (watcher, stop_event)
elif event_type == 'MODIFIED':
# Handle state changes in existing pods
if pod.status.phase == 'Running' and pod_name not in pod_watchers:
# Pod transitioned to Running state; start log watcher
print(f"Pod {pod_name} is now Running, starting log watch")
stop_event = threading.Event()
watcher = PodLogWatcher(namespace, pod_name, stop_event)
watcher.start()
pod_watchers[pod_name] = (watcher, stop_event)
elif pod.status.phase != 'Running' and pod_name in pod_watchers:
# Pod is no longer Running; stop log watcher
print(f"Pod {pod_name} is no longer Running, stopping log watch")
watcher, stop_event = pod_watchers.pop(pod_name)
stop_event.set()
elif event_type == 'DELETED':
# Stop the log watcher for deleted pods
print(f"Pod {pod_name} deleted")
if pod_name in pod_watchers:
watcher, stop_event = pod_watchers.pop(pod_name)
stop_event.set()
except Exception as e:
# Handle exceptions and retry after a delay
print(f"Error watching pods: {e}")
time.sleep(5) # Wait before retrying to avoid rapid failure loops
if __name__ == "__main__":
try:
# Start the pod watcher in the specified namespace
watch_pods(NAMESPACE)
except KeyboardInterrupt:
# Handle graceful shutdown on interrupt signal
print("Shutting down gracefully...")
finally:
# Close the database streamer
database_streamer.close()
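
For a quick offline check of the filter, the capture pattern can be exercised against a hand-written line. The sample below is made up and only illustrates the "ERROR ... TransactionID=" shape that both implementations match on.

import re

LOG_PATTERN = r"ERROR.*TransactionID=(\d+)"

# Hypothetical log line; real entries depend on how the application formats its logs.
sample = "2024-09-26T14:46:00Z ERROR payment service failed TransactionID=12345"

match = re.search(LOG_PATTERN, sample)
if match:
    print(match.group(1))  # prints: 12345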