Skip to content

Instantly share code, notes, and snippets.

@hfiennes
Created August 21, 2018 16:20
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save hfiennes/56b9c359479b7d13245956f3d573f357 to your computer and use it in GitHub Desktop.
Save hfiennes/56b9c359479b7d13245956f3d573f357 to your computer and use it in GitHub Desktop.
Streaming with NO_DISCONNECT
// Transfer bookkeeping shared by the "start" and "data" handlers below.
start <- null;  // date() snapshot taken when a transfer is announced
end <- null;    // date() snapshot taken when the last byte has arrived
size <- 0;      // byte count announced by the "start" message
sofar <- 0;     // bytes received so far in the current transfer

// Log whatever the device reports on the "net" message, JSON-encoded.
device.on("net", function(info) {
    local encoded = http.jsonencode(info);
    server.log(encoded);
});
// A "start" message announces a new transfer; its payload is the total
// number of bytes that will follow in "data" messages.
device.on("start", function(total) {
    server.log("Receiving "+total+" bytes");

    // Reset the counters for this transfer
    sofar = 0;
    size = total;

    // Timestamp last, so the logging above is not counted in the elapsed time
    start = date();
});
// A "data" message carries one chunk of the transfer. The payload itself is
// discarded; only its length is counted. Once the running total equals the
// size announced by "start", the elapsed time and throughput are logged.
device.on("data", function(d) {
    // Ignore the content, just count the bytes
    sofar += d.len();

    // Not finished yet (or overshot): nothing to report
    if (sofar != size) return;

    // A chunk arrived without any preceding "start": there is no start
    // timestamp to measure against, so skip reporting instead of
    // failing on a null index.
    if (start == null) return;

    end = date();

    // Elapsed time = whole seconds plus the microsecond remainder
    local seconds = end.time - start.time;
    seconds += (end.usec - start.usec) / 1000000.0;

    // Divide by 1024.0, not 1024: integer division would truncate the kB
    // figure for sizes that are not a multiple of 1024 and understate the rate.
    server.log("Took "+seconds+" Rate "+(size/1024.0)/seconds+"kB/s");
});
// ---- Device-side setup ----
// NOTE(review): 'last' and 'count' are not referenced anywhere else in the
// visible code; they look like leftovers.
last <- time();
count <- 0;
// Log the value reported by imp.getsoftwareversion()
server.log(imp.getsoftwareversion());
imp.enableblinkup(true);
// Send the imp.net.info() result to the agent, which logs it as JSON
agent.send("net", imp.net.info())
// Request a larger send buffer (presumably 65535 bytes - confirm against the
// imp.setsendbuffersize documentation)
imp.setsendbuffersize(65535);
// Use nodisconnect: with this policy agent.send() returns a result code that
// filesender checks and retries on. NOTE(review): the last argument is
// presumably the send timeout in seconds - confirm against the API docs.
server.setsendtimeoutpolicy(RETURN_ON_ERROR_NO_DISCONNECT, WAIT_TIL_SENT, 1);
// Streams a seekable, blob-like object to the agent in fixed-size chunks.
//
// Protocol: one "start" message carrying the total length, followed by
// "data" messages of at most _chunksize bytes each. A chunk is only counted
// as sent when agent.send() returns 0; on SEND_ERROR_WOULDBLOCK the same
// chunk is retried on the next imp.wakeup(0, ...) tick.
class filesender {
    _sending = null;     // object being sent; must provide len(), seek() and readblob()
    _length = 0;         // total number of bytes to send
    _offset = 0;         // number of bytes successfully handed to agent.send()
    _chunksize = 4096;   // maximum payload of a single "data" message
    _callback = null;    // optional function invoked once everything is sent

    function constructor() {
    }

    // Begin sending 'file'.
    //   file - object providing len(), seek(offset, 'b') and readblob(n)
    //   cb   - optional zero-argument function called after the last chunk
    //          has been accepted by agent.send()
    function send(file, cb = null) {
        _sending = file;
        _length = _sending.len();
        _offset = 0;

        local r = agent.send("start", _length);
        if (r != 0) {
            // just try again, buffers may be full.
            // The closure must be bound to this instance: imp.wakeup() calls
            // it without our environment, so an unbound 'send' would not
            // resolve to this method.
            imp.wakeup(0, function() { send(file, cb); }.bindenv(this));
            return;
        }

        // Register the callback BEFORE sending: for a zero-length file
        // sendblock() completes immediately and must see the new callback,
        // not a stale one from a previous transfer.
        _callback = cb;
        sendblock();
    }

    // Send the next chunk, or finish the transfer if nothing is left.
    // Re-schedules itself until every byte has been accepted.
    function sendblock() {
        if (_offset == _length) {
            // Done: reset state, then notify
            _sending = null;
            _length = _offset = 0;
            if (_callback != null) _callback();
            return;
        }

        // Size of this chunk: the remainder, capped at _chunksize
        local chunk = (_length - _offset);
        if (chunk > _chunksize) chunk = _chunksize;

        // Always seek: after a failed send the read position has moved on
        // although _offset has not.
        _sending.seek(_offset, 'b');
        local r = agent.send("data", _sending.readblob(chunk));

        if (r == 0) {
            // If we sent ok, increment _offset for next time
            _offset += chunk;
        } else if (r != SEND_ERROR_WOULDBLOCK) {
            // Some other error: switch to the suspend-on-error policy,
            // reconnect and log the code. The transfer is abandoned here
            // (the completion callback is not invoked).
            server.setsendtimeoutpolicy(SUSPEND_ON_ERROR, WAIT_TIL_SENT, 10);
            server.connect();
            server.log("send returned "+r);
            return;
        }

        // If we got here, we need to be requeued (next chunk, or a retry of
        // this one after SEND_ERROR_WOULDBLOCK)
        imp.wakeup(0, sendblock.bindenv(this));
    }
}
// Throughput test driver: push a 64kB blob to the agent five times in a row;
// the agent side logs the timing for each transfer.
local b = blob(64*1024);
f <- filesender();
loops <- 5;

// Start one transfer; when it completes, start the next one until the
// 'loops' counter reaches zero.
function sendit() {
    local onComplete = function() {
        if (--loops) sendit();
    };
    f.send(b, onComplete);
}

sendit();
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment