Skip to content

Instantly share code, notes, and snippets.

@boatbomber
Last active March 28, 2024 00:20
Show Gist options
  • Save boatbomber/7c0cd41461bd682e7f4fa0ebca36ae21 to your computer and use it in GitHub Desktop.
Save boatbomber/7c0cd41461bd682e7f4fa0ebca36ae21 to your computer and use it in GitHub Desktop.
This is a module for handling data that can be read from/written to from multiple servers at a time. It is made only for commutative updates. This is so that your operations can be applied locally and globally at different times and still end up at the same value eventually. Uses MemoryStore for atomic locking.
--[[
GlobalStorage
by boatbomber (c) 2021
This is a module for handling data that can be read from/written to
from multiple servers at a time. It is made only for commutative updates.
This is so that your operations can be applied locally and globally at different
times and still end up at the same value eventually. Uses MemoryStore for atomic locking.
Make sure your transform function is deterministic and has no side effects,
as it will be run twice- once locally and once globally. If it absolutely must,
then it can take in a second arguement, a boolean "isGlobal" that is true when being
run globally and false during the local run.
Examples:
local PlayerStore = GlobalStorage.new("User_1234", 1234)
local DEFAULT_COINS = 100
-- Can be used to safely +/- number stores
local coins = PlayerStore:Get("Coins", DEFAULT_COINS)
PlayerStore:Update("Coins", function(oldValue)
return (oldValue or DEFAULT_COINS) + 5
end)
coins = PlayerStore:Get("Coins", DEFAULT_COINS)
-- Can be used to safely add/remove unique dict keys
local notifications = PlayerStore:Get("Notifications", {})
PlayerStore:Update("Notifications", function(oldValue)
return (oldValue or {})[GUID] = newNotif
end)
notifications = PlayerStore:Get("Notifications", {})
--]]
local DEBUG = false
local function dprint(...)
if DEBUG then
print(...)
end
end
local DataStoreService = game:GetService("DataStoreService")
local MemoryStoreService = game:GetService("MemoryStoreService")
local MessagingService = game:GetService("MessagingService")
local GlobalStorage = {
_cache = {},
}
function GlobalStorage.new(name: string, associatedUserId: number?)
local debugInfo = string.format("GlobalStorage '%s'", name)
-- Get existing store object if possible
if GlobalStorage._cache[name] then
dprint(debugInfo, "got existing store")
local cachedStore = GlobalStorage._cache[name]
cachedStore._references += 1
return cachedStore
end
-- Create new store object
dprint(debugInfo, "created new store")
local Store = {
_dsStore = DataStoreService:GetDataStore(name),
_msMap = MemoryStoreService:GetSortedMap(name),
_associatedUserId = associatedUserId,
_references = 1,
_cache = {},
_msgId = "BS_" .. name,
_updateQueue = {},
_events = {},
_destroyed = false,
_flushesInProgress = {},
}
function Store:_flushUpdateQueue()
if next(self._updateQueue) == nil then
dprint(debugInfo, "update queue empty, cancelling flush")
return
end
dprint(debugInfo, "flushing update queue")
for key, transformers in pairs(self._updateQueue) do
if self._flushesInProgress[key] then
dprint(debugInfo, "flush already in progress for key", key)
continue
end
if #transformers < 1 then
dprint(debugInfo, "no transformers for key", key)
self._updateQueue[key] = nil
continue
end
self._flushesInProgress[key] = true
-- DataStore UpdateAsync can conflict with other servers if called at the exact same time
-- and race so whichever finishes last will overwrite the previous.
-- MemoryStore UpdateAsync solves this by retrying if two are called at once, so we use
-- that as a locking mechanism to avoid two DataStore updates overwriting. If two try to grab
-- while unlocked, MemoryStore will force one of them to retry later.
local unlocked, lockWaitTime = false, 0
while unlocked == false do
local success, message = pcall(function()
dprint(debugInfo, "attempting to retrieve lock for", key)
self._msMap:UpdateAsync(key, function(lockOwner)
if lockOwner ~= nil then
dprint("Lock already taken by " .. lockOwner)
return nil -- Someone else has this key rn, we must wait
end
unlocked = true
-- Since other servers trying to take it will be returning
-- different JobId, memorystore will know its a conflict
-- and force the others to retry
return game.JobId
end, 15)
end)
if not success then
warn(message)
end
if unlocked == false then
lockWaitTime += task.wait()
if lockWaitTime > 60 then
warn(
"Update flush for "
.. key
.. " expired after 60 seconds while waiting for lock to be available."
)
self._flushesInProgress[key] = nil
return
end
dprint(debugInfo, "waiting for lock for", key, "for", lockWaitTime, "seconds so far")
end
end
dprint(debugInfo, "received lock for", key)
-- Update the global value
dprint(debugInfo, "updating global value for", key)
self._dsStore:UpdateAsync(key, function(storedValue)
local value = storedValue
for i, transformer in ipairs(transformers) do
local success, newValue = pcall(transformer, value, true)
if not success then
warn(newValue)
dprint(debugInfo, "cancelled transformer", i, "on", key)
continue -- skip this one, transform errored
end
if newValue == nil then
dprint(debugInfo, "skipped transformer", i, "on", key)
continue -- skip this one, transform exited
end
dprint(debugInfo, "applied transformer", i, "on", key)
value = newValue
end
table.clear(transformers)
self._cache[key] = value
-- Inform other servers they need to refresh
task.defer(function()
local publishSuccess, publishResult = pcall(function()
dprint(debugInfo, "informing other servers of changes to", key)
MessagingService:PublishAsync(self._msgId, {
JobId = game.JobId,
Key = key,
})
end)
if not publishSuccess then
warn(publishResult)
end
end)
return value
end)
-- Unlock this key for the next server to take
dprint(debugInfo, "unlocking", key)
self._flushesInProgress[key] = nil
pcall(self._msMap.RemoveAsync, self._msMap, key)
end
end
function Store:GetKeyChangedSignal(key: string)
local event = self._events[key]
if not event then
event = Instance.new("BindableEvent")
self._events[key] = event
end
return event.Event
end
function Store:Get(key: string, default: any?, skipCache: boolean?)
if not skipCache and self._cache[key] ~= nil then
dprint(debugInfo, "getting local value of", key)
return self._cache[key] or default
end
dprint(debugInfo, "getting global value of", key)
local value = self._dsStore:GetAsync(key)
if value == nil then
value = default
end
self._cache[key] = value
return value
end
function Store:Update(key: string, transformer: (any?, boolean?) -> any?)
-- Queue it up for updating on the latest real value & replication
dprint(debugInfo, "queuing global transformer for", key)
if self._updateQueue[key] == nil then
self._updateQueue[key] = { transformer }
else
table.insert(self._updateQueue[key], transformer)
end
-- First, perform it locally
dprint(debugInfo, "applying local transformer for", key)
local success, newValue = pcall(transformer, self._cache[key], false)
if not success then
warn(newValue)
return -- cancel, transform errored
end
if newValue == nil then
return -- cancel, transform exited
end
self._cache[key] = newValue
local event = self._events[key]
if event then
event:Fire(newValue)
end
end
function Store:Destroy()
if self._destroyed then
dprint(debugInfo, "is already destroyed")
return
end
self._references -= 1
if self._references > 0 then
dprint(debugInfo, "removing a reference to the store, there are now", self._references, "references")
return
end
dprint(debugInfo, "destroying store!")
self._destroyed = true
if self._msgConnection ~= nil then
self._msgConnection:Disconnect()
end
for _, event in pairs(self._events) do
event:Destroy()
end
GlobalStorage._cache[name] = nil
self:_flushUpdateQueue()
while next(self._flushesInProgress) do
dprint(debugInfo, "Waiting for flushes to finish before wiping")
task.wait()
end
table.clear(self)
self._destroyed = true
end
task.spawn(function()
-- Subscribe to store's msg for cross-server updates
local subscribeSuccess, subscribeConnection = pcall(function()
return MessagingService:SubscribeAsync(Store._msgId, function(message)
if game.JobId == message.Data.JobId then
return
end
if (Store._destroyed) or (Store._cache == nil) then
error(debugInfo .. " received update from another server after being destroyed")
return
end
local key = message.Data.Key
dprint(debugInfo, "received update to", key, "from another server")
local newValue = Store:Get(key, Store._cache[key], true)
local event = Store._events[key]
if event then
event:Fire(newValue)
end
end)
end)
if subscribeSuccess then
if Store._destroyed then
dprint(debugInfo, "destroyed during subscribe, disconnecting subscription")
subscribeConnection:Disconnect()
else
Store._msgConnection = subscribeConnection
end
else
warn(subscribeConnection)
end
-- Start update queue flush thread
while not Store._destroyed do
local jitter = math.random(0, 100) / 100 -- Reduce server conflicts?
task.wait(6 + jitter)
-- Check if destroyed during wait
if Store._destroyed then break end
dprint(debugInfo, "periodically flushing update queue")
Store:_flushUpdateQueue()
end
end)
-- Cache the store object for future GetStore sharing
GlobalStorage._cache[name] = Store
return Store
end
game:BindToClose(function()
for _, Store in pairs(GlobalStorage._cache) do
task.spawn(Store._flushUpdateQueue, Store)
end
end)
game.Players.PlayerRemoving:Connect(function(Player)
for name, Store in pairs(GlobalStorage._cache) do
if Store._associatedUserId == Player.UserId then
dprint(string.format("Destroying %s store for %s on leave", name, Player.Name))
Store:Destroy()
end
end
end)
return GlobalStorage
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment