Skip to content

Instantly share code, notes, and snippets.

@dannyrandall
Created April 6, 2021 04:21
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save dannyrandall/07f47ec4a9a2e438a0bb8d974c96a209 to your computer and use it in GitHub Desktop.
Save dannyrandall/07f47ec4a9a2e438a0bb8d974c96a209 to your computer and use it in GitHub Desktop.
ruleset gossip {
// Ruleset metadata: descriptive fields, module dependencies, and the list of
// global functions that may be queried from outside the ruleset.
meta {
name "Gossip"
author "Daniel Randall"
// subscription module is used by peers() to look up established subscriptions
use module io.picolabs.subscription alias subscription
// functions exposed for external queries; msgSeq is intentionally not listed and stays internal
shares scheduledHeartbeat, peers, sensorID, processing, peersState, messages, getPeer, newMessage, newRumorMessage, newSeenMessage, getMissingMessages
}
global {
scheduledHeartbeat = function() {
  // Returns the id of the first scheduled entry that repeatedly raises
  // gossip:heartbeat, or null when no such entry is scheduled.
  isHeartbeat = function(entry) {
    entry{"type"} == "repeat" && entry{["event", "domain"]} == "gossip" && entry{["event", "name"]} == "heartbeat"
  }
  matching = schedule:list().filter(isHeartbeat)
  matching.head(){"id"}
}
processing = function() {
  // Gossip handling counts as enabled when ent:process is "on";
  // an unset ent:process is treated as "on".
  state = ent:process.defaultsTo("on")
  state == "on"
}
peers = function() {
  // Established subscriptions whose Tx_role is "node".
  nodes = subscription:established("Tx_role", "node")
  nodes
}
sensorID = function() {
  // This pico's id doubles as the sensor identifier used in message ids.
  ownID = meta:picoId
  ownID
}
messages = function() {
  // Stored messages, laid out as sensorID -> messageID -> message.
  stored = ent:messages
  stored
}
peersState = function() {
  // Last "seen" report recorded per peer, laid out as eci -> sensorID -> highest seq.
  known = ent:peersState
  known
}
msgSeq = function(msgID) {
  // Extracts the sequence number from a message id: the text after the last
  // ":" converted to a Number. A missing last segment is read as "0".
  parts = msgID.split(re#:#)
  lastIndex = parts.length() - 1
  parts[lastIndex].defaultsTo("0").as("Number")
}
getMissingMessages = function(peer) {
  // Lists every stored message that, according to the last "seen" report
  // recorded for this peer, the peer does not have yet.
  //
  // peer    - subscription record; its "Tx" channel id is the key into ent:peersState
  // returns - flat array of message maps; always an array, empty when nothing is missing
  eci = peer{"Tx"}
  peerState = ent:peersState{eci}.defaultsTo({})
  // defaultsTo({}) keeps this working before the first reset has initialised ent:messages
  ent:messages.defaultsTo({}).map(function(msgs, sensorID) {
    // a sensor the peer has never reported on counts as "seen up to -1", i.e. nothing
    seenUpTo = peerState{sensorID}.defaultsTo(-1)
    msgs.filter(function(msg, msgID) {
      msgSeq(msgID) > seenUpTo
    }).values()
  }).values().reduce(function(collected, batch) {
    collected.append(batch)
  }, []) // seeding with [] guarantees an array even when there are no sensors at all
}
getPeer = function() {
  // Chooses the peer to gossip with on this heartbeat. When at least one peer
  // is known to lack messages, one of those is chosen when a 0..99 roll is
  // below 60; otherwise any peer is chosen at random.
  candidates = peers()
  // tag each peer with the number of messages it lacks, keeping only those lacking some
  behind = candidates.map(function(p) {
    p.put("missing", getMissingMessages(p).length())
  }).filter(function(p) {
    p{"missing"} > 0
  })
  preferBehind = behind.length() > 0 && random:integer(99) < 60
  pool = preferBehind => behind | candidates
  pool[random:integer(pool.length() - 1)]
}
newMessage = function(peer) {
  // Builds the payload for one heartbeat: a rumor when the 0..99 roll is below
  // 80 and the peer lacks at least one message, otherwise a "seen" summary.
  roll = random:integer(99)
  sendRumor = roll < 80 && getMissingMessages(peer).length() > 0
  sendRumor => newRumorMessage(peer) | newSeenMessage()
}
newRumorMessage = function(peer) {
  // Picks one of the messages the peer lacks, uniformly at random.
  // newMessage() only calls this when at least one such message exists.
  candidates = getMissingMessages(peer)
  pick = random:integer(candidates.length() - 1)
  candidates[pick]
}
newSeenMessage = function() {
  // Builds a "seen" summary: for every sensor we hold messages from, the
  // highest sequence number N such that all of 0..N are held.
  //
  // returns - map of sensorID -> N; N is -1 when message 0 of that sensor is
  //           not held, which matches the "nothing seen" default used by
  //           getMissingMessages()
  messages().defaultsTo({}).map(function(msgs, sensorID) {
    msgs.keys().map(function(msgID) {
      msgSeq(msgID)
    }).sort("numeric") // the default sort is lexicographic and would place 10 before 2
    .reduce(function(highest, seq) {
      (highest + 1 == seq) => seq | highest
    }, -1) // start below 0 so a gap at the very beginning is not reported as seen
  })
}
}
// Bootstraps the ruleset: selected when wrangler reports a ruleset installation
// whose "rids" attribute contains this ruleset's id (meta:rid).
rule init {
select when wrangler ruleset_installed where event:attrs{"rids"} >< meta:rid
always {
// reset is raised first; the reset rule initialises the entity variables and the repeating schedule
raise gossip event "reset"
// then one heartbeat is raised directly instead of waiting for the schedule
raise gossip event "heartbeat"
}
}
// Restores all gossip state to its defaults and (re)creates the repeating
// heartbeat schedule.
rule reset {
select when gossip reset
pre {
// id of the currently scheduled repeating heartbeat, null when there is none
scheduledID = scheduledHeartbeat()
}
// remove the existing schedule entry before a fresh one is created below
// NOTE(review): scheduledHeartbeat() yields only the first match, so duplicate entries (if any exist) would survive — confirm
if scheduledID then
schedule:remove(scheduledID)
// the postlude runs whether or not an old entry was removed
always {
ent:period := 5 // seconds
ent:messages := {} // sensorID -> messageID -> message
ent:peersState := {} // eci -> sensorID -> highest seq
ent:sequence := 0
// six-field cron expression whose first field is */<period>; presumably the seconds field, per the comment on ent:period
schedule gossip event "heartbeat" repeat << */#{ent:period} * * * * * >> attributes {}
}
}
// Changes how often the heartbeat fires.
//
// attrs: "period" - new interval; when absent the stored period is kept
//
// Storing ent:period alone has no effect on a schedule that already exists,
// and the reset rule overwrites the value with its default, so the running
// heartbeat schedule is replaced here.
rule set_heartbeat_period {
  select when gossip set_heartbeat_period
  pre {
    // fall back to the stored period, then to the same default reset uses
    period = event:attrs{"period"}.defaultsTo(ent:period).defaultsTo(5)
    scheduledID = scheduledHeartbeat()
  }
  // drop the old schedule entry, if any, so only one heartbeat schedule exists
  if scheduledID then
    schedule:remove(scheduledID)
  always {
    ent:period := period
    schedule gossip event "heartbeat" repeat << */#{period} * * * * * >> attributes {}
  }
}
// On every heartbeat (while processing is enabled) picks a peer, builds a
// rumor or "seen" payload for it, and sends it to the peer's Tx channel.
rule on_heartbeat {
  select when gossip heartbeat where processing()
  pre {
    target = getPeer().klog("selected peer")
    payload = newMessage(target).klog("sending message")
    // only rumor payloads carry a "messageID" key; anything else is a seen summary
    msgType = (payload >< "messageID") => "rumor" | "seen"
    outgoing = {
      "peer": target,
      "msg": payload
    }
  }
  // nothing is sent when no peer could be selected
  if target then
    event:send({
      "eci": target{"Tx"},
      "domain": "gossip",
      "type": msgType,
      "attrs": outgoing
    })
}
// Stores a rumor received from a peer, keyed by originating sensor and
// message id. Only the four known message fields are kept.
rule handle_rumor {
  select when gossip rumor where processing()
  pre {
    incoming = event:attrs{"msg"}
    id = incoming{"messageID"}
    origin = incoming{"sensorID"}
    record = {
      "messageID": id,
      "sensorID": origin,
      "temperature": incoming{"temperature"},
      "timestamp": incoming{"timestamp"}
    }
  }
  // rumors lacking either identifier are ignored
  if id && origin then
    noop()
  fired {
    ent:messages := ent:messages.defaultsTo({}).put([origin, id], record)
  }
}
// Records a peer's "seen" summary under the Rx channel id found in the
// "peer" attribute of the event.
rule handle_seen {
  select when gossip seen where processing()
  pre {
    sender = event:attrs{"peer"}{"Rx"}
    summary = event:attrs{"msg"}
  }
  // both the channel id and the summary must be present
  if sender && summary then
    noop()
  fired {
    ent:peersState := ent:peersState.defaultsTo({}).put(sender, summary)
  }
}
// Switches gossip processing on or off; any "state" other than "on" or
// "off" is ignored.
rule enable_disable_processing {
  select when gossip process
  pre {
    requested = event:attrs{"state"}
  }
  if ["on", "off"] >< requested then
    noop()
  fired {
    ent:process := requested
  }
}
// Turns a local temperature reading into a gossip message with id
// "<sensorID>:<sequence>", stores it, and advances the sequence counter.
rule new_temp {
  select when wovyn new_temperature_reading
  pre {
    origin = sensorID()
    seq = ent:sequence
    id = <<#{origin}:#{seq}>>
    reading = {
      "messageID": id,
      "sensorID": origin,
      "temperature": event:attrs{"temperature"},
      "timestamp": event:attrs{"timestamp"}
    }
  }
  if id && origin then
    noop()
  fired {
    ent:messages := ent:messages.defaultsTo({}).put([origin, id], reading)
    ent:sequence := ent:sequence + 1
  }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment