Created
July 20, 2023 15:49
-
-
Save so-jelly/75ce5a24e1f9275716a29a6b2cc930ee to your computer and use it in GitHub Desktop.
ftp client
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"context" | |
"fmt" | |
"io" | |
"log" | |
"path/filepath" | |
"sync" | |
"cloud.google.com/go/storage" | |
"github.com/jlaffaye/ftp" | |
"google.golang.org/api/googleapi" | |
) | |
// FTPConfig represents the configuration for FTP connection. | |
type FTPConfig struct { | |
Server string | |
User string | |
Password string | |
Path string | |
} | |
// GCSConfig represents the configuration for GCS connection. | |
type GCSConfig struct { | |
BucketName string | |
BucketPath string | |
} | |
func main() { | |
// FTP configuration | |
ftpConfig := FTPConfig{ | |
// fill these in | |
} | |
// GCS configuration | |
gcsConfig := GCSConfig{ | |
//these too | |
} | |
ctx := context.Background() | |
// Create a GCS client | |
gcsClient, err := storage.NewClient(ctx) | |
if err != nil { | |
log.Fatalf("GCS client creation error: %v", err) | |
} | |
defer gcsClient.Close() | |
// Connect to FTP server | |
ftpClient, err := connectFTP(ftpConfig.Server) | |
if err != nil { | |
log.Fatalf("FTP connection error: %v", err) | |
} | |
defer ftpClient.Quit() | |
// Login to the FTP server | |
if err := ftpClient.Login(ftpConfig.User, ftpConfig.Password); err != nil { | |
log.Fatalf("FTP login error: %v", err) | |
} | |
// Define the number of parallel tasks you want to run | |
numParallelTasks := 10 | |
// Create a buffered channel with a capacity equal to the number of parallel tasks | |
semaphore := make(chan struct{}, numParallelTasks) | |
// Create a wait group to wait for all tasks to finish | |
var wg sync.WaitGroup | |
w := ftpClient.Walk("/") | |
for w.Next() { | |
if w.Err() != nil { | |
log.Fatalln("error walking ftp server", err) | |
} | |
if w.Stat().Type == ftp.EntryTypeFile { | |
wg.Add(1) // | |
semaphore <- struct{}{} // Acquire a semaphore | |
go func(fileName string) { | |
defer func() { | |
<-semaphore // Release the semaphore to allow another task to run | |
wg.Done() // Mark the task as completed in the wait group | |
}() | |
// Connect to FTP server | |
// make a new connection for goroutine | |
ftpClient, err := connectFTP(ftpConfig.Server) | |
if err != nil { | |
log.Fatalf("FTP connection error: %v", err) | |
} | |
defer ftpClient.Quit() | |
// Login to the FTP server | |
if err := ftpClient.Login(ftpConfig.User, ftpConfig.Password); err != nil { | |
log.Fatalf("FTP login error: %v", err) | |
} | |
log.Println("found file:", fileName) | |
objectName := gcsConfig.BucketPath + filepath.Base(fileName) | |
have, _ := checkFileExists(ctx, gcsClient, gcsConfig.BucketName, objectName) | |
if have { | |
log.Println("already have") | |
return | |
} | |
// Download file from FTP | |
content, err := downloadFTPFile(ftpClient, fileName) | |
if err != nil { | |
log.Printf("FTP file download error for %s: %v", fileName, err) | |
} | |
// Upload to GCS | |
if err := uploadToGCS(ctx, gcsClient, gcsConfig.BucketName, objectName, content); err != nil { | |
log.Printf("GCS file upload error for %s: %v", w.Stat().Name, err) | |
} else { | |
log.Printf("File %s uploaded to GCS bucket %s with object name %s", fileName, gcsConfig.BucketName, objectName) | |
} | |
}(w.Stat().Name) | |
} | |
} | |
wg.Wait() | |
} | |
func connectFTP(server string) (*ftp.ServerConn, error) { | |
ftpClient, err := ftp.Dial(fmt.Sprintf("%s:21", server)) | |
if err != nil { | |
return nil, err | |
} | |
return ftpClient, nil | |
} | |
func downloadFTPFile(ftpClient *ftp.ServerConn, filePath string) ([]byte, error) { | |
r, err := ftpClient.Retr(filePath) | |
if err != nil { | |
return nil, err | |
} | |
defer r.Close() | |
return io.ReadAll(r) | |
} | |
func uploadToGCS(ctx context.Context, gcsClient *storage.Client, bucketName, objectName string, content []byte) error { | |
wc := gcsClient.Bucket(bucketName).Object(objectName).NewWriter(ctx) | |
defer wc.Close() | |
if _, err := wc.Write(content); err != nil { | |
return err | |
} | |
return nil | |
} | |
func checkFileExists(ctx context.Context, gcsClient *storage.Client, bucketName, objectName string) (bool, error) { | |
attrs, err := gcsClient.Bucket(bucketName).Object(objectName).Attrs(ctx) | |
if err != nil { | |
if storageErr, ok := err.(*googleapi.Error); ok && storageErr.Code == 404 { | |
return false, nil // Object does not exist | |
} | |
return false, err // Error occurred during query | |
} | |
return attrs != nil, nil // Object exists | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment