Created
July 18, 2011 17:49
-
-
Save samg/1090146 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Brings the column families of a Cassandra keyspace in line with the
# definitions found in db/schema.yml (located under Config.root).
#
# Every method is a module-level method; call them as Schema.migrate etc.
#
# NOTE(review): cassandra_client is called throughout but is not defined in
# this module as shown here - presumably it is supplied elsewhere; confirm.
module Schema
  # Migrates every column family listed in the config, waits until the
  # cluster agrees on the schema, and returns the resulting schema.
  #
  # An empty schema.yml (which loads as nil/false) is treated as "no column
  # families" instead of raising.
  def self.migrate
    (config || {}).each do |name, properties|
      migrate_column_family(name, properties)
    end
    wait_for_schema_agreement
    schema
  end

  # True when the client reports that the cluster has schema agreement.
  def self.schema_agreement?
    cassandra_client.schema_agreement?
  end

  # Memoized schema configuration (column family name => properties).
  def self.config
    @config ||= config!
  end

  # Loads the schema configuration from disk, bypassing the memoized copy.
  def self.config!
    YAML.load_file(File.join(Config.root, 'db', 'schema.yml'))
  end

  # Blocks until the cluster reports schema agreement.
  #
  # secs          - maximum number of seconds to wait (default 90).
  # poll_interval - seconds to pause between checks (default 1). The pause
  #                 keeps the loop from issuing back-to-back requests to the
  #                 cluster and from flooding stdout with progress dots.
  #
  # Returns immediately, printing nothing, when agreement already holds.
  # Raises Timeout::Error when agreement is not reached within secs.
  def self.wait_for_schema_agreement(secs = 90, poll_interval = 1)
    return if schema_agreement?

    Timeout.timeout(secs) do
      puts "waiting up to #{secs} seconds schema agreement"
      until schema_agreement?
        print '.'
        sleep poll_interval
      end
      puts
      puts "done"
    end
  end

  # Migrate a column family to a desired state.
  # NB. Only properties that are explicitly declared are set. Removing a value
  # from properties will not reset it back to default, it will leave it in its
  # current state.
  #
  # column_family_name - String or Symbol name of the column family.
  # properties         - Hash of CfDef attribute name => value. nil is
  #                      accepted and treated as "no properties"; this is what
  #                      a schema.yml entry without any settings parses to.
  #
  # Adds the column family when it does not exist yet, updates it otherwise.
  def self.migrate_column_family(column_family_name, properties = {})
    wait_for_schema_agreement
    cf_def = find_or_initialize_column_family(column_family_name)
    (properties || {}).each do |property, value|
      # Each property is applied through the matching attribute writer.
      cf_def.send "#{property}=", value
    end
    if column_family_exists?(cf_def.name)
      cassandra_client.update_column_family cf_def
    else
      cassandra_client.add_column_family cf_def
    end
  end

  # Returns the existing definition of the named column family, or a new
  # CassandraThrift::CfDef carrying only the client's keyspace and the name.
  def self.find_or_initialize_column_family(name)
    existing = column_family(name)
    return existing if existing

    cf_def = CassandraThrift::CfDef.new
    cf_def.keyspace = cassandra_client.keyspace
    cf_def.name = name.to_s
    cf_def
  end

  # SCHEMA INTROSPECTION

  # True when a column family with the given name (String or Symbol) is
  # present in the current schema.
  def self.column_family_exists?(name)
    wanted = name.to_s
    column_families.any? { |cf| cf.name == wanted }
  end

  # The schema as reported by the client.
  def self.schema
    cassandra_client.schema
  end

  # All column family definitions in the current schema.
  def self.column_families
    schema.cf_defs
  end

  # The definition of the named column family, or nil when there is none.
  def self.column_family(name)
    wanted = name.to_s
    column_families.find { |cf| cf.name == wanted }
  end
end
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
require File.expand_path(File.join(File.dirname(__FILE__), '..', "spec_helper")) | |
require File.join(Config.root, 'lib', 'schema') | |
# Integration spec for Schema. It talks to the Cassandra node configured in
# Config.db, so a reachable node is required to run it.
describe Schema do
  # Recreate the configured keyspace, with no column families, before every
  # example.
  before do
    if Config.db['hosts']
      host = Config.db['hosts'].shuffle.first
    else
      host = Config.db['host']
    end

    client = Cassandra.new('system', "#{host}:#{Config.db['port']}")
    client.disable_node_auto_discovery!
    if Config.db['user']
      client.login! Config.db['user'], Config.db['password']
    end

    ks_name = Config.db['keyspace']
    begin
      client.drop_keyspace ks_name
    rescue CassandraThrift::InvalidRequestException
      # Deliberately ignored - presumably raised when the keyspace does not
      # exist yet (e.g. on the very first run); confirm.
      nil
    end

    ks_def = CassandraThrift::KsDef.new
    ks_def.name = ks_name
    ks_def.strategy_class = 'SimpleStrategy'
    ks_def.replication_factor = 1
    ks_def.cf_defs = []
    client.add_keyspace ks_def
  end

  describe ".config!" do
    it "should load a yaml file" do
      YAML.should_receive(:load_file).with File.join(Config.root, 'db', 'schema.yml')
      Schema.config!
    end
  end

  describe ".schema_agreement?" do
    it "should be a convenient way to check if the cluster has schema agreement" do
      Schema.schema_agreement?.should be_true
    end
  end

  describe ".migrate" do
    before do
      # The heredoc body sits at column 0 so the YAML document marker and the
      # nesting are unambiguous to the parser. Each column family maps to its
      # own properties, as the assertions below require.
      @config = <<-YAML
---
Foo:
  comparator_type: 'UTF8Type'
Bar:
  column_type: Super
  comparator_type: 'UTF8Type'
Baz:
  column_type: Super
  default_validation_class: 'UTF8Type'
      YAML
      Schema.stub!(:config).and_return YAML.load(@config)
    end

    it "should create 3 column families from the config file" do
      Schema.should have(0).column_families
      Schema.migrate
      Schema.should have(3).column_families
      Schema.column_families.map(&:name).should =~ %w{ Foo Bar Baz }
    end

    it "should set the properties of those column families" do
      Schema.migrate
      Schema.column_family("Baz").default_validation_class.should == 'org.apache.cassandra.db.marshal.UTF8Type'
      Schema.column_family("Baz").column_type.should == 'Super'
      Schema.column_family("Bar").comparator_type.should == 'org.apache.cassandra.db.marshal.UTF8Type'
      # Foo declares no column_type, so the server-side value is expected.
      Schema.column_family("Foo").column_type.should == 'Standard'
    end
  end

  describe '.column_family_exists?' do
    it 'should return false for non-existent column families' do
      Schema.column_family_exists?(:MyTestColumnFamily).should be_false
    end

    it 'should return true for existing column families' do
      Schema.migrate_column_family(:MyTestColumnFamily)
      Schema.column_family_exists?(:MyTestColumnFamily).should be_true
    end
  end

  describe '.migrate_column_family' do
    it "should create a column family" do
      Schema.column_family_exists?("MyTestCF").should be_false
      Schema.migrate_column_family("MyTestCF")
      Schema.column_family_exists?("MyTestCF").should be_true
    end

    it "should set the column family's default_validation_class attribute" do
      Schema.column_family_exists?("MyTestCF").should be_false
      Schema.migrate_column_family("MyTestCF", 'default_validation_class' => 'UTF8Type')
      Schema.column_family("MyTestCF").default_validation_class.should == 'org.apache.cassandra.db.marshal.UTF8Type'
    end

    it "should update a column family's existing definition" do
      Schema.column_family_exists?("MyTestCF").should be_false
      Schema.migrate_column_family("MyTestCF", 'default_validation_class' => 'UTF8Type')
      Schema.column_family("MyTestCF").default_validation_class.should == 'org.apache.cassandra.db.marshal.UTF8Type'
      Schema.migrate_column_family("MyTestCF", 'default_validation_class' => 'BytesType')
      Schema.column_family("MyTestCF").default_validation_class.should == 'org.apache.cassandra.db.marshal.BytesType'
    end

    it "should wait for schema agreement before migrating" do
      # One check for the early-return guard, then two inside the polling
      # loop: disagreement twice, agreement on the third call.
      Schema.should_receive(:schema_agreement?).exactly(3).times.and_return false, false, true
      Schema.migrate_column_family("MyTestCF", 'default_validation_class' => 'BytesType')
    end
  end
end
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Hey, cool!