Skip to content

Instantly share code, notes, and snippets.

@ExFed
Last active October 3, 2023 10:14
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ExFed/5f061330377551d5c72e6c2cdcf96cc5 to your computer and use it in GitHub Desktop.
Save ExFed/5f061330377551d5c72e6c2cdcf96cc5 to your computer and use it in GitHub Desktop.
Stupid Gossip Network
#!/usr/bin/env groovy
import groovy.transform.*
import java.util.concurrent.atomic.*
/**
 * A gossip participant, identified by the socket address it sends from.
 *
 * On the wire a peer is encoded as its port number only; {@link #decode}
 * always pairs that port with the host name 'localhost'.
 *
 * Two peers are equal when their ports match and either both addresses
 * denote this machine (see SELF_HOSTS) or their socket addresses are equal.
 */
@Immutable(knownImmutableClasses = [InetSocketAddress])
class Peer {
    /** Every InetAddress that is treated as "this machine" when comparing peers. */
    private static final SELF_HOSTS = (InetAddress.getAllByName('localhost') as List
            + InetAddress.localHost
            + InetAddress.loopbackAddress
            + InetAddress.getByName('::0')
            + InetAddress.getByName('0')).asUnmodifiable()

    /**
     * Parses the wire form of a peer.
     *
     * @param str a port number in decimal
     * @return a peer at 'localhost' on that port
     */
    static Peer decode(String str) {
        // localhost is all that matters
        return new Peer(new InetSocketAddress('localhost', str as int))
    }

    InetSocketAddress socketAddress

    /** @return the wire form of this peer: its port number as a string */
    String encode() {
        "$socketAddress.port" // we only care about localhost
    }

    /**
     * Tells whether a socket address points at one of the SELF_HOSTS.
     * SELF_HOSTS contains InetAddress values, so the InetAddress part of the
     * socket address must be tested; an InetSocketAddress is never a member.
     * An unresolved socket address (null InetAddress) is not a self host.
     */
    private static boolean selfHosted(InetSocketAddress addr) {
        return addr.address in SELF_HOSTS
    }

    boolean equals(o) {
        if (this === o) return true
        if (o !instanceof Peer) return false
        def that = o as Peer
        if (this.socketAddress.port != that.socketAddress.port) return false
        // any two aliases of this machine on the same port are the same peer
        if (selfHosted(this.socketAddress) && selfHosted(that.socketAddress)) return true
        return socketAddress == that.socketAddress
    }

    int hashCode() {
        // must agree with equals: all self-host aliases on one port hash alike
        if (selfHosted(socketAddress)) return socketAddress.port
        return socketAddress.hashCode()
    }
}
/**
 * Immutable snapshot of one node's view of the gossip network: a logical
 * timestamp, the timestamp recorded for each known peer, and a set of banned
 * peers. Every operation returns a new State (or this instance if unchanged).
 *
 * Wire format (see {@link #encode} / {@link #decode}): space-separated tokens,
 * the first being the timestamp and each following one being "port;timestamp".
 * The banlist is not transmitted.
 */
@Immutable
@ToString(includeNames = true, excludes = 'unbannedPeers')
class State {
    /**
     * Builds an initial state.
     *
     * @param timestamp starting timestamp
     * @param peerStrs  encoded peers (port numbers); each starts at timestamp 0
     * @return a state with the given peers and an empty banlist
     */
    static State init(int timestamp, String... peerStrs) {
        def peers = peerStrs.collectEntries {
            [Peer.decode(it), 0]
        }
        return new State(timestamp, peers, [] as Set)
    }

    /**
     * Parses a state from its wire form.
     *
     * @param str text produced by {@link #encode}
     * @return the decoded state, with an empty banlist
     */
    static State decode(String str) {
        def toks = str.split(' ')
        def timestamp = toks.head() as int
        def peers = toks.tail().collectEntries {
            def (addrStr, tsStr) = it.split(';')
            [Peer.decode(addrStr), tsStr as int]
        }
        return new State(timestamp, peers, [] as Set)
    }

    int timestamp
    Map<Peer, Integer> peers
    Set<Peer> banlist

    /** @return the entries of peers whose key is not in the banlist */
    Map<Peer, Integer> getUnbannedPeers() { peers.findAll { it.key !in banlist } }

    /**
     * Merges peer timestamps into this state. Entries in the argument replace
     * entries already present for the same peer. The resulting timestamp is
     * the larger of the current timestamp and the largest merged peer value.
     *
     * @param peers peer timestamps to merge in; may be empty
     */
    State plus(Map<Peer, Integer> peers) {
        def pm = this.peers + peers
        // max() of an empty collection is null, which Math.max cannot accept;
        // with nothing to merge the timestamp simply stays as it is
        def ts = pm ? pm.values().max() : timestamp
        return new State(Math.max(timestamp, ts), pm, banlist)
    }

    /**
     * Merges another state into this one: the larger timestamp wins and the
     * other state's peer entries replace this state's on key collision. This
     * state's banlist is kept; the other state's banlist is ignored.
     */
    State plus(State that) {
        def timestamp = Math.max(this.timestamp, that.timestamp)
        def peers = this.peers + that.peers
        return new State(timestamp, peers, banlist)
    }

    /** @return a state without the given peer in peers (banlist untouched) */
    State minus(Peer peer) {
        return new State(timestamp, peers.findAll { it.key != peer }, banlist)
    }

    /** @return a state whose timestamp is one greater */
    State next() {
        return new State(timestamp + 1, peers, banlist)
    }

    /**
     * Adds the peer to the banlist and removes it from peers.
     *
     * @return this instance if the peer is already banned
     */
    State ban(Peer peer) {
        return peer in banlist ? this : new State(timestamp, peers, banlist + peer) - peer
    }

    /**
     * Removes the peer from the banlist.
     *
     * @return this instance if the peer is not banned
     */
    State unban(Peer peer) {
        return peer in banlist ? new State(timestamp, peers, banlist - peer) : this
    }

    /** @return the wire form: the timestamp followed by "port;timestamp" for each unbanned peer */
    String encode() {
        def peerTokens = unbannedPeers.collect { p, ts -> "${p.encode()};$ts"}
        return ([timestamp] + peerTokens).join(' ')
    }
}
/*
 * Runs one gossip node until the process is stopped.
 *
 * listenPort - UDP port to bind; 0 (the default) lets the OS pick one
 * initPeers  - encoded seed peers (port numbers); a List or a String[]
 *
 * A daemon thread periodically bans peers whose recorded timestamp has fallen
 * too far behind and broadcasts the node's state; the calling thread blocks
 * receiving datagrams, merging each into the shared state and logging it.
 */
def peer = { int listenPort = 0, initPeers = [] ->
    def sock = listenPort > 0 ? new DatagramSocket(listenPort) : new DatagramSocket()
    def self = new Peer(new InetSocketAddress(InetAddress.loopbackAddress, sock.localPort))
    // State.init takes String varargs; a List (such as the default []) is not
    // coerced to an array by the method call, so convert it explicitly
    def state = new AtomicReference(State.init(0, initPeers as String[]))

    // Prints a message prefixed with this node's local port.
    def log = { msg ->
        println "$sock.localSocketAddress.port :: $msg"
    }

    // Sends one message, as a single datagram of at most 512 bytes, to each destination.
    def send = { String message, SocketAddress... dests ->
        def buf = message.bytes
        assert buf.length <= 512
        dests.each { dest ->
            sock.send(new DatagramPacket(buf, buf.length, dest))
        }
    }

    // Sends a message to every peer in the current state.
    def broadcast = { String message ->
        def peerAddrs = state.get().peers.keySet()*.socketAddress
        send(message, *peerAddrs)
    }

    // Waits for one datagram; returns [senderAddress, text], or null on timeout.
    // A timeout of 0 blocks indefinitely.
    def receive = { int timeout = 0 ->
        def pack = new DatagramPacket(new byte[512], 512)
        sock.soTimeout = timeout
        try {
            sock.receive(pack)
        } catch (SocketTimeoutException e) {
            return null
        }
        return [pack.socketAddress, new String(pack.data, 0, pack.length)]
    }

    // Merges a received message into the shared state and returns the new state.
    // A message that cannot be processed gets its sender banned instead.
    def postReceive = { SocketAddress sender, String msg ->
        // built before the try so the catch block can refer to the sender
        def senderPeer = new Peer(sender)
        try {
            def peerState = State.decode(msg)
            def peerTimestamp = peerState.timestamp
            def peers = peerState.peers + [(senderPeer): peerTimestamp]
            // never track ourselves as a peer
            peers.remove(self)
            return state.updateAndGet { (it + peers).unban(senderPeer).next() }
        } catch (Exception e) {
            e.printStackTrace(System.err)
            // remove peer that sent a faulty packet
            return state.updateAndGet { it.ban(senderPeer) - senderPeer }
        }
    }

    Thread.startDaemon {
        def grace = 8
        def rand = new Random()
        // n * base milliseconds, shifted by a random offset in (-jitter, jitter]
        def calcSleep = { n, base, jitter -> n * base + jitter - rand.nextInt(jitter * 2) }
        // Bans every unbanned peer whose timestamp is below the threshold,
        // which moves further behind the clock as the peer count grows.
        def cleanPeers = { State currState ->
            def threshold = currState.timestamp - grace * (1 + currState.peers.size())
            def delinquent = currState.unbannedPeers.findAll { k, v -> v < threshold }.keySet()
            return delinquent.inject(currState) { acc, p -> acc.ban(p) }
        }
        while (true) {
            // NOTE(review): getAndUpdate returns the state from BEFORE cleaning, so the
            // broadcast payload still lists the peers just banned — confirm this is intended
            def currState = state.getAndUpdate(cleanPeers)
            broadcast(currState.encode())
            Thread.sleep(calcSleep(currState.peers.size() + 1, 500, 250))
        }
    }

    def currState = state.get()
    log(currState.encode())
    // receive() blocks with no timeout, so this loop runs until the process ends
    while (currState = receive()?.with(postReceive)) {
        log(currState.encode())
    }
}
// Script entry point: with no arguments the node binds an OS-chosen port and
// has no seed peers; otherwise the first argument is the listen port and the
// remaining arguments are seed peer ports.
if (!args) {
    peer()
} else {
    def port = args.head() as int
    def seeds = args.tail()
    peer(port, seeds)
}
#!/bin/sh
# Launches a local gossip cluster on consecutive ports.
#
# Usage: <this script> FIRST_PORT [LAST_PORT]
#
# One node is started in the background for every port from FIRST_PORT + 1 up
# to LAST_PORT, each seeded with FIRST_PORT and writing to gossip.<port>.out.
# A node on FIRST_PORT then runs in the foreground. Without LAST_PORT only the
# foreground node is started.
if [ -z "$1" ]; then
    echo "usage: $0 FIRST_PORT [LAST_PORT]" >&2
    exit 2
fi
first=$1
last=${2:-$1}

# On interrupt, termination or exit, signal this shell's whole process group so
# the background nodes stop too. POSIX sh takes signal names without the SIG
# prefix; the TERM trap is reset first so the kill does not re-enter this handler.
trap 'trap - TERM && kill -- -$$' INT TERM EXIT

# Plain counter loop instead of seq, which is not specified by POSIX.
N=$((first + 1))
while [ "$N" -le "$last" ]
do
    groovy gossip.groovy "$N" "$first" > "gossip.$N.out" &
    N=$((N + 1))
done
groovy gossip.groovy "$first"
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment