Skip to content

Instantly share code, notes, and snippets.

@blinkybool
Last active July 12, 2024 20:01
Show Gist options
  • Save blinkybool/1390d53a730493e2ce72549c5bf7eaec to your computer and use it in GitHub Desktop.
Save blinkybool/1390d53a730493e2ce72549c5bf7eaec to your computer and use it in GitHub Desktop.
Stream.luau - control-flow & data-flow library for Roblox.
--!strict
--!native
--[[
MIT License
Copyright (c) 2024 Billy Snikkers
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
]]
--[[
--------------------------------------------------------------------------------
-- Stream
--------------------------------------------------------------------------------
Original author: Billy Snikkers
Modifications by: <you>
Contents:
- clean : A general purpose cleanup function
- Stream constructors and transformers
- `Stream<T...>` type
- of, from, map filter, delayed, merge, pipe, combineLatest, switchMap etc...
- Roblox-instance-related stream constructors/transformers
- propertyOf, attributeOf, fromSignal, fromValueBase, firstChildOf etc...
- LifeStream constructors, transformers and lifetime management.
- `LifeStream<T> = Stream<T,boolean>` type
- eachPlayer, eachChildOf, toEachChild, eachTagged
- mount & new (for declarative and reactive instance creation)
- mount, new, compute, toStream
ATTENTION: The clean function comes first but it's not so interesting.
Skip ahead to `export type Stream<T>` to read about streams.
]]
-- This is set to `any` instead of the commented out version because luau just unfolds it
-- into a mess when you try and hover over any type involving CleanupTask.
type CleanupTask = any
-- type CleanupTask = () -> ()
-- | thread | RBXScriptConnection | Instance
-- | {Destroy: (self: any) -> ()} | {}
-- | {CleanupTask} | nil
--[[
The creation of an object or initiation of a behaviour can (should always) yield
an artifact called a "cleanup task". This artifact enables any resources created
or behaviours connected to be "cleaned up". Some examples
- A thread can be cancelled
- A connection can be disconnected
- An instance (or a class-object with Destroy method) can be destroyed
We can also have higher-order cleanup tasks:
- A collection (array) of cleanups can be cleaned up one-by-one (in reverse order)
- A function can be called to perform arbitrary clean-up or other housekeeping behaviour.
`clean` is a function which uniformly cleans up any of these artifacts (recursively).
Notes:
- We typically use the verb `cleanup` for a CleanupTask variable, pretending it is the function kind of CleanupTask.
- Instead of using this clean function, we could choose to always wrap a cleanup task in a function,
but this clutters the code and creates extra functions unecessarily.
> We just want to return *stuff* and have it dealt with!
- CleanupTasks can be nil
- Multiple CleanupTasks can be simply gathered into an array (itself a cleanup task),
e.g. {cleanup1, cleanup2}. But since any CleanupTask can be nil, this is an array with holes.
Thus you cannot reliably get the length with #, or make other array-iteration assumptions.
- Ignore the refs argument when using clean. It is used internally to avoid infinite recursion.
]]
local function clean(cleanup: CleanupTask, refs: {[any]:true}?)
if cleanup == nil then
return
elseif type(cleanup) == "function" then
cleanup()
elseif type(cleanup) == "thread" then
local cancelled
if coroutine.running() ~= cleanup then
cancelled = pcall(function()
task.cancel(cleanup)
end)
end
if not cancelled then
local toCancel = cleanup
task.defer(function()
task.cancel(toCancel)
end)
end
elseif typeof(cleanup) == "RBXScriptConnection" then
cleanup:Disconnect()
elseif typeof(cleanup) == "Instance" then
cleanup:Destroy()
elseif typeof(cleanup) == "table" then
if (cleanup :: any).Destroy then
(cleanup :: any):Destroy()
elseif getmetatable((cleanup :: any)) == nil then
local max_index = 0
for key in (cleanup :: any) do
if typeof(key) == "number" then
max_index = math.max(max_index, key)
else
warn(`[clean] Cannot clean non-numeric task table - might be an already destroyed object\nTraceback:\n{debug.traceback()}`)
return
end
end
if refs then
if refs[cleanup] then
return
end
refs[cleanup] = true
else
refs = {[cleanup]=true :: true}
end
-- Cleanup in reverse order of creation
for i=max_index, 1, -1 do
clean((cleanup :: any)[i], refs)
end
if not table.isfrozen(cleanup) then
table.clear(cleanup)
end
else
error(`[clean] Cannot cleanup table with a metatable but no :Destroy() method.\nTable:{cleanup}\nTraceback:\n{debug.traceback()}`)
end
end
end
export type Stream<T...> = ((T...) -> ()) -> CleanupTask
--[[
A stream is
- Literally: A function that takes a callback-function (we call it a listener),
which it calls with data whenever it pleases, until it is "cleaned up".
- Conceptually: a stream of data, whose datum is emitted synchronously or asynchronously
Example:
```lua
local function myStream(listener: (number) -> ()): CleanupTask
-- Emits synchronously
listener(1)
listener(2)
-- Emits asynchronously
local value = 3
local thread = task.spawn(function()
while true do
task.wait(1)
listener(value)
value += 1
end
end)
-- This is the cleanup - `clean(thread)` will stop this stream.
return thread
end
local cleanup = myStream(print) -- prints 1 and 2 immediately
-- will print 3,4,5,6... every second until cleanup is called
task.wait(4.5)
clean(cleanup)
```
How to think about streams?
Streams are about control-flow and data-flow. By listening to a stream, i.e. giving it a callback/behaviour, you
yield control to the stream to decide when and with-what-data that behaviour is executed.
In this sense, they can be thought of as a common generalisation of for-loops and events, which both provide data from some source,
and execute a behaviour synchronously (for-loops) or asynchronously (events).
But do not be mistaken, they are strictly more-powerful than either concept!
-- TODO: insert link to blog post
]]
-- selene:allow(unused_variable)
local function NO_OP<T...>(... : T...): () end
--[[
A stream that never emits anything
]]
-- selene:allow(unused_variable)
local function never<T...>(_listener: (T...) -> ()): CleanupTask
return nil
end
--[[
- `listen(stream, listener)` is just `stream(listener)`
- `listen(stream)` is just `stream(NO_OP)` (listens to the stream but does nothing with the emitted values)
]]
local function listen<T...>(stream: Stream<T...>, listener: nil | (T...) -> ()): CleanupTask
return stream(listener or NO_OP)
end
--[[
The provided (tidy)-listener can return a cleanup task after each emission, which will be
called before the next value is emitted (or when the stream ends).
If you have a LifeStream<T>, use `listenTidyEach` to bind to individual lifetimes.
]]
local function listenTidy<T>(stream: Stream<T>, tidyListener: (T) -> CleanupTask): CleanupTask
local cleanupListener: CleanupTask = nil
local cleanupStream = stream(function(value: T): ()
clean(cleanupListener)
cleanupListener = nil
cleanupListener = tidyListener(value)
end)
return function()
clean(cleanupListener)
cleanupListener = nil
clean(cleanupStream)
cleanupStream = nil
end
end
--[[
A stream that emits each item in the array synchronously (not the array itself)
]]
local function from<T>(array : {T}): Stream<T>
return function(listener: (T) -> ()): CleanupTask
for _, value in array do
listener(value)
end
return nil
end
end
--[[
A stream that emits each argument synchronously
]]
local function of<T>(... : T): Stream<T>
local array = {...}
return function(listener: (T) -> ()): CleanupTask
for _, value in array do
listener(value)
end
return nil
end
end
--[[
(Transformer)
`Stream<->` is a functor.
- `map(fn)(stream)` emits `fn(x)` whenever `stream` emits `x`.
Example (using pipe2)
```lua
-- A stream that emits "2", "4" and "6"
local stream : Stream<string> = pipe2(
of(1,2,3),
map(function(x)
return 2 * x
end)
map(tostring)
)
```
]]
local function map<T...,U...>(fn: (T...) -> U...): (Stream<T...>) -> Stream<U...>
return function(stream: Stream<T...>): Stream<U...>
-- The mapped stream
return function(listener: (U...) -> ()): CleanupTask
-- Listen and map the values, returning CleanupTask
return stream(function(...: T...): ()
listener(fn(...))
end)
end
end
end
--[[
(Transformer)
`mapTo(x,y,z)(stream)` emits `(x,y,z)` (together) whenever `stream` emits anything.
]]
local function mapTo<T...,U...>(...: U...): (Stream<T...>) -> Stream<U...>
local values: {any} = table.pack(...)
return function(stream: Stream<T...>): Stream<U...>
-- The mapped stream
return function(listener: (U...) -> ()): CleanupTask
-- selene:allow(unused_variable)
return stream(function(...): ()
listener(table.unpack(values))
end)
end
end
end
--[[
(Transformer)
Call `fn` with the emitted values and re-emit them. Doesn't modify the stream
data, but is useful for inspection for debugging.
Example:
```lua
local stream = of(1,2,3)
listen(tap(print)(stream)) -- prints 1, 2, 3 (on separate lines)
```
]]
local function tap<T...>(fn: (T...) -> ()): (Stream<T...>) -> Stream<T...>
return function(stream: Stream<T...>): Stream<T...>
-- The tapped stream
return function(listener: (T...) -> ()): CleanupTask
return stream(function(...: T...): ()
fn(...)
listener(...)
end)
end
end
end
--[[
(Transformer)
Filter the stream using the provided predicate.
`filter(predicate)(stream)` emits `x` when `stream` emits `x` if `predicate(x) == true`.
]]
local function filter<T...>(predicate: (T...) -> boolean) : (Stream<T...>) -> Stream<T...>
return function(stream: Stream<T...>): Stream<T...>
-- The filtered stream
return function(listener: (T...) -> ()): CleanupTask
return stream(function(...: T...): ()
if predicate(...) then
listener(...)
end
end)
end
end
end
--[[
(Transformer)
`delay(seconds)(stream)` is a stream which, when listened to, delays listening
to `stream` for `seconds` seconds, and then emits the values from `stream`
as they are emitted. It does not yield the current thread.
Example:
```lua
listen(delayed(1)(of("a","b")), print)
-- after 1 second
-- prints "a"
-- prints "b" (immediately after "a")
```
]]
local function delayed<T...>(seconds: number): (Stream<T...>) -> Stream<T...>
return function(stream: Stream<T...>): Stream<T...>
return function(listener:(T...) -> ())
return task.delay(seconds, stream, listener)
end
end
end
--[[
(Transformer)
Merge multiple streams into one.
`merge(stream1, stream2, ..., stream_n)` emits `x` when any of `stream1`, `stream2` ... `stream_n` emits `x`.
Example:
```lua
local stream = merge(
delayed(2)(of("a")),
of("b", "c"),
delayed(1)(of("d"))
)
listen(stream, print)
-- prints "b" (immediately)
-- prints "c" (immediately)
-- prints "d" (after 1 second)
-- prints "a" (after 2 seconds)
```
]]
local function merge<T...>(... : Stream<T...>): Stream<T...>
local streams = {...}
-- The merged stream
return function(listener: (T...) -> ()): CleanupTask
local cleanups = {}
for _, stream in streams do
table.insert(cleanups, stream(listener))
end
return cleanups
end
end
--[[
Pipes invert the function application syntax.
`pipe(stream)(transformer1, transformer2, ..., transformer_n)` is the same as
`transformer_n(...(transformer2(transformer1(stream))))`
This variadic version can't be properly typed, use the `pipe1`, `pipe2`,
`pipe3` and `pipe4` versions if you want to preserve types.
]]
local function pipe(stream: Stream<...any>, ...: (Stream<...any>) -> Stream<...any>): Stream<...any>
for _, transformer in {...} do
stream = transformer(stream)
end
return stream
end
--[[
Why not lol.
]]
local function pipe0<A...>(stream: Stream<A...>): Stream<A...>
return stream
end
--[[
`pipe1(stream, t1)` is just `t1(stream)`.
The pipe functions allow streams to be syntatically transformed in "chronological" order of transformation,
with transformations of the original stream occuring from left to right (or top to bottom).
]]
local function pipe1<A...,Z...>(
stream: Stream<A...>,
t1: (Stream<A...>) -> Stream<Z...>) : Stream<Z...>
return t1(stream)
end
--[[
`pipe2(stream, t1, t2)` is just `t2(t1(stream))`.
The pipe functions allow streams to be syntatically transformed in "chronological" order of transformation,
with transformations of the original stream occuring from left to right (or top to bottom).
]]
local function pipe2<A...,B...,Z...>(
stream: Stream<A...>,
t1: (Stream<A...>) -> Stream<B...>,
t2: (Stream<B...>) -> Stream<Z...>) : Stream<Z...>
return t2(t1(stream))
end
--[[
`pipe3(stream, t1, t2, t3)` is just `t3(t2(t1(stream)))`.
The pipe functions allow streams to be syntatically transformed in "chronological" order of transformation,
with transformations of the original stream occuring from left to right (or top to bottom).
]]
local function pipe3<A...,B...,C...,Z...>(
stream: Stream<A...>,
t1: (Stream<A...>) -> Stream<B...>,
t2: (Stream<B...>) -> Stream<C...>,
t3: (Stream<C...>) -> Stream<Z...>) : Stream<Z...>
return t3(t2(t1(stream)))
end
--[[
`pipe4(stream, t1, t2, t3, t4)` is just `t4(t3(t2(t1(stream))))`.
The pipe functions allow streams to be syntatically transformed in "chronological" order of transformation, with transformations
of the original stream occuring from left to right (or top to bottom).
]]
local function pipe4<A...,B...,C...,D...,Z...>(
stream: Stream<A...>,
t1: (Stream<A...>) -> Stream<B...>,
t2: (Stream<B...>) -> Stream<C...>,
t3: (Stream<C...>) -> Stream<D...>,
t4: (Stream<D...>) -> Stream<Z...>) : Stream<Z...>
return t4(t3(t2(t1(stream))))
end
--[[
(Transformer)
Emits the first value from any of the streams, and ignores the rest.
]]
local function firstOfAny<T...>(...: Stream<T...>): Stream<T...>
local streams = {...}
-- The output stream
return function(listener: (T...) -> ()): CleanupTask
local fired = false
local cleanup : CleanupTask = nil
cleanup = merge(table.unpack(streams))(function(...: T...): ()
if not fired then
fired = true
clean(cleanup)
listener(...)
end
end)
-- If the stream fired immediately (synchronously), cleanup was not yet set
-- so we must clean it up here
if fired then
clean(cleanup)
cleanup = nil
end
return cleanup
end
end
-- Don't export this >:(
local UNSET = newproxy(false)
type UNSET = typeof(UNSET)
--[[
(Transformer)
Only emits values that are not equal to the previous value.
i.e. `1,2,2,3,1,1,2` becomes `1,2,3,1,2`
]]
local function skipUnchanged<T>(stream: Stream<T>): Stream<T>
return function(listener: (T) -> ()): CleanupTask
local prev: T | UNSET = UNSET
return stream(function(x: T): ()
if x ~= prev then
prev = x
listener(x)
end
end)
end
end
--[[
(Transformer)
Converts a table of streams into a stream of tables.
- When `combineLatest(streams)` emits `values :: {[K] : V}` with `values[key] == x` then `x`
is the latest value emitted by the stream `t[key]`.
- `combineLatest(streams)` will not emit until all streams have emitted at least once.
- When all streams have emitted at least once, `combineLatest(streams)` will emit whenever any of the streams emit.
Note: `dontCloneTheEmittedTableIKnowWhatImDoing` should only be true if you know what you're doing, i.e.
- You've read/understand this function
- You will just unpack it or iterate over it, and not mutate it.
- You will not yield at any point before you are done accessing the table (it can be mutated by the stream)
- You will not store a reference to it or mutate it (it can be mutated by the stream)
]]
local function combineLatest<K,V>(streams: {[K] : Stream<V>}, dontCloneTheEmittedTableIKnowWhatImDoing: boolean?): Stream<{[K] : V}>
-- The combined stream
return function(listener: ({[K] : V}) -> ()): CleanupTask
-- Initialize values to UNSET (no emitting until all streams have emitted)
local values: {[K] : V | UNSET} = {}
for key in streams do
values[key] = UNSET
end
-- Listen to all streams
local cleanups = {}
for key, stream in streams do
table.insert(cleanups, stream(function(x: V): ()
values[key] = x
for _, value in values do
if value == UNSET then
return
end
end
if dontCloneTheEmittedTableIKnowWhatImDoing then
listener(values)
else
listener(table.clone(values))
end
end))
end
return cleanups
end
end
--[[
(Transformer)
Map the values in stream `a` with the function `fn`.
]]
local function combine1<A,Z>(a: Stream<A>, fn: (A) -> Z): Stream<Z>
-- Output stream is just `map(fn)(a)`, but we unfold it for less debugging whiplash
return function(listener: (Z) -> ()): CleanupTask
return a(function(value: A): ()
listener(fn(value))
end)
end
end
--[[
(Transformer)
Combine the latest values of `a`, `b` with the function `fn`.
Useful for computing derived values from multiple streams.
]]
local function combine2<A,B,Z>(a: Stream<A>, b: Stream<B>, fn: (A,B) -> Z): Stream<Z>
return function(listener: (Z) -> ()): CleanupTask
return combineLatest({a,b} :: {Stream<any>}, true)(function(values: {any}): ()
listener(fn(table.unpack(values)))
end)
end
end
--[[
(Transformer)
Combine the latest values of `a`, `b`, `c` with the function `fn`.
Useful for computing derived values from multiple streams.
]]
local function combine3<A,B,C,Z>(a: Stream<A>, b: Stream<B>, c: Stream<C>, fn: (A,B,C) -> Z): Stream<Z>
return function(listener: (Z) -> ()): CleanupTask
return combineLatest({a,b,c} :: {Stream<any>}, true)(function(values: {any}): ()
listener(fn(table.unpack(values)))
end)
end
end
--[[
(Transformer)
Combine the latest values of `a`, `b`, `c`, `d` with the function `fn`.
Useful for computing derived values from multiple streams.
]]
local function combine4<A,B,C,D,Z>(a: Stream<A>, b: Stream<B>, c: Stream<C>, d: Stream<D>, fn: (A,B,C,D) -> Z): Stream<Z>
return function(listener: (Z) -> ()): CleanupTask
return combineLatest({a,b,c,d} :: {Stream<any>}, true)(function(values: {any}): ()
listener(fn(table.unpack(values)))
end)
end
end
--[[
(Transformer)
Convert a stream of streams into a stream that always emits from the latest stream,
cleaning up previous streams when a new one is emitted.
]]
local function switchAll<T...>(stream: Stream<Stream<T...>>): Stream<T...>
return function(listener: (T...) -> ()): CleanupTask
local cleanupInner : CleanupTask? = nil
local cleanupStream = stream(function(innerStream: Stream<T...>): ()
clean(cleanupInner)
cleanupInner = nil
cleanupInner = innerStream(listener)
end)
return function()
clean(cleanupInner)
cleanupInner = nil
clean(cleanupStream)
cleanupStream = nil
end
end
end
--[[
(Transformer)
`switchMap(fn)(stream)` emits from the stream `fn(x)` while `x` is the latest emitted value of `stream`.
Useful for when you wanted to listen to a stream that depends on the latest value of another stream.
Prototypical example is `toProperty(property)`. Go read that, otherwise, here is a toy example.
```lua
-- Emits 1, 2, 3, 4, ... with a `seconds` second delay between each.
local function timer(seconds: number): Stream<number>
return function(listener)
return task.spawn(function()
local x = 1
while true do
task.wait(seconds)
listener(x)
end
end)
end
end
-- This emits "(1,1)" (time=0.4s), "(1,2)" (time=0.8s),
-- "(2,1)" (time=1.4s), "(2,2)" (time=1.8s), "(3,1)" (time=2.4s)
-- Note that the inner stream is being "switched out" at the whole-second points.
local stream: Stream<number> = pipe1(
timer(1),
switchMap(function(x)
return pipe1(
timer(0.4),
map(function(y)
return `({x},{y})`
end)
)
end)
)
```
]]
local function switchMap<T...,U...>(fn: (T...) -> Stream<U...>): (Stream<T...>) -> Stream<U...>
return function(source: Stream<T...>): Stream<U...>
-- Apply switchAll to a stream of streams
-- so that we get a stream that emits only from the latest stream
-- Note this code is just `switchAll(map(fn)(source))` unfolded for less debugging whiplash
return switchAll(function(streamListener: (Stream<U...>) -> ()): CleanupTask
-- Apply fn to every emitted value to get a stream, and give it to the streamListener
return source(function(...: T...): ()
streamListener(fn(...))
end)
end)
end
end
--[[
`nilOnce: Stream<T?>`
Just emits `nil` immediately
]]
local function nilOnce<T>(listener: (T?) -> ()): CleanupTask
listener(nil)
return nil
end
--[[
(Transformer) `replaceNil(value)(stream)` replaces nil-values from `stream` with `value`.
]]
local function replaceNil<T>(value: T): (Stream<T?>) -> Stream<T>
return function(stream: Stream<T?>): Stream<T>
return function(listener: (T) -> ())
return stream(function(x: T?)
if x == nil then
listener(value)
else
listener(x)
end
end)
end
end
end
--[[
(Transformer) filters out nil-values from a stream.
Note: it's usually better to use`replaceNil(value)`, or just handle the nil
case in your listener function, since otherwise you are not reacting to your
data becoming unnavailable.
]]
local function blockNil<T>(stream: Stream<T?>): Stream<T>
return function(listener: (T) -> ())
return stream(function(x: T?)
if x ~= nil then
listener(x)
end
end)
end
end
--[[
`attributeOf(instance, attribute)` emits `instance:GetAttribute(attribute)`
immediately and whenever it changes.
Note it could be nil, so consider using `replaceNil` or `blockNil`.
]]
local function attributeOf(instance: Instance, attribute: string): Stream<any?>
return function(listener: (any?) -> ()): CleanupTask
listener(instance:GetAttribute(attribute))
return instance:GetAttributeChangedSignal(attribute):Connect(function()
listener(instance:GetAttribute(attribute))
end)
end
end
--[[
Turns a `Stream<Instance?>` in to a `Stream<any?>`, where the emitted value
is the latest value of the attribute of the latest instance, or nil if either
the latest instance is nil, or the attribute is nil (usually if it's not yet set).
Example:
```lua
-- Emits the "TorsoDamage" attribute of the PrimaryPart of the LocalPlayer's Character,
-- or nil if the LocalPlayer's character is nil, or if the character's the PrimaryPart is nil,
-- or if the attribute is nil.
local stream: Stream<number?> = pipe2(
propertyOf(Players.LocalPlayer, "Character"),
toProperty("PrimaryPart"),
-- the incoming stream to this next transformer is of type Stream<Instance?>
toAttribute("TorsoDamage")
)
```
]]
local function toAttribute(attribute: string): (Stream<Instance?>) -> Stream<any?>
return switchMap(function(instance: Instance?): Stream<any?>
if instance then
return attributeOf(instance, attribute)
else
return nilOnce
end
end)
end
--[[
Safely verify that an instance has a property.
]]
local function hasProperty(instance: Instance, name: string): boolean
return (pcall(instance.GetPropertyChangedSignal, instance, name))
end
--[[
`propertyOf(instance, property)` emits `instance[property]` immediately and whenever it changes.
]]
local function propertyOf(instance: Instance, property: string): Stream<any>
if not hasProperty(instance, property) then
warn(`[Stream.propertyOf] Instance {instance} does not have property {property}`)
return never
end
return function(listener: (any) -> ()): CleanupTask
listener((instance :: any)[property])
return instance:GetPropertyChangedSignal(property):Connect(function()
listener((instance :: any)[property])
end)
end
end
--[[
Turns a Stream<Instance?> into a Stream<any?>, where the emitted value is the property of the Instance,
or nil if the latest instance is nil.
Example:
```lua
-- Emits the FieldOfView of the CurrentCamera of a given viewportFrame.
local stream: Stream<number?> = pipe1(
propertyOf(viewPortFrame, "CurrentCamera"),
-- the incoming stream to this next transformer is of type Stream<Instance?>
toProperty("FieldOfView"),
)
```
]]
local function toProperty(property: string): (Stream<Instance?>) -> Stream<any?>
return switchMap(function(instance: Instance?)
if instance then
return propertyOf(instance, property)
else
return nilOnce
end
end)
end
--[[
Turns an RBXScriptSignal into a stream using :Connect(), emitting whatever it fires with
]]
local function fromSignal<T...>(signal: RBXScriptSignal<T...>): Stream<T...>
return function(listener: (T...) -> ()): CleanupTask
return signal:Connect(listener)
end
end
--[[
Turns an RBXScriptSignal into a stream using :Once(), emitting whatever it fires with
]]
local function fromSignalOnce<T...>(signal: RBXScriptSignal<T...>): Stream<T...>
return function(listener: (T...) -> ()): CleanupTask
return signal:Once(listener)
end
end
--[[
Turns a ValueBase object (IntValue, ObjectValue, etc) into a stream of it's current and
future values.
]]
local fromValueBase :
((IntValue) -> Stream<number>) &
((RayValue) -> Stream<Ray>) &
((BoolValue) -> Stream<boolean>) &
((CFrameValue) -> Stream<CFrame>) &
((Color3Value) -> Stream<Color3>) &
((NumberValue) -> Stream<number>) &
((ObjectValue) -> Stream<Instance?>) &
((StringValue) -> Stream<string>) &
((Vector3Value) -> Stream<Vector3>) &
((BrickColorValue) -> Stream<BrickColor>)
= function(valueBase): Stream<any>
return function(listener: (any) -> ())
listener(valueBase.Value)
return (valueBase.Changed :: any):Connect(function(newValue)
listener(newValue)
end)
end
end
--[[
A LifeStream<T> emits pairs consisting of a value of type T and a boolean,
indicating whether the value is "alive" (true) or "dead" (false).
Most useful when fed into `listenTidyEach`, which binds entities/behaviour
to the lifetime of a value.
]]
export type LifeStream<T> = Stream<T,boolean>
local function listenTidyEach<T>(lifeStream: LifeStream<T>, tidyListener: (T) -> CleanupTask): CleanupTask
local valueCleanups: {[T]: CleanupTask} = {}
local cleanupStream = lifeStream(function(value: T, alive: boolean)
if valueCleanups[value] ~= nil then
clean(valueCleanups[value])
valueCleanups[value] = nil
end
if alive then
valueCleanups[value] = tidyListener(value)
end
end)
return function()
-- We worry that one of these cleanups will trigger the stream to fire again, potentionally adding more
-- so we just keep cleaning until they're all gone.
local value, cleanup = next(valueCleanups)
while cleanup ~= nil do
clean(cleanup)
valueCleanups[value :: any] = nil
value, cleanup = next(valueCleanups)
end
clean(cleanupStream)
cleanupStream = nil
end
end
--[[
A LifeStream<Player> of the players in the game.
- Emits `(player: Player, inGame: boolean)` pairs
- Use `listenTidyEach(eachPlayer, tidyListener)` to bind something to the lifetime of each player.
]]
local eachPlayer: LifeStream<Player>
do
eachPlayer = function(listener: (Player, boolean) -> ())
local Players = game:GetService("Players")
for _, player in Players:GetPlayers() do
listener(player, true)
end
local addedConnection = Players.PlayerAdded:Connect(function(player: Player)
listener(player, true)
end)
local removingConnection = Players.PlayerRemoving:Connect(function(player: Player)
listener(player, false)
end)
return {addedConnection, removingConnection}
end
end
--[[
A `LifeStream<Instance>` of the children of an instance.
- Emits `(child: Instance, isParented: boolean)` pairs
- Use `listenTidyEach(eachChildOf(instance), tidyListener)` to bind something to the lifetime of each child,
which ends when it is deparented.
]]
local function eachChildOf(instance: Instance): LifeStream<Instance>
return function(listener: (Instance, boolean) -> ())
for _, child in instance:GetChildren() do
listener(child, true)
end
local addedConnection = instance.ChildAdded:Connect(function(child: Instance)
listener(child, true)
end)
local removedConnection = instance.ChildRemoved:Connect(function(child: Instance)
listener(child, false)
end)
return {addedConnection, removedConnection}
end
end
--[[
`toEachChild(stream)` turns a `Stream<Instance?>` into a `LifeStream<Instance>`
of the children of the latest emitted instance (see `eachChildOf`).
- The output stream does not emit anything while the latest emitted instance from `stream` is nil.
]]
local toEachChild: (Stream<Instance?>) -> LifeStream<Instance> =
switchMap(function(instance: Instance?)
return if instance then eachChildOf(instance) else never
end)
--[[
A `LifeStream<Instance>` of the children of an instance with `.Name == name`.
- Emits `(child: Instance, isNamedAndParented: boolean)` pairs
- Use `listenTidyEach(eachChildNamedOf(instance, name), tidyListener)` to bind something to the lifetime of each child,
which ends when it is deparented or renamed to something else.
]]
local function eachChildNamedOf(instance: Instance, name: string): LifeStream<Instance>
return function(listener: (Instance, boolean) -> ())
return listenTidyEach(eachChildOf(instance), function(child: Instance)
return propertyOf(instance, "Name")(function(childName: string)
listener(child, childName == name)
end)
end)
end
end
--[[
`toEachChildNamed(name)(stream)` turns a `Stream<Instance?>` into a `LifeStream<Instance>`
of the children of the latest emitted instance with `.Name == name` (see `eachChildNamedOf`)
- The output stream does not emit anything while the latest emitted instance from `stream` is nil.
]]
local function toEachChildNamed(name: string): (Stream<Instance?>) -> LifeStream<Instance>
return switchMap(function(instance: Instance?)
return if instance then eachChildNamedOf(instance, name) else never
end)
end
--[[
`firstChildOf(instance, name)` emits `instance:FindFirstChild(name)` immediately and whenever it changes.
- Note that adding/removing any child of `instance` can effect what `instance:FindFirstChild(name)` is, but
we only emit when the result of `instance:FindFirstChild(name)` changes (using `skipUnchanged`).
- Optionally provide a string `classNameIsA` to error if the `child:IsA(classNameIsA) == false`.
]]
local function firstChildOf(instance: Instance, name: string, classNameIsA: string?): Stream<Instance?>
local stream: Stream<Instance?> = function(listener)
-- We listen to name changes of every child (and current name), since these can effect what instance:FindFirstChild(name) is.
return listenTidyEach(eachChildOf(instance), function(child: Instance)
-- Here we are listening to the property name of each child.
-- This gets cleaned up by `listenTidyEach` when the child is deparented.
return propertyOf(child, "Name")(function(_: string)
local firstChild = instance:FindFirstChild(name)
if firstChild then
if classNameIsA and not firstChild:IsA(classNameIsA) then
error(`[Stream.firstChildOf] Expected {instance}:FindFirstChild({name}):IsA({classNameIsA}) == true, but it is a {firstChild.ClassName}`)
end
end
listener(firstChild)
end)
end)
end
-- `stream` can emit the same instance many times while other children are added/removed/renamed.
-- We only want to emit when it changes (but also the initial value)
return skipUnchanged(stream)
end
--[[
`toFirstChild(name)(stream)` turns a `Stream<Instance?>` into a `Stream<Instance?>`
of the :FindFirstChild(name) of the latest emitted instance (see `firstChildOf`).
- Optionally provide a string `classNameIsA` to error if the `child:IsA(classNameIsA) == false`.
]]
local function toFirstChild(name: string, classNameIsA: string?): (Stream<Instance?>) -> Stream<Instance?>
return switchMap(function(instance: Instance?)
if instance then
return firstChildOf(instance, name, classNameIsA)
else
return nilOnce
end
end)
end
--[[
A `LifeStream<Instance>` of each instance with the given tag.
- Optionally provide an `ancestor` to restrict to instances that are descendants of the `ancestor`.
- Emits (instance: Instance, isTaggedAndDescendent: boolean) pairs.
- Use `listenTidyEach(eachTagged(tag), tidyListener)`, to bind something to the lifetime of each tagged instance,
which ends when the instance is untagged, or when it is no longer a descendant of the ancestor.
]]
local function eachTagged(tag: string, ancestor: Instance?): LifeStream<Instance>
local CollectionService = game:GetService("CollectionService")
local eachTaggedAnywhere = function(listener: (Instance, boolean) -> ())
for _, instance in CollectionService:GetTagged(tag) do
listener(instance, true)
end
local addedConnection = CollectionService:GetInstanceAddedSignal(tag):Connect(function(instance)
listener(instance, true)
end)
local removedConnection = CollectionService:GetInstanceRemovedSignal(tag):Connect(function(instance)
listener(instance, false)
end)
return {addedConnection, removedConnection}
end
if ancestor == nil then
return eachTaggedAnywhere
end
return function(listener: (Instance, boolean) -> ()): CleanupTask
return listenTidyEach(eachTaggedAnywhere, function(instance: Instance) : CleanupTask
local currentIsDescendant = instance:IsDescendantOf(ancestor)
if currentIsDescendant then
listener(instance, true)
end
return instance.AncestryChanged:Connect(function(_: Instance, _: Instance?)
if currentIsDescendant ~= instance:IsDescendantOf(ancestor) then
currentIsDescendant = instance:IsDescendantOf(ancestor)
listener(instance, currentIsDescendant)
end
end)
end)
end
end
--[[
Takes ownership of the emitted values by cleaning them up when a new value is emitted.
]]
local function tidyStream(stream: Stream<CleanupTask>): Stream<CleanupTask>
return function(listener: (CleanupTask) -> ()): CleanupTask
return listenTidy(stream, function(cleanupTask: CleanupTask)
listener(cleanupTask)
return cleanupTask
end)
end
end
-- Helper function for `mount`.
local function toHandler(value: any): ((...any) -> (...any))?
if typeof(value) == "function" then
return value
elseif typeof(value) == "Instance" and value:IsA("ValueBase") then
return function(newValue: any)
(value :: any).Value = newValue
end
elseif typeof(value) == "table" and typeof(value.SetValue) == "function" then
return function(newValue: any)
value:SetValue(newValue)
end
else
return nil
end
end
--[[
Flattens a possibly nested Stream (or other Streamable) of Instances.
For example Stream<Stream<Stream<Instance?>>> will become Stream<Instance?>.
This is useful internally in `mount`, for example, when a `compute` stream returns a `new "Frame" {...}`.
]]
local function fromNestedInstance(value: Instance? | Streamable<Instance?>): Stream<Instance?>
if value == nil then
return of(value :: Instance?)
elseif typeof(value) == "Instance" and value:IsA("ObjectValue") then
return propertyOf(value, "Value")
elseif typeof(value) == "Instance" then
return of(value)
elseif typeof(value) == "function" then
return switchMap(fromNestedInstance)(value)
elseif typeof(value) == "table" and typeof((value :: any).Stream) == "function" then
return fromNestedInstance((value :: any):Stream())
elseif typeof(value) == "table" and typeof((value :: any).Observe) == "function" then
return fromNestedInstance(function(listener)
return (value :: any):Observe():Subscribe(listener)
end)
else
error(`[Stream.fromNestedInstance] Unrecognized nested instance {value} of type {typeof(value)}.`)
end
end
export type Streamable<T> =
Stream<T> | ValueBase | RBXScriptSignal<T> |
{ Stream: (self: any) -> Stream<T> } |
{ Observe: (self: any) -> Stream<T> } |
T
--[[
Convert a Streamable<T> into a Stream<T> (not recursive).
]]
local function toStream<T>(state: Streamable<T>): Stream<T>
if typeof(state) == "function" then
return state
elseif typeof(state) == "Instance" and state:IsA("ValueBase") then
return fromValueBase(state :: any) :: any
elseif typeof(state) == "RBXScriptSignal" then
return fromSignal(state)
elseif typeof(state) == "table" and typeof((state :: any).Stream) == "function" then
return (state :: any):Stream()
elseif typeof(state) == "table" and typeof((state :: any).Observe) == "function" then
return function(listener)
return (state :: any):Observe():Subscribe(listener)
end
else
return of((state :: any) :: T)
end
end
--[[
Very general purpose way of attaching behaviour and children to an instance.
Key-value pairs in mountProps can have many forms, but generally the
key determines what kind of behaviour, and the behaviour itself is derived
from the value.
In some cases, the value is some property or instance, possibly wrapped inside some
stateful object or stream, and this can be arbitrarily nested (see `fromNestedInstance`).
We will refer to this as a Nested<T>, though we don't have a formal type for it.
- `[property: string] = value : Nested<T>`
- Updates the property of `instance` to the latest nested value.
- `value : Nested<Instance>` (there is no key, i.e., the key is an integer)
- Parent the emitted nested Instances to `instance`
In the following cases, the key represents something that should be handled by the value.
The value should be a function, or a stateful object to assign new values
(see toHandler). We will refer to such values with CanHandle<T...> (not a formal type).
- `[eventName: string] = value : CanHandle<T...>`
where `instance[eventName]` is an R`BXScriptSignal<T...>`
- Connects `toHandler(value)` to the signal.
- `[signal: RBXScriptSignal<T...>] = value : CanHandle<T...>`
- Connects `toHandler(value)` to the signal.
- `["instance"] = value` : CanHandle<Instance>
- Calls the `toHandler(value)` with the instance, and adds the returned cleanup task to the cleanup array.
- If the `toHandler(value)` returns a cleanup task, it will be added to the cleanup array.
Attach any extra CleanupTasks using the "Cleanup" key.
- `["cleanup"] = cleanupTask : CleanupTask`
- Adds the cleanup task to the cleanup array.
Finally the key can be a function of type `(Instance) -> any`, which will be called with the `instance`
to compute the intended key. This is useful when the instance is created via `new`.
- `[key: (Instance) -> any] = value : any`
- If key(instance) matches any of the above cases, it will be handled as such.
- If typeof(key(instance)) == "function", it's assumed to be a stream
and is listened to by `toHandler(value)`.
Some examples:
- `[propertyOut("AbsoluteSize")] = state : Stateful<Vector2>
- Passes current and future values of instance.AbsoluteSize to state:SetValue(_)
]]
local function mount(instance: Instance, mountProps: {[any]: any}): {CleanupTask}
assert(typeof(instance) == "Instance", "[mount] Bad instance")
local cleanup = {}
local inst: any = instance -- Typing hack: can't set dynamically named properties on `instance`
local parent = nil
for key, value in mountProps do
if key == "Parent" then
-- Delay parenting
parent = value
continue
end
local keyWasFunc = typeof(key) == "function"
if keyWasFunc then
-- User has given a function in the key to compute the (real) key from the instance
-- Useful for when instance is created via `new`.
-- e.g. propertyOut("AbsoluteSize") : (Instance) -> Stream<Vector2>
key = key(inst)
end
if typeof(key) == "string" then
-- Case: String key refers to a property or signal of `instance`
-- If the line below errors, then the key is not a valid property or signal of `instance`
local keyVal = inst[key]
if key == "instance" then
-- Case: we are providing the instance to the users handler, possibly receiving a CleanupTask
local handler = toHandler(value)
if handler == nil then
error(`[mount] Expected handler for {instance}, got a {typeof(value)} ({value})`)
else
table.insert(cleanup, (handler :: (...any) -> CleanupTask)(inst))
end
elseif key == "cleanup" then
-- Case: User is providing some cleanup
table.insert(cleanup, value)
elseif typeof(keyVal) == "RBXScriptSignal" then
-- Case: we are connecting a handler to an event of the instance
local handler = toHandler(value)
if handler == nil then
error(`[mount] Expected handler for {instance}.{key}, got a {typeof(value)} ({value})`)
end
table.insert(cleanup, keyVal:Connect(toHandler(value)))
else
-- Case: we are mounting `value` to `instance[key]`.
-- `value` should encapsulate some (possibly) stateful property,
-- either a stream, ValueBase, or just a plain value
if typeof(value) == "function" then
-- assume it's a stream
local stream: Stream<any> = toStream(value)
table.insert(cleanup, stream(function(x: any)
inst[key] = x
end))
elseif typeof(value) == "Instance" and value:IsA("ValueBase") then
inst[key] = (value :: any).Value
table.insert(cleanup, (value :: any).Changed:Connect(function(newValue)
inst[key] = newValue
end))
else
inst[key] = value
end
end
elseif typeof(key) == "number" then
-- Case: Numeric key means value is/emits a child instance,
-- or is a table of more props
if typeof(value) == "function" then
-- assume it's a (possibly nested) stream of instances (or instances wrapped in state)
local stream: Stream<Instance?> = fromNestedInstance(value)
table.insert(cleanup, stream(function(child: Instance?)
if child then
child.Parent = inst
end
end))
elseif typeof(value) == "Instance" then
value.Parent = inst
-- An important part of the mount contract.
-- It takes ownership of child instances provided directly!
table.insert(cleanup, inst)
elseif typeof(value) == "table" then
-- Assume it's more props just dumped into the table
table.insert(cleanup, mount(inst, value))
else
error(`Bad child {value} of type {value}`)
end
elseif typeof(key) == "function" then
-- Case: It's a stream
local handler = toHandler(value)
if handler == nil then
error(`[mount] Expected handler for stream returned by key-function, got a {typeof(value)} ({value})`)
end
table.insert(cleanup, key(handler))
elseif typeof(key) == "RBXScriptSignal"
or typeof(key) == "table" and typeof((key).Connect) == "function" then
-- Case: key is a signal, value is a handler
local handler = toHandler(value)
if handler == nil then
error(`[mount] Expected handler for signal {key}, got a {typeof(value)} ({value})`)
end
table.insert(cleanup, (key :: any):Connect(handler))
elseif keyWasFunc then
-- Case: The user returned a key from the function that doesn't match any of the previous
-- cases, so we assume they want to handle the result themselves.
local handler = toHandler(value)
if handler == nil then
error(`[mount] mountProps table entry has the form [(Instance) -> {typeof(key)}] = {value} (type: {typeof(value)}), but value could not be converted to a handler to give a {typeof(key)}.`)
else
table.insert(cleanup, handler(key))
end
end
end
if parent then
-- Now do delayed parenting
if typeof(parent) == "Instance" or parent == nil then
instance.Parent = parent
else
local stream: Stream<Instance?> = fromNestedInstance(parent)
table.insert(cleanup, stream(function(newParent)
if typeof(parent) == "Instance" or newParent == nil then
instance.Parent = newParent
else
error(`[mount] Expected parent to be an Instance or nil, got {newParent} of type {typeof(newParent)}`)
end
end))
end
end
return cleanup
end
--[[
Creates an instance wrapped in a stream (which emits the instance immediately).
- It "owns" the instance, so when the stream is cleaned up, the instance is destroyed
after cleaning up the mounted props.
- Read the documentation for `mount` for more information on `mountProps`.
Example:
```lua
local cleanup = mount(Players.LocalPlayer.PlayerGui, {
new "ScreenGui" {
IgnoreGuiInset = true,
new "Frame" {
Size = UDim2.fromScale(0.5,0.5),
BackgroundColor3 = compute1(RunService.RenderStepped, function(_deltaTime)
local t = os.clock()
return Color3.fromHSV(t % 1, 1, 1)
end))
}
}
})
```
]]
local function new(className: string): (props: {[any]: any}) -> Stream<Instance>
return function(props: {[string]: any}): Stream<Instance>
return function(listener: (Instance) -> ()): CleanupTask
local instance = Instance.new(className)
local cleanupMount = mount(instance, props)
listener(instance)
-- Here we are trusting the `clean` function to cleanup in reverse order.
return {instance, cleanupMount} :: {CleanupTask}
end
end
end
--[[
Creates an instance, mounts the props, and returns the instance and a cleanup task,
to cleanup the props and then the instance
]]
local function newInstance(className: string): (props: {[any]: any}) -> (Instance, CleanupTask)
return function(props: {[string]: any}): (Instance, CleanupTask)
local instance = Instance.new(className)
local cleanup = mount(instance, props)
return instance, ({instance, cleanup} :: {CleanupTask})
end
end
--[[
`propertyOut(name)` is a function sending an instance to the propertyOf(instance,name) stream.
- Can be used as a key in a mountProps table.
Example:
```lua
new "Frame" {
[propertyOut "BackgroundColor3"] = function(color: Color3)
print(`The background color is now {color}`)
end),
}
```
]]
local function propertyOut(name: string): (Instance) -> Stream<any>
return function(instance: Instance): Stream<any>
return propertyOf(instance, name)
end
end
--[[
Usage: `compute(a,b,c,d,...,fn)`, where `a,b,c,d,...` are Streamable<A>, Streamable<B>, etc.
and `fn` is a function of type `(A,B,C,D,...) -> Z`.
1. Converts the first n-1 args to streams with `toStream`.
2. Combines their latest values with `combineLatest` and passes them to `fn`.
]]
local function compute(...): Stream<any>
local args = {...}
local n = select("#", ...)
local fn = args[n]
assert(typeof(fn) == "function", "[compute] Last argument must be a function")
local streams = {}
for i=1, n-1 do
table.insert(streams, toStream(args[i]))
end
if n == 1 then
-- Calls `fn` with no arguments when the stream is listened to.
return map(fn)(of())
elseif n == 2 then
return map(fn)(streams[1])
else
return map(function(values)
return fn(table.unpack(values))
end)(combineLatest(streams, true))
end
end
--[[
Usage `compute1(a,fn)`.
- Converts `a` to a `Stream<A>` with `toStream` and then maps `fn` over the combined stream
of their latest values.
- Useful in `mount` and `new`
Example:
```lua
-- Here Name can be any `Streamable<string>`, such as a `StringValue` or a `Stream<string>`.
return new "TextLabel" {
Text = compute1(Name, function(name: string)
return `Hello {name}!`
)
}
```
]]
local function compute1<A,Z>(a : Streamable<A>, fn : (A) -> Z): Stream<Z>
return map(fn)(toStream(a))
end
--[[
Usage `compute2(a,b,fn)`.
- Converts `a`, `b` into `Stream<A>`, `Stream<B>` with `toStream` and then maps `fn` over the combined stream
of their latest values.
- Useful in `mount` and `new`
Example:
```lua
local IsHovering, IsPressed = Value(false), Value(false) -- Something with :Stream() methods
return new "TextButton" {
Size = compute2(IsHovering, IsPressed, function(isHovering, isPressed)
if isHovering and not isPressed then
return UDim2.new(1,2,1,2)
else
return UDim2.new(1,0,1,0)
end
end),
Text = compute1(IsPressed, function(isPressed)
return isPressed and "Pressed" or "Not Pressed"
end),
MouseEnter = function() IsHovering.Value = true end,
MouseLeave = function() IsHovering.Value = false end,
MouseButton1Down = function() IsPressed.Value = true end,
MouseButton1Up = function() IsPressed.Value = false end,
}
```
]]
local function compute2<A,B,Z>(a : Streamable<A>, b : Streamable<B>, fn : (A,B) -> Z): Stream<Z>
return combine2(toStream(a), toStream(b), fn)
end
--[[
Usage `compute3(a,b,c,fn)`.
- Converts `a`, `b`, `c` into `Stream<A>`, `Stream<B>`, `Stream<C>`
with `toStream` and then maps `fn` over the combined stream of their latest values.
- Useful in `mount` and `new`
- See `compute2` for an example with two streamables.
]]
local function compute3<A,B,C,Z>(a : Streamable<A>, b : Streamable<B>, c : Streamable<C>, fn : (A,B,C) -> Z): Stream<Z>
return combine3(toStream(a), toStream(b), toStream(c), fn)
end
--[[
Usage `compute4(a,b,c,d,fn)`.
- Converts `a`, `b`, `c`, `d` into `Stream<A>`, `Stream<B>`, `Stream<C>`, `Stream<D>`
with `toStream` and then maps `fn` over the combined stream of their latest values.
- Useful in `mount` and `new`
- See `compute2` for an example with two streamables.
]]
local function compute4<A,B,C,D,Z>(a : Streamable<A>, b : Streamable<B>, c : Streamable<C>, d : Streamable<D>, fn : (A,B,C,D) -> Z): Stream<Z>
return combine4(toStream(a), toStream(b), toStream(c), toStream(d), fn)
end
local function _callMeMaybe(myNumber: number): Stream<number>
return function(listener:(number) -> ())
if math.random() > 0.5 then
listener(myNumber)
end
return nil
end
end
return {
-- Helper stuff
clean = clean,
NO_OP = NO_OP,
never = never,
-- Listening
listen = listen,
listenTidy = listenTidy,
-- Stream constructors/transformers
from = from,
of = of,
map = map,
mapTo = mapTo,
tap = tap,
filter = filter,
delayed = delayed,
merge = merge,
pipe = pipe,
pipe0 = pipe0,
pipe1 = pipe1,
pipe2 = pipe2,
pipe3 = pipe3,
pipe4 = pipe4,
firstOfAny = firstOfAny,
skipUnchanged = skipUnchanged,
combineLatest = combineLatest,
combine1 = combine1,
combine2 = combine2,
combine3 = combine3,
combine4 = combine4,
switchAll = switchAll,
switchMap = switchMap,
-- Roblox-instance-related stream constructors/transformers
nilOnce = nilOnce,
replaceNil = replaceNil,
blockNil = blockNil,
attributeOf = attributeOf,
toAttribute = toAttribute,
hasProperty = hasProperty,
propertyOf = propertyOf,
toProperty = toProperty,
fromSignal = fromSignal,
fromSignalOnce = fromSignalOnce,
fromValueBase = fromValueBase,
-- LifeStream listeners/constructors/transformers
listenTidyEach = listenTidyEach,
eachPlayer = eachPlayer,
eachChildOf = eachChildOf,
toEachChild = toEachChild,
eachChildNamedOf = eachChildNamedOf,
toEachChildNamed = toEachChildNamed,
firstChildOf = firstChildOf,
toFirstChild = toFirstChild,
eachTagged = eachTagged,
tidyStream = tidyStream,
-- mounting helpers
toHandler = toHandler,
fromNestedInstance = fromNestedInstance,
toStream = toStream,
-- mount & new
mount = mount,
new = new,
newInstance = newInstance,
-- User-level helpers for mount/new
propertyOut = propertyOut,
compute = compute,
compute1 = compute1,
compute2 = compute2,
compute3 = compute3,
compute4 = compute4,
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment