Asynchronous file reading using asyncio
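Both scripts below follow the same pattern: split the large CSV into smaller chunk files with the GNU coreutils `split` command, wrap each chunk in a coroutine, and let `asyncio.gather` drive all of them on one event loop. As a quick orientation, here is a minimal, self-contained sketch of that pattern; the chunk file names and the `count_records` helper are hypothetical stand-ins for the `FileContent.processBatch` coroutine defined in the full scripts.

import asyncio

async def count_records(path: str) -> int:
    # Stand-in for FileContent.processBatch: count the lines in one chunk file.
    # Note that open()/readlines() are ordinary blocking calls.
    with open(path, "r") as fh:
        return sum(1 for _ in fh)

async def main(chunk_paths: list[str]) -> int:
    # One coroutine per chunk file, all awaited together.
    tasks = [count_records(p) for p in chunk_paths]
    counts = await asyncio.gather(*tasks, return_exceptions=True)
    return sum(c for c in counts if not isinstance(c, Exception))

# Example invocation (hypothetical file names produced by `split`):
# total = asyncio.run(main(["fake/crime_split00", "fake/crime_split01"]))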
from typing import Any, Coroutine
import asyncio
import time
import shutil
import os
import subprocess


class Settings:
    """
    Parameter settings
    """

    # NUM_OF_LINES_READ_BUFFER is the approximate buffer size (in bytes) passed to
    # readlines(); how many lines fit in it also depends on the number of columns per row.
    NUM_OF_LINES_READ_BUFFER: int = 100000
    # NUM_OF_LINES_PER_FILE is the number of lines written to each split file
    NUM_OF_LINES_PER_FILE: int = 1000
    # NUM_OF_EXPERIMENTS is the number of experiments to run
    NUM_OF_EXPERIMENTS: int = 100
class FileContent:
    """
    Version 1 does not delete temporary files
    """

    def __init__(self):
        pass

    async def __incrementalProcessingOfRecordsHandler(self, lines: list[str]) -> int:
        """
        Incrementally process one batch of lines from the file
        """
        record_count: int = 0
        for line in lines:
            # process operation
            record_count += 1
            # print(f"Processed {record_count} records")
        return record_count

    async def processBatch(self, config_file_name: str) -> int:
        """
        Given a file name, process every record in the file
        """
        total_record_count: int = 0
        cnt: int = 0
        with open(config_file_name, "r") as csvfile:
            # read the file in chunks bounded by the readlines() size hint
            lines = csvfile.readlines(Settings.NUM_OF_LINES_READ_BUFFER)
            while lines:
                cnt = await self.__incrementalProcessingOfRecordsHandler(lines)
                total_record_count += cnt
                lines = csvfile.readlines(Settings.NUM_OF_LINES_READ_BUFFER)
        return total_record_count
    def splitCSVToMultipleFiles(self, csv_name_with_path: str) -> list[str]:
        """
        Split the content of a csv file into a number of smaller files for concurrency
        """
        xind: int = csv_name_with_path.rfind("/")
        BASE_FILE_DIR: str = csv_name_with_path[: (xind + 1)]
        csv_name: str = csv_name_with_path.split("/")[-1]
        FILE_PATH: str = os.path.abspath(csv_name_with_path)
        FILE_DIR: str = "{}{}".format(BASE_FILE_DIR, "v1/fake")
        if os.path.exists(FILE_DIR):
            shutil.rmtree(FILE_DIR)  # delete folder
        # create a new folder
        os.makedirs(FILE_DIR)
        csv_name_prefix: str = csv_name.split(".")[0]
        csv_name_suffix: str = "split"
        # First split the huge file into multiple smaller files (GNU coreutils split)
        subprocess.call(
            [
                "split",
                "--lines={}".format(Settings.NUM_OF_LINES_PER_FILE),
                "--numeric-suffixes",
                FILE_PATH,
                "{}_{}".format(csv_name_prefix, csv_name_suffix),
            ],
            cwd=FILE_DIR,
        )
        all_files: list[str] = os.listdir(FILE_DIR)
        file_paths = list(map(lambda name: os.path.join(FILE_DIR, name), all_files))
        return file_paths

    async def __createTaskList(self, config_file_name: str) -> list[Coroutine[Any, Any, int]]:
        tasks: list[Coroutine[Any, Any, int]] = []
        file_paths: list[str] = self.splitCSVToMultipleFiles(config_file_name)
        for file_name in file_paths:
            tasks.append(self.processBatch(file_name))
        return tasks
    async def load_file_contents_using_tasks(self, config_file_name: str, return_exceptions: bool = True) -> int:
        """
        Asynchronously read the split files
        """
        # __createTaskList already splits the CSV and builds one coroutine per split file
        tasks: list[Coroutine[Any, Any, int]] = await self.__createTaskList(config_file_name)
        count_lst = [
            v
            for v in await asyncio.gather(*tasks, return_exceptions=True)
            if not isinstance(v, Exception)
        ]
        # recreate the task list and gather again without swallowing exceptions (for testing)
        if not return_exceptions:
            tasks = await self.__createTaskList(config_file_name)
            count_lst = await asyncio.gather(*tasks)
        return sum(count_lst)
async def asyncApp(config_file_name: str) -> None:
    content: FileContent = FileContent()
    num_of_records: int = 0
    start_time = time.perf_counter()
    for _ in range(Settings.NUM_OF_EXPERIMENTS):
        num_of_records += await content.load_file_contents_using_tasks(config_file_name)
    exec_time = time.perf_counter() - start_time
    print(
        f"[ASYNC RUNS] Total execution time: {exec_time:0.2f} seconds "
        f"for an average of {num_of_records // Settings.NUM_OF_EXPERIMENTS} records per run."
    )


if __name__ == "__main__":
    # Dataset from https://catalog.data.gov/dataset/crime-data-from-2020-to-present
    # Download link: https://data.lacity.org/api/views/2nrs-mtv8/rows.csv?accessType=DOWNLOAD
    csvfile = "Crime_Data_from_2020_to_Present.csv"
    asyncio.run(asyncApp(csvfile))
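One caveat about the version above: `open()` and `readlines()` are blocking calls, so the coroutines created by `processBatch` do not actually overlap; the event loop runs them one after another. If real overlap is wanted, the blocking read can be pushed onto a worker thread, for example with `asyncio.to_thread` (Python 3.9+). The following is a minimal sketch of that alternative under those assumptions, not what the gist itself does.

import asyncio

def count_records_blocking(path: str) -> int:
    # Ordinary blocking read, executed off the event loop in a worker thread.
    with open(path, "r") as fh:
        return sum(1 for _ in fh)

async def count_records(path: str) -> int:
    # asyncio.to_thread keeps the event loop free while the file is read.
    return await asyncio.to_thread(count_records_blocking, path)

async def main(chunk_paths: list[str]) -> int:
    counts = await asyncio.gather(*(count_records(p) for p in chunk_paths))
    return sum(counts)

Version 2 of the script, below, keeps track of the split-file directories and removes them in `__del__`, and it adds a synchronous baseline (`SyncFileContent`) so the two approaches can be timed side by side.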
import asyncio
import time
from typing import Any, Coroutine
import shutil
import os
import subprocess


class Settings:
    """
    Parameter settings
    """

    # NUM_OF_LINES_READ_BUFFER is the approximate buffer size (in bytes) passed to
    # readlines(); how many lines fit in it also depends on the number of columns per row.
    NUM_OF_LINES_READ_BUFFER: int = 100000
    # NUM_OF_LINES_PER_FILE is the number of lines written to each split file
    NUM_OF_LINES_PER_FILE: int = 1000
    # NUM_OF_EXPERIMENTS is the number of experiments to run
    NUM_OF_EXPERIMENTS: int = 100
class FileContent:
    """
    Version 2 has a mechanism to delete stale files
    """

    def __init__(self):
        self.parent_file_lst = []

    async def __incrementalProcessingOfRecordsHandler(self, lines: list[str]) -> int:
        """
        Incrementally process one batch of lines from the file
        """
        record_count: int = 0
        for line in lines:
            # process operation
            record_count += 1
            # print(f"Processed {record_count} records")
        return record_count

    async def processBatch2(self, config_file_name: str) -> int:
        """
        Given a file name, process every record in the file (chunked reads)
        """
        total_record_count: int = 0
        cnt: int = 0
        with open(config_file_name, "r") as csvfile:
            # read the file in chunks bounded by the readlines() size hint
            lines = csvfile.readlines(Settings.NUM_OF_LINES_READ_BUFFER)
            while lines:
                cnt = await self.__incrementalProcessingOfRecordsHandler(lines)
                total_record_count += cnt
                lines = csvfile.readlines(Settings.NUM_OF_LINES_READ_BUFFER)
        return total_record_count

    async def processBatch(self, config_file_name: str) -> int:
        """
        Given a file name, process every record in the file (single read)
        """
        total_record_count: int = 0
        with open(config_file_name, "r") as csvfile:
            # read the whole split file into memory at once
            lines = csvfile.readlines()
            total_record_count = await self.__incrementalProcessingOfRecordsHandler(lines)
        return total_record_count
    def __getTupleFilePathNameAndDirectory(self, csv_name_with_path: str) -> tuple[str, str, str]:
        xind: int = csv_name_with_path.rfind("/")
        BASE_FILE_DIR: str = csv_name_with_path[: (xind + 1)]
        csv_name: str = csv_name_with_path.split("/")[-1]
        FILE_PATH: str = os.path.abspath(csv_name_with_path)
        FILE_DIR: str = "{}{}".format(BASE_FILE_DIR, "v2/fake")
        return FILE_PATH, FILE_DIR, csv_name

    def splitCSVToMultipleFiles(self, csv_name_with_path: str) -> list[str]:
        """
        Split the content of a csv file into a number of smaller files for concurrency
        """
        FILE_PATH, FILE_DIR, csv_name = self.__getTupleFilePathNameAndDirectory(csv_name_with_path)
        if not os.path.exists(FILE_DIR):
            # create a new folder
            os.makedirs(FILE_DIR)
        # remember the folder so the destructor can clean it up later
        if FILE_DIR not in self.parent_file_lst:
            self.parent_file_lst.append(FILE_DIR)
        csv_name_prefix: str = csv_name.split(".")[0]
        csv_name_suffix: str = "split"
        # First split the huge file into multiple smaller files (GNU coreutils split)
        subprocess.call(
            [
                "split",
                "--lines={}".format(Settings.NUM_OF_LINES_PER_FILE),
                "--numeric-suffixes",
                FILE_PATH,
                "{}_{}".format(csv_name_prefix, csv_name_suffix),
            ],
            cwd=FILE_DIR,
        )
        all_files: list[str] = os.listdir(FILE_DIR)
        file_paths = list(map(lambda name: os.path.join(FILE_DIR, name), all_files))
        return file_paths

    async def __createTaskList(self, config_file_name: str) -> list[Coroutine[Any, Any, int]]:
        tasks: list[Coroutine[Any, Any, int]] = []
        file_paths: list[str] = self.splitCSVToMultipleFiles(config_file_name)
        for file_name in file_paths:
            tasks.append(self.processBatch(file_name))
        return tasks
    async def load_file_contents_using_tasks(self, config_file_name: str, return_exceptions: bool = False) -> int:
        """
        Asynchronously read the split files
        """
        # __createTaskList already splits the CSV and builds one coroutine per split file
        tasks: list[Coroutine[Any, Any, int]] = await self.__createTaskList(config_file_name)
        count_lst = [
            v
            for v in await asyncio.gather(*tasks, return_exceptions=True)
            if not isinstance(v, Exception)
        ]
        # recreate the task list and gather again without swallowing exceptions (for testing)
        if not return_exceptions:
            tasks = await self.__createTaskList(config_file_name)
            count_lst = await asyncio.gather(*tasks)
        return sum(count_lst)

    # Destructor: remove the split-file directories created by this instance
    def __del__(self):
        for file_dir in self.parent_file_lst:
            if os.path.exists(file_dir):
                shutil.rmtree(file_dir)  # delete folder
class SyncFileContent:
    """
    Synchronous file reading
    """

    def __init__(self):
        pass

    def __incrementalProcessingOfRecordsHandler(self, lines: list[str]) -> int:
        """
        Incrementally process one batch of lines from the file
        """
        record_count = 0
        for line in lines:
            # process operation
            record_count += 1
            # print(f"Processed {record_count} records")
        return record_count

    def processBatch2(self, config_file_name: str) -> int:
        """
        Given a file name, process every record in the file (chunked reads)
        """
        total_record_count: int = 0
        cnt: int = 0
        with open(config_file_name, "r") as csvfile:
            # read the file in chunks bounded by the readlines() size hint
            lines = csvfile.readlines(Settings.NUM_OF_LINES_READ_BUFFER)
            while lines:
                cnt = self.__incrementalProcessingOfRecordsHandler(lines)
                total_record_count += cnt
                lines = csvfile.readlines(Settings.NUM_OF_LINES_READ_BUFFER)
        return total_record_count

    def processBatch(self, config_file_name: str) -> int:
        """
        Given a file name, process every record in the file (single read)
        """
        total_record_count: int = 0
        with open(config_file_name, "r") as csvfile:
            # read the whole file into memory at once
            lines = csvfile.readlines()
            total_record_count = self.__incrementalProcessingOfRecordsHandler(lines)
        return total_record_count

    def load_file_contents_synchronous(self, config_file_name: str) -> int:
        total_record_count: int = self.processBatch(config_file_name)
        # print(f"Total number of processed {total_record_count} records")
        return total_record_count
async def asyncApp(config_file_name: str) -> None:
    content = FileContent()
    num_of_records = 0
    start_time = time.perf_counter()
    for _ in range(Settings.NUM_OF_EXPERIMENTS):
        num_of_records += await content.load_file_contents_using_tasks(config_file_name)
    exec_time = time.perf_counter() - start_time
    print(
        f"[ASYNC RUNS] Total execution time: {exec_time:0.2f} seconds "
        f"for an average of {num_of_records // Settings.NUM_OF_EXPERIMENTS} records per run."
    )


def synchronousApp(config_file_name: str) -> None:
    content = SyncFileContent()
    num_of_records = 0
    start_time = time.perf_counter()
    for _ in range(Settings.NUM_OF_EXPERIMENTS):
        num_of_records += content.load_file_contents_synchronous(config_file_name)
    exec_time = time.perf_counter() - start_time
    print(
        f"[SYNCHRONOUS RUNS] Total execution time: {exec_time:0.2f} seconds "
        f"for an average of {num_of_records // Settings.NUM_OF_EXPERIMENTS} records per run."
    )


if __name__ == "__main__":
    # Dataset from https://catalog.data.gov/dataset/crime-data-from-2020-to-present
    # Download link: https://data.lacity.org/api/views/2nrs-mtv8/rows.csv?accessType=DOWNLOAD
    csvfile = "Crime_Data_from_2020_to_Present.csv"
    asyncio.run(asyncApp(csvfile))
    synchronousApp(csvfile)
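To try either script without downloading the full LA crime dataset, a small CSV can be generated first. The file name below matches the `csvfile` variable used in both `__main__` blocks; the column layout is made up purely for illustration, since the scripts only count rows.

import csv

# Write a small stand-in CSV with the expected file name (hypothetical columns).
with open("Crime_Data_from_2020_to_Present.csv", "w", newline="") as fh:
    writer = csv.writer(fh)
    writer.writerow(["record_id", "date", "area", "crime_code"])
    for i in range(5000):
        writer.writerow([i, "2024-01-01", "Central", 510])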