Created August 16, 2021 12:33
Elm-like Store using Combine
import Combine
/// A `Store` is a non-failing Publisher whose output carries values of type `State`.
public final class Store<State, Event, Scheduler: Combine.Scheduler>: Publisher {
public typealias Output = State
public typealias Failure = Never
public typealias Events = AnyPublisher<Event, Never>
private let state: CurrentValueSubject<State, Never>
private var machine: Cancellable!
/// `input` provides a way to inject inputs aka "events" aka "actions" into the store which may trigger side effects - if any - and cause mutations of the state.
public let input = PassthroughSubject<Event, Never>()
/// Initialises a store with the given parameters.
/// Publishes the initial value and then waits for events which will be processed in the `update` function.
/// Whenever the update function will be called the new state will be published.
/// The store receives events sent from clients or side effects via the `input` subject.
/// - Parameters:
/// - initial: The initial state value.
/// - update: The update function which is a pure synchronous function mutating the store's state based on the current state and the given event. It returnes a _command_.
/// - effects: A list of side effect functions.
/// - scheduler: The scheduler defining the execution context where the state will be mutated.
public init<Command>(state initial: State,
update: @escaping (State, (State) -> Void, Event) -> Command,
effects: [(AnyPublisher<Command, Never>) -> Events],
scheduler: Scheduler)
typealias Commands = AnyPublisher<Command, Never>
typealias Effect = (Commands) -> Events
self.state = CurrentValueSubject(initial)
self.machine = {
let commandStream = PassthroughSubject<Command, Never>()
let effectStreams = { $0(commandStream.eraseToAnyPublisher()) }
let eventStream = Publishers.MergeMany([input.eraseToAnyPublisher()] + effectStreams)
return eventStream
.receive(on: scheduler)
.map { (event: Event) -> Command in
update(self.state.value, self.state.send, event)
.sink(receiveValue: { command in
/// Initialises a store with the given parameters.
/// Publishes the initial value and then waits for events which will be processed in the `update` function.
/// Whenever the update function will be called the new state will be published.
/// The store receives events sent from clients via the `input` subject.
/// - Parameters:
/// - initial: The initial state value.
/// - update: The update function which is a pure synchronous function mutating the store's state based on the current state and the given event.
/// - scheduler: The scheduler defining the execution context where the state will be mutated.
public init(state initial: State,
update: @escaping (State, (State) -> Void, Event) -> Void,
scheduler: Scheduler)
self.state = CurrentValueSubject(initial)
self.machine = input
.receive(on: scheduler)
.sink(receiveValue: {
update(self.state.value, self.state.send, $0)
public func receive<S>(subscriber: S) where S : Subscriber, Never == S.Failure, State == S.Input {
state.receive(subscriber: subscriber)
