Created
September 10, 2015 18:56
-
-
Save aravindavk/193eda60b6049ad025f4 to your computer and use it in GitHub Desktop.
Script to Simulate Race Conditions, http://aravindavk.in/blog/simulating-race-conditions
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python | |
import random | |
import unittest | |
on_disk = None | |
class Error(Exception): | |
pass | |
def create(gfid, path): | |
global on_disk | |
if on_disk is None: | |
on_disk = (gfid, path) | |
elif on_disk != (gfid, path): | |
raise Error("CREATE/MKNOD Failed: on_disk {0}, target: {1}".format( | |
on_disk, (gfid, path)) | |
) | |
def mknod(gfid, path): | |
create(gfid, path) | |
def rename(gfid, src, dest): | |
global on_disk | |
if on_disk == (gfid, src) or on_disk is None: | |
on_disk = (gfid, dest) | |
elif on_disk == (gfid, dest): | |
pass | |
else: | |
raise Error("RENAME Failed: on_disk {0}, target {1}".format( | |
on_disk, (gfid, src, dest) | |
)) | |
def unlink(gfid, path): | |
global on_disk | |
if on_disk == (gfid, path): | |
on_disk = None | |
elif on_disk is None: | |
pass | |
else: | |
raise Error("UNLINK failed: on_disk {0}, target {1}".format( | |
on_disk, (gfid, path) | |
)) | |
def data(gfid): | |
global on_disk | |
if on_disk is not None and on_disk[0] == gfid: | |
pass | |
else: | |
raise Error("DATA failed: on_disk {0}, target {1}".format( | |
on_disk, (gfid, ) | |
)) | |
def execute(row): | |
elements = row.split() | |
if elements[1] in ["CREATE", "MKNOD"]: | |
create(elements[2], elements[3]) | |
elif elements[1] == "RENAME": | |
rename(elements[2], elements[3], elements[4]) | |
elif elements[1] == "UNLINK": | |
unlink(elements[2], elements[3]) | |
elif elements[1] == "DATA": | |
data(elements[2]) | |
def mixit(label1, label2, b1, b2): | |
prev_rand = 0 | |
out = ["{0} {1}".format(label1, x) for x in b1] | |
for i in b2: | |
prev_rand = random.choice(range(prev_rand+1, len(out)+1)) | |
out.insert(prev_rand, "{0} {1}".format(label2, i)) | |
return out | |
class RebalanceGeorepTests(unittest.TestCase): | |
pass | |
def test_generator(out, on_disk_expect): | |
def test(self): | |
global on_disk | |
on_disk = None | |
for r in out: | |
try: | |
execute(r) | |
except Error as e: | |
self.assertTrue(False, "{0} Failed: {1}".format( | |
out, e | |
)) | |
self.assertEqual(on_disk_expect, on_disk, | |
"{0} Failed: {1} != {2}".format( | |
out, on_disk_expect, on_disk | |
)) | |
return test | |
if __name__ == "__main__": | |
test_grp_prefix = "g1" | |
b1 = ["CREATE 0945daec-6f8c-438e-9bbf-b2ebf07543ef f1", | |
"DATA 0945daec-6f8c-438e-9bbf-b2ebf07543ef"] | |
b2 = ["MKNOD 0945daec-6f8c-438e-9bbf-b2ebf07543ef f1", | |
"DATA 0945daec-6f8c-438e-9bbf-b2ebf07543ef", | |
"UNLINK 0945daec-6f8c-438e-9bbf-b2ebf07543ef f1"] | |
tests = set() | |
for i in range(0, 1000): | |
tests.add(tuple(mixit("B1", "B2", b1, b2))) | |
tests.add(tuple(mixit("B2", "B1", b2, b1))) | |
for i, t in enumerate(tests): | |
func = test_generator(t, None) | |
setattr(RebalanceGeorepTests, "test_{0}_{1}".format( | |
test_grp_prefix, i), func) | |
runner = unittest.TextTestRunner(verbosity=2) | |
itersuite = unittest.TestLoader().loadTestsFromTestCase( | |
RebalanceGeorepTests) | |
runner.run(itersuite) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment