Skip to content

Instantly share code, notes, and snippets.

@YuJuncen
Last active August 17, 2022 12:50
Show Gist options
  • Save YuJuncen/4f82d1769a766faa78098923d3e32071 to your computer and use it in GitHub Desktop.
Save YuJuncen/4f82d1769a766faa78098923d3e32071 to your computer and use it in GitHub Desktop.
Scripts for testing the PiTR (point-in-time recovery) feature of TiKV with a tiny data set (~1G).
require 'mysql'
require 'json'
require 'date'
# Connection URI handed to Mysql.connect for the upstream cluster.
DB                = 'mysql://root:@control:4000/test'
# Connection URI handed to Mysql.connect for the downstream (restore target) cluster.
DB_DOWNSTREAM     = 'mysql://root:@tikvs-downstream:4000/test'
# Value passed to br as --s3.endpoint.
STREAM_ENDPOINT   = 'http://minio.pingcap.net:9000'
# Storage URL prefix; "<prefix>-full" / "<prefix>-incremental" are appended to it.
STREAM_TARGET     = 's3://pitr/'
# Value passed to br as --task-name for the log backup task.
TASK_NAME         = 'fiolvit'
# PD address (-u) used for backup and log-task commands.
PD                = 'http://tikvs:2379'
# PD address (-u) used for the point-in-time restore.
DOWNSTREAM_PD     = 'http://tikvs-downstream:2379'
# Helpers that derive timestamps and unique names from the wall clock.
class Context
  # Number of bits the millisecond timestamp is shifted left by in `tso`.
  # NOTE(review): presumably mirrors the physical/logical split of a PD TSO —
  # confirm against the PD documentation.
  TSO_SHIFT_BITS = 18

  # Builds a timestamp from the current wall clock: milliseconds since the Unix
  # epoch, shifted left by 18 bits (so the low 18 bits are always zero).
  #
  # @return [Integer]
  def self.tso
    # CLOCK_REALTIME with the :millisecond unit already yields an Integer, so
    # no DateTime object or string round trip is needed.
    Process.clock_gettime(Process::CLOCK_REALTIME, :millisecond) << TSO_SHIFT_BITS
  end

  # Appends the current local time (formatted as YYYYMMDDHHMMSS) to +prefix+.
  #
  # @param prefix [String] leading part of the name
  # @return [String] "<prefix>:<timestamp>"
  def self.unique_name(prefix)
    "#{prefix}:#{Time.now.strftime('%Y%m%d%H%M%S')}"
  end
end
# Minimal wrapper around a Mysql connection to one of the two clusters.
class SQL
  # Opens a connection to the requested cluster.
  #
  # @param type [Symbol] :upstream (connects to DB) or :downstream
  #   (connects to DB_DOWNSTREAM)
  # @raise [ArgumentError] when +type+ is neither of the two; previously this
  #   left the connection unset and only failed later inside #sql
  def initialize(type = :upstream)
    uri =
      case type
      when :upstream then DB
      when :downstream then DB_DOWNSTREAM
      else
        raise ArgumentError, "unknown cluster type #{type.inspect}; expected :upstream or :downstream"
      end
    @my = Mysql.connect(uri)
  end

  # Runs one statement and returns its result converted with #to_a.
  #
  # @param s [String] the SQL text to execute
  # @return [Array]
  def sql(s)
    @my.query(s).to_a
  end
end
# Thin wrappers around the `br` command-line tool. Every method shells out with
# %x and takes its endpoints from the file-level constants (PD, DOWNSTREAM_PD,
# STREAM_TARGET, STREAM_ENDPOINT, TASK_NAME).
class BR
  # Log message that `full` greps out of br's JSON log output.
  FULL_BACKUP_SUMMARY = "Full Backup success summary"

  # Runs a full backup into "#{STREAM_TARGET}#{storage_prefix}-full".
  #
  # @param storage_prefix [String] prefix of the storage path
  # @return [Object] the "BackupTS" field of the summary log line
  # @raise [JSON::ParserError] when br printed no summary line (the backup
  #   presumably failed) or the line is not valid JSON
  def self.full(storage_prefix)
    summary = %x( BR_LOG_TO_TERM=1 br backup full -u #{PD} -s '#{STREAM_TARGET}#{storage_prefix}-full' --s3.endpoint #{STREAM_ENDPOINT} --log-format json | grep "#{FULL_BACKUP_SUMMARY}" )
    # grep prints nothing when the summary is missing; fail with a message that
    # names the cause instead of JSON's generic "unexpected end of input".
    if summary.strip.empty?
      raise JSON::ParserError,
            "br backup full printed no \"#{FULL_BACKUP_SUMMARY}\" line; the backup did not report success"
    end
    JSON.parse!(summary)["BackupTS"]
  end

  # Starts the log backup task TASK_NAME, storing into
  # "#{STREAM_TARGET}#{storage_prefix}-incremental".
  #
  # @param storage_prefix [String] prefix of the storage path
  # @param start_ts [Object] value passed to --start-ts
  # @return [String] standard output of the command
  def self.start_task(storage_prefix, start_ts)
    %x( br log start \
      --task-name #{TASK_NAME} \
      -s '#{STREAM_TARGET}#{storage_prefix}-incremental' \
      --s3.endpoint #{STREAM_ENDPOINT} -u #{PD} --start-ts #{start_ts} \
      --check-requirements=false
    )
  end

  # Stops the log backup task TASK_NAME.
  #
  # @return [String] standard output of the command
  def self.stop_task()
    %x( br log stop --task-name #{TASK_NAME} -u #{PD} )
  end

  # Pauses the log backup task TASK_NAME.
  #
  # @return [String] standard output of the command
  def self.pause_task()
    %x( br log pause --task-name #{TASK_NAME} -u #{PD} )
  end

  # Resumes the log backup task TASK_NAME.
  #
  # @return [String] standard output of the command
  def self.resume_task()
    %x( br log resume --task-name #{TASK_NAME} -u #{PD} )
  end

  # Runs `br restore point` against DOWNSTREAM_PD, using the "-full" backup and
  # the "-incremental" log storage that share +storage_prefix+.
  #
  # @param storage_prefix [String] prefix of both storage paths
  # @return [String] standard output of the command
  def self.restore(storage_prefix)
    %x( br restore point --full-backup-storage '#{STREAM_TARGET}#{storage_prefix}-full' \
      --s3.endpoint #{STREAM_ENDPOINT} \
      -s '#{STREAM_TARGET}#{storage_prefix}-incremental' \
      --check-requirements=false \
      -u '#{DOWNSTREAM_PD}'
    )
  end
end
require './basic.rb'
require 'securerandom'
# A named test table plus the statements used to create, fill, rename and
# summarize it.
class TestTable
  # @param name [String] table name interpolated into every statement
  # @param conn [#sql] connection used by default; a fresh SQL.new when omitted
  def initialize(name, conn = SQL.new)
    @name = name
    @conn = conn
  end

  # Creates the table if it does not exist yet.
  #
  # Uses the connection given to the constructor, so the table is created on
  # the same cluster the other methods talk to (no extra connection is opened).
  def create_test_table()
    @conn.sql %{
      CREATE TABLE IF NOT EXISTS #{@name}(
        id bigint PRIMARY KEY AUTO_RANDOM(8),
        sub_id decimal(16, 16),
        padding varchar(2048),
        create_time DATETIME DEFAULT NOW(),
        last_update DATETIME DEFAULT NOW(),
        INDEX some_index(sub_id)
      );
    }
  end

  # Inserts +n+ rows in a single INSERT statement. Each row gets a random
  # sub_id in [0, 1) and an alphanumeric padding of random length below 2048.
  #
  # @param n [Integer] number of rows; nothing is sent when it is not positive
  # @param conn [#sql] connection to run the statement on
  # @return [Array] whatever conn.sql returns, or [] when nothing was sent
  def insert_batch(n, conn = @conn)
    # "VALUES" followed by no tuples is not valid SQL, so skip empty batches.
    return [] unless n.positive?

    rows = Array.new(n) { "(#{rand}, '#{SecureRandom.alphanumeric(rand(2048))}')" }
    conn.sql "INSERT INTO #{@name}(sub_id, padding) VALUES #{rows.join(',')}"
  end

  # Inserts from +n+ threads at once; every thread sends +times_pre_thread+
  # batches of +batch_size+ rows.
  #
  # @param n [Integer] number of worker threads
  # @param batch_size [Integer] rows per INSERT statement
  # @param times_pre_thread [Integer] statements sent by each thread
  # @return [Array<Thread>] the (already joined) worker threads
  def concurrency_insert(n, batch_size, times_pre_thread)
    workers = (0...n).map do
      Thread.new do
        # One connection per worker, reused for all its batches, instead of
        # opening (and never closing) a new connection for every statement.
        worker_conn = SQL.new
        times_pre_thread.times { insert_batch(batch_size, worker_conn) }
      end
    end
    workers.each(&:join)
  end

  # Renames the table and remembers the new name for later statements.
  #
  # @param new_name [String]
  def rename(new_name)
    @conn.sql "RENAME TABLE #{@name} to #{new_name}"
    @name = new_name
  end

  # Cheap content fingerprint: the first result row of
  # `SELECT count(*), SUM(sub_id)` over the table.
  #
  # @param conn [#sql] connection to query
  # @return [Object] first row returned by the query
  def get_somewhat_hash(conn = @conn)
    rows = conn.sql "SELECT count(*), SUM(sub_id) from #{@name}"
    rows[0]
  end
end
# Creates the table +name+, loads 10000 rows into it over a fresh connection
# and, unless +with_tiflash+ is falsy, asks for one TiFlash replica.
#
# @param name [String] table name
# @param with_tiflash [Boolean] whether to issue "set tiflash replica 1"
# @return [TestTable] handle for the created table
def create_table_and_insert(name, with_tiflash = true)
  TestTable.new(name).tap do |table|
    table.create_test_table
    connection = SQL.new
    table.insert_batch(10_000, connection)
    connection.sql("alter table #{name} set tiflash replica 1") if with_tiflash
  end
end
# Counts the rows of information_schema.cluster_info whose type is 'tiflash'.
#
# @param conn [#sql] connection to query
# @return [Integer] first column of the first result row, converted with to_i
def tiflash_count(conn)
  rows = conn.sql("select count(*) from information_schema.cluster_info where type = 'tiflash'")
  first_row = rows.first
  first_row.first.to_i
end
# Combined fingerprint of test.table1 and test.table2: the sum of their row
# counts and the sum of their SUM(sub_id) values.
#
# NOTE(review): the default argument reads @conn from whatever object the
# method is called on; nothing visible in this file assigns it at the top
# level, so callers presumably always pass +conn+ — confirm.
#
# @param conn [#sql] connection to query
# @return [Object] first row of the query result
def get_somewhat_hash(conn=@conn)
  query = <<~QUERY
    WITH
    A AS (select count(*) cnt, SUM(sub_id) hash from test.table1),
    B AS (select count(*) cnt , SUM(sub_id) hash from test.table2)
    SELECT A.cnt + B.cnt, A.hash + B.hash from A, B;
  QUERY
  rows = conn.sql(query)
  rows[0]
end
# End-to-end scenario: write to tables (some with TiFlash replicas) while a log
# backup task is running, restore into the downstream cluster and compare the
# per-table fingerprints of both sides.
class TestTiFlash
  # Seconds `wait_checkpoint` sleeps when no explicit duration is given.
  DEFAULT_CHECKPOINT_WAIT = 600

  def initialize
    @test_name = Context.unique_name "tiflash"
  end

  # Prepares the upstream data set.
  #
  # Drops table1..table4, stops any running log task, creates table1, takes a
  # full backup, starts the log task from the backup's timestamp, then creates
  # table2/table3, renames table2 to table4, toggles TiFlash replicas and
  # finally inserts into all three tables concurrently.
  #
  # @return [Array<TestTable>] the three tables that were written to
  def case_insert
    conn = SQL.new
    %w[table1 table2 table3 table4].each do |t|
      conn.sql "drop table if exists #{t}"
    end
    BR.stop_task
    puts "using test name #{@test_name}"
    puts "upstream number tiflash node: #{tiflash_count conn}"
    t1 = create_table_and_insert("table1")
    backup_ts = BR.full(@test_name)
    puts "useing backup ts #{backup_ts}"
    BR.start_task(@test_name, backup_ts)
    t2 = create_table_and_insert("table2")
    t3 = create_table_and_insert("table3", false)
    t2.rename "table4"
    conn.sql "ALTER TABLE table3 SET TIFLASH REPLICA 1"
    conn.sql "ALTER TABLE table1 SET TIFLASH REPLICA 0"
    tables = [t1, t2, t3]
    # One driver thread per table; each table fans out into its own workers.
    tables.map { |table| Thread.new { table.concurrency_insert 10, 10, 3000 } }.each(&:join)
    tables
  end

  # Sleeps before the restore is attempted.
  # NOTE(review): presumably this gives the log task time to advance its
  # checkpoint past the inserts — nothing here actually verifies that.
  #
  # @param seconds [Numeric] how long to sleep; defaults to 600
  def wait_checkpoint(seconds = DEFAULT_CHECKPOINT_WAIT)
    sleep seconds
  end

  # Records the upstream fingerprints, drops the downstream `test` database,
  # restores into downstream and prints both fingerprint lists side by side.
  #
  # @param ts [Array<#get_somewhat_hash>] tables to compare
  def restore_and_check(ts)
    uh = ts.map(&:get_somewhat_hash)
    puts "Insert Done; #{uh}"
    dc = SQL.new :downstream
    dc.sql "drop database test;"
    BR.restore @test_name
    # A new downstream connection is opened once the restore has finished.
    dc = SQL.new :downstream
    dh = ts.map { |t| t.get_somewhat_hash dc }
    puts "downstream tiflash count #{tiflash_count dc}"
    puts "#{uh} vs #{dh}; equals? = #{uh == dh}"
  end

  # Runs the whole scenario: insert, wait, restore and compare.
  #
  # @param checkpoint_wait [Numeric] seconds to wait between the inserts and
  #   the restore; defaults to 600
  def run_case(checkpoint_wait = DEFAULT_CHECKPOINT_WAIT)
    ts = case_insert
    wait_checkpoint checkpoint_wait
    restore_and_check ts
  end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment