Created
April 10, 2021 23:06
-
-
Save dannyrandall/b056390c3e00b5716b560cfe0e2cbcbe to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
ruleset gossip { | |
meta { | |
name "Gossip" | |
author "Daniel Randall" | |
use module io.picolabs.subscription alias subscription | |
shares inTempViolation, alertingNodes, scheduledHeartbeat, peers, sensorID, processing, peersState, messages, getPeer, newMessage, newRumorMessage, newSeenMessage, getMissingMessages | |
} | |
global { | |
inTempViolation = function() { | |
ent:inTempViolation | |
} | |
alertingNodes = function() { | |
messages().map(function(msgs, sensorID) { | |
msgs.filter(function(msg, msgID) { | |
msg{"type"} == "nodes_in_temp_violation" && msg{["payload", "add"]} | |
}).values().reduce(function(sum, msg) { | |
sum + msg{["payload", "add"]} | |
}, 0) | |
}).map(function(state) { | |
state == 1 | |
}) | |
} | |
scheduledHeartbeat = function() { | |
schedule:list().filter(function(x) { | |
x{"type"} == "repeat" && x{["event", "domain"]} == "gossip" && x{["event", "name"]} == "heartbeat" | |
}).head(){"id"} | |
} | |
processing = function() { | |
ent:process.defaultsTo("on") == "on" | |
} | |
peers = function() { | |
subscription:established("Tx_role", "node") | |
} | |
sensorID = function() { | |
meta:picoId | |
} | |
messages = function() { | |
ent:messages | |
} | |
peersState = function() { | |
ent:peersState | |
} | |
msgSeq = function(msgID) { | |
split = msgID.split(re#:#) | |
split[split.length()-1].defaultsTo("0").as("Number") | |
} | |
getMissingMessages = function(peer) { | |
eci = peer{"Tx"} | |
peerState = ent:peersState{eci}.defaultsTo({}) | |
msgs = ent:messages.map(function(msgs, sensorID) { | |
msgs.filter(function(msg, msgID) { | |
msgSeq(msgID) > peerState{sensorID}.defaultsTo(-1) | |
}) | |
}).map(function(msgs, sensorID) { | |
msgs.values() | |
}).values().reduce(function(a, b) { | |
a.defaultsTo([]).append(b) | |
}) | |
msgs | |
} | |
getPeer = function() { | |
peers = peers() | |
// find all peers missing something | |
missing = peers.map(function(peer) { | |
peer.put("missing", getMissingMessages(peer).length()) | |
}).filter(function(peer) { | |
peer{"missing"} > 0 | |
}) | |
// return a peer that is missing information, else a random peer | |
peer = (missing.length() > 0 && random:integer(99) < 60) | |
=> missing[random:integer(missing.length()-1)] // return a missing peer | |
| peers[random:integer(peers.length()-1)] // return a random peer | |
peer | |
} | |
newMessage = function(peer) { | |
type = random:integer(99) | |
msg = (type < 80 && getMissingMessages(peer).length() > 0) => newRumorMessage(peer) | newSeenMessage() | |
msg | |
} | |
newRumorMessage = function(peer) { | |
missing = getMissingMessages(peer) | |
// pick a message at random (there is at least one, see newMessage()) | |
msg = missing[random:integer(missing.length()-1)] | |
msg | |
} | |
newSeenMessage = function() { | |
// figure out the highest consecutive seq seen for each sensor | |
seqs = messages().map(function(msgs, sensorID) { | |
msgs.keys().map(function(msgID) { | |
msgSeq(msgID) | |
}).sort().reduce(function(a, b) { | |
(a+1 == b) => b | a | |
}) | |
}) | |
seqs | |
} | |
} | |
rule init { | |
select when wrangler ruleset_installed where event:attrs{"rids"} >< meta:rid | |
always { | |
raise gossip event "reset" | |
raise gossip event "heartbeat" | |
} | |
} | |
rule reset { | |
select when gossip reset | |
pre { | |
scheduledID = scheduledHeartbeat() | |
} | |
if scheduledID then | |
schedule:remove(scheduledID) | |
always { | |
ent:period := 5 // seconds | |
ent:messages := {} // sensorID -> messageID -> message | |
ent:peersState := {} // eci -> sensorID -> highest seq | |
ent:inTempViolation := false | |
ent:sequence := 0 | |
schedule gossip event "heartbeat" repeat << */#{ent:period} * * * * * >> attributes {} | |
} | |
} | |
rule set_heartbeat_period { | |
select when gossip set_heartbeat_period | |
always { | |
ent:period := event:attrs{"period"}.defaultsTo(ent:period) | |
} | |
} | |
rule on_heartbeat { | |
select when gossip heartbeat where processing() | |
pre { | |
peer = getPeer().klog("selected peer") | |
msg = newMessage(peer).klog("sending message") | |
type = (msg >< "messageID") => "rumor" | "seen" | |
} | |
if peer then | |
event:send({ | |
"eci": peer{"Tx"}, | |
"domain": "gossip", | |
"type": type, | |
"attrs": { | |
"peer": peer, | |
"msg": msg | |
} | |
}) | |
} | |
rule handle_rumor { | |
select when gossip rumor where processing() | |
pre { | |
msgID = event:attrs{["msg", "messageID"]} | |
sensorID = event:attrs{["msg", "sensorID"]} | |
time = event:attrs{["msg", "timestamp"]} | |
type = event:attrs{["msg", "type"]} | |
payload = event:attrs{["msg", "payload"]} | |
msg = { | |
"messageID": msgID, | |
"sensorID": sensorID, | |
"timestamp": time, | |
"type": type, | |
"payload": payload | |
} | |
} | |
if msgID && sensorID then | |
noop() | |
fired { | |
ent:messages := ent:messages.defaultsTo({}).put([sensorID, msgID], msg) | |
} | |
} | |
rule handle_seen { | |
select when gossip seen where processing() | |
pre { | |
eci = event:attrs{"peer"}{"Rx"} | |
msg = event:attrs{"msg"} | |
} | |
if eci && msg then | |
noop() | |
fired { | |
ent:peersState := ent:peersState.defaultsTo({}).put(eci, msg) | |
} | |
} | |
rule enable_disable_processing { | |
select when gossip process | |
pre { | |
state = event:attrs{"state"} | |
} | |
if state == "on" || state == "off" then | |
noop() | |
fired { | |
ent:process := state | |
} | |
} | |
rule new_temp { | |
select when wovyn new_temperature_reading | |
pre { | |
time = time:now() | |
temp = event:attrs{"temperature"} | |
sensorID = sensorID() | |
msgID = <<#{sensorID}:#{ent:sequence}>> | |
msg = { | |
"messageID": msgID, | |
"sensorID": sensorID, | |
"timestamp": time, | |
"type": "new_temperature_reading", | |
"payload": { | |
"temperature": temp, | |
}, | |
} | |
} | |
if msgID && sensorID && time then | |
noop() | |
fired { | |
ent:messages := ent:messages.defaultsTo({}).put([sensorID, msgID], msg) | |
ent:sequence := ent:sequence + 1; | |
} | |
} | |
//******************************************************* | |
// lab 10 temp violation stuff | |
//******************************************************* | |
rule toggle_temp_violation { | |
select when wovyn toggle_temp_violation | |
pre { | |
time = time:now() | |
sensorID = sensorID() | |
msgID = <<#{sensorID}:#{ent:sequence}>> | |
add = inTempViolation() => -1 | 1 // opposite because we are toggling the state | |
msg = { | |
"messageID": msgID, | |
"sensorID": sensorID, | |
"timestamp": time, | |
"type": "nodes_in_temp_violation", | |
"payload": { | |
"add": add, | |
}, | |
} | |
} | |
if msgID && sensorID && time && add then | |
noop() | |
fired { | |
ent:inTempViolation := not inTempViolation() | |
ent:messages := ent:messages.defaultsTo({}).put([sensorID, msgID], msg) | |
ent:sequence := ent:sequence + 1; | |
} | |
} | |
rule pulse_temp_violation { | |
select when wovyn pulse_temp_violation | |
pre { | |
time = time:now() | |
sensorID = sensorID() | |
msgID = <<#{sensorID}:#{ent:sequence}>> | |
msg = { | |
"messageID": msgID, | |
"sensorID": sensorID, | |
"timestamp": time, | |
"type": "nodes_in_temp_violation", | |
"payload": { | |
"add": 0, | |
}, | |
} | |
} | |
if msgID && sensorID && time then | |
noop() | |
fired { | |
ent:messages := ent:messages.defaultsTo({}).put([sensorID, msgID], msg) | |
ent:sequence := ent:sequence + 1; | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment