Navigation Menu

Skip to content

Instantly share code, notes, and snippets.

@K-Wu
Last active September 21, 2022 22:22
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save K-Wu/15c85becfb5c17f6b1271030e0ded16d to your computer and use it in GitHub Desktop.
Save K-Wu/15c85becfb5c17f6b1271030e0ded16d to your computer and use it in GitHub Desktop.
Test script that verifies the read/write I/O correctness of a multiple-block FS implementation
import numpy as np
import os
import math
def do_unmount_mount(mount_dev, mount_point):
    """Remount the file system under test: unmount it, then mount it again.

    Args:
        mount_dev: Device path handed to ``mount`` (e.g. ``/dev/nvme0n1``).
        mount_point: Directory the device is mounted on.

    Both commands are run through the shell with ``sudo``. Their exit
    statuses are not checked, so a failing command does not raise here.
    """
    # Unmount first and mount last, as the function name promises. Running
    # the commands in the opposite order would leave the file system
    # unmounted once this function returns.
    os.system("sudo umount {mount_point}".format(mount_point=mount_point))
    os.system("sudo mount {mount_dev} {mount_point}".format(mount_dev=mount_dev, mount_point=mount_point))
def _char_offset(pos, block_char_num):
    """Convert a position given in (fractional) blocks to a character offset."""
    return int(math.floor(pos * block_char_num))


def _write_span(path, mode, content, start, stop, is_seek):
    """Write ``content[start:stop]`` to *path*, seeking to *start* first if *is_seek*."""
    with open(path, mode) as fd:
        if is_seek:
            fd.seek(start)
        fd.write("".join(content[start:stop]))


def _verify_span(path, content, start, stop, is_seek):
    """Read *path* back and report whether it matches the reference content."""
    with open(path, "r") as fd:
        if is_seek:
            # Equality test only the part written in this iteration.
            fd.seek(start)
            expected = "".join(content[start:stop])
            # Read to EOF and trim, so the read pattern issued against the
            # file system stays the same as before.
            actual = fd.read()[0:stop - start]
        else:
            # Equality test the whole file content written so far.
            expected = "".join(content[0:stop])
            actual = fd.read()
    return expected == actual


def do_write_read_test(file_name, file_name2, pos_list_beg, pos_list_end, pos_list2_beg, pos_list2_end, block_char_num, file_content, file_content2, mount_dev, mount_point, remount_prob, is_seek):
    """Write spans of reference data to two files and verify them by reading back.

    For every index ``i`` the span ``[pos_list_beg[i], pos_list_end[i])`` of
    ``file_content`` is written to ``file_name`` and the matching span of
    ``file_content2`` to ``file_name2``; both files are then re-opened and
    compared against the reference data. Each comparison result is printed.

    Args:
        file_name, file_name2: Paths of the two files to write and verify.
        pos_list_beg, pos_list_end: Start/end of each span for file 1, in
            units of blocks (fractions allowed).
        pos_list2_beg, pos_list2_end: Same, for file 2.
        block_char_num: Number of characters per block; positions are
            multiplied by it and floored to get character offsets.
        file_content, file_content2: Reference data as sequences of
            single-character strings.
        mount_dev, mount_point: Forwarded to ``do_unmount_mount``.
        remount_prob: After each of the four write/verify steps a remount is
            performed when ``np.random.uniform() < remount_prob``.
        is_seek: If true, files are opened with mode ``"w"`` and the code
            seeks to the span start before writing, and only that span is
            verified. Otherwise files are opened with mode ``"a"`` and the
            whole file is compared against ``content[0:end]``.

    Returns:
        A list with one ``(file1_ok, file2_ok)`` tuple of booleans per
        iteration, so callers can detect failures without parsing stdout.
    """
    fd_write_attribute = "w" if is_seek else "a"
    results = []
    for w_idx, pos_beg in enumerate(pos_list_beg):
        start = _char_offset(pos_beg, block_char_num)
        stop = _char_offset(pos_list_end[w_idx], block_char_num)
        start2 = _char_offset(pos_list2_beg[w_idx], block_char_num)
        stop2 = _char_offset(pos_list2_end[w_idx], block_char_num)

        _write_span(file_name, fd_write_attribute, file_content, start, stop, is_seek)
        if np.random.uniform() < remount_prob:
            do_unmount_mount(mount_dev, mount_point)

        _write_span(file_name2, fd_write_attribute, file_content2, start2, stop2, is_seek)
        if np.random.uniform() < remount_prob:
            do_unmount_mount(mount_dev, mount_point)

        file1_ok = _verify_span(file_name, file_content, start, stop, is_seek)
        print("test idx: " + str(w_idx) + " file 1 result: " + str(file1_ok))
        if np.random.uniform() < remount_prob:
            do_unmount_mount(mount_dev, mount_point)

        file2_ok = _verify_span(file_name2, file_content2, start2, stop2, is_seek)
        print("test idx: " + str(w_idx) + " file 2 result: " + str(file2_ok))
        if np.random.uniform() < remount_prob:
            do_unmount_mount(mount_dev, mount_point)

        results.append((file1_ok, file2_ok))
    return results
def seek_write_test():
    """Run the seek-then-write variant of the write/read-back check.

    Picks two unused log file names under ``./aufsMountPoint``, generates two
    random reference buffers of printable characters (codes 33..126), and
    calls ``do_write_read_test`` with ``is_seek=True`` and a remount
    probability of 0.
    """
    block_size = 4  # in kb
    block_num = 7
    block_char_num = block_size * 1024
    total_char_num = block_char_num * block_num

    # Span boundaries in units of blocks; both files use the same spans.
    pos_list_beg = [1.2, 4.1, 2.4, 4.7]
    pos_list_end = [2.4, 4.7, 4.1, 7.0]
    pos_list2_beg = [1.2, 4.1, 2.4, 4.7]
    pos_list2_end = [2.4, 4.7, 4.1, 7.0]

    # Draw random suffixes until neither log file exists yet.
    while True:
        run_id = int(np.random.randint(10000000))
        file_name = "./aufsMountPoint/test_multiple_block_seek_write_{}.log".format(run_id)
        file_name2 = "./aufsMountPoint/test_multiple_block_seek_write2_{}.log".format(run_id)
        if os.path.exists(file_name) or os.path.exists(file_name2):
            print("conflict! retry this run random int\n")
            continue
        print(run_id)
        break

    # Devoid of special characters to avoid corner cases.
    file_content = [
        chr(code)
        for code in np.random.randint(33, 127, size=total_char_num).tolist()
    ]
    file_content2 = [
        chr(code)
        for code in np.random.randint(33, 127, size=total_char_num).tolist()
    ]

    assert len(pos_list_beg) == len(pos_list2_beg)
    assert len(pos_list_end) == len(pos_list2_end)
    assert len(pos_list_beg) == len(pos_list_end)

    do_write_read_test(
        file_name,
        file_name2,
        pos_list_beg,
        pos_list_end,
        pos_list2_beg,
        pos_list2_end,
        block_char_num,
        file_content,
        file_content2,
        "/dev/nvme0n1",
        " /home/kwu/aufsMountPoint",
        0,
        True,
    )
def sequential_write_test():
    """Run the sequential (append) variant of the write/read-back check.

    Picks two unused log file names under ``./aufsMountPoint``, generates two
    random reference buffers of printable characters (codes 33..126), turns
    each list of boundaries into consecutive (begin, end) spans, and calls
    ``do_write_read_test`` with ``is_seek=False`` and a remount probability
    of 0.
    """
    block_size = 4  # in kb
    block_num = 7
    block_char_num = block_size * 1024
    total_char_num = block_char_num * block_num

    # Boundaries in units of blocks; adjacent entries delimit one write.
    pos_list = [0, 1.5, 3.3, 4.0, 4.7, 5.6, 7]
    pos_list2 = [0, 0.8, 1.0, 3.1, 4.5, 5.6, 7]

    # Draw random suffixes until neither log file exists yet.
    while True:
        run_id = int(np.random.randint(10000000))
        file_name = "./aufsMountPoint/test_multiple_block_write_{}.log".format(run_id)
        file_name2 = "./aufsMountPoint/test_multiple_block_write2_{}.log".format(run_id)
        if os.path.exists(file_name) or os.path.exists(file_name2):
            print("conflict! retry this run random int\n")
            continue
        print(run_id)
        break

    # Devoid of special characters to avoid corner cases.
    file_content = [
        chr(code)
        for code in np.random.randint(33, 127, size=total_char_num).tolist()
    ]
    file_content2 = [
        chr(code)
        for code in np.random.randint(33, 127, size=total_char_num).tolist()
    ]

    assert len(pos_list) == len(pos_list2)

    # Each boundary is the end of one span and the start of the next.
    pos_list_beg, pos_list_end = pos_list[:-1], pos_list[1:]
    pos_list2_beg, pos_list2_end = pos_list2[:-1], pos_list2[1:]

    do_write_read_test(
        file_name,
        file_name2,
        pos_list_beg,
        pos_list_end,
        pos_list2_beg,
        pos_list2_end,
        block_char_num,
        file_content,
        file_content2,
        "/dev/nvme0n1",
        " /home/kwu/aufsMountPoint",
        0,
        False,
    )
if __name__ == "__main__":
    # Script entry point: run the append-mode test first, then the seek test.
    for run_test in (sequential_write_test, seek_write_test):
        run_test()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment