Last active
January 17, 2024 18:42
-
-
Save Anaminus/1f31af4e5280b9333f3f58e13840c670 to your computer and use it in GitHub Desktop.
Standalone ReactiveX module ripped from Nevermore.Rx
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
--@sec: Maid | |
--@doc: The Maid module provides methods to manage tasks. A **task** is any | |
-- value that encapsulates the finalization of some procedure or state. | |
-- "Cleaning" a task invokes it, causing the procedure or state to be finalized. | |
-- | |
-- All tasks have the following contract when cleaning: | |
-- | |
-- - A task must not produce an error. | |
-- - A task must not yield. | |
-- - A task must finalize only once; cleaning an already cleaned task should be | |
-- a no-op. | |
-- - A task must not cause the production of more tasks. | |
-- | |
-- ### Maid class | |
-- | |
-- The **Maid** class is used to manage tasks more conveniently. Tasks can be | |
-- assigned to a maid to be cleaned later. | |
-- | |
-- A task can be assigned to a maid as "named" or "unnamed". With a named task: | |
-- | |
-- - The name can be any non-nil value. | |
-- - A named task can be individually cleaned. | |
-- - A named task can be individually unassigned from the maid without cleaning | |
-- it. | |
-- | |
-- Unnamed tasks can only be cleaned by destroying the maid. | |
-- | |
-- Any value can be assigned to a maid. Even if a task is not a known task type, | |
-- the maid will still hold on to the value. This might be used to hold off | |
-- garbage collection of a value that otherwise has only weak references. | |
-- | |
-- A maid is not reusable; after a maid has been destroyed, any tasks assigned | |
-- to the maid are cleaned immediately. | |
local export = {} | |
local DEBUG = false | |
export type Task = any | |
local task_cancel = task.cancel | |
-- Cleans *task*. *refs* is used to keep track of tables that have already been | |
-- traversed. | |
local function clean(task: Task, refs: {[any]:true}?) | |
if not task then | |
return | |
end | |
if type(task) == "function" then | |
task() | |
elseif type(task) == "thread" then | |
task_cancel(task) | |
elseif typeof(task) == "RBXScriptConnection" then | |
task:Disconnect() | |
elseif typeof(task) == "Instance" then | |
task:Destroy() | |
elseif type(task) == "table" then | |
if getmetatable(task) == nil then | |
if refs then | |
if refs[task] then | |
return | |
end | |
refs[task] = true | |
else | |
refs = {[task]=true} | |
end | |
for k, v in task do | |
clean(v, refs) | |
end | |
if not table.isfrozen(task) then | |
table.clear(task) | |
end | |
elseif type(task.Destroy) == "function" then | |
task:Destroy() | |
end | |
end | |
end | |
--@sec: Maid.clean | |
--@ord: -2 | |
--@def: function Maid.clean(...: Task) | |
--@doc: The **clean** function cleans each argument. Does nothing for arguments | |
-- that are not known task types. The following types are handled: | |
-- | |
-- - `function`: The function is called. | |
-- - `thread`: The thread is canceled with `task.cancel`. | |
-- - `RBXScriptConnection`: The Disconnect method is called. | |
-- - `Instance`: The Destroy method is called. | |
-- - `table` without metatable: The value of each entry is cleaned. This applies | |
-- recursively, and such tables are cleaned only once. The table is cleared | |
-- unless it is frozen. | |
-- - `table` with metatable and Destroy function: Destroy is called as a method. | |
function export.clean(...: Task) | |
for i = 1, select("#", ...) do | |
local task = select(i, ...) | |
clean(task) | |
end | |
end | |
--@sec: Maid.wrap | |
--@ord: -2 | |
--@def: function Maid.wrap(...: Task): () -> () | |
--@doc: The **wrap** function encapsulates the given tasks in a function that | |
-- cleans them when called. | |
-- | |
-- **Example:** | |
-- ```lua | |
-- local conn = RunService.Heartbeat:Connect(function(dt) | |
-- print("delta time", dt) | |
-- end) | |
-- return Maid.wrap(conn) | |
-- ``` | |
function export.wrap(...: Task): () -> () | |
local tasks | |
if select("#", ...) == 1 then | |
tasks = ... | |
else | |
tasks = {...} | |
end | |
return function() | |
local t = tasks | |
if t then | |
tasks = nil | |
clean(t) | |
end | |
end | |
end | |
-- If debugging, wrap returns a callable table instead of a function, allowing | |
-- the contents to be inspected. | |
if DEBUG then | |
function export.wrap(...: Task): () -> () | |
local self | |
if select("#", ...) == 1 then | |
self = {tasks = ...} | |
else | |
self = {tasks = {...}} | |
end | |
setmetatable(self, { | |
__call = function(self) | |
local tasks = self.tasks | |
if tasks then | |
self.tasks = nil | |
clean(tasks) | |
end | |
end, | |
}) | |
return self | |
end | |
end | |
local Maid = {__index=setmetatable({}, { | |
__index = function(self, k) | |
error(string.format("cannot index maid with %q", tostring(k)), 2) | |
end, | |
})} | |
export type Maid = typeof(setmetatable({}, Maid)) | |
--@sec: Maid.new | |
--@ord: -1 | |
--@def: function Maid.new(): Maid | |
--@doc: The **new** constructor returns a new instance of the Maid class. | |
-- | |
-- **Example:** | |
-- ```lua | |
-- local maid = Maid.new() | |
-- ``` | |
function export.new(): Maid | |
local self = { | |
-- Map of names to tasks. Can be nil, indicating that the maid is | |
-- destroyed. | |
_namedTasks = {}, | |
-- List of unnamed tasks. | |
_unnamedTasks = {}, | |
} | |
return setmetatable(self, Maid) | |
end | |
--@sec: Maid.Alive | |
--@def: function Maid:Alive(): boolean | |
--@doc: The **Alive** method returns false when the maid is destroyed, and true | |
-- otherwise. | |
-- | |
-- **Example:** | |
-- ```lua | |
-- if maid:Alive() then | |
-- maid.heartbeat = RunService.Heartbeat:Connect(function(dt) | |
-- print("delta time", dt) | |
-- end) | |
-- end | |
-- ``` | |
function Maid.__index:Alive(): boolean | |
return not not self._namedTasks | |
end | |
--@sec: Maid.Assign | |
--@def: function Maid:Assign(name: any, task: Task?) | |
--@doc: The **Assign** method performs an action depending on the type of | |
-- *task*. If *task* is nil, then the task assigned as *name* is cleaned, if | |
-- present. Otherwise, *task* is assigned to the maid as *name*. If a different | |
-- task (according to rawequal) is already assigned as *name*, then it is | |
-- cleaned. | |
-- | |
-- If the maid is destroyed, *task* is cleaned immediately. | |
-- | |
-- **Examples:** | |
-- ```lua | |
-- maid:Assign("part", Instance.new("Part")) | |
-- ``` | |
-- | |
-- Setting an assigned task to nil unassigns it from the maid and cleans it. | |
-- | |
-- ```lua | |
-- maid:Assign("part", nil) -- Remove task and clean it. | |
-- ``` | |
-- | |
-- Assigning a task with a name that is already assigned cleans the previous | |
-- task first. | |
-- | |
-- ```lua | |
-- maid:Assign("part", Instance.new("Part")) | |
-- maid:Assign("part", Instance.new("WedgePart")) | |
-- ``` | |
function Maid.__index:Assign(name: any, task: Task?) | |
if not self._namedTasks then | |
clean(task) | |
return | |
end | |
if task then | |
local prev = self._namedTasks[name] | |
if not rawequal(prev, task) then | |
self._namedTasks[name] = task | |
if prev then | |
clean(prev) | |
end | |
end | |
else | |
local prev = self._namedTasks[name] | |
self._namedTasks[name] = nil | |
clean(prev) | |
end | |
end | |
--@sec: Maid.AssignEach | |
--@def: function Maid:AssignEach(...: Task) | |
--@doc: The **AssignEach** method assigns each given argument as an unnamed | |
-- task. | |
-- | |
-- If the maid is destroyed, the each task is cleaned immediately. | |
function Maid.__index:AssignEach(...: Task) | |
if not self._namedTasks then | |
for i = 1, select("#", ...) do | |
local task = select(i, ...) | |
clean(task) | |
end | |
return | |
end | |
for i = 1, select("#", ...) do | |
local task = select(i, ...) | |
if task then | |
table.insert(self._unnamedTasks, task) | |
end | |
end | |
end | |
--@sec: Maid.Clean | |
--@def: function Maid:Clean(...: any) | |
--@doc: The **Clean** method receives a number of names, and cleans the task | |
-- assigned to the maid for each name. Does nothing if the maid is destroyed, | |
-- and does nothing for names that have no assigned task. | |
function Maid.__index:Clean(...: any) | |
if not self._namedTasks then | |
return | |
end | |
for i = 1, select("#", ...) do | |
local name = select(i, ...) | |
local task = self._namedTasks[name] | |
self._namedTasks[name] = nil | |
clean(task) | |
end | |
end | |
--@sec: Maid.Connect | |
--@def: function Maid:Connect(name: any?, signal: RBXScriptSignal, listener: () -> ()) | |
--@doc: The **Connect** method connects *listener* to *signal*, then assigns the | |
-- resulting connection to the maid as *name*. If *name* is nil, then the | |
-- connection is assigned as an unnamed task instead. Does nothing if the maid | |
-- is destroyed. | |
-- | |
-- Connect is the preferred method when using maids to manage signals, primarily | |
-- to resolve problems concerning the assignment to a destroyed maid: | |
-- - Slightly more efficient than regular assignment, since the connection of | |
-- the signal is never made. | |
-- - Certain signals can have side-effects when connecting, so avoiding the | |
-- connection entirely is more correct. | |
-- | |
-- **Example:** | |
-- ```lua | |
-- maid:Connect("heartbeat", RunService.Heartbeat, function(dt) | |
-- print("delta time", dt) | |
-- end) | |
-- ``` | |
function Maid.__index:Connect(name: any, signal: RBXScriptSignal, listener: () -> ()) | |
if not self._namedTasks then | |
return | |
end | |
local connection = signal:Connect(listener) | |
if name == nil then | |
table.insert(self._unnamedTasks, connection) | |
else | |
self:Assign(name, connection) | |
end | |
end | |
--@sec: Maid.Destroy | |
--@def: function Maid:Destroy() | |
--@doc: The **Destroy** method cleans all tasks currently assigned to the maid. | |
-- Does nothing if the maid is destroyed. | |
-- | |
-- **Example:** | |
-- ```lua | |
-- maid:Destroy() | |
-- ``` | |
function Maid.__index:Destroy() | |
local namedTasks = self._namedTasks | |
if not namedTasks then | |
return | |
end | |
local unnamedTasks = self._unnamedTasks | |
self._namedTasks = false | |
self._unnamedTasks = false | |
clean(namedTasks) | |
clean(unnamedTasks) | |
end | |
--@sec: Maid.Unassign | |
--@def: function Maid:Unassign(name: any): Task | |
--@doc: The **Unassign** method removes the task assigned to the maid as *name*, | |
-- returning the task. Returns nil if no task is assigned as *name*, or if the | |
-- maid is Destroyed. | |
function Maid.__index:Unassign(name: any): Task | |
if not self._namedTasks then | |
return nil | |
end | |
local task = self._namedTasks[name] | |
self._namedTasks[name] = nil | |
return task | |
end | |
--@sec: Maid.Wrap | |
--@def: function Maid:Wrap(): () -> () | |
--@doc: The **Wrap** method encapsulates the maid by returning a function that | |
-- cleans the maid when called. | |
-- | |
-- **Example:** | |
-- ```lua | |
-- return maid:Wrap() | |
-- ``` | |
function Maid.__index:Wrap(): () -> () | |
return export.wrap(self) | |
end | |
--@sec: Maid.__newindex | |
--@def: Maid[any] = Task? | |
--@doc: Assigns a task according to the [Assign][Maid.Assign] method, where the | |
-- index is the name, and the value is the task. If the index is a string that | |
-- is a single underscore, then the task is assigned according to | |
-- [AssignEach][Maid.AssignEach] instead. | |
-- | |
-- Tasks can be assigned to the maid like a table: | |
-- | |
-- ```lua | |
-- maid.foo = task -- Assign task as "foo". | |
-- ``` | |
-- | |
-- Setting an assigned task to nil unassigns it from the maid and cleans it: | |
-- | |
-- ```lua | |
-- maid.foo = nil -- Remove task and clean it. | |
-- ``` | |
-- | |
-- Assigning a task with a name that is already assigned cleans the previous | |
-- task first: | |
-- | |
-- ```lua | |
-- maid.foo = task -- Assign task as "foo". | |
-- maid.foo = otherTask -- Remove task, clean it, and assign otherTask as "foo". | |
-- ``` | |
-- | |
-- Assigning to the special `_` index assigns an unnamed task (to explicitly | |
-- assign as `_`, use the [Assign][Maid.Assign] method). | |
-- | |
-- ```lua | |
-- maid._ = task -- Assign task. | |
-- maid._ = otherTask -- Assign otherTask. | |
-- ``` | |
-- | |
-- **Note**: Tasks assigned to the maid cannot be indexed: | |
-- | |
-- ```lua | |
-- print(maid.foo) | |
-- --> ERROR: cannot index maid with "foo" | |
-- ``` | |
function Maid:__newindex(name: any, task: Task?) | |
if name == "_" then | |
self:AssignEach(task) | |
else | |
self:Assign(name, task) | |
end | |
end | |
return table.freeze(export) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
local Maid = require(script.Parent.Maid) | |
export type Task = Maid.Task | |
export type Fire = (...any) -> () | |
export type Fail = (...any) -> () | |
export type Complete = () -> () | |
export type Subscribe = (Subscriber) -> Task | |
export type Transformer = (Observable) -> Observable | |
export type Subscription = () -> () | |
-- Rx is a standalone ReactiveX module ripped from Nevermore.Rx. | |
-- | |
-- https://quenty.github.io/NevermoreEngine/api/Rx | |
local export = {} | |
local UNSET = newproxy() | |
type stateType = "pending" | "failed" | "complete" | "cancelled" | |
local PENDING: "pending" = "pending" | |
local FAILED: "failed" = "failed" | |
local COMPLETE: "complete" = "complete" | |
local CANCELLED: "cancelled" = "cancelled" | |
-- Clean up task belonging to subscriber. | |
local function doCleanup(self: _Subscriber) | |
if self._cleanupTask then | |
local task = self._cleanupTask | |
self._cleanupTask = nil | |
Maid.clean(task) | |
end | |
end | |
export type Subscriber = { | |
Fire: (self: Subscriber, ...any) -> (), | |
Fail: (self: Subscriber, ...any) -> (), | |
Complete: (self: Subscriber) -> (), | |
GetFireFailComplete: (self: Subscriber) -> (Fire, Fail, Complete), | |
GetFailComplete: (self: Subscriber) -> (Fail, Complete), | |
IsPending: (self: Subscriber) -> boolean, | |
} | |
type _Subscriber = Subscriber & { | |
_state: stateType, | |
_fireCallback: Fire, | |
_failCallback: Fail, | |
_completeCallback: Complete, | |
_cleanupTask: Task?, | |
} | |
local Subscriber = {__index={}} | |
function newSubscriber(fire: Fire?, fail: Fail?, complete: Complete?): Subscriber | |
local subscriber = setmetatable({ | |
_state = PENDING, | |
_fireCallback = fire, | |
_failCallback = fail, | |
_completeCallback = complete, | |
_cleanupTask = nil, | |
}, Subscriber) | |
return subscriber | |
end | |
function Subscriber.__index:Fire(...: any) | |
if self._state == PENDING then | |
if self._fireCallback then | |
self._fireCallback(...) | |
end | |
elseif self._state == CANCELLED then | |
warn("Subscriber.Fire: event pushed to cancelled subscriber") | |
end | |
end | |
function Subscriber.__index:Fail(...: any) | |
if self._state ~= PENDING then | |
return | |
end | |
self._state = FAILED | |
if self._failCallback then | |
self._failCallback(...) | |
end | |
doCleanup(self) | |
end | |
function Subscriber.__index:GetFireFailComplete(): (Fire, Fail, Complete) | |
return function(...) | |
self:Fire(...) | |
end, function(...) | |
self:Fail(...) | |
end, function() | |
self:Complete() | |
end | |
end | |
function Subscriber.__index:GetFailComplete(): (Fail, Complete) | |
return function(...) | |
self:Fail(...) | |
end, function() | |
self:Complete() | |
end | |
end | |
function Subscriber.__index:Complete() | |
if self._state ~= PENDING then | |
return | |
end | |
self._state = COMPLETE | |
if self._completeCallback then | |
self._completeCallback() | |
end | |
doCleanup(self) | |
end | |
function Subscriber.__index:IsPending(): boolean | |
return self._state == PENDING | |
end | |
export type Observable = { | |
Pipe: (self: Observable, transformers: {Transformer}) -> Observable, | |
Subscribe: (self: Observable, | |
onFire: Fire?, | |
onFail: Fail?, | |
onComplete: Complete? | |
) -> Subscription, | |
} | |
local Observable = {__index={}} | |
function isObservable(v: any): boolean | |
return getmetatable(v) == Observable | |
end | |
export.isObservable = isObservable | |
local function newObservable(onSubscribe: Subscribe): Observable | |
assert(type(onSubscribe) == "function", "onSubscribe must be function") | |
local self = setmetatable({ | |
_onSubscribe = onSubscribe, | |
}, Observable) | |
return self | |
end | |
export.observable = newObservable | |
function Observable.__index:Pipe(transformers: {Transformer}): Observable | |
assert(type(transformers) == "table", "transformers must be a table") | |
local current = self | |
for _, transformer in pairs(transformers) do | |
assert(type(transformer) == "function", "transformer must be function") | |
current = transformer(current) | |
assert(isObservable(current)) | |
end | |
return current | |
end | |
function Observable.__index:Subscribe(fire: Fire?, fail: Fail?, complete: Complete?): Subscription | |
local sub = newSubscriber(fire, fail, complete) | |
local task = self._onSubscribe(sub) | |
local function subscription() | |
if sub._state == PENDING then | |
sub._state = CANCELLED | |
end | |
doCleanup(sub) | |
end | |
if not task then | |
return subscription | |
end | |
if sub._state ~= PENDING then | |
subscription() | |
return subscription | |
end | |
sub._cleanupTask = task | |
return subscription | |
end | |
--[=[ | |
An empty observable that completes immediately | |
@prop EMPTY Observable<()> | |
@readonly | |
@within Rx | |
]=] | |
--[=[ | |
An observable that never completes. | |
@prop NEVER Observable<()> | |
@readonly | |
@within Rx | |
]=] | |
export.empty = newObservable(function(sub) | |
sub:Complete() | |
end) | |
export.never = newObservable(function()end) | |
--[=[ | |
Pipes the tranformers through each other | |
https://rxjs-dev.firebaseapp.com/api/index/function/pipe | |
@param transformers { Observable<any> } | |
@return (source: Observable<T>) -> Observable<U> | |
]=] | |
function export.pipe(transformers: {Transformer}): Transformer | |
assert(type(transformers) == "table", "transformers must be a table") | |
for index, transformer in transformers do | |
if type(transformer) ~= "function" then | |
error(string.format("bad transformer %s: function expected, got %s", tostring(index), typeof(transformer)), 2) | |
end | |
end | |
return function(source: Observable): Observable | |
assert(source, "Bad source") | |
local current = source | |
for key, transformer in transformers do | |
current = transformer(current) | |
if not isObservable(current) then | |
error(string.format("bad result from transformer %s: Observable expected, got %q", tostring(key), tostring(current)), 2) | |
end | |
end | |
return current | |
end | |
end | |
--[=[ | |
http://reactivex.io/documentation/operators/just.html | |
```lua | |
Rx.of(1, 2, 3):Subscribe(print, function() | |
print("Complete") | |
end)) --> 1, 2, 3, "Complete" | |
``` | |
@param ... any -- Arguments to emit | |
@return Observable | |
]=] | |
function export.of(...: any): Observable | |
local args = table.pack(...) | |
return newObservable(function(sub) | |
for i = 1, args.n do | |
sub:Fire(args[i]) | |
end | |
sub:Complete() | |
end) | |
end | |
--[=[ | |
Converts an item | |
http://reactivex.io/documentation/operators/from.html | |
@param item Promise | table | |
@return Observable | |
]=] | |
function export.from(item: {any}): Observable | |
if type(item) == "table" then | |
return export.of(table.unpack(item)) | |
else | |
-- TODO: Iterator? | |
error("[Rx.from] - cannot convert") | |
end | |
end | |
--[=[ | |
https://rxjs-dev.firebaseapp.com/api/operators/merge | |
@param observables { Observable } | |
@return Observable | |
]=] | |
function export.merge(observables: {Observable}): Observable | |
assert(type(observables) == "table", "observables must be array of Obserable values") | |
for _, item in pairs(observables) do | |
assert(isObservable(item), "Observable expected") | |
end | |
return newObservable(function(sub) | |
local maid = {} | |
for _, observable in pairs(observables) do | |
table.insert(maid, observable:Subscribe(sub:GetFireFailComplete())) | |
end | |
return maid | |
end) | |
end | |
--[=[ | |
Converts a Signal into an observable. | |
https://rxjs-dev.firebaseapp.com/api/index/function/fromEvent | |
@param event Signal<T> | |
@return Observable<T> | |
]=] | |
function export.fromSignal(event: RBXScriptSignal): Observable | |
return newObservable(function(sub) | |
-- This stream never completes or fails! | |
return event:Connect(function(...) | |
sub:Fire(...) | |
end) | |
end) | |
end | |
--[=[ | |
Taps into the observable and executes the onFire/onFail/onComplete | |
commands. | |
https://rxjs-dev.firebaseapp.com/api/operators/tap | |
@param onFire function? | |
@param onFail function? | |
@param onComplete function? | |
@return (source: Observable<T>) -> Observable<T> | |
]=] | |
function export.tap(onFire: Fire?, onFail: Fail?, onComplete: Complete?): Transformer | |
assert(type(onFire) == "function" or onFire == nil, "onFire must be function or nil") | |
assert(type(onFail) == "function" or onFail == nil, "onFail must be function or nil") | |
assert(type(onComplete) == "function" or onComplete == nil, "onComplete must be function or nil") | |
return function(source: Observable): Observable | |
assert(isObservable(source), "source must be Observable") | |
return newObservable(function(sub) | |
return source:Subscribe( | |
function(...) | |
if onFire then | |
onFire(...) | |
end | |
if sub:IsPending() then | |
sub:Fire(...) | |
end | |
end, | |
function(...) | |
if onFail then | |
onFail(...) | |
end | |
sub:Fail(...) | |
end, | |
function() | |
if onComplete then | |
onComplete() | |
end | |
sub:Complete() | |
end) | |
end) | |
end | |
end | |
--[=[ | |
Starts the observable with the given value from the callback | |
http://reactivex.io/documentation/operators/start.html | |
@param callback function | |
@return (source: Observable) -> Observable | |
]=] | |
function export.start(callback: () -> any): Transformer | |
return function(source) | |
assert(isObservable(source), "source must be Observable") | |
return newObservable(function(sub) | |
sub:Fire(callback()) | |
return source:Subscribe(sub:GetFireFailComplete()) | |
end) | |
end | |
end | |
--[=[ | |
Returns a new Observable that multicasts (shares) the original Observable. As long as there is at least one Subscriber this Observable will be subscribed and emitting data. | |
When all subscribers have unsubscribed it will unsubscribe from the source Observable. | |
https://rxjs.dev/api/operators/share | |
@return (source: Observable) -> Observable | |
]=] | |
function export.share(): Transformer | |
return function(source) | |
assert(isObservable(source), "source must be Observable") | |
local _currentSub: Subscription? = nil | |
local subs = {} | |
local lastFail = UNSET | |
local lastComplete = UNSET | |
local function connectToSourceIfNeeded() | |
if not _currentSub then | |
lastFail = UNSET | |
lastComplete = UNSET | |
_currentSub = source:Subscribe(function(...) | |
for _, sub in pairs(subs) do | |
sub:Fire(...) | |
end | |
end, function(...) | |
lastFail = table.pack(...) | |
for _, sub in pairs(subs) do | |
sub:Fail(...) | |
end | |
end, function(...) | |
lastComplete = table.pack(...) | |
for _, sub in pairs(subs) do | |
sub:Complete(...) | |
end | |
end) | |
end | |
end | |
local function disconnectFromSource() | |
if _currentSub then | |
_currentSub() | |
_currentSub = nil | |
end | |
lastFail = UNSET | |
lastComplete = UNSET | |
end | |
return newObservable(function(sub) | |
if lastFail ~= UNSET then | |
sub:Fail(table.unpack(lastFail, 1, lastFail.n)) | |
return | |
end | |
if lastComplete ~= UNSET then | |
sub:Fail(table.unpack(lastComplete, 1, lastComplete.n)) | |
return | |
end | |
table.insert(subs, sub) | |
connectToSourceIfNeeded() | |
return function() | |
local index = table.find(subs, sub) | |
if index then | |
table.remove(subs, index) | |
if #subs == 0 then | |
disconnectFromSource() | |
end | |
end | |
end | |
end) | |
end | |
end | |
--[=[ | |
Same as [Rx.share] except it also replays the value | |
@param bufferSize number -- Number of entries to cache | |
@param windowTimeSeconds number -- Time | |
@return (source: Observable) -> Observable | |
]=] | |
function export.shareReplay(bufferSize: number?, windowTimeSeconds: number?): Transformer | |
assert(type(bufferSize) == "number" or bufferSize == nil, "bufferSize must be number or nil") | |
assert(type(windowTimeSeconds) == "number" or windowTimeSeconds == nil, "windowTimeSeconds must be number or nil") | |
bufferSize = bufferSize or math.huge | |
windowTimeSeconds = windowTimeSeconds or math.huge | |
return function(source) | |
assert(isObservable(source), "source must be Observable") | |
local _currentSub: Subscription? = nil | |
local subs = {} | |
local buffer = {} | |
local lastFail = UNSET | |
local lastComplete = UNSET | |
local function getEventsCopy() | |
local now = os.clock() | |
local events = {} | |
for _, event in pairs(buffer) do | |
if (now - event.timestamp) <= windowTimeSeconds then | |
table.insert(events, event) | |
end | |
end | |
return events | |
end | |
local function connectToSourceIfNeeded() | |
if not _currentSub then | |
buffer = {} | |
lastFail = UNSET | |
lastComplete = UNSET | |
_currentSub = source:Subscribe(function(...) | |
-- TODO: also prune events by timestamp | |
if #buffer + 1 > bufferSize then | |
table.remove(buffer, 1) -- O(n), not great. | |
end | |
-- Queue before we start | |
local event = table.pack(...) | |
event.timestamp = os.clock() | |
table.insert(buffer, event) | |
for _, sub in pairs(subs) do | |
sub:Fire(...) | |
end | |
end, function(...) | |
lastFail = table.pack(...) | |
for _, sub in pairs(subs) do | |
sub:Fail(...) | |
end | |
end, function(...) | |
lastComplete = table.pack(...) | |
for _, sub in pairs(subs) do | |
sub:Complete(...) | |
end | |
end) | |
end | |
end | |
local function disconnectFromSource() | |
if _currentSub then | |
_currentSub() | |
_currentSub = nil | |
end | |
buffer = {} | |
lastFail = UNSET | |
lastComplete = UNSET | |
end | |
return newObservable(function(sub) | |
if lastFail ~= UNSET then | |
sub:Fail(table.unpack(lastFail, 1, lastFail.n)) | |
return | |
end | |
if lastComplete ~= UNSET then | |
sub:Fail(table.unpack(lastComplete, 1, lastComplete.n)) | |
return | |
end | |
table.insert(subs, sub) | |
-- Firing could lead to re-entrance. Lets just use the buffer as-is. | |
for _, item in pairs(getEventsCopy()) do | |
sub:Fire(table.unpack(item, 1, item.n)) | |
end | |
connectToSourceIfNeeded() | |
return function() | |
local index = table.find(subs, sub) | |
if index then | |
table.remove(subs, index) | |
if #subs == 0 then | |
disconnectFromSource() | |
end | |
end | |
end | |
end) | |
end | |
end | |
--[=[ | |
Caches the current value | |
@return (source: Observable) -> Observable | |
]=] | |
function export.cache(): Transformer | |
return export.shareReplay(1) | |
end | |
--[=[ | |
Like start, but also from (list!) | |
@param callback () -> { T } | |
@return (source: Observable) -> Observable | |
]=] | |
function export.startFrom(callback: () -> {any}): Transformer | |
assert(type(callback) == "function", "callback must be function") | |
return function(source) | |
assert(isObservable(source), "source must be Observable") | |
return newObservable(function(sub) | |
for _, value in pairs(callback()) do | |
sub:Fire(value) | |
end | |
return source:Subscribe(sub:GetFireFailComplete()) | |
end) | |
end | |
end | |
--[=[ | |
Starts with the given values | |
https://rxjs-dev.firebaseapp.com/api/operators/startWith | |
@param values { T } | |
@return (source: Observable) -> Observable | |
]=] | |
function export.startWith(values: {any}): Transformer | |
assert(type(values) == "table", "values must be array of values") | |
return function(source) | |
assert(isObservable(source), "source must be Observable") | |
return newObservable(function(sub) | |
for _, item in pairs(values) do | |
sub:Fire(item) | |
end | |
return source:Subscribe(sub:GetFireFailComplete()) | |
end) | |
end | |
end | |
--[=[ | |
Defaults the observable to a value if it isn't fired immediately | |
```lua | |
Rx.NEVER:Pipe({ | |
Rx.defaultsTo("Hello") | |
}):Subscribe(print) --> Hello | |
``` | |
@param value any | |
@return (source: Observable) -> Observable | |
]=] | |
function export.defaultsTo(value: any): Transformer | |
return function(source) | |
assert(isObservable(source), "source must be Observable") | |
return newObservable(function(sub) | |
local fired = false | |
local subscription = source:Subscribe( | |
function(...) | |
fired = true | |
sub:Fire(...) | |
end, | |
sub:GetFailComplete()) | |
if not fired then | |
sub:Fire(value) | |
end | |
return subscription | |
end) | |
end | |
end | |
--[=[ | |
Defaults the observable value to nil | |
```lua | |
Rx.NEVER:Pipe({ | |
Rx.defaultsToNil | |
}):Subscribe(print) --> nil | |
``` | |
Great for defaulting Roblox attributes and objects | |
@function defaultsToNil | |
@param source Observable | |
@return Observable | |
@within Rx | |
]=] | |
export.defaultsToNil = export.defaultsTo(nil) | |
--[=[ | |
Ends the observable with these values before cancellation | |
https://www.learnrxjs.io/learn-rxjs/operators/combination/endwith | |
@param values { T } | |
@return (source: Observable) -> Observable | |
]=] | |
function export.endWith(values: {any}): Transformer | |
return function(source) | |
assert(isObservable(source), "source must be Observable") | |
return newObservable(function(sub) | |
return source:Subscribe( | |
function(...) | |
sub:Fire(...) | |
end, | |
function(...) | |
for _, item in pairs(values) do | |
sub:Fire(item) | |
end | |
sub:Fail(...) | |
end, | |
function() | |
for _, item in pairs(values) do | |
sub:Fire(item) | |
end | |
sub:Complete() | |
end) | |
end) | |
end | |
end | |
export type Predicate = (...any) -> boolean | |
--[=[ | |
http://reactivex.io/documentation/operators/filter.html | |
Filters out values | |
```lua | |
Rx.of(1, 2, 3, 4, 5):Pipe({ | |
Rx.where(function(value) | |
return value % 2 == 0 | |
end) | |
}):Subscribe(print) --> 2, 4 | |
``` | |
@param predicate (value: T) -> boolean | |
@return (source: Observable<T>) -> Observable<T> | |
]=] | |
function export.where(predicate: Predicate): Transformer | |
assert(type(predicate) == "function", "predicate must be function") | |
return function(source) | |
assert(isObservable(source), "source must be Observable") | |
return newObservable(function(sub) | |
return source:Subscribe( | |
function(...) | |
if predicate(...) then | |
sub:Fire(...) | |
end | |
end, | |
sub:GetFailComplete() | |
) | |
end) | |
end | |
end | |
-- Return values if they satisfy predicate. Return defaults otherwise. | |
function export.whereElse(predicate: Predicate, ...: any): Transformer | |
assert(type(predicate) == "function", "predicate must be function") | |
local args = table.pack(...) | |
return function(source) | |
assert(isObservable(source), "source must be Observable") | |
return newObservable(function(sub) | |
return source:Subscribe( | |
function(...) | |
if predicate(...) then | |
sub:Fire(...) | |
else | |
sub:Fire(table.unpack(args, 1, args.n)) | |
end | |
end, | |
sub:GetFailComplete() | |
) | |
end) | |
end | |
end | |
--[=[ | |
Only takes distinct values from the observable stream. | |
http://reactivex.io/documentation/operators/distinct.html | |
```lua | |
Rx.of(1, 1, 2, 3, 3, 1):Pipe({ | |
Rx.distinct() | |
}):Subscribe(print) --> 1, 2, 3, 1 | |
``` | |
@return (source: Observable<T>) -> Observable<T> | |
]=] | |
function export.distinct(): Transformer | |
return function(source) | |
assert(isObservable(source), "source must be Observable") | |
return newObservable(function(sub) | |
local last = UNSET | |
return source:Subscribe( | |
function(value) | |
-- TODO: Support tuples | |
if last == value then | |
return | |
end | |
last = value | |
sub:Fire(last) | |
end, | |
sub:GetFailComplete() | |
) | |
end) | |
end | |
end | |
--[=[ | |
https://rxjs.dev/api/operators/mapTo | |
@param ... any -- The value to map each source value to. | |
@return (source: Observable<T>) -> Observable<T> | |
]=] | |
function export.mapTo(...: any): Transformer | |
local args = table.pack(...) | |
return function(source) | |
assert(isObservable(source), "source must be Observable") | |
return newObservable(function(sub) | |
return source:Subscribe(function() | |
sub:Fire(table.unpack(args, 1, args.n)) | |
end, sub:GetFailComplete()) | |
end) | |
end | |
end | |
export type Project = (...any) -> (...any) | |
--[=[ | |
http://reactivex.io/documentation/operators/map.html | |
Maps one value to another | |
```lua | |
Rx.of(1, 2, 3, 4, 5):Pipe({ | |
Rx.map(function(x) | |
return x + 1 | |
end) | |
}):Subscribe(print) -> 2, 3, 4, 5, 6 | |
``` | |
@param project (T) -> U | |
@return (source: Observable<T>) -> Observable<U> | |
]=] | |
function export.map(project: Project): Transformer | |
assert(type(project) == "function", "project must be function") | |
return function(source) | |
assert(isObservable(source), "source must be Observable") | |
return newObservable(function(sub) | |
return source:Subscribe(function(...) | |
sub:Fire(project(...)) | |
end, sub:GetFailComplete()) | |
end) | |
end | |
end | |
--[=[ | |
Merges higher order observables together. | |
Basically, if you have an observable that is emitting an observable, | |
this subscribes to each emitted observable and combines them into a | |
single observable. | |
```lua | |
Rx.of(Rx.of(1, 2, 3), Rx.of(4)) | |
:Pipe({ | |
Rx.mergeAll() | |
}) | |
:Subscribe(print) -> 1, 2, 3, 4 | |
``` | |
@return (source: Observable<Observable<T>>) -> Observable<T> | |
]=] | |
function export.mergeAll(): Transformer | |
return function(source) | |
assert(isObservable(source), "source must be Observable") | |
return newObservable(function(sub) | |
local maid = Maid.new() | |
local pendingCount = 0 | |
local topComplete = false | |
maid._ = source:Subscribe( | |
function(observable) | |
assert(isObservable(observable), "Observable expected") | |
pendingCount = pendingCount + 1 | |
local subscription | |
subscription = observable:Subscribe( | |
function(...) | |
-- Merge each inner observable | |
sub:Fire(...) | |
end, | |
function(...) | |
-- Emit failure automatically | |
sub:Fail(...) | |
end, | |
function() | |
maid[subscription] = nil | |
pendingCount = pendingCount - 1 | |
if pendingCount == 0 and topComplete then | |
sub:Complete() | |
maid:Destroy() | |
end | |
end) | |
maid[subscription] = subscription | |
end, | |
function(...) | |
sub:Fail(...) -- Also reflect failures up to the top! | |
maid:Destroy() | |
end, | |
function() | |
topComplete = true | |
if pendingCount == 0 then | |
sub:Complete() | |
maid:Destroy() | |
end | |
end) | |
return maid | |
end) | |
end | |
end | |
--[=[ | |
Merges higher order observables together | |
https://rxjs.dev/api/operators/switchAll | |
Works like mergeAll, where you subscribe to an observable which is | |
emitting observables. However, when another observable is emitted it | |
disconnects from the other observable and subscribes to that one. | |
@return (source: Observable<Observable<T>>) -> Observable<T> | |
]=] | |
function export.switchAll(): Transformer | |
return function(source) | |
assert(isObservable(source), "source must be Observable") | |
return newObservable(function(sub) | |
local outerMaid = Maid.new() | |
local topComplete = false | |
local insideComplete = false | |
local currentInside = nil | |
outerMaid._ = function() | |
-- Ensure inner subscription is disconnected first. This prevents | |
-- the inner sub from firing while the outer is subscribed, | |
-- throwing a warning. | |
outerMaid._innerSub = nil | |
outerMaid._outerSuber = nil | |
end | |
outerMaid._outerSuber = source:Subscribe( | |
function(observable) | |
assert(isObservable(observable), "Observable expected") | |
insideComplete = false | |
currentInside = observable | |
outerMaid._innerSub = observable:Subscribe( | |
function(...) | |
sub:Fire(...) | |
end, -- Merge each inner observable | |
function(...) | |
if currentInside == observable then | |
sub:Fail(...) | |
end | |
end, -- Emit failure automatically | |
function() | |
if currentInside == observable then | |
insideComplete = true | |
if insideComplete and topComplete then | |
sub:Complete() | |
outerMaid:Destroy() -- Paranoid ensure cleanup. | |
end | |
end | |
end) | |
end, | |
function(...) | |
sub:Fail(...) -- Also reflect failures up to the top! | |
outerMaid:Destroy() | |
end, | |
function() | |
topComplete = true | |
if insideComplete and topComplete then | |
sub:Complete() | |
outerMaid:Destroy() -- Paranoid ensure cleanup | |
end | |
end) | |
return outerMaid | |
end) | |
end | |
end | |
--[=[ | |
Sort of equivalent of promise.then() | |
This takes a stream of observables | |
@param project (value: T) -> Observable<U> | |
@param resultSelector ((initialValue: T, outputValue: U) -> U)? | |
@return (source: Observable<T>) -> Observable<U> | |
]=] | |
function export.flatMap(project: (any) -> Observable, resultSelector: ((any, any) -> any)?): Transformer | |
assert(type(project) == "function", "project must be function") | |
return function(source) | |
assert(isObservable(source), "source must be Observable") | |
return newObservable(function(sub) | |
local maid = Maid.new() | |
local pendingCount = 0 | |
local topComplete = false | |
maid._ = source:Subscribe( | |
function(...) | |
local outerValue = ... | |
local observable = project(...) | |
assert(isObservable(observable), "project must return Observable") | |
pendingCount = pendingCount + 1 | |
local innerMaid = Maid.new() | |
innerMaid._ = observable:Subscribe( | |
function(...) | |
-- Merge each inner observable | |
if resultSelector then | |
sub:Fire(resultSelector(outerValue, ...)) | |
else | |
sub:Fire(...) | |
end | |
end, | |
function(...) | |
sub:Fail(...) | |
end, -- Emit failure automatically | |
function() | |
innerMaid:Destroy() | |
pendingCount = pendingCount - 1 | |
if pendingCount == 0 and topComplete then | |
sub:Complete() | |
maid:Destroy() | |
end | |
end) | |
maid._ = innerMaid | |
end, | |
function(...) | |
sub:Fail(...) -- Also reflect failures up to the top! | |
maid:Destroy() | |
end, | |
function() | |
topComplete = true | |
if pendingCount == 0 then | |
sub:Complete() | |
maid:Destroy() | |
end | |
end) | |
return maid | |
end) | |
end | |
end | |
function export.switchMap(project: Project): Transformer | |
return export.pipe({ | |
export.map(project), | |
export.switchAll(), | |
}) | |
end | |
function export.takeUntil(notifier: Observable): Transformer | |
assert(isObservable(notifier)) | |
return function(source) | |
assert(isObservable(source), "source must be Observable") | |
return newObservable(function(sub) | |
local maid = Maid.new() | |
local cancelled = false | |
local function cancel() | |
maid:Destroy() | |
cancelled = true | |
end | |
-- Any value emitted will cancel (complete without any values allows all values to pass) | |
maid._ = notifier:Subscribe(cancel, cancel, nil) | |
-- Cancelled immediately? Oh boy. | |
if cancelled then | |
maid:Destroy() | |
return nil | |
end | |
-- Subscribe! | |
maid._ = source:Subscribe(sub:GetFireFailComplete()) | |
return maid | |
end) | |
end | |
end | |
--[=[ | |
Returns an observable that takes in a tuple, and emits that tuple, then | |
completes. | |
```lua | |
Rx.packed("a", "b") | |
:Subscribe(function(first, second) | |
print(first, second) --> a, b | |
end) | |
``` | |
@param ... any | |
@return Observable | |
]=] | |
function export.packed(...: {any}): Observable | |
local args = table.pack(...) | |
return newObservable(function(sub) | |
sub:Fire(table.unpack(args, 1, args.n)) | |
sub:Complete() | |
end) | |
end | |
--[=[ | |
Unpacks the observables value if a table is received | |
@param observable Observable<{T}> | |
@return Observable<T> | |
]=] | |
function export.unpacked(observable: Observable, i: number?, j: number?): Observable | |
assert(isObservable(observable), "observable must be Observable") | |
return newObservable(function(sub) | |
return observable:Subscribe(function(value) | |
if type(value) == "table" then | |
sub:Fire(table.unpack(value, i or 1, j or #value)) | |
else | |
warn(string.format("Observable result: table expected, got %s", typeof(value))) | |
end | |
end, sub:GetFailComplete()) | |
end) | |
end | |
--[=[ | |
Acts as a finalizer callback once the subscription is unsubscribed. | |
```lua | |
Rx.of("a", "b"):Pipe({ | |
Rx.finalize(function() | |
print("Subscription done!") | |
end) | |
}) | |
``` | |
http://reactivex.io/documentation/operators/do.html | |
https://rxjs-dev.firebaseapp.com/api/operators/finalize | |
https://github.com/ReactiveX/rxjs/blob/master/src/internal/operators/finalize.ts | |
@param finalizerCallback () -> () | |
@return (source: Observable<T>) -> Observable<T> | |
]=] | |
function export.finalize(finalizerCallback: () -> ()): Transformer | |
assert(type(finalizerCallback) == "function", "finalizerCallback must be function") | |
return function(source) | |
assert(isObservable(source), "source must be Observable") | |
return newObservable(function(sub) | |
return { | |
source:Subscribe(sub:GetFireFailComplete()), | |
finalizerCallback, | |
} | |
end) | |
end | |
end | |
--[=[ | |
Given an observable that emits observables, emit an | |
observable that once the initial observable completes, | |
the latest values of each emitted observable will be | |
combined into an array that will be emitted. | |
https://rxjs.dev/api/operators/combineLatestAll | |
@return (source: Observable<Observable<T>>) -> Observable<{ T }> | |
]=] | |
function export.combineLatestAll(): Transformer | |
return function(source) | |
assert(isObservable(source), "source must be Observable") | |
return newObservable(function(sub) | |
local observables = {} | |
local alive = true | |
local maid = {} | |
table.insert(maid, function() alive = false end) | |
table.insert(maid, source:Subscribe( | |
function(value) | |
assert(isObservable(value)) | |
table.insert(observables, value) | |
end, | |
function(...) | |
sub:Fail(...) | |
end, | |
function() | |
if not alive then | |
return | |
end | |
table.insert(maid, export.combineLatest(observables):Subscribe(sub:GetFireFailComplete())) | |
end | |
)) | |
return maid | |
end) | |
end | |
end | |
--[=[ | |
The same as combineLatestAll. | |
This is for backwards compatability, and is deprecated. | |
@function combineAll | |
@deprecated 1.0.0 -- Use Rx.combineLatestAll | |
@within Rx | |
@return (source: Observable<Observable<T>>) -> Observable<{ T }> | |
]=] | |
export.combineAll = export.combineLatestAll | |
--[=[ | |
Catches an error, and allows another observable to be subscribed | |
in terms of handling the error. | |
:::warning | |
This method is not yet tested | |
::: | |
@param callback (error: TError) -> Observable<TErrorResult> | |
@return (source: Observable<T>) -> Observable<T | TErrorResult> | |
]=] | |
function export.catchError(callback: (...any) -> Observable): Transformer | |
assert(type(callback) == "function", "callback must be a function") | |
return function(source) | |
assert(isObservable(source), "source must be Observable") | |
return newObservable(function(sub) | |
local maid = {} | |
-- Yikes, let's hope event ordering is good | |
local alive = true | |
table.insert(maid, function() alive = false end) | |
table.insert(maid, source:Subscribe( | |
function(...) | |
sub:Fire(...) | |
end, | |
function(...) | |
if not alive then | |
-- if we failed because maid was cancelled, then we'll get called here? | |
-- I think. | |
return | |
end | |
-- at this point, we can only have one error, so we need to subscribe to the result | |
-- and continue the observiable | |
local observable = callback(...) | |
assert(isObservable(observable), "callback must return Observable") | |
table.insert(maid, observable:Subscribe(sub:GetFireFailComplete())) | |
end, | |
function() | |
sub:Complete() | |
end | |
)) | |
return maid | |
end) | |
end | |
end | |
--[=[ | |
One of the most useful functions this combines the latest values of | |
observables at each chance! | |
```lua | |
Rx.combineLatest({ | |
child = Rx.fromSignal(Workspace.ChildAdded), | |
lastChildRemoved = Rx.fromSignal(Workspace.ChildRemoved), | |
value = 5, | |
}):Subscribe(function(data) | |
print(data.child) --> last child | |
print(data.lastChildRemoved) --> other value | |
print(data.value) --> 5 | |
end) | |
``` | |
:::tip | |
Note that the resulting observable will not emit until all input | |
observables are emitted. | |
::: | |
@param observables { [TKey]: Observable<TEmitted> | TEmitted } | |
@return Observable<{ [TKey]: TEmitted }> | |
]=] | |
function export.combineLatest(observables: {[any]: Observable}): Observable | |
assert(type(observables) == "table", "observables must be a dictionary") | |
return newObservable(function(sub) | |
local pending = 0 | |
local latest = {} | |
for key, value in pairs(observables) do | |
if isObservable(value) then | |
pending = pending + 1 | |
latest[key] = UNSET | |
else | |
latest[key] = value | |
end | |
end | |
if pending == 0 then | |
sub:Fire(latest) | |
sub:Complete() | |
return | |
end | |
local maid = {} | |
local function fireIfAllSet() | |
for _, value in pairs(latest) do | |
if value == UNSET then | |
return | |
end | |
end | |
sub:Fire(table.clone(latest)) | |
end | |
for key, observer in pairs(observables) do | |
if isObservable(observer) then | |
table.insert(maid, observer:Subscribe( | |
function(value) | |
latest[key] = value | |
fireIfAllSet() | |
end, | |
function(...) | |
pending = pending - 1 | |
sub:Fail(...) | |
end, | |
function() | |
pending = pending - 1 | |
if pending == 0 then | |
sub:Complete() | |
end | |
end) | |
) | |
end | |
end | |
return maid | |
end) | |
end | |
--[=[ | |
http://reactivex.io/documentation/operators/using.html | |
Each time a subscription occurs, the resource is constructed | |
and exists for the lifetime of the observation. The observableFactory | |
uses the resource for subscription. | |
:::note | |
Note from Quenty: I haven't found this that useful. | |
::: | |
@param resourceFactory () -> MaidTask | |
@param observableFactory (MaidTask) -> Observable<T> | |
@return Observable<T> | |
]=] | |
function export.using(resourceFactory: () -> Task, observableFactory: (Task) -> Observable): Observable | |
return newObservable(function(sub) | |
local maid = {} | |
local resource = resourceFactory() | |
table.insert(maid, resource) | |
local observable = observableFactory(resource) | |
assert(isObservable(observable), "factory must return Observable") | |
table.insert(maid, observable:Subscribe(sub:GetFireFailComplete())) | |
return maid | |
end) | |
end | |
--[=[ | |
Takes the first entry and terminates the observable. Equivalent to the following: | |
```lua | |
Rx.take(1) | |
``` | |
https://reactivex.io/documentation/operators/first.html | |
@return (source: Observable<T>) -> Observable<T> | |
]=] | |
function export.first(): Transformer | |
return export.take(1) | |
end | |
--[=[ | |
Takes n entries and then completes the observation. | |
https://rxjs.dev/api/operators/take | |
@param number number | |
@return (source: Observable<T>) -> Observable<T> | |
]=] | |
function export.take(number: number): Transformer | |
assert(type(number) == "number", "number must be number") | |
assert(number > 0, "Bad number") | |
return function(source) | |
assert(isObservable(source), "source must be Observable") | |
return newObservable(function(sub) | |
local taken = 0 | |
return source:Subscribe(function(...) | |
if taken >= number then | |
warn("Rx.take: still getting values past subscription") | |
return | |
end | |
taken = taken + 1 | |
sub:Fire(...) | |
if taken >= number then | |
sub:Complete() | |
end | |
end, sub:GetFailComplete()) | |
end) | |
end | |
end | |
--[=[ | |
Takes n entries and then completes the observation. | |
https://rxjs.dev/api/operators/take | |
@param toSkip number | |
@return (source: Observable<T>) -> Observable<T> | |
]=] | |
function export.skip(toSkip: number): Transformer | |
assert(type(toSkip) == "number", "toSkip must be number") | |
assert(toSkip > 0, "toSkip must be greater than 0") | |
return function(source) | |
assert(isObservable(source), "source must be Observable") | |
return newObservable(function(sub) | |
local skipped = 0 | |
return source:Subscribe(function(...) | |
if skipped <= toSkip then | |
skipped = skipped + 1 | |
return | |
end | |
sub:Fire(...) | |
end, sub:GetFailComplete()) | |
end) | |
end | |
end | |
--[=[ | |
Defers the subscription and creation of the observable until the | |
actual subscription of the observable. | |
https://rxjs-dev.firebaseapp.com/api/index/function/defer | |
https://netbasal.com/getting-to-know-the-defer-observable-in-rxjs-a16f092d8c09 | |
@param observableFactory () -> Observable<T> | |
@return Observable<T> | |
]=] | |
function export.defer(observableFactory: () -> Observable): Observable | |
return newObservable(function(sub) | |
local observable | |
local ok, err = pcall(function() | |
observable = observableFactory() | |
end) | |
if not ok then | |
sub:Fail(err) | |
return | |
end | |
if not isObservable(observable) then | |
sub:Fail("Observable expected") | |
return | |
end | |
return observable:Subscribe(sub:GetFireFailComplete()) | |
end) | |
end | |
--[=[ | |
Shift the emissions from an Observable forward in time by a particular amount. | |
@param seconds number | |
@return (source: Observable<T>) -> Observable<T> | |
]=] | |
function export.delay(seconds: number): Transformer | |
assert(type(seconds) == "number", "seconds must be number") | |
return function(source) | |
assert(isObservable(source), "source must be Observable") | |
return newObservable(function(sub) | |
local maid = Maid.new() | |
maid._ = source:Subscribe(function(...) | |
local args = table.pack(...) | |
maid[args] = task.delay(seconds, function() | |
maid[args] = nil | |
sub:Fire(table.unpack(args, 1, args.n)) | |
end) | |
end, sub:GetFailComplete()) | |
return maid | |
end) | |
end | |
end | |
--[=[ | |
Emits output every `n` seconds | |
@param initialDelaySeconds number | |
@param seconds number | |
@return (source: Observable<number>) -> Observable<number> | |
]=] | |
function export.timer(initialDelaySeconds: number?, seconds: number): Observable | |
assert(type(initialDelaySeconds) == "number" or initialDelaySeconds == nil, "initialDelaySeconds must be number or nil") | |
assert(type(seconds) == "number", "seconds must be number") | |
return newObservable(function(sub) | |
local number = -1 | |
local running = true | |
local thread = task.spawn(function() | |
if initialDelaySeconds and initialDelaySeconds > 0 then | |
task.wait(initialDelaySeconds) | |
end | |
while running do | |
number += 1 | |
sub:Fire(number) | |
task.wait(seconds) | |
end | |
end) | |
return function() | |
running = false | |
task.cancel(thread) | |
end | |
end) | |
end | |
--[=[ | |
Honestly, I have not used this one much. | |
https://rxjs-dev.firebaseapp.com/api/operators/withLatestFrom | |
https://medium.com/js-in-action/rxjs-nosy-combinelatest-vs-selfish-withlatestfrom-a957e1af42bf | |
@param inputObservables {Observable<TInput>} | |
@return (source: Observable<T>) -> Observable<{T, ...TInput}> | |
]=] | |
function export.withLatestFrom(inputObservables: {Observable}): Transformer | |
assert(type(inputObservables) == "table", "inputObservables must be array of Observable values") | |
for _, observable in pairs(inputObservables) do | |
assert(isObservable(observable), "observable expected") | |
end | |
return function(source) | |
assert(isObservable(source), "source must be Observable") | |
return newObservable(function(sub) | |
local maid = {} | |
local latest = {} | |
for key, observable in pairs(inputObservables) do | |
latest[key] = UNSET | |
table.insert(maid, observable:Subscribe(function(value) | |
latest[key] = value | |
end, nil, nil)) | |
end | |
table.insert(maid, source:Subscribe(function(value) | |
for _, item in pairs(latest) do | |
if item == UNSET then | |
return | |
end | |
end | |
sub:Fire({value, unpack(latest)}) | |
end, sub:GetFailComplete())) | |
return maid | |
end) | |
end | |
end | |
--[=[ | |
https://rxjs-dev.firebaseapp.com/api/operators/scan | |
@param accumulator (current: TSeed, ...: TInput) -> TResult | |
@param seed TSeed | |
@return (source: Observable<TInput>) -> Observable<TResult> | |
]=] | |
function export.scan(accumulator: (any, ...any) -> any, seed: any): Transformer | |
assert(type(accumulator) == "function", "accumulator must be function") | |
return function(source) | |
assert(isObservable(source), "source must be Observable") | |
return newObservable(function(sub) | |
local current = seed | |
return source:Subscribe(function(...) | |
current = accumulator(current, ...) | |
sub:Fire(current) | |
end, sub:GetFailComplete()) | |
end) | |
end | |
end | |
local ThrottledFunction = {} | |
ThrottledFunction.ClassName = "ThrottledFunction" | |
ThrottledFunction.__index = ThrottledFunction | |
function ThrottledFunction.new(timeoutInSeconds, func, config) | |
assert(type(timeoutInSeconds) == "number", "timeoutInSeconds must be number") | |
assert(type(func) == "function", "func must be function") | |
local self = setmetatable({}, ThrottledFunction) | |
self._nextCallTimeStamp = 0 | |
self._timeout = timeoutInSeconds | |
self._func = func | |
self._trailingValue = nil | |
self._callLeading = true | |
self._callTrailing = true | |
self:_configureOrError(config) | |
return self | |
end | |
function ThrottledFunction:Call(...) | |
if self._trailingValue then | |
-- Update the next value to be dispatched | |
self._trailingValue = table.pack(...) | |
elseif self._nextCallTimeStamp <= tick() then | |
if self._callLeading or self._callLeadingFirstTime then | |
self._callLeadingFirstTime = false | |
-- Dispatch immediately | |
self._nextCallTimeStamp = tick() + self._timeout | |
self._func(...) | |
elseif self._callTrailing then | |
-- Schedule for trailing at exactly timeout | |
self._trailingValue = table.pack(...) | |
task.delay(self._timeout, function() | |
if self.Destroy then | |
self:_dispatch() | |
end | |
end) | |
else | |
error("trailing and leading are both disabled", 2) | |
end | |
elseif self._callLeading or self._callTrailing or self._callLeadingFirstTime then | |
self._callLeadingFirstTime = false | |
-- As long as either leading or trailing are set to true, we are good | |
local remainingTime = self._nextCallTimeStamp - tick() | |
self._trailingValue = table.pack(...) | |
task.delay(remainingTime, function() | |
if self.Destroy then | |
self:_dispatch() | |
end | |
end) | |
end | |
end | |
function ThrottledFunction:_dispatch() | |
self._nextCallTimeStamp = tick() + self._timeout | |
local trailingValue = self._trailingValue | |
if trailingValue then | |
-- Clear before call so we are in valid state! | |
self._trailingValue = nil | |
self._func(unpack(trailingValue, 1, trailingValue.n)) | |
end | |
end | |
function ThrottledFunction:_configureOrError(throttleConfig) | |
if throttleConfig == nil then | |
return | |
end | |
assert(type(throttleConfig) == "table", "throttleConfig must be table") | |
for key, value in pairs(throttleConfig) do | |
assert(type(value) == "boolean", "throttleConfig entry must be boolean") | |
if key == "leading" then | |
self._callLeading = value | |
elseif key == "trailing" then | |
self._callTrailing = value | |
elseif key == "leadingFirstTimeOnly" then | |
self._callLeadingFirstTime = value | |
else | |
error(string.format("bad key %q in throttleConfig", tostring(key))) | |
end | |
end | |
assert(self._callLeading or self._callTrailing, "cannot configure both leading and trailing disabled") | |
end | |
function ThrottledFunction:Destroy() | |
self._trailingValue = nil | |
self._func = nil | |
setmetatable(self, nil) | |
end | |
--[=[ | |
Throttles emission of observables. | |
https://rxjs-dev.firebaseapp.com/api/operators/throttleTime | |
:::note | |
Note that on complete, the last item is not included, for now, unlike the existing version in rxjs. | |
::: | |
@param duration number | |
@param throttleConfig { leading = true, trailing = true } | |
@return (source: Observable) -> Observable | |
]=] | |
function export.throttleTime(duration: number, throttleConfig: {leading: boolean?, trailing: boolean?}): Transformer | |
assert(type(duration) == "number", "duration must be number") | |
assert(type(throttleConfig) == "table" or throttleConfig == nil, "throttleConfig must be table or nil") | |
return function(source) | |
assert(isObservable(source), "source must be Observable") | |
return newObservable(function(sub) | |
local throttledFunction = ThrottledFunction.new(duration, function(...) | |
sub:Fire(...) | |
end, throttleConfig) | |
return { | |
throttledFunction, | |
source:Subscribe(function(...) | |
throttledFunction:Call(...) | |
end, sub:GetFailComplete()), | |
} | |
end) | |
end | |
end | |
--[=[ | |
Throttles emission of observables on the defer stack to the last emission. | |
@return (source: Observable) -> Observable | |
]=] | |
function export.throttleDefer(): Transformer | |
return function(source) | |
assert(isObservable(source), "source must be Observable") | |
return newObservable(function(sub) | |
local maid = Maid.new() | |
local lastResult | |
maid._ = source:Subscribe(function(...) | |
if not lastResult then | |
lastResult = table.pack(...) | |
-- Queue up our result | |
maid._currentQueue = task.defer(function() | |
local current = lastResult | |
lastResult = nil | |
if sub:IsPending() then | |
sub:Fire(table.unpack(current, 1, current.n)) | |
end | |
end) | |
else | |
lastResult = table.pack(...) | |
end | |
end, sub:GetFailComplete()) | |
return maid | |
end) | |
end | |
end | |
return table.freeze(export) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
local Maid = require(script.Parent.Maid) | |
local Rx = require(script.Parent.Rx) | |
-- Rxi provides Rx primitives for producing live queries of instances. | |
-- | |
-- In general, observables will emit nothing (that is, an empty tuple) when a | |
-- value is no longer observed. | |
local export = {} | |
local UNSET = newproxy() | |
-- Returns an observer that fires with no values, then completes. | |
local function nilobs() | |
return Rx.observable(function(sub) | |
sub:Fire() | |
sub:Complete() | |
end) | |
end | |
-- Returns an observable that emits the given instance, or nothing if it is not | |
-- an instance. | |
function export.of(instance: Instance): Rx.Observable | |
if typeof(instance) ~= "Instance" then | |
return nilobs() | |
end | |
return Rx.observable(function(sub) | |
sub:Fire(instance) | |
sub:Complete() | |
end) | |
end | |
-- Stops the chain unless the value is not nil. | |
function export.notNil(): Rx.Transformer | |
return Rx.where(function(value) | |
return value ~= nil | |
end) | |
end | |
-- Emits the value if it is of type *t*, or nothing otherwise. | |
function export.isTypeOf(t: string) | |
return Rx.whereElse(function(value) | |
return typeof(value) == t | |
end) | |
end | |
-- Emits the value if it is an instance of class *c*, or nothing otherwise. | |
function export.isClass(c: string) | |
return Rx.whereElse(function(value) | |
return typeof(value) == "Instance" and value.ClassName == c | |
end) | |
end | |
-- Emits the value if it is an instance inheriting class *c*, or nothing | |
-- otherwise. | |
function export.isClassOf(c: string) | |
return Rx.whereElse(function(value) | |
return typeof(value) == "Instance" and value:IsA(c) | |
end) | |
end | |
-- Emits the value if it is an item of enum *e*, or nothing otherwise. | |
function export.isEnum(e: Enum) | |
return Rx.whereElse(function(value) | |
return typeof(value) == "EnumItem" and value.EnumType == e | |
end) | |
end | |
-- Emits the value if it is an array where each element is of type *t*, or | |
-- nothing otherwise. | |
function export.isArrayOf(t: string) | |
return Rx.whereElse(function(value) | |
if type(value) ~= "table" then | |
return false | |
end | |
for _, v in value do | |
if typeof(v) ~= t then | |
return false | |
end | |
end | |
return true | |
end) | |
end | |
-- Returns whether *instance* has *property*. | |
local function hasProperty(instance, property) | |
return (pcall(instance.GetPropertyChangedSignal, instance, property)) | |
end | |
-- Observes from the first value the property named *property*. Emits nothing | |
-- while getting the property produces an error, or the first value is not an | |
-- Instance. | |
function export.property(property: string): Rx.Transformer | |
return Rx.pipe{ | |
export.isTypeOf("Instance"), | |
Rx.switchMap(function(instance: Instance) | |
if not instance then return nilobs() end | |
return Rx.observable(function(sub) | |
if not hasProperty(instance, property) then | |
sub:Fire() | |
sub:Complete() | |
return | |
end | |
local conn = instance:GetPropertyChangedSignal(property):Connect(function() | |
sub:Fire((instance::any)[property]) | |
end) | |
sub:Fire((instance::any)[property]) | |
return conn | |
end) | |
end), | |
} | |
end | |
-- Observes from the first value the attribute named *attribute*. Emits nothing | |
-- while the first value is not an Instance. | |
function export.attribute(attribute: string): Rx.Transformer | |
return Rx.pipe{ | |
export.isTypeOf("Instance"), | |
Rx.switchMap(function(instance: Instance) | |
if not instance then return nilobs() end | |
return Rx.observable(function(sub) | |
local conn = instance:GetAttributeChangedSignal(attribute):Connect(function() | |
sub:Fire(instance:GetAttribute(attribute)) | |
end) | |
sub:Fire(instance:GetAttribute(attribute)) | |
return conn | |
end) | |
end), | |
} | |
end | |
-- Observes from the first value the first child where the Name property equals | |
-- *name*. Emits nothing while no child is found, or the first value is not an | |
-- Instance. | |
function export.findFirstChild(name: string) | |
return Rx.pipe{ | |
export.isTypeOf("Instance"), | |
Rx.switchMap(function(instance) | |
return Rx.observable(function(sub) | |
local maid = Maid.new() | |
local current = UNSET | |
local function updateCurrent(child) | |
local next = instance:FindFirstChild(name) | |
if next ~= current then | |
current = next | |
sub:Fire(current) | |
end | |
end | |
maid.connAdded = instance.ChildAdded:Connect(function(child) | |
maid[child] = child:GetPropertyChangedSignal("Name"):Connect(function() | |
updateCurrent() | |
end) | |
updateCurrent() | |
end) | |
maid.connRemoved = instance.ChildRemoved:Connect(function(child) | |
maid[child] = nil | |
updateCurrent() | |
end) | |
for i, child in instance:GetChildren() do | |
maid[child] = child:GetPropertyChangedSignal("Name"):Connect(function() | |
updateCurrent() | |
end) | |
end | |
updateCurrent() | |
return maid | |
end) | |
end), | |
} | |
end | |
-- Observes from the first value the first child where the Name property equals | |
-- *name* and ClassName equals *className*. Emits nothing while no child is | |
-- found, or the first value is not an Instance. | |
function export.findFirstChildWithClass(className: string, name: string) | |
return Rx.pipe{ | |
export.isTypeOf("Instance"), | |
switch = Rx.switchMap(function(instance) | |
return Rx.observable(function(sub) | |
local maid = Maid.new() | |
local current = UNSET | |
local function updateCurrent() | |
local next | |
for _, child in instance:GetChildren() do | |
if child.ClassName == className and child.Name == name then | |
next = child | |
break | |
end | |
end | |
if next ~= current then | |
current = next | |
sub:Fire(current) | |
end | |
end | |
maid.connAdded = instance.ChildAdded:Connect(function(child) | |
if child.ClassName ~= className then return end | |
maid[child] = child:GetPropertyChangedSignal("Name"):Connect(function() | |
updateCurrent() | |
end) | |
updateCurrent() | |
end) | |
maid.connRemoved = instance.ChildRemoved:Connect(function(child) | |
if child.ClassName ~= className then return end | |
maid[child] = nil | |
updateCurrent() | |
end) | |
for i, child in instance:GetChildren() do | |
if child.ClassName ~= className then continue end | |
maid[child] = child:GetPropertyChangedSignal("Name"):Connect(function() | |
updateCurrent() | |
end) | |
end | |
updateCurrent() | |
return maid | |
end) | |
end) | |
} | |
end | |
-- Observes from the first value the first child where the Name property equals | |
-- *name* and ClassName inherits *className*. Emits nothing while no child is | |
-- found, or the first value is not an Instance. | |
function export.findFirstChildWithClassOf(className: string, name: string) | |
return Rx.pipe{ | |
export.isTypeOf("Instance"), | |
switch = Rx.switchMap(function(instance) | |
return Rx.observable(function(sub) | |
local maid = Maid.new() | |
local current = UNSET | |
local function updateCurrent() | |
local next | |
for _, child in instance:GetChildren() do | |
if child:IsA(className) and child.Name == name then | |
next = child | |
break | |
end | |
end | |
if next ~= current then | |
current = next | |
sub:Fire(current) | |
end | |
end | |
maid.connAdded = instance.ChildAdded:Connect(function(child) | |
if not child:IsA(className) then return end | |
maid[child] = child:GetPropertyChangedSignal("Name"):Connect(function() | |
updateCurrent() | |
end) | |
updateCurrent() | |
end) | |
maid.connRemoved = instance.ChildRemoved:Connect(function(child) | |
if not child:IsA(className) then return end | |
maid[child] = nil | |
updateCurrent() | |
end) | |
for i, child in instance:GetChildren() do | |
if not child:IsA(className) then continue end | |
maid[child] = child:GetPropertyChangedSignal("Name"):Connect(function() | |
updateCurrent() | |
end) | |
end | |
updateCurrent() | |
return maid | |
end) | |
end) | |
} | |
end | |
-- Observes from the first value the first child where the ClassName property | |
-- equals *className*. Emits nothing while no child is found, or the first value | |
-- is not an Instance. | |
function export.findFirstChildOfClass(className: string) | |
return Rx.pipe{ | |
export.isTypeOf("Instance"), | |
Rx.switchMap(function(instance) | |
if not instance then return nilobs() end | |
return Rx.observable(function(sub) | |
local maid = Maid.new() | |
local current = UNSET | |
local function updateCurrent() | |
local next = instance:FindFirstChildOfClass(className) | |
if next ~= current then | |
current = next | |
sub:Fire(current) | |
end | |
end | |
maid.connAdded = instance.ChildAdded:Connect(updateCurrent) | |
maid.connRemoved = instance.ChildRemoved:Connect(updateCurrent) | |
updateCurrent() | |
return maid | |
end) | |
end), | |
} | |
end | |
-- Observes from the first value the first child where the ClassName property | |
-- inherits from *className*. Emits nothing while no child is found, or the | |
-- first value is not an Instance. | |
function export.findFirstChildWhichIsA(className: string) | |
return Rx.pipe{ | |
export.isTypeOf("Instance"), | |
Rx.switchMap(function(instance) | |
if not instance then return nilobs() end | |
return Rx.observable(function(sub) | |
local maid = Maid.new() | |
local current = UNSET | |
local function updateCurrent() | |
local next = instance:FindFirstChildWhichIsA(className) | |
if next ~= current then | |
current = next | |
sub:Fire(current) | |
end | |
end | |
maid.connAdded = instance.ChildAdded:Connect(updateCurrent) | |
maid.connRemoved = instance.ChildRemoved:Connect(updateCurrent) | |
updateCurrent() | |
return maid | |
end) | |
end), | |
} | |
end | |
-- Observes the children of the first value. Emits nothing while the first value | |
-- is not an Instance. | |
function export.children() | |
return Rx.pipe{ | |
export.isTypeOf("Instance"), | |
Rx.switchMap(function(instance) | |
if not instance then return nilobs() end | |
return Rx.observable(function(sub) | |
local maid = Maid.new() | |
local current | |
local function updateCurrent() | |
current = instance:GetChildren() | |
sub:Fire(current) | |
end | |
maid.connAdded = instance.ChildAdded:Connect(updateCurrent) | |
maid.connRemoved = instance.ChildRemoved:Connect(updateCurrent) | |
updateCurrent() | |
return maid | |
end) | |
end), | |
} | |
end | |
return table.freeze(export) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Some of the findFirstChild transformers here have bugs. They need
if not instance then return nilobs() end
inside the switchMap.Also thanks for making this!