Skip to content

Instantly share code, notes, and snippets.

@rtlsilva
Created November 29, 2023 23:11
Show Gist options
  • Save rtlsilva/628daf6e62bf9eaabb2b60974bf04311 to your computer and use it in GitHub Desktop.
Save rtlsilva/628daf6e62bf9eaabb2b60974bf04311 to your computer and use it in GitHub Desktop.
A reactive sequencing library for coordinating time-based operations
using System;
using UniRx;
namespace MyNamespace.Sequencing {
/// <summary>
/// Defines functionality for providing observable sequencing capabilities.
/// </summary>
public interface ISequencer : IObservable<Unit> {
/// <summary>
/// Adds the given <paramref name="observable"/> to this <see cref="ISequencer"/>.
/// </summary>
/// <param name="observable">The observable to add to the sequence.</param>
/// <returns>The newly added observable as an <see langword="object"/>.</returns>
object Add(IObservable<Unit> observable);
/// <summary>
/// Adds the given <paramref name="selector"/> to this <see cref="ISequencer"/>.
/// </summary>
/// <param name="selector">The unit observable selector function to add to the sequence.</param>
/// <returns>The newly added selector as an <see langword="object"/>.</returns>
object Add(Func<IObservable<Unit>> selector);
}
}
using System;
using UniRx;
namespace MyNamespace.Sequencing {
/// <summary>
/// Base class for sequencers.
/// </summary>
public abstract class SequencerBase {
/// <summary>
/// Gets a unit observable from a specified <see langword="object"/>.
/// <para>If the object is of a type compatible with a unit observable, it is converted and returned as one.</para>
/// <para>If the object is a function returning a unit observable, it is wrapped in an <see cref="Observable.Defer{T}(Func{IObservable{T}})"/>,
/// making sure its execution is deferred until subscription (i.e. makes it cold).</para>
/// </summary>
/// <param name="item">The item to get a unit observable from.</param>
/// <returns>The specified object as a unit observable.</returns>
protected static IObservable<Unit> GetObservableFromItem(object item) =>
item as IObservable<Unit> ?? Observable.Defer((Func<IObservable<Unit>>)item);
}
}
using System;
using System.Collections.Generic;
using System.Linq;
using UniRx;
namespace MyNamespace.Sequencing {
/// <summary>
/// Base class for <see cref="Sequence"/> or <see cref="Parallel"/> sequencers.
/// </summary>
public abstract class SequenceOrParallelBase : SequencerBase, ISequencer {
private readonly Action<ISequencer> action;
private readonly List<object> items = new List<object>();
/// <summary>
/// Initializes a new instance of the <see cref="SequenceOrParallelBase"/> class with the given <paramref name="action"/>.
/// </summary>
/// <param name="action">The action that will be executed to retrieve observable sequences passed during construction.</param>
protected SequenceOrParallelBase(Action<ISequencer> action = null) {
this.action = action;
}
/// <inheritdoc/>
public object Add(IObservable<Unit> observable) {
this.items.Add(observable);
return observable;
}
/// <inheritdoc/>
public object Add(Func<IObservable<Unit>> selector) {
this.items.Add(selector);
return selector;
}
/// <inheritdoc/>
public abstract IDisposable Subscribe(IObserver<Unit> observer);
/// <summary>
/// Retrieves the unit observables currently held by this sequencer.
/// </summary>
/// <returns>An enumeration of the unit observables currently in this sequencer.</returns>
protected IEnumerable<IObservable<Unit>> GetObservables() {
var observables = this.items.Select(GetObservableFromItem).ToList();
this.action?.Invoke(this);
return this.items
.Select(GetObservableFromItem)
.Except(observables)
.Concat(observables);
}
}
}
using System;
using System.Collections.Generic;
using UniRx;
using MyNamespace.Extensions;
namespace MyNamespace.Sequencing {
/// <summary>
/// An observable <see cref="ISequencer"/> that executes observables sequentially.
/// It completes once its last child has completed.
/// </summary>
public sealed class Sequence : SequenceOrParallelBase {
/// <summary>
/// Creates a new sequence.
/// </summary>
/// <param name="action">A lambda expression that takes the newly created sequence, so steps may be added to it.</param>
/// <returns>The newly created sequence.</returns>
public static Sequence Create(Action<ISequencer> action = null) =>
new Sequence(action);
/// <summary>
/// Creates a new sequence.
/// </summary>
/// <param name="selectors">A variable amount of observable selector functions whose results to add to the sequence.</param>
/// <returns>The newly created sequence.</returns>
public static Sequence Create(params Func<IObservable<Unit>>[] selectors) =>
Create(seq => selectors.ForEach(selector => seq.Add(selector())));
/// <summary>
/// Creates a new sequence.
/// </summary>
/// <param name="observables">An enumeration of observables to add to the sequence.</param>
/// <returns>The newly created sequence.</returns>
public static Sequence Create(IEnumerable<IObservable<Unit>> observables) =>
Create(seq => observables.ForEach(observable => seq.Add(observable)));
/// <summary>
/// Creates a new sequence.
/// </summary>
/// <param name="observables">A variable amount of observables to add to the sequence.</param>
/// <returns>The newly created sequence.</returns>
public static Sequence Create(params IObservable<Unit>[] observables) =>
Create(seq => observables.ForEach(observable => seq.Add(observable)));
/// <summary>
/// Creates a new sequence and immediately subscribes to it, triggering its execution.
/// </summary>
/// <param name="action">A lambda expression that takes the newly created sequence, so steps may be added to it.</param>
/// <returns>An IDisposable to allow unsubscription from the sequence, cancelling it.</returns>
public static IDisposable Start(Action<ISequencer> action) =>
Create(action).AutoDetach().Subscribe();
/// <summary>
/// Creates a new sequence and immediately subscribes to it, triggering its execution.
/// </summary>
/// <param name="action">A lambda expression that takes the newly created sequence, so steps may be added to it.</param>
/// <param name="onError">An action to execute on error.</param>
/// <returns>An IDisposable to allow unsubscription from the live sequence, cancelling it.</returns>
public static IDisposable Start(Action<ISequencer> action, Action<Exception> onError) =>
Create(action).AutoDetach().DoOnError(onError).Subscribe();
/// <summary>
/// Creates a new sequence and immediately subscribes to it, triggering its execution.
/// </summary>
/// <param name="observables">A variable amount of observables to add to the sequence.</param>
/// <returns>The newly created sequence.</returns>
public static IDisposable Start(params IObservable<Unit>[] observables) =>
Create(observables).AutoDetach().Subscribe();
private Sequence(Action<ISequencer> action = null) : base(action) {
}
/// <summary>
/// Subscribes to this observable, notifying it that the given <paramref name="observer"/> is ready to receive notifications.
/// </summary>
/// <param name="observer">The object that is to receive notifications.</param>
/// <returns>A reference to an interface that allows observers to stop receiving notifications before the provider has finished sending them.</returns>
public override IDisposable Subscribe(IObserver<Unit> observer) =>
this.GetObservables()
.Concat()
.Subscribe(observer);
}
}
using System;
using System.Collections.Generic;
using System.Linq;
using MyNamespace.Extensions;
using UniRx;
namespace MyNamespace.Sequencing {
/// <summary>
/// An observable <see cref="ISequencer"/> that executes observables in a queue sequentially as soon as possible.
/// When it is subscribed to, it starts waiting for new additions to the queue until it is manually disposed of.
/// </summary>
public class LiveSequence : SequencerBase, ISequencer, IDisposable {
/// <summary>
/// Creates a new live sequence.
/// </summary>
/// <param name="action">A lambda expression that takes the newly created live sequence, so steps may be added to it.</param>
/// <returns>The newly created live sequence.</returns>
public static LiveSequence Create(Action<ISequencer> action) {
var liveSequence = new LiveSequence();
action?.Invoke(liveSequence);
return liveSequence;
}
/// <summary>
/// Creates a new live sequence and immediately subscribes to it, triggering its execution.
/// </summary>
/// <returns>The newly created live sequence.</returns>
public static LiveSequence Start() {
var liveSequence = new LiveSequence();
liveSequence.SubscribeAndForget();
return liveSequence;
}
/// <summary>
/// Creates a new live sequence and immediately subscribes to it, triggering its execution.
/// </summary>
/// <param name="action">A lambda expression that takes the newly created live sequence, so steps may be added to it.</param>
/// <returns>An IDisposable to allow unsubscription from the live sequence, cancelling it.</returns>
public static IDisposable Start(Action<ISequencer> action) =>
Create(action).AutoDetach().Subscribe();
/// <summary>
/// Creates a new live sequence and immediately subscribes to it, triggering its execution.
/// </summary>
/// <param name="action">A lambda expression that takes the newly created live sequence, so steps may be added to it.</param>
/// <param name="onError">An action to execute on error.</param>
/// <returns>An IDisposable to allow unsubscription from the live sequence, cancelling it.</returns>
public static IDisposable Start(Action<ISequencer> action, Action<Exception> onError) =>
Create(action).AutoDetach().DoOnError(onError).Subscribe();
private Queue<object> queuedItems = new Queue<object>();
private IDisposable currentExecution;
private bool isStarted;
private bool isExecuting;
private Lapse subscriptionLapse;
/// <summary>
/// Subscribes to this observable, notifying it that the given <paramref name="observer"/> is ready to receive notifications.
/// </summary>
/// <param name="observer">The object that is to receive notifications.</param>
/// <returns>A reference to an interface that allows observers to stop receiving notifications before the provider has finished sending them.</returns>
public IDisposable Subscribe(IObserver<Unit> observer) {
if (!this.isStarted) {
this.subscriptionLapse = Lapse.Create();
this.isStarted = true;
this.StartNext();
}
return new CompositeDisposable(
Disposable.Create(this.Complete),
this.subscriptionLapse.Subscribe(observer));
}
/// <summary>
/// Adds the given <paramref name="observable"/> to this live sequence.
/// </summary>
public object Add(IObservable<Unit> observable) =>
this.AddInternal(observable);
/// <summary>
/// Adds the observable returned by the given <paramref name="selector"/> to this live sequence.
/// </summary>
public object Add(Func<IObservable<Unit>> selector) =>
this.AddInternal(selector);
private object AddInternal(object observable) {
this.queuedItems.Enqueue(observable);
this.StartNext();
return observable;
}
/// <summary>
/// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.
/// </summary>
public void Dispose() {
this.Complete();
}
/// <summary>
/// Stops the live sequence and disposes of the currently executing observable, if any.
/// </summary>
public void Complete() {
if (!this.isStarted) {
return;
}
this.subscriptionLapse.Dispose();
this.subscriptionLapse = null;
this.isStarted = false;
this.currentExecution?.Dispose();
}
/// <summary>
/// Removes all observables from the sequence, including the currently executing one, if any.
/// </summary>
public void Clear() {
this.queuedItems.Clear();
this.isExecuting = false;
this.currentExecution?.Dispose();
}
/// <summary>
/// Removes observables from the sequence, starting with the given one and up to the end.
/// <para/>The currently executing observable, if any, is not affected by this operation because it is no longer in the queue.
/// </summary>
/// <param name="observable">The observable to start truncating from.</param>
/// <returns>Whether the given observable was found and the truncation occurred.</returns>
public bool TruncateBefore(object observable) =>
Truncate(observable, isInclusive: true);
/// <summary>
/// Removes observables from the sequence, starting after the given one and up to the end.
/// <para/>The currently executing observable, if any, is not affected by this operation because it is no longer in the queue.
/// </summary>
/// <param name="observable">The observable after which to start truncating from.</param>
/// <returns>Whether the given observable was found and the truncation occurred.</returns>
public bool TruncateAfter(object observable) =>
Truncate(observable, isInclusive: false);
/// <summary>
/// Removes observables from sequence, starting with the first one and up to before the given observable.
/// <para/>The currently executing observable, if any, is cancelled/disposed and the
/// next observable remaining in the queue, if any, is executed.
/// </summary>
/// <param name="observable">The observable before which to stop skipping.</param>
/// <returns>Whether the given observable was found and the skipping occurred.</returns>
public bool SkipBefore(object observable) =>
Skip(observable, isInclusive: false);
/// <summary>
/// Removes observables from sequence, starting with the first one and up to the given observable.
/// <para/>The currently executing observable, if any, is cancelled/disposed and the
/// next observable remaining in the queue, if any, is executed.
/// </summary>
/// <param name="observable">The observable at which to stop skipping.</param>
/// <returns>Whether the given observable was found and the skipping occurred.</returns>
public bool SkipAfter(object observable) =>
Skip(observable, isInclusive: true);
private bool Skip(object observable, bool isInclusive) {
if (!this.queuedItems.Contains(observable)) {
return false;
}
this.isExecuting = false;
while (this.queuedItems.Peek() != observable) {
this.queuedItems.Dequeue();
}
if (isInclusive) {
this.queuedItems.Dequeue();
}
this.currentExecution?.Dispose();
if (this.isStarted && !this.isExecuting) {
this.StartNext();
}
return true;
}
private bool Truncate(object observable, bool isInclusive) {
var index = this.queuedItems.IndexOfOrNull(observable);
if (index == null) {
return false;
}
var count = index.Value + (isInclusive ? 0 : 1);
this.queuedItems = new Queue<object>(this.queuedItems.Take(count));
return true;
}
private void StartNext() {
if (!this.isStarted || this.isExecuting || this.queuedItems.Count == 0) {
return;
}
this.isExecuting = true;
var item = this.queuedItems.Dequeue();
var observable = GetObservableFromItem(item);
this.currentExecution = observable
.Finally(() => {
this.isExecuting = false;
this.StartNext();
})
.Subscribe();
}
}
}
using System;
using System.Collections.Generic;
using UniRx;
using MyNamespace.Extensions;
namespace MyNamespace.Sequencing {
/// <summary>
/// An observable <see cref="ISequencer"/> that executes observables in parallel.
/// It completes once all its children have completed.
/// </summary>
public sealed class Parallel : SequenceOrParallelBase {
/// <summary>
/// Creates a new parallel.
/// </summary>
/// <param name="action">A lambda expression that takes the newly created parallel, so steps may be added to it.</param>
/// <returns>The newly created parallel.</returns>
public static Parallel Create(Action<ISequencer> action = null) =>
new Parallel(action);
/// <summary>
/// Creates a new parallel.
/// </summary>
/// <param name="selectors">A variable amount of observable selector functions whose results to add to the parallel.</param>
/// <returns>The newly created parallel.</returns>
public static Parallel Create(params Func<IObservable<Unit>>[] selectors) =>
Create(parallel => selectors.ForEach(selector => parallel.Add(selector())));
/// <summary>
/// Creates a new parallel.
/// </summary>
/// <param name="observables">An enumeration of observables to add to the parallel.</param>
/// <returns>The newly created parallel.</returns>
public static Parallel Create(IEnumerable<IObservable<Unit>> observables) =>
Create(parallel => observables.ForEach(observable => parallel.Add(observable)));
/// <summary>
/// Creates a new parallel.
/// </summary>
/// <param name="observables">A variable amount of observables to add to the parallel.</param>
/// <returns>The newly created parallel.</returns>
public static Parallel Create(params IObservable<Unit>[] observables) =>
Create(parallel => observables.ForEach(observable => parallel.Add(observable)));
/// <summary>
/// Creates a new parallel and immediately subscribes to it, triggering its execution.
/// </summary>
/// <param name="action">A lambda expression that takes the newly created parallel, so steps may be added to it.</param>
/// <returns>An IDisposable to allow unsubscription from the parallel, cancelling it.</returns>
public static IDisposable Start(Action<ISequencer> action) =>
Create(action).AutoDetach().Subscribe();
/// <summary>
/// Creates a new live sequence and immediately subscribes to it, triggering its execution.
/// </summary>
/// <param name="action">A lambda expression that takes the newly created live sequence, so steps may be added to it.</param>
/// <param name="onError">An action to execute on error.</param>
/// <returns>An IDisposable to allow unsubscription from the live sequence, cancelling it.</returns>
public static IDisposable Start(Action<ISequencer> action, Action<Exception> onError) =>
Create(action).AutoDetach().DoOnError(onError).Subscribe();
/// <summary>
/// Creates a new parallel and immediately subscribes to it, triggering its execution.
/// </summary>
/// <param name="observables">A variable amount of observables to add to the parallel.</param>
/// <returns>An IDisposable to allow unsubscription from the parallel, cancelling it.</returns>
public static IDisposable Start(params IObservable<Unit>[] observables) =>
Create(observables).AutoDetach().Subscribe();
private Parallel(Action<ISequencer> action = null) : base(action) {
}
/// <summary>
/// Subscribes to this observable, notifying it that the given <paramref name="observer"/> is ready to receive notifications.
/// </summary>
/// <param name="observer">The object that is to receive notifications.</param>
/// <returns>A reference to an interface that allows observers to stop receiving notifications before the provider has finished sending them.</returns>
public override IDisposable Subscribe(IObserver<Unit> observer) =>
this.GetObservables()
.WhenAll()
.Subscribe(observer);
}
}
using System;
using UniRx;
namespace MyNamespace.Sequencing {
/// <summary>
/// An <see cref="ISequencer"/> that decorates <see cref="Parallel"/> sequencers to add support for timeline-like sequencing.
/// </summary>
internal class AtSequencer : ISequencer {
private readonly ISequencer innerSequencer;
private readonly float delay;
private readonly IScheduler scheduler;
/// <summary>
/// Initializes a new instance of the <see cref="AtSequencer"/> class with the given <paramref name="innerSequencer"/> and <paramref name="delay"/>.
/// </summary>
/// <param name="innerSequencer">The <see cref="ISequencer"/> that will sequence the items added to this one. Must be a <see cref="Parallel"/>.</param>
/// <param name="delay">The time offset, expressed in seconds since the start of the <paramref name="innerSequencer"/>,
/// that this <see cref="ISequencer"/> will add to items added to it.</param>
/// <param name="scheduler">The time scheduler that this sequencer will use for sequencing items added to it.</param>
public AtSequencer(ISequencer innerSequencer, float delay, IScheduler scheduler) {
if (!(innerSequencer is Parallel)) {
throw new NotSupportedException("The ISequencer.At() extension method can only be used on Parallel sequencers");
}
this.innerSequencer = innerSequencer;
this.delay = delay;
this.scheduler = scheduler;
}
/// <inheritdoc/>
public IDisposable Subscribe(IObserver<Unit> observer) =>
this.innerSequencer.Subscribe();
/// <inheritdoc/>
public object Add(IObservable<Unit> observable) =>
this.innerSequencer.AddSequence(seq => {
seq.AddDelay(this.delay, this.scheduler);
seq.Add(observable);
});
/// <inheritdoc/>
public object Add(Func<IObservable<Unit>> selector) =>
this.innerSequencer.AddSequence(seq => {
seq.AddDelay(this.delay, this.scheduler);
seq.Add(selector);
});
}
}
using System;
using UniRx;
namespace MyNamespace.Sequencing {
/// <summary>
/// An observable that completes instantly.
/// Can be used as e.g. a marker or checkpoint in sequencers.
/// </summary>
public class Instant : IObservable<Unit> {
private readonly Action action;
/// <summary>
/// The default immutable <see cref="Instant"/> instance.
/// </summary>
public static readonly IObservable<Unit> Default = new Instant();
/// <summary>
/// Initializes a new instance of the <see cref="Instant"/> class.
/// </summary>
/// <param name="action">An optional action to be executed when this observable is subscribed to.</param>
public Instant(Action action = null) {
this.action = action;
}
/// <summary>
/// Subscribes to this observable, notifying it that the given <paramref name="observer"/> is ready to receive notifications.
/// </summary>
/// <param name="observer">The object that is to receive notifications.</param>
/// <returns>A reference to an interface that allows observers to stop receiving notifications before the provider has finished sending them.</returns>
public IDisposable Subscribe(IObserver<Unit> observer) {
try {
this.action?.Invoke();
} catch (Exception ex) {
return Observable
.Throw<Unit>(ex)
.Subscribe(observer);
}
observer.OnCompleted();
return Disposable.Empty;
}
}
}
using System;
using UniRx;
namespace MyNamespace.Sequencing {
/// <summary>
/// An observable that completes only when it is disposed of.
/// </summary>
public class Lapse : IObservable<Unit>, IDisposable {
private readonly Action<IDisposable> action;
private bool isSubscribed;
private bool isDisposed;
private Subject<Unit> subject;
/// <summary>
/// Creates a new lapse.
/// </summary>
/// <param name="action">An optional action that takes the newly created lapse as an <see cref="IDisposable"/>, to be executed when this observable is subscribed to.</param>
/// <returns>The newly created lapse.</returns>
public static Lapse Create(Action<IDisposable> action = null) =>
new Lapse(action);
/// <summary>
/// Initializes a new instance of the <see cref="Lapse"/> class.
/// </summary>
protected Lapse() { }
/// <summary>
/// Initializes a new instance of the <see cref="Lapse"/> class.
/// </summary>
/// <param name="action">An optional action that takes the newly created lapse as an <see cref="IDisposable"/>, to be executed when this observable is subscribed to.</param>
protected Lapse(Action<IDisposable> action) {
this.action = action;
}
/// <summary>
/// Subscribes to this observable, notifying it that the given <paramref name="observer"/> is ready to receive notifications.
/// </summary>
/// <param name="observer">The object that is to receive notifications.</param>
/// <returns>A reference to an interface that allows observers to stop receiving notifications before the provider has finished sending them.</returns>
public IDisposable Subscribe(IObserver<Unit> observer) {
if (this.isDisposed) {
throw new ObjectDisposedException(nameof(Lapse));
}
if (this.subject == null) {
this.subject = new Subject<Unit>();
}
if (!this.isSubscribed) {
this.isSubscribed = true;
this.action?.Invoke(this);
if (this.isDisposed) {
observer.OnCompleted();
return Disposable.Empty;
}
}
return this.subject.Subscribe(observer);
}
/// <summary>
/// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.
/// </summary>
public void Dispose() {
if (this.isDisposed) {
return;
}
if (this.subject != null) {
this.subject.OnCompleted();
this.subject = null;
}
this.isDisposed = true;
}
}
}
using System;
using System.Collections.Generic;
using UniRx;
using MyNamespace.Extensions;
namespace MyNamespace.Sequencing {
/// <summary>
/// Extension methods for working with <see cref="ISequencer"/>s.
/// </summary>
public static class ISequencerExtensions {
/// <summary>
/// Adds the given <paramref name="observable"/> to this <paramref name="sequencer"/>.
/// </summary>
/// <typeparam name="T">The type of the observable's values.</typeparam>
public static object Add<T>(this ISequencer sequencer, IObservable<T> observable) =>
sequencer.Add(observable.AsSingleUnitObservable());
/// <summary>
/// Adds the observable returned by the given <paramref name="observableFactory"/> to this <paramref name="sequencer"/>.
/// </summary>
/// <typeparam name="T">The type of the observable's values.</typeparam>
public static object Add<T>(this ISequencer sequencer, Func<IObservable<T>> observableFactory) =>
sequencer.Add(Observable.Defer(observableFactory));
/// <summary>
/// Creates a new <see cref="Parallel"/> and adds it to this <paramref name="sequencer"/>.
/// </summary>
public static object AddParallel(this ISequencer sequencer, Action<ISequencer> action) =>
sequencer.Add(() => Parallel.Create(action));
/// <summary>
/// Creates a new <see cref="Parallel"/> and adds it to this <paramref name="sequencer"/>.
/// </summary>
public static object AddParallel(this ISequencer sequencer, params Func<IObservable<Unit>>[] selectors) =>
sequencer.Add(() => Parallel.Create(selectors));
/// <summary>
/// Creates a new <see cref="Parallel"/> and adds it to this <paramref name="sequencer"/>.
/// </summary>
public static object AddParallel(this ISequencer sequencer, IEnumerable<IObservable<Unit>> observables) =>
sequencer.Add(() => Parallel.Create(observables));
/// <summary>
/// Creates a new <see cref="Sequence"/> and adds it to this <paramref name="sequencer"/>.
/// </summary>
public static object AddSequence(this ISequencer sequencer, Action<ISequencer> action) =>
sequencer.Add(() => Sequence.Create(action));
/// <summary>
/// Creates a new <see cref="Sequence"/> and adds it to this <paramref name="sequencer"/>.
/// </summary>
public static object AddSequence(this ISequencer sequencer, params Func<IObservable<Unit>>[] selectors) =>
sequencer.Add(() => Sequence.Create(selectors));
/// <summary>
/// Creates a new <see cref="Sequence"/> and adds it to this <paramref name="sequencer"/>.
/// </summary>
public static object AddSequence(this ISequencer sequencer, IEnumerable<IObservable<Unit>> observables) =>
sequencer.Add(() => Sequence.Create(observables));
/// <summary>
/// Creates a new <see cref="LiveSequence"/> and adds it to this <paramref name="sequencer"/>.
/// </summary>
public static LiveSequence AddLiveSequence(this ISequencer sequencer) {
var liveSequence = new LiveSequence();
sequencer.Add(liveSequence);
return liveSequence;
}
/// <summary>
/// Creates a new <see cref="LiveSequence"/> and adds it to this <paramref name="sequencer"/>.
/// </summary>
public static LiveSequence AddLiveSequence(this ISequencer sequencer, Action<ISequencer> action) {
var liveSequence = LiveSequence.Create(action);
sequencer.Add(liveSequence);
return liveSequence;
}
/// <summary>
/// Adds a new <see cref="Instant"/> to this <paramref name="sequencer"/>.
/// </summary>
/// <returns>The observable added to the sequencer.</returns>
public static object AddInstant(this ISequencer sequencer) =>
sequencer.Add(() => new Instant());
/// <summary>
/// Adds an instantaneous step to this <paramref name="sequencer"/>. This is synchronous code that will execute at a specific point in a sequence.
/// </summary>
/// <param name="sequencer">The sequencer that the action is being added to.</param>
/// <param name="action">The action being added to the sequencer.</param>
/// <returns>The observable added to the sequencer.</returns>
public static object AddAction(this ISequencer sequencer, Action action) =>
sequencer.Add(() => new Instant(action));
/// <summary>
/// Adds a gate that pauses sequencing until a given disposable is disposed of.
/// </summary>
/// <param name="sequencer">The sequencer that the gate is being added to.</param>
/// <returns>The disposable that needs to be disposed so that the sequence proceeds past the gate.</returns>
public static IDisposable AddLapse(this ISequencer sequencer) {
var lapse = Lapse.Create();
sequencer.Add(lapse);
return lapse;
}
/// <summary>
/// Adds a gate that pauses sequencing until a given disposable is disposed of.
/// </summary>
/// <param name="sequencer">The sequencer that the gate is being added to.</param>
/// <param name="action">A lambda expression that takes the disposable and is invoked when the gate is reached in the sequence.</param>
/// <returns>The observable gate added to the sequencer.</returns>
public static object AddLapse(this ISequencer sequencer, Action<IDisposable> action) =>
sequencer.Add(() => Lapse.Create(action));
/// <summary>
/// Adds a gate that pauses sequencing until the last emitted value of an observable becomes true.
/// </summary>
/// <param name="sequencer">The sequencer that the gate is being added to.</param>
/// <param name="gate">The observable whose last emitted boolean value will allow passing through the gate.</param>
/// <returns>The observable gate added to the sequencer.</returns>
public static object AddGate(this ISequencer sequencer, IObservable<bool> gate) =>
sequencer.Add(() => gate.WhereTrue().Take(1));
/// <summary>
/// Adds a step to this <paramref name="sequencer"/> that waits the given time interval.
/// </summary>
public static object AddDelay(this ISequencer sequencer, float seconds) =>
sequencer.AddDelay(TimeSpan.FromSeconds(seconds));
/// <summary>
/// Adds a step to this <paramref name="sequencer"/> that waits the given time interval.
/// </summary>
public static object AddDelay(this ISequencer sequencer, float seconds, IScheduler scheduler) =>
sequencer.AddDelay(TimeSpan.FromSeconds(seconds), scheduler);
/// <summary>
/// Adds a step to this <paramref name="sequencer"/> that waits the given time interval.
/// </summary>
public static object AddDelay(this ISequencer sequencer, TimeSpan interval) =>
sequencer.AddDelay(interval, Scheduler.DefaultSchedulers.TimeBasedOperations);
/// <summary>
/// Adds a step to this <paramref name="sequencer"/> that waits the given time interval.
/// </summary>
public static object AddDelay(this ISequencer sequencer, TimeSpan interval, IScheduler scheduler) =>
sequencer.Add(Observable.Timer(interval, scheduler));
/// <summary>
/// Adds a step to this <paramref name="sequencer"/> that disposes of the given <paramref name="disposable"/>.
/// </summary>
public static object AddDispose(this ISequencer sequencer, IDisposable disposable) =>
sequencer.AddAction(disposable.Dispose);
/// <summary>
/// On a <see cref="Parallel"/> sequencer, allows scheduling of an event at a specific moment, expressed in seconds from the start of the sequencing.
/// </summary>
/// <example><code>
/// Parallel.Create(par => {
/// par.At(0.5).Add(UnitObservable);
/// par.At(1).AddSequence(Sequence);
/// par.At(2.5).AddAction(action);
/// });
/// </code></example>
/// <remarks>Can only be used on a <see cref="Parallel"/> sequencer.</remarks>
public static ISequencer At(this ISequencer sequencer, float timeInSeconds) =>
new AtSequencer(sequencer, timeInSeconds, Scheduler.DefaultSchedulers.TimeBasedOperations);
/// <summary>
/// On a <see cref="Parallel"/> sequencer, allows scheduling of an event at a specific moment on a specific scheduler,
/// expressed in a time offset from the start of the sequencing.
/// </summary>
/// <example><code>
/// Parallel.Create(par => {
/// par.At(0.5, Scheduler.MainThreadEndOfFrame).Add(UnitObservable);
/// par.At(1, Scheduler.ThreadPool).AddSequence(Sequence);
/// par.At(2.5, Scheduler.Immediate).AddAction(action);
/// });
/// </code></example>
/// <remarks>Can only be used on a <see cref="Parallel"/> sequencer.</remarks>
public static ISequencer At(this ISequencer sequencer, float timeOffset, IScheduler scheduler) =>
new AtSequencer(sequencer, timeOffset, scheduler);
}
}
using System;
using UniRx;
namespace MyNamespace.Sequencing {
/// <summary>
/// Extension methods for working with <see cref="LiveSequence"/>s.
/// </summary>
public static class LiveSequenceExtensions {
/// <summary>
/// Adds a step to this <paramref name="liveSequence"/> that will complete it when reached.
/// </summary>
public static void AddComplete(this LiveSequence liveSequence) =>
liveSequence.AddAction(liveSequence.Complete);
/// <summary>
/// Truncates everything after the given observable if it is already present in the sequence. Otherwise, adds it to the end of the sequence.
/// </summary>
/// <param name="liveSequence">The LiveSequence that the observable is being added to.</param>
/// <param name="observable">The observable being added to the LiveSequence</param>
/// <returns>True if the observable was added at the end of sequence, or False if the observable
/// was already present in the sequence and everything after it was truncated instead.</returns>
public static bool AddOrTruncateAfter(this LiveSequence liveSequence, IObservable<Unit> observable) {
if (liveSequence.TruncateAfter(observable)) {
return false;
}
liveSequence.Add(observable);
return true;
}
}
}
using System;
using System.Collections.Generic;
using UniRx;
using UnityEngine;
namespace MyNamespace.Sequencing {
/// <summary>
/// Extension methods for working with <see cref="IObservable{T}"/> objects.
/// </summary>
public static class IObservableExtensions {
/// <summary>
/// Throttles this <paramref name="observable"/> so that it produces values only once per frame.
/// </summary>
public static IObservable<T> ThrottleOncePerFrame<T>(this IObservable<T> observable) {
var lastFrame = -1;
return observable
.Where(_ => {
if (Time.frameCount != lastFrame) {
lastFrame = Time.frameCount;
return true;
}
return false;
})
.ObserveOn(Scheduler.MainThreadEndOfFrame);
}
/// <summary>
/// Adds this observable into the given <see cref="ISequencer"/>.
/// </summary>
public static void In(this IObservable<Unit> observable, ISequencer sequencer) {
sequencer.Add(observable);
}
/// <summary>
/// Creates a new <see cref="Sequence"/> from the given sequence of unit observables.
/// </summary>
public static Sequence ToSequence(this IEnumerable<IObservable<Unit>> observables) =>
Sequence.Create(observables);
/// <summary>
/// Creates a new <see cref="Parallel"/> from the given sequence of unit observables.
/// </summary>
public static Parallel ToParallel(this IEnumerable<IObservable<Unit>> observables) =>
Parallel.Create(observables);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment