Skip to content

Instantly share code, notes, and snippets.

@HenriBeck
Created July 20, 2023 19:57
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save HenriBeck/1c05e701222ad954f3a79c91565e22f6 to your computer and use it in GitHub Desktop.
Save HenriBeck/1c05e701222ad954f3a79c91565e22f6 to your computer and use it in GitHub Desktop.
Go Future
package asyncutils
import (
"context"
"fmt"
"log"
"runtime/debug"
)
// A Future is a task which starts but doesn't block the current thread since it's run in a goroutine.
//
// To wait on a future, call the Await() method which blocks until the future resolves either to a value or an error.
//
// To start a new future, check the `RunFuture` function.
type Future[T any] interface {
Await() (T, error)
}
type future[T any] struct {
val T
err error
done chan struct{}
}
func (future *future[T]) Await() (T, error) {
<-future.done
return future.val, future.err
}
type staticFuture[T any] struct {
val T
}
func (future *staticFuture[T]) Await() (T, error) {
return future.val, nil
}
func Value[T any](value T) Future[T] {
return &staticFuture[T]{val: value}
}
// AwaitAllSlice awaits all futures in a given slice.
//
// The return values are returned in order though futures
// which resulted in an error are logged and skipped in the output.
func AwaitAllSlice[T any](ctx context.Context, futures []Future[T]) []T {
results := make([]T, 0, len(futures))
for _, future := range futures {
result, err := future.Await()
if err != nil {
logger.ReportError(ctx, err)
continue
}
results = append(results, result)
}
return results
}
// AwaitAllMap awaits all futures in a map and returns the map without futures.
//
// Any future which resolves into an error will be omitted from the final map.
func AwaitAllMap[Key comparable, T any](ctx context.Context, futures map[Key]Future[T]) map[Key]T {
results := make(map[Key]T, len(futures))
for key, future := range futures {
result, err := future.Await()
if err != nil {
logger.ReportError(ctx, err)
continue
}
results[key] = result
}
return results
}
// RunFuture starts a new future and runs the provided function in a goroutine.
// The calling goroutine will not be blocked until `Await()` on the future is called
// which will return the results of the passed function.
//
// The ctx is needed to properly create track the async operations both in our tracing and monitoring.
// The operation name serves as a identifier in the tracing to identify the async code more easily.
//
// Panics are also recovered and are returned as normal errors.
// In this case the default value of `T` is returned with the paniced error.
//
// Example:
//
// future := async.RunFuture(ctx, "FetchResponse", func() (*http.Response, error) {
// res, err := http.Get("https://www.google.com")
// if err != nil {
// return nil, err
// }
//
// return res, nil
// })
//
// fmt.Println("This code will not wait for the HTTP request")
//
// res, err := future.Await()
//
// fmt.Println("This code waits until the http request is done")
//
// For more examples, check the test suite of the `async` package.
func RunFuture[T any](
ctx context.Context,
f func(ctx context.Context) (T, error),
) Future[T] {
future := &future[T]{
done: make(chan struct{}, 1),
}
go func(ctx context.Context) {
defer close(future.done)
defer func() {
panicErr := recover()
if panicErr == nil {
return
}
if IsDevelopment() {
log.Printf("Recovered panic: %v", panicErr)
debug.PrintStack()
}
future.err = fmt.Errorf("GoRoutine paniced: %v", panicErr)
}()
future.val, future.err = f(ctx)
}(ctx)
return future
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment