Scripts for testing the PiTR feature of TiKV with a tiny data set (~1G).
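At a glance, the flow the scripts exercise is: take a full backup of the upstream cluster, start a log-backup task from that backup's timestamp, run a write and DDL workload while the task records changes, wait for the log checkpoint to advance, then do a point-in-time restore into a downstream cluster and compare cheap per-table checksums. The skeleton of that sequence, using the BR wrapper defined in basic.rb below ("demo" is only a placeholder prefix; the real driver is TestTiFlash#run_case in the second file):

  backup_ts = BR.full("demo")        # full backup; returns its BackupTS
  BR.start_task("demo", backup_ts)   # start the log-backup task from that TS
  # ... run the write/DDL workload here ...
  sleep 600                          # let the log-backup checkpoint advance
  BR.restore("demo")                 # point-in-time restore into the downstream cluster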
basic.rb: shared helpers (connection settings, a TSO helper, and thin wrappers around the br CLI).
require 'mysql'
require 'json'
require 'date'

# Upstream (source) and downstream (restore target) TiDB endpoints.
DB = 'mysql://root:@control:4000/test'
DB_DOWNSTREAM = 'mysql://root:@tikvs-downstream:4000/test'
# S3-compatible storage that holds both the full backup and the log backup.
STREAM_ENDPOINT = "http://minio.pingcap.net:9000"
STREAM_TARGET = "s3://pitr/"
TASK_NAME = "fiolvit"
# PD endpoints of the two clusters.
PD = "http://tikvs:2379"
DOWNSTREAM_PD = "http://tikvs-downstream:2379"

class Context
  # Build a TiDB-style TSO from the current wall-clock time: milliseconds since
  # epoch in the physical part, with 18 bits left for the logical counter.
  def self.tso
    DateTime.now.strftime("%Q").to_i << 18
  end

  # A name with a timestamp suffix, so repeated runs do not collide.
  def self.unique_name prefix
    suffix = Time.new.strftime "%Y%m%d%H%M%S"
    "#{prefix}:#{suffix}"
  end
end

class SQL
  def initialize(type = :upstream)
    if type == :downstream then
      @my = Mysql.connect(DB_DOWNSTREAM)
    elsif type == :upstream then
      @my = Mysql.connect(DB)
    end
  end

  def sql(s)
    @my.query(s).to_a
  end
end

# Thin wrappers around the br CLI.
class BR
  # Take a full backup and return its BackupTS, parsed from the JSON summary line.
  def self.full(storage_prefix)
    s = %x( BR_LOG_TO_TERM=1 br backup full -u #{PD} -s '#{STREAM_TARGET}#{storage_prefix}-full' --s3.endpoint #{STREAM_ENDPOINT} --log-format json | grep "Full Backup success summary" )
    j = JSON.parse!(s)
    j["BackupTS"]
  end

  # Start the log-backup (PiTR) task from start_ts.
  def self.start_task(storage_prefix, start_ts)
    %x( br log start \
        --task-name #{TASK_NAME} \
        -s '#{STREAM_TARGET}#{storage_prefix}-incremental' \
        --s3.endpoint #{STREAM_ENDPOINT} -u #{PD} --start-ts #{start_ts} \
        --check-requirements=false
    )
  end

  def self.stop_task()
    %x( br log stop --task-name #{TASK_NAME} -u #{PD} )
  end

  def self.pause_task()
    %x( br log pause --task-name #{TASK_NAME} -u #{PD} )
  end

  def self.resume_task()
    %x( br log resume --task-name #{TASK_NAME} -u #{PD} )
  end

  # Point-in-time restore into the downstream cluster: the full backup plus the log backup.
  def self.restore(storage_prefix)
    %x( br restore point --full-backup-storage '#{STREAM_TARGET}#{storage_prefix}-full' \
        --s3.endpoint #{STREAM_ENDPOINT} \
        -s '#{STREAM_TARGET}#{storage_prefix}-incremental' \
        --check-requirements=false \
        -u '#{DOWNSTREAM_PD}'
    )
  end
end
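For reference when reading the BackupTS values these wrappers print: Context.tso packs milliseconds-since-epoch into the upper bits of the timestamp, so the reverse mapping is a right shift by 18 bits. A small helper along those lines (not part of the gist, just a convenience sketch):

  # Hypothetical helper: turn a TSO back into wall-clock time.
  def tso_to_time(ts)
    Time.at((ts >> 18) / 1000.0)
  end
  # e.g. tso_to_time(Context.tso)  #=> roughly Time.now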
The test driver (its filename is not given in the gist; it requires ./basic.rb): builds the test tables, runs concurrent inserts and DDL while the log-backup task is active, then restores into the downstream cluster and compares.
require './basic.rb'
require 'securerandom'

class TestTable
  def initialize(name, conn=SQL.new)
    @name = name
    @conn = conn
  end
  def create_test_table()
    @conn.sql %{
      CREATE TABLE IF NOT EXISTS #{@name}(
        id bigint PRIMARY KEY AUTO_RANDOM(8),
        sub_id decimal(16, 16),
        padding varchar(2048),
        create_time DATETIME DEFAULT NOW(),
        last_update DATETIME DEFAULT NOW(),
        INDEX some_index(sub_id)
      );
    }
  end
  # Insert n rows with a random sub_id and random-length alphanumeric padding.
  def insert_batch(n, conn=@conn)
    batch = (0...n).map do |_| "(#{rand}, '#{SecureRandom.alphanumeric(rand(2048))}')" end
    conn.sql "INSERT INTO #{@name}(sub_id, padding) VALUES #{batch.join(',')}"
  end

  # n threads, each running times_per_thread batches of batch_size rows,
  # each thread on its own connection.
  def concurrency_insert(n, batch_size, times_per_thread)
    ths = []
    for i in 0...n do
      ths << Thread.start do
        times_per_thread.times do
          insert_batch batch_size, SQL.new
        end
      end
    end
    ths.each(&:join)
  end
  def rename(new_name)
    @conn.sql "RENAME TABLE #{@name} to #{new_name}"
    @name = new_name
  end

  # A cheap "checksum" of the table: row count plus the sum of sub_id.
  def get_somewhat_hash(conn = @conn)
    r = conn.sql "SELECT count(*), SUM(sub_id) from #{@name}"
    r[0]
  end
end

def create_table_and_insert(name, with_tiflash = true)
  t1 = TestTable.new(name)
  t1.create_test_table
  conn = SQL.new
  t1.insert_batch(10000, conn)
  if with_tiflash then
    conn.sql "alter table #{name} set tiflash replica 1"
  end
  t1
end

# Number of TiFlash nodes registered in the cluster.
def tiflash_count(conn)
  r = conn.sql "select count(*) from information_schema.cluster_info where type = 'tiflash'"
  r[0][0].to_i
end

# Top-level variant: a combined "checksum" over test.table1 and test.table2.
# (@conn does not exist at the top level, so default to a fresh upstream connection.)
def get_somewhat_hash(conn = SQL.new)
  r = conn.sql %{WITH
    A AS (select count(*) cnt, SUM(sub_id) hash from test.table1),
    B AS (select count(*) cnt , SUM(sub_id) hash from test.table2)
    SELECT A.cnt + B.cnt, A.hash + B.hash from A, B;
  }
  r[0]
end

class TestTiFlash
  def initialize
    @test_name = Context.unique_name "tiflash"
  end

  # Build the workload: a full backup, then tables created/renamed and TiFlash
  # replicas toggled while the log-backup task is running, plus concurrent inserts.
  def case_insert
    conn = SQL.new
    ["table1", "table2", "table3", "table4"].each do |t|
      conn.sql "drop table if exists #{t}"
    end
    BR.stop_task
    puts "using test name #{@test_name}"
    puts "upstream number of tiflash nodes: #{tiflash_count conn}"
    t1 = create_table_and_insert("table1")
    backup_ts = BR.full(@test_name)
    puts "using backup ts #{backup_ts}"
    BR.start_task(@test_name, backup_ts)
    t2 = create_table_and_insert("table2")
    t3 = create_table_and_insert("table3", false)
    t2.rename "table4"
    conn.sql "ALTER TABLE table3 SET TIFLASH REPLICA 1"
    conn.sql "ALTER TABLE table1 SET TIFLASH REPLICA 0"
    # 10 threads x 3000 batches x 10 rows per table (~300k rows each); with up to
    # 2 KB of padding per row the three tables add up to roughly the ~1G data set.
    [Thread.start { t1.concurrency_insert 10, 10, 3000 },
     Thread.start { t2.concurrency_insert 10, 10, 3000 },
     Thread.start { t3.concurrency_insert 10, 10, 3000 }].each(&:join)
    [t1, t2, t3]
  end
  # Give the log-backup task time to advance its checkpoint past the writes above.
  def wait_checkpoint()
    sleep 600
  end

  def restore_and_check(ts)
    uh = ts.map do |t|
      t.get_somewhat_hash
    end
    puts "Insert Done; #{uh}"
    dc = SQL.new :downstream
    dc.sql "drop database test;"
    BR.restore @test_name
    dc = SQL.new :downstream
    dh = ts.map do |t|
      t.get_somewhat_hash dc
    end
    puts "downstream tiflash count #{tiflash_count dc}"
    puts "#{uh} vs #{dh}; equals? = #{uh == dh}"
  end

  def run_case
    ts = case_insert
    wait_checkpoint
    restore_and_check ts
  end
end
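Neither file actually kicks the test off; a minimal driver, assuming the second file is saved as test_tiflash.rb (the gist does not give its filename), would be:

  # Hypothetical driver; the filename test_tiflash.rb is an assumption.
  require './test_tiflash.rb'

  TestTiFlash.new.run_case

run_case performs the full round trip: the workload with full and log backups, a ten-minute wait for the checkpoint, then the restore-and-compare step against the downstream cluster.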