Skip to content

Instantly share code, notes, and snippets.

@relistan
Last active March 4, 2024 11:35
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save relistan/b77c3b26f6f44f27e3b88f7dac79d00a to your computer and use it in GitHub Desktop.
Save relistan/b77c3b26f6f44f27e3b88f7dac79d00a to your computer and use it in GitHub Desktop.
Cassandra: Golang-migrate wrapper with keyspace creation

Cassandra Migration With Keyspace Creation

The problem this solves: Golang-migrate supports Cassandra migrations but does not support creating the keyspace itself. This is annoying and leads to having to wrap it with other tooling or scripting to actually use it. Additionally, when running locally, Cassandra or ScyllaDB can take a fair bit of time to start. This tool solves that problem as well. It:

  • Creates keyspaces if they are missing before running the migrations
  • Checks and retries for up to a couple of minutes waiting for Cassandra to be available

Looks like this:

time="2023-01-04T11:32:12Z" level=info msg="Beginning Cassandra migration"
Settings -----------------------------------------
  * Environment: test
  * Endpoints: [localhost]
  * Keyspace: your_keyspace
  * SSLEnable: false
  * CACertFile: your.crt
  * Port: 9042
  * Username: 
  * Password: 
  * MigrationDir: db/cassandra/migrations
  * ReplicationFactor: 1
  * ReplicationStrategy: SimpleStrategy
--------------------------------------------------
time="2023-01-04T11:32:12Z" level=info msg="Migration completed successfully"
package main
import (
"fmt"
"time"
"github.com/gocql/gocql"
"github.com/golang-migrate/migrate/v4"
"github.com/golang-migrate/migrate/v4/database/cassandra"
_ "github.com/golang-migrate/migrate/v4/source/file"
"github.com/kelseyhightower/envconfig"
"github.com/relistan/rubberneck"
log "github.com/sirupsen/logrus"
)
// Config represents the environment variables for configuration
type Config struct {
Environment string `envconfig:"ENVIRONMENT" default:"dev"`
Endpoints []string `envconfig:"CASSANDRA_ENDPOINTS" required:"true"`
Keyspace string `envconfig:"CASSANDRA_KEYSPACE" required:"true"`
SSLEnable bool `envconfig:"CASSANDRA_SSL_ENABLE" default:"false"`
CACertFile string `envconfig:"CASSANDRA_CACERT_FILE" default:"sf-class2-root.crt"`
Port int `envconfig:"CASSANDRA_PORT" default:"9042"`
Username string `envconfig:"CASSANDRA_USERNAME" default:""`
Password string `envconfig:"CASSANDRA_PASSWORD" default:""`
MigrationDir string `envconfig:"CASSANDRA_MIGRATION_DIR" default:"db/cassandra/migrations"`
ReplicationFactor int `envconfig:"CASSANDRA_REPL_FACTOR" default:"1"`
ReplicationStrategy string `envconfig:"CASSANDRA_REPL_STRATEGY" default:"SimpleStrategy"`
}
// createKeyspace creates the keyspace in Cassandra if it doesn't exist
func createKeyspace(session *gocql.Session, keyspace string, replFactor int, strategy string) error {
query := fmt.Sprintf(
"CREATE KEYSPACE IF NOT EXISTS %s WITH REPLICATION = {'class': '%s', 'replication_factor': %d}",
keyspace, strategy, replFactor)
if err := session.Query(query).Exec(); err != nil {
return err
}
return nil
}
// runMigrations runs schema migrations using Golang-migrate
func runMigrations(sess *gocql.Session, migrationDir string, cassandraHosts []string, keyspace string) error {
cassandraDriver, err := cassandra.WithInstance(sess, &cassandra.Config{
KeyspaceName: keyspace,
})
if err != nil {
return err
}
m, err := migrate.NewWithDatabaseInstance(
"file://"+migrationDir,
"cassandra", cassandraDriver)
if err != nil {
return err
}
defer m.Close()
err = m.Up()
if err != nil && err != migrate.ErrNoChange {
return err
}
return nil
}
func awaitSession(cluster *gocql.ClusterConfig) (session *gocql.Session, err error) {
for i := 0; i < 240; i++ {
session, err = cluster.CreateSession()
if err == nil {
break
}
time.Sleep(500 * time.Millisecond)
log.Warnf("Cassandra not available. Waiting...")
}
return session, err
}
func configureCassandra(config *Config) *gocql.ClusterConfig {
cluster := gocql.NewCluster(config.Endpoints...)
cluster.Keyspace = "system"
cluster.Port = config.Port
// These two are only used when deployed against Amazon Keyspaces
if config.Username != "" {
cluster.Authenticator = gocql.PasswordAuthenticator{
Username: config.Username,
Password: config.Password,
}
}
if config.SSLEnable {
cluster.SslOpts = &gocql.SslOptions{
CaPath: config.CACertFile,
EnableHostVerification: false,
}
}
return cluster
}
func main() {
var config Config
log.Info("Beginning Cassandra migration")
err := envconfig.Process("", &config)
if err != nil {
log.Fatalf("error loading config: %v", err)
}
rubberneck.Print(config)
// Connect to Cassandra cluster
cluster := configureCassandra(&config)
session, err := awaitSession(cluster)
if err != nil {
// We land here after the loop expires
log.Fatalf("error connecting to Cassandra: %v", err)
}
defer session.Close()
keyspace := fmt.Sprintf("%s_%s", config.Keyspace, config.Environment)
// Create the keyspace if it doesn't exist
if err := createKeyspace(session, keyspace, config.ReplicationFactor, config.ReplicationStrategy); err != nil {
log.Fatalf("error creating keyspace %s: %v", keyspace, err)
}
cluster.Keyspace = keyspace
migrateSession, err := cluster.CreateSession()
if err != nil {
log.Fatalf("error connecting to Cassandra: %v", err)
}
defer migrateSession.Close()
// Apply schema migrations
if err := runMigrations(migrateSession, config.MigrationDir, config.Endpoints, keyspace); err != nil {
log.Fatalf("error applying migrations: %v", err)
}
log.Info("Migration completed successfully")
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment