Created
July 27, 2021 09:54
-
-
Save kirs/d169c1534320c9e5f16b14007effcf22 to your computer and use it in GitHub Desktop.
scrappy ruby script to manage vreplication streams
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# ruby version of https://github.com/vitessio/contrib/blob/master/vreplgen/vreplgen.go | |
require 'shellwords' | |
require 'json' | |
# Grab your tablet ID with `vtctlclient ListAllTablets zone1` | |
TABLET_ID = 'zone1-0428408676' | |
VTCTLD_ADDR = 'localhost:15999' | |
def list_all | |
query = <<~SQL.strip | |
select * from _vt.vreplication | |
SQL | |
cmd = [ | |
'vtctlclient', | |
"-server=#{VTCTLD_ADDR}", | |
'VReplicationExec', | |
'-json', | |
TABLET_ID, | |
query | |
] | |
puts cmd.shelljoin | |
res = `#{cmd.shelljoin}` | |
JSON.parse(res)["rows"] | |
end | |
def delete(id) | |
if id.is_a?(Range) | |
query = <<~SQL.strip | |
delete from _vt.vreplication where id >= #{id.min} and id <= #{id.max} | |
SQL | |
else | |
query = <<~SQL.strip | |
delete from _vt.vreplication where id=#{id} | |
SQL | |
end | |
puts query | |
cmd = [ | |
'vtctlclient', | |
"-server=#{VTCTLD_ADDR}", | |
'VReplicationExec', | |
'-json', | |
TABLET_ID, | |
query | |
] | |
res = `#{cmd.shelljoin}` | |
pp JSON.parse(res) | |
end | |
def build_insert(tenant_id, target) | |
raise "bad tenant_id" unless tenant_id.to_i > 0 | |
tenant_id = tenant_id.to_s | |
# serialized BinlogSource structs, | |
# see https://vitess.io/docs/reference/vreplication/vreplication/#the-source-field | |
struct = 'keyspace:"commerce" shard:"-" filter:<'\ | |
'rules:<match:"orders" filter:"select * from orders where tenant_id=' + tenant_id + '" > '\ | |
'rules:<match:"products" filter:"select * from products where tenant_id=' + tenant_id + '" > '\ | |
'rules:<match:"customers" filter:"select * from customers where tenant_id=' + tenant_id + '" > '\ | |
'> ' | |
[ | |
'vtctlclient', | |
"-server=#{VTCTLD_ADDR}", | |
'VReplicationExec', | |
TABLET_ID, | |
<<~SQL.strip | |
insert into _vt.vreplication | |
(db_name, source, pos, max_tps, max_replication_lag, tablet_types, time_updated, transaction_timestamp, state) values | |
('vt_keyspace', '#{struct}', '', 99999, 99999, '#{target}', 0, 0, 'Running') | |
SQL | |
] | |
end | |
case ARGV[0] | |
when 'list' | |
rows = list_all | |
pp rows | |
puts "total: #{rows.size}" | |
running = rows.count { |r| r[11] == "Running" } | |
puts "running: #{running}" | |
when 'delete' | |
if ARGV[1] && ARGV[2] | |
range = (ARGV[1].to_i)..(ARGV[2].to_i) | |
delete(range) | |
else | |
delete(ARGV[1]) | |
end | |
when 'add' | |
target = ARGV[3] || 'master' | |
if ARGV[1] && ARGV[2] | |
range = (ARGV[1].to_i)..(ARGV[2].to_i) | |
cleanup(range) | |
range.each do |id| | |
cmd = build_insert(id, target) | |
`#{cmd.shelljoin}` | |
puts cmd.shelljoin | |
sleep 1 | |
end | |
else | |
cmd = build_insert(ARGV[1], target) | |
cmd.shelljoin | |
`#{cmd.shelljoin}` | |
end | |
else | |
raise "arg required" | |
end |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment