Skip to content

Instantly share code, notes, and snippets.

@dannyrandall
Created April 10, 2021 23:06
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save dannyrandall/b056390c3e00b5716b560cfe0e2cbcbe to your computer and use it in GitHub Desktop.
Save dannyrandall/b056390c3e00b5716b560cfe0e2cbcbe to your computer and use it in GitHub Desktop.
ruleset gossip {
meta {
name "Gossip"
author "Daniel Randall"
// Subscription module; peers() uses it to look up established subscriptions.
use module io.picolabs.subscription alias subscription
// Functions from the global block that are exposed to queries on this ruleset.
shares inTempViolation, alertingNodes, scheduledHeartbeat, peers, sensorID, processing, peersState, messages, getPeer, newMessage, newRumorMessage, newSeenMessage, getMissingMessages
}
global {
// Returns this node's own temperature-violation flag (ent:inTempViolation),
// which is set to false by the reset rule and flipped by toggle_temp_violation.
inTempViolation = function() {
ent:inTempViolation
}
// Maps every known sensorID to a boolean: true when the sum of the non-zero
// "add" payloads of its "nodes_in_temp_violation" messages equals exactly 1.
alertingNodes = function() {
  totals = messages().map(function(sensorMsgs, origin) {
    // keep only violation messages whose "add" is truthy (pulses carry 0)
    deltas = sensorMsgs.filter(function(m, mID) {
      m{"type"} == "nodes_in_temp_violation" && m{["payload", "add"]}
    }).values()
    deltas.reduce(function(acc, m) {
      acc + m{["payload", "add"]}
    }, 0)
  })
  totals.map(function(total) {
    total == 1
  })
}
// Returns the schedule id of the first repeating gossip:heartbeat event found
// in schedule:list(), or null when none is scheduled.
scheduledHeartbeat = function() {
  heartbeats = schedule:list().filter(function(entry) {
    entry{"type"} == "repeat" && entry{["event", "domain"]} == "gossip" && entry{["event", "name"]} == "heartbeat"
  })
  heartbeats.head(){"id"}
}
// True when gossip processing is enabled. ent:process is written by the
// enable_disable_processing rule; an unset value is treated as "on".
processing = function() {
ent:process.defaultsTo("on") == "on"
}
// Returns the established subscriptions whose "Tx_role" is "node"; these are
// the peers this node gossips with.
peers = function() {
subscription:established("Tx_role", "node")
}
// Returns the identifier used for this node in message IDs: the pico's own id.
sensorID = function() {
meta:picoId
}
// Returns the stored message map (sensorID -> messageID -> message).
// The value is whatever ent:messages holds; the reset rule initializes it to {}.
messages = function() {
ent:messages
}
// Returns what each peer has reported seeing (eci -> sensorID -> highest seq),
// as recorded by the handle_seen rule.
peersState = function() {
ent:peersState
}
// Extracts the numeric sequence number from a message ID of the form
// "<sensorID>:<seq>" by taking the text after the last ":".
msgSeq = function(msgID) {
  parts = msgID.split(re#:#)
  tail = parts[parts.length()-1]
  tail.defaultsTo("0").as("Number")
}
// Returns an array of every stored message the given peer has not yet
// reported seeing.
//
// peer: a subscription map; its "Tx" channel id is the key under which the
//       peer's last "seen" report is stored in ent:peersState.
//
// A message counts as missing when its sequence number is greater than the
// sequence the peer reported for that sensor (-1 when the peer has reported
// nothing for it). Always returns an array: the reduce is seeded with [] so
// an empty message store yields [] rather than the number 0 that an unseeded
// reduce returns for an empty array.
getMissingMessages = function(peer) {
  eci = peer{"Tx"}
  peerState = ent:peersState{eci}.defaultsTo({})
  ent:messages.defaultsTo({}).map(function(sensorMsgs, origin) {
    // keep only this sensor's messages that are newer than what the peer has seen
    sensorMsgs.filter(function(msg, msgID) {
      msgSeq(msgID) > peerState{origin}.defaultsTo(-1)
    }).values()
  }).values().reduce(function(acc, batch) {
    // append flattens one level, concatenating the per-sensor arrays
    acc.append(batch)
  }, [])
}
// Chooses the peer to gossip with on this heartbeat.
// When at least one peer is missing messages, one of those peers is chosen
// when random:integer(99) is below 60; otherwise any peer is chosen at random.
// A peer picked from the "missing" set carries an extra "missing" count key.
getPeer = function() {
  candidates = peers()
  // annotate each peer with how many messages it lacks, keep those lacking any
  lagging = candidates.map(function(p) {
    p.put("missing", getMissingMessages(p).length())
  }).filter(function(p) {
    p{"missing"} > 0
  })
  preferLagging = lagging.length() > 0 && random:integer(99) < 60
  (preferLagging)
    => lagging[random:integer(lagging.length()-1)]
     | candidates[random:integer(candidates.length()-1)]
}
// Builds the message to send to `peer`: a rumor when the random draw is below
// 80 and the peer is missing at least one message, otherwise a "seen" summary.
newMessage = function(peer) {
  roll = random:integer(99)
  sendRumor = roll < 80 && getMissingMessages(peer).length() > 0
  (sendRumor) => newRumorMessage(peer) | newSeenMessage()
}
// Returns one randomly chosen message that `peer` is missing.
// newMessage() only calls this when at least one such message exists.
newRumorMessage = function(peer) {
  pending = getMissingMessages(peer)
  pick = random:integer(pending.length()-1)
  pending[pick]
}
// Builds a "seen" message: a map of sensorID -> highest sequence number such
// that every sequence from 0 up to it has been received from that sensor.
// A sensor whose sequence 0 has not arrived yet maps to -1, which is the same
// "nothing seen" value getMissingMessages() assumes for unknown sensors.
newSeenMessage = function() {
  messages().defaultsTo({}).map(function(sensorMsgs, origin) {
    sensorMsgs.keys().map(function(msgID) {
      msgSeq(msgID)
    }).sort("numeric").reduce(function(highest, seq) {
      // numeric sort is required: the default sort compares as strings, which
      // would place 10 before 2 and stall the consecutive run at 9.
      // Seeding with -1 means a gap at the start is not counted as seen.
      (highest+1 == seq) => seq | highest
    }, -1)
  })
}
}
// Runs when this ruleset is installed (the installed "rids" include meta:rid).
// Raises gossip:reset, which initializes state and schedules the repeating
// heartbeat, and then raises one gossip:heartbeat immediately.
rule init {
select when wrangler ruleset_installed where event:attrs{"rids"} >< meta:rid
always {
raise gossip event "reset"
raise gossip event "heartbeat"
}
}
// Restores the ruleset to its initial state: removes the currently scheduled
// heartbeat (if any), clears all gossip state and schedules a fresh repeating
// heartbeat with the default period.
rule reset {
  select when gossip reset
  pre {
    existingID = scheduledHeartbeat()
  }
  if existingID then
    schedule:remove(existingID)
  always {
    ent:sequence := 0
    ent:inTempViolation := false
    ent:messages := {} // sensorID -> messageID -> message
    ent:peersState := {} // eci -> sensorID -> highest seq
    ent:period := 5 // seconds; must be set before the schedule line reads it
    schedule gossip event "heartbeat" repeat << */#{ent:period} * * * * * >> attributes {}
  }
}
// Changes the heartbeat period (in seconds) and applies it right away.
//
// Event attribute "period": the new period; when absent the stored period is
// kept. The existing repeating heartbeat is removed and a new one is scheduled
// with the chosen period. Storing ent:period alone would have no effect,
// because the schedule is only created in the reset rule, which also
// overwrites the period with its default.
rule set_heartbeat_period {
  select when gossip set_heartbeat_period
  pre {
    period = event:attrs{"period"}.defaultsTo(ent:period)
    existingID = scheduledHeartbeat()
  }
  if existingID then
    schedule:remove(existingID)
  always {
    ent:period := period
    schedule gossip event "heartbeat" repeat << */#{period} * * * * * >> attributes {}
  }
}
// On every heartbeat (while processing is enabled) picks a peer, builds a
// message for it and sends it as gossip:rumor or gossip:seen.
rule on_heartbeat {
  select when gossip heartbeat where processing()
  pre {
    target = getPeer().klog("selected peer")
    outgoing = newMessage(target).klog("sending message")
    // rumor messages carry a "messageID" key; seen messages are keyed by sensorID
    kind = (outgoing >< "messageID") => "rumor" | "seen"
  }
  if target then
    event:send({
      "eci": target{"Tx"},
      "domain": "gossip",
      "type": kind,
      "attrs": {
        "peer": target,
        "msg": outgoing
      }
    })
}
// Stores an incoming rumor message under [sensorID, messageID].
// Only the five known fields are copied from the event; rumors lacking a
// messageID or sensorID are ignored.
rule handle_rumor {
  select when gossip rumor where processing()
  pre {
    rumorID = event:attrs{["msg", "messageID"]}
    origin = event:attrs{["msg", "sensorID"]}
    stamp = event:attrs{["msg", "timestamp"]}
    kind = event:attrs{["msg", "type"]}
    body = event:attrs{["msg", "payload"]}
    rumor = {
      "messageID": rumorID,
      "sensorID": origin,
      "timestamp": stamp,
      "type": kind,
      "payload": body
    }
  }
  if rumorID && origin then
    noop()
  fired {
    ent:messages := ent:messages.defaultsTo({}).put([origin, rumorID], rumor)
  }
}
// Records a peer's "seen" summary. The state is keyed by the "Rx" channel of
// the subscription map the sender included in the event.
// NOTE(review): presumably the sender's Rx equals this node's Tx for that
// peer, which is the key getMissingMessages() reads — confirm.
rule handle_seen {
  select when gossip seen where processing()
  pre {
    senderEci = event:attrs{"peer"}{"Rx"}
    seen = event:attrs{"msg"}
  }
  if senderEci && seen then
    noop()
  fired {
    ent:peersState := ent:peersState.defaultsTo({}).put(senderEci, seen)
  }
}
// Turns gossip processing on or off. The "state" attribute must be exactly
// "on" or "off"; any other value leaves the current setting untouched.
rule enable_disable_processing {
  select when gossip process
  pre {
    requested = event:attrs{"state"}
  }
  if ["on", "off"] >< requested then
    noop()
  fired {
    ent:process := requested
  }
}
// Wraps a local temperature reading in a gossip message, stores it under this
// node's own sensorID and advances the local sequence counter.
rule new_temp {
  select when wovyn new_temperature_reading
  pre {
    origin = sensorID()
    stamp = time:now()
    reading = event:attrs{"temperature"}
    newID = <<#{origin}:#{ent:sequence}>>
    entry = {
      "messageID": newID,
      "sensorID": origin,
      "timestamp": stamp,
      "type": "new_temperature_reading",
      "payload": {
        "temperature": reading
      }
    }
  }
  if newID && origin && stamp then
    noop()
  fired {
    ent:messages := ent:messages.defaultsTo({}).put([origin, newID], entry)
    ent:sequence := ent:sequence + 1
  }
}
//*******************************************************
// lab 10 temp violation stuff
//*******************************************************
// Flips this node's temperature-violation flag and records the change as a
// "nodes_in_temp_violation" message: +1 when entering violation, -1 when
// leaving it.
rule toggle_temp_violation {
  select when wovyn toggle_temp_violation
  pre {
    origin = sensorID()
    stamp = time:now()
    newID = <<#{origin}:#{ent:sequence}>>
    // the delta describes the state being entered, hence the opposite of the current flag
    delta = inTempViolation() => -1 | 1
    entry = {
      "messageID": newID,
      "sensorID": origin,
      "timestamp": stamp,
      "type": "nodes_in_temp_violation",
      "payload": {
        "add": delta
      }
    }
  }
  if newID && origin && stamp && delta then
    noop()
  fired {
    ent:inTempViolation := not inTempViolation()
    ent:messages := ent:messages.defaultsTo({}).put([origin, newID], entry)
    ent:sequence := ent:sequence + 1
  }
}
// Records a "nodes_in_temp_violation" message with an "add" of 0: it leaves
// the violation total unchanged but still consumes a sequence number.
rule pulse_temp_violation {
  select when wovyn pulse_temp_violation
  pre {
    origin = sensorID()
    stamp = time:now()
    newID = <<#{origin}:#{ent:sequence}>>
    entry = {
      "messageID": newID,
      "sensorID": origin,
      "timestamp": stamp,
      "type": "nodes_in_temp_violation",
      "payload": {
        "add": 0
      }
    }
  }
  if newID && origin && stamp then
    noop()
  fired {
    ent:messages := ent:messages.defaultsTo({}).put([origin, newID], entry)
    ent:sequence := ent:sequence + 1
  }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment