Skip to content

Instantly share code, notes, and snippets.

@yurishkuro
Created March 10, 2020 16:22
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save yurishkuro/d460a21c293cd02306cc8bc19d895940 to your computer and use it in GitHub Desktop.
Save yurishkuro/d460a21c293cd02306cc8bc19d895940 to your computer and use it in GitHub Desktop.
lazy session for Cassandra in Jaeger
package cassandra
import (
"errors"
"sync"
"sync/atomic"
"github.com/jaegertracing/jaeger/pkg/cassandra"
"go.uber.org/zap"
)
var errUninitializedSession = errors.New("uninitialized Cassandra session")
// LazySession is a wrapper which attempts session creation only when queries are received. This allows for
// application startup even when Cassandra is unavailable.
// The first query incurs the cost of the blocking session creation. If session creation is unsuccessful, the query
// is created with a disconnectedSession, which will return an error when used.
type LazySession struct {
sessionInitialized uint32
sessionProvider SessionProvider
logger *zap.Logger
session cassandra.Session
m sync.Mutex
}
// SessionProvider is a func which provides a cassandra session
type SessionProvider func() (cassandra.Session, error)
// NewLazySession creates a new LazySession
func NewLazySession(sessionProvider SessionProvider, logger *zap.Logger) cassandra.Session {
return &LazySession{
sessionProvider: sessionProvider,
logger: logger,
}
}
func (l *LazySession) getSession() cassandra.Session {
if atomic.LoadUint32(&l.sessionInitialized) == 1 {
return l.session
}
l.logger.Info("Waiting on lock to begin lazy Cassandra session initialization")
l.m.Lock()
defer l.m.Unlock()
if l.sessionInitialized == 0 {
l.logger.Debug("Starting lazy Cassandra session initialization")
session, err := l.sessionProvider()
if err != nil {
l.logger.Warn("Unable to initialize lazy Cassandra session", zap.Error(err))
return disconnectedSession{}
}
l.logger.Info("Successfully initialized lazy Cassandra session")
l.session = session
atomic.StoreUint32(&l.sessionInitialized, 1)
}
return l.session
}
// ---
// Query delegates to underlying session query. It creates a new session if none exists
func (l *LazySession) Query(stmt string, values ...interface{}) cassandra.Query {
return l.getSession().Query(stmt, values...)
}
// Close delegates to underlying session query. It creates a session if none exists
func (l *LazySession) Close() {
l.getSession().Close()
}
// ---
// disconnectedSession returns disconnected queries which short circuit to produce errors
type disconnectedSession struct {
}
func (disconnectedSession) Query(stmt string, values ...interface{}) cassandra.Query {
return disconnectedQuery{}
}
func (disconnectedSession) Close() {
}
// ---
// disconnectQuery returns queries that always error
type disconnectedQuery struct {
}
func (disconnectedQuery) Exec() error {
return errUninitializedSession
}
func (disconnectedQuery) String() string {
return "This is a query from an uninitialized session."
}
func (disconnectedQuery) ScanCAS(dest ...interface{}) (bool, error) {
return false, errUninitializedSession
}
func (disconnectedQuery) Iter() cassandra.Iterator {
return disconnectedIterator{}
}
func (disconnectedQuery) Bind(v ...interface{}) cassandra.Query {
return disconnectedQuery{}
}
func (disconnectedQuery) Consistency(level cassandra.Consistency) cassandra.Query {
return disconnectedQuery{}
}
func (disconnectedQuery) PageSize(int) cassandra.Query {
return disconnectedQuery{}
}
// ---
// disconnectedIterator returns iterators which short circuit to produce errors
type disconnectedIterator struct {
}
func (disconnectedIterator) Scan(dest ...interface{}) bool {
return false
}
func (disconnectedIterator) Close() error {
return errUninitializedSession
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment