Created
April 6, 2021 04:21
-
-
Save dannyrandall/07f47ec4a9a2e438a0bb8d974c96a209 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
ruleset gossip { | |
meta { | |
name "Gossip" | |
author "Daniel Randall" | |
use module io.picolabs.subscription alias subscription | |
shares scheduledHeartbeat, peers, sensorID, processing, peersState, messages, getPeer, newMessage, newRumorMessage, newSeenMessage, getMissingMessages | |
} | |
global { | |
scheduledHeartbeat = function() { | |
schedule:list().filter(function(x) { | |
x{"type"} == "repeat" && x{["event", "domain"]} == "gossip" && x{["event", "name"]} == "heartbeat" | |
}).head(){"id"} | |
} | |
processing = function() { | |
ent:process.defaultsTo("on") == "on" | |
} | |
peers = function() { | |
subscription:established("Tx_role", "node") | |
} | |
sensorID = function() { | |
meta:picoId | |
} | |
messages = function() { | |
ent:messages | |
} | |
peersState = function() { | |
ent:peersState | |
} | |
msgSeq = function(msgID) { | |
split = msgID.split(re#:#) | |
split[split.length()-1].defaultsTo("0").as("Number") | |
} | |
getMissingMessages = function(peer) { | |
eci = peer{"Tx"} | |
peerState = ent:peersState{eci}.defaultsTo({}) | |
msgs = ent:messages.map(function(msgs, sensorID) { | |
msgs.filter(function(msg, msgID) { | |
msgSeq(msgID) > peerState{sensorID}.defaultsTo(-1) | |
}) | |
}).map(function(msgs, sensorID) { | |
msgs.values() | |
}).values().reduce(function(a, b) { | |
a.defaultsTo([]).append(b) | |
}) | |
msgs | |
} | |
getPeer = function() { | |
peers = peers() | |
// find all peers missing something | |
missing = peers.map(function(peer) { | |
peer.put("missing", getMissingMessages(peer).length()) | |
}).filter(function(peer) { | |
peer{"missing"} > 0 | |
}) | |
// return a peer that is missing information, else a random peer | |
peer = (missing.length() > 0 && random:integer(99) < 60) | |
=> missing[random:integer(missing.length()-1)] // return a missing peer | |
| peers[random:integer(peers.length()-1)] // return a random peer | |
peer | |
} | |
newMessage = function(peer) { | |
type = random:integer(99) | |
msg = (type < 80 && getMissingMessages(peer).length() > 0) => newRumorMessage(peer) | newSeenMessage() | |
msg | |
} | |
newRumorMessage = function(peer) { | |
missing = getMissingMessages(peer) | |
// pick a message at random (there is at least one, see newMessage()) | |
msg = missing[random:integer(missing.length()-1)] | |
msg | |
} | |
newSeenMessage = function() { | |
// figure out the highest consecutive seq seen for each sensor | |
seqs = messages().map(function(msgs, sensorID) { | |
msgs.keys().map(function(msgID) { | |
msgSeq(msgID) | |
}).sort().reduce(function(a, b) { | |
(a+1 == b) => b | a | |
}) | |
}) | |
seqs | |
} | |
} | |
rule init { | |
select when wrangler ruleset_installed where event:attrs{"rids"} >< meta:rid | |
always { | |
raise gossip event "reset" | |
raise gossip event "heartbeat" | |
} | |
} | |
rule reset { | |
select when gossip reset | |
pre { | |
scheduledID = scheduledHeartbeat() | |
} | |
if scheduledID then | |
schedule:remove(scheduledID) | |
always { | |
ent:period := 5 // seconds | |
ent:messages := {} // sensorID -> messageID -> message | |
ent:peersState := {} // eci -> sensorID -> highest seq | |
ent:sequence := 0 | |
schedule gossip event "heartbeat" repeat << */#{ent:period} * * * * * >> attributes {} | |
} | |
} | |
rule set_heartbeat_period { | |
select when gossip set_heartbeat_period | |
always { | |
ent:period := event:attrs{"period"}.defaultsTo(ent:period) | |
} | |
} | |
rule on_heartbeat { | |
select when gossip heartbeat where processing() | |
pre { | |
peer = getPeer().klog("selected peer") | |
msg = newMessage(peer).klog("sending message") | |
type = (msg >< "messageID") => "rumor" | "seen" | |
} | |
if peer then | |
event:send({ | |
"eci": peer{"Tx"}, | |
"domain": "gossip", | |
"type": type, | |
"attrs": { | |
"peer": peer, | |
"msg": msg | |
} | |
}) | |
} | |
rule handle_rumor { | |
select when gossip rumor where processing() | |
pre { | |
msgID = event:attrs{["msg", "messageID"]} | |
sensorID = event:attrs{["msg", "sensorID"]} | |
temp = event:attrs{["msg", "temperature"]} | |
time = event:attrs{["msg", "timestamp"]} | |
msg = { | |
"messageID": msgID, | |
"sensorID": sensorID, | |
"temperature": temp, | |
"timestamp": time | |
} | |
} | |
if msgID && sensorID then | |
noop() | |
fired { | |
ent:messages := ent:messages.defaultsTo({}).put([sensorID, msgID], msg) | |
} | |
} | |
rule handle_seen { | |
select when gossip seen where processing() | |
pre { | |
eci = event:attrs{"peer"}{"Rx"} | |
msg = event:attrs{"msg"} | |
} | |
if eci && msg then | |
noop() | |
fired { | |
ent:peersState := ent:peersState.defaultsTo({}).put(eci, msg) | |
} | |
} | |
rule enable_disable_processing { | |
select when gossip process | |
pre { | |
state = event:attrs{"state"} | |
} | |
if state == "on" || state == "off" then | |
noop() | |
fired { | |
ent:process := state | |
} | |
} | |
rule new_temp { | |
select when wovyn new_temperature_reading | |
pre { | |
time = event:attrs{"timestamp"} | |
temp = event:attrs{"temperature"} | |
sensorID = sensorID() | |
msgID = <<#{sensorID}:#{ent:sequence}>> | |
msg = { | |
"messageID": msgID, | |
"sensorID": sensorID, | |
"temperature": temp, | |
"timestamp": time | |
} | |
} | |
if msgID && sensorID then | |
noop() | |
fired { | |
ent:messages := ent:messages.defaultsTo({}).put([sensorID, msgID], msg) | |
ent:sequence := ent:sequence + 1; | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment