Skip to content

Instantly share code, notes, and snippets.

@spikeekips
Created March 27, 2022 05:37
Show Gist options
  • Save spikeekips/c1e756385218de97f9bf83605edd7e60 to your computer and use it in GitHub Desktop.
Save spikeekips/c1e756385218de97f9bf83605edd7e60 to your computer and use it in GitHub Desktop.
package main
import (
"bytes"
"crypto/rand"
"crypto/rsa"
"crypto/tls"
"crypto/x509"
"encoding/pem"
"fmt"
"io"
stdlog "log"
"math/big"
"net"
"os"
"time"
"github.com/lucas-clemente/quic-go"
"github.com/rs/zerolog"
"github.com/spikeekips/mitum/base"
"github.com/spikeekips/mitum/util"
"github.com/spikeekips/mitum/util/encoder"
jsonenc "github.com/spikeekips/mitum/util/encoder/json"
"github.com/spikeekips/mitum/util/logging"
"github.com/spikeekips/quictransport"
"github.com/spikeekips/quictransport/quicstream"
)
var (
// log is the program-wide zerolog logger; main assigns it from
// locallogging.Log() before anything else runs.
log *zerolog.Logger
// locallogging is the logging setup created in main and handed to the
// transport, delegates and memberlist through their SetLogging methods.
locallogging *logging.Logging
// quicproto is the protocol name placed in NextProtos of both the client
// TLS config built in main and the server TLS config from
// generateTLSConfig.
quicproto = "hihihi"
)
// main runs a demo memberlist node whose traffic is carried over quicstream.
//
// Usage:
//
//	<program> <bind address> [<remote address>...]
//
// The first argument is the local UDP address to bind to; every further
// argument is a remote node to join. After the memberlist has been started,
// main tries to join the remotes once per second until it succeeds and, on
// roughly one tick out of ten, broadcasts a payload of
// memberlistconfig.UDPBufferSize-100 bytes and prints how long the broadcast
// took. Any setup failure is logged and terminates the process with exit
// code 1.
func main() { // nolint
	locallogging = logging.Setup(os.Stderr, zerolog.DebugLevel, "", false)
	log = locallogging.Log()

	// The bind address is mandatory; without this guard os.Args[1] panics
	// with an index-out-of-range error when no argument is given.
	if len(os.Args) < 2 {
		log.Error().Msg("missing bind address; usage: <bind address> [<remote address>...]")
		os.Exit(1)
	}

	bindaddr, err := net.ResolveUDPAddr("udp", os.Args[1])
	if err != nil {
		log.Error().Err(err).Str("address", os.Args[1]).Msg("failed to parse local address")
		os.Exit(1)
	}

	remoteargs := os.Args[2:]
	remotecis := make([]quictransport.ConnInfo, len(remoteargs))
	for i, a := range remoteargs {
		ci, err := quictransport.NewBaseConnInfoFromString(a)
		if err != nil {
			log.Error().Err(err).Str("address", a).Msg("failed to parse remote address")
			os.Exit(1)
		}

		remotecis[i] = ci
	}

	log.Debug().
		Stringer("bind_address", bindaddr).
		Interface("remote_addresses", remotecis).
		Msg("trying to start memberlist")

	// The node advertises itself as localhost on the same port it binds to.
	advertiseaddr, err := net.ResolveUDPAddr("udp", fmt.Sprintf("localhost:%d", bindaddr.Port))
	if err != nil {
		log.Error().Err(err).Msg("failed to resolve advertise address")
		os.Exit(1)
	}

	// Register the hinted types with the encoder; the results of Add are
	// deliberately discarded, as in the original demo.
	enc := jsonenc.NewEncoder()
	_ = enc.Add(encoder.DecodeDetail{Hint: base.StringAddressHint, Instance: base.StringAddress{}})
	_ = enc.Add(encoder.DecodeDetail{Hint: quictransport.NodeHint, Instance: quictransport.BaseNode{}})
	_ = enc.Add(encoder.DecodeDetail{Hint: quictransport.NodeMetaHint, Instance: quictransport.NodeMeta{}})

	// NOTE make local
	local, err := quictransport.NewNode(
		advertiseaddr.String(),
		advertiseaddr,
		quictransport.NewNodeMeta(base.NewStringAddress(util.UUID().String()), true),
	)
	if err != nil {
		log.Error().Err(err).Msg("failed to create Node")
		os.Exit(1)
	}

	poolclient := quicstream.NewPoolClient()

	// newClient builds a quicstream client for addr; insecure is passed
	// through to tls.Config.InsecureSkipVerify.
	newClient := func(addr *net.UDPAddr, insecure bool) *quicstream.Client {
		return quicstream.NewClient(
			addr,
			&tls.Config{
				InsecureSkipVerify: insecure, // nolint:gosec
				NextProtos:         []string{quicproto},
			},
			&quic.Config{
				HandshakeIdleTimeout: time.Second * 2,
				MaxIdleTimeout:       time.Second * 3,
			},
			nil, nil,
		)
	}

	transport := quictransport.NewTransportWithQuicstream(
		advertiseaddr,
		"memberlist",
		poolclient,
		func(ci quictransport.ConnInfo) func(*net.UDPAddr) *quicstream.Client {
			// The address handed to the inner function is ignored; the
			// client is always built from the ConnInfo itself.
			return func(*net.UDPAddr) *quicstream.Client {
				return newClient(ci.Address(), ci.Insecure())
			}
		},
	)
	_ = transport.SetLogging(locallogging)

	// Everything received under the "memberlist" prefix is read in full and
	// handed to the transport as a raw message.
	handler := quicstream.NewPrefixHandler(nil)
	handler.Add("memberlist", func(addr net.Addr, r io.ReadCloser, w io.WriteCloser) error {
		b, err := io.ReadAll(r)
		if err != nil {
			log.Error().Err(err).Stringer("remote_address", addr).Msg("failed to read")
			return err
		}

		if err := transport.ReceiveRaw(b, addr); err != nil {
			log.Error().Err(err).Stringer("remote_address", addr).Msg("invalid message received")
			return err
		}

		return nil
	})

	tlsconfig := generateTLSConfig(quicproto)
	quicstreamsrv := quicstream.NewServer(bindaddr, tlsconfig, nil, handler.Handler)
	if err := quicstreamsrv.Start(); err != nil {
		log.Error().Err(err).Msg("failed quicstream.NewServer")
		os.Exit(1)
	}

	// mem is declared before the event callbacks so they can capture it; it
	// stays nil until the memberlist has been created further below.
	var mem *quictransport.Memberlist

	memberlistconfig := quictransport.BasicMemberlistConfig(local.Name(), bindaddr, advertiseaddr)
	memberlistconfig.Logger = stdlog.New(os.Stderr, "", stdlog.LstdFlags)
	memberlistconfig.Transport = transport

	aliveDelegate := quictransport.NewAliveDelegate(enc, advertiseaddr, func(quictransport.Node) error { return nil })
	aliveDelegate.SetLogging(locallogging)
	memberlistconfig.Alive = aliveDelegate

	// printMembers reports a membership event ("joined" or "left") followed
	// by the current member list; it does nothing while mem is still nil.
	printMembers := func(event string) {
		if mem == nil {
			return
		}

		_, _ = fmt.Printf("> %d: node %s: members: %d\n", time.Now().Unix(), event, mem.MembersLen())
		mem.Members(func(node quictransport.Node) bool {
			fmt.Println("Member:", node.Address(), ":", node.JoinedAt())
			return true
		})
	}
	whenJoined := func(quictransport.Node) { printMembers("joined") }
	whenLeft := func(quictransport.Node) { printMembers("left") }

	eventsDelegate := quictransport.NewEventsDelegate(
		enc,
		whenJoined,
		whenLeft,
	)
	eventsDelegate.SetLogging(locallogging)
	memberlistconfig.Events = eventsDelegate

	delegate := quictransport.NewDelegate(local, nil)
	delegate.SetLogging(locallogging)
	memberlistconfig.Delegate = delegate

	m, err := quictransport.NewMemberlist(local, enc, memberlistconfig, 3)
	if err != nil {
		log.Error().Err(err).Msg("failed to make memberlist")
		os.Exit(1)
	}
	mem = m
	_ = mem.SetLogging(locallogging)

	if err := mem.Start(); err != nil {
		log.Error().Err(err).Msg("failed to start memberlist")
		os.Exit(1)
	}

	if len(remotecis) > 0 {
		// Retry joining once per second until it succeeds.
		go func() {
			ticker := time.NewTicker(time.Second)
			defer ticker.Stop() // release the ticker once joined

			for range ticker.C {
				log.Debug().Msg("trying to join")

				if err := mem.Join(remotecis); err != nil {
					log.Error().Err(err).Msg("failed to join")
					continue
				}

				break
			}

			log.Debug().Msg("joined")
		}()
	}

	ticker := time.NewTicker(time.Second)
	for range ticker.C {
		// Broadcast on roughly one tick out of ten. A failing random source
		// returns a nil number, so skip the tick instead of dereferencing it.
		n, err := rand.Int(rand.Reader, big.NewInt(100))
		if err != nil {
			log.Error().Err(err).Msg("failed to generate random number")
			continue
		}

		if n.Int64()%10 != 0 {
			continue
		}

		s := time.Now()
		mb := bytes.Repeat([]byte("k"), memberlistconfig.UDPBufferSize-100)

		// bch is handed to the broadcast; the loop waits on it before
		// reporting the elapsed time.
		bch := make(chan struct{})
		b := quictransport.NewBroadcast(
			mb,
			util.UUID().String(),
			bch,
		)
		mem.Broadcast(b)
		<-bch

		_, _ = fmt.Println(">> b done", time.Since(s))
	}

	os.Exit(0)
}
// generateTLSConfig builds a throwaway TLS configuration backed by a freshly
// generated, self-signed certificate.
//
// A new 2048-bit RSA key is created on every call and used to self-sign a
// certificate whose template sets only the serial number. The returned config
// requires TLS 1.3, carries that single certificate, and advertises proto as
// its only entry in NextProtos.
//
// Any failure while generating the key, creating the certificate or loading
// the key pair panics.
func generateTLSConfig(proto string) *tls.Config {
	privateKey, err := rsa.GenerateKey(rand.Reader, 2048)
	if err != nil {
		panic(err)
	}

	// The same template serves as both subject and issuer: self-signed.
	tmpl := x509.Certificate{SerialNumber: big.NewInt(1)}

	der, err := x509.CreateCertificate(rand.Reader, &tmpl, &tmpl, &privateKey.PublicKey, privateKey)
	if err != nil {
		panic(err)
	}

	keyBlock := pem.Block{
		Type:  "RSA PRIVATE KEY",
		Bytes: x509.MarshalPKCS1PrivateKey(privateKey),
	}
	certBlock := pem.Block{
		Type:  "CERTIFICATE",
		Bytes: der,
	}

	pair, err := tls.X509KeyPair(pem.EncodeToMemory(&certBlock), pem.EncodeToMemory(&keyBlock))
	if err != nil {
		panic(err)
	}

	cfg := new(tls.Config)
	cfg.MinVersion = tls.VersionTLS13
	cfg.Certificates = []tls.Certificate{pair}
	cfg.NextProtos = []string{proto}

	return cfg
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment