@kenluck2001
Created November 1, 2024 21:56
Asynchronous file reading using asyncio
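
The listing below implements this idea in two versions of a FileContent class (the second version also removes its temporary split files) plus a synchronous baseline for comparison. The core pattern in all of them: split the large CSV into smaller files with the coreutils split command, wrap each per-file read in a coroutine, and run the coroutines together with asyncio.gather. Here is a minimal sketch of that pattern, separate from the full listing; the file names are hypothetical placeholders.

import asyncio

async def count_lines(path: str) -> int:
    # NOTE: a plain open()/readlines() call still blocks the event loop;
    # this sketch only illustrates the gather pattern used in the listing.
    with open(path, "r") as fh:
        return len(fh.readlines())

async def main() -> None:
    # hypothetical split files, e.g. as produced by `split --lines=...`
    paths = ["chunk_00.csv", "chunk_01.csv"]
    counts = await asyncio.gather(*(count_lines(p) for p in paths))
    print(f"Total records: {sum(counts)}")

asyncio.run(main())

The full listing follows; version 1 comes first.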
import asyncio
import time
from typing import Coroutine, Any
import shutil
import os
import subprocess


class Settings:
    """
    Parameter settings
    """

    # NUM_OF_LINES_READ_BUFFER is the size hint (in characters) passed to
    # readlines() when reading in chunks; the effective number of lines per
    # chunk is therefore also influenced by the number of columns per row
    NUM_OF_LINES_READ_BUFFER: int = 100000
    # NUM_OF_LINES_PER_FILE is the number of lines written to each split file
    NUM_OF_LINES_PER_FILE: int = 1000
    # NUM_OF_EXPERIMENTS is the number of experiments to run
    NUM_OF_EXPERIMENTS: int = 100


class FileContent:
    """
    Version 1 does not delete temporary files
    """

    def __init__(self):
        pass

    async def __incrementalProcessingOfRecordsHandler(self, lines: list[str]) -> int:
        """
        Incrementally process one batch of file contents
        """
        record_count: int = 0
        for line in lines:
            # process operation
            record_count += 1
        # print(f"Processed {record_count} records")
        return record_count

    async def processBatch(self, config_file_name: str) -> int:
        """
        Given a file name, extract every record in the file
        """
        total_record_count: int = 0
        cnt: int = 0
        with open(config_file_name, "r") as csvfile:
            # read the file in chunks, using the readlines() size hint
            lines = csvfile.readlines(Settings.NUM_OF_LINES_READ_BUFFER)
            while lines:
                cnt = await self.__incrementalProcessingOfRecordsHandler(lines)
                total_record_count += cnt
                lines = csvfile.readlines(Settings.NUM_OF_LINES_READ_BUFFER)
        return total_record_count

    def splitCSVToMultipleFiles(self, csv_name_with_path: str) -> list[str]:
        """
        Split the content of a csv file into a number of smaller files for concurrency
        """
        xind: int = csv_name_with_path.rfind("/")
        BASE_FILE_DIR: str = csv_name_with_path[: (xind + 1)]
        csv_name: str = csv_name_with_path.split("/")[-1]
        FILE_PATH: str = os.path.abspath(csv_name_with_path)
        FILE_DIR: str = "{}{}".format(BASE_FILE_DIR, "v1/fake")
        if os.path.exists(FILE_DIR):
            shutil.rmtree(FILE_DIR)  # delete folder
        # create a new folder
        os.makedirs(FILE_DIR)
        csv_name_prefix: str = csv_name.split(".")[0]
        csv_name_suffix: str = "split"
        # First split the huge file into multiple smaller files
        subprocess.call(
            [
                "split",
                "--lines={}".format(Settings.NUM_OF_LINES_PER_FILE),
                "--numeric-suffixes",
                FILE_PATH,
                "{}_{}".format(csv_name_prefix, csv_name_suffix),
            ],
            cwd=FILE_DIR,
        )
        all_files: list[str] = os.listdir(FILE_DIR)
        file_paths = list(map(lambda name: os.path.join(FILE_DIR, name), all_files))
        return file_paths

    async def __createTaskList(self, config_file_name: str) -> list[Coroutine[Any, Any, int]]:
        tasks: list[Any] = []
        file_paths: list[str] = self.splitCSVToMultipleFiles(config_file_name)
        for file_name in file_paths:
            tasks.append(self.processBatch(file_name))
        return tasks

    async def load_file_contents_using_tasks(self, config_file_name: str, return_exceptions: bool = True) -> int:
        """
        Asynchronously read the split files
        """
        tasks: list[Coroutine[Any, Any, int]] = await self.__createTaskList(config_file_name)
        if return_exceptions:
            # capture exceptions and drop failed batches from the count
            count_lst = [
                v
                for v in await asyncio.gather(*tasks, return_exceptions=True)
                if not isinstance(v, Exception)
            ]
        else:
            # let any exception propagate
            count_lst = await asyncio.gather(*tasks)
        return sum(count_lst)


async def asyncApp(config_file_name: str) -> None:
    content: FileContent = FileContent()
    num_of_records: int = 0
    start_time = time.perf_counter()
    for _ in range(Settings.NUM_OF_EXPERIMENTS):
        num_of_records += await content.load_file_contents_using_tasks(config_file_name)
    exec_time = time.perf_counter() - start_time
    print(f"[ASYNC RUNS] Total execution time: {exec_time:0.2f} seconds for {num_of_records // Settings.NUM_OF_EXPERIMENTS} records per run.")


if __name__ == "__main__":
    # Dataset from https://catalog.data.gov/dataset/crime-data-from-2020-to-present
    # Download link: https://data.lacity.org/api/views/2nrs-mtv8/rows.csv?accessType=DOWNLOAD
    csvfile = "Crime_Data_from_2020_to_Present.csv"
    asyncio.run(asyncApp(csvfile))
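
Version 1 leaves its split files behind in the v1/fake folder it creates next to the source CSV. A minimal cleanup sketch is shown below (version 2 automates this in a destructor and also adds a synchronous baseline):

import os
import shutil

split_dir = "v1/fake"  # folder created by splitCSVToMultipleFiles above
if os.path.exists(split_dir):
    shutil.rmtree(split_dir)  # remove the temporary split files

Version 2 follows.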
import asyncio
import time
from typing import Coroutine, Any
import shutil
import os
import subprocess


class Settings:
    """
    Parameter settings
    """

    # NUM_OF_LINES_READ_BUFFER is the size hint (in characters) passed to
    # readlines() when reading in chunks; the effective number of lines per
    # chunk is therefore also influenced by the number of columns per row
    NUM_OF_LINES_READ_BUFFER: int = 100000
    # NUM_OF_LINES_PER_FILE is the number of lines written to each split file
    NUM_OF_LINES_PER_FILE: int = 1000
    # NUM_OF_EXPERIMENTS is the number of experiments to run
    NUM_OF_EXPERIMENTS: int = 100


class FileContent:
    """
    Version 2 has a mechanism to delete stale files
    """

    def __init__(self):
        self.parent_file_lst = []

    async def __incrementalProcessingOfRecordsHandler(self, lines: list[str]) -> int:
        """
        Incrementally process one batch of file contents
        """
        record_count: int = 0
        for line in lines:
            # process operation
            record_count += 1
        # print(f"Processed {record_count} records")
        return record_count

    async def processBatch2(self, config_file_name: str) -> int:
        """
        Given a file name, extract every record in the file (chunked reads)
        """
        total_record_count: int = 0
        cnt: int = 0
        with open(config_file_name, "r") as csvfile:
            # read the file in chunks, using the readlines() size hint
            lines = csvfile.readlines(Settings.NUM_OF_LINES_READ_BUFFER)
            while lines:
                cnt = await self.__incrementalProcessingOfRecordsHandler(lines)
                total_record_count += cnt
                lines = csvfile.readlines(Settings.NUM_OF_LINES_READ_BUFFER)
        return total_record_count

    async def processBatch(self, config_file_name: str) -> int:
        """
        Given a file name, extract every record in the file (single read)
        """
        total_record_count: int = 0
        with open(config_file_name, "r") as csvfile:
            # read the whole file at once
            lines = csvfile.readlines()
            total_record_count = await self.__incrementalProcessingOfRecordsHandler(lines)
        return total_record_count

    def __getTupleFilePathNameAndDirectory(self, csv_name_with_path: str) -> tuple[str, str, str]:
        xind: int = csv_name_with_path.rfind("/")
        BASE_FILE_DIR: str = csv_name_with_path[: (xind + 1)]
        csv_name: str = csv_name_with_path.split("/")[-1]
        FILE_PATH: str = os.path.abspath(csv_name_with_path)
        FILE_DIR: str = "{}{}".format(BASE_FILE_DIR, "v2/fake")
        return FILE_PATH, FILE_DIR, csv_name

    def splitCSVToMultipleFiles(self, csv_name_with_path: str) -> list[str]:
        """
        Split the content of a csv file into a number of smaller files for concurrency
        """
        FILE_PATH, FILE_DIR, csv_name = self.__getTupleFilePathNameAndDirectory(csv_name_with_path)
        if not os.path.exists(FILE_DIR):
            # create a new folder
            os.makedirs(FILE_DIR)
        if FILE_DIR not in self.parent_file_lst:
            self.parent_file_lst.append(FILE_DIR)
        csv_name_prefix: str = csv_name.split(".")[0]
        csv_name_suffix: str = "split"
        # First split the huge file into multiple smaller files
        subprocess.call(
            [
                "split",
                "--lines={}".format(Settings.NUM_OF_LINES_PER_FILE),
                "--numeric-suffixes",
                FILE_PATH,
                "{}_{}".format(csv_name_prefix, csv_name_suffix),
            ],
            cwd=FILE_DIR,
        )
        all_files: list[str] = os.listdir(FILE_DIR)
        file_paths = list(map(lambda name: os.path.join(FILE_DIR, name), all_files))
        return file_paths

    async def __createTaskList(self, config_file_name: str) -> list[Coroutine[Any, Any, int]]:
        tasks = []
        file_paths: list[str] = self.splitCSVToMultipleFiles(config_file_name)
        for file_name in file_paths:
            tasks.append(self.processBatch(file_name))
        return tasks

    async def load_file_contents_using_tasks(self, config_file_name: str, return_exceptions: bool = False) -> int:
        """
        Asynchronously read the split files
        """
        tasks = await self.__createTaskList(config_file_name)
        if return_exceptions:
            # capture exceptions and drop failed batches from the count
            count_lst = [
                v
                for v in await asyncio.gather(*tasks, return_exceptions=True)
                if not isinstance(v, Exception)
            ]
        else:
            # let any exception propagate
            count_lst = await asyncio.gather(*tasks)
        return sum(count_lst)

    # Deleting (Calling destructor)
    def __del__(self):
        for file_dir in self.parent_file_lst:
            if os.path.exists(file_dir):
                shutil.rmtree(file_dir)  # delete folder


class SyncFileContent:
    """
    Synchronous file reading
    """

    def __init__(self):
        pass

    def __incrementalProcessingOfRecordsHandler(self, lines: list[str]) -> int:
        """
        Incrementally process one batch of file contents
        """
        record_count = 0
        for line in lines:
            # process operation
            record_count += 1
        # print(f"Processed {record_count} records")
        return record_count

    def processBatch2(self, config_file_name: str) -> int:
        """
        Given a file name, extract every record in the file (chunked reads)
        """
        total_record_count: int = 0
        cnt: int = 0
        with open(config_file_name, "r") as csvfile:
            # read the file in chunks, using the readlines() size hint
            lines = csvfile.readlines(Settings.NUM_OF_LINES_READ_BUFFER)
            while lines:
                cnt = self.__incrementalProcessingOfRecordsHandler(lines)
                total_record_count += cnt
                lines = csvfile.readlines(Settings.NUM_OF_LINES_READ_BUFFER)
        return total_record_count

    def processBatch(self, config_file_name: str) -> int:
        """
        Given a file name, extract every record in the file (single read)
        """
        total_record_count: int = 0
        with open(config_file_name, "r") as csvfile:
            # read the whole file at once
            lines = csvfile.readlines()
            total_record_count = self.__incrementalProcessingOfRecordsHandler(lines)
        return total_record_count

    def load_file_contents_synchronous(self, config_file_name: str) -> int:
        total_record_count: int = self.processBatch(config_file_name)
        # print(f"Total number of processed {total_record_count} records")
        return total_record_count


async def asyncApp(config_file_name: str) -> None:
    content = FileContent()
    num_of_records = 0
    start_time = time.perf_counter()
    for _ in range(Settings.NUM_OF_EXPERIMENTS):
        num_of_records += await content.load_file_contents_using_tasks(config_file_name)
    exec_time = time.perf_counter() - start_time
    print(f"[ASYNC RUNS] Total execution time: {exec_time:0.2f} seconds for {num_of_records // Settings.NUM_OF_EXPERIMENTS} records per run.")


def synchronousApp(config_file_name: str) -> None:
    content = SyncFileContent()
    num_of_records = 0
    start_time = time.perf_counter()
    for _ in range(Settings.NUM_OF_EXPERIMENTS):
        num_of_records += content.load_file_contents_synchronous(config_file_name)
    exec_time = time.perf_counter() - start_time
    print(f"[SYNCHRONOUS RUNS] Total execution time: {exec_time:0.2f} seconds for {num_of_records // Settings.NUM_OF_EXPERIMENTS} records per run.")


if __name__ == "__main__":
    # Dataset from https://catalog.data.gov/dataset/crime-data-from-2020-to-present
    # Download link: https://data.lacity.org/api/views/2nrs-mtv8/rows.csv?accessType=DOWNLOAD
    csvfile = "Crime_Data_from_2020_to_Present.csv"
    asyncio.run(asyncApp(csvfile))
    synchronousApp(csvfile)
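
Relying on __del__ for cleanup works here, but finalizers run at an unspecified time (or not at all if the interpreter exits abruptly). A sketch of a more explicit alternative is shown below; cleanup_split_dirs is a hypothetical helper, not part of the listing above.

import asyncio
import os
import shutil

def cleanup_split_dirs(dirs: list[str]) -> None:
    # hypothetical helper: remove every split-file folder that still exists
    for d in dirs:
        if os.path.exists(d):
            shutil.rmtree(d)

content = FileContent()
try:
    asyncio.run(content.load_file_contents_using_tasks("Crime_Data_from_2020_to_Present.csv"))
finally:
    cleanup_split_dirs(content.parent_file_lst)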