Skip to content

Instantly share code, notes, and snippets.

@aravindavk
Created September 10, 2015 18:56
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save aravindavk/193eda60b6049ad025f4 to your computer and use it in GitHub Desktop.
Save aravindavk/193eda60b6049ad025f4 to your computer and use it in GitHub Desktop.
Script to Simulate Race Conditions, http://aravindavk.in/blog/simulating-race-conditions
#!/usr/bin/python
import random
import unittest
# Simulated on-disk state shared by every operation below: None while no
# file exists, otherwise the (gfid, path) tuple of the one tracked file.
on_disk = None
class Error(Exception):
    """Raised when a simulated operation conflicts with the on-disk state."""
def create(gfid, path):
    """Simulate CREATE (also used for MKNOD) of ``path`` with id ``gfid``.

    With nothing on disk the entry ``(gfid, path)`` is recorded.  If that
    exact entry is already on disk the call is a no-op, so replaying the
    same creation twice is harmless.

    Raises:
        Error: a different ``(gfid, path)`` entry is already on disk.
    """
    global on_disk
    target = (gfid, path)
    if on_disk is None:
        on_disk = target
        return
    if on_disk == target:
        # Same entry already present: nothing to do.
        return
    raise Error("CREATE/MKNOD Failed: on_disk {0}, target: {1}".format(
        on_disk, target))
def mknod(gfid, path):
    """Simulate MKNOD by delegating to ``create`` with the same arguments."""
    create(gfid=gfid, path=path)
def rename(gfid, src, dest):
    """Simulate RENAME of ``src`` to ``dest`` for the file with id ``gfid``.

    The on-disk entry becomes ``(gfid, dest)`` when the source entry is on
    disk or when nothing is on disk at all.  If the destination entry is
    already on disk the call is a no-op.

    Raises:
        Error: the on-disk entry is neither the source nor the destination.
    """
    global on_disk
    renamed = (gfid, dest)
    if on_disk is None or on_disk == (gfid, src):
        on_disk = renamed
        return
    if on_disk == renamed:
        # Rename already applied: nothing to do.
        return
    raise Error("RENAME Failed: on_disk {0}, target {1}".format(
        on_disk, (gfid, src, dest)))
def unlink(gfid, path):
    """Simulate UNLINK of ``path`` for the file with id ``gfid``.

    Removing an entry that is already gone is a no-op; removing the entry
    that is on disk resets the state to ``None``.

    Raises:
        Error: a different ``(gfid, path)`` entry is on disk.
    """
    global on_disk
    if on_disk is None:
        # Already removed: nothing to do.
        return
    if on_disk == (gfid, path):
        on_disk = None
        return
    raise Error("UNLINK failed: on_disk {0}, target {1}".format(
        on_disk, (gfid, path)))
def data(gfid):
    """Simulate a DATA operation on the file with id ``gfid``.

    Succeeds silently only when an entry with that gfid is on disk; the
    on-disk state itself is never modified.

    Raises:
        Error: nothing is on disk, or the on-disk entry has another gfid.
    """
    if on_disk is None or on_disk[0] != gfid:
        raise Error("DATA failed: on_disk {0}, target {1}".format(
            on_disk, (gfid, )))
def execute(row):
    """Parse one whitespace-separated ``row`` and run its operation.

    Field 0 is skipped (the callers in this file put a batch label there),
    field 1 is the operation name and the following fields are that
    operation's arguments.  Rows with an unrecognised operation name are
    ignored; rows with too few fields raise ``IndexError``.
    """
    fields = row.split()
    # Operation name -> (handler, number of arguments taken from the row).
    handlers = {
        "CREATE": (create, 2),
        "MKNOD": (create, 2),
        "RENAME": (rename, 3),
        "UNLINK": (unlink, 2),
        "DATA": (data, 1),
    }
    entry = handlers.get(fields[1])
    if entry is None:
        return
    handler, argc = entry
    # Index field by field so a short row still fails with IndexError.
    handler(*[fields[2 + k] for k in range(argc)])
def mixit(label1, label2, b1, b2):
    """Return one random interleaving of two labelled batches.

    Each entry of ``b1`` is prefixed with ``label1`` and each entry of
    ``b2`` with ``label2``.  The relative order inside each batch is
    preserved; only the way the two batches are woven together is random.
    The first ``b2`` entry is never placed ahead of the first ``b1`` entry
    (insert positions start at index 1), so a caller that wants the
    opposite orderings as well must also call this with the batches
    swapped.

    Args:
        label1: prefix for the entries of ``b1``.
        label2: prefix for the entries of ``b2``.
        b1: first batch; its entries form the backbone of the result.
        b2: second batch; its entries are inserted at random positions.

    Returns:
        A new list of ``"<label> <entry>"`` strings; inputs are not mutated.
    """
    out = ["{0} {1}".format(label1, x) for x in b1]
    if not out:
        # Nothing to weave into: the only valid result is b2 in order.
        # (Previously this case raised IndexError from an empty range.)
        return ["{0} {1}".format(label2, x) for x in b2]

    position = 0
    for item in b2:
        # Always insert strictly after the previous insertion point so the
        # entries of b2 keep their original order.
        position = random.randrange(position + 1, len(out) + 1)
        out.insert(position, "{0} {1}".format(label2, item))
    return out
class RebalanceGeorepTests(unittest.TestCase):
    """Container for the generated interleaving tests.

    Deliberately empty: the ``__main__`` section of this file attaches one
    ``test_*`` method per generated interleaving with ``setattr`` before
    the suite is loaded.
    """
def test_generator(out, on_disk_expect):
    """Build a test method that replays ``out`` and checks the end state.

    Args:
        out: sequence of rows, each accepted by ``execute``.
        on_disk_expect: value ``on_disk`` must have after the last row.

    Returns:
        A function taking ``self`` (a ``unittest.TestCase`` instance),
        suitable for attaching to a test class with ``setattr``.
    """
    def test(self):
        global on_disk
        # Every generated test starts from an empty disk.
        on_disk = None
        for step in out:
            try:
                execute(step)
            except Error as err:
                # A rejected operation fails the test immediately and
                # reports the whole interleaving that triggered it.
                self.assertTrue(False, "{0} Failed: {1}".format(out, err))

        # NOTE(review): the final-state check is placed after the loop;
        # the source's nesting was lost, confirm against the original.
        mismatch_msg = "{0} Failed: {1} != {2}".format(
            out, on_disk_expect, on_disk)
        self.assertEqual(on_disk_expect, on_disk, mismatch_msg)

    return test
if __name__ == "__main__":
    import sys

    test_grp_prefix = "g1"

    # Two batches acting on the same gfid: B1 creates the file and writes
    # data, B2 recreates it via MKNOD, writes data and then unlinks it.
    b1 = ["CREATE 0945daec-6f8c-438e-9bbf-b2ebf07543ef f1",
          "DATA 0945daec-6f8c-438e-9bbf-b2ebf07543ef"]
    b2 = ["MKNOD 0945daec-6f8c-438e-9bbf-b2ebf07543ef f1",
          "DATA 0945daec-6f8c-438e-9bbf-b2ebf07543ef",
          "UNLINK 0945daec-6f8c-438e-9bbf-b2ebf07543ef f1"]

    # Sample interleavings at random; the set drops duplicates.  mixit never
    # places its second batch first, so both argument orders are sampled.
    tests = set()
    for _ in range(1000):
        tests.add(tuple(mixit("B1", "B2", b1, b2)))
        tests.add(tuple(mixit("B2", "B1", b2, b1)))

    # One generated test method per distinct interleaving; every one of
    # them expects the disk to be empty (None) at the end.
    for i, t in enumerate(tests):
        func = test_generator(t, None)
        setattr(RebalanceGeorepTests, "test_{0}_{1}".format(
            test_grp_prefix, i), func)

    runner = unittest.TextTestRunner(verbosity=2)
    itersuite = unittest.TestLoader().loadTestsFromTestCase(
        RebalanceGeorepTests)
    result = runner.run(itersuite)
    # Report failures through the exit status so callers (shell, CI) can
    # detect them; previously the process always exited with 0.
    sys.exit(0 if result.wasSuccessful() else 1)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment