Last active
February 26, 2024 07:32
-
-
Save master255/c63091a1b05a67ed33c8c248f748fd0f to your computer and use it in GitHub Desktop.
work relays
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package mobile | |
import ( | |
"bufio" | |
"context" | |
"fmt" | |
ds "github.com/ipfs/go-datastore" | |
dsync "github.com/ipfs/go-datastore/sync" | |
golog "github.com/ipfs/go-log/v2" | |
"github.com/libp2p/go-libp2p" | |
dht "github.com/libp2p/go-libp2p-kad-dht" | |
"github.com/libp2p/go-libp2p/core/crypto" | |
"github.com/libp2p/go-libp2p/core/discovery" | |
"github.com/libp2p/go-libp2p/core/host" | |
"github.com/libp2p/go-libp2p/core/network" | |
"github.com/libp2p/go-libp2p/core/peer" | |
drouting "github.com/libp2p/go-libp2p/p2p/discovery/routing" | |
dutil "github.com/libp2p/go-libp2p/p2p/discovery/util" | |
"github.com/libp2p/go-libp2p/p2p/host/autorelay" | |
"github.com/libp2p/go-libp2p/p2p/net/swarm" | |
"github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/relay" | |
ma "github.com/multiformats/go-multiaddr" | |
"io" | |
"log" | |
"sync" | |
"time" | |
) | |
const randezvous = "example" | |
var dataReceived DataReceived | |
var closed bool | |
//var staticRelays []peer.AddrInfo | |
type MobileLibp2p struct { | |
host host.Host | |
relay *relay.Relay | |
kademlia *dht.IpfsDHT | |
peerId peer.ID | |
privateKey crypto.PrivKey | |
routing *drouting.RoutingDiscovery | |
ctx context.Context | |
} | |
type ReaderSendClose interface { | |
Close() | |
SendData([]byte) | |
} | |
type StreamStruct struct { | |
stream network.Stream | |
} | |
type DataReceived interface { | |
DataReceived([]byte, ReaderSendClose) | |
} | |
func (streamStruct StreamStruct) SendData(data []byte) { | |
wr := bufio.NewWriter(streamStruct.stream) | |
_, err := wr.Write(data) | |
if err != nil { | |
fmt.Println("Error writing to buffer") | |
} | |
err = wr.Flush() | |
if err != nil { | |
fmt.Println("Error flushing buffer") | |
} | |
fmt.Println("stream close2") | |
streamStruct.stream.Close() | |
} | |
func (streamStruct StreamStruct) Close() { | |
fmt.Println("stream close1") | |
streamStruct.stream.Close() | |
} | |
func handleStream(stream network.Stream) { | |
fmt.Println("Got a new stream!") | |
if dataReceived != nil { | |
fmt.Println("Start reader") | |
re := bufio.NewReader(stream) | |
fmt.Println("Start read") | |
buf, err := io.ReadAll(re) | |
fmt.Println("Start end read") | |
if err != nil { | |
fmt.Println("Error read: ", err) | |
} else { | |
var readerClose ReaderSendClose = StreamStruct{stream} | |
dataReceived.DataReceived(buf, readerClose) | |
return | |
} | |
} else { | |
fmt.Println("data-received null") | |
} | |
fmt.Println("stream close") | |
stream.Close() | |
} | |
func StartLibp2p(privateKey []byte, port int) *MobileLibp2p { | |
golog.SetAllLoggers(golog.LevelError) | |
if closed { | |
fmt.Println("closed true!!!!!!!!!!!!!!") | |
} | |
var privKey crypto.PrivKey | |
if privateKey == nil { | |
privKey, _, _ = crypto.GenerateKeyPair( | |
crypto.Ed25519, | |
-1, | |
) | |
} else { | |
privKey, _ = crypto.UnmarshalPrivateKey(privateKey) | |
} | |
ctx := context.Background() | |
//connmgr, err := connmgr.NewConnManager( | |
// 100, // Lowwater | |
// 400, // HighWater, | |
// connmgr.WithGracePeriod(time.Minute), | |
//) | |
var routingDiscovery *drouting.RoutingDiscovery | |
h, err := libp2p.New( | |
libp2p.ListenAddrStrings(fmt.Sprintf("/ip4/0.0.0.0/tcp/%d", port), fmt.Sprintf("/ip4/0.0.0.0/udp/%d/quic", port)), | |
libp2p.Identity(privKey), | |
libp2p.NATPortMap(), | |
libp2p.EnableRelay(), | |
libp2p.EnableHolePunching(), | |
libp2p.EnableAutoRelayWithPeerSource(func(ctx context.Context, numPeers int) <-chan peer.AddrInfo { | |
peerChan, _ := routingDiscovery.FindPeers(ctx, randezvous) | |
//peerChanResult := make(chan peer.AddrInfo, len(peerChan)+len(staticRelays)) | |
//for peerCh := range peerChan { | |
// peerChanResult <- peer.AddrInfo{ID: peerCh.ID, Addrs: peerCh.Addrs} | |
//} | |
//for _, info := range staticRelays { | |
// peerChanResult <- info | |
//} | |
fmt.Println("refresh relays!!!!!!!!!!!!!!") | |
//close(peerChanResult) | |
return peerChan | |
}, autorelay.WithMinCandidates(2)), | |
libp2p.UserAgent(randezvous)) | |
fmt.Println("MyID: ", h.ID()) | |
h.SetStreamHandler("/example/1.1.0", handleStream) | |
kademliaDHT := initDHT(ctx, h) | |
routingDiscovery = drouting.NewRoutingDiscovery(kademliaDHT) | |
dutil.Advertise(ctx, routingDiscovery, randezvous, discovery.TTL(time.Minute*2)) | |
relay, err := relay.New(h, relay.WithInfiniteLimits()) | |
if err != nil { | |
log.Printf("Failed to instantiate the relay: %v", err) | |
} | |
return &MobileLibp2p{ | |
host: h, | |
relay: relay, | |
kademlia: kademliaDHT, | |
peerId: h.ID(), | |
privateKey: privKey, | |
routing: routingDiscovery, | |
ctx: ctx, | |
} | |
} | |
func (m *MobileLibp2p) GetPrivateKey() []byte { | |
var privKey, _ = crypto.MarshalPrivateKey(m.privateKey) | |
return privKey | |
} | |
func (m *MobileLibp2p) GetNodeId() string { | |
return fmt.Sprint(m.peerId) | |
} | |
func (m *MobileLibp2p) InitialConnectPeers() bool { | |
peerChan, err := m.routing.FindPeers(m.ctx, randezvous) | |
if err != nil { | |
panic(err) | |
} | |
var i = 0 | |
for peer := range peerChan { | |
fmt.Println("Try peer: ", peer.ID) | |
if peer.ID == m.host.ID() { | |
fmt.Println("Skip. My ID") | |
continue | |
} else { | |
fmt.Println("Try connect") | |
err = m.host.Connect(m.ctx, peer) | |
//_, _ = client.Reserve(m.ctx, m.host, peer) | |
if err != nil { | |
fmt.Println("Failed connecting to ", peer.ID, ", error:", err) | |
} else { | |
fmt.Println("Connected!") | |
i++ | |
//break //delete it in future | |
} | |
if closed { | |
return false | |
} | |
time.Sleep(time.Second) | |
} | |
if i > 5 { | |
return true | |
} | |
fmt.Println("Founded: ", peer.ID) | |
} | |
fmt.Println("Initial success") | |
return true | |
} | |
func (m *MobileLibp2p) ConnectSendData(peerId string, data []byte) bool { | |
peerChan, errorConnect := m.routing.FindPeers(m.ctx, randezvous) | |
peerChanResult := make([]peer.AddrInfo, 0) | |
for element := range peerChan { | |
peerChanResult = append(peerChanResult, element) | |
} | |
for _, peerServer := range peerChanResult { | |
fmt.Println("Try peer: ", peerServer.ID) | |
if fmt.Sprint(peerServer.ID) == peerId { | |
var u = 0 | |
for u < 120 { | |
fmt.Println("Try connect") | |
errorConnect = m.host.Connect(m.ctx, peerServer) | |
if errorConnect != nil { | |
fmt.Println("Failed connecting1 to ", peerId, ", error:", errorConnect) | |
var relayInfo peer.AddrInfo | |
for _, peerRelay := range peerChanResult { | |
var idString = peerRelay.ID.String() | |
fmt.Println("Found relay id: ", idString) | |
if idString != peerId && idString != m.peerId.String() { | |
fmt.Println("Found relay! ", idString) | |
relayInfo = peerRelay | |
break | |
} | |
} | |
errorConnect = m.host.Connect(m.ctx, relayInfo) | |
if errorConnect == nil { | |
fmt.Println("Connected to relay!") | |
} else { | |
for _, peerRelay := range peerChanResult { | |
var idString = peerRelay.ID.String() | |
fmt.Println("Found relay1 id: ", idString) | |
if idString != peerId && idString != m.peerId.String() && idString != relayInfo.ID.String() { | |
fmt.Println("Found relay! ", idString) | |
relayInfo = peerRelay | |
break | |
} | |
} | |
errorConnect = m.host.Connect(m.ctx, relayInfo) | |
} | |
relayAddr, _ := ma.NewMultiaddr("/p2p/" + relayInfo.ID.String() + "/p2p-circuit/p2p/" + peerServer.ID.String()) | |
unreachableRelayInfo := peer.AddrInfo{ | |
ID: peerServer.ID, | |
Addrs: []ma.Multiaddr{relayAddr}, | |
} | |
m.host.Network().(*swarm.Swarm).Backoff().Clear(peerServer.ID) | |
fmt.Println("Try connect to relay") | |
errorConnect = m.host.Connect(m.ctx, unreachableRelayInfo) | |
} | |
if errorConnect != nil { | |
fmt.Println("Failed connecting1 to ", peerId, ", error:", errorConnect) | |
} else { | |
fmt.Println("Try open stream") | |
stream, err := m.host.NewStream(m.ctx, peerServer.ID, "/example/1.1.0") | |
if err != nil { | |
fmt.Println("Connection failed:", err) | |
} else { | |
fmt.Println("Connected to:", peerId) | |
wr := bufio.NewWriter(stream) | |
_, err := wr.Write(data) | |
if err != nil { | |
fmt.Println("Error writing to buffer") | |
continue | |
} | |
err = wr.Flush() | |
if err != nil { | |
fmt.Println("Error flushing buffer") | |
continue | |
} | |
stream.Close() | |
return true | |
} | |
} | |
if closed { | |
fmt.Println("CLOSED EXIT FROM PROGRAM") | |
return false | |
} | |
u++ | |
time.Sleep(time.Second) | |
} | |
} | |
fmt.Println("Founded1: ", peerServer.ID) | |
} | |
return false | |
} | |
func (m *MobileLibp2p) ConnectSendGetData(peerId string, data []byte) []byte { | |
peerChan, errorConnect := m.routing.FindPeers(m.ctx, randezvous) | |
peerChanResult := make([]peer.AddrInfo, 0) | |
for element := range peerChan { | |
peerChanResult = append(peerChanResult, element) | |
} | |
for _, peerServer := range peerChanResult { | |
fmt.Println("Try peer: ", peerServer.ID) | |
if peerServer.ID.String() == peerId { | |
var u = 0 | |
for u < 120 { | |
fmt.Println("Try connect777") | |
errorConnect = m.host.Connect(m.ctx, peerServer) | |
if errorConnect != nil { | |
fmt.Println("Failed connecting to ", peerId, ", error:", errorConnect) | |
var relayInfo peer.AddrInfo | |
for _, peerRelay := range peerChanResult { | |
var idString = peerRelay.ID.String() | |
fmt.Println("Found relay id: ", idString) | |
if idString != peerId && idString != m.peerId.String() { | |
fmt.Println("Found relay! ", idString) | |
relayInfo = peerRelay | |
break | |
} | |
} | |
errorConnect = m.host.Connect(m.ctx, relayInfo) | |
if errorConnect == nil { | |
fmt.Println("Connected to relay!") | |
} else { | |
for _, peerRelay := range peerChanResult { | |
var idString = peerRelay.ID.String() | |
fmt.Println("Found relay1 id: ", idString) | |
if idString != peerId && idString != m.peerId.String() && idString != relayInfo.ID.String() { | |
fmt.Println("Found relay! ", idString) | |
relayInfo = peerRelay | |
break | |
} | |
} | |
errorConnect = m.host.Connect(m.ctx, relayInfo) | |
} | |
relayAddr, _ := ma.NewMultiaddr("/p2p/" + relayInfo.ID.String() + "/p2p-circuit/p2p/" + peerServer.ID.String()) | |
unreachableRelayInfo := peer.AddrInfo{ | |
ID: peerServer.ID, | |
Addrs: []ma.Multiaddr{relayAddr}, | |
} | |
m.host.Network().(*swarm.Swarm).Backoff().Clear(peerServer.ID) | |
fmt.Println("Try connect to relay") | |
errorConnect = m.host.Connect(m.ctx, unreachableRelayInfo) | |
} | |
if errorConnect != nil { | |
fmt.Println("Failed connecting2 to ", peerId, ", error:", errorConnect) | |
} else { | |
fmt.Println("Try open stream") | |
stream, err := m.host.NewStream(m.ctx, peerServer.ID, "/example/1.1.0") | |
if err != nil { | |
fmt.Println("Connection failed:", err) | |
} else { | |
fmt.Println("Connected to:", peerId) | |
rw := bufio.NewReadWriter(bufio.NewReader(stream), bufio.NewWriter(stream)) | |
_, err := rw.Write(data) | |
if err != nil { | |
fmt.Println("Error writing to buffer") | |
continue | |
} | |
fmt.Println("start flush") | |
err = rw.Flush() | |
fmt.Println("end flush") | |
if err != nil { | |
fmt.Println("Error flushing buffer") | |
continue | |
} | |
fmt.Println("start close") | |
stream.CloseWrite() | |
fmt.Println("end close") | |
buf, err := io.ReadAll(rw) | |
stream.Close() | |
if err != nil { | |
fmt.Println("Error read: ", err) | |
} else { | |
return buf | |
} | |
} | |
} | |
if closed { | |
return nil | |
} | |
u++ | |
time.Sleep(time.Second) | |
} | |
} | |
fmt.Println("Founded: ", peerServer.ID) | |
} | |
return nil | |
} | |
func WaitGetData(received DataReceived) { | |
dataReceived = received | |
} | |
func initDHT(ctx context.Context, h host.Host) *dht.IpfsDHT { | |
dstore := dsync.MutexWrap(ds.NewMapDatastore()) | |
kademliaDHT := dht.NewDHT(ctx, h, dstore) | |
if err := kademliaDHT.Bootstrap(ctx); err != nil { | |
panic(err) | |
} | |
var wg sync.WaitGroup | |
for _, peerAddr := range dht.DefaultBootstrapPeers { | |
peerinfo, _ := peer.AddrInfoFromP2pAddr(peerAddr) | |
wg.Add(1) | |
go func() { | |
defer wg.Done() | |
if err := h.Connect(ctx, *peerinfo); err != nil { | |
fmt.Println("Bootstrap warning:", err, " :", peerinfo.String()) | |
} | |
}() | |
} | |
wg.Wait() | |
fmt.Println("Founded: ", len(h.Peerstore().Peers())) | |
return kademliaDHT | |
} | |
func (m *MobileLibp2p) GetData() { | |
fmt.Println("peer in store: ", len(m.host.Peerstore().Peers())) | |
for _, peerId := range m.host.Peerstore().Peers() { | |
fmt.Println("Peer: ", peerId, " connections: ", len(m.host.Network().ConnsToPeer(peerId))) | |
} | |
} | |
func (m *MobileLibp2p) Stop() { | |
closed = true | |
m.host.Close() | |
m.kademlia.Close() | |
m.relay.Close() | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment