Last active
April 13, 2020 19:16
-
-
Save unaizorrilla/a92a7e05b9a4f513fef04335dcf05989 to your computer and use it in GitHub Desktop.
Operator with hosted service and channels
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using HealthCheckOperator.Controller; | |
using HealthCheckOperator.Crd; | |
using HealthCheckOperator.Diagnostics; | |
using k8s; | |
using Microsoft.Extensions.Hosting; | |
using System; | |
using System.Threading; | |
using System.Threading.Channels; | |
using System.Threading.Tasks; | |
namespace HealthCheckOperator.Operator | |
{ | |
public class Operator : IHostedService | |
{ | |
private Watcher<HealthCheckResource> _watcher; | |
private readonly Channel<WatchData> _channel; | |
private readonly CancellationTokenSource _stoppingCts = new CancellationTokenSource(); | |
private readonly IKubernetes _kubernetesClient; | |
private readonly IHealthCheckOperatorController _controller; | |
private readonly OperatorDiagnostics _diagnostics; | |
public Operator( | |
IKubernetes kubernetesClient, | |
IHealthCheckOperatorController controller, | |
OperatorDiagnostics diagnostics) | |
{ | |
_kubernetesClient = kubernetesClient ?? throw new ArgumentNullException(nameof(kubernetesClient)); | |
_controller = controller ?? throw new ArgumentNullException(nameof(controller)); | |
_diagnostics = diagnostics ?? throw new ArgumentNullException(nameof(diagnostics)); | |
_channel = Channel.CreateUnbounded<WatchData>(new UnboundedChannelOptions() | |
{ | |
SingleReader = true, | |
SingleWriter = true | |
}); | |
} | |
public Task StartAsync(CancellationToken cancellationToken) | |
{ | |
_diagnostics.OperatorStarting(); | |
//start the channel reader and the watcher | |
//we are not awaiting the watcher because this is on waiting if not | |
//crd objects are created on K8S and the hosted can't be shutting down | |
//gracefully | |
Task.Run(WatchListener); | |
_ = StartWatcher(_stoppingCts.Token); | |
return Task.CompletedTask; | |
} | |
public Task StopAsync(CancellationToken cancellationToken) | |
{ | |
_diagnostics.OperatorShuttingdown(); | |
_stoppingCts.Cancel(); | |
_channel.Writer.Complete(); | |
if ( _watcher != null ) | |
{ | |
//if response is not finished when hosted service is | |
//on close this object will be null. | |
_watcher.Dispose(); | |
} | |
return Task.CompletedTask; | |
} | |
private async Task StartWatcher(CancellationToken cancellationToken) | |
{ | |
var response = await _kubernetesClient.ListClusterCustomObjectWithHttpMessagesAsync( | |
group: CRDConstants.Group, | |
version: CRDConstants.Version, | |
plural: CRDConstants.Plural, | |
watch: true, | |
timeoutSeconds: ((int)TimeSpan.FromMinutes(60).TotalSeconds), | |
cancellationToken: cancellationToken); | |
_watcher = response.Watch<HealthCheckResource, object>( | |
onEvent: async (type, item) => | |
{ | |
await _channel.Writer.WriteAsync(new WatchData() | |
{ | |
EventType = type, | |
Resource = item | |
}, cancellationToken); | |
}, | |
onClosed: () => | |
{ | |
_watcher.Dispose(); | |
_ = StartWatcher(_stoppingCts.Token); | |
}, | |
onError: e => _diagnostics.WatcherThrow(e)); | |
} | |
private async Task WatchListener() | |
{ | |
while (await _channel.Reader.WaitToReadAsync() && !_stoppingCts.IsCancellationRequested) | |
{ | |
while (_channel.Reader.TryRead(out WatchData watchData)) | |
{ | |
if (watchData.EventType == WatchEventType.Added) | |
{ | |
_diagnostics.OperatorEventAdded(); | |
/* | |
* This is where you write your custom code. Typically you | |
* need to invoke some method on controller to perform the | |
* desired actions when a new CRD object is deployed on K8S. | |
*/ | |
await _controller.Do(watchData.Resource); | |
} | |
else if (watchData.EventType == WatchEventType.Deleted) | |
{ | |
_diagnostics.OperatorEventDeleted(); | |
/* | |
* This is where you write your custom code. Typically you | |
* need to invoke some method on controller to perform the | |
* desired actions when a CRD object is deleted on K8S. | |
*/ | |
await _controller.UnDo(watchData.Resource); | |
} | |
} | |
} | |
} | |
private class WatchData | |
{ | |
public WatchEventType EventType { get; set; } | |
public HealthCheckResource Resource { get; set; } | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment