Skip to content

Instantly share code, notes, and snippets.

@Anaminus
Last active January 17, 2024 18:42
Show Gist options
  • Save Anaminus/1f31af4e5280b9333f3f58e13840c670 to your computer and use it in GitHub Desktop.
Save Anaminus/1f31af4e5280b9333f3f58e13840c670 to your computer and use it in GitHub Desktop.
Standalone ReactiveX module ripped from Nevermore.Rx
--@sec: Maid
--@doc: The Maid module provides methods to manage tasks. A **task** is any
-- value that encapsulates the finalization of some procedure or state.
-- "Cleaning" a task invokes it, causing the procedure or state to be finalized.
--
-- All tasks have the following contract when cleaning:
--
-- - A task must not produce an error.
-- - A task must not yield.
-- - A task must finalize only once; cleaning an already cleaned task should be
-- a no-op.
-- - A task must not cause the production of more tasks.
--
-- ### Maid class
--
-- The **Maid** class is used to manage tasks more conveniently. Tasks can be
-- assigned to a maid to be cleaned later.
--
-- A task can be assigned to a maid as "named" or "unnamed". With a named task:
--
-- - The name can be any non-nil value.
-- - A named task can be individually cleaned.
-- - A named task can be individually unassigned from the maid without cleaning
-- it.
--
-- Unnamed tasks can only be cleaned by destroying the maid.
--
-- Any value can be assigned to a maid. Even if a task is not a known task type,
-- the maid will still hold on to the value. This might be used to hold off
-- garbage collection of a value that otherwise has only weak references.
--
-- A maid is not reusable; after a maid has been destroyed, any tasks assigned
-- to the maid are cleaned immediately.
local export = {} -- Module table; frozen and returned at the end of this module.
-- When true, export.wrap is replaced further below by a variant that returns a
-- callable table instead of a function.
local DEBUG = false
-- A task may be any value; the kinds that can actually be finalized are listed
-- under Maid.clean.
export type Task = any
-- Keep a direct reference to task.cancel: functions below use "task" as a
-- parameter or local name, which shadows the global task library.
local task_cancel = task.cancel
-- Cleans *task*. *refs* is used to keep track of tables that have already been
-- traversed.
local function clean(task: Task, refs: {[any]:true}?)
	-- nil and false are never tasks.
	if not task then
		return
	end

	local luaType = type(task)
	if luaType == "function" then
		task()
		return
	end
	if luaType == "thread" then
		-- The parameter shadows the global task library, hence the cached
		-- task_cancel.
		task_cancel(task)
		return
	end

	local engineType = typeof(task)
	if engineType == "RBXScriptConnection" then
		task:Disconnect()
		return
	end
	if engineType == "Instance" then
		task:Destroy()
		return
	end

	-- Everything else that is not a table is an unknown task type: ignore it.
	if luaType ~= "table" then
		return
	end

	-- Tables with a metatable are treated as objects; only call Destroy when
	-- it is actually a function.
	if getmetatable(task) ~= nil then
		if type(task.Destroy) == "function" then
			task:Destroy()
		end
		return
	end

	-- Plain table: clean every value once. "seen" guards against revisiting a
	-- table that is reachable more than once (including cycles).
	local seen = refs
	if seen then
		if seen[task] then
			return
		end
		seen[task] = true
	else
		seen = {[task] = true}
	end
	for _, value in task do
		clean(value, seen)
	end
	-- Frozen tables cannot be cleared, so they are left as they are.
	if not table.isfrozen(task) then
		table.clear(task)
	end
end
--@sec: Maid.clean
--@ord: -2
--@def: function Maid.clean(...: Task)
--@doc: The **clean** function cleans each argument. Does nothing for arguments
-- that are not known task types. The following types are handled:
--
-- - `function`: The function is called.
-- - `thread`: The thread is canceled with `task.cancel`.
-- - `RBXScriptConnection`: The Disconnect method is called.
-- - `Instance`: The Destroy method is called.
-- - `table` without metatable: The value of each entry is cleaned. This applies
-- recursively, and such tables are cleaned only once. The table is cleared
-- unless it is frozen.
-- - `table` with metatable and Destroy function: Destroy is called as a method.
function export.clean(...: Task)
	-- select("#", ...) counts nil arguments too, so holes do not cut the
	-- loop short.
	local count = select("#", ...)
	for index = 1, count do
		-- Parentheses keep only the argument at this position.
		clean((select(index, ...)))
	end
end
--@sec: Maid.wrap
--@ord: -2
--@def: function Maid.wrap(...: Task): () -> ()
--@doc: The **wrap** function encapsulates the given tasks in a function that
-- cleans them when called.
--
-- **Example:**
-- ```lua
-- local conn = RunService.Heartbeat:Connect(function(dt)
-- print("delta time", dt)
-- end)
-- return Maid.wrap(conn)
-- ```
function export.wrap(...: Task): () -> ()
	-- A single argument is held as-is; any other count is packed into a list.
	local pending
	if select("#", ...) ~= 1 then
		pending = {...}
	else
		pending = ...
	end

	return function()
		local current = pending
		if not current then
			return
		end
		-- Drop the reference before cleaning so a second call is a no-op.
		pending = nil
		clean(current)
	end
end
-- If debugging, wrap returns a callable table instead of a function, allowing
-- the contents to be inspected.
if DEBUG then
	function export.wrap(...: Task): () -> ()
		-- Same packing rule as the non-debug wrap: one argument is stored
		-- directly, anything else becomes a list.
		local held
		if select("#", ...) ~= 1 then
			held = {...}
		else
			held = ...
		end

		local wrapped = {tasks = held}
		-- A fresh metatable per wrapper; calling the table cleans the held
		-- tasks exactly once.
		return setmetatable(wrapped, {
			__call = function(holder)
				local current = holder.tasks
				if not current then
					return
				end
				holder.tasks = nil
				clean(current)
			end,
		})
	end
end
-- Maid metatable. Methods are defined on Maid.__index further below. That
-- method table has a metatable of its own whose __index raises an error, so
-- reading a key that is neither a field stored on the maid nor a method fails
-- loudly instead of returning nil.
local Maid = {__index=setmetatable({}, {
__index = function(self, k)
-- Level 2 reports the error at the caller of this handler rather than here.
error(string.format("cannot index maid with %q", tostring(k)), 2)
end,
})}
-- Type of a maid instance, derived from the metatable above.
export type Maid = typeof(setmetatable({}, Maid))
--@sec: Maid.new
--@ord: -1
--@def: function Maid.new(): Maid
--@doc: The **new** constructor returns a new instance of the Maid class.
--
-- **Example:**
-- ```lua
-- local maid = Maid.new()
-- ```
function export.new(): Maid
	-- Both fields are written before the metatable is attached: afterwards,
	-- assigning a missing key would go through Maid.__newindex.
	local fields = {}
	-- Map of names to tasks. Replaced by a falsy value once the maid is
	-- destroyed.
	fields._namedTasks = {}
	-- List of unnamed tasks.
	fields._unnamedTasks = {}
	return setmetatable(fields, Maid)
end
--@sec: Maid.Alive
--@def: function Maid:Alive(): boolean
--@doc: The **Alive** method returns false when the maid is destroyed, and true
-- otherwise.
--
-- **Example:**
-- ```lua
-- if maid:Alive() then
-- maid.heartbeat = RunService.Heartbeat:Connect(function(dt)
-- print("delta time", dt)
-- end)
-- end
-- ```
function Maid.__index:Alive(): boolean
	-- _namedTasks is a table while alive and falsy after Destroy.
	if self._namedTasks then
		return true
	end
	return false
end
--@sec: Maid.Assign
--@def: function Maid:Assign(name: any, task: Task?)
--@doc: The **Assign** method performs an action depending on the type of
-- *task*. If *task* is nil, then the task assigned as *name* is cleaned, if
-- present. Otherwise, *task* is assigned to the maid as *name*. If a different
-- task (according to rawequal) is already assigned as *name*, then it is
-- cleaned.
--
-- If the maid is destroyed, *task* is cleaned immediately.
--
-- **Examples:**
-- ```lua
-- maid:Assign("part", Instance.new("Part"))
-- ```
--
-- Setting an assigned task to nil unassigns it from the maid and cleans it.
--
-- ```lua
-- maid:Assign("part", nil) -- Remove task and clean it.
-- ```
--
-- Assigning a task with a name that is already assigned cleans the previous
-- task first.
--
-- ```lua
-- maid:Assign("part", Instance.new("Part"))
-- maid:Assign("part", Instance.new("WedgePart"))
-- ```
function Maid.__index:Assign(name: any, task: Task?)
	local named = self._namedTasks
	-- Destroyed maid: nothing can be held, so finalize the task right away.
	if not named then
		clean(task)
		return
	end

	local previous = named[name]

	-- Falsy task: unassign whatever is stored under this name and clean it.
	if not task then
		named[name] = nil
		clean(previous)
		return
	end

	-- Re-assigning the very same task must not clean it.
	if rawequal(previous, task) then
		return
	end

	-- Store the new task first, then finalize the one it replaced.
	named[name] = task
	if previous then
		clean(previous)
	end
end
--@sec: Maid.AssignEach
--@def: function Maid:AssignEach(...: Task)
--@doc: The **AssignEach** method assigns each given argument as an unnamed
-- task.
--
-- If the maid is destroyed, each task is cleaned immediately.
function Maid.__index:AssignEach(...: Task)
	local alive = self._namedTasks
	local unnamed = self._unnamedTasks
	for index = 1, select("#", ...) do
		local item = (select(index, ...))
		if not alive then
			-- Destroyed maid: every argument is finalized immediately.
			clean(item)
		elseif item then
			-- Falsy arguments are skipped so the list holds no gaps.
			table.insert(unnamed, item)
		end
	end
end
--@sec: Maid.Clean
--@def: function Maid:Clean(...: any)
--@doc: The **Clean** method receives a number of names, and cleans the task
-- assigned to the maid for each name. Does nothing if the maid is destroyed,
-- and does nothing for names that have no assigned task.
function Maid.__index:Clean(...: any)
	if not self._namedTasks then
		return
	end
	local names = table.pack(...)
	for index = 1, names.n do
		local key = names[index]
		-- Remove the entry before cleaning it, so the task is already
		-- unassigned while it runs.
		local found = self._namedTasks[key]
		self._namedTasks[key] = nil
		clean(found)
	end
end
--@sec: Maid.Connect
--@def: function Maid:Connect(name: any?, signal: RBXScriptSignal, listener: () -> ())
--@doc: The **Connect** method connects *listener* to *signal*, then assigns the
-- resulting connection to the maid as *name*. If *name* is nil, then the
-- connection is assigned as an unnamed task instead. Does nothing if the maid
-- is destroyed.
--
-- Connect is the preferred method when using maids to manage signals, primarily
-- to resolve problems concerning the assignment to a destroyed maid:
-- - Slightly more efficient than regular assignment, since the connection of
-- the signal is never made.
-- - Certain signals can have side-effects when connecting, so avoiding the
-- connection entirely is more correct.
--
-- **Example:**
-- ```lua
-- maid:Connect("heartbeat", RunService.Heartbeat, function(dt)
-- print("delta time", dt)
-- end)
-- ```
function Maid.__index:Connect(name: any, signal: RBXScriptSignal, listener: () -> ())
	-- On a destroyed maid the signal is never connected at all.
	if not self._namedTasks then
		return
	end
	local connection = signal:Connect(listener)
	if name ~= nil then
		self:Assign(name, connection)
		return
	end
	-- No name given: keep the connection as an unnamed task.
	table.insert(self._unnamedTasks, connection)
end
--@sec: Maid.Destroy
--@def: function Maid:Destroy()
--@doc: The **Destroy** method cleans all tasks currently assigned to the maid.
-- Does nothing if the maid is destroyed.
--
-- **Example:**
-- ```lua
-- maid:Destroy()
-- ```
function Maid.__index:Destroy()
	local named, unnamed = self._namedTasks, self._unnamedTasks
	if not named then
		return
	end
	-- Mark the maid as destroyed before cleaning anything. The fields are set
	-- to false rather than nil so the keys stay present on the table and these
	-- writes do not go through __newindex.
	self._namedTasks = false
	self._unnamedTasks = false
	clean(named)
	clean(unnamed)
end
--@sec: Maid.Unassign
--@def: function Maid:Unassign(name: any): Task
--@doc: The **Unassign** method removes the task assigned to the maid as *name*,
-- returning the task. Returns nil if no task is assigned as *name*, or if the
-- maid is Destroyed.
function Maid.__index:Unassign(name: any): Task
	local named = self._namedTasks
	if not named then
		return nil
	end
	-- Hand the task back to the caller without cleaning it.
	local removed = named[name]
	named[name] = nil
	return removed
end
--@sec: Maid.Wrap
--@def: function Maid:Wrap(): () -> ()
--@doc: The **Wrap** method encapsulates the maid by returning a function that
-- cleans the maid when called.
--
-- **Example:**
-- ```lua
-- return maid:Wrap()
-- ```
function Maid.__index:Wrap(): () -> ()
	-- The maid itself is the wrapped task; cleaning it calls its Destroy
	-- method.
	local destroyOnce = export.wrap(self)
	return destroyOnce
end
--@sec: Maid.__newindex
--@def: Maid[any] = Task?
--@doc: Assigns a task according to the [Assign][Maid.Assign] method, where the
-- index is the name, and the value is the task. If the index is a string that
-- is a single underscore, then the task is assigned according to
-- [AssignEach][Maid.AssignEach] instead.
--
-- Tasks can be assigned to the maid like a table:
--
-- ```lua
-- maid.foo = task -- Assign task as "foo".
-- ```
--
-- Setting an assigned task to nil unassigns it from the maid and cleans it:
--
-- ```lua
-- maid.foo = nil -- Remove task and clean it.
-- ```
--
-- Assigning a task with a name that is already assigned cleans the previous
-- task first:
--
-- ```lua
-- maid.foo = task -- Assign task as "foo".
-- maid.foo = otherTask -- Remove task, clean it, and assign otherTask as "foo".
-- ```
--
-- Assigning to the special `_` index assigns an unnamed task (to explicitly
-- assign as `_`, use the [Assign][Maid.Assign] method).
--
-- ```lua
-- maid._ = task -- Assign task.
-- maid._ = otherTask -- Assign otherTask.
-- ```
--
-- **Note**: Tasks assigned to the maid cannot be indexed:
--
-- ```lua
-- print(maid.foo)
-- --> ERROR: cannot index maid with "foo"
-- ```
function Maid:__newindex(name: any, task: Task?)
	-- Every index except the single-underscore string is a named assignment.
	if name ~= "_" then
		self:Assign(name, task)
		return
	end
	self:AssignEach(task)
end
-- Freeze the module table before returning it so it cannot be modified.
return table.freeze(export)
local Maid = require(script.Parent.Maid)
-- Re-export of the Maid task type.
export type Task = Maid.Task
-- Callback signatures accepted by Observable:Subscribe.
export type Fire = (...any) -> ()
export type Fail = (...any) -> ()
export type Complete = () -> ()
-- Function run for each new subscriber; it may return a task.
export type Subscribe = (Subscriber) -> Task
-- An operator: takes an observable and returns another observable.
export type Transformer = (Observable) -> Observable
-- Function returned by Observable:Subscribe.
export type Subscription = () -> ()
-- Rx is a standalone ReactiveX module ripped from Nevermore.Rx.
--
-- https://quenty.github.io/NevermoreEngine/api/Rx
local export = {}
-- Unique sentinel meaning "nothing recorded"; unlike nil it can be told apart
-- from a stored nil value.
local UNSET = newproxy()
-- Lifecycle states stored in a subscriber's _state field.
type stateType = "pending" | "failed" | "complete" | "cancelled"
local PENDING: "pending" = "pending"
local FAILED: "failed" = "failed"
local COMPLETE: "complete" = "complete"
local CANCELLED: "cancelled" = "cancelled"
-- Clean up task belonging to subscriber.
local function doCleanup(self: _Subscriber)
	local pendingTask = self._cleanupTask
	if not pendingTask then
		return
	end
	-- Clear the field first so the task is cleaned at most once.
	self._cleanupTask = nil
	Maid.clean(pendingTask)
end
-- Public surface of a subscriber, as handed to an observable's subscribe
-- callback.
export type Subscriber = {
Fire: (self: Subscriber, ...any) -> (),
Fail: (self: Subscriber, ...any) -> (),
Complete: (self: Subscriber) -> (),
GetFireFailComplete: (self: Subscriber) -> (Fire, Fail, Complete),
GetFailComplete: (self: Subscriber) -> (Fail, Complete),
IsPending: (self: Subscriber) -> boolean,
}
-- Internal view of a subscriber: the public surface plus its private fields.
type _Subscriber = Subscriber & {
_state: stateType,
_fireCallback: Fire,
_failCallback: Fail,
_completeCallback: Complete,
_cleanupTask: Task?,
}
local Subscriber = {__index={}}
-- Creates a subscriber in the pending state. Any of the callbacks may be nil,
-- in which case the corresponding event is simply not forwarded.
--
-- Declared local so the constructor does not leak into the global environment.
local function newSubscriber(fire: Fire?, fail: Fail?, complete: Complete?): Subscriber
	local subscriber = setmetatable({
		_state = PENDING,
		_fireCallback = fire,
		_failCallback = fail,
		_completeCallback = complete,
		-- Assigned by Observable:Subscribe once the subscribe callback returns.
		_cleanupTask = nil,
	}, Subscriber)
	return subscriber
end
function Subscriber.__index:Fire(...: any)
	local state = self._state
	-- Values pushed after cancellation are dropped with a warning.
	if state == CANCELLED then
		warn("Subscriber.Fire: event pushed to cancelled subscriber")
		return
	end
	-- Failed or completed subscribers ignore further values silently.
	if state ~= PENDING then
		return
	end
	local onFire = self._fireCallback
	if onFire then
		onFire(...)
	end
end
function Subscriber.__index:Fail(...: any)
	if self._state == PENDING then
		-- Change state before invoking the callback so a nested terminal
		-- event is ignored.
		self._state = FAILED
		local onFail = self._failCallback
		if onFail then
			onFail(...)
		end
		doCleanup(self)
	end
end
function Subscriber.__index:GetFireFailComplete(): (Fire, Fail, Complete)
	-- Plain functions bound to this subscriber, in the order expected by
	-- Observable:Subscribe.
	local function fire(...)
		self:Fire(...)
	end
	local function fail(...)
		self:Fail(...)
	end
	local function complete()
		self:Complete()
	end
	return fire, fail, complete
end
function Subscriber.__index:GetFailComplete(): (Fail, Complete)
	-- Plain functions bound to this subscriber, for operators that supply
	-- their own fire handler.
	local function fail(...)
		self:Fail(...)
	end
	local function complete()
		self:Complete()
	end
	return fail, complete
end
function Subscriber.__index:Complete()
	if self._state == PENDING then
		-- Change state before invoking the callback so a nested terminal
		-- event is ignored.
		self._state = COMPLETE
		local onComplete = self._completeCallback
		if onComplete then
			onComplete()
		end
		doCleanup(self)
	end
end
function Subscriber.__index:IsPending(): boolean
	-- True until the subscriber has failed, completed or been cancelled.
	local state = self._state
	return state == PENDING
end
-- Public surface of an observable.
export type Observable = {
-- Applies each transformer in turn and returns the resulting observable.
Pipe: (self: Observable, transformers: {Transformer}) -> Observable,
-- Subscribes with optional callbacks and returns the subscription function.
Subscribe: (self: Observable,
onFire: Fire?,
onFail: Fail?,
onComplete: Complete?
) -> Subscription,
}
local Observable = {__index={}}
-- Returns true when v was created by this module, i.e. its metatable is the
-- Observable metatable.
--
-- Declared local so the helper does not leak into the global environment; it
-- stays reachable from outside through export.isObservable.
local function isObservable(v: any): boolean
	return getmetatable(v) == Observable
end
export.isObservable = isObservable
local function newObservable(onSubscribe: Subscribe): Observable
	assert(type(onSubscribe) == "function", "onSubscribe must be function")
	-- The callback is stored and run once per call to Subscribe.
	local fields = {}
	fields._onSubscribe = onSubscribe
	return setmetatable(fields, Observable)
end
export.observable = newObservable
function Observable.__index:Pipe(transformers: {Transformer}): Observable
	assert(type(transformers) == "table", "transformers must be a table")
	-- Feed the output of each transformer into the next one, starting from
	-- this observable.
	local result = self
	for _, apply in pairs(transformers) do
		assert(type(apply) == "function", "transformer must be function")
		local produced = apply(result)
		assert(isObservable(produced))
		result = produced
	end
	return result
end
function Observable.__index:Subscribe(fire: Fire?, fail: Fail?, complete: Complete?): Subscription
	-- Runs the subscribe callback with a fresh subscriber and returns a
	-- function that cancels the subscription. Each callback may be nil.
	local sub = newSubscriber(fire, fail, complete)
	local task = self._onSubscribe(sub)

	-- Cancels a pending subscriber and cleans its task. Calling it again, or
	-- after the subscriber has already terminated, only cleans whatever task
	-- is still attached.
	local function subscription()
		if sub._state == PENDING then
			sub._state = CANCELLED
		end
		doCleanup(sub)
	end

	if not task then
		return subscription
	end

	-- Attach the task before looking at the state. If the subscriber already
	-- failed or completed while the subscribe callback was running, the task
	-- must still be cleaned; cleaning before the assignment would find
	-- nothing to clean and leak the task.
	sub._cleanupTask = task
	if sub._state ~= PENDING then
		doCleanup(sub)
	end
	return subscription
end
--[=[
An empty observable that completes immediately
@prop EMPTY Observable<()>
@readonly
@within Rx
]=]
--[=[
An observable that never completes.
@prop NEVER Observable<()>
@readonly
@within Rx
]=]
-- Subscribe callback for export.empty: completes the subscriber right away
-- without firing.
local function completeImmediately(sub)
	sub:Complete()
end

-- Subscribe callback for export.never: does nothing and returns no task.
local function doNothing()
end

export.empty = newObservable(completeImmediately)
export.never = newObservable(doNothing)
--[=[
Pipes the transformers through each other
https://rxjs-dev.firebaseapp.com/api/index/function/pipe
@param transformers { Observable<any> }
@return (source: Observable<T>) -> Observable<U>
]=]
function export.pipe(transformers: {Transformer}): Transformer
	assert(type(transformers) == "table", "transformers must be a table")
	-- Validate every entry up front so a bad list is reported when the pipe
	-- is built rather than when it is first used.
	for position, candidate in transformers do
		if type(candidate) ~= "function" then
			local message = string.format("bad transformer %s: function expected, got %s", tostring(position), typeof(candidate))
			error(message, 2)
		end
	end

	return function(source: Observable): Observable
		assert(source, "Bad source")
		-- Thread the source through each transformer, checking every
		-- intermediate result.
		local piped = source
		for position, apply in transformers do
			piped = apply(piped)
			if not isObservable(piped) then
				local message = string.format("bad result from transformer %s: Observable expected, got %q", tostring(position), tostring(piped))
				error(message, 2)
			end
		end
		return piped
	end
end
--[=[
http://reactivex.io/documentation/operators/just.html
```lua
Rx.of(1, 2, 3):Subscribe(print, nil, function()
print("Complete")
end) --> 1, 2, 3, "Complete"
```
@param ... any -- Arguments to emit
@return Observable
]=]
function export.of(...: any): Observable
	-- Record the count separately so nil arguments are emitted too.
	local count = select("#", ...)
	local values = {...}
	return newObservable(function(sub)
		for index = 1, count do
			sub:Fire(values[index])
		end
		sub:Complete()
	end)
end
--[=[
Converts an item
http://reactivex.io/documentation/operators/from.html
@param item Promise | table
@return Observable
]=]
function export.from(item: {any}): Observable
	-- Only tables are supported for now.
	if type(item) ~= "table" then
		-- TODO: Iterator?
		error("[Rx.from] - cannot convert")
	end
	-- Each array element becomes one emitted value.
	return export.of(table.unpack(item))
end
--[=[
https://rxjs-dev.firebaseapp.com/api/operators/merge
@param observables { Observable }
@return Observable
]=]
function export.merge(observables: {Observable}): Observable
	assert(type(observables) == "table", "observables must be array of Obserable values")
	for _, candidate in pairs(observables) do
		assert(isObservable(candidate), "Observable expected")
	end

	return newObservable(function(sub)
		-- Every source forwards fire, fail and complete straight to the
		-- subscriber. The returned list of subscriptions is the cleanup task.
		local subscriptions = {}
		for _, observable in pairs(observables) do
			local subscription = observable:Subscribe(sub:GetFireFailComplete())
			subscriptions[#subscriptions + 1] = subscription
		end
		return subscriptions
	end)
end
--[=[
Converts a Signal into an observable.
https://rxjs-dev.firebaseapp.com/api/index/function/fromEvent
@param event Signal<T>
@return Observable<T>
]=]
function export.fromSignal(event: RBXScriptSignal): Observable
	local function onSubscribe(sub)
		local function forward(...)
			sub:Fire(...)
		end
		-- This stream never completes or fails! The connection is the
		-- cleanup task.
		return event:Connect(forward)
	end
	return newObservable(onSubscribe)
end
--[=[
Taps into the observable and executes the onFire/onFail/onComplete
commands.
https://rxjs-dev.firebaseapp.com/api/operators/tap
@param onFire function?
@param onFail function?
@param onComplete function?
@return (source: Observable<T>) -> Observable<T>
]=]
function export.tap(onFire: Fire?, onFail: Fail?, onComplete: Complete?): Transformer
	assert(type(onFire) == "function" or onFire == nil, "onFire must be function or nil")
	assert(type(onFail) == "function" or onFail == nil, "onFail must be function or nil")
	assert(type(onComplete) == "function" or onComplete == nil, "onComplete must be function or nil")

	return function(source: Observable): Observable
		assert(isObservable(source), "source must be Observable")
		return newObservable(function(sub)
			-- Each handler runs the side-effect callback first, then passes
			-- the event on unchanged.
			local function handleFire(...)
				if onFire then
					onFire(...)
				end
				-- The side effect may have ended the subscription; only
				-- forward while the subscriber is still pending.
				if sub:IsPending() then
					sub:Fire(...)
				end
			end
			local function handleFail(...)
				if onFail then
					onFail(...)
				end
				sub:Fail(...)
			end
			local function handleComplete()
				if onComplete then
					onComplete()
				end
				sub:Complete()
			end
			return source:Subscribe(handleFire, handleFail, handleComplete)
		end)
	end
end
--[=[
Starts the observable with the given value from the callback
http://reactivex.io/documentation/operators/start.html
@param callback function
@return (source: Observable) -> Observable
]=]
function export.start(callback: () -> any): Transformer
	return function(source)
		assert(isObservable(source), "source must be Observable")
		local function onSubscribe(sub)
			-- The callback is evaluated once per subscription, before the
			-- source is subscribed.
			sub:Fire(callback())
			return source:Subscribe(sub:GetFireFailComplete())
		end
		return newObservable(onSubscribe)
	end
end
--[=[
Returns a new Observable that multicasts (shares) the original Observable. As long as there is at least one Subscriber this Observable will be subscribed and emitting data.
When all subscribers have unsubscribed it will unsubscribe from the source Observable.
https://rxjs.dev/api/operators/share
@return (source: Observable) -> Observable
]=]
function export.share(): Transformer
	return function(source)
		assert(isObservable(source), "source must be Observable")

		-- Subscription to the source; nil while nobody is subscribed.
		local _currentSub: Subscription? = nil
		-- Subscribers currently sharing the source subscription.
		local subs = {}
		-- Packed arguments of the terminal event seen on the current source
		-- subscription, or UNSET.
		local lastFail = UNSET
		local lastComplete = UNSET

		local function connectToSourceIfNeeded()
			if not _currentSub then
				lastFail = UNSET
				lastComplete = UNSET
				_currentSub = source:Subscribe(function(...)
					for _, sub in pairs(subs) do
						sub:Fire(...)
					end
				end, function(...)
					lastFail = table.pack(...)
					-- Terminating a subscriber runs its cleanup, which removes
					-- it from subs. Walk a snapshot so that removal cannot make
					-- the loop skip the remaining subscribers.
					for _, sub in table.clone(subs) do
						sub:Fail(...)
					end
				end, function(...)
					lastComplete = table.pack(...)
					-- Snapshot for the same reason as in the fail handler.
					for _, sub in table.clone(subs) do
						sub:Complete()
					end
				end)
			end
		end

		local function disconnectFromSource()
			if _currentSub then
				_currentSub()
				_currentSub = nil
			end
			lastFail = UNSET
			lastComplete = UNSET
		end

		return newObservable(function(sub)
			-- The shared source already terminated: replay that outcome to
			-- the late subscriber instead of subscribing again.
			if lastFail ~= UNSET then
				sub:Fail(table.unpack(lastFail, 1, lastFail.n))
				return
			end
			if lastComplete ~= UNSET then
				-- A completed source must complete the late subscriber, not
				-- fail it.
				sub:Complete()
				return
			end

			table.insert(subs, sub)
			connectToSourceIfNeeded()

			-- Cleanup: leave the group, and drop the source subscription when
			-- the last subscriber is gone.
			return function()
				local index = table.find(subs, sub)
				if index then
					table.remove(subs, index)
					if #subs == 0 then
						disconnectFromSource()
					end
				end
			end
		end)
	end
end
--[=[
Same as [Rx.share] except it also replays the value
@param bufferSize number -- Number of entries to cache
@param windowTimeSeconds number -- Time
@return (source: Observable) -> Observable
]=]
function export.shareReplay(bufferSize: number?, windowTimeSeconds: number?): Transformer
	assert(type(bufferSize) == "number" or bufferSize == nil, "bufferSize must be number or nil")
	assert(type(windowTimeSeconds) == "number" or windowTimeSeconds == nil, "windowTimeSeconds must be number or nil")
	-- Both limits default to "unlimited".
	bufferSize = bufferSize or math.huge
	windowTimeSeconds = windowTimeSeconds or math.huge

	return function(source)
		assert(isObservable(source), "source must be Observable")

		-- Subscription to the source; nil while nobody is subscribed.
		local _currentSub: Subscription? = nil
		-- Subscribers currently sharing the source subscription.
		local subs = {}
		-- Recent events, oldest first. Each entry is a packed argument list
		-- with an extra timestamp field.
		local buffer = {}
		-- Packed arguments of the terminal event seen on the current source
		-- subscription, or UNSET.
		local lastFail = UNSET
		local lastComplete = UNSET

		-- Returns a new list holding the buffered events that are still
		-- inside the time window.
		local function getEventsCopy()
			local now = os.clock()
			local events = {}
			for _, event in pairs(buffer) do
				if (now - event.timestamp) <= windowTimeSeconds then
					table.insert(events, event)
				end
			end
			return events
		end

		local function connectToSourceIfNeeded()
			if not _currentSub then
				buffer = {}
				lastFail = UNSET
				lastComplete = UNSET
				_currentSub = source:Subscribe(function(...)
					-- TODO: also prune events by timestamp
					if #buffer + 1 > bufferSize then
						table.remove(buffer, 1) -- O(n), not great.
					end
					-- Queue before we start
					local event = table.pack(...)
					event.timestamp = os.clock()
					table.insert(buffer, event)
					for _, sub in pairs(subs) do
						sub:Fire(...)
					end
				end, function(...)
					lastFail = table.pack(...)
					-- Terminating a subscriber runs its cleanup, which removes
					-- it from subs. Walk a snapshot so that removal cannot make
					-- the loop skip the remaining subscribers.
					for _, sub in table.clone(subs) do
						sub:Fail(...)
					end
				end, function(...)
					lastComplete = table.pack(...)
					-- Snapshot for the same reason as in the fail handler.
					for _, sub in table.clone(subs) do
						sub:Complete()
					end
				end)
			end
		end

		local function disconnectFromSource()
			if _currentSub then
				_currentSub()
				_currentSub = nil
			end
			buffer = {}
			lastFail = UNSET
			lastComplete = UNSET
		end

		return newObservable(function(sub)
			-- The shared source already terminated: replay that outcome to
			-- the late subscriber instead of subscribing again.
			if lastFail ~= UNSET then
				sub:Fail(table.unpack(lastFail, 1, lastFail.n))
				return
			end
			if lastComplete ~= UNSET then
				-- A completed source must complete the late subscriber, not
				-- fail it.
				sub:Complete()
				return
			end

			table.insert(subs, sub)
			-- Firing could lead to re-entrance, so replay from a copy of the
			-- buffer rather than the live list.
			for _, item in pairs(getEventsCopy()) do
				sub:Fire(table.unpack(item, 1, item.n))
			end
			connectToSourceIfNeeded()

			-- Cleanup: leave the group, and drop the source subscription
			-- (and the buffer) when the last subscriber is gone.
			return function()
				local index = table.find(subs, sub)
				if index then
					table.remove(subs, index)
					if #subs == 0 then
						disconnectFromSource()
					end
				end
			end
		end)
	end
end
--[=[
Caches the current value
@return (source: Observable) -> Observable
]=]
function export.cache(): Transformer
	-- A replay buffer of one entry keeps only the most recent value.
	local replayLatest = export.shareReplay(1)
	return replayLatest
end
--[=[
Like start, but also from (list!)
@param callback () -> { T }
@return (source: Observable) -> Observable
]=]
function export.startFrom(callback: () -> {any}): Transformer
	assert(type(callback) == "function", "callback must be function")
	return function(source)
		assert(isObservable(source), "source must be Observable")
		local function onSubscribe(sub)
			-- The callback is evaluated once per subscription; each entry of
			-- the table it returns is emitted before the source is
			-- subscribed.
			local initial = callback()
			for _, value in pairs(initial) do
				sub:Fire(value)
			end
			return source:Subscribe(sub:GetFireFailComplete())
		end
		return newObservable(onSubscribe)
	end
end
--[=[
Starts with the given values
https://rxjs-dev.firebaseapp.com/api/operators/startWith
@param values { T }
@return (source: Observable) -> Observable
]=]
function export.startWith(values: {any}): Transformer
	assert(type(values) == "table", "values must be array of values")
	return function(source)
		assert(isObservable(source), "source must be Observable")
		local function onSubscribe(sub)
			-- Emit the fixed values first, then hand over to the source.
			for _, value in pairs(values) do
				sub:Fire(value)
			end
			return source:Subscribe(sub:GetFireFailComplete())
		end
		return newObservable(onSubscribe)
	end
end
--[=[
Defaults the observable to a value if it isn't fired immediately
```lua
Rx.NEVER:Pipe({
Rx.defaultsTo("Hello")
}):Subscribe(print) --> Hello
```
@param value any
@return (source: Observable) -> Observable
]=]
function export.defaultsTo(value: any): Transformer
	return function(source)
		assert(isObservable(source), "source must be Observable")
		return newObservable(function(sub)
			-- Set as soon as the source emits anything.
			local sourceFired = false
			local onFail, onComplete = sub:GetFailComplete()
			local subscription = source:Subscribe(function(...)
				sourceFired = true
				sub:Fire(...)
			end, onFail, onComplete)

			-- Nothing was emitted while subscribing, so emit the default.
			if not sourceFired then
				sub:Fire(value)
			end
			return subscription
		end)
	end
end
--[=[
Defaults the observable value to nil
```lua
Rx.NEVER:Pipe({
Rx.defaultsToNil
}):Subscribe(print) --> nil
```
Great for defaulting Roblox attributes and objects
@function defaultsToNil
@param source Observable
@return Observable
@within Rx
]=]
-- Built once when the module loads; this is the transformer itself, not a
-- factory, so it is used without being called.
export.defaultsToNil = export.defaultsTo(nil)
--[=[
Ends the observable with these values before cancellation
https://www.learnrxjs.io/learn-rxjs/operators/combination/endwith
@param values { T }
@return (source: Observable) -> Observable
]=]
function export.endWith(values: {any}): Transformer
	return function(source)
		assert(isObservable(source), "source must be Observable")
		return newObservable(function(sub)
			-- Emits the trailing values; used right before either terminal
			-- event is forwarded.
			local function emitTrailing()
				for _, item in pairs(values) do
					sub:Fire(item)
				end
			end
			local function onFire(...)
				sub:Fire(...)
			end
			local function onFail(...)
				emitTrailing()
				sub:Fail(...)
			end
			local function onComplete()
				emitTrailing()
				sub:Complete()
			end
			return source:Subscribe(onFire, onFail, onComplete)
		end)
	end
end
-- Test function applied to emitted values by export.where and export.whereElse.
export type Predicate = (...any) -> boolean
--[=[
http://reactivex.io/documentation/operators/filter.html
Filters out values
```lua
Rx.of(1, 2, 3, 4, 5):Pipe({
Rx.where(function(value)
return value % 2 == 0
end)
}):Subscribe(print) --> 2, 4
```
@param predicate (value: T) -> boolean
@return (source: Observable<T>) -> Observable<T>
]=]
function export.where(predicate: Predicate): Transformer
	assert(type(predicate) == "function", "predicate must be function")
	return function(source)
		assert(isObservable(source), "source must be Observable")
		return newObservable(function(sub)
			local onFail, onComplete = sub:GetFailComplete()
			-- Values that do not satisfy the predicate are dropped.
			local function onFire(...)
				if not predicate(...) then
					return
				end
				sub:Fire(...)
			end
			return source:Subscribe(onFire, onFail, onComplete)
		end)
	end
end
-- Return values if they satisfy predicate. Return defaults otherwise.
function export.whereElse(predicate: Predicate, ...: any): Transformer
	assert(type(predicate) == "function", "predicate must be function")
	-- Packed so that nil defaults keep their positions.
	local fallback = table.pack(...)
	return function(source)
		assert(isObservable(source), "source must be Observable")
		return newObservable(function(sub)
			local onFail, onComplete = sub:GetFailComplete()
			local function onFire(...)
				if predicate(...) then
					sub:Fire(...)
					return
				end
				-- Predicate rejected the values: emit the defaults instead.
				sub:Fire(table.unpack(fallback, 1, fallback.n))
			end
			return source:Subscribe(onFire, onFail, onComplete)
		end)
	end
end
--[=[
Only takes distinct values from the observable stream.
http://reactivex.io/documentation/operators/distinct.html
```lua
Rx.of(1, 1, 2, 3, 3, 1):Pipe({
Rx.distinct()
}):Subscribe(print) --> 1, 2, 3, 1
```
@return (source: Observable<T>) -> Observable<T>
]=]
function export.distinct(): Transformer
	return function(source)
		assert(isObservable(source), "source must be Observable")
		return newObservable(function(sub)
			-- Most recently forwarded value; UNSET until the first emission,
			-- so an initial nil is still forwarded.
			local previous = UNSET
			local onFail, onComplete = sub:GetFailComplete()
			-- TODO: Support tuples
			local function onFire(value)
				if not (previous == value) then
					previous = value
					sub:Fire(value)
				end
			end
			return source:Subscribe(onFire, onFail, onComplete)
		end)
	end
end
--[=[
https://rxjs.dev/api/operators/mapTo
@param ... any -- The value to map each source value to.
@return (source: Observable<T>) -> Observable<T>
]=]
function export.mapTo(...: any): Transformer
	-- Packed so that nil values keep their positions.
	local constants = table.pack(...)
	return function(source)
		assert(isObservable(source), "source must be Observable")
		return newObservable(function(sub)
			local onFail, onComplete = sub:GetFailComplete()
			-- Whatever the source emits is replaced by the fixed values.
			local function onFire()
				sub:Fire(table.unpack(constants, 1, constants.n))
			end
			return source:Subscribe(onFire, onFail, onComplete)
		end)
	end
end
-- Mapping function: receives the emitted values and returns their replacements.
export type Project = (...any) -> (...any)
--[=[
http://reactivex.io/documentation/operators/map.html
Maps one value to another
```lua
Rx.of(1, 2, 3, 4, 5):Pipe({
Rx.map(function(x)
return x + 1
end)
}):Subscribe(print) -> 2, 3, 4, 5, 6
```
@param project (T) -> U
@return (source: Observable<T>) -> Observable<U>
]=]
function export.map(project: Project): Transformer
	assert(type(project) == "function", "project must be function")
	return function(source)
		assert(isObservable(source), "source must be Observable")
		return newObservable(function(sub)
			local onFail, onComplete = sub:GetFailComplete()
			-- Every result returned by project is forwarded.
			local function onFire(...)
				sub:Fire(project(...))
			end
			return source:Subscribe(onFire, onFail, onComplete)
		end)
	end
end
--[=[
Merges higher order observables together.
Basically, if you have an observable that is emitting an observable,
this subscribes to each emitted observable and combines them into a
single observable.
```lua
Rx.of(Rx.of(1, 2, 3), Rx.of(4))
:Pipe({
Rx.mergeAll()
})
:Subscribe(print) -> 1, 2, 3, 4
```
@return (source: Observable<Observable<T>>) -> Observable<T>
]=]
function export.mergeAll(): Transformer
	return function(source)
		assert(isObservable(source), "source must be Observable")
		return newObservable(function(sub)
			local maid = Maid.new()
			-- Number of inner observables that have not completed yet.
			local pendingCount = 0
			-- Set once the outer observable has completed.
			local topComplete = false

			maid._ = source:Subscribe(
				function(observable)
					assert(isObservable(observable), "Observable expected")
					pendingCount = pendingCount + 1

					-- Name under which the inner subscription is held. It is
					-- created up front because the subscription value does not
					-- exist yet if the inner observable completes while it is
					-- being subscribed; using that nil value as a maid index
					-- would raise "table index is nil".
					local key = {}
					local innerComplete = false
					local subscription = observable:Subscribe(
						function(...)
							-- Merge each inner observable
							sub:Fire(...)
						end,
						function(...)
							-- Emit failure automatically
							sub:Fail(...)
						end,
						function()
							innerComplete = true
							maid[key] = nil
							pendingCount = pendingCount - 1
							if pendingCount == 0 and topComplete then
								sub:Complete()
								maid:Destroy()
							end
						end)

					-- Only hold on to subscriptions that are still running.
					if not innerComplete then
						maid[key] = subscription
					end
				end,
				function(...)
					sub:Fail(...) -- Also reflect failures up to the top!
					maid:Destroy()
				end,
				function()
					topComplete = true
					if pendingCount == 0 then
						sub:Complete()
						maid:Destroy()
					end
				end)

			return maid
		end)
	end
end
--[=[
Merges higher order observables together
https://rxjs.dev/api/operators/switchAll
Works like mergeAll, where you subscribe to an observable which is
emitting observables. However, when another observable is emitted it
disconnects from the other observable and subscribes to that one.
@return (source: Observable<Observable<T>>) -> Observable<T>
]=]
function export.switchAll(): Transformer
	return function(source)
		assert(isObservable(source), "source must be Observable")
		return newObservable(function(sub)
			local outerMaid = Maid.new()
			-- Set once the outer observable has completed.
			local outerDone = false
			-- Set once the most recent inner observable has completed.
			local innerDone = false
			-- Most recently emitted inner observable.
			local activeInner = nil

			outerMaid._ = function()
				-- Ensure inner subscription is disconnected first. This prevents
				-- the inner sub from firing while the outer is subscribed,
				-- throwing a warning.
				outerMaid._innerSub = nil
				outerMaid._outerSuber = nil
			end

			-- Completes the subscriber once both the outer and the current
			-- inner observable are done.
			local function finishIfBothDone()
				if innerDone and outerDone then
					sub:Complete()
					outerMaid:Destroy() -- Paranoid ensure cleanup.
				end
			end

			local function onOuterFire(observable)
				assert(isObservable(observable), "Observable expected")
				innerDone = false
				activeInner = observable
				-- Assigning under the same name replaces the previous inner
				-- subscription.
				outerMaid._innerSub = observable:Subscribe(
					function(...)
						-- Merge each inner observable
						sub:Fire(...)
					end,
					function(...)
						-- Emit failure automatically, but only for the inner
						-- observable that is still current.
						if activeInner == observable then
							sub:Fail(...)
						end
					end,
					function()
						if activeInner == observable then
							innerDone = true
							finishIfBothDone()
						end
					end)
			end

			local function onOuterFail(...)
				sub:Fail(...) -- Also reflect failures up to the top!
				outerMaid:Destroy()
			end

			local function onOuterComplete()
				outerDone = true
				finishIfBothDone()
			end

			outerMaid._outerSuber = source:Subscribe(onOuterFire, onOuterFail, onOuterComplete)
			return outerMaid
		end)
	end
end
--[=[
Sort of equivalent of promise.then()
This takes a stream of observables
@param project (value: T) -> Observable<U>
@param resultSelector ((initialValue: T, outputValue: U) -> U)?
@return (source: Observable<T>) -> Observable<U>
]=]
function export.flatMap(project: (any) -> Observable, resultSelector: ((any, any) -> any)?): Transformer
	assert(type(project) == "function", "project must be function")

	return function(source)
		assert(isObservable(source), "source must be Observable")

		return newObservable(function(sub)
			local maid = Maid.new()
			-- Number of projected inner observables that have not completed yet.
			local activeInner = 0
			-- Becomes true once the outer source has completed.
			local sourceDone = false

			-- Projects one outer emission into an inner observable and merges
			-- everything that inner observable emits into the subscriber.
			local function onSourceValue(...)
				local outerValue = ...
				local inner = project(...)
				assert(isObservable(inner), "project must return Observable")

				activeInner = activeInner + 1
				local innerMaid = Maid.new()

				local function onInnerValue(...)
					if not resultSelector then
						sub:Fire(...)
						return
					end
					-- The selector receives the first outer value plus the inner emission.
					sub:Fire(resultSelector(outerValue, ...))
				end

				local function onInnerFail(...)
					sub:Fail(...)
				end

				local function onInnerComplete()
					innerMaid:Destroy()
					activeInner = activeInner - 1
					-- Complete only when the source is done and no inner is still running.
					if sourceDone and activeInner == 0 then
						sub:Complete()
						maid:Destroy()
					end
				end

				innerMaid._ = inner:Subscribe(onInnerValue, onInnerFail, onInnerComplete)
				maid._ = innerMaid
			end

			local function onSourceFail(...)
				sub:Fail(...)
				maid:Destroy()
			end

			local function onSourceComplete()
				sourceDone = true
				if activeInner ~= 0 then
					return
				end
				sub:Complete()
				maid:Destroy()
			end

			maid._ = source:Subscribe(onSourceValue, onSourceFail, onSourceComplete)
			return maid
		end)
	end
end
-- Maps each source value through *project* and then switches to the observable
-- it produced: the composition of `map(project)` followed by `switchAll()`.
function export.switchMap(project: Project): Transformer
	local stages = {
		export.map(project),
		export.switchAll(),
	}
	return export.pipe(stages)
end
--[=[
	Forwards the source until *notifier* emits a value or fails.

	When the notifier fires, every subscription owned by this observation is
	cleaned up. Note that the subscriber itself is neither completed nor failed
	by the notifier; it simply stops receiving values. A notifier that completes
	without emitting lets all source values pass.

	@param notifier Observable
	@return (source: Observable<T>) -> Observable<T>
]=]
function export.takeUntil(notifier: Observable): Transformer
	-- Same message style as the other argument checks in this module.
	assert(isObservable(notifier), "notifier must be Observable")

	return function(source)
		assert(isObservable(source), "source must be Observable")

		return newObservable(function(sub)
			local maid = Maid.new()
			local cancelled = false

			-- Used for both the notifier's value and failure callbacks.
			local function cancel()
				maid:Destroy()
				cancelled = true
			end

			-- Any value emitted will cancel (complete without any values allows all values to pass)
			maid._ = notifier:Subscribe(cancel, cancel, nil)

			-- The notifier fired synchronously while subscribing: never
			-- subscribe to the source at all.
			if cancelled then
				maid:Destroy()
				return nil
			end

			maid._ = source:Subscribe(sub:GetFireFailComplete())
			return maid
		end)
	end
end
--[=[
Returns an observable that takes in a tuple, and emits that tuple, then
completes.
```lua
Rx.packed("a", "b")
:Subscribe(function(first, second)
print(first, second) --> a, b
end)
```
@param ... any
@return Observable
]=]
-- The variadic annotation is `any`: each argument is an arbitrary value (the
-- documented usage passes strings), not a table of values.
function export.packed(...: any): Observable
	-- Capture the tuple once, keeping the true count in `.n` so trailing nils
	-- survive the round trip.
	local args = table.pack(...)
	return newObservable(function(sub)
		sub:Fire(table.unpack(args, 1, args.n))
		sub:Complete()
	end)
end
--[=[
Unpacks the observables value if a table is received
@param observable Observable<{T}>
@return Observable<T>
]=]
-- Re-emits each table produced by *observable* as a tuple. *i* and *j* bound
-- the unpacked range and default to 1 and the table's length. Non-table values
-- are dropped with a warning.
function export.unpacked(observable: Observable, i: number?, j: number?): Observable
	assert(isObservable(observable), "observable must be Observable")

	return newObservable(function(sub)
		local function onValue(value)
			if type(value) ~= "table" then
				warn(string.format("Observable result: table expected, got %s", typeof(value)))
				return
			end
			sub:Fire(table.unpack(value, i or 1, j or #value))
		end

		return observable:Subscribe(onValue, sub:GetFailComplete())
	end)
end
--[=[
Acts as a finalizer callback once the subscription is unsubscribed.
```lua
Rx.of("a", "b"):Pipe({
Rx.finalize(function()
print("Subscription done!")
end)
})
```
http://reactivex.io/documentation/operators/do.html
https://rxjs-dev.firebaseapp.com/api/operators/finalize
https://github.com/ReactiveX/rxjs/blob/master/src/internal/operators/finalize.ts
@param finalizerCallback () -> ()
@return (source: Observable<T>) -> Observable<T>
]=]
function export.finalize(finalizerCallback: () -> ()): Transformer
	assert(type(finalizerCallback) == "function", "finalizerCallback must be function")

	return function(source)
		assert(isObservable(source), "source must be Observable")

		return newObservable(function(sub)
			-- Pass everything through untouched; the returned task list holds the
			-- source subscription followed by the finalizer callback.
			local subscription = source:Subscribe(sub:GetFireFailComplete())
			return { subscription, finalizerCallback }
		end)
	end
end
--[=[
Given an observable that emits observables, emit an
observable that once the initial observable completes,
the latest values of each emitted observable will be
combined into an array that will be emitted.
https://rxjs.dev/api/operators/combineLatestAll
@return (source: Observable<Observable<T>>) -> Observable<{ T }>
]=]
function export.combineLatestAll(): Transformer
	return function(source)
		assert(isObservable(source), "source must be Observable")

		return newObservable(function(sub)
			-- Inner observables gathered while the source is still emitting.
			local collected = {}
			local alive = true
			local maid = {}

			-- First task: flag the observation as dead so that a late source
			-- completion does not start the combined subscription.
			table.insert(maid, function()
				alive = false
			end)

			local function onInner(value)
				assert(isObservable(value))
				table.insert(collected, value)
			end

			local function onFail(...)
				sub:Fail(...)
			end

			local function onSourceComplete()
				if alive then
					-- The source is done: combine everything collected so far.
					table.insert(maid, export.combineLatest(collected):Subscribe(sub:GetFireFailComplete()))
				end
			end

			table.insert(maid, source:Subscribe(onInner, onFail, onSourceComplete))
			return maid
		end)
	end
end
--[=[
The same as combineLatestAll.
This is for backwards compatibility, and is deprecated.
@function combineAll
@deprecated 1.0.0 -- Use Rx.combineLatestAll
@within Rx
@return (source: Observable<Observable<T>>) -> Observable<{ T }>
]=]
export.combineAll = export.combineLatestAll
--[=[
Catches an error, and allows another observable to be subscribed
in terms of handling the error.
:::warning
This method is not yet tested
:::
@param callback (error: TError) -> Observable<TErrorResult>
@return (source: Observable<T>) -> Observable<T | TErrorResult>
]=]
function export.catchError(callback: (...any) -> Observable): Transformer
assert(type(callback) == "function", "callback must be a function")
return function(source)
assert(isObservable(source), "source must be Observable")
return newObservable(function(sub)
-- Plain list of tasks returned as this observation's cleanup.
local maid = {}
-- Yikes, let's hope event ordering is good
local alive = true
-- First task in the list: mark the observation as no longer alive.
table.insert(maid, function() alive = false end)
table.insert(maid, source:Subscribe(
function(...)
-- Values pass straight through.
sub:Fire(...)
end,
function(...)
if not alive then
-- if we failed because maid was cancelled, then we'll get called here?
-- I think.
return
end
-- at this point, we can only have one error, so we need to subscribe to the result
-- and continue the observable
local observable = callback(...)
assert(isObservable(observable), "callback must return Observable")
-- The replacement observable drives the subscriber from here on (values,
-- failure and completion); its subscription joins the same task list.
table.insert(maid, observable:Subscribe(sub:GetFireFailComplete()))
end,
function()
sub:Complete()
end
))
return maid
end)
end
end
--[=[
One of the most useful functions this combines the latest values of
observables at each chance!
```lua
Rx.combineLatest({
child = Rx.fromSignal(Workspace.ChildAdded),
lastChildRemoved = Rx.fromSignal(Workspace.ChildRemoved),
value = 5,
}):Subscribe(function(data)
print(data.child) --> last child
print(data.lastChildRemoved) --> other value
print(data.value) --> 5
end)
```
:::tip
Note that the resulting observable will not emit until all input
observables are emitted.
:::
@param observables { [TKey]: Observable<TEmitted> | TEmitted }
@return Observable<{ [TKey]: TEmitted }>
]=]
function export.combineLatest(observables: {[any]: Observable}): Observable
	assert(type(observables) == "table", "observables must be a dictionary")

	return newObservable(function(sub)
		-- Count of observable entries that have neither failed nor completed.
		local remaining = 0
		local latest = {}

		-- Seed the snapshot: plain values are copied as-is, observable entries
		-- start as UNSET until they emit.
		for key, entry in pairs(observables) do
			if not isObservable(entry) then
				latest[key] = entry
			else
				remaining = remaining + 1
				latest[key] = UNSET
			end
		end

		-- Nothing to observe: emit the static snapshot once and finish.
		if remaining == 0 then
			sub:Fire(latest)
			sub:Complete()
			return
		end

		local function allSet()
			for _, value in pairs(latest) do
				if value == UNSET then
					return false
				end
			end
			return true
		end

		local maid = {}

		for key, entry in pairs(observables) do
			if isObservable(entry) then
				table.insert(maid, entry:Subscribe(
					function(value)
						latest[key] = value
						-- Emit a copy so subscribers never share the live table.
						if allSet() then
							sub:Fire(table.clone(latest))
						end
					end,
					function(...)
						remaining = remaining - 1
						sub:Fail(...)
					end,
					function()
						remaining = remaining - 1
						if remaining == 0 then
							sub:Complete()
						end
					end)
				)
			end
		end

		return maid
	end)
end
--[=[
http://reactivex.io/documentation/operators/using.html
Each time a subscription occurs, the resource is constructed
and exists for the lifetime of the observation. The observableFactory
uses the resource for subscription.
:::note
Note from Quenty: I haven't found this that useful.
:::
@param resourceFactory () -> MaidTask
@param observableFactory (MaidTask) -> Observable<T>
@return Observable<T>
]=]
function export.using(resourceFactory: () -> Task, observableFactory: (Task) -> Observable): Observable
	return newObservable(function(sub)
		-- A fresh resource per subscription; it is the first task in the list
		-- returned as this observation's cleanup.
		local resource = resourceFactory()
		local maid = { resource }

		local observable = observableFactory(resource)
		assert(isObservable(observable), "factory must return Observable")

		table.insert(maid, observable:Subscribe(sub:GetFireFailComplete()))
		return maid
	end)
end
--[=[
Takes the first entry and terminates the observable. Equivalent to the following:
```lua
Rx.take(1)
```
https://reactivex.io/documentation/operators/first.html
@return (source: Observable<T>) -> Observable<T>
]=]
function export.first(): Transformer
-- Pure alias: delegates to take with a count of 1.
return export.take(1)
end
--[=[
Takes n entries and then completes the observation.
https://rxjs.dev/api/operators/take
@param number number
@return (source: Observable<T>) -> Observable<T>
]=]
function export.take(number: number): Transformer
	assert(type(number) == "number", "number must be number")
	assert(number > 0, "Bad number")

	return function(source)
		assert(isObservable(source), "source must be Observable")

		return newObservable(function(sub)
			-- How many values have been forwarded so far.
			local emitted = 0

			local function onValue(...)
				-- The quota is already used up; the source should have stopped.
				if emitted >= number then
					warn("Rx.take: still getting values past subscription")
					return
				end

				emitted = emitted + 1
				sub:Fire(...)

				if emitted < number then
					return
				end
				sub:Complete()
			end

			return source:Subscribe(onValue, sub:GetFailComplete())
		end)
	end
end
--[=[
	Skips the first `toSkip` values emitted by the source and forwards every
	value after that. Failure and completion are passed through unchanged.

	https://rxjs.dev/api/operators/skip

	@param toSkip number -- how many leading values to drop; must be greater than 0
	@return (source: Observable<T>) -> Observable<T>
]=]
function export.skip(toSkip: number): Transformer
	assert(type(toSkip) == "number", "toSkip must be number")
	assert(toSkip > 0, "toSkip must be greater than 0")

	return function(source)
		assert(isObservable(source), "source must be Observable")

		return newObservable(function(sub)
			-- Number of values dropped so far.
			local skipped = 0

			return source:Subscribe(function(...)
				-- Strict comparison: exactly `toSkip` values are dropped. The
				-- previous `<=` dropped one value too many.
				if skipped < toSkip then
					skipped = skipped + 1
					return
				end
				sub:Fire(...)
			end, sub:GetFailComplete())
		end)
	end
end
--[=[
Defers the subscription and creation of the observable until the
actual subscription of the observable.
https://rxjs-dev.firebaseapp.com/api/index/function/defer
https://netbasal.com/getting-to-know-the-defer-observable-in-rxjs-a16f092d8c09
@param observableFactory () -> Observable<T>
@return Observable<T>
]=]
function export.defer(observableFactory: () -> Observable): Observable
	return newObservable(function(sub)
		-- Run the factory at subscription time; an error it raises becomes a
		-- failure of this observable instead of propagating.
		local produced
		local ok, err = pcall(function()
			produced = observableFactory()
		end)

		if ok and isObservable(produced) then
			return produced:Subscribe(sub:GetFireFailComplete())
		end

		if ok then
			-- The factory ran fine but did not hand back an observable.
			sub:Fail("Observable expected")
		else
			sub:Fail(err)
		end
		return
	end)
end
--[=[
Shift the emissions from an Observable forward in time by a particular amount.
@param seconds number
@return (source: Observable<T>) -> Observable<T>
]=]
function export.delay(seconds: number): Transformer
assert(type(seconds) == "number", "seconds must be number")
return function(source)
assert(isObservable(source), "source must be Observable")
return newObservable(function(sub)
local maid = Maid.new()
maid._ = source:Subscribe(function(...)
-- Capture the emitted tuple; the packed table also serves as a unique maid
-- key for the scheduled thread, so each pending emission has its own entry.
local args = table.pack(...)
maid[args] = task.delay(seconds, function()
-- The delay has elapsed: clear this emission's maid entry, then forward
-- the original tuple (args.n keeps trailing nils).
maid[args] = nil
sub:Fire(table.unpack(args, 1, args.n))
end)
-- NOTE(review): failure and completion are forwarded immediately rather
-- than delayed, so they can reach the subscriber while values are still
-- waiting on task.delay. Confirm that is intended.
end, sub:GetFailComplete())
return maid
end)
end
end
--[=[
	Emits an incrementing counter, starting at 0, every `seconds` seconds. The
	first emission happens after `initialDelaySeconds` when that is a positive
	number, otherwise immediately on subscription.

	@param initialDelaySeconds number? -- optional wait before the first emission
	@param seconds number -- interval between emissions
	@return Observable<number>
]=]
function export.timer(initialDelaySeconds: number?, seconds: number): Observable
	assert(type(initialDelaySeconds) == "number" or initialDelaySeconds == nil, "initialDelaySeconds must be number or nil")
	assert(type(seconds) == "number", "seconds must be number")

	return newObservable(function(sub)
		local number = -1
		local running = true

		local thread = task.spawn(function()
			if initialDelaySeconds and initialDelaySeconds > 0 then
				task.wait(initialDelaySeconds)
			end
			while running do
				number += 1
				sub:Fire(number)
				task.wait(seconds)
			end
		end)

		return function()
			-- Always lower the flag so the loop exits by itself on its next check.
			running = false
			-- Cleanup can be triggered from inside the timer thread's own
			-- sub:Fire (for example a downstream operator completing). In that
			-- case the thread is running rather than suspended, and asking
			-- task.cancel to cancel it is expected to raise, which a cleanup
			-- task must not do. Only cancel a thread parked in task.wait.
			if coroutine.status(thread) == "suspended" then
				task.cancel(thread)
			end
		end
	end)
end
--[=[
Honestly, I have not used this one much.
https://rxjs-dev.firebaseapp.com/api/operators/withLatestFrom
https://medium.com/js-in-action/rxjs-nosy-combinelatest-vs-selfish-withlatestfrom-a957e1af42bf
@param inputObservables {Observable<TInput>}
@return (source: Observable<T>) -> Observable<{T, ...TInput}>
]=]
function export.withLatestFrom(inputObservables: {Observable}): Transformer
	assert(type(inputObservables) == "table", "inputObservables must be array of Observable values")
	for _, candidate in pairs(inputObservables) do
		assert(isObservable(candidate), "observable expected")
	end

	return function(source)
		assert(isObservable(source), "source must be Observable")

		return newObservable(function(sub)
			local maid = {}
			-- Most recent value per input; UNSET until that input has emitted.
			local latest = {}

			-- True once no input is still waiting for its first value.
			local function everyInputSet()
				for _, item in pairs(latest) do
					if item == UNSET then
						return false
					end
				end
				return true
			end

			-- Inputs only update the cache; their failure/completion is ignored.
			for key, input in pairs(inputObservables) do
				latest[key] = UNSET
				table.insert(maid, input:Subscribe(function(value)
					latest[key] = value
				end, nil, nil))
			end

			-- Only the source drives emissions: its value first, then the cached inputs.
			table.insert(maid, source:Subscribe(function(value)
				if not everyInputSet() then
					return
				end
				sub:Fire({value, unpack(latest)})
			end, sub:GetFailComplete()))

			return maid
		end)
	end
end
--[=[
https://rxjs-dev.firebaseapp.com/api/operators/scan
@param accumulator (current: TSeed, ...: TInput) -> TResult
@param seed TSeed
@return (source: Observable<TInput>) -> Observable<TResult>
]=]
function export.scan(accumulator: (any, ...any) -> any, seed: any): Transformer
	assert(type(accumulator) == "function", "accumulator must be function")

	return function(source)
		assert(isObservable(source), "source must be Observable")

		return newObservable(function(sub)
			-- Running state; restarts from the seed for every subscription.
			local accumulated = seed

			local function onValue(...)
				-- Fold the emission into the state and emit the new state.
				accumulated = accumulator(accumulated, ...)
				sub:Fire(accumulated)
			end

			return source:Subscribe(onValue, sub:GetFailComplete())
		end)
	end
end
-- ThrottledFunction wraps a function so that calls reach it at most once per
-- timeout window. A call can be dispatched at the start of a window ("leading")
-- and/or the latest call's arguments can be dispatched later ("trailing").
local ThrottledFunction = {}
ThrottledFunction.ClassName = "ThrottledFunction"
ThrottledFunction.__index = ThrottledFunction
-- Creates a throttled wrapper around *func*.
-- timeoutInSeconds: length of the throttle window, in seconds.
-- func: function invoked with the arguments of the dispatched call.
-- config: optional table of boolean flags; see _configureOrError.
function ThrottledFunction.new(timeoutInSeconds, func, config)
assert(type(timeoutInSeconds) == "number", "timeoutInSeconds must be number")
assert(type(func) == "function", "func must be function")
local self = setmetatable({}, ThrottledFunction)
-- Earliest tick() at which an immediate dispatch is allowed again.
self._nextCallTimeStamp = 0
self._timeout = timeoutInSeconds
self._func = func
-- Packed arguments waiting for a scheduled dispatch, or nil when none is pending.
self._trailingValue = nil
-- Both edges are enabled unless the config says otherwise.
self._callLeading = true
self._callTrailing = true
self:_configureOrError(config)
return self
end
-- Submits a call. Depending on the state, the arguments are dispatched
-- immediately, stored for a scheduled dispatch, or replace arguments that are
-- already waiting.
function ThrottledFunction:Call(...)
if self._trailingValue then
-- Update the next value to be dispatched
self._trailingValue = table.pack(...)
elseif self._nextCallTimeStamp <= tick() then
-- The window has elapsed.
if self._callLeading or self._callLeadingFirstTime then
self._callLeadingFirstTime = false
-- Dispatch immediately
self._nextCallTimeStamp = tick() + self._timeout
self._func(...)
elseif self._callTrailing then
-- Schedule for trailing at exactly timeout
self._trailingValue = table.pack(...)
task.delay(self._timeout, function()
-- Destroy() strips the metatable, so `self.Destroy` is nil once the
-- object has been destroyed and the dispatch is skipped.
if self.Destroy then
self:_dispatch()
end
end)
else
error("trailing and leading are both disabled", 2)
end
elseif self._callLeading or self._callTrailing or self._callLeadingFirstTime then
-- Still inside the current window: hold the arguments and dispatch them
-- when the window ends.
self._callLeadingFirstTime = false
-- As long as either leading or trailing are set to true, we are good
local remainingTime = self._nextCallTimeStamp - tick()
self._trailingValue = table.pack(...)
task.delay(remainingTime, function()
if self.Destroy then
self:_dispatch()
end
end)
end
end
-- Starts a new window and, if arguments are waiting, calls the wrapped
-- function with them.
function ThrottledFunction:_dispatch()
self._nextCallTimeStamp = tick() + self._timeout
local trailingValue = self._trailingValue
if trailingValue then
-- Clear before call so we are in valid state!
self._trailingValue = nil
self._func(unpack(trailingValue, 1, trailingValue.n))
end
end
-- Applies *throttleConfig*. Recognised keys are "leading", "trailing" and
-- "leadingFirstTimeOnly"; every value must be a boolean and any other key is an
-- error. Also errors when both leading and trailing end up disabled.
function ThrottledFunction:_configureOrError(throttleConfig)
if throttleConfig == nil then
return
end
assert(type(throttleConfig) == "table", "throttleConfig must be table")
for key, value in pairs(throttleConfig) do
assert(type(value) == "boolean", "throttleConfig entry must be boolean")
if key == "leading" then
self._callLeading = value
elseif key == "trailing" then
self._callTrailing = value
elseif key == "leadingFirstTimeOnly" then
self._callLeadingFirstTime = value
else
error(string.format("bad key %q in throttleConfig", tostring(key)))
end
end
assert(self._callLeading or self._callTrailing, "cannot configure both leading and trailing disabled")
end
-- Drops any waiting arguments and the wrapped function, then removes the
-- metatable so scheduled callbacks can tell the object has been destroyed.
function ThrottledFunction:Destroy()
self._trailingValue = nil
self._func = nil
setmetatable(self, nil)
end
--[=[
Throttles emission of observables.
https://rxjs-dev.firebaseapp.com/api/operators/throttleTime
:::note
Note that on complete, the last item is not included, for now, unlike the existing version in rxjs.
:::
@param duration number
@param throttleConfig { leading = true, trailing = true }
@return (source: Observable) -> Observable
]=]
function export.throttleTime(duration: number, throttleConfig: {leading: boolean?, trailing: boolean?}): Transformer
	assert(type(duration) == "number", "duration must be number")
	assert(type(throttleConfig) == "table" or throttleConfig == nil, "throttleConfig must be table or nil")

	return function(source)
		assert(isObservable(source), "source must be Observable")

		return newObservable(function(sub)
			-- Whatever the throttle lets through goes straight to the subscriber.
			local function emit(...)
				sub:Fire(...)
			end

			local throttled = ThrottledFunction.new(duration, emit, throttleConfig)

			-- Every source value is offered to the throttle.
			local function onValue(...)
				throttled:Call(...)
			end

			-- Task list: the throttle itself, then the source subscription.
			return {
				throttled,
				source:Subscribe(onValue, sub:GetFailComplete()),
			}
		end)
	end
end
--[=[
Throttles emission of observables on the defer stack to the last emission.
@return (source: Observable) -> Observable
]=]
function export.throttleDefer(): Transformer
	return function(source)
		assert(isObservable(source), "source must be Observable")

		return newObservable(function(sub)
			local maid = Maid.new()
			-- Packed arguments of the newest emission awaiting the deferred
			-- flush; nil while no flush is queued.
			local queuedArgs

			-- Emits the newest queued emission, provided the subscriber is still pending.
			local function flush()
				local current = queuedArgs
				queuedArgs = nil
				if sub:IsPending() then
					sub:Fire(table.unpack(current, 1, current.n))
				end
			end

			local function onValue(...)
				local flushAlreadyQueued = queuedArgs ~= nil
				-- The newest emission always replaces what was queued.
				queuedArgs = table.pack(...)
				if flushAlreadyQueued then
					return
				end
				-- Queue up our result
				maid._currentQueue = task.defer(flush)
			end

			maid._ = source:Subscribe(onValue, sub:GetFailComplete())
			return maid
		end)
	end
end
return table.freeze(export)
local Maid = require(script.Parent.Maid)
local Rx = require(script.Parent.Rx)
-- Rxi provides Rx primitives for producing live queries of instances.
--
-- In general, observables will emit nothing (that is, an empty tuple) when a
-- value is no longer observed.
local export = {}
local UNSET = newproxy()
-- Returns an observable that fires once with no values, then completes.
local function nilobs()
	local function emitNothing(sub)
		sub:Fire()
		sub:Complete()
	end
	return Rx.observable(emitNothing)
end
-- Returns an observable that emits *instance* once and completes. When the
-- argument is not an Instance, the observable emits nothing and completes.
function export.of(instance: Instance): Rx.Observable
	if typeof(instance) == "Instance" then
		return Rx.observable(function(sub)
			sub:Fire(instance)
			sub:Complete()
		end)
	end
	return nilobs()
end
-- Lets a value continue down the chain only when it is not nil.
function export.notNil(): Rx.Transformer
	local function isPresent(value)
		return value ~= nil
	end
	return Rx.where(isPresent)
end
-- Passes the value through when `typeof(value)` equals *t*; emits nothing
-- otherwise.
function export.isTypeOf(t: string)
	local function hasType(value)
		return typeof(value) == t
	end
	return Rx.whereElse(hasType)
end
-- Passes the value through when it is an Instance whose ClassName is exactly
-- *c*; emits nothing otherwise.
function export.isClass(c: string)
	local function hasExactClass(value)
		if typeof(value) ~= "Instance" then
			return false
		end
		return value.ClassName == c
	end
	return Rx.whereElse(hasExactClass)
end
-- Passes the value through when it is an Instance for which `IsA(c)` holds;
-- emits nothing otherwise.
function export.isClassOf(c: string)
	local function inheritsClass(value)
		if typeof(value) ~= "Instance" then
			return false
		end
		return value:IsA(c)
	end
	return Rx.whereElse(inheritsClass)
end
-- Passes the value through when it is an EnumItem belonging to enum *e*; emits
-- nothing otherwise.
function export.isEnum(e: Enum)
	local function belongsToEnum(value)
		if typeof(value) ~= "EnumItem" then
			return false
		end
		return value.EnumType == e
	end
	return Rx.whereElse(belongsToEnum)
end
-- Passes the value through when it is a table whose every element has type
-- *t* (as reported by typeof); emits nothing otherwise.
function export.isArrayOf(t: string)
	local function everyElementHasType(value)
		if type(value) == "table" then
			for _, element in value do
				if typeof(element) ~= t then
					return false
				end
			end
			return true
		end
		return false
	end
	return Rx.whereElse(everyElementHasType)
end
-- Reports whether *instance* has *property*, judged by whether requesting the
-- property's changed signal succeeds without raising.
local function hasProperty(instance, property)
	local getChangedSignal = instance.GetPropertyChangedSignal
	local ok = pcall(getChangedSignal, instance, property)
	return ok
end
-- Observes the property named *property* on the incoming Instance: emits its
-- current value on subscription and again whenever its changed signal fires.
-- Emits nothing when the incoming value is not an Instance, or when the
-- Instance does not have the property.
function export.property(property: string): Rx.Transformer
	local function observeProperty(instance: Instance)
		if not instance then
			return nilobs()
		end

		return Rx.observable(function(sub)
			if not hasProperty(instance, property) then
				sub:Fire()
				sub:Complete()
				return
			end

			local function emitCurrent()
				sub:Fire((instance::any)[property])
			end

			-- Connect before the initial emit, then return the connection as the cleanup task.
			local conn = instance:GetPropertyChangedSignal(property):Connect(emitCurrent)
			emitCurrent()
			return conn
		end)
	end

	return Rx.pipe({
		export.isTypeOf("Instance"),
		Rx.switchMap(observeProperty),
	})
end
-- Observes the attribute named *attribute* on the incoming Instance: emits its
-- current value on subscription and again whenever its changed signal fires.
-- Emits nothing while the incoming value is not an Instance.
function export.attribute(attribute: string): Rx.Transformer
	local function observeAttribute(instance: Instance)
		if not instance then
			return nilobs()
		end

		return Rx.observable(function(sub)
			local function emitCurrent()
				sub:Fire(instance:GetAttribute(attribute))
			end

			-- Connect before the initial emit, then return the connection as the cleanup task.
			local conn = instance:GetAttributeChangedSignal(attribute):Connect(emitCurrent)
			emitCurrent()
			return conn
		end)
	end

	return Rx.pipe({
		export.isTypeOf("Instance"),
		Rx.switchMap(observeAttribute),
	})
end
-- Observes from the first value the first child where the Name property equals
-- *name*. Emits nothing while no child is found, or the first value is not an
-- Instance.
function export.findFirstChild(name: string)
	return Rx.pipe{
		export.isTypeOf("Instance"),
		Rx.switchMap(function(instance)
			-- isTypeOf emits nothing for non-Instances, so the value can be nil
			-- here. Without this guard the subscription below would index nil.
			if not instance then return nilobs() end
			return Rx.observable(function(sub)
				local maid = Maid.new()
				-- Last value emitted; UNSET guarantees the first lookup always emits.
				local current = UNSET

				-- Re-runs the lookup and emits only when the result changed.
				local function updateCurrent()
					local found = instance:FindFirstChild(name)
					if found ~= current then
						current = found
						sub:Fire(current)
					end
				end

				-- A rename can make a child start or stop matching, so each
				-- child's Name is watched under a maid entry keyed by the child.
				local function watchName(child)
					maid[child] = child:GetPropertyChangedSignal("Name"):Connect(function()
						updateCurrent()
					end)
				end

				maid.connAdded = instance.ChildAdded:Connect(function(child)
					watchName(child)
					updateCurrent()
				end)
				maid.connRemoved = instance.ChildRemoved:Connect(function(child)
					maid[child] = nil
					updateCurrent()
				end)
				for _, child in instance:GetChildren() do
					watchName(child)
				end
				updateCurrent()
				return maid
			end)
		end),
	}
end
-- Observes from the first value the first child where the Name property equals
-- *name* and ClassName equals *className*. Emits nothing while no child is
-- found, or the first value is not an Instance.
function export.findFirstChildWithClass(className: string, name: string)
	return Rx.pipe{
		export.isTypeOf("Instance"),
		-- Positional entry, like the sibling transformers: a hash key would
		-- leave the stage order at the mercy of table traversal order.
		Rx.switchMap(function(instance)
			-- isTypeOf emits nothing for non-Instances, so the value can be nil
			-- here. Without this guard the subscription below would index nil.
			if not instance then return nilobs() end
			return Rx.observable(function(sub)
				local maid = Maid.new()
				-- Last value emitted; UNSET guarantees the first lookup always emits.
				local current = UNSET

				-- Finds the first child matching both class and name, and emits
				-- only when the result changed.
				local function updateCurrent()
					local found
					for _, child in instance:GetChildren() do
						if child.ClassName == className and child.Name == name then
							found = child
							break
						end
					end
					if found ~= current then
						current = found
						sub:Fire(current)
					end
				end

				-- Only children of the right class are tracked; a rename can
				-- make one of them start or stop matching.
				local function watchName(child)
					maid[child] = child:GetPropertyChangedSignal("Name"):Connect(function()
						updateCurrent()
					end)
				end

				maid.connAdded = instance.ChildAdded:Connect(function(child)
					if child.ClassName ~= className then return end
					watchName(child)
					updateCurrent()
				end)
				maid.connRemoved = instance.ChildRemoved:Connect(function(child)
					if child.ClassName ~= className then return end
					maid[child] = nil
					updateCurrent()
				end)
				for _, child in instance:GetChildren() do
					if child.ClassName ~= className then continue end
					watchName(child)
				end
				updateCurrent()
				return maid
			end)
		end),
	}
end
-- Observes from the first value the first child where the Name property equals
-- *name* and ClassName inherits *className*. Emits nothing while no child is
-- found, or the first value is not an Instance.
function export.findFirstChildWithClassOf(className: string, name: string)
	return Rx.pipe{
		export.isTypeOf("Instance"),
		-- Positional entry, like the sibling transformers: a hash key would
		-- leave the stage order at the mercy of table traversal order.
		Rx.switchMap(function(instance)
			-- isTypeOf emits nothing for non-Instances, so the value can be nil
			-- here. Without this guard the subscription below would index nil.
			if not instance then return nilobs() end
			return Rx.observable(function(sub)
				local maid = Maid.new()
				-- Last value emitted; UNSET guarantees the first lookup always emits.
				local current = UNSET

				-- Finds the first child that IsA *className* with a matching
				-- name, and emits only when the result changed.
				local function updateCurrent()
					local found
					for _, child in instance:GetChildren() do
						if child:IsA(className) and child.Name == name then
							found = child
							break
						end
					end
					if found ~= current then
						current = found
						sub:Fire(current)
					end
				end

				-- Only children of a matching class are tracked; a rename can
				-- make one of them start or stop matching.
				local function watchName(child)
					maid[child] = child:GetPropertyChangedSignal("Name"):Connect(function()
						updateCurrent()
					end)
				end

				maid.connAdded = instance.ChildAdded:Connect(function(child)
					if not child:IsA(className) then return end
					watchName(child)
					updateCurrent()
				end)
				maid.connRemoved = instance.ChildRemoved:Connect(function(child)
					if not child:IsA(className) then return end
					maid[child] = nil
					updateCurrent()
				end)
				for _, child in instance:GetChildren() do
					if not child:IsA(className) then continue end
					watchName(child)
				end
				updateCurrent()
				return maid
			end)
		end),
	}
end
-- Observes, on the incoming Instance, the result of FindFirstChildOfClass for
-- *className*. Re-evaluates whenever a child is added or removed and emits
-- only when the result changes. Emits nothing while the incoming value is not
-- an Instance.
function export.findFirstChildOfClass(className: string)
	local function observeChild(instance)
		if not instance then
			return nilobs()
		end

		return Rx.observable(function(sub)
			local maid = Maid.new()
			-- UNSET makes the very first lookup emit, even when it finds nothing.
			local lastEmitted = UNSET

			local function refresh()
				local found = instance:FindFirstChildOfClass(className)
				if found == lastEmitted then
					return
				end
				lastEmitted = found
				sub:Fire(found)
			end

			maid.connAdded = instance.ChildAdded:Connect(refresh)
			maid.connRemoved = instance.ChildRemoved:Connect(refresh)
			refresh()
			return maid
		end)
	end

	return Rx.pipe({
		export.isTypeOf("Instance"),
		Rx.switchMap(observeChild),
	})
end
-- Observes, on the incoming Instance, the result of FindFirstChildWhichIsA for
-- *className*. Re-evaluates whenever a child is added or removed and emits
-- only when the result changes. Emits nothing while the incoming value is not
-- an Instance.
function export.findFirstChildWhichIsA(className: string)
	local function observeChild(instance)
		if not instance then
			return nilobs()
		end

		return Rx.observable(function(sub)
			local maid = Maid.new()
			-- UNSET makes the very first lookup emit, even when it finds nothing.
			local lastEmitted = UNSET

			local function refresh()
				local found = instance:FindFirstChildWhichIsA(className)
				if found == lastEmitted then
					return
				end
				lastEmitted = found
				sub:Fire(found)
			end

			maid.connAdded = instance.ChildAdded:Connect(refresh)
			maid.connRemoved = instance.ChildRemoved:Connect(refresh)
			refresh()
			return maid
		end)
	end

	return Rx.pipe({
		export.isTypeOf("Instance"),
		Rx.switchMap(observeChild),
	})
end
-- Observes the children of the incoming Instance: emits the GetChildren()
-- array on subscription and again after every child addition or removal.
-- Emits nothing while the incoming value is not an Instance.
function export.children()
	local function observeChildren(instance)
		if not instance then
			return nilobs()
		end

		return Rx.observable(function(sub)
			local maid = Maid.new()

			-- A fresh array is fetched and emitted on every change.
			local function emitChildren()
				sub:Fire(instance:GetChildren())
			end

			maid.connAdded = instance.ChildAdded:Connect(emitChildren)
			maid.connRemoved = instance.ChildRemoved:Connect(emitChildren)
			emitChildren()
			return maid
		end)
	end

	return Rx.pipe({
		export.isTypeOf("Instance"),
		Rx.switchMap(observeChildren),
	})
end
return table.freeze(export)
@blinkybool
Copy link

blinkybool commented Feb 14, 2023

Some of the findFirstChild transformers here have bugs. They need `if not instance then return nilobs() end` inside the switchMap.

Also thanks for making this!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment