Skip to content

Instantly share code, notes, and snippets.

@brandonsturgeon
Last active May 27, 2023 04:06
Show Gist options
  • Save brandonsturgeon/2e73b6e4595dd4476d87494ba4cb73b0 to your computer and use it in GitHub Desktop.
Save brandonsturgeon/2e73b6e4595dd4476d87494ba4cb73b0 to your computer and use it in GitHub Desktop.
-- Quick & dirty, sorry!
if CLIENT then
function ExpressCompareDone()
net.Start( "express_compare_done" )
net.SendToServer()
end
return
end
util.AddNetworkString( "express_compare_done" )
local tests = {
{ name = "Manual Chunks", func = SendWithManualChunks },
{ name = "NetStream", func = SendWithNetStream },
{ name = "Express", func = SendWithExpress }
}
local netCalls = 0
Results = {}
local startTime = SysTime()
local countNets = {
netstreamrequest = true,
netstreamdownload = true,
myaddon_data = true,
myaddon_datachunks = true,
express = true
}
net._Start = net._Start or net.Start
net.Start = function( msg, ... )
if countNets[string.lower( msg )] then
netCalls = netCalls + 1
end
return net._Start( msg, ... )
end
local idx = 0
local function runNext()
netCalls = 0
idx = idx + 1
if idx > #tests then
print( "Job done!" )
print( util.TableToJSON( Results ) )
net.Start = net._Start
return
end
startTime = SysTime()
tests[idx].func()
end
function RunExpressTests( elements )
DATA = gb.RandomTable( elements )
print( "Starting tests!" )
print( "Table Elements: ", string.Comma( elements ) )
local json = util.TableToJSON( DATA )
local jsonsize = string.NiceSize( #json )
print( "Data Size: ", jsonsize )
local compSize = string.NiceSize( #util.Compress( json ) )
print( "Compressed Size: ", compSize )
Results.DataSize = {
json = jsonsize,
compressed = compSize,
tableElements = string.Comma( elements )
}
idx = 0
runNext()
end
net.Receive( "express_compare_done", function()
local duration = SysTime() - startTime
local name = tests[idx].name
Results[name] = {
duration = duration,
messagesSent = string.Comma( netCalls )
}
runNext()
end )
--A net extension which allows sending large streams of data without overflowing the reliable channel
--Keep it in lua/autorun so it will be shared between addons
AddCSLuaFile()
net.Stream = {}
net.Stream.ReadStreamQueues = {} --This holds a read stream for each player, or one read stream for the server if running on the CLIENT
net.Stream.WriteStreams = {} --This holds the write streams
net.Stream.SendSize = 20000 --This is the maximum size of each stream to send
net.Stream.Timeout = 30 --How long the data should exist in the store without being used before being destroyed
net.Stream.MaxServerReadStreams = 128 --The maximum number of keep-alives to have queued. This should prevent naughty players from flooding the network with keep-alive messages.
net.Stream.MaxServerChunks = 3200 --Maximum number of pieces the stream can send to the server. 64 MB
net.Stream.MaxTries = 3 --Maximum times the client may retry downloading the whole data
net.Stream.MaxKeepalive = 15 --Maximum times the client may request data stay live
net.Stream.ReadStream = {}
--Send the data sender a request for data
function net.Stream.ReadStream:Request()
if self.downloads == net.Stream.MaxTries * self.numchunks then self:Remove() return end
self.downloads = self.downloads + 1
-- print("Requesting",self.identifier,false,false,#self.chunks)
net.Start("NetStreamRequest")
net.WriteUInt(self.identifier, 32)
net.WriteBit(false)
net.WriteBit(false)
net.WriteUInt(#self.chunks, 32)
if CLIENT then net.SendToServer() else net.Send(self.player) end
timer.Create("NetStreamReadTimeout" .. self.identifier, net.Stream.Timeout/2, 1, function() self:Request() end)
end
--Received data so process it
function net.Stream.ReadStream:Read(size)
timer.Remove("NetStreamReadTimeout" .. self.identifier)
local progress = net.ReadUInt(32)
if self.chunks[progress] then return end
local crc = net.ReadString()
local data = net.ReadData(size)
if crc == util.CRC(data) then
self.chunks[progress] = data
else
pac.Message("net.Stream.ReadStream:Read(): hash received and hash of chunk do not match match")
end
if #self.chunks == self.numchunks then
self.returndata = table.concat(self.chunks)
if self.compressed then
self.returndata = util.Decompress(self.returndata)
if not self.returndata then
pac.Message("net.Stream.ReadStream:Read(): Failed to decompress data")
end
end
self:Remove()
else
self:Request()
end
end
--Gets the download progress
function net.Stream.ReadStream:GetProgress()
return #self.chunks/self.numchunks
end
--Pop the queue and start the next task
function net.Stream.ReadStream:Remove()
local ok, err = xpcall(self.callback, debug.traceback, self.returndata)
if not ok then ErrorNoHalt(err) end
net.Start("NetStreamRequest")
net.WriteUInt(self.identifier, 32)
net.WriteBit(false)
net.WriteBit(true)
if CLIENT then net.SendToServer() else net.Send(self.player) end
timer.Remove("NetStreamReadTimeout" .. self.identifier)
timer.Remove("NetStreamKeepAlive" .. self.identifier)
if self == self.queue[1] then
table.remove(self.queue, 1)
local nextInQueue = self.queue[1]
if nextInQueue then
timer.Remove("NetStreamKeepAlive" .. nextInQueue.identifier)
nextInQueue:Request()
else
net.Stream.ReadStreamQueues[self.player] = nil
end
else
for k, v in ipairs(self.queue) do
if v == self then
table.remove(self.queue, k)
break
end
end
end
end
net.Stream.ReadStream.__index = net.Stream.ReadStream
net.Stream.WriteStream = {}
-- The player wants some data
function net.Stream.WriteStream:Write(ply)
local progress = net.ReadUInt(32)+1
local chunk = self.chunks[progress]
if chunk then
self.clients[ply].progress = progress
net.Start("NetStreamDownload")
net.WriteUInt(#chunk.data, 32)
net.WriteUInt(progress, 32)
net.WriteString(chunk.crc)
net.WriteData(chunk.data, #chunk.data)
if CLIENT then net.SendToServer() else net.Send(ply) end
end
end
-- The player notified us they finished downloading or cancelled
function net.Stream.WriteStream:Finished(ply)
self.clients[ply].finished = true
if self.callback then
local ok, err = xpcall(self.callback, debug.traceback, ply)
if not ok then ErrorNoHalt(err) end
end
end
-- Get player's download progress
function net.Stream.WriteStream:GetProgress(ply)
return self.clients[ply].progress / #self.chunks
end
-- If the stream owner cancels it, notify everyone who is subscribed
function net.Stream.WriteStream:Remove()
local sendTo = {}
for ply, client in pairs(self.clients) do
if not client.finished then
client.finished = true
if ply:IsValid() then sendTo[#sendTo+1] = ply end
end
end
net.Start("NetStreamDownload")
net.WriteUInt(0, 32)
net.WriteUInt(self.identifier, 32)
if SERVER then net.Send(sendTo) else net.SendToServer() end
net.Stream.WriteStreams[self.identifier] = nil
end
net.Stream.WriteStream.__index = net.Stream.WriteStream
--Store the data and write the file info so receivers can request it.
local identifier = 1
function net.WriteStream(data, callback, dontcompress)
if not isstring(data) then
error("bad argument #1 to 'WriteStream' (string expected, got " .. type(data) .. ")", 2)
end
if callback ~= nil and not isfunction(callback) then
error("bad argument #2 to 'WriteStream' (function expected, got " .. type(callback) .. ")", 2)
end
local compressed = not dontcompress
if compressed then
data = util.Compress(data) or ""
end
if #data == 0 then
net.WriteUInt(0, 32)
return
end
local numchunks = math.ceil(#data / net.Stream.SendSize)
if CLIENT and numchunks > net.Stream.MaxServerChunks then
ErrorNoHalt("net.WriteStream request is too large! ", #data/1048576, "MiB")
net.WriteUInt(0, 32)
return
end
local chunks = {}
for i=1, numchunks do
local datachunk = string.sub(data, (i - 1) * net.Stream.SendSize + 1, i * net.Stream.SendSize)
chunks[i] = {
data = datachunk,
crc = util.CRC(datachunk),
}
end
local startid = identifier
while net.Stream.WriteStreams[identifier] do
identifier = identifier % 1024 + 1
if identifier == startid then
ErrorNoHalt("Netstream is full of WriteStreams!\n" .. debug.traceback() .. "\n")
net.WriteUInt(0, 32)
return
end
end
local stream = {
identifier = identifier,
chunks = chunks,
compressed = compressed,
numchunks = numchunks,
callback = callback,
clients = setmetatable({},{__index = function(t,k)
local r = {
finished = false,
downloads = 0,
keepalives = 0,
progress = 0,
} t[k]=r return r
end})
}
setmetatable(stream, net.Stream.WriteStream)
net.Stream.WriteStreams[identifier] = stream
timer.Create("NetStreamWriteTimeout" .. identifier, net.Stream.Timeout, 1, function() stream:Remove() end)
net.WriteUInt(numchunks, 32)
net.WriteUInt(identifier, 32)
net.WriteBool(compressed)
return stream
end
--If the receiver is a player then add it to a queue.
--If the receiver is the server then add it to a queue for each individual player
function net.ReadStream(ply, callback)
if CLIENT then
ply = NULL
else
if type(ply) ~= "Player" then
error("bad argument #1 to 'ReadStream' (Player expected, got " .. type(ply) .. ")", 2)
elseif not ply:IsValid() then
error("bad argument #1 to 'ReadStream' (Tried to use a NULL entity!)", 2)
end
end
if not isfunction(callback) then
error("bad argument #2 to 'ReadStream' (function expected, got " .. type(callback) .. ")", 2)
end
local queue = net.Stream.ReadStreamQueues[ply]
if queue then
if SERVER and #queue == net.Stream.MaxServerReadStreams then
ErrorNoHalt("Receiving too many ReadStream requests from ", ply)
return
end
else
queue = {} net.Stream.ReadStreamQueues[ply] = queue
end
local numchunks = net.ReadUInt(32)
if numchunks == nil then
return
elseif numchunks == 0 then
local ok, err = xpcall(callback, debug.traceback, "")
if not ok then ErrorNoHalt(err) end
return
end
if SERVER and numchunks > net.Stream.MaxServerChunks then
ErrorNoHalt("ReadStream requests from ", ply, " is too large! ", numchunks * net.Stream.SendSize / 1048576, "MiB")
return
end
local identifier = net.ReadUInt(32)
local compressed = net.ReadBool()
--print("Got info", numchunks, identifier, compressed)
for _, v in ipairs(queue) do
if v.identifier == identifier then
ErrorNoHalt("Tried to start a new ReadStream for an already existing stream!\n" .. debug.traceback() .. "\n")
return
end
end
local stream = {
identifier = identifier,
chunks = {},
compressed = compressed,
numchunks = numchunks,
callback = callback,
queue = queue,
player = ply,
downloads = 0
}
setmetatable(stream, net.Stream.ReadStream)
queue[#queue + 1] = stream
if #queue > 1 then
timer.Create("NetStreamKeepAlive" .. identifier, net.Stream.Timeout / 2, 0, function()
net.Start("NetStreamRequest")
net.WriteUInt(identifier, 32)
net.WriteBit(true)
if CLIENT then net.SendToServer() else net.Send(ply) end
end)
else
stream:Request()
end
return stream
end
if SERVER then
util.AddNetworkString("NetStreamRequest")
util.AddNetworkString("NetStreamDownload")
end
--Stream data is requested
net.Receive("NetStreamRequest", function(len, ply)
local identifier = net.ReadUInt(32)
local stream = net.Stream.WriteStreams[identifier]
if stream then
ply = ply or NULL
local client = stream.clients[ply]
if not client.finished then
local keepalive = net.ReadBit() == 1
if keepalive then
if client.keepalives < net.Stream.MaxKeepalive then
client.keepalives = client.keepalives + 1
timer.Adjust("NetStreamWriteTimeout" .. identifier, net.Stream.Timeout, 1)
end
else
local completed = net.ReadBit() == 1
if completed then
stream:Finished(ply)
else
if client.downloads < net.Stream.MaxTries * #stream.chunks then
client.downloads = client.downloads + 1
stream:Write(ply)
timer.Adjust("NetStreamWriteTimeout" .. identifier, net.Stream.Timeout, 1)
else
client.finished = true
end
end
end
end
end
end)
--Download the stream data
net.Receive("NetStreamDownload", function(len, ply)
ply = ply or NULL
local queue = net.Stream.ReadStreamQueues[ply]
if queue then
local size = net.ReadUInt(32)
if size > 0 then
queue[1]:Read(size)
else
local id = net.ReadUInt(32)
for k, v in ipairs(queue) do
if v.identifier == id then
v:Remove()
break
end
end
end
end
end)
if SERVER then
util.AddNetworkString( "myaddon_datachunks" )
local function sendChunk( ply )
local pending = ply.pending
if not pending then return end
local chunkSize, isLast = math.min( 63000, #pending ), false
ply.pending = string.sub( pending, chunkSize + 1 )
if #pending <= chunkSize then
ply.pending, isLast = nil, true
end
net.Start( "myaddon_datachunks" )
net.WriteUInt( chunkSize, 16 )
net.WriteData( string.sub( pending, 1, chunkSize ), chunkSize )
net.WriteBool( isLast )
net.Send( ply )
end
local interval = engine.TickInterval() * 8
timer.Create( "MyAddon_DataSender", interval, 0, function()
sendChunk( Entity( 1 ) )
end )
function SendWithManualChunks()
print( "[SERVER] Starting manual chunk send", SysTime() )
Entity( 1 ).pending = util.Compress( util.TableToJSON( DATA ) )
end
end
if CLIENT then
local buffer = ""
net.Receive( "myaddon_datachunks", function()
buffer = buffer .. net.ReadData( net.ReadUInt( 16 ) )
if net.ReadBool() then
local decomp = util.Decompress( buffer )
util.JSONToTable( decomp )
ExpressCompareDone()
end
end )
end
if SERVER then
function SendWithExpress()
print( "[SERVER] Starting express send", SysTime() )
express.Send( "myaddon_data", DATA, Entity( 1 ) )
end
end
if CLIENT then
express.Receive( "myaddon_data", function( data )
ExpressCompareDone()
end )
end
if SERVER then
util.AddNetworkString( "myaddon_data" )
function SendWithNetStream()
print( "[SERVER] Starting netstream send", SysTime() )
net.Start( "myaddon_data" )
net.WriteStream( util.TableToJSON( DATA ), nil, true )
net.Send( Player( 1 ) )
end
end
if CLIENT then
net.Receive( "myaddon_data", function()
print( "received myaddon_data" )
net.ReadStream( nil, function( data )
util.TableToJSON( data )
ExpressCompareDone()
end )
end )
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment