-
-
Save bfrederix/1d88c109c0a5ae07f52f75764b6aa750 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"context" | |
"encoding/base64" | |
"encoding/json" | |
"fmt" | |
"io" | |
"net" | |
"net/http" | |
"os" | |
"path/filepath" | |
"strings" | |
"time" | |
"cloud.google.com/go/storage" | |
"github.com/DataDog/datadog-go/v5/statsd" | |
"github.com/gorilla/mux" | |
"github.com/tus/tusd/v2/pkg/gcsstore" | |
tusd "github.com/tus/tusd/v2/pkg/handler" | |
"google.golang.org/api/option" | |
muxtrace "gopkg.in/DataDog/dd-trace-go.v1/contrib/gorilla/mux" | |
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer" | |
) | |
// tusBasePath is the URL path handed to the tusd handler as its BasePath.
var tusBasePath = "/files/"
type gcsHandlerConfig struct { | |
gcsService *gcsstore.GCSService | |
bucketName string | |
statsDClient *statsd.Client | |
} | |
// init is currently a no-op placeholder; it performs no package-level setup.
func init() {
	// TODO: configure zap logging here.
}
func main() { | |
// Get the env variable | |
env := os.Getenv("ENV") | |
// Start the Datadog tracer | |
if os.Getenv("DD_API_KEY") != "" { | |
tracer.Start( | |
tracer.WithService("ucs-tus"), | |
tracer.WithEnv(env), | |
tracer.WithRuntimeMetrics()) | |
defer tracer.Stop() | |
} | |
// StatsD env var/yaml configs https://github.com/DataDog/datadog-agent/blob/main/pkg/config/config_template.yaml#L1828 | |
// Go library options: https://github.com/DataDog/datadog-go/blob/master/statsd/options.go | |
statsdClient, err := statsd.New("127.0.0.1:8125", | |
statsd.WithNamespace("ucs"), | |
statsd.WithTags([]string{ | |
fmt.Sprintf("env:%s", env), | |
fmt.Sprintf("geo:%s", os.Getenv("GEO")), | |
fmt.Sprintf("project_id:%s", os.Getenv("PROJECT_ID")), | |
"service:ucs-tus", | |
})) | |
if err != nil { | |
panic(fmt.Errorf("could not create statsd server: %w", err)) | |
} | |
// Make sure GCS_SERVICE_ACCOUNT_KEY is set in the environment | |
keyFile := os.Getenv("GCS_SERVICE_ACCOUNT_KEY_FILE") | |
bucketName := os.Getenv("BUCKET_NAME") | |
if keyFile == "" || bucketName == "" { | |
panic(fmt.Errorf("GCS_SERVICE_ACCOUNT_KEY_FILE and BUCKET_NAME must be set in the environment")) | |
} | |
var storageClientOpts []option.ClientOption | |
// Open the service account key file | |
f, err := os.Open(filepath.Clean(keyFile)) | |
if err != nil { | |
panic(fmt.Errorf("unable to open key file: %w", err)) | |
} | |
defer f.Close() | |
// Read the service account key file | |
b, err := io.ReadAll(f) | |
if err != nil { | |
panic(fmt.Errorf("unable to read key file: %w", err)) | |
} | |
decodedBytes, err := base64.StdEncoding.DecodeString(string(b)) | |
if err != nil { | |
panic(fmt.Errorf("unable to decode key file: %w", err)) | |
} | |
storageClientOpts = append(storageClientOpts, option.WithCredentialsJSON(decodedBytes)) | |
// A storage backend for tusd may consist of multiple different parts which | |
// handle upload creation, locking, termination and so on. The composer is a | |
// place where all those separated pieces are joined together. In this example | |
// we only use the file store, but you may plug in multiple. | |
composer := tusd.NewStoreComposer() | |
// Create a GCS Service client | |
gcsService, err := NewUCSGCSService(storageClientOpts...) | |
if err != nil { | |
panic(fmt.Errorf("unable to create GCS service: %w", err)) | |
} | |
// Create a new GCSStore instance | |
store := gcsstore.New(bucketName, gcsService) | |
store.UseIn(composer) | |
// Get the CORS origins from the environment variable | |
corsOrigins := os.Getenv("CORS_ORIGINS") | |
if corsOrigins == "" { | |
panic(fmt.Errorf("CORS_ORIGINS must be set in the environment")) | |
} | |
// Convert the comma separated CORS origins to a slice of strings | |
origins := strings.Split(corsOrigins, ",") | |
corsHandler := corsHandler{allowedOrigins: origins} | |
// Create a new HTTP handler for the tusd server by providing a configuration. | |
// The StoreComposer property must be set to allow the handler to function. | |
tusHandler, err := tusd.NewUnroutedHandler(tusd.Config{ | |
BasePath: tusBasePath, | |
StoreComposer: composer, | |
//UploadProgressInterval: 1 * time.Second, | |
//GracefulRequestCompletionTimeout: 10 * time.Second, | |
//AcquireLockTimeout: 20 * time.Second, | |
//NetworkTimeout: 60 * time.Second, | |
NotifyCompleteUploads: true, | |
RespectForwardedHeaders: true, | |
DisableDownload: true, | |
}) | |
if err != nil { | |
panic(fmt.Errorf("unable to create handler: %w", err)) | |
} | |
// Start another goroutine for receiving events from the handler whenever | |
// an upload is completed. The event will contain details about the upload | |
// itself and the relevant HTTP request. | |
go func() { | |
for { | |
event := <-tusHandler.CompleteUploads | |
fmt.Printf("Upload %s finished\n", event.Upload.ID) | |
statsdClient.Distribution("tus_upload_completed", 1, []string{"file_id:" + event.Upload.ID}, 1) | |
} | |
}() | |
// Create context for the server and a cancel function to stop the server | |
mainCtx, cancel := context.WithCancel(context.Background()) | |
// Create a new mux router. This is the entry point for the HTTP server. | |
muxTracer := muxtrace.NewRouter() | |
// Add CORS middleware to the mux router | |
muxTracer.Use(corsHandler.corsMiddleware) | |
// Add the GCS specific routes | |
uc := gcsHandlerConfig{ | |
gcsService: gcsService, | |
bucketName: bucketName, | |
statsDClient: statsdClient, | |
} | |
// Add a health check route | |
muxTracer.HandleFunc("/health-check", healthcheck).Methods(http.MethodGet) | |
// Add the signed URL route | |
muxTracer.HandleFunc("/signed_url/{id}", uc.SignedURLHandler).Methods(http.MethodGet) | |
// TODO: Add middleware for debugging? | |
// muxTracer.Use(requestid.New().RequestID) | |
// muxTracer.Use(requestduration.New().RequestDuration) | |
// Create a subrouter for the Tus files path (/files) | |
filesPath := muxTracer.PathPrefix("/files").Subrouter() | |
// Add the Tus specific middleware | |
filesPath.Use(tusHandler.Middleware) | |
// Add the standard Tus routes | |
filesPath.HandleFunc("/", tusHandler.PostFile).Methods(http.MethodPost, http.MethodOptions) | |
filesPath.HandleFunc("/{id}", tusHandler.HeadFile).Methods(http.MethodHead, http.MethodOptions) | |
filesPath.HandleFunc("/{id}", tusHandler.PatchFile).Methods(http.MethodPatch) | |
// Create a new HTTP server using the mux router as the handler. | |
srv := &http.Server{ | |
Addr: ":8080", | |
ReadHeaderTimeout: 10 * time.Second, | |
Handler: muxTracer, | |
BaseContext: func(listener net.Listener) context.Context { return mainCtx }, | |
} | |
// Cancel the context when the server is shut down. | |
srv.RegisterOnShutdown(cancel) | |
// TODO: replace with utils.StartWithGracefulShutdown(srv), | |
// may need to listen for CompletedUploads elsewhere. | |
if err := srv.ListenAndServe(); err != nil { | |
panic(fmt.Errorf("unable to start server: %w", err)) | |
} | |
} | |
// NewUCSGCSService returns a GCSService object given a GCloud service account file path. | |
func NewUCSGCSService(storageClientOptions ...option.ClientOption) (*gcsstore.GCSService, error) { | |
ctx := context.Background() | |
client, err := storage.NewClient(ctx, storageClientOptions...) | |
if err != nil { | |
return nil, err | |
} | |
service := &gcsstore.GCSService{ | |
Client: client, | |
} | |
return service, nil | |
} | |
// corsHandler holds the configuration for the CORS middleware.
type corsHandler struct {
	// allowedOrigins lists the Origin header values that are accepted.
	allowedOrigins []string
}
func (c corsHandler) corsMiddleware(next http.Handler) http.Handler { | |
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { | |
w.Header().Set("Access-Control-Allow-Methods", "GET, POST, PUT, PATCH, OPTIONS, HEAD") | |
w.Header().Set("Access-Control-Allow-Headers", "*") | |
// Check to make sure the origin is allowed, otherwise the request will be rejected | |
// because the originator doesn't need to know the other allowed origins | |
origin := r.Header.Get("Origin") | |
if origin != "" && isAllowedOrigin(origin, c.allowedOrigins) { | |
w.Header().Set("Access-Control-Allow-Origin", origin) | |
} else { | |
// Handle disallowed origin | |
w.WriteHeader(http.StatusForbidden) | |
return | |
} | |
if r.Method == "OPTIONS" { | |
w.WriteHeader(http.StatusNoContent) | |
return | |
} | |
next.ServeHTTP(w, r) | |
}) | |
} | |
// isAllowedOrigin reports whether origin is exactly equal (case-sensitive) to
// one of the entries in allowedOrigins.
func isAllowedOrigin(origin string, allowedOrigins []string) bool {
	for i := range allowedOrigins {
		if allowedOrigins[i] == origin {
			return true
		}
	}
	return false
}
// signedURLResponse is the JSON payload returned by the signed URL endpoint.
type signedURLResponse struct {
	// SignedUrl is serialized under the "signed_url" key.
	SignedUrl string `json:"signed_url"`
}
// SignedURLHandler generates a signed URL for a Tus file in the GCS bucket | |
func (u gcsHandlerConfig) SignedURLHandler(w http.ResponseWriter, r *http.Request) { | |
// Get the file ID from the request | |
vars := mux.Vars(r) | |
fileID, ok := vars["id"] | |
if !ok { | |
http.Error(w, "file ID not provided", http.StatusBadRequest) | |
return | |
} | |
// Specify the options for the signed URL | |
opts := &storage.SignedURLOptions{ | |
Scheme: storage.SigningSchemeV4, | |
Method: http.MethodGet, | |
Expires: time.Now().Add(15 * time.Minute), | |
} | |
// Generate the signed URL for the Tus file in the intermediary bucket | |
bucket := u.gcsService.Client.Bucket(u.bucketName) | |
url, err := bucket.SignedURL(fileID, opts) | |
if err != nil { | |
// Handle error generating signed URL | |
http.Error(w, "error generating signed URL", http.StatusInternalServerError) | |
return | |
} | |
// Write the signed URL response as JSON | |
resp := signedURLResponse{ | |
SignedUrl: url, | |
} | |
w.Header().Set("Content-Type", "application/json") | |
if err := json.NewEncoder(w).Encode(resp); err != nil { | |
http.Error(w, "error encoding signed URL response", http.StatusInternalServerError) | |
return | |
} | |
statsDTags := []string{"file_id:" + fileID} | |
// Increment the statsd counter for signed URL requests | |
u.statsDClient.Distribution("tus_signed_url_created", 1, statsDTags, 1) | |
// TODO: Add info log here | |
} | |
// healthcheck answers every request with 200 OK and an empty body; the
// request itself is not inspected.
func healthcheck(w http.ResponseWriter, _ *http.Request) {
	w.WriteHeader(http.StatusOK)
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment