Skip to content

Instantly share code, notes, and snippets.

@master255
Last active February 26, 2024 07:32
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save master255/c63091a1b05a67ed33c8c248f748fd0f to your computer and use it in GitHub Desktop.
Save master255/c63091a1b05a67ed33c8c248f748fd0f to your computer and use it in GitHub Desktop.
work relays
package mobile
import (
"bufio"
"context"
"fmt"
ds "github.com/ipfs/go-datastore"
dsync "github.com/ipfs/go-datastore/sync"
golog "github.com/ipfs/go-log/v2"
"github.com/libp2p/go-libp2p"
dht "github.com/libp2p/go-libp2p-kad-dht"
"github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/discovery"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
drouting "github.com/libp2p/go-libp2p/p2p/discovery/routing"
dutil "github.com/libp2p/go-libp2p/p2p/discovery/util"
"github.com/libp2p/go-libp2p/p2p/host/autorelay"
"github.com/libp2p/go-libp2p/p2p/net/swarm"
"github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/relay"
ma "github.com/multiformats/go-multiaddr"
"io"
"log"
"sync"
"time"
)
const randezvous = "example"
var dataReceived DataReceived
var closed bool
//var staticRelays []peer.AddrInfo
type MobileLibp2p struct {
host host.Host
relay *relay.Relay
kademlia *dht.IpfsDHT
peerId peer.ID
privateKey crypto.PrivKey
routing *drouting.RoutingDiscovery
ctx context.Context
}
type ReaderSendClose interface {
Close()
SendData([]byte)
}
type StreamStruct struct {
stream network.Stream
}
type DataReceived interface {
DataReceived([]byte, ReaderSendClose)
}
func (streamStruct StreamStruct) SendData(data []byte) {
wr := bufio.NewWriter(streamStruct.stream)
_, err := wr.Write(data)
if err != nil {
fmt.Println("Error writing to buffer")
}
err = wr.Flush()
if err != nil {
fmt.Println("Error flushing buffer")
}
fmt.Println("stream close2")
streamStruct.stream.Close()
}
func (streamStruct StreamStruct) Close() {
fmt.Println("stream close1")
streamStruct.stream.Close()
}
func handleStream(stream network.Stream) {
fmt.Println("Got a new stream!")
if dataReceived != nil {
fmt.Println("Start reader")
re := bufio.NewReader(stream)
fmt.Println("Start read")
buf, err := io.ReadAll(re)
fmt.Println("Start end read")
if err != nil {
fmt.Println("Error read: ", err)
} else {
var readerClose ReaderSendClose = StreamStruct{stream}
dataReceived.DataReceived(buf, readerClose)
return
}
} else {
fmt.Println("data-received null")
}
fmt.Println("stream close")
stream.Close()
}
func StartLibp2p(privateKey []byte, port int) *MobileLibp2p {
golog.SetAllLoggers(golog.LevelError)
if closed {
fmt.Println("closed true!!!!!!!!!!!!!!")
}
var privKey crypto.PrivKey
if privateKey == nil {
privKey, _, _ = crypto.GenerateKeyPair(
crypto.Ed25519,
-1,
)
} else {
privKey, _ = crypto.UnmarshalPrivateKey(privateKey)
}
ctx := context.Background()
//connmgr, err := connmgr.NewConnManager(
// 100, // Lowwater
// 400, // HighWater,
// connmgr.WithGracePeriod(time.Minute),
//)
var routingDiscovery *drouting.RoutingDiscovery
h, err := libp2p.New(
libp2p.ListenAddrStrings(fmt.Sprintf("/ip4/0.0.0.0/tcp/%d", port), fmt.Sprintf("/ip4/0.0.0.0/udp/%d/quic", port)),
libp2p.Identity(privKey),
libp2p.NATPortMap(),
libp2p.EnableRelay(),
libp2p.EnableHolePunching(),
libp2p.EnableAutoRelayWithPeerSource(func(ctx context.Context, numPeers int) <-chan peer.AddrInfo {
peerChan, _ := routingDiscovery.FindPeers(ctx, randezvous)
//peerChanResult := make(chan peer.AddrInfo, len(peerChan)+len(staticRelays))
//for peerCh := range peerChan {
// peerChanResult <- peer.AddrInfo{ID: peerCh.ID, Addrs: peerCh.Addrs}
//}
//for _, info := range staticRelays {
// peerChanResult <- info
//}
fmt.Println("refresh relays!!!!!!!!!!!!!!")
//close(peerChanResult)
return peerChan
}, autorelay.WithMinCandidates(2)),
libp2p.UserAgent(randezvous))
fmt.Println("MyID: ", h.ID())
h.SetStreamHandler("/example/1.1.0", handleStream)
kademliaDHT := initDHT(ctx, h)
routingDiscovery = drouting.NewRoutingDiscovery(kademliaDHT)
dutil.Advertise(ctx, routingDiscovery, randezvous, discovery.TTL(time.Minute*2))
relay, err := relay.New(h, relay.WithInfiniteLimits())
if err != nil {
log.Printf("Failed to instantiate the relay: %v", err)
}
return &MobileLibp2p{
host: h,
relay: relay,
kademlia: kademliaDHT,
peerId: h.ID(),
privateKey: privKey,
routing: routingDiscovery,
ctx: ctx,
}
}
func (m *MobileLibp2p) GetPrivateKey() []byte {
var privKey, _ = crypto.MarshalPrivateKey(m.privateKey)
return privKey
}
func (m *MobileLibp2p) GetNodeId() string {
return fmt.Sprint(m.peerId)
}
func (m *MobileLibp2p) InitialConnectPeers() bool {
peerChan, err := m.routing.FindPeers(m.ctx, randezvous)
if err != nil {
panic(err)
}
var i = 0
for peer := range peerChan {
fmt.Println("Try peer: ", peer.ID)
if peer.ID == m.host.ID() {
fmt.Println("Skip. My ID")
continue
} else {
fmt.Println("Try connect")
err = m.host.Connect(m.ctx, peer)
//_, _ = client.Reserve(m.ctx, m.host, peer)
if err != nil {
fmt.Println("Failed connecting to ", peer.ID, ", error:", err)
} else {
fmt.Println("Connected!")
i++
//break //delete it in future
}
if closed {
return false
}
time.Sleep(time.Second)
}
if i > 5 {
return true
}
fmt.Println("Founded: ", peer.ID)
}
fmt.Println("Initial success")
return true
}
func (m *MobileLibp2p) ConnectSendData(peerId string, data []byte) bool {
peerChan, errorConnect := m.routing.FindPeers(m.ctx, randezvous)
peerChanResult := make([]peer.AddrInfo, 0)
for element := range peerChan {
peerChanResult = append(peerChanResult, element)
}
for _, peerServer := range peerChanResult {
fmt.Println("Try peer: ", peerServer.ID)
if fmt.Sprint(peerServer.ID) == peerId {
var u = 0
for u < 120 {
fmt.Println("Try connect")
errorConnect = m.host.Connect(m.ctx, peerServer)
if errorConnect != nil {
fmt.Println("Failed connecting1 to ", peerId, ", error:", errorConnect)
var relayInfo peer.AddrInfo
for _, peerRelay := range peerChanResult {
var idString = peerRelay.ID.String()
fmt.Println("Found relay id: ", idString)
if idString != peerId && idString != m.peerId.String() {
fmt.Println("Found relay! ", idString)
relayInfo = peerRelay
break
}
}
errorConnect = m.host.Connect(m.ctx, relayInfo)
if errorConnect == nil {
fmt.Println("Connected to relay!")
} else {
for _, peerRelay := range peerChanResult {
var idString = peerRelay.ID.String()
fmt.Println("Found relay1 id: ", idString)
if idString != peerId && idString != m.peerId.String() && idString != relayInfo.ID.String() {
fmt.Println("Found relay! ", idString)
relayInfo = peerRelay
break
}
}
errorConnect = m.host.Connect(m.ctx, relayInfo)
}
relayAddr, _ := ma.NewMultiaddr("/p2p/" + relayInfo.ID.String() + "/p2p-circuit/p2p/" + peerServer.ID.String())
unreachableRelayInfo := peer.AddrInfo{
ID: peerServer.ID,
Addrs: []ma.Multiaddr{relayAddr},
}
m.host.Network().(*swarm.Swarm).Backoff().Clear(peerServer.ID)
fmt.Println("Try connect to relay")
errorConnect = m.host.Connect(m.ctx, unreachableRelayInfo)
}
if errorConnect != nil {
fmt.Println("Failed connecting1 to ", peerId, ", error:", errorConnect)
} else {
fmt.Println("Try open stream")
stream, err := m.host.NewStream(m.ctx, peerServer.ID, "/example/1.1.0")
if err != nil {
fmt.Println("Connection failed:", err)
} else {
fmt.Println("Connected to:", peerId)
wr := bufio.NewWriter(stream)
_, err := wr.Write(data)
if err != nil {
fmt.Println("Error writing to buffer")
continue
}
err = wr.Flush()
if err != nil {
fmt.Println("Error flushing buffer")
continue
}
stream.Close()
return true
}
}
if closed {
fmt.Println("CLOSED EXIT FROM PROGRAM")
return false
}
u++
time.Sleep(time.Second)
}
}
fmt.Println("Founded1: ", peerServer.ID)
}
return false
}
func (m *MobileLibp2p) ConnectSendGetData(peerId string, data []byte) []byte {
peerChan, errorConnect := m.routing.FindPeers(m.ctx, randezvous)
peerChanResult := make([]peer.AddrInfo, 0)
for element := range peerChan {
peerChanResult = append(peerChanResult, element)
}
for _, peerServer := range peerChanResult {
fmt.Println("Try peer: ", peerServer.ID)
if peerServer.ID.String() == peerId {
var u = 0
for u < 120 {
fmt.Println("Try connect777")
errorConnect = m.host.Connect(m.ctx, peerServer)
if errorConnect != nil {
fmt.Println("Failed connecting to ", peerId, ", error:", errorConnect)
var relayInfo peer.AddrInfo
for _, peerRelay := range peerChanResult {
var idString = peerRelay.ID.String()
fmt.Println("Found relay id: ", idString)
if idString != peerId && idString != m.peerId.String() {
fmt.Println("Found relay! ", idString)
relayInfo = peerRelay
break
}
}
errorConnect = m.host.Connect(m.ctx, relayInfo)
if errorConnect == nil {
fmt.Println("Connected to relay!")
} else {
for _, peerRelay := range peerChanResult {
var idString = peerRelay.ID.String()
fmt.Println("Found relay1 id: ", idString)
if idString != peerId && idString != m.peerId.String() && idString != relayInfo.ID.String() {
fmt.Println("Found relay! ", idString)
relayInfo = peerRelay
break
}
}
errorConnect = m.host.Connect(m.ctx, relayInfo)
}
relayAddr, _ := ma.NewMultiaddr("/p2p/" + relayInfo.ID.String() + "/p2p-circuit/p2p/" + peerServer.ID.String())
unreachableRelayInfo := peer.AddrInfo{
ID: peerServer.ID,
Addrs: []ma.Multiaddr{relayAddr},
}
m.host.Network().(*swarm.Swarm).Backoff().Clear(peerServer.ID)
fmt.Println("Try connect to relay")
errorConnect = m.host.Connect(m.ctx, unreachableRelayInfo)
}
if errorConnect != nil {
fmt.Println("Failed connecting2 to ", peerId, ", error:", errorConnect)
} else {
fmt.Println("Try open stream")
stream, err := m.host.NewStream(m.ctx, peerServer.ID, "/example/1.1.0")
if err != nil {
fmt.Println("Connection failed:", err)
} else {
fmt.Println("Connected to:", peerId)
rw := bufio.NewReadWriter(bufio.NewReader(stream), bufio.NewWriter(stream))
_, err := rw.Write(data)
if err != nil {
fmt.Println("Error writing to buffer")
continue
}
fmt.Println("start flush")
err = rw.Flush()
fmt.Println("end flush")
if err != nil {
fmt.Println("Error flushing buffer")
continue
}
fmt.Println("start close")
stream.CloseWrite()
fmt.Println("end close")
buf, err := io.ReadAll(rw)
stream.Close()
if err != nil {
fmt.Println("Error read: ", err)
} else {
return buf
}
}
}
if closed {
return nil
}
u++
time.Sleep(time.Second)
}
}
fmt.Println("Founded: ", peerServer.ID)
}
return nil
}
func WaitGetData(received DataReceived) {
dataReceived = received
}
func initDHT(ctx context.Context, h host.Host) *dht.IpfsDHT {
dstore := dsync.MutexWrap(ds.NewMapDatastore())
kademliaDHT := dht.NewDHT(ctx, h, dstore)
if err := kademliaDHT.Bootstrap(ctx); err != nil {
panic(err)
}
var wg sync.WaitGroup
for _, peerAddr := range dht.DefaultBootstrapPeers {
peerinfo, _ := peer.AddrInfoFromP2pAddr(peerAddr)
wg.Add(1)
go func() {
defer wg.Done()
if err := h.Connect(ctx, *peerinfo); err != nil {
fmt.Println("Bootstrap warning:", err, " :", peerinfo.String())
}
}()
}
wg.Wait()
fmt.Println("Founded: ", len(h.Peerstore().Peers()))
return kademliaDHT
}
func (m *MobileLibp2p) GetData() {
fmt.Println("peer in store: ", len(m.host.Peerstore().Peers()))
for _, peerId := range m.host.Peerstore().Peers() {
fmt.Println("Peer: ", peerId, " connections: ", len(m.host.Network().ConnsToPeer(peerId)))
}
}
func (m *MobileLibp2p) Stop() {
closed = true
m.host.Close()
m.kademlia.Close()
m.relay.Close()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment