Created
June 15, 2023 13:30
-
-
Save master255/bdf351e32cb10aa900ac6b80c79d90e0 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package mobile | |
import ( | |
"bufio" | |
"context" | |
"fmt" | |
ds "github.com/ipfs/go-datastore" | |
dsync "github.com/ipfs/go-datastore/sync" | |
golog "github.com/ipfs/go-log/v2" | |
"github.com/libp2p/go-libp2p" | |
dht "github.com/libp2p/go-libp2p-kad-dht" | |
"github.com/libp2p/go-libp2p/core/crypto" | |
"github.com/libp2p/go-libp2p/core/discovery" | |
"github.com/libp2p/go-libp2p/core/host" | |
"github.com/libp2p/go-libp2p/core/network" | |
"github.com/libp2p/go-libp2p/core/peer" | |
drouting "github.com/libp2p/go-libp2p/p2p/discovery/routing" | |
dutil "github.com/libp2p/go-libp2p/p2p/discovery/util" | |
"io" | |
"sync" | |
"time" | |
) | |
const randezvous = "example777" | |
var dataReceived DataReceived | |
var closed bool | |
type MobileLibp2p struct { | |
host host.Host | |
kademlia *dht.IpfsDHT | |
peerId peer.ID | |
privateKey crypto.PrivKey | |
routing *drouting.RoutingDiscovery | |
ctx context.Context | |
} | |
// ReaderSendClose is the reply handle given to a DataReceived callback
// together with the received payload. In this file it is implemented by
// StreamStruct.
type ReaderSendClose interface {
	// SendData sends the given bytes back as the reply.
	SendData([]byte)
	// Close finishes the exchange without sending a reply.
	Close()
}
type StreamStruct struct { | |
stream network.Stream | |
} | |
type DataReceived interface { | |
DataReceived([]byte, ReaderSendClose) | |
} | |
func (streamStruct StreamStruct) SendData(data []byte) { | |
wr := bufio.NewWriter(streamStruct.stream) | |
_, err := wr.Write(data) | |
if err != nil { | |
fmt.Println("Error writing to buffer") | |
} | |
err = wr.Flush() | |
if err != nil { | |
fmt.Println("Error flushing buffer") | |
} | |
fmt.Println("stream close2") | |
streamStruct.stream.Close() | |
} | |
func (streamStruct StreamStruct) Close() { | |
fmt.Println("stream close1") | |
streamStruct.stream.Close() | |
} | |
func handleStream(stream network.Stream) { | |
fmt.Println("Got a new stream!") | |
if dataReceived != nil { | |
fmt.Println("Start reader") | |
re := bufio.NewReader(stream) | |
fmt.Println("Start read") | |
buf, err := io.ReadAll(re) | |
fmt.Println("Start end read") | |
if err != nil { | |
fmt.Println("Error read: ", err) | |
} else { | |
var readerClose ReaderSendClose = StreamStruct{stream} | |
dataReceived.DataReceived(buf, readerClose) | |
return | |
} | |
} else { | |
fmt.Println("datareceived null") | |
} | |
fmt.Println("stream close") | |
stream.Close() | |
} | |
func StartLibp2p(privateKey []byte, port int) *MobileLibp2p { | |
golog.SetAllLoggers(golog.LevelError) | |
closed = false | |
var privKey crypto.PrivKey | |
if privateKey == nil { | |
privKey, _, _ = crypto.GenerateKeyPair( | |
crypto.Ed25519, | |
-1, | |
) | |
} else { | |
privKey, _ = crypto.UnmarshalPrivateKey(privateKey) | |
} | |
ctx := context.Background() | |
//connmgr, err := connmgr.NewConnManager( | |
// 100, // Lowwater | |
// 400, // HighWater, | |
// connmgr.WithGracePeriod(time.Minute), | |
//) | |
h, err := libp2p.New( | |
libp2p.ListenAddrStrings(fmt.Sprintf("/ip4/0.0.0.0/tcp/%d", port)), | |
libp2p.Identity(privKey), | |
libp2p.NATPortMap(), | |
libp2p.UserAgent(randezvous)) | |
if err != nil { | |
panic(err) | |
} | |
fmt.Println("MyID: ", h.ID()) | |
h.SetStreamHandler("/protocol/1.1.0", handleStream) | |
kademliaDHT := initDHT(ctx, h) | |
routingDiscovery := drouting.NewRoutingDiscovery(kademliaDHT) | |
dutil.Advertise(ctx, routingDiscovery, randezvous, discovery.TTL(time.Hour/2)) | |
//go discoverPeers(ctx, h) | |
//ps, err := pubsub.NewGossipSub(ctx, h) | |
//if err != nil { | |
// panic(err) | |
//} | |
//topic, err := ps.Join(*topicNameFlag) | |
//if err != nil { | |
// panic(err) | |
//} | |
//go streamConsoleTo(ctx, topic, ert) | |
// | |
//fmt.Println("start subscribe") | |
//sub, err := topic.Subscribe() | |
//if err != nil { | |
// panic(err) | |
//} | |
//fmt.Println("start print") | |
//printMessagesFrom(ctx, sub) | |
return &MobileLibp2p{ | |
host: h, | |
kademlia: kademliaDHT, | |
peerId: h.ID(), | |
privateKey: privKey, | |
routing: routingDiscovery, | |
ctx: ctx, | |
} | |
} | |
func (m *MobileLibp2p) GetPrivateKey() []byte { | |
var privKey, _ = crypto.MarshalPrivateKey(m.privateKey) | |
return privKey | |
} | |
func (m *MobileLibp2p) GetNodeId() string { | |
return fmt.Sprint(m.peerId) | |
} | |
func (m *MobileLibp2p) InitialConnectPeers() bool { | |
peerChan, err := m.routing.FindPeers(m.ctx, randezvous) | |
if err != nil { | |
panic(err) | |
} | |
var i = 0 | |
for peer := range peerChan { | |
fmt.Println("Try peer: ", peer.ID) | |
if peer.ID == m.host.ID() { | |
continue | |
} else { | |
fmt.Println("Try connect") | |
err = m.host.Connect(m.ctx, peer) | |
if err != nil { | |
fmt.Println("Failed connecting to ", peer.ID, ", error:", err) | |
} else { | |
fmt.Println("Connected!") | |
i++ | |
break | |
} | |
if closed { | |
return false | |
} | |
time.Sleep(time.Second * 6) | |
} | |
if i > 5 { | |
return true | |
} | |
fmt.Println("Founded: ", peer.ID) | |
} | |
fmt.Println("Initial success") | |
return true | |
} | |
func (m *MobileLibp2p) ConnectSendData(peerId string, data []byte) bool { | |
peerChan, err := m.routing.FindPeers(m.ctx, randezvous) | |
if err != nil { | |
panic(err) | |
} | |
for peer := range peerChan { | |
fmt.Println("Try peer: ", peer.ID) | |
if peer.ID == m.host.ID() { | |
continue // No self connection | |
} else if fmt.Sprint(peer.ID) == peerId { | |
var u = 0 | |
for u < 120 { | |
fmt.Println("Try connect") | |
err = m.host.Connect(m.ctx, peer) | |
if err != nil { | |
fmt.Println("Failed connecting to ", peerId, ", error:", err) | |
} else { | |
fmt.Println("Try open stream") | |
stream, err := m.host.NewStream(m.ctx, peer.ID, "/protocol/1.1.0") | |
if err != nil { | |
fmt.Println("Connection failed:", err) | |
} else { | |
fmt.Println("Connected to:", peerId) | |
wr := bufio.NewWriter(stream) | |
_, err := wr.Write(data) | |
if err != nil { | |
fmt.Println("Error writing to buffer") | |
continue | |
} | |
err = wr.Flush() | |
if err != nil { | |
fmt.Println("Error flushing buffer") | |
continue | |
} | |
stream.Close() | |
return true | |
} | |
} | |
if closed { | |
fmt.Println("CLOSED EXIT FROM PROGRAM") | |
return false | |
} | |
u++ | |
time.Sleep(time.Second * 6) | |
} | |
} | |
fmt.Println("Founded: ", peer.ID) | |
} | |
return false | |
} | |
func (m *MobileLibp2p) ConnectSendGetData(peerId string, data []byte) []byte { | |
peerChan, err := m.routing.FindPeers(m.ctx, randezvous) | |
if err != nil { | |
panic(err) | |
} | |
for peer := range peerChan { | |
fmt.Println("Try peer: ", peer.ID) | |
if peer.ID == m.host.ID() { | |
continue // No self connection | |
} else if fmt.Sprint(peer.ID) == peerId { | |
var u = 0 | |
for u < 120 { | |
fmt.Println("Try connect") | |
err = m.host.Connect(m.ctx, peer) | |
if err != nil { | |
fmt.Println("Failed connecting to ", peerId, ", error:", err) | |
} else { | |
fmt.Println("Try open stream") | |
stream, err := m.host.NewStream(m.ctx, peer.ID, "/protocol/1.1.0") | |
if err != nil { | |
fmt.Println("Connection failed:", err) | |
} else { | |
fmt.Println("Connected to:", peerId) | |
rw := bufio.NewReadWriter(bufio.NewReader(stream), bufio.NewWriter(stream)) | |
_, err := rw.Write(data) | |
if err != nil { | |
fmt.Println("Error writing to buffer") | |
continue | |
} | |
fmt.Println("start flush") | |
err = rw.Flush() | |
fmt.Println("end flush") | |
if err != nil { | |
fmt.Println("Error flushing buffer") | |
continue | |
} | |
fmt.Println("start close") | |
stream.CloseWrite() | |
fmt.Println("end close") | |
buf, err := io.ReadAll(rw) | |
stream.Close() | |
if err != nil { | |
fmt.Println("Error read: ", err) | |
} else { | |
return buf | |
} | |
} | |
} | |
if closed { | |
return nil | |
} | |
u++ | |
time.Sleep(time.Second * 6) | |
} | |
} | |
fmt.Println("Founded: ", peer.ID) | |
} | |
return nil | |
} | |
func WaitGetData(received DataReceived) { | |
dataReceived = received | |
} | |
func initDHT(ctx context.Context, h host.Host) *dht.IpfsDHT { | |
dstore := dsync.MutexWrap(ds.NewMapDatastore()) | |
kademliaDHT := dht.NewDHT(ctx, h, dstore) | |
if err := kademliaDHT.Bootstrap(ctx); err != nil { | |
panic(err) | |
} | |
var wg sync.WaitGroup | |
for _, peerAddr := range dht.DefaultBootstrapPeers { | |
peerinfo, _ := peer.AddrInfoFromP2pAddr(peerAddr) | |
wg.Add(1) | |
go func() { | |
defer wg.Done() | |
if err := h.Connect(ctx, *peerinfo); err != nil { | |
fmt.Println("Bootstrap warning:", err, " :", peerinfo.String()) | |
} | |
}() | |
} | |
wg.Wait() | |
fmt.Println("Founded: ", len(h.Peerstore().Peers())) | |
return kademliaDHT | |
} | |
func (m *MobileLibp2p) GetData() { | |
fmt.Println("peer in store: ", len(m.host.Peerstore().Peers())) | |
for _, peerId := range m.host.Peerstore().Peers() { | |
fmt.Println("Peer: ", peerId, " connections: ", len(m.host.Network().ConnsToPeer(peerId))) | |
} | |
} | |
func (m *MobileLibp2p) Stop() { | |
closed = true | |
m.host.Close() | |
m.kademlia.Close() | |
} | |
//func streamConsoleTo(ctx context.Context, topic *pubsub.Topic, ert []byte) { | |
// var i = 0 | |
// for { | |
// if err := topic.Publish(ctx, []byte(strconv.Itoa(i))); err != nil { | |
// fmt.Println("### Publish error:", err) | |
// } | |
// i++ | |
// time.Sleep(time.Second) | |
// } | |
// | |
//func discoverPeers(ctx context.Context, h host.Host) { | |
// | |
// dutil.Advertise(ctx, routingDiscovery, "qqqqqq") | |
// | |
// // Look for others who have announced and attempt to connect to them | |
// anyConnected := false | |
// for !anyConnected { | |
// fmt.Println("Searching for peers...") | |
// peerChan, err := routingDiscovery.FindPeers(ctx, "qqqqqq") | |
// if err != nil { | |
// panic(err) | |
// } | |
// for peer := range peerChan { | |
// if peer.ID == h.ID() { | |
// continue // No self connection | |
// } | |
// err := h.Connect(ctx, peer) | |
// if err != nil { | |
// fmt.Println("Failed connecting to ", peer.ID.Pretty(), ", error:", err) | |
// } else { | |
// stream, err := h.NewStream(ctx, peer.ID, protocol.ID("/protocol/1.1.0")) | |
// if err != nil { | |
// fmt.Println("Connection failed:", err) | |
// continue | |
// } else { | |
// rw := bufio.NewReadWriter(bufio.NewReader(stream), bufio.NewWriter(stream)) | |
// | |
// go readData(rw) | |
// go writeData(rw) | |
// } | |
// fmt.Println("Connected to:", peer.ID.Pretty()) | |
// anyConnected = true | |
// } | |
// } | |
// time.Sleep(time.Second) | |
// } | |
// fmt.Println("Peer discovery complete") | |
//} | |
//func writeData(rw *bufio.ReadWriter) { | |
// var i = 0 | |
// for i < 15 { | |
// _, err := rw.WriteString(fmt.Sprintf("%s\n", "1111111")) | |
// if err != nil { | |
// fmt.Println("Error writing to buffer") | |
// panic(err) | |
// } | |
// err = rw.Flush() | |
// if err != nil { | |
// fmt.Println("Error flushing buffer") | |
// panic(err) | |
// } | |
// i++ | |
// } | |
//} | |
//func readData(rw *bufio.ReadWriter) { | |
// for { | |
// str, err := rw.ReadString('\n') | |
// if err != nil { | |
// fmt.Println("Error reading from buffer") | |
// panic(err) | |
// } | |
// | |
// if str == "" { | |
// return | |
// } | |
// if str != "\n" { | |
// // Green console colour: \x1b[32m | |
// // Reset console colour: \x1b[0m | |
// fmt.Printf("\x1b[32m%s\x1b[0m> ", str) | |
// } | |
// | |
// } | |
//} | |
// | |
//func printMessagesFrom(ctx context.Context, sub *pubsub.Subscription) { | |
// for { | |
// m, err := sub.Next(ctx) | |
// if err != nil { | |
// panic(err) | |
// } | |
// fmt.Println(m.ReceivedFrom, ": ", string(m.Message.Data)) | |
// } | |
// fmt.Println("exit sub") | |
//} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment