Skip to content

Instantly share code, notes, and snippets.

@b5
Last active June 22, 2020 21:54
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save b5/f7481bea8c7dc77ec1895d7df0a4e432 to your computer and use it in GitHub Desktop.
Save b5/f7481bea8c7dc77ec1895d7df0a4e432 to your computer and use it in GitHub Desktop.
"local" dsync
package main
import (
"bufio"
"context"
"os"
"path/filepath"
coreiface "github.com/ipfs/interface-go-ipfs-core"
"github.com/qri-io/dag/dsync"
"github.com/qri-io/qfs/qipfs"
)
const listenAddr = "http://localhost:123456/dsync"
func main() {
ctx, cancel := context.Background()
defer cancel()
// make a filesystem from the existing node
aFS, err := qipfs.NewFilesystem(ctx, map[string]interface{}{
"path": os.Getenv("MY_IPFS_PATH"),
})
if err != nil {
panic(err)
}
a := fs.(*qipfs.Filestore).IPFSCoreAPI()
// init a new IPFS repo at "./ipfs"
newRepoPath := filepath.Join(os.Getwd(), "ipfs")
if err := qipfs.InitRepo(newRepoPath, ""); err != nil {
panic(err)
}
bFS, err = qipfs.NewFilesystem(ctx, map[string]interface{}{
"path": newRepoPath,
})
if err != nil {
panic(err)
}
b := fs.(*qipfs.Filestore).IPFSCoreAPI()
aDsync, bDsync, err := setupDsync(a, b)
if err != nil {
panic(er)
}
// scan a file of line-delimied root hashes
file, err := os.Open("../ipfs_hashes.txt")
if err != nil {
log.Fatal(err)
}
defer file.Close()
scanner := bufio.NewScanner(file)
for scanner.Scan() {
fmt.Printf("copying %q\n", scanner.Text())
if err := cpHash(scanner.Text(), aDsync); err != nil {
panic(err)
}
}
if err := scanner.Err(); err != nil {
panic(err)
}
fmt.Printf("created repo at %q", newRepoPath)
}
func cpHash(path string, source *dsync.Dsync) error {
// Create a Push:
push, err := source.NewPush(path, listenAddr, true)
if err != nil {
return err
}
// We want to see progress, so we spin up a goroutine to listen for updates
waitForFmt := make(chan struct{})
go func() {
updates := push.Updates()
for {
select {
case update := <-updates:
if update.Complete() {
fmt.Println("done!")
waitForFmt <- struct{}{}
}
case <-ctx.Done():
// don't leak goroutines
waitForFmt <- struct{}{}
return
}
}
}()
// Do the push
if err := push.Do(ctx); err != nil {
return err
}
// block until updates has had a chance to print
<-waitForFmt
}
func setupDsync(a, b coreiface.CoreAPI) (a, b dsync.Dsync, err error) {
// make a localNodeGetter, when performing dsync we don't want to fetch
// blocks from the dweb
aNG, err := dsync.NewLocalNodeGetter(a)
if err != nil {
return nil, nil, err
}
// make a localNodeGetter, when performing dsync we don't want to fetch
// blocks from the dweb
bNG, err := dsync.NewLocalNodeGetter(b)
if err != nil {
return nil, nil, err
}
aDsync, err := dsync.New(aNG, a.Block())
if err != nil {
return nil, nil, err
}
// create the remote instance, configuring it to accept DAGs
bDsync, err := dsync.New(aNG, a.Block(), func(cfg *Config) {
// configure the remote listening address:
cfg.HTTPRemoteAddress = listenAddr
// we MUST override the PreCheck function
cfg.PushPreCheck = func(ctx context.Context, info dag.Info, _ map[string]string) error {
return nil
}
// in order for remotes to allow pinning, they must be provided a PinAPI:
cfg.PinAPI = b.Pin()
})
if err != nil {
return nil, nil, err
}
// start listening for remote pushes & pulls. This should be long running,
// like a server. Cancel the provided context to close
if err = bDsync.StartRemote(ctx); err != nil {
return nil, nil, err
}
return aDsync, nil
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment