Create a gist now

Instantly share code, notes, and snippets.

What would you like to do?
Rx Implementation of an Undo/Redo Recorder.
using System;
using System.Collections.Immutable;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;
namespace Steellworks.Rx.UndoRedo
{
/// <summary>
/// Defines properties for fetching a value and its inverse.
/// </summary>
/// <typeparam name="T">Type of the value and inverse.</typeparam>
public interface IInvertibleValue<out T>
{
/// <summary>
/// A value, the inverse of which is InverseValue.
/// </summary>
T Value { get; }
/// <summary>
/// A value, the inverse of which is Value.
/// </summary>
T InverseValue { get; }
}
/// <summary>
/// Basic structure for storing a value and its inverse.
/// </summary>
/// <typeparam name="T">Type of the value and inverse.</typeparam>
public struct ValueAndInverse<T> : IInvertibleValue<T>
{
public ValueAndInverse(T value, T inverseValue) : this()
{
InverseValue = inverseValue;
Value = value;
}
public T Value { get; private set; }
public T InverseValue { get; private set; }
}
/// <summary>
/// Reactive undo/redo recorder. Seamlessly integrates into reactive circuits, recording and replaying data when
/// an undo or redo is triggered.
/// </summary>
public class ReactiveUndoRedoRecorder : IDisposable
{
private readonly Subject<UndoRedoAction> recordSubject = new Subject<UndoRedoAction>();
private readonly IDisposable updateSub;
/// <summary>
/// Instantiates a new UndoRedoRecorder, given streams representing undo and redo triggering events.
/// </summary>
/// <param name="undoTrigger">When this observable produces a value, an undo will occur.</param>
/// <param name="redoTrigger">When this observable produces a value, a redo will occur.</param>
public ReactiveUndoRedoRecorder(IObservable<Unit> undoTrigger, IObservable<Unit> redoTrigger)
{
// explicit wrappers for implicit methods on RecorderStack
Func<RecorderStack, Tuple<Action, RecorderStack>> undo = stack => stack.Undo();
Func<RecorderStack, Tuple<Action, RecorderStack>> redo = stack => stack.Redo();
// We want to keep track of the state of the RecorderStack, and also execute actions produced from
// undo or redo operations.
Tuple<Action, RecorderStack> state0 = Tuple.Create(new Action(() => { }), RecorderStack.Empty);
// Make the trigger streams produce the corresponding actions on the RecorderStack.
IObservable<Func<RecorderStack, Tuple<Action, RecorderStack>>> undoStream = undoTrigger.Select(_ => undo);
IObservable<Func<RecorderStack, Tuple<Action, RecorderStack>>> redoStream = redoTrigger.Select(_ => redo);
// Stream of undo/redo actions which are produced when an Undo or Redo is performed.
IObservable<Action> undoRedoActions =
// Merge all streams that will affect the RecorderStack
Observable.Merge(undoStream, redoStream, recordSubject.Select(CreateRecordClosure))
// Update the RecorderStack
.Scan(state0, (state, action) => action(state.Item2))
// Only propagate the action produced by the RecorderStack update action.
.Select(x => x.Item1);
updateSub = undoRedoActions.Subscribe(x => x());
}
public void Dispose()
{
updateSub.Dispose();
recordSubject.Dispose();
}
private static Func<RecorderStack, Tuple<Action, RecorderStack>> CreateRecordClosure(UndoRedoAction com)
{
return stack => stack.Record(com);
}
private void RecordUndoRedoAction(UndoRedoAction com)
{
recordSubject.OnNext(com);
}
/// <summary>
/// Applies an accumulator function over an observable sequence and returns each intermediate result. Each
/// intermediate state is also recorded for playback via undo or redo.
/// </summary>
/// <typeparam name="TState">The type of the result of the aggregation.</typeparam>
/// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
/// <param name="source">An observable sequence to accumulate over.</param>
/// <param name="initialState">The intitial accumulator value.</param>
/// <param name="accumulator">An accumulator function to be invoked on each element.</param>
/// <returns>Sequence of intermediate aggregation values with undo/redo interleaved.</returns>
public IObservable<TState> RecordScan<TState, TSource>(IObservable<TSource> source, TState initialState, Func<TState, TSource, TState> accumulator)
{
return Observable.Create<TState>(
observer =>
{
// This subject is used from inside of the recorded UndoRedo actions, in order to send replayed values
// back into this accumulation.
var stateUpdateSubject = new Subject<TState>();
// The idea here is that we want to handle the state update differently, depending if a new value is
// produced or an old value is being replayed. To facilitate this, we project both sequences into a
// sequence of functions that handle the state update.
// If a new value is produced by the source stream, then we also need to perform additional
// subscription logic. This means that we need to produce both a new state and an action to be invoked
// on subscription.
// Handler for source observable (new values)
var sourceUpdates = source.Select<TSource, Func<TState, Tuple<TState, Action>>>(
result =>
currentState =>
{
// First we produce a new state by using the accumulator function.
TState newState = accumulator(currentState, result);
// We also create a UndoRedoAction for replaying the old and new states
var com = new UndoRedoAction(
() => stateUpdateSubject.OnNext(currentState),
() => stateUpdateSubject.OnNext(newState));
// Return the new state, and on subscription record the action.
return Tuple.Create(newState, new Action(() => RecordUndoRedoAction(com)));
});
// Handler for replayed values
var replayedUpdates = stateUpdateSubject.Select<TState, Func<TState, Tuple<TState, Action>>>(
// Here, we throw away the current state and replace it with the replayed state. We don't
// need to do anything on subscription, so we store an "empty" action.
result => _ => Tuple.Create(result, new Action(() => { })));
return
// Merge both sequences.
Observable.Merge(sourceUpdates, replayedUpdates)
// Perform the aggregation. Note that the actual aggregation is performed inside of the handlers
// produced by the merged sequences.
.Scan(Tuple.Create(initialState, new Action(() => { })), (state, handler) => handler(state.Item1))
// Subscribe to the aggregation by...
.Subscribe(
result =>
{
// ...first invoking the subscription action,
result.Item2();
// and then passing the intermediate accumulated value to the observer.
observer.OnNext(result.Item1);
},
observer.OnError,
observer.OnCompleted);
});
}
/// <summary>
/// Applies an accumulator function over an observable sequence and returns each intermediate result. When
/// undone, replays values from the source observable and combines them with the current state via an
/// inverse accumulator function.
/// </summary>
/// <typeparam name="TState">The type of the result of the aggregation.</typeparam>
/// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
/// <param name="source">An observable sequence to accumulate over.</param>
/// <param name="initialState">The intitial accumulator value.</param>
/// <param name="accumulator">An accumulator function to be invoked on each element.</param>
/// <param name="inverseAccumulator">
/// An accumulator function that does the inverse of accumulator, such that the following
/// invariant is satisfied:
///
/// inverseAccumulator(accumulator(state, a), a) = state
/// </param>
/// <returns>Sequence of intermediate aggregation values with undo/redo interleaved.</returns>
public IObservable<TState> RecordScan<TState, TSource>(IObservable<TSource> source, TState initialState, Func<TState, TSource, TState> accumulator, Func<TState, TSource, TState> inverseAccumulator)
{
return Observable.Create<TState>(
observer =>
{
var stateUpdateSubject = new Subject<Func<TState, TState>>();
var sourceUpdates = source.Select<TSource, Func<TState, Tuple<TState, Action>>>(
result =>
currentState =>
{
// First we produce a new state by using the accumulator function.
TState newState = accumulator(currentState, result);
// We also create a UndoRedoAction for replaying the old and new states
var com = new UndoRedoAction(
() => stateUpdateSubject.OnNext(state => inverseAccumulator(state, result)),
() => stateUpdateSubject.OnNext(state => accumulator(state, result)));
// Return the new state, and on subscription record the action.
return Tuple.Create(newState, new Action(() => RecordUndoRedoAction(com)));
});
// Handler for replayed values
var replayedUpdates = stateUpdateSubject
.Select<Func<TState, TState>, Func<TState, Tuple<TState, Action>>>(
result => currentState => Tuple.Create(result(currentState), new Action(() => { })));
return
// Merge both sequences.
Observable.Merge(sourceUpdates, replayedUpdates)
// Perform the aggregation. Note that the actual aggregation is performed inside of the handlers
// produced by the merged sequences.
.Scan(Tuple.Create(initialState, new Action(() => { })),
(state, handler) => handler(state.Item1))
// Subscribe to the aggregation by...
.Subscribe(
result =>
{
// ...first invoking the subscription action,
result.Item2();
// and then passing the intermediate accumulated value to the observer.
observer.OnNext(result.Item1);
},
observer.OnError,
observer.OnCompleted);
});
}
/// <summary>
/// Applies an accumulator function over an observable sequence of invertible values and returns
/// each intermediate result. When undone, applies the inverse of the recorded value to the
/// accumulator to generate the new intermediate state.
/// </summary>
/// <typeparam name="TState">The type of the result of the aggregation.</typeparam>
/// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
/// <param name="source">An observable sequence to accumulate over.</param>
/// <param name="initialState">The intitial accumulator value.</param>
/// <param name="accumulator">An accumulator function to be invoked on each element.</param>
/// <returns>Sequence of intermediate aggregation values with undo/redo interleaved.</returns>
public IObservable<TState> RecordScan<TState, TSource>(IObservable<IInvertibleValue<TSource>> source, TState initialState, Func<TState, TSource, TState> accumulator)
{
return Observable.Create<TState>(
observer =>
{
var stateUpdateSubject = new Subject<TSource>();
var sourceUpdates = source.Select<IInvertibleValue<TSource>, Func<TState, Tuple<TState, Action>>>(
result =>
currentState =>
{
// First we produce a new state by using the accumulator function.
TState newState = accumulator(currentState, result.Value);
// We also create a UndoRedoAction for replaying the old and new states
var com = new UndoRedoAction(
() => stateUpdateSubject.OnNext(result.InverseValue),
() => stateUpdateSubject.OnNext(result.Value));
// Return the new state, and on subscription record the action.
return Tuple.Create(newState, new Action(() => RecordUndoRedoAction(com)));
});
// Handler for replayed values
var replayedUpdates = stateUpdateSubject
.Select<TSource, Func<TState, Tuple<TState, Action>>>(
result => currentState => Tuple.Create(accumulator(currentState, result), new Action(() => { })));
return
// Merge both sequences.
Observable.Merge(sourceUpdates, replayedUpdates)
// Perform the aggregation. Note that the actual aggregation is performed inside of the handlers
// produced by the merged sequences.
.Scan(Tuple.Create(initialState, new Action(() => { })),
(state, handler) => handler(state.Item1))
// Subscribe to the aggregation by...
.Subscribe(
result =>
{
// ...first invoking the subscription action,
result.Item2();
// and then passing the intermediate accumulated value to the observer.
observer.OnNext(result.Item1);
},
observer.OnError,
observer.OnCompleted);
});
}
/// <summary>
/// Records all data coming from the given observable sequence, allowing for "replaying" of previous
/// values via undo and redo.
/// </summary>
/// <typeparam name="T">The type of the elements in the source sequence.</typeparam>
/// <param name="source">An observable sequence to record.</param>
/// <param name="initialState">The intial value, to be produced if the first production is undone.</param>
/// <returns>The source observable sequence with undo/redo interleaved.</returns>
public IObservable<T> Record<T>(IObservable<T> source, T initialState)
{
return RecordScan(source, initialState, (_, source1) => source1);
}
/// <summary>
/// Given an observable sequence of invertable values, creates a new observable sequence that produces
/// values when the source sequence produces, and also when the source production is undone or redone.
/// </summary>
/// <typeparam name="T">The type of the elements produced by the source and result sequences.</typeparam>
/// <param name="source">An observable sequence of invertible values.</param>
/// <returns>The source observable sequence with undo/redo interleaved.</returns>
public IObservable<T> Record<T>(IObservable<IInvertibleValue<T>> source)
{
return Observable.Create<T>(
observer =>
source.Subscribe(
result => // When the source sequence produces values...
{
// Create an UndoRedoAction from the InvertibleValue
var com = new UndoRedoAction(
() => observer.OnNext(result.InverseValue),
() => observer.OnNext(result.Value));
// Record the action onto the RecorderStack.
RecordUndoRedoAction(com);
// Produce the value to the observer.
observer.OnNext(result.Value);
},
observer.OnError,
observer.OnCompleted));
}
/// <summary>
/// Subscribes to an observable sequence of invertible actions, and also records the actions for replaying
/// via undo or redo.
/// </summary>
/// <param name="source">An observable sequence of invertible actions.</param>
/// <returns>An IDisposable representing the subscription to actions produced by the source sequence.</returns>
public IDisposable RecordAndSubscribe(IObservable<IInvertibleValue<Action>> source)
{
return Record(source).Subscribe(f => f());
}
/// <summary>
/// Immutable structure encapsulating the current state of the undo/redo stacks.
/// </summary>
private struct RecorderStack
{
/// <summary>
/// An empty RecorderStack, where nothing has been recorded.
/// </summary>
public static readonly RecorderStack Empty = new RecorderStack(
ImmutableStack.Create<UndoRedoAction>(), ImmutableStack.Create<UndoRedoAction>());
private readonly ImmutableStack<UndoRedoAction> redoStack;
private readonly ImmutableStack<UndoRedoAction> undoStack;
private RecorderStack(ImmutableStack<UndoRedoAction> undoStack, ImmutableStack<UndoRedoAction> redoStack)
: this()
{
this.undoStack = undoStack;
this.redoStack = redoStack;
}
/// <summary>
/// Performs an Undo. This returns the inverse of the action being un-done, and a new RecorderStack
/// where the action has been popped from the undo stack and its inverse pushed onto the redo stack.
/// </summary>
public Tuple<Action, RecorderStack> Undo()
{
if (undoStack.IsEmpty)
return Tuple.Create(new Action(() => { }), this);
UndoRedoAction command;
ImmutableStack<UndoRedoAction> newStack = undoStack.Pop(out command);
return Tuple.Create(command.UndoAction, new RecorderStack(newStack, redoStack.Push(command)));
}
/// <summary>
/// Performs a Redo. This returns the action being re-done, and a new RecorderStack where the action
/// has been popped from the redo stack and its inverse pushed onto the undo stack.
/// </summary>
public Tuple<Action, RecorderStack> Redo()
{
if (redoStack.IsEmpty)
return Tuple.Create(new Action(() => { }), this);
UndoRedoAction command;
ImmutableStack<UndoRedoAction> newStack = redoStack.Pop(out command);
return Tuple.Create(command.RedoAction, new RecorderStack(undoStack.Push(command), newStack));
}
/// <summary>
/// Records an UndoRedoCommand. This returns an "empty" action, and a new RecordStack where the undo
/// action has been pushed onto the undo stack, and the redo stack has been erased.
/// </summary>
/// <param name="command">An undo/redo action pair to be recorded.</param>
public Tuple<Action, RecorderStack> Record(UndoRedoAction command)
{
return Tuple.Create(
new Action(() => { }),
new RecorderStack(undoStack.Push(command), ImmutableStack.Create<UndoRedoAction>()));
}
}
/// <summary>
/// Basic struct for storing a pair of undo/redo actions.
/// </summary>
private struct UndoRedoAction
{
public readonly Action RedoAction;
public readonly Action UndoAction;
public UndoRedoAction(Action undoAction, Action redoAction)
{
UndoAction = undoAction;
RedoAction = redoAction;
}
}
}
public static class UndoRedoExtensions
{
/// <summary>
/// Applies an accumulator function over an observable sequence and returns each intermediate result. Each
/// intermediate state is also recorded for playback via undo or redo.
/// </summary>
/// <typeparam name="TState">The type of the result of the aggregation.</typeparam>
/// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
/// <param name="source">An observable sequence to accumulate over.</param>
/// <param name="initialState">The intitial accumulator value.</param>
/// <param name="accumulator">An accumulator function to be invoked on each element.</param>
/// <param name="recorder"></param>
/// <returns>Sequence of intermediate aggregation values with undo/redo interleaved.</returns>
public static IObservable<TState> RecordScan<TState, TSource>(this IObservable<TSource> source, TState initialState,
Func<TState, TSource, TState> accumulator, ReactiveUndoRedoRecorder recorder)
{
return recorder.RecordScan(source, initialState, accumulator);
}
/// <summary>
/// Applies an accumulator function over an observable sequence and returns each intermediate result. When
/// undone, replays values from the source observable and combines them with the current state via an
/// inverse accumulator function.
/// </summary>
/// <typeparam name="TState">The type of the result of the aggregation.</typeparam>
/// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
/// <param name="source">An observable sequence to accumulate over.</param>
/// <param name="initialState">The intitial accumulator value.</param>
/// <param name="accumulator">An accumulator function to be invoked on each element.</param>
/// <param name="inverseAccumulator">
/// An accumulator function that does the inverse of accumulator, such that the following
/// invariant is satisfied:
///
/// inverseAccumulator(accumulator(state, a), a) = state
/// </param>
/// <param name="recorder"></param>
/// <returns>Sequence of intermediate aggregation values with undo/redo interleaved.</returns>
public static IObservable<TState> RecordScan<TState, TSource>(this IObservable<TSource> source,
TState initialState, Func<TState, TSource, TState> accumulator,
Func<TState, TSource, TState> inverseAccumulator, ReactiveUndoRedoRecorder recorder)
{
return recorder.RecordScan(source, initialState, accumulator, inverseAccumulator);
}
/// <summary>
/// Applies an accumulator function over an observable sequence of invertible values and returns
/// each intermediate result. When undone, applies the inverse of the recorded value to the
/// accumulator to generate the new intermediate state.
/// </summary>
/// <typeparam name="TState">The type of the result of the aggregation.</typeparam>
/// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
/// <param name="source">An observable sequence to accumulate over.</param>
/// <param name="initialState">The intitial accumulator value.</param>
/// <param name="accumulator">An accumulator function to be invoked on each element.</param>
/// <param name="recorder"></param>
/// <returns>Sequence of intermediate aggregation values with undo/redo interleaved.</returns>
public static IObservable<TState> RecordScan<TState, TSource>(this IObservable<IInvertibleValue<TSource>> source,
TState initialState, Func<TState, TSource, TState> accumulator, ReactiveUndoRedoRecorder recorder)
{
return recorder.RecordScan(source, initialState, accumulator);
}
/// <summary>
/// Records all data coming from the given observable sequence, allowing for "replaying" of previous
/// values via undo and redo.
/// </summary>
/// <typeparam name="T">The type of the elements in the source sequence.</typeparam>
/// <param name="initialState">The intial value, to be produced if the first production is undone.</param>
/// <param name="source">An observable sequence to record.</param>
/// <param name="recorder"></param>
/// <returns>The source observable sequence with undo/redo interleaved.</returns>
public static IObservable<T> Record<T>(this IObservable<T> source, T initialState, ReactiveUndoRedoRecorder recorder)
{
return recorder.Record(source, initialState);
}
/// <summary>
/// Given an observable sequence of invertable values, creates a new observable sequence that produces
/// values when the source sequence produces, and also when the source production is undone or redone.
/// </summary>
/// <typeparam name="T">The type of the elements produced by the source and result sequences.</typeparam>
/// <param name="source">An observable sequence of invertible values.</param>
/// <param name="recorder"></param>
/// <returns>The source observable sequence with undo/redo interleaved.</returns>
public static IObservable<T> Record<T>(this IObservable<IInvertibleValue<T>> source,
ReactiveUndoRedoRecorder recorder)
{
return recorder.Record(source);
}
/// <summary>
/// Subscribes to an observable sequence of invertible actions, and also records the actions for replaying
/// via undo or redo.
/// </summary>
/// <param name="source">An observable sequence of invertible actions.</param>
/// <param name="recorder"></param>
/// <returns>An IDisposable representing the subscription to actions produced by the source sequence.</returns>
public static IDisposable RecordAndSubscribe(this IObservable<IInvertibleValue<Action>> source, ReactiveUndoRedoRecorder recorder)
{
return recorder.RecordAndSubscribe(source);
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment