Skip to content

Instantly share code, notes, and snippets.

@gedw99
Forked from delaneyj/egctx.go
Created May 9, 2023 14:36
Show Gist options
  • Save gedw99/434078435cc1eff3728c70f2fb65fd9f to your computer and use it in GitHub Desktop.
Save gedw99/434078435cc1eff3728c70f2fb65fd9f to your computer and use it in GitHub Desktop.
package utils
import (
"context"
"golang.org/x/sync/errgroup"
)
// ErrGroupSharedCtx pairs an errgroup.Group with a single context that is
// passed to every function started through the group.
type ErrGroupSharedCtx struct {
// eg runs the submitted functions and is what Wait blocks on.
eg *errgroup.Group
// ctx is the context obtained from errgroup.WithContext in the constructor;
// each submitted function receives it as its argument.
ctx context.Context
}
// CtxErrFunc is a unit of work that receives a context and reports failure
// through its error result.
type CtxErrFunc func(ctx context.Context) error
// NewErrGroupSharedCtx builds a group whose functions all receive the context
// derived from ctx by errgroup.WithContext, and immediately starts any funcs
// supplied. More functions can be added later with Go; call Wait to collect
// the result.
func NewErrGroupSharedCtx(ctx context.Context, funcs ...CtxErrFunc) *ErrGroupSharedCtx {
	group, groupCtx := errgroup.WithContext(ctx)
	shared := &ErrGroupSharedCtx{eg: group, ctx: groupCtx}
	shared.Go(funcs...)
	return shared
}
// Go starts each of funcs on the underlying errgroup, handing every one the
// group's shared context.
func (egc *ErrGroupSharedCtx) Go(funcs ...CtxErrFunc) {
	for i := range funcs {
		// Bind the function to a per-iteration variable so the closure does
		// not observe a later element of the slice.
		task := funcs[i]
		egc.eg.Go(func() error { return task(egc.ctx) })
	}
}
// Wait delegates to the underlying errgroup's Wait and returns whatever error
// it reports.
func (egc *ErrGroupSharedCtx) Wait() error {
	err := egc.eg.Wait()
	return err
}
// ErrGroupSeparateCtx wraps an errgroup.Group whose functions are each started
// with a context chosen by the caller at submission time, rather than one
// context held by the group.
type ErrGroupSeparateCtx struct {
// eg runs the submitted functions and is what Wait blocks on.
eg *errgroup.Group
}
// NewErrGroupSeparateCtx returns an empty group backed by a zero-value
// errgroup.Group. Contexts are supplied per call to Go.
func NewErrGroupSeparateCtx() *ErrGroupSeparateCtx {
	return &ErrGroupSeparateCtx{eg: new(errgroup.Group)}
}
// Go starts each of funcs on the underlying errgroup, passing the supplied ctx
// to every one of them.
func (egc *ErrGroupSeparateCtx) Go(ctx context.Context, funcs ...CtxErrFunc) {
	for i := range funcs {
		// Bind the function to a per-iteration variable so the closure does
		// not observe a later element of the slice.
		task := funcs[i]
		egc.eg.Go(func() error { return task(ctx) })
	}
}
// Wait delegates to the underlying errgroup's Wait and returns whatever error
// it reports.
func (egc *ErrGroupSeparateCtx) Wait() error {
	err := egc.eg.Wait()
	return err
}
package utils
import (
	"context"
	"fmt"
	"math"
	"os"
	"path/filepath"
	"runtime"
	"time"

	"google.golang.org/protobuf/types/known/timestamppb"
	"zombiezen.com/go/sqlite"
	"zombiezen.com/go/sqlite/sqlitemigration"
	"zombiezen.com/go/sqlite/sqlitex"
)
// Database holds two SQLite connection pools over the same file: one used for
// write transactions and one used for read transactions.
type Database struct {
// write is the pool WriteTX draws from; NewDatabase opens it with a size of 1.
write *sqlitex.Pool
// read is the pool ReadTX draws from; NewDatabase sizes it to runtime.NumCPU().
read *sqlitex.Pool
}
// TxFn is a callback run by WriteTX and ReadTX with the connection on which
// the transaction was opened; a non-nil error is reported back to the caller.
type TxFn func(tx *sqlite.Conn) error
// NewDatabase creates the parent directory of dbFilename if necessary, opens a
// single-connection write pool, applies migrations through it, and then opens
// a read pool sized to runtime.NumCPU().
//
// On any failure after the write pool has been opened, the pool is closed
// before returning so that no connections or file handles are leaked. The
// returned *Database is nil whenever the error is non-nil.
func NewDatabase(ctx context.Context, dbFilename string, migrations []string) (*Database, error) {
	if err := os.MkdirAll(filepath.Dir(dbFilename), 0755); err != nil {
		return nil, fmt.Errorf("could not create database directory: %w", err)
	}

	uri := fmt.Sprintf("file:%s?_journal_mode=WAL&_synchronous=NORMAL", dbFilename)

	writePool, err := sqlitex.Open(uri, 0, 1)
	if err != nil {
		return nil, fmt.Errorf("could not open write pool: %w", err)
	}

	if err := applyMigrations(ctx, writePool, migrations); err != nil {
		// Best-effort cleanup: the migration error is the one worth reporting.
		_ = writePool.Close()
		return nil, err
	}

	readPool, err := sqlitex.Open(uri, 0, runtime.NumCPU())
	if err != nil {
		// Best-effort cleanup: the open error is the one worth reporting.
		_ = writePool.Close()
		return nil, fmt.Errorf("could not open read pool: %w", err)
	}

	return &Database{write: writePool, read: readPool}, nil
}

// applyMigrations borrows a connection from pool, runs the migrations on it,
// and returns the connection to the pool before returning, so the caller may
// safely close the pool afterwards.
func applyMigrations(ctx context.Context, pool *sqlitex.Pool, migrations []string) error {
	conn := pool.Get(ctx)
	if conn == nil {
		// Get yields nil when no connection could be obtained (for example
		// because ctx is already done); migrating on a nil connection would
		// not be meaningful.
		return fmt.Errorf("could not get write connection from pool")
	}
	defer pool.Put(conn)

	schema := sqlitemigration.Schema{Migrations: migrations}
	if err := sqlitemigration.Migrate(ctx, conn, schema); err != nil {
		return fmt.Errorf("could not migrate event store: %w", err)
	}
	return nil
}
// Close closes both the write pool and the read pool. Both pools are always
// closed, even if the first close fails, so that a failure on the write pool
// does not leak the read pool's connections.
//
// If only one close fails, its error is wrapped and returned. If both fail,
// the write pool's error is wrapped and the read pool's error is included in
// the message.
func (db *Database) Close() error {
	writeErr := db.write.Close()
	readErr := db.read.Close()

	switch {
	case writeErr != nil && readErr != nil:
		return fmt.Errorf("failed to close write pool: %w (read pool also failed to close: %v)", writeErr, readErr)
	case writeErr != nil:
		return fmt.Errorf("failed to close write pool: %w", writeErr)
	case readErr != nil:
		return fmt.Errorf("failed to close read pool: %w", readErr)
	}
	return nil
}
// WriteTX borrows a connection from the write pool, opens an immediate
// transaction on it, and runs fn with that connection. The connection is
// returned to the pool when WriteTX returns.
//
// The transaction's end function is deferred with a pointer to the named
// result, so it observes the final error value (see sqlitex.ImmediateTransaction
// for how it decides between commit and rollback).
func (db *Database) WriteTX(ctx context.Context, fn TxFn) (err error) {
	writer := db.write.Get(ctx)
	if writer == nil {
		return fmt.Errorf("could not get write connection from pool")
	}
	defer db.write.Put(writer)

	finish, beginErr := sqlitex.ImmediateTransaction(writer)
	if beginErr != nil {
		return fmt.Errorf("could not start transaction: %w", beginErr)
	}
	defer finish(&err)

	if fnErr := fn(writer); fnErr != nil {
		return fmt.Errorf("could not execute write transaction: %w", fnErr)
	}
	return nil
}
// ReadTX borrows a connection from the read pool, opens a transaction on it,
// and runs fn with that connection. The connection is returned to the pool
// when ReadTX returns.
//
// The transaction's end function is deferred with a pointer to the named
// result, so it observes the final error value (see sqlitex.Transaction for
// how it decides between commit and rollback).
func (db *Database) ReadTX(ctx context.Context, fn TxFn) (err error) {
	reader := db.read.Get(ctx)
	if reader == nil {
		return fmt.Errorf("could not get read connection from pool")
	}
	defer db.read.Put(reader)

	finish := sqlitex.Transaction(reader)
	defer finish(&err)

	if fnErr := fn(reader); fnErr != nil {
		return fmt.Errorf("could not execute read transaction: %w", fnErr)
	}
	return nil
}
const (
	// secondsInADay is the number of seconds in one day (24 * 60 * 60).
	secondsInADay = 86400
	// UnixEpochJulianDay is the Julian day number of the Unix epoch
	// (1970-01-01T00:00:00Z).
	UnixEpochJulianDay = 2440587.5
)

// TimeToJulianDay converts a time.Time into a Julian day.
//
// Only whole seconds are used: any sub-second component of t is discarded.
func TimeToJulianDay(t time.Time) float64 {
	// Unix() is independent of t's location, so no UTC conversion is needed.
	return float64(t.Unix())/secondsInADay + UnixEpochJulianDay
}

// JulianDayToTime converts a Julian day into a time.Time in UTC with
// whole-second resolution.
//
// The seconds value is rounded to the nearest second rather than truncated.
// A float64 Julian day near the present has a resolution of only tens of
// microseconds, so the value recovered from TimeToJulianDay frequently lands
// just below the intended whole second; truncation would then yield a time
// one second too early.
func JulianDayToTime(d float64) time.Time {
	secs := math.Round((d - UnixEpochJulianDay) * secondsInADay)
	return time.Unix(int64(secs), 0).UTC()
}
// JulianNow returns the current wall-clock time expressed as a Julian day,
// as computed by TimeToJulianDay.
func JulianNow() float64 {
	now := time.Now()
	return TimeToJulianDay(now)
}
// TimestampJulian converts a protobuf timestamp into a Julian day by first
// turning it into a time.Time and then applying TimeToJulianDay.
func TimestampJulian(ts *timestamppb.Timestamp) float64 {
	asTime := ts.AsTime()
	return TimeToJulianDay(asTime)
}
// JulianDayToTimestamp converts a Julian day into a protobuf timestamp by way
// of JulianDayToTime.
func JulianDayToTimestamp(f float64) *timestamppb.Timestamp {
	return timestamppb.New(JulianDayToTime(f))
}
// JulianDayToTimestampStmt reads a float from stmt via GetFloat(param),
// interprets it as a Julian day, and returns it as a protobuf timestamp.
// NOTE(review): param is presumably a result column name; confirm against
// sqlite.Stmt.GetFloat.
func JulianDayToTimestampStmt(stmt *sqlite.Stmt, param string) *timestamppb.Timestamp {
	return JulianDayToTimestamp(stmt.GetFloat(param))
}
// JulianDayToTimeStmt reads a float from stmt via GetFloat(param), interprets
// it as a Julian day, and returns the corresponding time.Time.
// NOTE(review): param is presumably a result column name; confirm against
// sqlite.Stmt.GetFloat.
func JulianDayToTimeStmt(stmt *sqlite.Stmt, param string) time.Time {
	return JulianDayToTime(stmt.GetFloat(param))
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment