Skip to content

Instantly share code, notes, and snippets.

@kirs
Created July 27, 2021 09:54
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save kirs/d169c1534320c9e5f16b14007effcf22 to your computer and use it in GitHub Desktop.
Save kirs/d169c1534320c9e5f16b14007effcf22 to your computer and use it in GitHub Desktop.
scrappy ruby script to manage vreplication streams
# ruby version of https://github.com/vitessio/contrib/blob/master/vreplgen/vreplgen.go
require 'shellwords'
require 'json'
# Tablet that every VReplicationExec command is sent to.
# Grab your tablet ID with `vtctlclient ListAllTablets zone1`.
# Set the TABLET_ID environment variable to override the built-in default.
TABLET_ID = ENV.fetch('TABLET_ID', 'zone1-0428408676').freeze
# Address passed to vtctlclient via -server.
# Set the VTCTLD_ADDR environment variable to override the built-in default.
VTCTLD_ADDR = ENV.fetch('VTCTLD_ADDR', 'localhost:15999').freeze
# Fetches every row of _vt.vreplication from the configured tablet.
#
# Runs `vtctlclient VReplicationExec -json` against TABLET_ID via VTCTLD_ADDR
# and prints the shell-escaped command line before executing it.
#
# @return [Array] the "rows" value of the JSON reply; [] when it is absent or null
# @raise [RuntimeError] when vtctlclient exits with a non-zero status
# @raise [JSON::ParserError] when the command output is not valid JSON
def list_all
  cmd = [
    'vtctlclient',
    "-server=#{VTCTLD_ADDR}",
    'VReplicationExec',
    '-json',
    TABLET_ID,
    'select * from _vt.vreplication'
  ]
  puts cmd.shelljoin
  res = `#{cmd.shelljoin}`
  # Fail with the real cause instead of a JSON parse error on empty output.
  raise "vtctlclient failed: #{$?}" unless $?.success?

  # A reply without rows must still be countable by the caller.
  JSON.parse(res)["rows"] || []
end
# Deletes vreplication rows by id on the configured tablet.
#
# Prints the generated SQL, runs it through `vtctlclient VReplicationExec -json`
# and pretty-prints the parsed JSON reply.
#
# @param id [Integer, String, Range] a single row id (digits only), or a Range
#   with Integer bounds; rows with id between range.min and range.max are deleted
# @return [Object] the parsed JSON reply (what `pp` returns)
# @raise [ArgumentError] when id is nil, not a plain non-negative integer, or a
#   Range that is empty or has non-Integer bounds
# @raise [RuntimeError] when vtctlclient exits with a non-zero status
def delete(id)
  # The id ends up inside a SQL string, so reject anything that is not a plain
  # integer before building the statement ("1 or 1=1" would wipe the table,
  # nil would produce the broken "where id=").
  condition =
    if id.is_a?(Range)
      unless id.begin.is_a?(Integer) && id.end.is_a?(Integer)
        raise ArgumentError, "id range must have integer bounds, got #{id.inspect}"
      end
      low = id.min
      high = id.max
      # min/max are nil for an empty range such as 5..3.
      raise ArgumentError, "id range is empty: #{id.inspect}" if low.nil? || high.nil?

      "id >= #{low} and id <= #{high}"
    else
      unless id.to_s.match?(/\A\d+\z/)
        raise ArgumentError, "id must be a non-negative integer, got #{id.inspect}"
      end

      "id=#{id.to_i}"
    end

  query = "delete from _vt.vreplication where #{condition}"
  puts query
  cmd = [
    'vtctlclient',
    "-server=#{VTCTLD_ADDR}",
    'VReplicationExec',
    '-json',
    TABLET_ID,
    query
  ]
  res = `#{cmd.shelljoin}`
  raise "vtctlclient failed: #{$?}" unless $?.success?

  pp JSON.parse(res)
end
# Builds the vtctlclient argv that inserts one vreplication stream copying the
# orders, products and customers tables of a single tenant.
#
# Nothing is executed here; the caller runs the returned command.
#
# @param tenant_id [Integer, String] positive tenant id, decimal digits only
# @param target [String, nil] value stored in the tablet_types column; only
#   letters, '_', ':' and ',' are accepted
# @return [Array<String>] command words, suitable for Shellwords#shelljoin
# @raise [RuntimeError] "bad tenant_id" when tenant_id is not a positive
#   integer, "bad target" when target contains other characters
def build_insert(tenant_id, target)
  # `to_i` alone parses only the leading digits, so "5 or 1=1" used to pass the
  # check and the raw string was spliced into the filter SQL. Require digits only.
  raise "bad tenant_id" unless tenant_id.to_s.match?(/\A\d+\z/) && tenant_id.to_i > 0
  # target is placed between single quotes in the statement below; refuse
  # anything that could terminate the quoted literal.
  raise "bad target" unless target.to_s.match?(/\A[A-Za-z_:,]*\z/)

  tenant_id = tenant_id.to_i.to_s

  # serialized BinlogSource structs,
  # see https://vitess.io/docs/reference/vreplication/vreplication/#the-source-field
  rules = %w[orders products customers].map do |table|
    %(rules:<match:"#{table}" filter:"select * from #{table} where tenant_id=#{tenant_id}" > )
  end.join
  struct = %(keyspace:"commerce" shard:"-" filter:<#{rules}> )

  sql = <<~SQL.strip
    insert into _vt.vreplication
    (db_name, source, pos, max_tps, max_replication_lag, tablet_types, time_updated, transaction_timestamp, state) values
    ('vt_keyspace', '#{struct}', '', 99999, 99999, '#{target}', 0, 0, 'Running')
  SQL

  [
    'vtctlclient',
    "-server=#{VTCTLD_ADDR}",
    'VReplicationExec',
    TABLET_ID,
    sql
  ]
end
# Command-line entry point. Supported invocations:
#
#   list                        dump all vreplication rows plus total/running counts
#   delete ID                   delete the stream with that id
#   delete FROM TO              delete streams with id in FROM..TO
#   add TENANT_ID               create a stream for one tenant (target 'master')
#   add FROM TO [TABLET_TYPES]  create streams for tenants FROM..TO, one per second
case ARGV[0]
when 'list'
  # Guard against a nil result so the counts below cannot crash.
  rows = list_all || []
  pp rows
  puts "total: #{rows.size}"
  # Column 11 is compared with the state string; presumably it is the `state`
  # column of _vt.vreplication -- verify against your Vitess schema.
  running = rows.count { |r| r[11] == "Running" }
  puts "running: #{running}"
when 'delete'
  # Without an id the generated statement would be "... where id=".
  raise "delete requires an id (or a FROM and TO id)" unless ARGV[1]

  if ARGV[2]
    delete((ARGV[1].to_i)..(ARGV[2].to_i))
  else
    delete(ARGV[1])
  end
when 'add'
  target = ARGV[3] || 'master'
  if ARGV[1] && ARGV[2]
    range = (ARGV[1].to_i)..(ARGV[2].to_i)
    # `cleanup` is not defined anywhere in this script, so calling it
    # unconditionally aborted with NameError before any stream was created.
    # Only invoke it when something else has defined it.
    cleanup(range) if respond_to?(:cleanup, true)
    range.each do |id|
      cmd = build_insert(id, target)
      `#{cmd.shelljoin}`
      succeeded = $?.success?
      puts cmd.shelljoin
      # Report the failure but keep going with the remaining tenants.
      warn "vtctlclient failed for tenant #{id}" unless succeeded
      sleep 1
    end
  else
    cmd = build_insert(ARGV[1], target)
    # The original evaluated cmd.shelljoin here and discarded it; print it as
    # the ranged branch does.
    puts cmd.shelljoin
    `#{cmd.shelljoin}`
    abort "vtctlclient failed: #{$?}" unless $?.success?
  end
else
  raise "arg required"
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment