@kenluck2001
Created November 1, 2024 21:56
Asynchronous file reading using asyncio
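The two scripts below split a large CSV into smaller chunk files with the coreutils split command, count the records of each chunk inside a coroutine, and combine the results with asyncio.gather; the second script adds a synchronous reader as a baseline. The core pattern, as a minimal sketch (this is not the gist's code, and the chunk file names are hypothetical):

import asyncio

async def count_lines(path: str) -> int:
    # Ordinary blocking file I/O inside a coroutine: the event loop only switches
    # between coroutines at await points, so this interleaves work rather than
    # reading files in parallel.
    with open(path, "r") as handle:
        return sum(1 for _ in handle)

async def count_all(paths: list[str]) -> int:
    counts = await asyncio.gather(*(count_lines(p) for p in paths))
    return sum(counts)

# Example with hypothetical chunk files produced by split:
# print(asyncio.run(count_all(["crime_split00", "crime_split01"])))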
from typing import Any, Coroutine
import asyncio
import time
import shutil
import os
import subprocess


class Settings:
    """
    Parameter settings
    """

    # NUM_OF_LINES_READ_BUFFER is the size hint (roughly in characters) passed to
    # readlines(); it is influenced also by the number of columns per row
    NUM_OF_LINES_READ_BUFFER: int = 100000
    # NUM_OF_LINES_PER_FILE is the number of lines written to each split file
    NUM_OF_LINES_PER_FILE: int = 1000
    # NUM_OF_EXPERIMENTS is the number of experiments to run
    NUM_OF_EXPERIMENTS: int = 100


class FileContent:
    '''
    Version 1 does not delete temporary files
    '''

    def __init__(self):
        pass

    async def __incrementalProcessingOfRecordsHandler(self, lines: list[str]) -> int:
        """
        Incrementally process each batch of file contents
        """
        record_count: int = 0
        for line in lines:
            # process operation
            record_count += 1
        # print(f"Processed {record_count} records")
        return record_count

    async def processBatch(self, config_file_name: str) -> int:
        """
        Given a file name, process every record in the file
        """
        total_record_count: int = 0
        cnt: int = 0
        with open(config_file_name, "r") as csvfile:
            # read in batches bounded by the readlines() size hint
            lines = csvfile.readlines(Settings.NUM_OF_LINES_READ_BUFFER)
            while lines:
                cnt = await self.__incrementalProcessingOfRecordsHandler(lines)
                total_record_count += cnt
                lines = csvfile.readlines(Settings.NUM_OF_LINES_READ_BUFFER)
        return total_record_count

    def splitCSVToMultipleFiles(self, csv_name_with_path: str) -> list[str]:
        """
        Split the content of a csv file into a number of smaller files for concurrency
        """
        xind: int = csv_name_with_path.rfind("/")
        BASE_FILE_DIR: str = csv_name_with_path[: (xind + 1)]
        csv_name: str = csv_name_with_path.split("/")[-1]
        FILE_PATH: str = os.path.abspath(csv_name_with_path)
        FILE_DIR: str = "{}{}".format(BASE_FILE_DIR, "v1/fake")
        if os.path.exists(FILE_DIR):
            shutil.rmtree(FILE_DIR)  # delete folder
        # create a new folder
        os.makedirs(FILE_DIR)
        csv_name_prefix: str = csv_name.split(".")[0]
        csv_name_suffix: str = "split"
        # First split the huge file into multiple smaller files
        subprocess.call(
            [
                "split",
                "--lines={}".format(Settings.NUM_OF_LINES_PER_FILE),
                "--numeric-suffixes",
                FILE_PATH,
                "{}_{}".format(csv_name_prefix, csv_name_suffix),
            ],
            cwd=FILE_DIR,
        )
        all_files: list[str] = os.listdir(FILE_DIR)
        file_paths = list(map(lambda name: os.path.join(FILE_DIR, name), all_files))
        return file_paths

    async def __createTaskList(self, config_file_name: str) -> list[Coroutine[Any, Any, int]]:
        tasks: list[Coroutine[Any, Any, int]] = []
        file_paths: list[str] = self.splitCSVToMultipleFiles(config_file_name)
        for file_name in file_paths:
            tasks.append(self.processBatch(file_name))
        return tasks

    async def load_file_contents_using_tasks(self, config_file_name: str, return_exceptions: bool = True) -> int:
        """
        Asynchronously read the files
        """
        tasks: list[Coroutine[Any, Any, int]] = await self.__createTaskList(config_file_name)
        count_lst = [
            v
            for v in await asyncio.gather(*tasks, return_exceptions=True)
            if not isinstance(v, Exception)
        ]
        # recreate the tasks (coroutines can only be awaited once) and gather
        # again without exception capture, for use and testing
        if not return_exceptions:
            tasks = await self.__createTaskList(config_file_name)
            count_lst = await asyncio.gather(*tasks)
        return sum(count_lst)


async def asyncApp(config_file_name: str) -> None:
    content: FileContent = FileContent()
    num_of_records: int = 0
    start_time = time.perf_counter()
    for _ in range(Settings.NUM_OF_EXPERIMENTS):
        num_of_records += await content.load_file_contents_using_tasks(config_file_name)
    exec_time = time.perf_counter() - start_time
    print(f"[ASYNC RUNS: ] Execution time: {exec_time:0.2f} seconds for an average of {num_of_records//Settings.NUM_OF_EXPERIMENTS} records per run.")


if __name__ == "__main__":
    # Dataset from https://catalog.data.gov/dataset/crime-data-from-2020-to-present
    # Download link: https://data.lacity.org/api/views/2nrs-mtv8/rows.csv?accessType=DOWNLOAD
    csvfile = "Crime_Data_from_2020_to_Present.csv"
    asyncio.run(asyncApp(csvfile))
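The second script below is version 2 of the same reader: it tracks the directories created by the split step and deletes them when the object is finalized, and it adds a synchronous reader for comparison.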
import asyncio
import time
from typing import Any, Coroutine
import shutil
import os
import subprocess


class Settings:
    """
    Parameter settings
    """

    # NUM_OF_LINES_READ_BUFFER is the size hint (roughly in characters) passed to
    # readlines(); it is influenced also by the number of columns per row
    NUM_OF_LINES_READ_BUFFER: int = 100000
    # NUM_OF_LINES_PER_FILE is the number of lines written to each split file
    NUM_OF_LINES_PER_FILE: int = 1000
    # NUM_OF_EXPERIMENTS is the number of experiments to run
    NUM_OF_EXPERIMENTS: int = 100


class FileContent:
    '''
    Version 2 has a mechanism to delete stale files
    '''

    def __init__(self):
        self.parent_file_lst = []

    async def __incrementalProcessingOfRecordsHandler(self, lines: list[str]) -> int:
        """
        Incrementally process each batch of file contents
        """
        record_count: int = 0
        for line in lines:
            # process operation
            record_count += 1
        # print(f"Processed {record_count} records")
        return record_count

    async def processBatch2(self, config_file_name: str) -> int:
        """
        Given a file name, process every record in the file in buffered batches
        """
        total_record_count: int = 0
        cnt: int = 0
        with open(config_file_name, "r") as csvfile:
            # read in batches bounded by the readlines() size hint
            lines = csvfile.readlines(Settings.NUM_OF_LINES_READ_BUFFER)
            while lines:
                cnt = await self.__incrementalProcessingOfRecordsHandler(lines)
                total_record_count += cnt
                lines = csvfile.readlines(Settings.NUM_OF_LINES_READ_BUFFER)
        return total_record_count

    async def processBatch(self, config_file_name: str) -> int:
        """
        Given a file name, process every record in the file in one pass
        """
        total_record_count: int = 0
        with open(config_file_name, "r") as csvfile:
            # read the whole file at once
            lines = csvfile.readlines()
            total_record_count = await self.__incrementalProcessingOfRecordsHandler(lines)
        return total_record_count

    def __getTupleFilePathNameAndDirectory(self, csv_name_with_path: str) -> tuple[str, str, str]:
        xind: int = csv_name_with_path.rfind("/")
        BASE_FILE_DIR: str = csv_name_with_path[: (xind + 1)]
        csv_name: str = csv_name_with_path.split("/")[-1]
        FILE_PATH: str = os.path.abspath(csv_name_with_path)
        FILE_DIR: str = "{}{}".format(BASE_FILE_DIR, "v2/fake")
        return FILE_PATH, FILE_DIR, csv_name

    def splitCSVToMultipleFiles(self, csv_name_with_path: str) -> list[str]:
        """
        Split the content of a csv file into a number of smaller files for concurrency
        """
        FILE_PATH, FILE_DIR, csv_name = self.__getTupleFilePathNameAndDirectory(csv_name_with_path)
        if not os.path.exists(FILE_DIR):
            # create a new folder
            os.makedirs(FILE_DIR)
        if FILE_DIR not in self.parent_file_lst:
            self.parent_file_lst.append(FILE_DIR)
        csv_name_prefix: str = csv_name.split(".")[0]
        csv_name_suffix: str = "split"
        # First split the huge file into multiple smaller files
        subprocess.call(
            [
                "split",
                "--lines={}".format(Settings.NUM_OF_LINES_PER_FILE),
                "--numeric-suffixes",
                FILE_PATH,
                "{}_{}".format(csv_name_prefix, csv_name_suffix),
            ],
            cwd=FILE_DIR,
        )
        all_files: list[str] = os.listdir(FILE_DIR)
        file_paths = list(map(lambda name: os.path.join(FILE_DIR, name), all_files))
        return file_paths

    async def __createTaskList(self, config_file_name: str) -> list[Coroutine[Any, Any, int]]:
        tasks: list[Coroutine[Any, Any, int]] = []
        file_paths: list[str] = self.splitCSVToMultipleFiles(config_file_name)
        for file_name in file_paths:
            tasks.append(self.processBatch(file_name))
        return tasks

    async def load_file_contents_using_tasks(self, config_file_name: str, return_exceptions: bool = False) -> int:
        """
        Asynchronously read the files
        """
        tasks = await self.__createTaskList(config_file_name)
        count_lst = [
            v
            for v in await asyncio.gather(*tasks, return_exceptions=True)
            if not isinstance(v, Exception)
        ]
        # recreate the tasks (coroutines can only be awaited once) and gather
        # again without exception capture, for use and testing
        if not return_exceptions:
            tasks = await self.__createTaskList(config_file_name)
            count_lst = await asyncio.gather(*tasks)
        return sum(count_lst)

    # Deleting (calling the destructor); finalizers are not guaranteed to run at
    # interpreter shutdown, so this cleanup is best effort.
    def __del__(self):
        for file_dir in self.parent_file_lst:
            if os.path.exists(file_dir):
                shutil.rmtree(file_dir)  # delete folder


class SyncFileContent:
    '''
    Synchronous file reading
    '''

    def __init__(self):
        pass

    def __incrementalProcessingOfRecordsHandler(self, lines: list[str]) -> int:
        """
        Incrementally process each batch of file contents
        """
        record_count = 0
        for line in lines:
            # process operation
            record_count += 1
        # print(f"Processed {record_count} records")
        return record_count

    def processBatch2(self, config_file_name: str) -> int:
        """
        Given a file name, process every record in the file in buffered batches
        """
        total_record_count: int = 0
        cnt: int = 0
        with open(config_file_name, "r") as csvfile:
            # read in batches bounded by the readlines() size hint
            lines = csvfile.readlines(Settings.NUM_OF_LINES_READ_BUFFER)
            while lines:
                cnt = self.__incrementalProcessingOfRecordsHandler(lines)
                total_record_count += cnt
                lines = csvfile.readlines(Settings.NUM_OF_LINES_READ_BUFFER)
        return total_record_count

    def processBatch(self, config_file_name: str) -> int:
        """
        Given a file name, process every record in the file in one pass
        """
        total_record_count: int = 0
        with open(config_file_name, "r") as csvfile:
            # read the whole file at once
            lines = csvfile.readlines()
            total_record_count = self.__incrementalProcessingOfRecordsHandler(lines)
        return total_record_count

    def load_file_contents_synchronous(self, config_file_name: str) -> int:
        total_record_count: int = self.processBatch(config_file_name)
        # print(f"Total number of processed {total_record_count} records")
        return total_record_count


async def asyncApp(config_file_name: str) -> None:
    content = FileContent()
    num_of_records = 0
    start_time = time.perf_counter()
    for _ in range(Settings.NUM_OF_EXPERIMENTS):
        num_of_records += await content.load_file_contents_using_tasks(config_file_name)
    exec_time = time.perf_counter() - start_time
    print(f"[ASYNC RUNS: ] Total execution time: {exec_time:0.2f} seconds for an average of {num_of_records//Settings.NUM_OF_EXPERIMENTS} records per run.")


def synchronousApp(config_file_name: str) -> None:
    content = SyncFileContent()
    num_of_records = 0
    start_time = time.perf_counter()
    for _ in range(Settings.NUM_OF_EXPERIMENTS):
        num_of_records += content.load_file_contents_synchronous(config_file_name)
    exec_time = time.perf_counter() - start_time
    print(f"[SYNCHRONOUS RUNS: ] Total execution time: {exec_time:0.2f} seconds for an average of {num_of_records//Settings.NUM_OF_EXPERIMENTS} records per run.")


if __name__ == "__main__":
    # Dataset from https://catalog.data.gov/dataset/crime-data-from-2020-to-present
    # Download link: https://data.lacity.org/api/views/2nrs-mtv8/rows.csv?accessType=DOWNLOAD
    csvfile = "Crime_Data_from_2020_to_Present.csv"
    asyncio.run(asyncApp(csvfile))
    synchronousApp(csvfile)
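To reproduce the timings, a rough sketch of what the scripts assume: download the crime CSV from the link in the comments into the working directory, make sure GNU coreutils split is on the PATH (the scripts shell out to it), and run each script with Python 3.9 or newer (the annotations use built-in generics such as list[str]), e.g. python3 async_read_v1.py, where the script file name here is hypothetical since the gist does not name its files. Each run repeats the read Settings.NUM_OF_EXPERIMENTS (100) times before printing the total time, so expect a long run on the full dataset.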