Last active
April 30, 2018 14:47
-
-
Save Steell/e3c57b88abe2a708e526 to your computer and use it in GitHub Desktop.
Rx Implementation of an Undo/Redo Recorder.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Collections.Immutable; | |
using System.Reactive; | |
using System.Reactive.Linq; | |
using System.Reactive.Subjects; | |
namespace Steellworks.Rx.UndoRedo | |
{ | |
/// <summary>
/// Exposes a value together with the value that reverses it.
/// </summary>
/// <typeparam name="T">Type shared by the value and its inverse.</typeparam>
public interface IInvertibleValue<out T>
{
    /// <summary>
    /// The forward value; <see cref="InverseValue"/> is its inverse.
    /// </summary>
    T Value { get; }

    /// <summary>
    /// The reverse value; <see cref="Value"/> is its inverse.
    /// </summary>
    T InverseValue { get; }
}
/// <summary>
/// Minimal value type that pairs a value with its inverse.
/// </summary>
/// <typeparam name="T">Type shared by the value and its inverse.</typeparam>
public struct ValueAndInverse<T> : IInvertibleValue<T>
{
    // Explicit backing fields: assigned once in the constructor and never written again.
    private readonly T forward;
    private readonly T backward;

    /// <summary>
    /// Creates a pair from a value and the value that reverses it.
    /// </summary>
    /// <param name="value">The forward value, exposed through <see cref="Value"/>.</param>
    /// <param name="inverseValue">The reverse value, exposed through <see cref="InverseValue"/>.</param>
    public ValueAndInverse(T value, T inverseValue)
    {
        forward = value;
        backward = inverseValue;
    }

    /// <summary>
    /// The forward value supplied at construction.
    /// </summary>
    public T Value
    {
        get { return forward; }
    }

    /// <summary>
    /// The reverse value supplied at construction.
    /// </summary>
    public T InverseValue
    {
        get { return backward; }
    }
}
/// <summary> | |
/// Reactive undo/redo recorder. Seamlessly integrates into reactive circuits, recording and replaying data when | |
/// an undo or redo is triggered. | |
/// </summary> | |
public class ReactiveUndoRedoRecorder : IDisposable | |
{ | |
// Every recorded undo/redo pair is pushed through this subject so that it can be merged
// with the undo and redo triggers into a single stream of stack transitions.
private readonly Subject<UndoRedoAction> recordSubject = new Subject<UndoRedoAction>();

// Subscription that drives the recorder; released in Dispose.
private readonly IDisposable updateSub;

/// <summary>
/// Creates a recorder driven by the given undo and redo trigger streams. The recorder subscribes
/// to both triggers immediately.
/// </summary>
/// <param name="undoTrigger">Each element produced by this observable performs one undo.</param>
/// <param name="redoTrigger">Each element produced by this observable performs one redo.</param>
public ReactiveUndoRedoRecorder(IObservable<Unit> undoTrigger, IObservable<Unit> redoTrigger)
{
    // Every input (undo, redo, record) is modelled as a transition: a function that takes the
    // current RecorderStack and returns the action to run now plus the next RecorderStack.
    Func<RecorderStack, Tuple<Action, RecorderStack>> performUndo = current => current.Undo();
    Func<RecorderStack, Tuple<Action, RecorderStack>> performRedo = current => current.Redo();

    // Project each input stream onto its transition.
    IObservable<Func<RecorderStack, Tuple<Action, RecorderStack>>> undoTransitions =
        undoTrigger.Select(_ => performUndo);
    IObservable<Func<RecorderStack, Tuple<Action, RecorderStack>>> redoTransitions =
        redoTrigger.Select(_ => performRedo);
    IObservable<Func<RecorderStack, Tuple<Action, RecorderStack>>> recordTransitions =
        recordSubject.Select(CreateRecordClosure);

    // Seed for the fold: nothing to run, nothing recorded.
    Action nothing = () => { };
    Tuple<Action, RecorderStack> seed = Tuple.Create(nothing, RecorderStack.Empty);

    IObservable<Action> replayActions = Observable
        // All three inputs mutate the same stack, so they are folded as one sequence.
        .Merge(undoTransitions, redoTransitions, recordTransitions)
        // Apply each transition to the stack produced by the previous one.
        .Scan(seed, (previous, transition) => transition(previous.Item2))
        // Downstream only cares about the action to execute, not the stack itself.
        .Select(step => step.Item1);

    // Run every action as it is produced; recording yields a no-op action.
    updateSub = replayActions.Subscribe(replay => replay());
}
/// <summary>
/// Releases the subscription that drives the recorder, then disposes the subject used to
/// record new undo/redo actions.
/// </summary>
public void Dispose()
{
    // Stop reacting to the triggers and to recorded actions first...
    updateSub.Dispose();

    // ...then tear down the subject that feeds recorded actions into the recorder.
    // NOTE(review): streams created by this recorder that are still subscribed will keep
    // pushing into this disposed subject - confirm callers unsubscribe them first.
    recordSubject.Dispose();
}
/// <summary>
/// Turns an undo/redo pair into a stack transition that records the pair on whatever
/// RecorderStack it is later applied to.
/// </summary>
/// <param name="com">The undo/redo pair to record.</param>
/// <returns>A function that calls Record with <paramref name="com"/> on the stack it is given.</returns>
private static Func<RecorderStack, Tuple<Action, RecorderStack>> CreateRecordClosure(UndoRedoAction com)
{
    Func<RecorderStack, Tuple<Action, RecorderStack>> recordOnto =
        delegate(RecorderStack current) { return current.Record(com); };
    return recordOnto;
}
/// <summary>
/// Publishes an undo/redo pair on the record subject so that the recorder pushes it onto its stack.
/// </summary>
/// <param name="com">The undo/redo pair to record.</param>
private void RecordUndoRedoAction(UndoRedoAction com)
{
    IObserver<UndoRedoAction> recordSink = recordSubject;
    recordSink.OnNext(com);
}
/// <summary>
/// Folds an observable sequence with an accumulator and emits every intermediate state. Each
/// transition is recorded so that undo re-emits the state before it and redo re-emits the state after it.
/// </summary>
/// <typeparam name="TState">The type of the accumulated state.</typeparam>
/// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
/// <param name="source">An observable sequence to accumulate over.</param>
/// <param name="initialState">The initial accumulator value.</param>
/// <param name="accumulator">An accumulator function to be invoked on each element.</param>
/// <returns>Sequence of intermediate aggregation values with undo/redo interleaved.</returns>
public IObservable<TState> RecordScan<TState, TSource>(IObservable<TSource> source, TState initialState, Func<TState, TSource, TState> accumulator)
{
    return Observable.Create<TState>(
        observer =>
        {
            // Recorded undo/redo callbacks push the state to restore into this subject, which
            // feeds it back into the fold below.
            var replayedStates = new Subject<TState>();

            // Side effect used whenever a step has nothing to record.
            Action nothing = () => { };

            // Both kinds of input are expressed as "steps": functions from the current state to
            // the next state plus a side effect to run when that state is delivered.

            // Step for a fresh source element: accumulate, and ask for the transition to be recorded.
            IObservable<Func<TState, Tuple<TState, Action>>> freshSteps =
                source.Select(
                    element => new Func<TState, Tuple<TState, Action>>(
                        before =>
                        {
                            TState after = accumulator(before, element);

                            // Undo replays the state before this element; redo replays the state after it.
                            var entry = new UndoRedoAction(
                                () => replayedStates.OnNext(before),
                                () => replayedStates.OnNext(after));

                            // Recording is deferred until the new state is delivered to the subscriber.
                            Action recordEntry = () => RecordUndoRedoAction(entry);
                            return Tuple.Create(after, recordEntry);
                        }));

            // Step for a replayed state: discard the current state, adopt the replayed one, record nothing.
            IObservable<Func<TState, Tuple<TState, Action>>> replaySteps =
                replayedStates.Select(
                    replayed => new Func<TState, Tuple<TState, Action>>(
                        ignored => Tuple.Create(replayed, nothing)));

            Tuple<TState, Action> seed = Tuple.Create(initialState, nothing);

            return Observable
                .Merge(freshSteps, replaySteps)
                // The real aggregation lives inside each step; Scan only threads the state through.
                .Scan(seed, (previous, step) => step(previous.Item1))
                .Subscribe(
                    current =>
                    {
                        // Run the step's side effect (record or no-op) first...
                        current.Item2();
                        // ...then hand the accumulated state to the observer.
                        observer.OnNext(current.Item1);
                    },
                    observer.OnError,
                    observer.OnCompleted);
        });
}
/// <summary>
/// Folds an observable sequence with an accumulator and emits every intermediate state. Undo applies
/// the inverse accumulator with the recorded element to the current state; redo applies the
/// accumulator with the same element again.
/// </summary>
/// <typeparam name="TState">The type of the accumulated state.</typeparam>
/// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
/// <param name="source">An observable sequence to accumulate over.</param>
/// <param name="initialState">The initial accumulator value.</param>
/// <param name="accumulator">An accumulator function to be invoked on each element.</param>
/// <param name="inverseAccumulator">
/// An accumulator function that does the inverse of accumulator, such that the following
/// invariant is satisfied:
///
/// inverseAccumulator(accumulator(state, a), a) = state
/// </param>
/// <returns>Sequence of intermediate aggregation values with undo/redo interleaved.</returns>
public IObservable<TState> RecordScan<TState, TSource>(IObservable<TSource> source, TState initialState, Func<TState, TSource, TState> accumulator, Func<TState, TSource, TState> inverseAccumulator)
{
    return Observable.Create<TState>(
        observer =>
        {
            // Recorded undo/redo callbacks push a state transformation into this subject; the
            // transformation is applied to whatever the current state is at replay time.
            var replayedTransforms = new Subject<Func<TState, TState>>();

            // Side effect used whenever a step has nothing to record.
            Action nothing = () => { };

            // Step for a fresh source element: accumulate, and ask for the transition to be recorded.
            IObservable<Func<TState, Tuple<TState, Action>>> freshSteps =
                source.Select(
                    element => new Func<TState, Tuple<TState, Action>>(
                        before =>
                        {
                            TState after = accumulator(before, element);

                            // Undo reverts this element from the current state; redo re-applies it.
                            Func<TState, TState> revert = state => inverseAccumulator(state, element);
                            Func<TState, TState> reapply = state => accumulator(state, element);
                            var entry = new UndoRedoAction(
                                () => replayedTransforms.OnNext(revert),
                                () => replayedTransforms.OnNext(reapply));

                            // Recording is deferred until the new state is delivered to the subscriber.
                            Action recordEntry = () => RecordUndoRedoAction(entry);
                            return Tuple.Create(after, recordEntry);
                        }));

            // Step for a replayed transformation: apply it to the current state, record nothing.
            IObservable<Func<TState, Tuple<TState, Action>>> replaySteps =
                replayedTransforms.Select(
                    transform => new Func<TState, Tuple<TState, Action>>(
                        current => Tuple.Create(transform(current), nothing)));

            Tuple<TState, Action> seed = Tuple.Create(initialState, nothing);

            return Observable
                .Merge(freshSteps, replaySteps)
                // The real aggregation lives inside each step; Scan only threads the state through.
                .Scan(seed, (previous, step) => step(previous.Item1))
                .Subscribe(
                    current =>
                    {
                        // Run the step's side effect (record or no-op) first...
                        current.Item2();
                        // ...then hand the accumulated state to the observer.
                        observer.OnNext(current.Item1);
                    },
                    observer.OnError,
                    observer.OnCompleted);
        });
}
/// <summary>
/// Folds an observable sequence of invertible values with an accumulator and emits every
/// intermediate state. Undo feeds the recorded element's inverse value through the accumulator;
/// redo feeds its value through the accumulator again.
/// </summary>
/// <typeparam name="TState">The type of the accumulated state.</typeparam>
/// <typeparam name="TSource">The type of the values carried by the source elements.</typeparam>
/// <param name="source">An observable sequence of invertible values to accumulate over.</param>
/// <param name="initialState">The initial accumulator value.</param>
/// <param name="accumulator">An accumulator function to be invoked on each element.</param>
/// <returns>Sequence of intermediate aggregation values with undo/redo interleaved.</returns>
public IObservable<TState> RecordScan<TState, TSource>(IObservable<IInvertibleValue<TSource>> source, TState initialState, Func<TState, TSource, TState> accumulator)
{
    return Observable.Create<TState>(
        observer =>
        {
            // Recorded undo/redo callbacks push the value to accumulate into this subject.
            var replayedValues = new Subject<TSource>();

            // Side effect used whenever a step has nothing to record.
            Action nothing = () => { };

            // Step for a fresh source element: accumulate its value, and ask for the transition
            // to be recorded.
            IObservable<Func<TState, Tuple<TState, Action>>> freshSteps =
                source.Select(
                    element => new Func<TState, Tuple<TState, Action>>(
                        before =>
                        {
                            TState after = accumulator(before, element.Value);

                            // Undo replays the inverse value; redo replays the value. Both are read
                            // from the element at replay time rather than cached here.
                            var entry = new UndoRedoAction(
                                () => replayedValues.OnNext(element.InverseValue),
                                () => replayedValues.OnNext(element.Value));

                            // Recording is deferred until the new state is delivered to the subscriber.
                            Action recordEntry = () => RecordUndoRedoAction(entry);
                            return Tuple.Create(after, recordEntry);
                        }));

            // Step for a replayed value: run it through the accumulator, record nothing.
            IObservable<Func<TState, Tuple<TState, Action>>> replaySteps =
                replayedValues.Select(
                    replayed => new Func<TState, Tuple<TState, Action>>(
                        current => Tuple.Create(accumulator(current, replayed), nothing)));

            Tuple<TState, Action> seed = Tuple.Create(initialState, nothing);

            return Observable
                .Merge(freshSteps, replaySteps)
                // The real aggregation lives inside each step; Scan only threads the state through.
                .Scan(seed, (previous, step) => step(previous.Item1))
                .Subscribe(
                    current =>
                    {
                        // Run the step's side effect (record or no-op) first...
                        current.Item2();
                        // ...then hand the accumulated state to the observer.
                        observer.OnNext(current.Item1);
                    },
                    observer.OnError,
                    observer.OnCompleted);
        });
}
/// <summary>
/// Records every element of the given observable sequence so that earlier elements can be
/// replayed via undo and redo.
/// </summary>
/// <typeparam name="T">The type of the elements in the source sequence.</typeparam>
/// <param name="source">An observable sequence to record.</param>
/// <param name="initialState">The initial value, to be produced if the first production is undone.</param>
/// <returns>The source observable sequence with undo/redo interleaved.</returns>
public IObservable<T> Record<T>(IObservable<T> source, T initialState)
{
    // A scan whose accumulator ignores the previous state and keeps only the latest element.
    return RecordScan(source, initialState, (previous, latest) => latest);
}
/// <summary>
/// Given an observable sequence of invertible values, creates a new observable sequence that produces
/// a value whenever the source produces one, and again whenever that production is undone or redone.
/// </summary>
/// <typeparam name="T">The type of the elements produced by the source and result sequences.</typeparam>
/// <param name="source">An observable sequence of invertible values.</param>
/// <returns>The source observable sequence with undo/redo interleaved.</returns>
public IObservable<T> Record<T>(IObservable<IInvertibleValue<T>> source)
{
    return Observable.Create<T>(
        observer =>
        {
            Action<IInvertibleValue<T>> onElement = element =>
            {
                // Undo emits the inverse value; redo emits the value again.
                Action emitInverse = () => observer.OnNext(element.InverseValue);
                Action emitValue = () => observer.OnNext(element.Value);

                // Record the pair first...
                RecordUndoRedoAction(new UndoRedoAction(emitInverse, emitValue));

                // ...then pass the value on to the observer.
                observer.OnNext(element.Value);
            };

            return source.Subscribe(onElement, observer.OnError, observer.OnCompleted);
        });
}
/// <summary>
/// Subscribes to an observable sequence of invertible actions, invoking each action as it is
/// produced, and records the actions so they can be replayed via undo or redo.
/// </summary>
/// <param name="source">An observable sequence of invertible actions.</param>
/// <returns>An IDisposable representing the subscription to actions produced by the source sequence.</returns>
public IDisposable RecordAndSubscribe(IObservable<IInvertibleValue<Action>> source)
{
    // Recording yields the action to run for both fresh productions and undo/redo replays.
    IObservable<Action> recordedActions = Record(source);
    return recordedActions.Subscribe(run => run());
}
/// <summary>
/// Immutable snapshot of the undo and redo stacks. Every operation returns a new snapshot and
/// leaves the one it was called on untouched.
/// </summary>
private struct RecorderStack
{
    /// <summary>
    /// A RecorderStack on which nothing has been recorded: both stacks are empty.
    /// </summary>
    public static readonly RecorderStack Empty = new RecorderStack(
        ImmutableStack<UndoRedoAction>.Empty, ImmutableStack<UndoRedoAction>.Empty);

    private readonly ImmutableStack<UndoRedoAction> undoStack;
    private readonly ImmutableStack<UndoRedoAction> redoStack;

    private RecorderStack(ImmutableStack<UndoRedoAction> undoStack, ImmutableStack<UndoRedoAction> redoStack)
    {
        this.undoStack = undoStack;
        this.redoStack = redoStack;
    }

    /// <summary>
    /// Performs an undo. Returns the undo action of the most recently recorded entry, together with
    /// a new RecorderStack in which that entry has moved from the undo stack to the redo stack.
    /// When there is nothing to undo, returns a no-op action and this same stack.
    /// </summary>
    public Tuple<Action, RecorderStack> Undo()
    {
        if (undoStack.IsEmpty)
        {
            return Unchanged();
        }

        UndoRedoAction latest = undoStack.Peek();
        var next = new RecorderStack(undoStack.Pop(), redoStack.Push(latest));
        return Tuple.Create(latest.UndoAction, next);
    }

    /// <summary>
    /// Performs a redo. Returns the redo action of the most recently undone entry, together with
    /// a new RecorderStack in which that entry has moved from the redo stack back to the undo stack.
    /// When there is nothing to redo, returns a no-op action and this same stack.
    /// </summary>
    public Tuple<Action, RecorderStack> Redo()
    {
        if (redoStack.IsEmpty)
        {
            return Unchanged();
        }

        UndoRedoAction latest = redoStack.Peek();
        var next = new RecorderStack(undoStack.Push(latest), redoStack.Pop());
        return Tuple.Create(latest.RedoAction, next);
    }

    /// <summary>
    /// Records an undo/redo pair. Returns a no-op action and a new RecorderStack in which the pair
    /// has been pushed onto the undo stack and the redo stack has been cleared.
    /// </summary>
    /// <param name="command">An undo/redo action pair to be recorded.</param>
    public Tuple<Action, RecorderStack> Record(UndoRedoAction command)
    {
        var next = new RecorderStack(undoStack.Push(command), ImmutableStack<UndoRedoAction>.Empty);
        return Tuple.Create(new Action(() => { }), next);
    }

    // Result used when an undo or redo has nothing to act on: a no-op action and the current stack.
    private Tuple<Action, RecorderStack> Unchanged()
    {
        return Tuple.Create(new Action(() => { }), this);
    }
}
/// <summary>
/// Plain value type holding a pair of callbacks: one for undo and one for redo.
/// </summary>
private struct UndoRedoAction
{
    /// <summary>
    /// The callback supplied as the undo half of the pair.
    /// </summary>
    public readonly Action UndoAction;

    /// <summary>
    /// The callback supplied as the redo half of the pair.
    /// </summary>
    public readonly Action RedoAction;

    /// <summary>
    /// Creates a pair from its undo and redo callbacks.
    /// </summary>
    /// <param name="undoAction">Stored in <see cref="UndoAction"/>.</param>
    /// <param name="redoAction">Stored in <see cref="RedoAction"/>.</param>
    public UndoRedoAction(Action undoAction, Action redoAction)
    {
        this.RedoAction = redoAction;
        this.UndoAction = undoAction;
    }
}
} | |
/// <summary>
/// Extension-method forms of the <see cref="ReactiveUndoRedoRecorder"/> operations, so that recording
/// can be written fluently on the source sequence. Each method forwards to the recorder method of the
/// same name and throws <see cref="ArgumentNullException"/> when the recorder is null.
/// </summary>
public static class UndoRedoExtensions
{
    /// <summary>
    /// Applies an accumulator function over an observable sequence and returns each intermediate result. Each
    /// intermediate state is also recorded for playback via undo or redo.
    /// </summary>
    /// <typeparam name="TState">The type of the result of the aggregation.</typeparam>
    /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
    /// <param name="source">An observable sequence to accumulate over.</param>
    /// <param name="initialState">The initial accumulator value.</param>
    /// <param name="accumulator">An accumulator function to be invoked on each element.</param>
    /// <param name="recorder">The recorder that stores the transitions and replays them on undo/redo.</param>
    /// <returns>Sequence of intermediate aggregation values with undo/redo interleaved.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="recorder"/> is null.</exception>
    public static IObservable<TState> RecordScan<TState, TSource>(this IObservable<TSource> source, TState initialState,
        Func<TState, TSource, TState> accumulator, ReactiveUndoRedoRecorder recorder)
    {
        return RequireRecorder(recorder).RecordScan(source, initialState, accumulator);
    }

    /// <summary>
    /// Applies an accumulator function over an observable sequence and returns each intermediate result. When
    /// undone, replays values from the source observable and combines them with the current state via an
    /// inverse accumulator function.
    /// </summary>
    /// <typeparam name="TState">The type of the result of the aggregation.</typeparam>
    /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
    /// <param name="source">An observable sequence to accumulate over.</param>
    /// <param name="initialState">The initial accumulator value.</param>
    /// <param name="accumulator">An accumulator function to be invoked on each element.</param>
    /// <param name="inverseAccumulator">
    /// An accumulator function that does the inverse of accumulator, such that the following
    /// invariant is satisfied:
    ///
    /// inverseAccumulator(accumulator(state, a), a) = state
    /// </param>
    /// <param name="recorder">The recorder that stores the transitions and replays them on undo/redo.</param>
    /// <returns>Sequence of intermediate aggregation values with undo/redo interleaved.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="recorder"/> is null.</exception>
    public static IObservable<TState> RecordScan<TState, TSource>(this IObservable<TSource> source,
        TState initialState, Func<TState, TSource, TState> accumulator,
        Func<TState, TSource, TState> inverseAccumulator, ReactiveUndoRedoRecorder recorder)
    {
        return RequireRecorder(recorder).RecordScan(source, initialState, accumulator, inverseAccumulator);
    }

    /// <summary>
    /// Applies an accumulator function over an observable sequence of invertible values and returns
    /// each intermediate result. When undone, applies the inverse of the recorded value to the
    /// accumulator to generate the new intermediate state.
    /// </summary>
    /// <typeparam name="TState">The type of the result of the aggregation.</typeparam>
    /// <typeparam name="TSource">The type of the values carried by the source elements.</typeparam>
    /// <param name="source">An observable sequence of invertible values to accumulate over.</param>
    /// <param name="initialState">The initial accumulator value.</param>
    /// <param name="accumulator">An accumulator function to be invoked on each element.</param>
    /// <param name="recorder">The recorder that stores the transitions and replays them on undo/redo.</param>
    /// <returns>Sequence of intermediate aggregation values with undo/redo interleaved.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="recorder"/> is null.</exception>
    public static IObservable<TState> RecordScan<TState, TSource>(this IObservable<IInvertibleValue<TSource>> source,
        TState initialState, Func<TState, TSource, TState> accumulator, ReactiveUndoRedoRecorder recorder)
    {
        return RequireRecorder(recorder).RecordScan(source, initialState, accumulator);
    }

    /// <summary>
    /// Records all data coming from the given observable sequence, allowing for "replaying" of previous
    /// values via undo and redo.
    /// </summary>
    /// <typeparam name="T">The type of the elements in the source sequence.</typeparam>
    /// <param name="source">An observable sequence to record.</param>
    /// <param name="initialState">The initial value, to be produced if the first production is undone.</param>
    /// <param name="recorder">The recorder that stores the values and replays them on undo/redo.</param>
    /// <returns>The source observable sequence with undo/redo interleaved.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="recorder"/> is null.</exception>
    public static IObservable<T> Record<T>(this IObservable<T> source, T initialState, ReactiveUndoRedoRecorder recorder)
    {
        return RequireRecorder(recorder).Record(source, initialState);
    }

    /// <summary>
    /// Given an observable sequence of invertible values, creates a new observable sequence that produces
    /// values when the source sequence produces, and also when the source production is undone or redone.
    /// </summary>
    /// <typeparam name="T">The type of the elements produced by the source and result sequences.</typeparam>
    /// <param name="source">An observable sequence of invertible values.</param>
    /// <param name="recorder">The recorder that stores the values and replays them on undo/redo.</param>
    /// <returns>The source observable sequence with undo/redo interleaved.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="recorder"/> is null.</exception>
    public static IObservable<T> Record<T>(this IObservable<IInvertibleValue<T>> source,
        ReactiveUndoRedoRecorder recorder)
    {
        return RequireRecorder(recorder).Record(source);
    }

    /// <summary>
    /// Subscribes to an observable sequence of invertible actions, and also records the actions for replaying
    /// via undo or redo.
    /// </summary>
    /// <param name="source">An observable sequence of invertible actions.</param>
    /// <param name="recorder">The recorder that stores the actions and replays them on undo/redo.</param>
    /// <returns>An IDisposable representing the subscription to actions produced by the source sequence.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="recorder"/> is null.</exception>
    public static IDisposable RecordAndSubscribe(this IObservable<IInvertibleValue<Action>> source, ReactiveUndoRedoRecorder recorder)
    {
        return RequireRecorder(recorder).RecordAndSubscribe(source);
    }

    // Fails fast with a descriptive ArgumentNullException instead of letting a null recorder surface
    // as a NullReferenceException at the forwarding call. Only the recorder is checked here; the
    // remaining arguments are passed through to the recorder unchanged.
    private static ReactiveUndoRedoRecorder RequireRecorder(ReactiveUndoRedoRecorder recorder)
    {
        if (recorder == null)
        {
            throw new ArgumentNullException("recorder");
        }

        return recorder;
    }
}
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment