Created
July 4, 2021 20:36
-
-
Save kkent030315/b695a0ab6aa9244821a2855f19726ee5 to your computer and use it in GitHub Desktop.
race_condition.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# | |
# MIT copyright 2021 Kento Oki <hrn832@protonmail.com> | |
# | |
import threading | |
import time | |
from collections import Counter | |
import sqlite3 | |
import uuid | |
import random | |
# the log that holds `item_uid` of | |
# perform of item enchantments | |
log_list=[] | |
# this represents what number of items | |
# we are going to enchant simultaneously | |
NUMBER_OF_TOTAL_ITEMS=500 | |
# this represents what probability of percent | |
# the delay occurs while the server processing | |
DELAY_CHANCE_PERCENT=10 | |
# this represents miliseconds to delay | |
DELAY_MILLISECOND=5 | |
def init() -> None: | |
db = Database() | |
db.sql_exec( | |
f"DROP TABLE IF EXISTS items") | |
db.sql_exec( | |
f"CREATE TABLE items (item_uid TEXT UNIQUE, item_name TEXT)") | |
class Database: | |
def __init__(self): | |
self.connection = sqlite3.connect( | |
# sqlite3 checks if the connection is established | |
# by multiple threads to prevent race condition | |
# a.k.a data race. | |
# in this case we disable forcibly | |
# in order to perform proof-of-concept | |
'v4rd.db', check_same_thread=False) | |
self.cursor = self.connection.cursor() | |
def sql_exec(self, context: str) -> sqlite3.Cursor or None: | |
try: | |
self.cursor.execute(context) | |
self.connection.commit() | |
return self.cursor | |
except sqlite3.ProgrammingError: | |
# recursive use of cursors not allowed, ignore | |
pass | |
def __del__(self): | |
self.connection.close() | |
class Item: | |
def __init__(self, item_name): | |
self.item_name = item_name | |
self.item_uid = uuid.uuid4() | |
self.db = Database() | |
self.create() | |
def is_item_exists(self) -> bool: | |
cursor = self.db.sql_exec( | |
f"SELECT COUNT(*) FROM items WHERE item_uid = '{self.item_uid}'") | |
if not cursor: | |
return False | |
fetched_result = cursor.fetchone() | |
if fetched_result is None or fetched_result[0] == 0: | |
return False | |
return True | |
def remove(self) -> None: | |
self.db.sql_exec( | |
f"DELETE FROM items WHERE item_uid = '{self.item_uid}'") | |
def create(self) -> None: | |
try: | |
self.db.sql_exec( | |
f"INSERT INTO items " | |
f"(item_uid, item_name) " | |
f"VALUES " | |
f"('{self.item_uid}', '{self.item_name}')") | |
except sqlite3.IntegrityError: | |
# duplicate column of `item_uid` | |
# will raise exception sqlite3.IntegrityError | |
# but the nature of UUID4 it should not happen | |
print(f"{self.item_uid}: sqlite3.IntegrityError") | |
# regenerate uuid and retry | |
self.item_uid = uuid.uuid4() | |
self.create() | |
def enchant(self) -> bool: | |
# check if the player actually has this item | |
if not self.is_item_exists(): | |
return False | |
# this statement will be executed with | |
# the probability of `DELAY_CHANCE_PERCENT` | |
if random.randint(0, 100) < DELAY_CHANCE_PERCENT: | |
time.sleep(DELAY_MILLISECOND/1000) | |
# in this case, 100% break. then remove item | |
self.remove() | |
# enchant finished successfully | |
# this flag does not mean the successfull of enchantment | |
return True | |
class ACTION_LIST: | |
ACTION_ENCHANTMENT = "1" | |
@staticmethod | |
def convert_to_str(value: str) -> str: | |
if value == ACTION_LIST.ACTION_ENCHANTMENT: | |
return "ACTION_ENCHANTMENT" | |
def insert_log(item_uid: str, action_id: ACTION_LIST, thread_id: str) -> None: | |
# add item_uid to the list in order to | |
# check for duplicate enchantments later | |
log_list.append(item_uid) | |
print(f"[Thread {thread_id}] [EnchantLog] ItemUID: {str(item_uid)} " | |
f"actionID: {ACTION_LIST.convert_to_str(action_id)}") | |
def process_multi_enchants(item_list: Item, thread_id: int) -> None: | |
for item in item_list: | |
# insert log if the enchantment process succeeded | |
if item.enchant(): | |
insert_log(item.item_uid, ACTION_LIST.ACTION_ENCHANTMENT, thread_id) | |
def fetch_counter_list() -> dict: | |
# count logs by item_uid | |
# e.g, dict(UUID4('0e4e78ae-c780-11e2-8598-5855dgfa776b', 1)) | |
return dict(Counter(log_list)) | |
def duplicate_information() -> None: | |
count_list = fetch_counter_list() | |
# this array holds `item_uid` of the duplicate items | |
corrupt_entry_index_list = [] | |
# array index starts from zero | |
counter = -1 | |
for value in count_list.values(): | |
counter += 1 | |
if value >= 2: | |
corrupt_entry_index_list.append(counter) | |
# print information of every single duplite items | |
[print( | |
f"Duplicate: Item UID: {list(count_list)[index]} " | |
f"Number of duplicate items: {list(count_list.values())[index]}") | |
for index in corrupt_entry_index_list] | |
# print summary | |
print(f"within {NUMBER_OF_TOTAL_ITEMS} of items, " | |
f"{len(corrupt_entry_index_list)} of duplicate enchantment found") | |
def check_for_duplicate_log() -> None: | |
count_list = fetch_counter_list() | |
# extract only duplicate items (x >= 2) | |
deplicate_entries = [ x for x in count_list.values() if x >= 2 ] | |
# number of duplicate items | |
duplicate_count = len(deplicate_entries) | |
if duplicate_count > 0: | |
print("Race Condition Found") | |
duplicate_information() | |
else: | |
print("No Data Corruption!") | |
def main() -> None: | |
print(f"enchant {NUMBER_OF_TOTAL_ITEMS} of items" | |
f"with {DELAY_CHANCE_PERCENT}% of probability" | |
f"with {DELAY_MILLISECOND}ms delay") | |
init() | |
multi_enchant_data = { | |
"item_list": | |
[ Item("Pickaxe") for _ in range(NUMBER_OF_TOTAL_ITEMS) ], | |
} | |
# request simultaneously enchantment of the items | |
# with two exact same data | |
thread1 = threading.Thread( | |
target=process_multi_enchants, | |
args=[multi_enchant_data["item_list"], 1]) | |
thread2 = threading.Thread( | |
target=process_multi_enchants, | |
args=[multi_enchant_data["item_list"], 2]) | |
# start threads | |
thread1.start() | |
thread2.start() | |
# await for the threads to finish | |
thread1.join() | |
thread2.join() | |
print(f"Done") | |
check_for_duplicate_log() | |
if __name__ == '__main__': | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment