Skip to content

Instantly share code, notes, and snippets.

@unaizorrilla
Last active April 13, 2020 19:16
Show Gist options
  • Save unaizorrilla/a92a7e05b9a4f513fef04335dcf05989 to your computer and use it in GitHub Desktop.
Save unaizorrilla/a92a7e05b9a4f513fef04335dcf05989 to your computer and use it in GitHub Desktop.
Operator with hosted service and channels
using HealthCheckOperator.Controller;
using HealthCheckOperator.Crd;
using HealthCheckOperator.Diagnostics;
using k8s;
using Microsoft.Extensions.Hosting;
using System;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
namespace HealthCheckOperator.Operator
{
public class Operator : IHostedService
{
private Watcher<HealthCheckResource> _watcher;
private readonly Channel<WatchData> _channel;
private readonly CancellationTokenSource _stoppingCts = new CancellationTokenSource();
private readonly IKubernetes _kubernetesClient;
private readonly IHealthCheckOperatorController _controller;
private readonly OperatorDiagnostics _diagnostics;
public Operator(
IKubernetes kubernetesClient,
IHealthCheckOperatorController controller,
OperatorDiagnostics diagnostics)
{
_kubernetesClient = kubernetesClient ?? throw new ArgumentNullException(nameof(kubernetesClient));
_controller = controller ?? throw new ArgumentNullException(nameof(controller));
_diagnostics = diagnostics ?? throw new ArgumentNullException(nameof(diagnostics));
_channel = Channel.CreateUnbounded<WatchData>(new UnboundedChannelOptions()
{
SingleReader = true,
SingleWriter = true
});
}
public Task StartAsync(CancellationToken cancellationToken)
{
_diagnostics.OperatorStarting();
//start the channel reader and the watcher
//we are not awaiting the watcher because this is on waiting if not
//crd objects are created on K8S and the hosted can't be shutting down
//gracefully
Task.Run(WatchListener);
_ = StartWatcher(_stoppingCts.Token);
return Task.CompletedTask;
}
public Task StopAsync(CancellationToken cancellationToken)
{
_diagnostics.OperatorShuttingdown();
_stoppingCts.Cancel();
_channel.Writer.Complete();
if ( _watcher != null )
{
//if response is not finished when hosted service is
//on close this object will be null.
_watcher.Dispose();
}
return Task.CompletedTask;
}
private async Task StartWatcher(CancellationToken cancellationToken)
{
var response = await _kubernetesClient.ListClusterCustomObjectWithHttpMessagesAsync(
group: CRDConstants.Group,
version: CRDConstants.Version,
plural: CRDConstants.Plural,
watch: true,
timeoutSeconds: ((int)TimeSpan.FromMinutes(60).TotalSeconds),
cancellationToken: cancellationToken);
_watcher = response.Watch<HealthCheckResource, object>(
onEvent: async (type, item) =>
{
await _channel.Writer.WriteAsync(new WatchData()
{
EventType = type,
Resource = item
}, cancellationToken);
},
onClosed: () =>
{
_watcher.Dispose();
_ = StartWatcher(_stoppingCts.Token);
},
onError: e => _diagnostics.WatcherThrow(e));
}
private async Task WatchListener()
{
while (await _channel.Reader.WaitToReadAsync() && !_stoppingCts.IsCancellationRequested)
{
while (_channel.Reader.TryRead(out WatchData watchData))
{
if (watchData.EventType == WatchEventType.Added)
{
_diagnostics.OperatorEventAdded();
/*
* This is where you write your custom code. Typically you
* need to invoke some method on controller to perform the
* desired actions when a new CRD object is deployed on K8S.
*/
await _controller.Do(watchData.Resource);
}
else if (watchData.EventType == WatchEventType.Deleted)
{
_diagnostics.OperatorEventDeleted();
/*
* This is where you write your custom code. Typically you
* need to invoke some method on controller to perform the
* desired actions when a CRD object is deleted on K8S.
*/
await _controller.UnDo(watchData.Resource);
}
}
}
}
private class WatchData
{
public WatchEventType EventType { get; set; }
public HealthCheckResource Resource { get; set; }
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment