Skip to content

Instantly share code, notes, and snippets.

@bfrederix
Created April 9, 2024 22:09
Show Gist options
  • Save bfrederix/1d88c109c0a5ae07f52f75764b6aa750 to your computer and use it in GitHub Desktop.
Save bfrederix/1d88c109c0a5ae07f52f75764b6aa750 to your computer and use it in GitHub Desktop.
package main
import (
"context"
"encoding/base64"
"encoding/json"
"fmt"
"io"
"net"
"net/http"
"os"
"path/filepath"
"strings"
"time"
"cloud.google.com/go/storage"
"github.com/DataDog/datadog-go/v5/statsd"
"github.com/gorilla/mux"
"github.com/tus/tusd/v2/pkg/gcsstore"
tusd "github.com/tus/tusd/v2/pkg/handler"
"google.golang.org/api/option"
muxtrace "gopkg.in/DataDog/dd-trace-go.v1/contrib/gorilla/mux"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"
)
var (
tusBasePath = "/files/"
)
type gcsHandlerConfig struct {
gcsService *gcsstore.GCSService
bucketName string
statsDClient *statsd.Client
}
func init() {
// TODO: Add zap logging here
}
func main() {
// Get the env variable
env := os.Getenv("ENV")
// Start the Datadog tracer
if os.Getenv("DD_API_KEY") != "" {
tracer.Start(
tracer.WithService("ucs-tus"),
tracer.WithEnv(env),
tracer.WithRuntimeMetrics())
defer tracer.Stop()
}
// StatsD env var/yaml configs https://github.com/DataDog/datadog-agent/blob/main/pkg/config/config_template.yaml#L1828
// Go library options: https://github.com/DataDog/datadog-go/blob/master/statsd/options.go
statsdClient, err := statsd.New("127.0.0.1:8125",
statsd.WithNamespace("ucs"),
statsd.WithTags([]string{
fmt.Sprintf("env:%s", env),
fmt.Sprintf("geo:%s", os.Getenv("GEO")),
fmt.Sprintf("project_id:%s", os.Getenv("PROJECT_ID")),
"service:ucs-tus",
}))
if err != nil {
panic(fmt.Errorf("could not create statsd server: %w", err))
}
// Make sure GCS_SERVICE_ACCOUNT_KEY is set in the environment
keyFile := os.Getenv("GCS_SERVICE_ACCOUNT_KEY_FILE")
bucketName := os.Getenv("BUCKET_NAME")
if keyFile == "" || bucketName == "" {
panic(fmt.Errorf("GCS_SERVICE_ACCOUNT_KEY_FILE and BUCKET_NAME must be set in the environment"))
}
var storageClientOpts []option.ClientOption
// Open the service account key file
f, err := os.Open(filepath.Clean(keyFile))
if err != nil {
panic(fmt.Errorf("unable to open key file: %w", err))
}
defer f.Close()
// Read the service account key file
b, err := io.ReadAll(f)
if err != nil {
panic(fmt.Errorf("unable to read key file: %w", err))
}
decodedBytes, err := base64.StdEncoding.DecodeString(string(b))
if err != nil {
panic(fmt.Errorf("unable to decode key file: %w", err))
}
storageClientOpts = append(storageClientOpts, option.WithCredentialsJSON(decodedBytes))
// A storage backend for tusd may consist of multiple different parts which
// handle upload creation, locking, termination and so on. The composer is a
// place where all those separated pieces are joined together. In this example
// we only use the file store, but you may plug in multiple.
composer := tusd.NewStoreComposer()
// Create a GCS Service client
gcsService, err := NewUCSGCSService(storageClientOpts...)
if err != nil {
panic(fmt.Errorf("unable to create GCS service: %w", err))
}
// Create a new GCSStore instance
store := gcsstore.New(bucketName, gcsService)
store.UseIn(composer)
// Get the CORS origins from the environment variable
corsOrigins := os.Getenv("CORS_ORIGINS")
if corsOrigins == "" {
panic(fmt.Errorf("CORS_ORIGINS must be set in the environment"))
}
// Convert the comma separated CORS origins to a slice of strings
origins := strings.Split(corsOrigins, ",")
corsHandler := corsHandler{allowedOrigins: origins}
// Create a new HTTP handler for the tusd server by providing a configuration.
// The StoreComposer property must be set to allow the handler to function.
tusHandler, err := tusd.NewUnroutedHandler(tusd.Config{
BasePath: tusBasePath,
StoreComposer: composer,
//UploadProgressInterval: 1 * time.Second,
//GracefulRequestCompletionTimeout: 10 * time.Second,
//AcquireLockTimeout: 20 * time.Second,
//NetworkTimeout: 60 * time.Second,
NotifyCompleteUploads: true,
RespectForwardedHeaders: true,
DisableDownload: true,
})
if err != nil {
panic(fmt.Errorf("unable to create handler: %w", err))
}
// Start another goroutine for receiving events from the handler whenever
// an upload is completed. The event will contain details about the upload
// itself and the relevant HTTP request.
go func() {
for {
event := <-tusHandler.CompleteUploads
fmt.Printf("Upload %s finished\n", event.Upload.ID)
statsdClient.Distribution("tus_upload_completed", 1, []string{"file_id:" + event.Upload.ID}, 1)
}
}()
// Create context for the server and a cancel function to stop the server
mainCtx, cancel := context.WithCancel(context.Background())
// Create a new mux router. This is the entry point for the HTTP server.
muxTracer := muxtrace.NewRouter()
// Add CORS middleware to the mux router
muxTracer.Use(corsHandler.corsMiddleware)
// Add the GCS specific routes
uc := gcsHandlerConfig{
gcsService: gcsService,
bucketName: bucketName,
statsDClient: statsdClient,
}
// Add a health check route
muxTracer.HandleFunc("/health-check", healthcheck).Methods(http.MethodGet)
// Add the signed URL route
muxTracer.HandleFunc("/signed_url/{id}", uc.SignedURLHandler).Methods(http.MethodGet)
// TODO: Add middleware for debugging?
// muxTracer.Use(requestid.New().RequestID)
// muxTracer.Use(requestduration.New().RequestDuration)
// Create a subrouter for the Tus files path (/files)
filesPath := muxTracer.PathPrefix("/files").Subrouter()
// Add the Tus specific middleware
filesPath.Use(tusHandler.Middleware)
// Add the standard Tus routes
filesPath.HandleFunc("/", tusHandler.PostFile).Methods(http.MethodPost, http.MethodOptions)
filesPath.HandleFunc("/{id}", tusHandler.HeadFile).Methods(http.MethodHead, http.MethodOptions)
filesPath.HandleFunc("/{id}", tusHandler.PatchFile).Methods(http.MethodPatch)
// Create a new HTTP server using the mux router as the handler.
srv := &http.Server{
Addr: ":8080",
ReadHeaderTimeout: 10 * time.Second,
Handler: muxTracer,
BaseContext: func(listener net.Listener) context.Context { return mainCtx },
}
// Cancel the context when the server is shut down.
srv.RegisterOnShutdown(cancel)
// TODO: replace with utils.StartWithGracefulShutdown(srv),
// may need to listen for CompletedUploads elsewhere.
if err := srv.ListenAndServe(); err != nil {
panic(fmt.Errorf("unable to start server: %w", err))
}
}
// NewUCSGCSService returns a GCSService object given a GCloud service account file path.
func NewUCSGCSService(storageClientOptions ...option.ClientOption) (*gcsstore.GCSService, error) {
ctx := context.Background()
client, err := storage.NewClient(ctx, storageClientOptions...)
if err != nil {
return nil, err
}
service := &gcsstore.GCSService{
Client: client,
}
return service, nil
}
type corsHandler struct {
allowedOrigins []string
}
func (c corsHandler) corsMiddleware(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Access-Control-Allow-Methods", "GET, POST, PUT, PATCH, OPTIONS, HEAD")
w.Header().Set("Access-Control-Allow-Headers", "*")
// Check to make sure the origin is allowed, otherwise the request will be rejected
// because the originator doesn't need to know the other allowed origins
origin := r.Header.Get("Origin")
if origin != "" && isAllowedOrigin(origin, c.allowedOrigins) {
w.Header().Set("Access-Control-Allow-Origin", origin)
} else {
// Handle disallowed origin
w.WriteHeader(http.StatusForbidden)
return
}
if r.Method == "OPTIONS" {
w.WriteHeader(http.StatusNoContent)
return
}
next.ServeHTTP(w, r)
})
}
// isAllowedOrigin checks if the origin is allowed to make requests to the server
func isAllowedOrigin(origin string, allowedOrigins []string) bool {
for _, allowedOrigin := range allowedOrigins {
if origin == allowedOrigin {
return true
}
}
return false
}
// Create a struct for the signed url json response
type signedURLResponse struct {
SignedUrl string `json:"signed_url"`
}
// SignedURLHandler generates a signed URL for a Tus file in the GCS bucket
func (u gcsHandlerConfig) SignedURLHandler(w http.ResponseWriter, r *http.Request) {
// Get the file ID from the request
vars := mux.Vars(r)
fileID, ok := vars["id"]
if !ok {
http.Error(w, "file ID not provided", http.StatusBadRequest)
return
}
// Specify the options for the signed URL
opts := &storage.SignedURLOptions{
Scheme: storage.SigningSchemeV4,
Method: http.MethodGet,
Expires: time.Now().Add(15 * time.Minute),
}
// Generate the signed URL for the Tus file in the intermediary bucket
bucket := u.gcsService.Client.Bucket(u.bucketName)
url, err := bucket.SignedURL(fileID, opts)
if err != nil {
// Handle error generating signed URL
http.Error(w, "error generating signed URL", http.StatusInternalServerError)
return
}
// Write the signed URL response as JSON
resp := signedURLResponse{
SignedUrl: url,
}
w.Header().Set("Content-Type", "application/json")
if err := json.NewEncoder(w).Encode(resp); err != nil {
http.Error(w, "error encoding signed URL response", http.StatusInternalServerError)
return
}
statsDTags := []string{"file_id:" + fileID}
// Increment the statsd counter for signed URL requests
u.statsDClient.Distribution("tus_signed_url_created", 1, statsDTags, 1)
// TODO: Add info log here
}
func healthcheck(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
return
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment