Last active
January 30, 2023 12:37
-
-
Save jarlef/10010515ce9d0f3980f0699d358c29c4 to your computer and use it in GitHub Desktop.
Hot Chocolate - Extended Source Stream
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Collections.Generic; | |
using System.Threading; | |
using System.Threading.Tasks; | |
using HotChocolate.Execution; | |
namespace HC.Example; | |
/// <summary>
/// Async sequence over the events of an <see cref="ISourceStream{T}"/> that runs a
/// completion callback each time one of its enumerators is disposed.
/// </summary>
/// <typeparam name="T">Type of the messages produced by the source stream.</typeparam>
public class EnumerateMessages<T> : IAsyncEnumerable<T>
{
    private readonly ISourceStream<T> _sourceStream;
    private readonly Func<Task> _onCompleted;

    /// <summary>
    /// Creates the sequence.
    /// </summary>
    /// <param name="sourceStream">Stream whose events are forwarded.</param>
    /// <param name="onCompleted">
    /// Callback awaited when an enumerator is disposed. <c>null</c> means "nothing to do".
    /// </param>
    /// <exception cref="ArgumentNullException"><paramref name="sourceStream"/> is null.</exception>
    public EnumerateMessages(ISourceStream<T> sourceStream, Func<Task> onCompleted)
    {
        _sourceStream = sourceStream ?? throw new ArgumentNullException(nameof(sourceStream));

        // Callers may never have registered a callback; fall back to a no-op instead of
        // failing with a NullReferenceException when the enumerator is disposed.
        _onCompleted = onCompleted ?? (() => Task.CompletedTask);
    }

    /// <summary>
    /// Returns an enumerator over the source stream's events, wrapped so that
    /// the completion callback is handed to the wrapper as its dispose action.
    /// </summary>
    /// <param name="cancellationToken">Token forwarded to the underlying enumeration.</param>
    public IAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken cancellationToken = default) =>
        new WrappedEnumerator<T>(GetAsyncEnumeratorInternally(cancellationToken), CompleteStream);

    // Forwards every event of the source stream to the consumer.
    private async IAsyncEnumerator<T> GetAsyncEnumeratorInternally(CancellationToken cancellationToken = default)
    {
        await foreach (var item in _sourceStream.ReadEventsAsync().WithCancellation(cancellationToken))
        {
            // End the sequence quietly when cancellation was requested between items.
            if (cancellationToken.IsCancellationRequested)
            {
                yield break;
            }

            yield return item;
        }
    }

    // Adapts the Task-returning callback to the ValueTask shape the wrapper expects.
    private async ValueTask CompleteStream()
    {
        await _onCompleted();
    }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Collections.Generic; | |
using System.Threading.Tasks; | |
using HotChocolate.Execution; | |
namespace HC.Example; | |
/// <summary>
/// Decorates an <see cref="ISourceStream{TMessage}"/> with an optional completion
/// callback that is passed on to the sequences returned by <see cref="ReadEventsAsync"/>.
/// </summary>
/// <typeparam name="TMessage">Type of the messages carried by the stream.</typeparam>
public class ExtendedSourceStream<TMessage> : IExtendedSourceStream<TMessage>
{
    // Used until OnComplete registers a real callback, so readers never receive null.
    private static readonly Func<Task> NoOp = () => Task.CompletedTask;

    private readonly ISourceStream<TMessage> _innerStream;
    private Func<Task> _action = NoOp;

    /// <summary>
    /// Wraps <paramref name="innerStream"/>.
    /// </summary>
    /// <exception cref="ArgumentNullException"><paramref name="innerStream"/> is null.</exception>
    public ExtendedSourceStream(ISourceStream<TMessage> innerStream)
    {
        _innerStream = innerStream ?? throw new ArgumentNullException(nameof(innerStream));
    }

    /// <summary>
    /// Stores the completion callback, replacing any previously stored one.
    /// </summary>
    /// <param name="action">Callback to store; <c>null</c> clears it back to a no-op.</param>
    /// <returns>This instance.</returns>
    public ISourceStream<TMessage> OnComplete(Func<Task> action)
    {
        _action = action ?? NoOp;
        return this;
    }

    /// <summary>
    /// Returns the inner stream's events as a sequence that carries the callback
    /// stored at the time of this call.
    /// </summary>
    public IAsyncEnumerable<TMessage> ReadEventsAsync()
    {
        return new EnumerateMessages<TMessage>(_innerStream, _action);
    }

    /// <summary>
    /// Disposes the inner stream. The completion callback is not invoked here.
    /// </summary>
    public async ValueTask DisposeAsync()
    {
        await _innerStream.DisposeAsync();
    }

    // Untyped view required by the non-generic ISourceStream contract.
    IAsyncEnumerable<object> ISourceStream.ReadEventsAsync() =>
        new GenericEnumerateMessages<TMessage>(ReadEventsAsync());
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System.Collections.Generic; | |
using System.Threading; | |
using System.Threading.Tasks; | |
namespace HC.Example;

/// <summary>
/// Exposes a typed async sequence as a sequence of <see cref="object"/>.
/// </summary>
/// <typeparam name="TMessage">Element type of the wrapped sequence.</typeparam>
public class GenericEnumerateMessages<TMessage> : IAsyncEnumerable<object>
{
    private readonly IAsyncEnumerable<TMessage> _messages;

    /// <summary>
    /// Wraps <paramref name="messages"/>.
    /// </summary>
    /// <exception cref="System.ArgumentNullException"><paramref name="messages"/> is null.</exception>
    public GenericEnumerateMessages(IAsyncEnumerable<TMessage> messages)
    {
        // Fully qualified: this file has no `using System;`.
        _messages = messages ?? throw new System.ArgumentNullException(nameof(messages));
    }

    /// <summary>
    /// Enumerates the wrapped sequence, yielding each element as <see cref="object"/>.
    /// </summary>
    /// <param name="cancellationToken">Token forwarded to the wrapped enumeration.</param>
    public async IAsyncEnumerator<object> GetAsyncEnumerator(
        CancellationToken cancellationToken = default)
    {
        await foreach (var message in _messages.WithCancellation(cancellationToken))
        {
            // Elements are passed through unchanged; `!` only silences the nullable warning.
            yield return message!;
        }
    }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Threading.Tasks; | |
using HotChocolate.Execution; | |
using JetBrains.Annotations; | |
namespace HC.Example;

/// <summary>
/// An <see cref="ISourceStream{TMessage}"/> that additionally accepts a completion callback.
/// </summary>
/// <typeparam name="TMessage">Type of the messages carried by the stream.</typeparam>
[UsedImplicitly(ImplicitUseTargetFlags.WithMembers)]
public interface IExtendedSourceStream<TMessage> : ISourceStream<TMessage>
{
    /// <summary>
    /// Registers an asynchronous callback associated with completion of the stream.
    /// When exactly it runs is decided by the implementation.
    /// </summary>
    /// <param name="action">The callback to register.</param>
    /// <returns>The stream to read events from.</returns>
    ISourceStream<TMessage> OnComplete(Func<Task> action);
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Collections.Generic; | |
using System.Linq; | |
using System.Runtime.CompilerServices; | |
using System.Threading; | |
using System.Threading.Tasks; | |
using HotChocolate; | |
using HotChocolate.Subscriptions; | |
using HotChocolate.Types; | |
using Microsoft.EntityFrameworkCore; | |
namespace HC.Example;

/// <summary>
/// Example subscription that streams <see cref="ActiveUsers"/> updates for an entity.
/// </summary>
[ExtendObjectType(typeof(Subscription))]
public class SomeSubscriptions
{
    /// <summary>
    /// Subscribes to the topic derived from <paramref name="entityId"/> and yields every
    /// <see cref="ActiveUsers"/> message published on it.
    /// </summary>
    /// <param name="entityId">Entity whose topic is subscribed to.</param>
    /// <param name="userId">User passed to the cache's add and remove calls.</param>
    /// <param name="eventReceiver">Receiver used to open the topic subscription.</param>
    /// <param name="userCache">Cache the user is added to, and removed from on completion.</param>
    /// <param name="cancellationToken">Token that cancels the subscription.</param>
    [SubscribeAndResolve]
    public async IAsyncEnumerable<ActiveUsers> OnWatchingScreenStream(
        Guid entityId,
        Guid userId,
        [Service] ITopicEventReceiver eventReceiver,
        [Service] IUserCache userCache,
        [EnumeratorCancellation] CancellationToken cancellationToken)
    {
        var key = $"users/entityType/{entityId}";

        // Passed to the extended SubscribeAsync overload as the completion callback.
        async Task OnComplete()
        {
            await userCache.RemoveUser(key, userId);
        }

        var stream = await eventReceiver.SubscribeAsync<string, ActiveUsers>(key, OnComplete, cancellationToken);

        // Add the same user id that OnComplete removes (the original referenced an undefined `user`).
        await userCache.AddUser(key, userId);

        await foreach (var updatedActiveUsers in stream.ReadEventsAsync().WithCancellation(cancellationToken))
        {
            yield return updatedActiveUsers;
        }
    }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Threading; | |
using System.Threading.Tasks; | |
using HotChocolate.Execution; | |
using HotChocolate.Subscriptions; | |
namespace HC.Example; | |
/// <summary>
/// Extension methods for <see cref="ITopicEventReceiver"/>.
/// </summary>
public static class SubscriptionExtensions
{
    /// <summary>
    /// Subscribes to <paramref name="topic"/> and wraps the resulting stream in an
    /// <see cref="ExtendedSourceStream{TMessage}"/> with <paramref name="onComplete"/> registered on it.
    /// </summary>
    /// <param name="eventReceiver">Receiver used to open the subscription.</param>
    /// <param name="topic">Topic to subscribe to.</param>
    /// <param name="onComplete">Completion callback registered on the wrapper.</param>
    /// <param name="cancellationToken">Token forwarded to the underlying subscribe call.</param>
    /// <returns>The stream returned by the wrapper's <c>OnComplete</c> call.</returns>
    public static async ValueTask<ISourceStream<TMessage>> SubscribeAsync<TTopic, TMessage>(
        this ITopicEventReceiver eventReceiver,
        TTopic topic,
        Func<Task> onComplete,
        CancellationToken cancellationToken = default)
        where TTopic : notnull
    {
        var inner = await eventReceiver.SubscribeAsync<TTopic, TMessage>(topic, cancellationToken);

        return new ExtendedSourceStream<TMessage>(inner).OnComplete(onComplete);
    }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Collections.Generic; | |
using System.Threading.Tasks; | |
namespace HC.Example; | |
/// <summary>
/// Delegates to an inner <see cref="IAsyncEnumerator{T}"/> and runs an extra
/// asynchronous action after the inner enumerator has been disposed.
/// </summary>
/// <typeparam name="T">Element type of the enumerator.</typeparam>
public class WrappedEnumerator<T> : IAsyncEnumerator<T>
{
    private readonly IAsyncEnumerator<T> _enumerator;
    private readonly Func<ValueTask> _dispose;

    /// <summary>
    /// Wraps <paramref name="enumerator"/>.
    /// </summary>
    /// <param name="enumerator">Enumerator all members delegate to.</param>
    /// <param name="dispose">
    /// Action awaited during <see cref="DisposeAsync"/>; <c>null</c> means "nothing to do".
    /// </param>
    /// <exception cref="ArgumentNullException"><paramref name="enumerator"/> is null.</exception>
    public WrappedEnumerator(IAsyncEnumerator<T> enumerator, Func<ValueTask> dispose)
    {
        _enumerator = enumerator ?? throw new ArgumentNullException(nameof(enumerator));
        _dispose = dispose ?? (() => default);
    }

    /// <summary>The inner enumerator's current element.</summary>
    public T Current => _enumerator.Current;

    /// <summary>Advances the inner enumerator.</summary>
    public ValueTask<bool> MoveNextAsync() => _enumerator.MoveNextAsync();

    /// <summary>
    /// Disposes the inner enumerator, then awaits the extra dispose action.
    /// </summary>
    public async ValueTask DisposeAsync()
    {
        try
        {
            await _enumerator.DisposeAsync();
        }
        finally
        {
            // Run the action even when the inner dispose throws, so cleanup is never skipped.
            await _dispose();
        }
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment