Skip to content

Instantly share code, notes, and snippets.

@jarlef
Last active January 30, 2023 12:37
Show Gist options
  • Save jarlef/10010515ce9d0f3980f0699d358c29c4 to your computer and use it in GitHub Desktop.
Save jarlef/10010515ce9d0f3980f0699d358c29c4 to your computer and use it in GitHub Desktop.
Hot Chocolate - Extended Source Stream
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using HotChocolate.Execution;
namespace HC.Example;
/// <summary>
/// Async sequence over the events of an <see cref="ISourceStream{T}"/>. Every enumerator it
/// hands out is wrapped in a <see cref="WrappedEnumerator{T}"/> that receives the completion
/// callback as its dispose action.
/// </summary>
/// <typeparam name="T">Type of the messages read from the source stream.</typeparam>
public class EnumerateMessages<T> : IAsyncEnumerable<T>
{
    private readonly ISourceStream<T> _sourceStream;
    private readonly Func<Task> _onCompleted;

    /// <summary>Creates the sequence.</summary>
    /// <param name="sourceStream">Stream whose events are forwarded.</param>
    /// <param name="onCompleted">
    /// Callback awaited by <c>CompleteStream</c>; may be <c>null</c> when no completion
    /// handling is wanted.
    /// </param>
    /// <exception cref="ArgumentNullException"><paramref name="sourceStream"/> is null.</exception>
    public EnumerateMessages(ISourceStream<T> sourceStream, Func<Task> onCompleted)
    {
        _sourceStream = sourceStream ?? throw new ArgumentNullException(nameof(sourceStream));

        // Deliberately not validated: a stream without a registered callback hands in null.
        _onCompleted = onCompleted;
    }

    /// <summary>
    /// Returns an enumerator over the source events, paired with the completion callback.
    /// </summary>
    /// <param name="cancellationToken">Token forwarded to the underlying event enumeration.</param>
    public IAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken cancellationToken = default) =>
        new WrappedEnumerator<T>(GetAsyncEnumeratorInternally(cancellationToken), CompleteStream);

    /// <summary>Forwards events from the source stream until it ends or the token is cancelled.</summary>
    private async IAsyncEnumerator<T> GetAsyncEnumeratorInternally(CancellationToken cancellationToken = default)
    {
        await foreach (var item in _sourceStream.ReadEventsAsync().WithCancellation(cancellationToken))
        {
            // End the sequence quietly instead of yielding an item after cancellation was requested.
            if (cancellationToken.IsCancellationRequested)
            {
                yield break;
            }

            yield return item;
        }
    }

    /// <summary>Awaits the completion callback, if one was supplied.</summary>
    private async ValueTask CompleteStream()
    {
        // Guard against a missing callback so disposal never fails with a NullReferenceException.
        if (_onCompleted is not null)
        {
            await _onCompleted();
        }
    }
}
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using HotChocolate.Execution;
namespace HC.Example;
/// <summary>
/// Decorates an <see cref="ISourceStream{TMessage}"/> so that a callback can be attached
/// via <see cref="OnComplete"/>; the callback is handed to the
/// <see cref="EnumerateMessages{T}"/> sequence returned by <see cref="ReadEventsAsync"/>.
/// </summary>
/// <typeparam name="TMessage">Type of the messages carried by the stream.</typeparam>
public class ExtendedSourceStream<TMessage> : IExtendedSourceStream<TMessage>
{
    // Shared no-op so the stream is usable even when OnComplete is never called.
    private static readonly Func<Task> NoOp = static () => Task.CompletedTask;

    private readonly ISourceStream<TMessage> _innerStream;
    private Func<Task> _action = NoOp;

    /// <summary>Wraps <paramref name="innerStream"/>.</summary>
    /// <param name="innerStream">Stream that actually produces the events.</param>
    public ExtendedSourceStream(ISourceStream<TMessage> innerStream)
    {
        _innerStream = innerStream;
    }

    /// <summary>Stores the completion callback and returns this stream.</summary>
    /// <param name="action">Callback to use; <c>null</c> resets it to a no-op.</param>
    /// <returns>This instance.</returns>
    public ISourceStream<TMessage> OnComplete(Func<Task> action)
    {
        _action = action ?? NoOp;
        return this;
    }

    /// <summary>
    /// Returns the typed event sequence. The callback registered at the time of this call
    /// is the one passed along.
    /// </summary>
    public IAsyncEnumerable<TMessage> ReadEventsAsync()
    {
        return new EnumerateMessages<TMessage>(_innerStream, _action);
    }

    /// <summary>Disposes the wrapped stream.</summary>
    public async ValueTask DisposeAsync()
    {
        await _innerStream.DisposeAsync();
    }

    /// <summary>Untyped view of <see cref="ReadEventsAsync"/>.</summary>
    IAsyncEnumerable<object> ISourceStream.ReadEventsAsync() => new GenericEnumerateMessages<TMessage>(ReadEventsAsync());
}
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
namespace HC.Example;

/// <summary>
/// Adapts a typed <see cref="IAsyncEnumerable{T}"/> to an <c>IAsyncEnumerable&lt;object&gt;</c>
/// by re-yielding every message as <see cref="object"/>.
/// </summary>
/// <typeparam name="TMessage">Element type of the wrapped sequence.</typeparam>
public class GenericEnumerateMessages<TMessage> : IAsyncEnumerable<object>
{
    private readonly IAsyncEnumerable<TMessage> _messages;

    /// <summary>Wraps <paramref name="messages"/>.</summary>
    /// <param name="messages">Typed sequence to expose as objects.</param>
    public GenericEnumerateMessages(IAsyncEnumerable<TMessage> messages)
    {
        _messages = messages;
    }

    /// <summary>Enumerates the wrapped sequence, yielding each message as an object.</summary>
    /// <param name="cancellationToken">Token passed on to the wrapped sequence's enumerator.</param>
    public async IAsyncEnumerator<object> GetAsyncEnumerator(
        CancellationToken cancellationToken = default)
    {
        await foreach (var message in _messages.WithCancellation(cancellationToken))
        {
            // The null-forgiving operator only silences the nullability warning; no check happens here.
            yield return message!;
        }
    }
}
using System;
using System.Threading.Tasks;
using HotChocolate.Execution;
using JetBrains.Annotations;
namespace HC.Example;

/// <summary>
/// An <see cref="ISourceStream{TMessage}"/> that additionally accepts a completion callback.
/// </summary>
/// <typeparam name="TMessage">Type of the messages carried by the stream.</typeparam>
[UsedImplicitly(ImplicitUseTargetFlags.WithMembers)]
public interface IExtendedSourceStream<TMessage> : ISourceStream<TMessage>
{
    /// <summary>Attaches a completion callback to the stream.</summary>
    /// <param name="action">Asynchronous callback to associate with the stream.</param>
    /// <returns>A source stream carrying the callback.</returns>
    ISourceStream<TMessage> OnComplete(Func<Task> action);
}
using System;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using HotChocolate;
using HotChocolate.Subscriptions;
using HotChocolate.Types;
using Microsoft.EntityFrameworkCore;
namespace HC.Example;

/// <summary>
/// Subscription fields added to the <c>Subscription</c> type.
/// </summary>
[ExtendObjectType(typeof(Subscription))]
public class SomeSubscriptions
{
    /// <summary>
    /// Streams <c>ActiveUsers</c> updates published on the topic for <paramref name="entityId"/>.
    /// The user is added to the cache once the subscription exists and removed again through
    /// the completion callback passed to <c>SubscribeAsync</c>.
    /// </summary>
    /// <param name="entityId">Entity whose topic is subscribed to.</param>
    /// <param name="userId">User to register in, and later remove from, the cache.</param>
    /// <param name="eventReceiver">Receiver used to subscribe to the topic.</param>
    /// <param name="userCache">Cache tracking the users per topic key.</param>
    /// <param name="cancellationToken">Token that ends the subscription.</param>
    [SubscribeAndResolve]
    public async IAsyncEnumerable<ActiveUsers> OnWatchingScreenStream(
        Guid entityId,
        Guid userId,
        [Service] ITopicEventReceiver eventReceiver,
        [Service] IUserCache userCache,
        [EnumeratorCancellation] CancellationToken cancellationToken)
    {
        var key = $"users/entityType/{entityId}";

        async Task OnComplete()
        {
            await userCache.RemoveUser(key, userId);
        }

        // The stream is IAsyncDisposable; release the subscription when this iterator finishes.
        await using var stream = await eventReceiver.SubscribeAsync<string, ActiveUsers>(key, OnComplete, cancellationToken);

        // Register the same id that OnComplete removes (the original referenced an undeclared `user`).
        await userCache.AddUser(key, userId);

        await foreach (var updatedActiveUsers in stream.ReadEventsAsync().WithCancellation(cancellationToken))
        {
            yield return updatedActiveUsers;
        }
    }
}
using System;
using System.Threading;
using System.Threading.Tasks;
using HotChocolate.Execution;
using HotChocolate.Subscriptions;
namespace HC.Example;
/// <summary>
/// Extension methods for <see cref="ITopicEventReceiver"/>.
/// </summary>
public static class SubscriptionExtensions
{
    /// <summary>
    /// Subscribes to <paramref name="topic"/> and wraps the resulting stream in an
    /// <see cref="ExtendedSourceStream{TMessage}"/> carrying <paramref name="onComplete"/>.
    /// </summary>
    /// <typeparam name="TTopic">Type of the topic key.</typeparam>
    /// <typeparam name="TMessage">Type of the messages on the topic.</typeparam>
    /// <param name="eventReceiver">Receiver performing the actual subscription.</param>
    /// <param name="topic">Topic to subscribe to.</param>
    /// <param name="onComplete">Callback registered on the wrapped stream.</param>
    /// <param name="cancellationToken">Token passed to the underlying subscribe call.</param>
    /// <returns>The wrapped stream with the callback attached.</returns>
    public static async ValueTask<ISourceStream<TMessage>> SubscribeAsync<TTopic, TMessage>(
        this ITopicEventReceiver eventReceiver,
        TTopic topic,
        Func<Task> onComplete,
        CancellationToken cancellationToken = default)
        where TTopic : notnull
    {
        var inner = await eventReceiver.SubscribeAsync<TTopic, TMessage>(topic, cancellationToken);

        return new ExtendedSourceStream<TMessage>(inner).OnComplete(onComplete);
    }
}
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
namespace HC.Example;
/// <summary>
/// Delegates to an inner <see cref="IAsyncEnumerator{T}"/> and runs an extra asynchronous
/// callback after the inner enumerator has been disposed.
/// </summary>
/// <typeparam name="T">Element type of the enumerator.</typeparam>
public class WrappedEnumerator<T> : IAsyncEnumerator<T>
{
    private readonly IAsyncEnumerator<T> _enumerator;
    private readonly Func<ValueTask> _dispose;

    /// <summary>Creates the wrapper.</summary>
    /// <param name="enumerator">Enumerator that supplies the items.</param>
    /// <param name="dispose">Callback awaited during <see cref="DisposeAsync"/>.</param>
    /// <exception cref="ArgumentNullException">Either argument is null.</exception>
    public WrappedEnumerator(IAsyncEnumerator<T> enumerator, Func<ValueTask> dispose)
    {
        _enumerator = enumerator ?? throw new ArgumentNullException(nameof(enumerator));
        _dispose = dispose ?? throw new ArgumentNullException(nameof(dispose));
    }

    /// <summary>Current item of the inner enumerator.</summary>
    public T Current => _enumerator.Current;

    /// <summary>Advances the inner enumerator.</summary>
    public ValueTask<bool> MoveNextAsync() => _enumerator.MoveNextAsync();

    /// <summary>
    /// Disposes the inner enumerator, then awaits the callback. The callback runs even when
    /// the inner disposal throws, so the cleanup it performs is not skipped; if the callback
    /// throws as well, its exception is the one that propagates.
    /// </summary>
    public async ValueTask DisposeAsync()
    {
        try
        {
            await _enumerator.DisposeAsync();
        }
        finally
        {
            await _dispose();
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment