Last active
July 29, 2023 20:26
-
-
Save master255/aa897f74e02265e9d4877a0f306a7309 to your computer and use it in GitHub Desktop.
my code
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package mobile | |
import ( | |
"bufio" | |
"context" | |
"fmt" | |
ds "github.com/ipfs/go-datastore" | |
dsync "github.com/ipfs/go-datastore/sync" | |
golog "github.com/ipfs/go-log/v2" | |
"github.com/libp2p/go-libp2p" | |
dht "github.com/libp2p/go-libp2p-kad-dht" | |
"github.com/libp2p/go-libp2p/core/crypto" | |
"github.com/libp2p/go-libp2p/core/host" | |
"github.com/libp2p/go-libp2p/core/network" | |
"github.com/libp2p/go-libp2p/core/peer" | |
drouting "github.com/libp2p/go-libp2p/p2p/discovery/routing" | |
dutil "github.com/libp2p/go-libp2p/p2p/discovery/util" | |
"github.com/libp2p/go-libp2p/p2p/net/swarm" | |
"github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/client" | |
"github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/relay" | |
"github.com/libp2p/go-libp2p/p2p/security/noise" | |
libp2ptls "github.com/libp2p/go-libp2p/p2p/security/tls" | |
ma "github.com/multiformats/go-multiaddr" | |
"io" | |
"sync" | |
"time" | |
) | |
const randezvous = "example" | |
var dataReceived DataReceived | |
var closed bool | |
type MobileLibp2p struct { | |
host host.Host | |
kademlia *dht.IpfsDHT | |
peerId peer.ID | |
privateKey crypto.PrivKey | |
routing *drouting.RoutingDiscovery | |
ctx context.Context | |
} | |
type ReaderSendClose interface { | |
Close() | |
SendData([]byte) | |
} | |
type StreamStruct struct { | |
stream network.Stream | |
} | |
type DataReceived interface { | |
DataReceived([]byte, ReaderSendClose) | |
} | |
func (streamStruct StreamStruct) SendData(data []byte) { | |
wr := bufio.NewWriter(streamStruct.stream) | |
_, err := wr.Write(data) | |
if err != nil { | |
fmt.Println("Error writing to buffer") | |
} | |
err = wr.Flush() | |
if err != nil { | |
fmt.Println("Error flushing buffer") | |
} | |
fmt.Println("stream close2") | |
streamStruct.stream.Close() | |
} | |
func (streamStruct StreamStruct) Close() { | |
fmt.Println("stream close1") | |
streamStruct.stream.Close() | |
} | |
func handleStream(stream network.Stream) { | |
fmt.Println("Got a new stream!") | |
if dataReceived != nil { | |
fmt.Println("Start reader") | |
re := bufio.NewReader(stream) | |
fmt.Println("Start read") | |
buf, err := io.ReadAll(re) | |
fmt.Println("Start end read") | |
if err != nil { | |
fmt.Println("Error read: ", err) | |
} else { | |
var readerClose ReaderSendClose = StreamStruct{stream} | |
dataReceived.DataReceived(buf, readerClose) | |
return | |
} | |
} else { | |
fmt.Println("datareceived null") | |
} | |
fmt.Println("stream close") | |
stream.Close() | |
} | |
func StartLibp2p(privateKey []byte, port int) *MobileLibp2p { | |
golog.SetAllLoggers(golog.LevelError) | |
if closed { | |
fmt.Println("closed true!!!!!!!!!!!!!!") | |
} | |
var privKey crypto.PrivKey | |
if privateKey == nil { | |
privKey, _, _ = crypto.GenerateKeyPair( | |
crypto.Ed25519, | |
-1, | |
) | |
} else { | |
privKey, _ = crypto.UnmarshalPrivateKey(privateKey) | |
} | |
ctx := context.Background() | |
//connmgr, err := connmgr.NewConnManager( | |
// 100, // Lowwater | |
// 400, // HighWater, | |
// connmgr.WithGracePeriod(time.Minute), | |
//) | |
h, err := libp2p.New( | |
libp2p.ListenAddrStrings(fmt.Sprintf("/ip4/0.0.0.0/tcp/%d", port), fmt.Sprintf("/ip4/0.0.0.0/udp/%d/quic", port)), | |
libp2p.Identity(privKey), | |
libp2p.Security(libp2ptls.ID, libp2ptls.New), | |
libp2p.Security(noise.ID, noise.New), | |
libp2p.DefaultTransports, | |
libp2p.NATPortMap(), | |
libp2p.EnableRelay(), | |
libp2p.EnableHolePunching(), | |
libp2p.EnableRelayService(relay.WithInfiniteLimits()), | |
libp2p.UserAgent(randezvous)) | |
if err != nil { | |
panic(err) | |
} | |
fmt.Println("MyID: ", h.ID()) | |
h.SetStreamHandler("/example/1.1.0", handleStream) | |
_, err = relay.New(h) | |
kademliaDHT := initDHT(ctx, h) | |
routingDiscovery := drouting.NewRoutingDiscovery(kademliaDHT) | |
dutil.Advertise(ctx, routingDiscovery, randezvous) | |
//go discoverPeers(ctx, h) | |
//ps, err := pubsub.NewGossipSub(ctx, h) | |
//if err != nil { | |
// panic(err) | |
//} | |
//topic, err := ps.Join(*topicNameFlag) | |
//if err != nil { | |
// panic(err) | |
//} | |
//go streamConsoleTo(ctx, topic, ert) | |
// | |
//fmt.Println("start subscribe") | |
//sub, err := topic.Subscribe() | |
//if err != nil { | |
// panic(err) | |
//} | |
//fmt.Println("start print") | |
//printMessagesFrom(ctx, sub) | |
return &MobileLibp2p{ | |
host: h, | |
kademlia: kademliaDHT, | |
peerId: h.ID(), | |
privateKey: privKey, | |
routing: routingDiscovery, | |
ctx: ctx, | |
} | |
} | |
func (m *MobileLibp2p) GetPrivateKey() []byte { | |
var privKey, _ = crypto.MarshalPrivateKey(m.privateKey) | |
return privKey | |
} | |
func (m *MobileLibp2p) GetNodeId() string { | |
return fmt.Sprint(m.peerId) | |
} | |
func (m *MobileLibp2p) InitialConnectPeers() bool { | |
peerChan, err := m.routing.FindPeers(m.ctx, randezvous) | |
if err != nil { | |
panic(err) | |
} | |
var i = 0 | |
for peer := range peerChan { | |
fmt.Println("Try peer: ", peer.ID) | |
if peer.ID == m.host.ID() { | |
fmt.Println("Skip. My ID") | |
continue | |
} else { | |
fmt.Println("Try connect") | |
err = m.host.Connect(m.ctx, peer) | |
if err != nil { | |
fmt.Println("Failed connecting to ", peer.ID, ", error:", err) | |
} else { | |
fmt.Println("Connected!") | |
_, _ = client.Reserve(m.ctx, m.host, peer) | |
i++ | |
//break //delete it in future | |
} | |
if closed { | |
return false | |
} | |
time.Sleep(time.Second) | |
} | |
if i > 5 { | |
return true | |
} | |
fmt.Println("Founded: ", peer.ID) | |
} | |
fmt.Println("Initial success") | |
return true | |
} | |
func (m *MobileLibp2p) ConnectSendData(peerId string, data []byte) bool { | |
peerChan, errorConnect := m.routing.FindPeers(m.ctx, randezvous) | |
if errorConnect != nil { | |
panic(errorConnect) | |
} | |
for peerCh := range peerChan { | |
fmt.Println("Try peer: ", peerCh.ID) | |
if fmt.Sprint(peerCh.ID) == peerId { | |
var u = 0 | |
for u < 120 { | |
fmt.Println("Try connect") | |
errorConnect = m.host.Connect(m.ctx, peerCh) | |
if errorConnect != nil { | |
fmt.Println("Failed connecting to ", peerId, ", error:", errorConnect) | |
var relay1info peer.AddrInfo | |
for peerChRelay := range peerChan { | |
if peerChRelay.ID.String() != peerId && peerChRelay.ID.String() != m.peerId.String() { | |
fmt.Println("Found relay! ", peerChRelay.ID.String()) | |
relay1info = peerChRelay | |
break | |
} | |
} | |
relayaddr, _ := ma.NewMultiaddr("/p2p/" + relay1info.ID.String() + "/p2p-circuit/p2p/" + peerId) | |
unreachable2relayinfo := peer.AddrInfo{ | |
ID: peerCh.ID, | |
Addrs: []ma.Multiaddr{relayaddr}, | |
} | |
errorConnect = m.host.Connect(m.ctx, unreachable2relayinfo) | |
} | |
if errorConnect != nil { | |
fmt.Println("Failed connecting1 to ", peerId, ", error:", errorConnect) | |
} else { | |
fmt.Println("Try open stream") | |
stream, err := m.host.NewStream(m.ctx, peerCh.ID, "/example/1.1.0") | |
if err != nil { | |
fmt.Println("Connection failed:", err) | |
} else { | |
fmt.Println("Connected to:", peerId) | |
wr := bufio.NewWriter(stream) | |
_, err := wr.Write(data) | |
if err != nil { | |
fmt.Println("Error writing to buffer") | |
continue | |
} | |
err = wr.Flush() | |
if err != nil { | |
fmt.Println("Error flushing buffer") | |
continue | |
} | |
stream.Close() | |
return true | |
} | |
} | |
if closed { | |
fmt.Println("CLOSED EXIT FROM PROGRAM") | |
return false | |
} | |
u++ | |
time.Sleep(time.Second) | |
} | |
} | |
fmt.Println("Founded1: ", peerCh.ID) | |
} | |
return false | |
} | |
func (m *MobileLibp2p) ConnectSendGetData(peerId string, data []byte) []byte { | |
peerChan, errorConnect := m.routing.FindPeers(m.ctx, randezvous) | |
if errorConnect != nil { | |
panic(errorConnect) | |
} | |
for peerCh := range peerChan { | |
fmt.Println("Try peer: ", peerCh.ID) | |
if fmt.Sprint(peerCh.ID) == peerId { | |
var u = 0 | |
for u < 120 { | |
fmt.Println("Try connect") | |
errorConnect = m.host.Connect(m.ctx, peerCh) | |
if errorConnect != nil { | |
m.host.Network().(*swarm.Swarm).Backoff().Clear(peerCh.ID) | |
fmt.Println("Failed connecting to ", peerId, ", error:", errorConnect) | |
var relay1info peer.AddrInfo | |
for peerChRelay := range peerChan { | |
if peerChRelay.ID.String() != peerId && peerChRelay.ID.String() != m.peerId.String() { | |
fmt.Println("Found relay! ", peerChRelay.ID.String()) | |
relay1info = peerChRelay | |
break | |
} | |
} | |
errorConnect = m.host.Connect(m.ctx, relay1info) | |
if errorConnect == nil { | |
fmt.Println("Connected to relay!") | |
} | |
relayaddr, _ := ma.NewMultiaddr("/p2p/" + relay1info.ID.String() + "/p2p-circuit/p2p/" + peerId) | |
unreachable2relayinfo := peer.AddrInfo{ | |
ID: peerCh.ID, | |
Addrs: []ma.Multiaddr{relayaddr}, | |
} | |
errorConnect = m.host.Connect(m.ctx, unreachable2relayinfo) | |
} | |
if errorConnect != nil { | |
fmt.Println("Failed connecting2 to ", peerId, ", error:", errorConnect) | |
} else { | |
fmt.Println("Try open stream") | |
stream, err := m.host.NewStream(m.ctx, peerCh.ID, "/example/1.1.0") | |
if err != nil { | |
fmt.Println("Connection failed:", err) | |
} else { | |
fmt.Println("Connected to:", peerId) | |
rw := bufio.NewReadWriter(bufio.NewReader(stream), bufio.NewWriter(stream)) | |
_, err := rw.Write(data) | |
if err != nil { | |
fmt.Println("Error writing to buffer") | |
continue | |
} | |
fmt.Println("start flush") | |
err = rw.Flush() | |
fmt.Println("end flush") | |
if err != nil { | |
fmt.Println("Error flushing buffer") | |
continue | |
} | |
fmt.Println("start close") | |
stream.CloseWrite() | |
fmt.Println("end close") | |
buf, err := io.ReadAll(rw) | |
stream.Close() | |
if err != nil { | |
fmt.Println("Error read: ", err) | |
} else { | |
return buf | |
} | |
} | |
} | |
if closed { | |
return nil | |
} | |
u++ | |
time.Sleep(time.Second) | |
} | |
} | |
fmt.Println("Founded: ", peerCh.ID) | |
} | |
return nil | |
} | |
func WaitGetData(received DataReceived) { | |
dataReceived = received | |
} | |
func initDHT(ctx context.Context, h host.Host) *dht.IpfsDHT { | |
dstore := dsync.MutexWrap(ds.NewMapDatastore()) | |
kademliaDHT := dht.NewDHT(ctx, h, dstore) | |
if err := kademliaDHT.Bootstrap(ctx); err != nil { | |
panic(err) | |
} | |
var wg sync.WaitGroup | |
for _, peerAddr := range dht.DefaultBootstrapPeers { | |
peerinfo, _ := peer.AddrInfoFromP2pAddr(peerAddr) | |
wg.Add(1) | |
go func() { | |
defer wg.Done() | |
if err := h.Connect(ctx, *peerinfo); err != nil { | |
fmt.Println("Bootstrap warning:", err, " :", peerinfo.String()) | |
} | |
}() | |
} | |
wg.Wait() | |
fmt.Println("Founded: ", len(h.Peerstore().Peers())) | |
return kademliaDHT | |
} | |
func (m *MobileLibp2p) GetData() { | |
fmt.Println("peer in store: ", len(m.host.Peerstore().Peers())) | |
for _, peerId := range m.host.Peerstore().Peers() { | |
fmt.Println("Peer: ", peerId, " connections: ", len(m.host.Network().ConnsToPeer(peerId))) | |
} | |
} | |
func (m *MobileLibp2p) Stop() { | |
closed = true | |
m.host.Close() | |
m.kademlia.Close() | |
} | |
//func streamConsoleTo(ctx context.Context, topic *pubsub.Topic, ert []byte) { | |
// var i = 0 | |
// for { | |
// if err := topic.Publish(ctx, []byte(strconv.Itoa(i))); err != nil { | |
// fmt.Println("### Publish error:", err) | |
// } | |
// i++ | |
// time.Sleep(time.Second) | |
// } | |
// | |
//func discoverPeers(ctx context.Context, h host.Host) { | |
// | |
// dutil.Advertise(ctx, routingDiscovery, "qqqqqq") | |
// | |
// // Look for others who have announced and attempt to connect to them | |
// anyConnected := false | |
// for !anyConnected { | |
// fmt.Println("Searching for peers...") | |
// peerChan, err := routingDiscovery.FindPeers(ctx, "qqqqqq") | |
// if err != nil { | |
// panic(err) | |
// } | |
// for peer := range peerChan { | |
// if peer.ID == h.ID() { | |
// continue // No self connection | |
// } | |
// err := h.Connect(ctx, peer) | |
// if err != nil { | |
// fmt.Println("Failed connecting to ", peer.ID.Pretty(), ", error:", err) | |
// } else { | |
// stream, err := h.NewStream(ctx, peer.ID, protocol.ID("/example/1.1.0")) | |
// if err != nil { | |
// fmt.Println("Connection failed:", err) | |
// continue | |
// } else { | |
// rw := bufio.NewReadWriter(bufio.NewReader(stream), bufio.NewWriter(stream)) | |
// | |
// go readData(rw) | |
// go writeData(rw) | |
// } | |
// fmt.Println("Connected to:", peer.ID.Pretty()) | |
// anyConnected = true | |
// } | |
// } | |
// time.Sleep(time.Second) | |
// } | |
// fmt.Println("Peer discovery complete") | |
//} | |
//func writeData(rw *bufio.ReadWriter) { | |
// var i = 0 | |
// for i < 15 { | |
// _, err := rw.WriteString(fmt.Sprintf("%s\n", "1111111")) | |
// if err != nil { | |
// fmt.Println("Error writing to buffer") | |
// panic(err) | |
// } | |
// err = rw.Flush() | |
// if err != nil { | |
// fmt.Println("Error flushing buffer") | |
// panic(err) | |
// } | |
// i++ | |
// } | |
//} | |
//func readData(rw *bufio.ReadWriter) { | |
// for { | |
// str, err := rw.ReadString('\n') | |
// if err != nil { | |
// fmt.Println("Error reading from buffer") | |
// panic(err) | |
// } | |
// | |
// if str == "" { | |
// return | |
// } | |
// if str != "\n" { | |
// // Green console colour: \x1b[32m | |
// // Reset console colour: \x1b[0m | |
// fmt.Printf("\x1b[32m%s\x1b[0m> ", str) | |
// } | |
// | |
// } | |
//} | |
// | |
//func printMessagesFrom(ctx context.Context, sub *pubsub.Subscription) { | |
// for { | |
// m, err := sub.Next(ctx) | |
// if err != nil { | |
// panic(err) | |
// } | |
// fmt.Println(m.ReceivedFrom, ": ", string(m.Message.Data)) | |
// } | |
// fmt.Println("exit sub") | |
//} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment