Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Repro of k8s watcher on AKS staying open and missing events (https://github.com/Azure/AKS/issues/1755)
using k8s;
using k8s.Models;
using Microsoft.Rest;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Threading;
using System.Threading.Tasks;
namespace WatcherTest
{
// When the watcher is idle for > 4 minutes on AKS it stops receiving events and never closes.
// https://github.com/Azure/AKS/issues/1755
static class Program
{
    // Kubernetes client shared by every task; assigned once in Main before any task starts.
    static Kubernetes k8s;

    // System.Random instance members are not thread-safe and the CreateWatchModify tasks run
    // concurrently, so every use of rand must hold randLock.
    static readonly Random rand = new Random();
    static readonly object randLock = new object();

    // Requests cancellation 60 minutes after this field is initialized, which ends the repro.
    static CancellationToken cancel = new CancellationTokenSource(TimeSpan.FromMinutes(60)).Token;

    // Entry point. args[0], when present, is passed as the kubeconfig context name; otherwise
    // null is passed (presumably selecting the kubeconfig's current context - confirm against
    // the client library). Blocks until every task has finished or been cancelled.
    static void Main(string[] args)
    {
        k8s = new Kubernetes(KubernetesClientConfiguration.BuildConfigFromConfigFile(currentContext: args.Length > 0 ? args[0] : null));
        try
        {
            Task.WaitAll(
                CreateWatchModify(1), // watch works and closes after ~30 minutes
                CreateWatchModify(2), // watch works and closes after ~30 minutes
                CreateWatchModify(3), // watch works and closes after ~30 minutes
                CreateWatchModify(4), // watch works and closes after ~30 minutes
                CreateWatchModify(5), // watch misses modifications and never closes on AKS
                CreateWatchModify(6), // watch misses modifications and never closes on AKS
                CreateWatchModify(7), // watch misses modifications and never closes on AKS
                CreateWatchModify(8)); // watch misses modifications and never closes on AKS
        }
        // Swallow the failure only when it is purely the expected 60-minute cancellation;
        // any other exception still propagates and terminates the process.
        catch (AggregateException ae) when (cancel.IsCancellationRequested && ae.InnerExceptions.All(e => e is TaskCanceledException)) { }
    }

    // Creates and watches a configmap named "watchtest-<intervalMinutes>" in the "default"
    // namespace, then modifies it every intervalMinutes until the shared token is cancelled.
    // Watch events, errors and closure are logged to the console with a local timestamp.
    static async Task CreateWatchModify(int intervalMinutes)
    {
        // Create a configmap, first deleting any copy left over from a previous run.
        var cm = new V1ConfigMap(metadata: new V1ObjectMeta(name: $"watchtest-{intervalMinutes}", namespaceProperty: "default"));
        try { await k8s.DeleteNamespacedConfigMapAsync(cm.Metadata.Name, cm.Metadata.NamespaceProperty, cancellationToken: cancel); }
        // Not found just means there was nothing to delete. The null-conditional keeps the
        // filter itself from throwing if the exception carries no response.
        catch (HttpOperationException e) when (e.Response?.StatusCode == HttpStatusCode.NotFound) { }
        cm = await k8s.CreateNamespacedConfigMapAsync(cm, cm.Metadata.NamespaceProperty, cancellationToken: cancel);

        // Watch it. The watcher is disposed when this method exits (normally, by cancellation
        // or by exception) so the underlying connection is not leaked.
        // NOTE(review): disposal at shutdown may itself raise OnClosed; an "OnClosed" line
        // logged at the 60-minute mark should not be read as the server closing the watch.
        using (Watcher<V1ConfigMap> watch = await k8s.WatchNamespacedConfigMapAsync(cm.Metadata.Name, cm.Metadata.NamespaceProperty, cancellationToken: cancel))
        {
            watch.OnEvent += (WatchEventType t, V1ConfigMap c) => Console.WriteLine($"{DateTime.Now} {c.Metadata.Name} OnEvent {c.Data?.String()} ({t})");
            watch.OnError += (Exception e) => Console.WriteLine($"{DateTime.Now} {cm.Metadata.Name} OnError {e}");
            watch.OnClosed += () => Console.WriteLine($"{DateTime.Now} {cm.Metadata.Name} OnClosed");

            // Random initial delay to make log easier to read. The draw is taken under the
            // lock because several tasks can reach this point at the same time.
            int initialDelaySeconds;
            lock (randLock)
            {
                initialDelaySeconds = rand.Next(0, 60);
            }
            await Task.Delay(TimeSpan.FromSeconds(initialDelaySeconds), cancellationToken: cancel);

            // Modify it every intervalMinutes
            for (int i = 0; !cancel.IsCancellationRequested; i++)
            {
                cm.Data = new Dictionary<string, string> { { "iteration", i.ToString() } };
                // Keep the returned object so the next replace is sent with the latest server copy.
                cm = await k8s.ReplaceNamespacedConfigMapAsync(cm, cm.Metadata.Name, cm.Metadata.NamespaceProperty, cancellationToken: cancel);
                Console.WriteLine($"{DateTime.Now} {cm.Metadata.Name} Updated {cm.Data.String()}");
                await Task.Delay(TimeSpan.FromMinutes(intervalMinutes), cancellationToken: cancel);
            }
        }
    }

    // Formats a dictionary as "key1=value1, key2=value2" for logging.
    static string String(this IDictionary<string, string> d) => string.Join(", ", d.Select(p => $"{p.Key}={p.Value}"));
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment