Skip to content

Instantly share code, notes, and snippets.

@steamraven
Created February 13, 2023 17:43
Show Gist options
  • Save steamraven/41ec1867b7677d380555cc541c2bd2e7 to your computer and use it in GitHub Desktop.
Save steamraven/41ec1867b7677d380555cc541c2bd2e7 to your computer and use it in GitHub Desktop.
Simple Go file to test pubsub over a relay
package main
import (
"context"
"flag"
"fmt"
"math/rand"
"sync"
"time"
"github.com/ipfs/go-log/v2"
"github.com/libp2p/go-libp2p"
dht "github.com/libp2p/go-libp2p-kad-dht"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/libp2p/go-libp2p/core/event"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/p2p/host/autorelay"
"github.com/multiformats/go-multiaddr"
)
// Command-line flags and the package-level logger shared by all functions.
var (
// topicNameFlag is the name of the pubsub topic that each host joins.
topicNameFlag = flag.String("topicName", "beetjuice", "name of topic to join")
// relay selects the relay-only host options in main instead of the
// relay-disabled ones.
relay = flag.Bool("relay", false, "Enable AutoRelay")
// verbose switches all loggers to debug level in main.
verbose = flag.Bool("verbose", false, "Enable extra logging")
// logger is the "chat" subsystem logger used for this program's own output.
logger = log.Logger("chat")
)
// NamedHost is a libp2p host with a human-readable name attached. The name is
// used to label log lines, printed messages and the pubsub trace file.
type NamedHost struct {
host.Host // embedded so a NamedHost can be passed wherever a host.Host is expected
name string // human-readable label, e.g. "Alice" or "Bob"
}
// main creates two in-process libp2p hosts ("Alice" and "Bob"), waits until
// each of them has observed a connectedness change for the other, and then
// starts a pubsub publisher/printer on both hosts.
//
// With -relay the hosts are configured to listen only via relays (the default
// DHT bootstrap peers are used as static relays); otherwise relaying is
// disabled. The function then blocks on ctx, which is never cancelled, so the
// program runs until the process is killed.
func main() {
	flag.Parse()

	if *verbose {
		log.SetAllLoggers(log.LevelDebug)
	} else {
		log.SetAllLoggers(log.LevelError)
	}
	// Local logging is at info level regardless of -verbose. A failure here
	// would silently hide all program output, so do not ignore it.
	if err := log.SetLogLevel("chat", "info"); err != nil {
		panic(err)
	}

	ctx := context.Background()

	var opts []libp2p.Option
	if *relay {
		// Options to only listen on relay.
		opts = []libp2p.Option{
			libp2p.NoListenAddrs,
			libp2p.ForceReachabilityPrivate(),
			libp2p.EnableRelay(),
			libp2p.EnableAutoRelay(autorelay.WithStaticRelays(dht.GetDefaultBootstrapPeerAddrInfos())),
		}
	} else {
		// Options to not listen on relay.
		opts = []libp2p.Option{
			libp2p.DisableRelay(),
		}
	}

	// The deferred Close calls only run if main unwinds (for example when a
	// later setup step panics); they keep an already-created host from leaking.
	aliceHost, err := libp2p.New(opts...)
	if err != nil {
		panic(err)
	}
	defer aliceHost.Close()
	alice := NamedHost{Host: aliceHost, name: "Alice"}

	bobHost, err := libp2p.New(opts...)
	if err != nil {
		panic(err)
	}
	defer bobHost.Close()
	bob := NamedHost{Host: bobHost, name: "Bob"}

	// Try to connect the peers every time listen addresses change. Needed as
	// relays take a bit to come up. Each goroutine calls wg.Done once.
	var wg sync.WaitGroup
	wg.Add(2)
	go connectPeer(ctx, &wg, alice, bob)
	go connectPeer(ctx, &wg, bob, alice)
	wg.Wait()

	// Init gossip channels.
	go gossip(ctx, alice)
	go gossip(ctx, bob)

	<-ctx.Done()
}
// connectPeer dials h2 from h1 every time h2 announces a new set of listen
// addresses, and signals wg once h2 reports a connectedness change for h1.
//
// Re-dialing on every address update is needed because relay addresses take a
// while to come up; for the same reason a failed dial is logged rather than
// treated as fatal, so the next address update gets another chance.
//
// wg.Done is called at most once, on the first EvtPeerConnectednessChanged
// event that h2 emits for h1 (whatever the new connectedness is). The function
// returns when ctx is cancelled or the event subscription channel is closed.
func connectPeer(ctx context.Context, wg *sync.WaitGroup, h1, h2 NamedHost) {
	// Monitor h2 for new (relay) addresses and connection state changes.
	sub, err := h2.EventBus().Subscribe([]interface{}{
		new(event.EvtLocalAddressesUpdated),
		new(event.EvtPeerConnectednessChanged),
	})
	if err != nil {
		panic(err)
	}
	// Release the subscription once we stop reading from it.
	defer sub.Close()

	signalled := false
	// Make sure to continuously drain the event subscription to prevent stalling.
	for {
		select {
		case <-ctx.Done():
			return
		case e, ok := <-sub.Out():
			if !ok {
				return
			}
			switch evt := e.(type) {
			case event.EvtLocalAddressesUpdated:
				// Convert evt.Current multiaddrs into an AddrInfo.
				logger.Info(h2.name, " New listen addresses:")
				addrInfo := peer.AddrInfo{
					ID:    h2.ID(),
					Addrs: make([]multiaddr.Multiaddr, 0, len(evt.Current)),
				}
				for _, ua := range evt.Current {
					logger.Info(" * ", ua.Address)
					addrInfo.Addrs = append(addrInfo.Addrs, ua.Address)
				}
				// Initiate connection. Fairly low cost if already connected.
				if err := h1.Connect(ctx, addrInfo); err != nil {
					logger.Warn(h1.name, " could not connect to ", h2.name, ": ", err)
				}
			case event.EvtPeerConnectednessChanged:
				if evt.Peer != h1.ID() {
					continue
				}
				logger.Info(h2.name, " asserts ", h1.name, " is now ", evt.Connectedness.String())
				if !signalled {
					wg.Done()
					signalled = true
				}
			}
		}
	}
}
// gossip sets up a GossipSub router on h, joins the topic named by
// -topicName, and publishes "From <name>" on it until ctx is cancelled.
//
// Pubsub events are traced to "<name>_pubsub_trace.json". Received messages
// are printed by printMessagesFrom in a separate goroutine. The first message
// is published immediately; subsequent ones follow at a per-host interval of
// one second plus a random whole number of milliseconds below 1000.
// Setup failures panic; publish failures are only logged.
func gossip(ctx context.Context, h NamedHost) {
	tracer, err := pubsub.NewJSONTracer(h.name + "_pubsub_trace.json")
	if err != nil {
		panic(err)
	}
	// Close the tracer when publishing stops so the trace file is released.
	defer tracer.Close()

	ps, err := pubsub.NewGossipSub(ctx, h, pubsub.WithEventTracer(tracer))
	//ps, err := pubsub.NewFloodSub(ctx, h)
	if err != nil {
		panic(err)
	}
	topic, err := ps.Join(*topicNameFlag)
	if err != nil {
		panic(err)
	}
	sub, err := topic.Subscribe()
	if err != nil {
		panic(err)
	}
	// Needs its own goroutine as sub doesn't use channels.
	go printMessagesFrom(ctx, h, sub)

	duration := time.Second + time.Duration(1000*rand.Float32())*time.Millisecond
	logger.Info(h.name, " tick time: ", duration)
	// Use a stoppable ticker rather than time.Tick so it is released on return.
	ticker := time.NewTicker(duration)
	defer ticker.Stop()

	// The payload never changes, so build it once.
	payload := []byte("From " + h.name)

	// Continuously publish data.
	for {
		if err := topic.Publish(ctx, payload); err != nil {
			logger.Warn(h.name, " publish failed: ", err)
		}
		select {
		case <-ticker.C:
		case <-ctx.Done():
			return
		}
	}
}
// printMessagesFrom prints every message delivered on sub that was not
// received from h itself, prefixed with h's name.
//
// It blocks in sub.Next and returns quietly once ctx has been cancelled, since
// that is a normal shutdown rather than a failure. Any other error from
// sub.Next still panics.
func printMessagesFrom(ctx context.Context, h NamedHost, sub *pubsub.Subscription) {
	for {
		m, err := sub.Next(ctx)
		if err != nil {
			if ctx.Err() != nil {
				// Cancelled by the caller: stop reading.
				return
			}
			panic(err)
		}
		// Filter out messages from self.
		if m.ReceivedFrom == h.ID() {
			continue
		}
		fmt.Println(h.name, ": ", m.ReceivedFrom, " says ", string(m.Message.Data))
	}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment