Skip to content

Instantly share code, notes, and snippets.

@master255
Last active July 29, 2023 20:26
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save master255/aa897f74e02265e9d4877a0f306a7309 to your computer and use it in GitHub Desktop.
Save master255/aa897f74e02265e9d4877a0f306a7309 to your computer and use it in GitHub Desktop.
my code
package mobile
import (
"bufio"
"context"
"fmt"
ds "github.com/ipfs/go-datastore"
dsync "github.com/ipfs/go-datastore/sync"
golog "github.com/ipfs/go-log/v2"
"github.com/libp2p/go-libp2p"
dht "github.com/libp2p/go-libp2p-kad-dht"
"github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
drouting "github.com/libp2p/go-libp2p/p2p/discovery/routing"
dutil "github.com/libp2p/go-libp2p/p2p/discovery/util"
"github.com/libp2p/go-libp2p/p2p/net/swarm"
"github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/client"
"github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/relay"
"github.com/libp2p/go-libp2p/p2p/security/noise"
libp2ptls "github.com/libp2p/go-libp2p/p2p/security/tls"
ma "github.com/multiformats/go-multiaddr"
"io"
"sync"
"time"
)
const (
	// randezvous is the namespace this package advertises and searches for
	// during peer discovery; StartLibp2p also passes it as the user agent.
	randezvous = "example"
)

var (
	// dataReceived is the callback handleStream hands inbound payloads to.
	// It is nil until WaitGetData installs one.
	dataReceived DataReceived

	// closed is set to true by Stop and polled by the connect loops so they
	// can give up early.
	// NOTE(review): plain bool with no synchronization — confirm whether Stop
	// can be called from a different goroutine than the connect methods.
	closed bool
)
// MobileLibp2p is the handle returned by StartLibp2p. It keeps the running
// host together with the DHT, discovery helper, identity and context that
// the methods of this type operate on.
type MobileLibp2p struct {
	// host is the libp2p host created by StartLibp2p.
	host host.Host
	// kademlia is the DHT instance backing peer discovery.
	kademlia *dht.IpfsDHT
	// peerId is the ID of host, captured at start-up.
	peerId peer.ID
	// privateKey is the identity key the host was configured with.
	privateKey crypto.PrivKey
	// routing performs rendezvous-based advertise/find on top of kademlia.
	routing *drouting.RoutingDiscovery
	// ctx is passed to every network call made through this handle.
	ctx context.Context
}
// ReaderSendClose is the reply handle given to a DataReceived callback for
// one inbound stream. In this file it is implemented by StreamStruct.
type ReaderSendClose interface {
	// Close closes the underlying stream without sending anything.
	Close()
	// SendData writes the given bytes to the underlying stream.
	SendData([]byte)
}
// StreamStruct wraps a single libp2p stream so that it can be passed to
// callbacks as a ReaderSendClose.
type StreamStruct struct {
	// stream is the wrapped libp2p stream.
	stream network.Stream
}
// DataReceived is the callback interface installed with WaitGetData.
type DataReceived interface {
	// DataReceived is invoked by handleStream with the complete payload read
	// from an inbound stream and a handle for replying on, or closing, that
	// stream. handleStream does not close the stream after a successful
	// read, so the implementation is expected to use the handle to do so.
	DataReceived([]byte, ReaderSendClose)
}
// SendData writes data to the wrapped stream and then closes the stream, so
// it can be used at most once per stream.
//
// The ReaderSendClose interface has no error return, so failures are only
// reported on stdout. When the write or flush fails the stream is reset
// instead of closed: a graceful close after a partial write would let the
// remote side mistake truncated data for a complete reply.
func (streamStruct StreamStruct) SendData(data []byte) {
	wr := bufio.NewWriter(streamStruct.stream)
	if _, err := wr.Write(data); err != nil {
		fmt.Println("Error writing to buffer:", err)
		streamStruct.abort()
		return
	}
	if err := wr.Flush(); err != nil {
		fmt.Println("Error flushing buffer:", err)
		streamStruct.abort()
		return
	}
	fmt.Println("stream close2")
	if err := streamStruct.stream.Close(); err != nil {
		fmt.Println("Error closing stream:", err)
	}
}

// abort resets the wrapped stream after a failed send and reports a failure
// of the reset itself.
func (streamStruct StreamStruct) abort() {
	if err := streamStruct.stream.Reset(); err != nil {
		fmt.Println("Error resetting stream:", err)
	}
}
// Close logs a marker line and closes the wrapped stream without sending a
// reply. The error returned by the stream's Close is discarded.
func (streamStruct StreamStruct) Close() {
	fmt.Println("stream close1")
	_ = streamStruct.stream.Close()
}
// handleStream is the handler StartLibp2p registers for inbound
// "/example/1.1.0" streams.
//
// It reads the stream until EOF and passes the payload, together with a
// reply handle for the stream, to the installed DataReceived callback. In
// that case the stream is left open for the callback to answer or close.
// When no callback is installed, or the read fails, the stream is closed
// here.
//
// NOTE(review): io.ReadAll places no upper bound on how much a remote peer
// can make this node buffer — consider a size limit.
func handleStream(stream network.Stream) {
	// shutdown is the common exit for every path that does not hand the
	// stream over to the callback.
	shutdown := func() {
		fmt.Println("stream close")
		stream.Close()
	}

	fmt.Println("Got a new stream!")
	if dataReceived == nil {
		fmt.Println("datareceived null")
		shutdown()
		return
	}

	fmt.Println("Start reader")
	reader := bufio.NewReader(stream)
	fmt.Println("Start read")
	payload, readErr := io.ReadAll(reader)
	fmt.Println("Start end read")
	if readErr != nil {
		fmt.Println("Error read: ", readErr)
		shutdown()
		return
	}

	dataReceived.DataReceived(payload, StreamStruct{stream})
}
// StartLibp2p creates a libp2p host, bootstraps a Kademlia DHT on it and
// starts advertising the package rendezvous string.
//
// privateKey is a marshalled libp2p private key (the format GetPrivateKey
// returns). When it is nil or empty a fresh Ed25519 key is generated.
// port is used for both the TCP and the QUIC (UDP) listener on 0.0.0.0.
//
// The host is configured with TLS and Noise security, NAT port mapping,
// relay client support, hole punching and a relay service with infinite
// limits. handleStream is registered for the "/example/1.1.0" protocol.
//
// The function has no error return; it panics when the identity key cannot
// be generated or decoded, when the host cannot be created, or when initDHT
// panics.
func StartLibp2p(privateKey []byte, port int) *MobileLibp2p {
	golog.SetAllLoggers(golog.LevelError)
	// Stop sets the package-level flag and nothing else clears it. Without
	// this reset a node started after Stop would abort every connect loop
	// after its first attempt.
	closed = false

	var (
		privKey crypto.PrivKey
		err     error
	)
	if len(privateKey) == 0 {
		privKey, _, err = crypto.GenerateKeyPair(
			crypto.Ed25519,
			-1,
		)
	} else {
		privKey, err = crypto.UnmarshalPrivateKey(privateKey)
	}
	if err != nil {
		// Continuing with a nil key would leave the returned handle without
		// a usable private key, so fail loudly like the other start-up
		// errors in this function.
		panic(fmt.Errorf("preparing identity key: %w", err))
	}

	ctx := context.Background()
	//connmgr, err := connmgr.NewConnManager(
	//	100, // Lowwater
	//	400, // HighWater,
	//	connmgr.WithGracePeriod(time.Minute),
	//)
	h, err := libp2p.New(
		libp2p.ListenAddrStrings(fmt.Sprintf("/ip4/0.0.0.0/tcp/%d", port), fmt.Sprintf("/ip4/0.0.0.0/udp/%d/quic", port)),
		libp2p.Identity(privKey),
		libp2p.Security(libp2ptls.ID, libp2ptls.New),
		libp2p.Security(noise.ID, noise.New),
		libp2p.DefaultTransports,
		libp2p.NATPortMap(),
		libp2p.EnableRelay(),
		libp2p.EnableHolePunching(),
		libp2p.EnableRelayService(relay.WithInfiniteLimits()),
		libp2p.UserAgent(randezvous))
	if err != nil {
		panic(err)
	}
	fmt.Println("MyID: ", h.ID())
	h.SetStreamHandler("/example/1.1.0", handleStream)

	// A relay that fails to start only limits what this node can do for
	// others, so report it and keep going.
	if _, err = relay.New(h); err != nil {
		fmt.Println("Relay service warning:", err)
	}

	kademliaDHT := initDHT(ctx, h)
	routingDiscovery := drouting.NewRoutingDiscovery(kademliaDHT)
	dutil.Advertise(ctx, routingDiscovery, randezvous)
	//go discoverPeers(ctx, h)
	//ps, err := pubsub.NewGossipSub(ctx, h)
	//if err != nil {
	//	panic(err)
	//}
	//topic, err := ps.Join(*topicNameFlag)
	//if err != nil {
	//	panic(err)
	//}
	//go streamConsoleTo(ctx, topic, ert)
	//
	//fmt.Println("start subscribe")
	//sub, err := topic.Subscribe()
	//if err != nil {
	//	panic(err)
	//}
	//fmt.Println("start print")
	//printMessagesFrom(ctx, sub)
	return &MobileLibp2p{
		host:       h,
		kademlia:   kademliaDHT,
		peerId:     h.ID(),
		privateKey: privKey,
		routing:    routingDiscovery,
		ctx:        ctx,
	}
}
// GetPrivateKey returns the node's identity key in the marshalled form that
// StartLibp2p accepts. A marshalling error is discarded; the caller then
// receives whatever byte slice crypto.MarshalPrivateKey returned alongside
// the error.
func (m *MobileLibp2p) GetPrivateKey() []byte {
	encoded, _ := crypto.MarshalPrivateKey(m.privateKey)
	return encoded
}
// GetNodeId returns the string form of this node's peer ID, the same form
// ConnectSendData and ConnectSendGetData compare their peerId argument
// against.
func (m *MobileLibp2p) GetNodeId() string {
	return m.peerId.String()
}
// InitialConnectPeers looks up peers advertising the package rendezvous
// string and tries to connect to each of them, requesting a relay
// reservation on every peer it reaches. It waits one second after each
// attempt.
//
// It returns true once more than five peers have been connected or the
// discovery results are exhausted. It returns false when Stop was called in
// the meantime or when discovery could not be started.
func (m *MobileLibp2p) InitialConnectPeers() bool {
	peerChan, err := m.routing.FindPeers(m.ctx, randezvous)
	if err != nil {
		// A failed lookup is a runtime condition; report it through the
		// return value instead of crashing the embedding application.
		fmt.Println("Peer discovery failed:", err)
		return false
	}

	connected := 0
	for info := range peerChan {
		fmt.Println("Try peer: ", info.ID)
		if info.ID == m.host.ID() {
			fmt.Println("Skip. My ID")
			continue
		}

		fmt.Println("Try connect")
		if err := m.host.Connect(m.ctx, info); err != nil {
			fmt.Println("Failed connecting to ", info.ID, ", error:", err)
		} else {
			fmt.Println("Connected!")
			// The reservation is best effort: the peer still counts as
			// connected when it refuses to act as a relay.
			if _, err := client.Reserve(m.ctx, m.host, info); err != nil {
				fmt.Println("Relay reservation failed on ", info.ID, ", error:", err)
			}
			connected++
		}

		if closed {
			return false
		}
		time.Sleep(time.Second)
		if connected > 5 {
			return true
		}
		fmt.Println("Founded: ", info.ID)
	}
	fmt.Println("Initial success")
	return true
}
// ConnectSendData finds the peer whose ID string equals peerId among the
// peers advertising the package rendezvous string, connects to it and writes
// data on a new "/example/1.1.0" stream.
//
// Up to 120 attempts are made, one second apart. When a direct connection
// fails, the next discovered peer that is neither the target nor this node
// is tried as a circuit relay.
//
// It returns true once data has been written and the stream closed. It
// returns false when the peer was not found, every attempt failed, discovery
// could not be started, or Stop was called.
func (m *MobileLibp2p) ConnectSendData(peerId string, data []byte) bool {
	peerChan, err := m.routing.FindPeers(m.ctx, randezvous)
	if err != nil {
		// Report a failed lookup through the return value instead of
		// crashing the embedding application.
		fmt.Println("Peer discovery failed:", err)
		return false
	}

	// dialViaRelay takes the next usable relay candidate from the discovery
	// results and dials target through it. Each call consumes further
	// results, so successive attempts try different relays.
	dialViaRelay := func(target peer.ID) error {
		for candidate := range peerChan {
			relayID := candidate.ID.String()
			if relayID == peerId || relayID == m.peerId.String() {
				continue
			}
			fmt.Println("Found relay! ", relayID)
			relayAddr, err := ma.NewMultiaddr("/p2p/" + relayID + "/p2p-circuit/p2p/" + peerId)
			if err != nil {
				return err
			}
			return m.host.Connect(m.ctx, peer.AddrInfo{
				ID:    target,
				Addrs: []ma.Multiaddr{relayAddr},
			})
		}
		// Discovery is exhausted: do not dial a circuit address built from
		// an empty relay ID.
		return fmt.Errorf("no relay candidate discovered")
	}

	// send opens a stream to target and writes data, reporting success. A
	// stream that fails half-way is reset so it is not leaked.
	send := func(target peer.ID) bool {
		fmt.Println("Try open stream")
		stream, err := m.host.NewStream(m.ctx, target, "/example/1.1.0")
		if err != nil {
			fmt.Println("Connection failed:", err)
			return false
		}
		fmt.Println("Connected to:", peerId)
		wr := bufio.NewWriter(stream)
		if _, err := wr.Write(data); err != nil {
			fmt.Println("Error writing to buffer:", err)
			stream.Reset()
			return false
		}
		if err := wr.Flush(); err != nil {
			fmt.Println("Error flushing buffer:", err)
			stream.Reset()
			return false
		}
		stream.Close()
		return true
	}

	for peerCh := range peerChan {
		fmt.Println("Try peer: ", peerCh.ID)
		if peerCh.ID.String() != peerId {
			fmt.Println("Founded1: ", peerCh.ID)
			continue
		}
		// Every failed attempt falls through to the stop check, the sleep
		// and the counter, so the retry budget is always enforced.
		for attempt := 0; attempt < 120; attempt++ {
			fmt.Println("Try connect")
			err = m.host.Connect(m.ctx, peerCh)
			if err != nil {
				fmt.Println("Failed connecting to ", peerId, ", error:", err)
				err = dialViaRelay(peerCh.ID)
			}
			if err != nil {
				fmt.Println("Failed connecting1 to ", peerId, ", error:", err)
			} else if send(peerCh.ID) {
				return true
			}
			if closed {
				fmt.Println("CLOSED EXIT FROM PROGRAM")
				return false
			}
			time.Sleep(time.Second)
		}
		fmt.Println("Founded1: ", peerCh.ID)
	}
	return false
}
// ConnectSendGetData finds the peer whose ID string equals peerId among the
// peers advertising the package rendezvous string, sends data on a new
// "/example/1.1.0" stream, half-closes the stream and returns everything the
// peer writes back before closing its side.
//
// Up to 120 attempts are made, one second apart. When a direct connection
// fails, the dial backoff for the peer is cleared and the next discovered
// peer that is neither the target nor this node is connected to and tried as
// a circuit relay.
//
// It returns nil when the peer was not found, every attempt failed,
// discovery could not be started, or Stop was called.
//
// NOTE(review): the reply is read with io.ReadAll and is therefore unbounded
// — consider a size limit if peers are not trusted.
func (m *MobileLibp2p) ConnectSendGetData(peerId string, data []byte) []byte {
	peerChan, err := m.routing.FindPeers(m.ctx, randezvous)
	if err != nil {
		// Report a failed lookup through the return value instead of
		// crashing the embedding application.
		fmt.Println("Peer discovery failed:", err)
		return nil
	}

	// dialViaRelay takes the next usable relay candidate from the discovery
	// results, connects to it and then dials target through it. Each call
	// consumes further results, so successive attempts try different relays.
	dialViaRelay := func(target peer.ID) error {
		for candidate := range peerChan {
			relayID := candidate.ID.String()
			if relayID == peerId || relayID == m.peerId.String() {
				continue
			}
			fmt.Println("Found relay! ", relayID)
			if err := m.host.Connect(m.ctx, candidate); err == nil {
				fmt.Println("Connected to relay!")
			}
			relayAddr, err := ma.NewMultiaddr("/p2p/" + relayID + "/p2p-circuit/p2p/" + peerId)
			if err != nil {
				return err
			}
			return m.host.Connect(m.ctx, peer.AddrInfo{
				ID:    target,
				Addrs: []ma.Multiaddr{relayAddr},
			})
		}
		// Discovery is exhausted: do not dial a circuit address built from
		// an empty relay ID.
		return fmt.Errorf("no relay candidate discovered")
	}

	// exchange performs one request/response round trip with target. The
	// boolean reports whether a reply was read completely. A stream that
	// fails before the read is reset so it is not leaked.
	exchange := func(target peer.ID) ([]byte, bool) {
		fmt.Println("Try open stream")
		stream, err := m.host.NewStream(m.ctx, target, "/example/1.1.0")
		if err != nil {
			fmt.Println("Connection failed:", err)
			return nil, false
		}
		fmt.Println("Connected to:", peerId)
		rw := bufio.NewReadWriter(bufio.NewReader(stream), bufio.NewWriter(stream))
		if _, err := rw.Write(data); err != nil {
			fmt.Println("Error writing to buffer:", err)
			stream.Reset()
			return nil, false
		}
		fmt.Println("start flush")
		err = rw.Flush()
		fmt.Println("end flush")
		if err != nil {
			fmt.Println("Error flushing buffer:", err)
			stream.Reset()
			return nil, false
		}
		// Half-close so the remote reader sees EOF and can start replying.
		// If this fails the peer would wait forever, so give up on the
		// stream instead of blocking in the read below.
		fmt.Println("start close")
		err = stream.CloseWrite()
		fmt.Println("end close")
		if err != nil {
			fmt.Println("Error closing write side:", err)
			stream.Reset()
			return nil, false
		}
		buf, err := io.ReadAll(rw)
		stream.Close()
		if err != nil {
			fmt.Println("Error read: ", err)
			return nil, false
		}
		return buf, true
	}

	for peerCh := range peerChan {
		fmt.Println("Try peer: ", peerCh.ID)
		if peerCh.ID.String() != peerId {
			fmt.Println("Founded: ", peerCh.ID)
			continue
		}
		// Every failed attempt falls through to the stop check, the sleep
		// and the counter, so the retry budget is always enforced.
		for attempt := 0; attempt < 120; attempt++ {
			fmt.Println("Try connect")
			err = m.host.Connect(m.ctx, peerCh)
			if err != nil {
				// Clear the dial backoff so the next attempt is not rejected
				// immediately. The assertion is checked because the host's
				// network is not guaranteed to be a *swarm.Swarm.
				if sw, ok := m.host.Network().(*swarm.Swarm); ok {
					sw.Backoff().Clear(peerCh.ID)
				}
				fmt.Println("Failed connecting to ", peerId, ", error:", err)
				err = dialViaRelay(peerCh.ID)
			}
			if err != nil {
				fmt.Println("Failed connecting2 to ", peerId, ", error:", err)
			} else if buf, ok := exchange(peerCh.ID); ok {
				return buf
			}
			if closed {
				return nil
			}
			time.Sleep(time.Second)
		}
		fmt.Println("Founded: ", peerCh.ID)
	}
	return nil
}
// WaitGetData installs received as the package-wide callback that
// handleStream invokes for every inbound payload. It replaces any callback
// installed earlier and returns immediately.
func WaitGetData(received DataReceived) {
	dataReceived = received
}
// initDHT creates a Kademlia DHT on h backed by an in-memory datastore,
// bootstraps it and connects to the default bootstrap peers in parallel,
// returning once every connection attempt has finished.
//
// A bootstrap peer that cannot be parsed or reached is reported on stdout
// and skipped. The function panics only when Bootstrap itself fails.
func initDHT(ctx context.Context, h host.Host) *dht.IpfsDHT {
	dstore := dsync.MutexWrap(ds.NewMapDatastore())
	kademliaDHT := dht.NewDHT(ctx, h, dstore)
	if err := kademliaDHT.Bootstrap(ctx); err != nil {
		panic(err)
	}

	var wg sync.WaitGroup
	for _, peerAddr := range dht.DefaultBootstrapPeers {
		peerinfo, err := peer.AddrInfoFromP2pAddr(peerAddr)
		if err != nil {
			// Skip an unparsable address rather than dereferencing a nil
			// AddrInfo in the goroutine below.
			fmt.Println("Bootstrap warning:", err, " :", peerAddr)
			continue
		}
		wg.Add(1)
		// The peer info is passed by value so each goroutine owns its copy.
		go func(info peer.AddrInfo) {
			defer wg.Done()
			if err := h.Connect(ctx, info); err != nil {
				fmt.Println("Bootstrap warning:", err, " :", info.String())
			}
		}(*peerinfo)
	}
	wg.Wait()

	fmt.Println("Founded: ", len(h.Peerstore().Peers()))
	return kademliaDHT
}
// GetData prints a diagnostic snapshot to stdout: the number of peers in the
// host's peerstore, followed by one line per peer with the number of open
// connections to it. It returns nothing.
func (m *MobileLibp2p) GetData() {
	store := m.host.Peerstore()
	fmt.Println("peer in store: ", len(store.Peers()))

	known := store.Peers()
	for idx := range known {
		id := known[idx]
		conns := m.host.Network().ConnsToPeer(id)
		fmt.Println("Peer: ", id, " connections: ", len(conns))
	}
}
// Stop shuts the node down. It first raises the package-level closed flag so
// that running connect loops give up, then closes the host and the DHT.
//
// Close failures are reported on stdout; both components are always closed
// even if the first one fails.
func (m *MobileLibp2p) Stop() {
	closed = true
	if err := m.host.Close(); err != nil {
		fmt.Println("Error closing host:", err)
	}
	if err := m.kademlia.Close(); err != nil {
		fmt.Println("Error closing DHT:", err)
	}
}
//func streamConsoleTo(ctx context.Context, topic *pubsub.Topic, ert []byte) {
// var i = 0
// for {
// if err := topic.Publish(ctx, []byte(strconv.Itoa(i))); err != nil {
// fmt.Println("### Publish error:", err)
// }
// i++
// time.Sleep(time.Second)
// }
//}
//
//func discoverPeers(ctx context.Context, h host.Host) {
//
// dutil.Advertise(ctx, routingDiscovery, "qqqqqq")
//
// // Look for others who have announced and attempt to connect to them
// anyConnected := false
// for !anyConnected {
// fmt.Println("Searching for peers...")
// peerChan, err := routingDiscovery.FindPeers(ctx, "qqqqqq")
// if err != nil {
// panic(err)
// }
// for peer := range peerChan {
// if peer.ID == h.ID() {
// continue // No self connection
// }
// err := h.Connect(ctx, peer)
// if err != nil {
// fmt.Println("Failed connecting to ", peer.ID.Pretty(), ", error:", err)
// } else {
// stream, err := h.NewStream(ctx, peer.ID, protocol.ID("/example/1.1.0"))
// if err != nil {
// fmt.Println("Connection failed:", err)
// continue
// } else {
// rw := bufio.NewReadWriter(bufio.NewReader(stream), bufio.NewWriter(stream))
//
// go readData(rw)
// go writeData(rw)
// }
// fmt.Println("Connected to:", peer.ID.Pretty())
// anyConnected = true
// }
// }
// time.Sleep(time.Second)
// }
// fmt.Println("Peer discovery complete")
//}
//func writeData(rw *bufio.ReadWriter) {
// var i = 0
// for i < 15 {
// _, err := rw.WriteString(fmt.Sprintf("%s\n", "1111111"))
// if err != nil {
// fmt.Println("Error writing to buffer")
// panic(err)
// }
// err = rw.Flush()
// if err != nil {
// fmt.Println("Error flushing buffer")
// panic(err)
// }
// i++
// }
//}
//func readData(rw *bufio.ReadWriter) {
// for {
// str, err := rw.ReadString('\n')
// if err != nil {
// fmt.Println("Error reading from buffer")
// panic(err)
// }
//
// if str == "" {
// return
// }
// if str != "\n" {
// // Green console colour: \x1b[32m
// // Reset console colour: \x1b[0m
// fmt.Printf("\x1b[32m%s\x1b[0m> ", str)
// }
//
// }
//}
//
//func printMessagesFrom(ctx context.Context, sub *pubsub.Subscription) {
// for {
// m, err := sub.Next(ctx)
// if err != nil {
// panic(err)
// }
// fmt.Println(m.ReceivedFrom, ": ", string(m.Message.Data))
// }
// fmt.Println("exit sub")
//}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment