Created
March 27, 2022 05:37
-
-
Save spikeekips/c1e756385218de97f9bf83605edd7e60 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"bytes" | |
"crypto/rand" | |
"crypto/rsa" | |
"crypto/tls" | |
"crypto/x509" | |
"encoding/pem" | |
"fmt" | |
"io" | |
stdlog "log" | |
"math/big" | |
"net" | |
"os" | |
"time" | |
"github.com/lucas-clemente/quic-go" | |
"github.com/rs/zerolog" | |
"github.com/spikeekips/mitum/base" | |
"github.com/spikeekips/mitum/util" | |
"github.com/spikeekips/mitum/util/encoder" | |
jsonenc "github.com/spikeekips/mitum/util/encoder/json" | |
"github.com/spikeekips/mitum/util/logging" | |
"github.com/spikeekips/quictransport" | |
"github.com/spikeekips/quictransport/quicstream" | |
) | |
var ( | |
log *zerolog.Logger | |
locallogging *logging.Logging | |
quicproto = "hihihi" | |
) | |
func main() { // nolint | |
locallogging = logging.Setup(os.Stderr, zerolog.DebugLevel, "", false) | |
log = locallogging.Log() | |
var bindaddr *net.UDPAddr | |
switch addr, err := net.ResolveUDPAddr("udp", os.Args[1]); { | |
case err != nil: | |
log.Error().Err(err).Str("address", os.Args[1]).Msg("failed to parse local address") | |
os.Exit(1) | |
default: | |
bindaddr = addr | |
} | |
remotecis := make([]quictransport.ConnInfo, len(os.Args[2:])) | |
for i := range os.Args[2:] { | |
a := os.Args[i+2] | |
switch ci, err := quictransport.NewBaseConnInfoFromString(a); { | |
case err != nil: | |
log.Error().Err(err).Str("address", a).Msg("failed to parse remote address") | |
os.Exit(1) | |
default: | |
remotecis[i] = ci | |
} | |
} | |
log.Debug(). | |
Stringer("bind_address", bindaddr). | |
Interface("remote_addresses", remotecis). | |
Msg("trying to start memberlist") | |
advertiseaddr, _ := net.ResolveUDPAddr("udp", fmt.Sprintf("localhost:%d", bindaddr.Port)) | |
enc := jsonenc.NewEncoder() | |
_ = enc.Add(encoder.DecodeDetail{Hint: base.StringAddressHint, Instance: base.StringAddress{}}) | |
_ = enc.Add(encoder.DecodeDetail{Hint: quictransport.NodeHint, Instance: quictransport.BaseNode{}}) | |
_ = enc.Add(encoder.DecodeDetail{Hint: quictransport.NodeMetaHint, Instance: quictransport.NodeMeta{}}) | |
// NOTE make local | |
local, err := quictransport.NewNode( | |
advertiseaddr.String(), | |
advertiseaddr, | |
quictransport.NewNodeMeta(base.NewStringAddress(util.UUID().String()), true), | |
) | |
if err != nil { | |
log.Error().Err(err).Msg("failed to create Node") | |
os.Exit(1) | |
} | |
poolclient := quicstream.NewPoolClient() | |
newClient := func(addr *net.UDPAddr, insecure bool) *quicstream.Client { | |
return quicstream.NewClient( | |
addr, | |
&tls.Config{ | |
InsecureSkipVerify: insecure, // nolint:gosec | |
NextProtos: []string{quicproto}, | |
}, | |
&quic.Config{ | |
HandshakeIdleTimeout: time.Second * 2, | |
MaxIdleTimeout: time.Second * 3, | |
}, | |
nil, nil, | |
) | |
} | |
transport := quictransport.NewTransportWithQuicstream( | |
advertiseaddr, | |
"memberlist", | |
poolclient, | |
func(ci quictransport.ConnInfo) func(*net.UDPAddr) *quicstream.Client { | |
return func(*net.UDPAddr) *quicstream.Client { | |
return newClient(ci.Address(), ci.Insecure()) | |
} | |
}, | |
) | |
_ = transport.SetLogging(locallogging) | |
handler := quicstream.NewPrefixHandler(nil) | |
handler.Add("memberlist", func(addr net.Addr, r io.ReadCloser, w io.WriteCloser) error { | |
b, err := io.ReadAll(r) | |
if err != nil { | |
log.Error().Err(err).Stringer("remote_address", addr).Msg("failed to read") | |
return err | |
} | |
if err := transport.ReceiveRaw(b, addr); err != nil { | |
log.Error().Err(err).Stringer("remote_address", addr).Msg("invalid message received") | |
return err | |
} | |
return nil | |
}) | |
tlsconfig := generateTLSConfig(quicproto) | |
quicstreamsrv := quicstream.NewServer(bindaddr, tlsconfig, nil, handler.Handler) | |
if err := quicstreamsrv.Start(); err != nil { | |
log.Error().Err(err).Msg("failed quicstream.NewServer") | |
os.Exit(1) | |
} | |
var mem *quictransport.Memberlist | |
memberlistconfig := quictransport.BasicMemberlistConfig(local.Name(), bindaddr, advertiseaddr) | |
memberlistconfig.Logger = stdlog.New(os.Stderr, "", stdlog.LstdFlags) | |
memberlistconfig.Transport = transport | |
aliveDelegate := quictransport.NewAliveDelegate(enc, advertiseaddr, func(quictransport.Node) error { return nil }) | |
aliveDelegate.SetLogging(locallogging) | |
memberlistconfig.Alive = aliveDelegate | |
whenJoined := func(quictransport.Node) { | |
if mem == nil { | |
return | |
} | |
_, _ = fmt.Printf("> %d: node joined: members: %d\n", time.Now().Unix(), mem.MembersLen()) | |
mem.Members(func(node quictransport.Node) bool { | |
fmt.Println("Member:", node.Address(), ":", node.JoinedAt()) | |
return true | |
}) | |
} | |
whenLeft := func(quictransport.Node) { | |
if mem == nil { | |
return | |
} | |
_, _ = fmt.Printf("> %d: node left: members: %d\n", time.Now().Unix(), mem.MembersLen()) | |
mem.Members(func(node quictransport.Node) bool { | |
fmt.Println("Member:", node.Address(), ":", node.JoinedAt()) | |
return true | |
}) | |
} | |
eventsDelegate := quictransport.NewEventsDelegate( | |
enc, | |
whenJoined, | |
whenLeft, | |
) | |
eventsDelegate.SetLogging(locallogging) | |
memberlistconfig.Events = eventsDelegate | |
delegate := quictransport.NewDelegate(local, nil) | |
delegate.SetLogging(locallogging) | |
memberlistconfig.Delegate = delegate | |
switch i, err := quictransport.NewMemberlist(local, enc, memberlistconfig, 3); { | |
case err != nil: | |
log.Error().Err(err).Msg("failed to make memberlist") | |
os.Exit(1) | |
default: | |
mem = i | |
_ = mem.SetLogging(locallogging) | |
} | |
if err := mem.Start(); err != nil { | |
log.Error().Err(err).Msg("failed to start memberlist") | |
os.Exit(1) | |
} | |
if len(remotecis) > 0 { | |
go func() { | |
ticker := time.NewTicker(time.Second) | |
for range ticker.C { | |
log.Debug().Msg("trying to join") | |
if err := mem.Join(remotecis); err != nil { | |
log.Error().Err(err).Msg("failed to join") | |
continue | |
} | |
break | |
} | |
log.Debug().Msg("joined") | |
}() | |
} | |
ticker := time.NewTicker(time.Second) | |
for range ticker.C { | |
if n, _ := rand.Int(rand.Reader, big.NewInt(100)); n.Int64()%10 != 0 { | |
continue | |
} | |
s := time.Now() | |
mb := bytes.Repeat([]byte("k"), memberlistconfig.UDPBufferSize-100) | |
bch := make(chan struct{}) | |
b := quictransport.NewBroadcast( | |
mb, | |
util.UUID().String(), | |
bch, | |
) | |
mem.Broadcast(b) | |
<-bch | |
_, _ = fmt.Println(">> b done", time.Since(s)) | |
} | |
os.Exit(0) | |
} | |
func generateTLSConfig(proto string) *tls.Config { | |
key, err := rsa.GenerateKey(rand.Reader, 2048) | |
if err != nil { | |
panic(err) | |
} | |
template := x509.Certificate{SerialNumber: big.NewInt(1)} | |
certDER, err := x509.CreateCertificate(rand.Reader, &template, &template, &key.PublicKey, key) | |
if err != nil { | |
panic(err) | |
} | |
keyPEM := pem.EncodeToMemory(&pem.Block{Type: "RSA PRIVATE KEY", Bytes: x509.MarshalPKCS1PrivateKey(key)}) | |
certPEM := pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: certDER}) | |
tlsCert, err := tls.X509KeyPair(certPEM, keyPEM) | |
if err != nil { | |
panic(err) | |
} | |
return &tls.Config{ | |
MinVersion: tls.VersionTLS13, | |
Certificates: []tls.Certificate{tlsCert}, | |
NextProtos: []string{proto}, | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment