Created
November 29, 2023 23:11
-
-
Save rtlsilva/628daf6e62bf9eaabb2b60974bf04311 to your computer and use it in GitHub Desktop.
A reactive sequencing library for coordinating time-based operations
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using UniRx; | |
namespace MyNamespace.Sequencing { | |
/// <summary> | |
/// Defines functionality for providing observable sequencing capabilities. | |
/// </summary> | |
public interface ISequencer : IObservable<Unit> { | |
/// <summary> | |
/// Adds the given <paramref name="observable"/> to this <see cref="ISequencer"/>. | |
/// </summary> | |
/// <param name="observable">The observable to add to the sequence.</param> | |
/// <returns>The newly added observable as an <see langword="object"/>.</returns> | |
object Add(IObservable<Unit> observable); | |
/// <summary> | |
/// Adds the given <paramref name="selector"/> to this <see cref="ISequencer"/>. | |
/// </summary> | |
/// <param name="selector">The unit observable selector function to add to the sequence.</param> | |
/// <returns>The newly added selector as an <see langword="object"/>.</returns> | |
object Add(Func<IObservable<Unit>> selector); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using UniRx; | |
namespace MyNamespace.Sequencing { | |
/// <summary> | |
/// Base class for sequencers. | |
/// </summary> | |
public abstract class SequencerBase { | |
/// <summary> | |
/// Gets a unit observable from a specified <see langword="object"/>. | |
/// <para>If the object is of a type compatible with a unit observable, it is converted and returned as one.</para> | |
/// <para>If the object is a function returning a unit observable, it is wrapped in an <see cref="Observable.Defer{T}(Func{IObservable{T}})"/>, | |
/// making sure its execution is deferred until subscription (i.e. makes it cold).</para> | |
/// </summary> | |
/// <param name="item">The item to get a unit observable from.</param> | |
/// <returns>The specified object as a unit observable.</returns> | |
protected static IObservable<Unit> GetObservableFromItem(object item) => | |
item as IObservable<Unit> ?? Observable.Defer((Func<IObservable<Unit>>)item); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Collections.Generic; | |
using System.Linq; | |
using UniRx; | |
namespace MyNamespace.Sequencing { | |
/// <summary> | |
/// Base class for <see cref="Sequence"/> or <see cref="Parallel"/> sequencers. | |
/// </summary> | |
public abstract class SequenceOrParallelBase : SequencerBase, ISequencer { | |
private readonly Action<ISequencer> action; | |
private readonly List<object> items = new List<object>(); | |
/// <summary> | |
/// Initializes a new instance of the <see cref="SequenceOrParallelBase"/> class with the given <paramref name="action"/>. | |
/// </summary> | |
/// <param name="action">The action that will be executed to retrieve observable sequences passed during construction.</param> | |
protected SequenceOrParallelBase(Action<ISequencer> action = null) { | |
this.action = action; | |
} | |
/// <inheritdoc/> | |
public object Add(IObservable<Unit> observable) { | |
this.items.Add(observable); | |
return observable; | |
} | |
/// <inheritdoc/> | |
public object Add(Func<IObservable<Unit>> selector) { | |
this.items.Add(selector); | |
return selector; | |
} | |
/// <inheritdoc/> | |
public abstract IDisposable Subscribe(IObserver<Unit> observer); | |
/// <summary> | |
/// Retrieves the unit observables currently held by this sequencer. | |
/// </summary> | |
/// <returns>An enumeration of the unit observables currently in this sequencer.</returns> | |
protected IEnumerable<IObservable<Unit>> GetObservables() { | |
var observables = this.items.Select(GetObservableFromItem).ToList(); | |
this.action?.Invoke(this); | |
return this.items | |
.Select(GetObservableFromItem) | |
.Except(observables) | |
.Concat(observables); | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Collections.Generic; | |
using UniRx; | |
using MyNamespace.Extensions; | |
namespace MyNamespace.Sequencing { | |
/// <summary> | |
/// An observable <see cref="ISequencer"/> that executes observables sequentially. | |
/// It completes once its last child has completed. | |
/// </summary> | |
public sealed class Sequence : SequenceOrParallelBase { | |
/// <summary> | |
/// Creates a new sequence. | |
/// </summary> | |
/// <param name="action">A lambda expression that takes the newly created sequence, so steps may be added to it.</param> | |
/// <returns>The newly created sequence.</returns> | |
public static Sequence Create(Action<ISequencer> action = null) => | |
new Sequence(action); | |
/// <summary> | |
/// Creates a new sequence. | |
/// </summary> | |
/// <param name="selectors">A variable amount of observable selector functions whose results to add to the sequence.</param> | |
/// <returns>The newly created sequence.</returns> | |
public static Sequence Create(params Func<IObservable<Unit>>[] selectors) => | |
Create(seq => selectors.ForEach(selector => seq.Add(selector()))); | |
/// <summary> | |
/// Creates a new sequence. | |
/// </summary> | |
/// <param name="observables">An enumeration of observables to add to the sequence.</param> | |
/// <returns>The newly created sequence.</returns> | |
public static Sequence Create(IEnumerable<IObservable<Unit>> observables) => | |
Create(seq => observables.ForEach(observable => seq.Add(observable))); | |
/// <summary> | |
/// Creates a new sequence. | |
/// </summary> | |
/// <param name="observables">A variable amount of observables to add to the sequence.</param> | |
/// <returns>The newly created sequence.</returns> | |
public static Sequence Create(params IObservable<Unit>[] observables) => | |
Create(seq => observables.ForEach(observable => seq.Add(observable))); | |
/// <summary> | |
/// Creates a new sequence and immediately subscribes to it, triggering its execution. | |
/// </summary> | |
/// <param name="action">A lambda expression that takes the newly created sequence, so steps may be added to it.</param> | |
/// <returns>An IDisposable to allow unsubscription from the sequence, cancelling it.</returns> | |
public static IDisposable Start(Action<ISequencer> action) => | |
Create(action).AutoDetach().Subscribe(); | |
/// <summary> | |
/// Creates a new sequence and immediately subscribes to it, triggering its execution. | |
/// </summary> | |
/// <param name="action">A lambda expression that takes the newly created sequence, so steps may be added to it.</param> | |
/// <param name="onError">An action to execute on error.</param> | |
/// <returns>An IDisposable to allow unsubscription from the live sequence, cancelling it.</returns> | |
public static IDisposable Start(Action<ISequencer> action, Action<Exception> onError) => | |
Create(action).AutoDetach().DoOnError(onError).Subscribe(); | |
/// <summary> | |
/// Creates a new sequence and immediately subscribes to it, triggering its execution. | |
/// </summary> | |
/// <param name="observables">A variable amount of observables to add to the sequence.</param> | |
/// <returns>The newly created sequence.</returns> | |
public static IDisposable Start(params IObservable<Unit>[] observables) => | |
Create(observables).AutoDetach().Subscribe(); | |
private Sequence(Action<ISequencer> action = null) : base(action) { | |
} | |
/// <summary> | |
/// Subscribes to this observable, notifying it that the given <paramref name="observer"/> is ready to receive notifications. | |
/// </summary> | |
/// <param name="observer">The object that is to receive notifications.</param> | |
/// <returns>A reference to an interface that allows observers to stop receiving notifications before the provider has finished sending them.</returns> | |
public override IDisposable Subscribe(IObserver<Unit> observer) => | |
this.GetObservables() | |
.Concat() | |
.Subscribe(observer); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Collections.Generic; | |
using System.Linq; | |
using MyNamespace.Extensions; | |
using UniRx; | |
namespace MyNamespace.Sequencing { | |
/// <summary> | |
/// An observable <see cref="ISequencer"/> that executes observables in a queue sequentially as soon as possible. | |
/// When it is subscribed to, it starts waiting for new additions to the queue until it is manually disposed of. | |
/// </summary> | |
public class LiveSequence : SequencerBase, ISequencer, IDisposable { | |
/// <summary> | |
/// Creates a new live sequence. | |
/// </summary> | |
/// <param name="action">A lambda expression that takes the newly created live sequence, so steps may be added to it.</param> | |
/// <returns>The newly created live sequence.</returns> | |
public static LiveSequence Create(Action<ISequencer> action) { | |
var liveSequence = new LiveSequence(); | |
action?.Invoke(liveSequence); | |
return liveSequence; | |
} | |
/// <summary> | |
/// Creates a new live sequence and immediately subscribes to it, triggering its execution. | |
/// </summary> | |
/// <returns>The newly created live sequence.</returns> | |
public static LiveSequence Start() { | |
var liveSequence = new LiveSequence(); | |
liveSequence.SubscribeAndForget(); | |
return liveSequence; | |
} | |
/// <summary> | |
/// Creates a new live sequence and immediately subscribes to it, triggering its execution. | |
/// </summary> | |
/// <param name="action">A lambda expression that takes the newly created live sequence, so steps may be added to it.</param> | |
/// <returns>An IDisposable to allow unsubscription from the live sequence, cancelling it.</returns> | |
public static IDisposable Start(Action<ISequencer> action) => | |
Create(action).AutoDetach().Subscribe(); | |
/// <summary> | |
/// Creates a new live sequence and immediately subscribes to it, triggering its execution. | |
/// </summary> | |
/// <param name="action">A lambda expression that takes the newly created live sequence, so steps may be added to it.</param> | |
/// <param name="onError">An action to execute on error.</param> | |
/// <returns>An IDisposable to allow unsubscription from the live sequence, cancelling it.</returns> | |
public static IDisposable Start(Action<ISequencer> action, Action<Exception> onError) => | |
Create(action).AutoDetach().DoOnError(onError).Subscribe(); | |
private Queue<object> queuedItems = new Queue<object>(); | |
private IDisposable currentExecution; | |
private bool isStarted; | |
private bool isExecuting; | |
private Lapse subscriptionLapse; | |
/// <summary> | |
/// Subscribes to this observable, notifying it that the given <paramref name="observer"/> is ready to receive notifications. | |
/// </summary> | |
/// <param name="observer">The object that is to receive notifications.</param> | |
/// <returns>A reference to an interface that allows observers to stop receiving notifications before the provider has finished sending them.</returns> | |
public IDisposable Subscribe(IObserver<Unit> observer) { | |
if (!this.isStarted) { | |
this.subscriptionLapse = Lapse.Create(); | |
this.isStarted = true; | |
this.StartNext(); | |
} | |
return new CompositeDisposable( | |
Disposable.Create(this.Complete), | |
this.subscriptionLapse.Subscribe(observer)); | |
} | |
/// <summary> | |
/// Adds the given <paramref name="observable"/> to this live sequence. | |
/// </summary> | |
public object Add(IObservable<Unit> observable) => | |
this.AddInternal(observable); | |
/// <summary> | |
/// Adds the observable returned by the given <paramref name="selector"/> to this live sequence. | |
/// </summary> | |
public object Add(Func<IObservable<Unit>> selector) => | |
this.AddInternal(selector); | |
private object AddInternal(object observable) { | |
this.queuedItems.Enqueue(observable); | |
this.StartNext(); | |
return observable; | |
} | |
/// <summary> | |
/// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources. | |
/// </summary> | |
public void Dispose() { | |
this.Complete(); | |
} | |
/// <summary> | |
/// Stops the live sequence and disposes of the currently executing observable, if any. | |
/// </summary> | |
public void Complete() { | |
if (!this.isStarted) { | |
return; | |
} | |
this.subscriptionLapse.Dispose(); | |
this.subscriptionLapse = null; | |
this.isStarted = false; | |
this.currentExecution?.Dispose(); | |
} | |
/// <summary> | |
/// Removes all observables from the sequence, including the currently executing one, if any. | |
/// </summary> | |
public void Clear() { | |
this.queuedItems.Clear(); | |
this.isExecuting = false; | |
this.currentExecution?.Dispose(); | |
} | |
/// <summary> | |
/// Removes observables from the sequence, starting with the given one and up to the end. | |
/// <para/>The currently executing observable, if any, is not affected by this operation because it is no longer in the queue. | |
/// </summary> | |
/// <param name="observable">The observable to start truncating from.</param> | |
/// <returns>Whether the given observable was found and the truncation occurred.</returns> | |
public bool TruncateBefore(object observable) => | |
Truncate(observable, isInclusive: true); | |
/// <summary> | |
/// Removes observables from the sequence, starting after the given one and up to the end. | |
/// <para/>The currently executing observable, if any, is not affected by this operation because it is no longer in the queue. | |
/// </summary> | |
/// <param name="observable">The observable after which to start truncating from.</param> | |
/// <returns>Whether the given observable was found and the truncation occurred.</returns> | |
public bool TruncateAfter(object observable) => | |
Truncate(observable, isInclusive: false); | |
/// <summary> | |
/// Removes observables from sequence, starting with the first one and up to before the given observable. | |
/// <para/>The currently executing observable, if any, is cancelled/disposed and the | |
/// next observable remaining in the queue, if any, is executed. | |
/// </summary> | |
/// <param name="observable">The observable before which to stop skipping.</param> | |
/// <returns>Whether the given observable was found and the skipping occurred.</returns> | |
public bool SkipBefore(object observable) => | |
Skip(observable, isInclusive: false); | |
/// <summary> | |
/// Removes observables from sequence, starting with the first one and up to the given observable. | |
/// <para/>The currently executing observable, if any, is cancelled/disposed and the | |
/// next observable remaining in the queue, if any, is executed. | |
/// </summary> | |
/// <param name="observable">The observable at which to stop skipping.</param> | |
/// <returns>Whether the given observable was found and the skipping occurred.</returns> | |
public bool SkipAfter(object observable) => | |
Skip(observable, isInclusive: true); | |
private bool Skip(object observable, bool isInclusive) { | |
if (!this.queuedItems.Contains(observable)) { | |
return false; | |
} | |
this.isExecuting = false; | |
while (this.queuedItems.Peek() != observable) { | |
this.queuedItems.Dequeue(); | |
} | |
if (isInclusive) { | |
this.queuedItems.Dequeue(); | |
} | |
this.currentExecution?.Dispose(); | |
if (this.isStarted && !this.isExecuting) { | |
this.StartNext(); | |
} | |
return true; | |
} | |
private bool Truncate(object observable, bool isInclusive) { | |
var index = this.queuedItems.IndexOfOrNull(observable); | |
if (index == null) { | |
return false; | |
} | |
var count = index.Value + (isInclusive ? 0 : 1); | |
this.queuedItems = new Queue<object>(this.queuedItems.Take(count)); | |
return true; | |
} | |
private void StartNext() { | |
if (!this.isStarted || this.isExecuting || this.queuedItems.Count == 0) { | |
return; | |
} | |
this.isExecuting = true; | |
var item = this.queuedItems.Dequeue(); | |
var observable = GetObservableFromItem(item); | |
this.currentExecution = observable | |
.Finally(() => { | |
this.isExecuting = false; | |
this.StartNext(); | |
}) | |
.Subscribe(); | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Collections.Generic; | |
using UniRx; | |
using MyNamespace.Extensions; | |
namespace MyNamespace.Sequencing { | |
/// <summary> | |
/// An observable <see cref="ISequencer"/> that executes observables in parallel. | |
/// It completes once all its children have completed. | |
/// </summary> | |
public sealed class Parallel : SequenceOrParallelBase { | |
/// <summary> | |
/// Creates a new parallel. | |
/// </summary> | |
/// <param name="action">A lambda expression that takes the newly created parallel, so steps may be added to it.</param> | |
/// <returns>The newly created parallel.</returns> | |
public static Parallel Create(Action<ISequencer> action = null) => | |
new Parallel(action); | |
/// <summary> | |
/// Creates a new parallel. | |
/// </summary> | |
/// <param name="selectors">A variable amount of observable selector functions whose results to add to the parallel.</param> | |
/// <returns>The newly created parallel.</returns> | |
public static Parallel Create(params Func<IObservable<Unit>>[] selectors) => | |
Create(parallel => selectors.ForEach(selector => parallel.Add(selector()))); | |
/// <summary> | |
/// Creates a new parallel. | |
/// </summary> | |
/// <param name="observables">An enumeration of observables to add to the parallel.</param> | |
/// <returns>The newly created parallel.</returns> | |
public static Parallel Create(IEnumerable<IObservable<Unit>> observables) => | |
Create(parallel => observables.ForEach(observable => parallel.Add(observable))); | |
/// <summary> | |
/// Creates a new parallel. | |
/// </summary> | |
/// <param name="observables">A variable amount of observables to add to the parallel.</param> | |
/// <returns>The newly created parallel.</returns> | |
public static Parallel Create(params IObservable<Unit>[] observables) => | |
Create(parallel => observables.ForEach(observable => parallel.Add(observable))); | |
/// <summary> | |
/// Creates a new parallel and immediately subscribes to it, triggering its execution. | |
/// </summary> | |
/// <param name="action">A lambda expression that takes the newly created parallel, so steps may be added to it.</param> | |
/// <returns>An IDisposable to allow unsubscription from the parallel, cancelling it.</returns> | |
public static IDisposable Start(Action<ISequencer> action) => | |
Create(action).AutoDetach().Subscribe(); | |
/// <summary> | |
/// Creates a new live sequence and immediately subscribes to it, triggering its execution. | |
/// </summary> | |
/// <param name="action">A lambda expression that takes the newly created live sequence, so steps may be added to it.</param> | |
/// <param name="onError">An action to execute on error.</param> | |
/// <returns>An IDisposable to allow unsubscription from the live sequence, cancelling it.</returns> | |
public static IDisposable Start(Action<ISequencer> action, Action<Exception> onError) => | |
Create(action).AutoDetach().DoOnError(onError).Subscribe(); | |
/// <summary> | |
/// Creates a new parallel and immediately subscribes to it, triggering its execution. | |
/// </summary> | |
/// <param name="observables">A variable amount of observables to add to the parallel.</param> | |
/// <returns>An IDisposable to allow unsubscription from the parallel, cancelling it.</returns> | |
public static IDisposable Start(params IObservable<Unit>[] observables) => | |
Create(observables).AutoDetach().Subscribe(); | |
private Parallel(Action<ISequencer> action = null) : base(action) { | |
} | |
/// <summary> | |
/// Subscribes to this observable, notifying it that the given <paramref name="observer"/> is ready to receive notifications. | |
/// </summary> | |
/// <param name="observer">The object that is to receive notifications.</param> | |
/// <returns>A reference to an interface that allows observers to stop receiving notifications before the provider has finished sending them.</returns> | |
public override IDisposable Subscribe(IObserver<Unit> observer) => | |
this.GetObservables() | |
.WhenAll() | |
.Subscribe(observer); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using UniRx; | |
namespace MyNamespace.Sequencing { | |
/// <summary> | |
/// An <see cref="ISequencer"/> that decorates <see cref="Parallel"/> sequencers to add support for timeline-like sequencing. | |
/// </summary> | |
internal class AtSequencer : ISequencer { | |
private readonly ISequencer innerSequencer; | |
private readonly float delay; | |
private readonly IScheduler scheduler; | |
/// <summary> | |
/// Initializes a new instance of the <see cref="AtSequencer"/> class with the given <paramref name="innerSequencer"/> and <paramref name="delay"/>. | |
/// </summary> | |
/// <param name="innerSequencer">The <see cref="ISequencer"/> that will sequence the items added to this one. Must be a <see cref="Parallel"/>.</param> | |
/// <param name="delay">The time offset, expressed in seconds since the start of the <paramref name="innerSequencer"/>, | |
/// that this <see cref="ISequencer"/> will add to items added to it.</param> | |
/// <param name="scheduler">The time scheduler that this sequencer will use for sequencing items added to it.</param> | |
public AtSequencer(ISequencer innerSequencer, float delay, IScheduler scheduler) { | |
if (!(innerSequencer is Parallel)) { | |
throw new NotSupportedException("The ISequencer.At() extension method can only be used on Parallel sequencers"); | |
} | |
this.innerSequencer = innerSequencer; | |
this.delay = delay; | |
this.scheduler = scheduler; | |
} | |
/// <inheritdoc/> | |
public IDisposable Subscribe(IObserver<Unit> observer) => | |
this.innerSequencer.Subscribe(); | |
/// <inheritdoc/> | |
public object Add(IObservable<Unit> observable) => | |
this.innerSequencer.AddSequence(seq => { | |
seq.AddDelay(this.delay, this.scheduler); | |
seq.Add(observable); | |
}); | |
/// <inheritdoc/> | |
public object Add(Func<IObservable<Unit>> selector) => | |
this.innerSequencer.AddSequence(seq => { | |
seq.AddDelay(this.delay, this.scheduler); | |
seq.Add(selector); | |
}); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using UniRx; | |
namespace MyNamespace.Sequencing { | |
/// <summary> | |
/// An observable that completes instantly. | |
/// Can be used as e.g. a marker or checkpoint in sequencers. | |
/// </summary> | |
public class Instant : IObservable<Unit> { | |
private readonly Action action; | |
/// <summary> | |
/// The default immutable <see cref="Instant"/> instance. | |
/// </summary> | |
public static readonly IObservable<Unit> Default = new Instant(); | |
/// <summary> | |
/// Initializes a new instance of the <see cref="Instant"/> class. | |
/// </summary> | |
/// <param name="action">An optional action to be executed when this observable is subscribed to.</param> | |
public Instant(Action action = null) { | |
this.action = action; | |
} | |
/// <summary> | |
/// Subscribes to this observable, notifying it that the given <paramref name="observer"/> is ready to receive notifications. | |
/// </summary> | |
/// <param name="observer">The object that is to receive notifications.</param> | |
/// <returns>A reference to an interface that allows observers to stop receiving notifications before the provider has finished sending them.</returns> | |
public IDisposable Subscribe(IObserver<Unit> observer) { | |
try { | |
this.action?.Invoke(); | |
} catch (Exception ex) { | |
return Observable | |
.Throw<Unit>(ex) | |
.Subscribe(observer); | |
} | |
observer.OnCompleted(); | |
return Disposable.Empty; | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using UniRx; | |
namespace MyNamespace.Sequencing { | |
/// <summary> | |
/// An observable that completes only when it is disposed of. | |
/// </summary> | |
public class Lapse : IObservable<Unit>, IDisposable { | |
private readonly Action<IDisposable> action; | |
private bool isSubscribed; | |
private bool isDisposed; | |
private Subject<Unit> subject; | |
/// <summary> | |
/// Creates a new lapse. | |
/// </summary> | |
/// <param name="action">An optional action that takes the newly created lapse as an <see cref="IDisposable"/>, to be executed when this observable is subscribed to.</param> | |
/// <returns>The newly created lapse.</returns> | |
public static Lapse Create(Action<IDisposable> action = null) => | |
new Lapse(action); | |
/// <summary> | |
/// Initializes a new instance of the <see cref="Lapse"/> class. | |
/// </summary> | |
protected Lapse() { } | |
/// <summary> | |
/// Initializes a new instance of the <see cref="Lapse"/> class. | |
/// </summary> | |
/// <param name="action">An optional action that takes the newly created lapse as an <see cref="IDisposable"/>, to be executed when this observable is subscribed to.</param> | |
protected Lapse(Action<IDisposable> action) { | |
this.action = action; | |
} | |
/// <summary> | |
/// Subscribes to this observable, notifying it that the given <paramref name="observer"/> is ready to receive notifications. | |
/// </summary> | |
/// <param name="observer">The object that is to receive notifications.</param> | |
/// <returns>A reference to an interface that allows observers to stop receiving notifications before the provider has finished sending them.</returns> | |
public IDisposable Subscribe(IObserver<Unit> observer) { | |
if (this.isDisposed) { | |
throw new ObjectDisposedException(nameof(Lapse)); | |
} | |
if (this.subject == null) { | |
this.subject = new Subject<Unit>(); | |
} | |
if (!this.isSubscribed) { | |
this.isSubscribed = true; | |
this.action?.Invoke(this); | |
if (this.isDisposed) { | |
observer.OnCompleted(); | |
return Disposable.Empty; | |
} | |
} | |
return this.subject.Subscribe(observer); | |
} | |
/// <summary> | |
/// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources. | |
/// </summary> | |
public void Dispose() { | |
if (this.isDisposed) { | |
return; | |
} | |
if (this.subject != null) { | |
this.subject.OnCompleted(); | |
this.subject = null; | |
} | |
this.isDisposed = true; | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Collections.Generic; | |
using UniRx; | |
using MyNamespace.Extensions; | |
namespace MyNamespace.Sequencing { | |
/// <summary> | |
/// Extension methods for working with <see cref="ISequencer"/>s. | |
/// </summary> | |
public static class ISequencerExtensions { | |
/// <summary> | |
/// Adds the given <paramref name="observable"/> to this <paramref name="sequencer"/>. | |
/// </summary> | |
/// <typeparam name="T">The type of the observable's values.</typeparam> | |
public static object Add<T>(this ISequencer sequencer, IObservable<T> observable) => | |
sequencer.Add(observable.AsSingleUnitObservable()); | |
/// <summary> | |
/// Adds the observable returned by the given <paramref name="observableFactory"/> to this <paramref name="sequencer"/>. | |
/// </summary> | |
/// <typeparam name="T">The type of the observable's values.</typeparam> | |
public static object Add<T>(this ISequencer sequencer, Func<IObservable<T>> observableFactory) => | |
sequencer.Add(Observable.Defer(observableFactory)); | |
/// <summary> | |
/// Creates a new <see cref="Parallel"/> and adds it to this <paramref name="sequencer"/>. | |
/// </summary> | |
public static object AddParallel(this ISequencer sequencer, Action<ISequencer> action) => | |
sequencer.Add(() => Parallel.Create(action)); | |
/// <summary> | |
/// Creates a new <see cref="Parallel"/> and adds it to this <paramref name="sequencer"/>. | |
/// </summary> | |
public static object AddParallel(this ISequencer sequencer, params Func<IObservable<Unit>>[] selectors) => | |
sequencer.Add(() => Parallel.Create(selectors)); | |
/// <summary> | |
/// Creates a new <see cref="Parallel"/> and adds it to this <paramref name="sequencer"/>. | |
/// </summary> | |
public static object AddParallel(this ISequencer sequencer, IEnumerable<IObservable<Unit>> observables) => | |
sequencer.Add(() => Parallel.Create(observables)); | |
/// <summary> | |
/// Creates a new <see cref="Sequence"/> and adds it to this <paramref name="sequencer"/>. | |
/// </summary> | |
public static object AddSequence(this ISequencer sequencer, Action<ISequencer> action) => | |
sequencer.Add(() => Sequence.Create(action)); | |
/// <summary> | |
/// Creates a new <see cref="Sequence"/> and adds it to this <paramref name="sequencer"/>. | |
/// </summary> | |
public static object AddSequence(this ISequencer sequencer, params Func<IObservable<Unit>>[] selectors) => | |
sequencer.Add(() => Sequence.Create(selectors)); | |
/// <summary> | |
/// Creates a new <see cref="Sequence"/> and adds it to this <paramref name="sequencer"/>. | |
/// </summary> | |
public static object AddSequence(this ISequencer sequencer, IEnumerable<IObservable<Unit>> observables) => | |
sequencer.Add(() => Sequence.Create(observables)); | |
/// <summary> | |
/// Creates a new <see cref="LiveSequence"/> and adds it to this <paramref name="sequencer"/>. | |
/// </summary> | |
public static LiveSequence AddLiveSequence(this ISequencer sequencer) { | |
var liveSequence = new LiveSequence(); | |
sequencer.Add(liveSequence); | |
return liveSequence; | |
} | |
/// <summary> | |
/// Creates a new <see cref="LiveSequence"/> and adds it to this <paramref name="sequencer"/>. | |
/// </summary> | |
public static LiveSequence AddLiveSequence(this ISequencer sequencer, Action<ISequencer> action) { | |
var liveSequence = LiveSequence.Create(action); | |
sequencer.Add(liveSequence); | |
return liveSequence; | |
} | |
/// <summary> | |
/// Adds a new <see cref="Instant"/> to this <paramref name="sequencer"/>. | |
/// </summary> | |
/// <returns>The observable added to the sequencer.</returns> | |
public static object AddInstant(this ISequencer sequencer) => | |
sequencer.Add(() => new Instant()); | |
/// <summary> | |
/// Adds an instantaneous step to this <paramref name="sequencer"/>. This is synchronous code that will execute at a specific point in a sequence. | |
/// </summary> | |
/// <param name="sequencer">The sequencer that the action is being added to.</param> | |
/// <param name="action">The action being added to the sequencer.</param> | |
/// <returns>The observable added to the sequencer.</returns> | |
public static object AddAction(this ISequencer sequencer, Action action) => | |
sequencer.Add(() => new Instant(action)); | |
/// <summary> | |
/// Adds a gate that pauses sequencing until a given disposable is disposed of. | |
/// </summary> | |
/// <param name="sequencer">The sequencer that the gate is being added to.</param> | |
/// <returns>The disposable that needs to be disposed so that the sequence proceeds past the gate.</returns> | |
public static IDisposable AddLapse(this ISequencer sequencer) { | |
var lapse = Lapse.Create(); | |
sequencer.Add(lapse); | |
return lapse; | |
} | |
/// <summary> | |
/// Adds a gate that pauses sequencing until a given disposable is disposed of. | |
/// </summary> | |
/// <param name="sequencer">The sequencer that the gate is being added to.</param> | |
/// <param name="action">A lambda expression that takes the disposable and is invoked when the gate is reached in the sequence.</param> | |
/// <returns>The observable gate added to the sequencer.</returns> | |
public static object AddLapse(this ISequencer sequencer, Action<IDisposable> action) => | |
sequencer.Add(() => Lapse.Create(action)); | |
/// <summary> | |
/// Adds a gate that pauses sequencing until the last emitted value of an observable becomes true. | |
/// </summary> | |
/// <param name="sequencer">The sequencer that the gate is being added to.</param> | |
/// <param name="gate">The observable whose last emitted boolean value will allow passing through the gate.</param> | |
/// <returns>The observable gate added to the sequencer.</returns> | |
public static object AddGate(this ISequencer sequencer, IObservable<bool> gate) => | |
sequencer.Add(() => gate.WhereTrue().Take(1)); | |
/// <summary> | |
/// Adds a step to this <paramref name="sequencer"/> that waits the given time interval. | |
/// </summary> | |
public static object AddDelay(this ISequencer sequencer, float seconds) => | |
sequencer.AddDelay(TimeSpan.FromSeconds(seconds)); | |
/// <summary> | |
/// Adds a step to this <paramref name="sequencer"/> that waits the given time interval. | |
/// </summary> | |
public static object AddDelay(this ISequencer sequencer, float seconds, IScheduler scheduler) => | |
sequencer.AddDelay(TimeSpan.FromSeconds(seconds), scheduler); | |
/// <summary> | |
/// Adds a step to this <paramref name="sequencer"/> that waits the given time interval. | |
/// </summary> | |
public static object AddDelay(this ISequencer sequencer, TimeSpan interval) => | |
sequencer.AddDelay(interval, Scheduler.DefaultSchedulers.TimeBasedOperations); | |
/// <summary> | |
/// Adds a step to this <paramref name="sequencer"/> that waits the given time interval. | |
/// </summary> | |
public static object AddDelay(this ISequencer sequencer, TimeSpan interval, IScheduler scheduler) => | |
sequencer.Add(Observable.Timer(interval, scheduler)); | |
/// <summary> | |
/// Adds a step to this <paramref name="sequencer"/> that disposes of the given <paramref name="disposable"/>. | |
/// </summary> | |
public static object AddDispose(this ISequencer sequencer, IDisposable disposable) => | |
sequencer.AddAction(disposable.Dispose); | |
/// <summary> | |
/// On a <see cref="Parallel"/> sequencer, allows scheduling of an event at a specific moment, expressed in seconds from the start of the sequencing. | |
/// </summary> | |
/// <example><code> | |
/// Parallel.Create(par => { | |
/// par.At(0.5).Add(UnitObservable); | |
/// par.At(1).AddSequence(Sequence); | |
/// par.At(2.5).AddAction(action); | |
/// }); | |
/// </code></example> | |
/// <remarks>Can only be used on a <see cref="Parallel"/> sequencer.</remarks> | |
public static ISequencer At(this ISequencer sequencer, float timeInSeconds) => | |
new AtSequencer(sequencer, timeInSeconds, Scheduler.DefaultSchedulers.TimeBasedOperations); | |
/// <summary> | |
/// On a <see cref="Parallel"/> sequencer, allows scheduling of an event at a specific moment on a specific scheduler, | |
/// expressed in a time offset from the start of the sequencing. | |
/// </summary> | |
/// <example><code> | |
/// Parallel.Create(par => { | |
/// par.At(0.5, Scheduler.MainThreadEndOfFrame).Add(UnitObservable); | |
/// par.At(1, Scheduler.ThreadPool).AddSequence(Sequence); | |
/// par.At(2.5, Scheduler.Immediate).AddAction(action); | |
/// }); | |
/// </code></example> | |
/// <remarks>Can only be used on a <see cref="Parallel"/> sequencer.</remarks> | |
public static ISequencer At(this ISequencer sequencer, float timeOffset, IScheduler scheduler) => | |
new AtSequencer(sequencer, timeOffset, scheduler); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using UniRx; | |
namespace MyNamespace.Sequencing { | |
/// <summary> | |
/// Extension methods for working with <see cref="LiveSequence"/>s. | |
/// </summary> | |
public static class LiveSequenceExtensions { | |
/// <summary> | |
/// Adds a step to this <paramref name="liveSequence"/> that will complete it when reached. | |
/// </summary> | |
public static void AddComplete(this LiveSequence liveSequence) => | |
liveSequence.AddAction(liveSequence.Complete); | |
/// <summary> | |
/// Truncates everything after the given observable if it is already present in the sequence. Otherwise, adds it to the end of the sequence. | |
/// </summary> | |
/// <param name="liveSequence">The LiveSequence that the observable is being added to.</param> | |
/// <param name="observable">The observable being added to the LiveSequence</param> | |
/// <returns>True if the observable was added at the end of sequence, or False if the observable | |
/// was already present in the sequence and everything after it was truncated instead.</returns> | |
public static bool AddOrTruncateAfter(this LiveSequence liveSequence, IObservable<Unit> observable) { | |
if (liveSequence.TruncateAfter(observable)) { | |
return false; | |
} | |
liveSequence.Add(observable); | |
return true; | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Collections.Generic; | |
using UniRx; | |
using UnityEngine; | |
namespace MyNamespace.Sequencing { | |
/// <summary> | |
/// Extension methods for working with <see cref="IObservable{T}"/> objects. | |
/// </summary> | |
public static class IObservableExtensions { | |
/// <summary> | |
/// Throttles this <paramref name="observable"/> so that it produces values only once per frame. | |
/// </summary> | |
public static IObservable<T> ThrottleOncePerFrame<T>(this IObservable<T> observable) { | |
var lastFrame = -1; | |
return observable | |
.Where(_ => { | |
if (Time.frameCount != lastFrame) { | |
lastFrame = Time.frameCount; | |
return true; | |
} | |
return false; | |
}) | |
.ObserveOn(Scheduler.MainThreadEndOfFrame); | |
} | |
/// <summary> | |
/// Adds this observable into the given <see cref="ISequencer"/>. | |
/// </summary> | |
public static void In(this IObservable<Unit> observable, ISequencer sequencer) { | |
sequencer.Add(observable); | |
} | |
/// <summary> | |
/// Creates a new <see cref="Sequence"/> from the given sequence of unit observables. | |
/// </summary> | |
public static Sequence ToSequence(this IEnumerable<IObservable<Unit>> observables) => | |
Sequence.Create(observables); | |
/// <summary> | |
/// Creates a new <see cref="Parallel"/> from the given sequence of unit observables. | |
/// </summary> | |
public static Parallel ToParallel(this IEnumerable<IObservable<Unit>> observables) => | |
Parallel.Create(observables); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment