Skip to content

Instantly share code, notes, and snippets.

@kkent030315
Created July 4, 2021 20:36
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save kkent030315/b695a0ab6aa9244821a2855f19726ee5 to your computer and use it in GitHub Desktop.
Save kkent030315/b695a0ab6aa9244821a2855f19726ee5 to your computer and use it in GitHub Desktop.
race_condition.py
#
# MIT copyright 2021 Kento Oki <hrn832@protonmail.com>
#
import random
import sqlite3
import threading
import time
import uuid
from collections import Counter
from typing import Iterable, Optional, Sequence
# Log of the `item_uid` of every enchantment that was performed.
# One entry is appended each time an enchant call reports success,
# so an `item_uid` appearing twice means the item was enchanted twice.
log_list=[]
# Number of items that are created and then enchanted
# simultaneously by the worker threads.
NUMBER_OF_TOTAL_ITEMS=500
# Probability, in percent, that an artificial delay is injected
# while the "server" is processing an enchantment.
DELAY_CHANCE_PERCENT=10
# Length of the injected delay, in milliseconds.
DELAY_MILLISECOND=5
def init() -> None:
    """Reset the database: drop the `items` table if present and recreate it."""
    database = Database()
    schema_statements = (
        "DROP TABLE IF EXISTS items",
        "CREATE TABLE items (item_uid TEXT UNIQUE, item_name TEXT)",
    )
    # Order matters: the old table must be gone before it is recreated.
    for statement in schema_statements:
        database.sql_exec(statement)
class Database:
    """Thin wrapper around one SQLite connection to the `v4rd.db` file.

    Each instance owns its own connection and a single cursor that is
    reused for every statement.
    """

    def __init__(self):
        # sqlite3 normally refuses to use a connection from a thread other
        # than the one that created it, which guards against race
        # conditions (a.k.a. data races). That check is forcibly disabled
        # here because this proof-of-concept needs several threads to
        # share the same connection.
        self.connection = sqlite3.connect('v4rd.db', check_same_thread=False)
        self.cursor = self.connection.cursor()

    def sql_exec(self, context: str,
                 params: Sequence[object] = ()) -> Optional[sqlite3.Cursor]:
        """Execute one SQL statement and commit it.

        Args:
            context: The SQL statement to run.
            params: Optional values bound to `?` placeholders in `context`.
                Prefer this over formatting values into the SQL text.

        Returns:
            The shared cursor (positioned on the statement's result), or
            None when sqlite3 raised ProgrammingError.

        Raises:
            sqlite3.Error: Any sqlite3 error other than ProgrammingError
                (for example IntegrityError) propagates to the caller.
        """
        try:
            self.cursor.execute(context, params)
            self.connection.commit()
        except sqlite3.ProgrammingError:
            # "Recursive use of cursors not allowed" is raised when two
            # threads hit the shared cursor at once. It is ignored on
            # purpose so the proof-of-concept keeps running.
            return None
        return self.cursor

    def __del__(self):
        # `connection` is missing if sqlite3.connect() itself failed in
        # __init__; avoid raising AttributeError from the finalizer.
        connection = getattr(self, "connection", None)
        if connection is not None:
            connection.close()
class Item:
    """A game item that is stored as one row of the `items` table.

    Constructing an Item opens its own Database and immediately inserts
    the row, keyed by a freshly generated UUID4.
    """

    def __init__(self, item_name):
        self.item_name = item_name
        self.item_uid = uuid.uuid4()
        self.db = Database()
        self.create()

    def is_item_exists(self) -> bool:
        """Return True if a row with this item's UID is currently stored.

        Returns False when the query could not be executed (sql_exec
        returned no cursor) or when no matching row is found.
        """
        cursor = self.db.sql_exec(
            f"SELECT COUNT(*) FROM items WHERE item_uid = '{self.item_uid}'")
        if not cursor:
            return False
        row = cursor.fetchone()
        return row is not None and row[0] != 0

    def remove(self) -> None:
        """Delete this item's row from the `items` table."""
        self.db.sql_exec(
            f"DELETE FROM items WHERE item_uid = '{self.item_uid}'")

    def create(self) -> None:
        """Insert this item into the `items` table.

        If the UID collides with an existing row (sqlite3.IntegrityError
        from the UNIQUE constraint) a new UUID4 is generated and the
        insert is retried until it succeeds.
        """
        # The name is embedded in the SQL text, so single quotes must be
        # doubled; otherwise a name such as "Miner's Pickaxe" would
        # terminate the string literal and break the statement.
        escaped_name = str(self.item_name).replace("'", "''")
        while True:
            try:
                self.db.sql_exec(
                    f"INSERT INTO items "
                    f"(item_uid, item_name) "
                    f"VALUES "
                    f"('{self.item_uid}', '{escaped_name}')")
                return
            except sqlite3.IntegrityError:
                # A duplicate `item_uid` violates the UNIQUE constraint.
                # Given the nature of UUID4 this should not happen, but
                # if it does: report it, regenerate the UID and retry.
                print(f"{self.item_uid}: sqlite3.IntegrityError")
                self.item_uid = uuid.uuid4()

    def enchant(self) -> bool:
        """Enchant the item, consuming it in the process.

        Returns:
            False if the item no longer exists; True once the enchantment
            was processed. True does not mean the enchantment itself was
            successful, only that it was carried out.

        Note:
            The existence check and the removal are deliberately not
            atomic. This gap is the race window that the
            proof-of-concept demonstrates.
        """
        # Check that the player actually has this item.
        if not self.is_item_exists():
            return False
        # Inject a delay with a probability of DELAY_CHANCE_PERCENT.
        # random.random() is uniform on [0, 1), so scaling by 100 gives
        # an exact percentage. The previous randint(0, 100) drew from
        # 101 values and was slightly off.
        if random.random() * 100 < DELAY_CHANCE_PERCENT:
            time.sleep(DELAY_MILLISECOND / 1000)
        # In this model the enchantment always breaks the item,
        # so the item is removed afterwards.
        self.remove()
        return True
class ACTION_LIST:
    """Identifiers of the actions that can be written to the log."""

    # Action ID recorded for an item enchantment.
    ACTION_ENCHANTMENT = "1"

    @staticmethod
    def convert_to_str(value: str) -> Optional[str]:
        """Return the symbolic name of an action ID.

        Args:
            value: An action ID such as `ACTION_LIST.ACTION_ENCHANTMENT`.

        Returns:
            The action's name, or None when the ID is not recognised.
        """
        if value == ACTION_LIST.ACTION_ENCHANTMENT:
            return "ACTION_ENCHANTMENT"
        # Unknown IDs have always produced None; make that explicit.
        return None
def insert_log(item_uid: uuid.UUID, action_id: str, thread_id: int) -> None:
    """Record one performed action in `log_list` and print a log line.

    Args:
        item_uid: UID of the item the action was performed on.
        action_id: One of the `ACTION_LIST` ID constants.
        thread_id: Number of the worker thread that performed the action.
    """
    # Remember the item_uid so duplicate enchantments
    # can be detected once all threads have finished.
    log_list.append(item_uid)
    print(f"[Thread {thread_id}] [EnchantLog] ItemUID: {str(item_uid)} "
          f"actionID: {ACTION_LIST.convert_to_str(action_id)}")
def process_multi_enchants(item_list: "Iterable[Item]", thread_id: int) -> None:
    """Enchant every item in `item_list`, logging each enchant that succeeds.

    Args:
        item_list: The items to enchant, processed in iteration order.
        thread_id: Number identifying the calling worker in the log output.
    """
    for item in item_list:
        # Only enchantments that were actually carried out are logged.
        if item.enchant():
            insert_log(item.item_uid, ACTION_LIST.ACTION_ENCHANTMENT, thread_id)
def fetch_counter_list() -> dict:
    """Count the entries of `log_list` per item UID.

    Returns:
        A plain dict mapping each logged item UID to the number of
        times it appears in the log, e.g. `{UUID('...'): 1}`.
    """
    tally = Counter()
    tally.update(log_list)
    return dict(tally)
def duplicate_information() -> None:
    """Print every item that was enchanted more than once, then a summary."""
    count_list = fetch_counter_list()
    # (item_uid, count) pairs of the items logged at least twice,
    # collected in a single pass over the tally.
    duplicates = [
        (item_uid, count)
        for item_uid, count in count_list.items()
        if count >= 2
    ]
    # Print the details of every single duplicate item.
    for item_uid, count in duplicates:
        print(
            f"Duplicate: Item UID: {item_uid} "
            f"Number of duplicate items: {count}")
    # Print the summary.
    print(f"within {NUMBER_OF_TOTAL_ITEMS} of items, "
          f"{len(duplicates)} of duplicate enchantment found")
def check_for_duplicate_log() -> None:
    """Report whether any item UID was logged more than once.

    Prints a verdict and, when duplicates exist, the detailed
    per-item report as well.
    """
    tallies = fetch_counter_list()
    # An item logged two or more times was enchanted more than once.
    has_duplicates = any(hits >= 2 for hits in tallies.values())
    if not has_duplicates:
        print("No Data Corruption!")
        return
    print("Race Condition Found")
    duplicate_information()
def main() -> None:
    """Run the proof-of-concept end to end.

    Resets the database, creates the items, lets two threads enchant
    the very same items at the same time, then reports duplicates.
    """
    # The trailing spaces keep the three fragments from running
    # together ("itemswith ...") in the printed banner.
    print(f"enchant {NUMBER_OF_TOTAL_ITEMS} of items "
          f"with {DELAY_CHANCE_PERCENT}% of probability "
          f"with {DELAY_MILLISECOND}ms delay")
    init()
    item_list = [Item("Pickaxe") for _ in range(NUMBER_OF_TOTAL_ITEMS)]
    # Request the enchantment of the items simultaneously:
    # both threads receive exactly the same item objects.
    workers = [
        threading.Thread(
            target=process_multi_enchants,
            args=(item_list, thread_id))
        for thread_id in (1, 2)
    ]
    for worker in workers:
        worker.start()
    # Wait for both threads to finish before inspecting the log.
    for worker in workers:
        worker.join()
    print("Done")
    check_for_duplicate_log()
# Run the proof-of-concept only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment