Skip to content

Instantly share code, notes, and snippets.

@so-jelly
Created July 20, 2023 15:49
Show Gist options
  • Save so-jelly/75ce5a24e1f9275716a29a6b2cc930ee to your computer and use it in GitHub Desktop.
Save so-jelly/75ce5a24e1f9275716a29a6b2cc930ee to your computer and use it in GitHub Desktop.
ftp client
package main
import (
	"context"
	"errors"
	"fmt"
	"io"
	"log"
	"path/filepath"
	"sync"

	"cloud.google.com/go/storage"
	"github.com/jlaffaye/ftp"
	"google.golang.org/api/googleapi"
)
// FTPConfig holds the settings needed to reach and authenticate against the
// source FTP server.
type FTPConfig struct {
	// Server is the FTP host, without a port; connectFTP appends ":21".
	Server string
	// User is the account name passed to Login.
	User string
	// Password is the secret passed to Login together with User.
	Password string
	// Path is presumably the remote directory to mirror.
	// NOTE(review): nothing in this file reads it; main walks from "/".
	Path string
}
// GCSConfig describes where mirrored files are stored in Google Cloud Storage.
type GCSConfig struct {
	// BucketName is the destination bucket.
	BucketName string
	// BucketPath is prepended verbatim to each file's base name to form the
	// object name, so it needs its own trailing "/" to act as a folder.
	BucketPath string
}
func main() {
// FTP configuration
ftpConfig := FTPConfig{
// fill these in
}
// GCS configuration
gcsConfig := GCSConfig{
//these too
}
ctx := context.Background()
// Create a GCS client
gcsClient, err := storage.NewClient(ctx)
if err != nil {
log.Fatalf("GCS client creation error: %v", err)
}
defer gcsClient.Close()
// Connect to FTP server
ftpClient, err := connectFTP(ftpConfig.Server)
if err != nil {
log.Fatalf("FTP connection error: %v", err)
}
defer ftpClient.Quit()
// Login to the FTP server
if err := ftpClient.Login(ftpConfig.User, ftpConfig.Password); err != nil {
log.Fatalf("FTP login error: %v", err)
}
// Define the number of parallel tasks you want to run
numParallelTasks := 10
// Create a buffered channel with a capacity equal to the number of parallel tasks
semaphore := make(chan struct{}, numParallelTasks)
// Create a wait group to wait for all tasks to finish
var wg sync.WaitGroup
w := ftpClient.Walk("/")
for w.Next() {
if w.Err() != nil {
log.Fatalln("error walking ftp server", err)
}
if w.Stat().Type == ftp.EntryTypeFile {
wg.Add(1) //
semaphore <- struct{}{} // Acquire a semaphore
go func(fileName string) {
defer func() {
<-semaphore // Release the semaphore to allow another task to run
wg.Done() // Mark the task as completed in the wait group
}()
// Connect to FTP server
// make a new connection for goroutine
ftpClient, err := connectFTP(ftpConfig.Server)
if err != nil {
log.Fatalf("FTP connection error: %v", err)
}
defer ftpClient.Quit()
// Login to the FTP server
if err := ftpClient.Login(ftpConfig.User, ftpConfig.Password); err != nil {
log.Fatalf("FTP login error: %v", err)
}
log.Println("found file:", fileName)
objectName := gcsConfig.BucketPath + filepath.Base(fileName)
have, _ := checkFileExists(ctx, gcsClient, gcsConfig.BucketName, objectName)
if have {
log.Println("already have")
return
}
// Download file from FTP
content, err := downloadFTPFile(ftpClient, fileName)
if err != nil {
log.Printf("FTP file download error for %s: %v", fileName, err)
}
// Upload to GCS
if err := uploadToGCS(ctx, gcsClient, gcsConfig.BucketName, objectName, content); err != nil {
log.Printf("GCS file upload error for %s: %v", w.Stat().Name, err)
} else {
log.Printf("File %s uploaded to GCS bucket %s with object name %s", fileName, gcsConfig.BucketName, objectName)
}
}(w.Stat().Name)
}
}
wg.Wait()
}
// connectFTP dials the FTP server at the given host on port 21 and returns
// the resulting connection. No login is performed here; callers authenticate
// with Login and are responsible for calling Quit.
func connectFTP(server string) (*ftp.ServerConn, error) {
	addr := fmt.Sprintf("%s:21", server)
	conn, dialErr := ftp.Dial(addr)
	if dialErr == nil {
		return conn, nil
	}
	return nil, dialErr
}
// downloadFTPFile retrieves filePath over ftpClient and returns its complete
// contents in memory.
//
// An error is returned when the transfer cannot be started, when reading the
// data fails, or when closing the transfer fails. The returned slice is nil
// whenever the error is non-nil.
func downloadFTPFile(ftpClient *ftp.ServerConn, filePath string) ([]byte, error) {
	r, err := ftpClient.Retr(filePath)
	if err != nil {
		return nil, err
	}
	content, readErr := io.ReadAll(r)
	// Close unconditionally so the transfer is cleaned up even after a failed
	// read, and keep its error: presumably it carries the server's final
	// transfer status, so dropping it could hide an incomplete download.
	closeErr := r.Close()
	if readErr != nil {
		return nil, readErr
	}
	if closeErr != nil {
		return nil, closeErr
	}
	return content, nil
}
// uploadToGCS writes content to the object objectName in bucket bucketName.
//
// It returns the first error from writing or from closing the object writer;
// a nil return means both succeeded.
func uploadToGCS(ctx context.Context, gcsClient *storage.Client, bucketName, objectName string, content []byte) error {
	wc := gcsClient.Bucket(bucketName).Object(objectName).NewWriter(ctx)
	if _, err := wc.Write(content); err != nil {
		// Still close the writer to release it; the write error is the one
		// worth reporting, so the close result is deliberately ignored.
		_ = wc.Close()
		return err
	}
	// Close finalises the upload and is where upload failures are reported,
	// so its error must not be dropped by a bare defer.
	return wc.Close()
}
// checkFileExists reports whether objectName exists in bucketName.
//
// It returns (false, nil) when the object is missing and (false, err) when
// the lookup itself failed, so callers can tell "absent" from "unknown".
func checkFileExists(ctx context.Context, gcsClient *storage.Client, bucketName, objectName string) (bool, error) {
	attrs, err := gcsClient.Bucket(bucketName).Object(objectName).Attrs(ctx)
	if err != nil {
		// The storage client reports a missing object with its own sentinel,
		// which a direct type assertion to *googleapi.Error never matches.
		if errors.Is(err, storage.ErrObjectNotExist) {
			return false, nil // Object does not exist
		}
		// Also accept a raw 404 from the API, looking through any wrapping.
		var apiErr *googleapi.Error
		if errors.As(err, &apiErr) && apiErr.Code == 404 {
			return false, nil // Object does not exist
		}
		return false, err // Error occurred during query
	}
	return attrs != nil, nil // Object exists
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment