Skip to content

Instantly share code, notes, and snippets.

@spdkils
Last active February 4, 2024 00:35
Show Gist options
  • Save spdkils/204b40649cb33ace62d2e410fe16704d to your computer and use it in GitHub Desktop.
Save spdkils/204b40649cb33ace62d2e410fe16704d to your computer and use it in GitHub Desktop.
No dependecy windows actual async file copies?
import asyncio
import ctypes
from ctypes import windll, wintypes
from pathlib import Path
# Define necessary constants and structures
FILE_READ_DATA = 0x1
FILE_READ_SHARED = 0x1
FILE_WRITE_DATA = 0x2
CREATE_ALWAYS = 0x2
OPEN_EXISTING = 0x3
OPEN_ALWAYS = 0x4
GENERIC_WRITE = 0x40000000
FILE_ATTRIBUTE_NORMAL = 0x80
FILE_FLAG_OVERLAPPED = 0x40000000
INVALID_HANDLE_VALUE = -1
ERROR_IO_PENDING = 0x3E5
STATUS_PENDING = 0x103
ERROR_HANDLE_EOF = 0x26
# Structure for AYNC I/O in Windows
class OVERLAPPED(ctypes.Structure):
_fields_ = [
("Internal", wintypes.LPVOID),
("InternalHigh", wintypes.LPVOID),
("Offset", wintypes.DWORD),
("OffsetHigh", wintypes.DWORD),
("hEvent", wintypes.HANDLE),
]
GetOverlappedResult = windll.kernel32.GetOverlappedResult
GetOverlappedResult.argtypes = [wintypes.HANDLE, ctypes.POINTER(OVERLAPPED), wintypes.LPDWORD, wintypes.BOOL]
GetOverlappedResult.restype = wintypes.BOOL
CreateFile = ctypes.windll.kernel32.CreateFileW
ReadFile = ctypes.windll.kernel32.ReadFile
WriteFile = ctypes.windll.kernel32.WriteFile
CloseHandle = ctypes.windll.kernel32.CloseHandle
GetLastError = windll.kernel32.GetLastError
async def _read_file(source_handle, buffer, read_bytes, overlapped):
"""Reads data from the file asynchronously."""
return await _file_operation(ReadFile, source_handle, buffer, read_bytes, overlapped)
async def _write_file(dest_handle, buffer, buffer_size, overlapped):
"""Writes data to the file asynchronously."""
return await _file_operation(WriteFile, dest_handle, buffer, buffer_size, overlapped)
async def _file_operation(func, handle, buffer, buffer_size, overlapped):
"""Performs the file operation asynchronously.
This function is a helper function for the read and write file functions."""
if func(handle, buffer, buffer_size, None, ctypes.byref(overlapped)) == 0:
error_code = GetLastError()
# All overlapped I/O operations return immediately, so the operation should be pending.
if error_code == ERROR_IO_PENDING:
while overlapped.Internal == wintypes.DWORD(STATUS_PENDING).value:
await asyncio.sleep(0)
# Not sure if this is wasteful, should I do this once and reuse the variables?
n = wintypes.DWORD()
b = wintypes.BOOL()
if GetOverlappedResult(handle, ctypes.byref(overlapped), ctypes.byref(n), b) == 0:
error_code = GetLastError()
# If the main driver function works, I should not reach the EOF, but just in case.
if error_code == ERROR_HANDLE_EOF:
return 0, buffer
else:
raise ctypes.WinError()
if n.value == 0:
return 0, buffer
overlapped.Offset += n.value
return n.value, buffer
else:
return 0, buffer
def _open_file(path, desired_access, share_mode, creation_disposition):
"""Opens a file asynchronously."""
handle = CreateFile(
path, desired_access, share_mode, None, creation_disposition, FILE_ATTRIBUTE_NORMAL | FILE_FLAG_OVERLAPPED, None
)
if handle == INVALID_HANDLE_VALUE:
raise ctypes.WinError()
return handle
async def copy_file(source_path, dest_path):
"""Copies a file asynchronously.
args:
source_path: str: The path to the source file.
dest_path: str: The path to the destination file.
"""
try:
# I could use calls to the WINAPI here, but c calls are so ugly.
source_size = Path(source_path).stat().st_size
# I only set this for read_shared because I was doing a one to many test. Not sure if this is great.
source_handle = _open_file(source_path, FILE_READ_DATA, FILE_READ_SHARED, OPEN_EXISTING)
dest_handle = _open_file(dest_path, GENERIC_WRITE, 0, CREATE_ALWAYS)
buffer = ctypes.create_string_buffer(4096)
overlappedRead = OVERLAPPED()
overlappedWrite = OVERLAPPED()
total_read = 0
while total_read < source_size:
bytes, buffer = await _read_file(source_handle, buffer, 4095, overlappedRead)
total_read += bytes
bytes, buffer = await _write_file(dest_handle, buffer, bytes, overlappedWrite)
# Just one more place to give other tasks a chance to run.
await asyncio.sleep(0)
finally:
CloseHandle(source_handle)
CloseHandle(dest_handle)
async def main():
tasks = [asyncio.create_task(copy_file("c:\\temp\\test.txt", f"c:\\temp\\dest{i}.txt")) for i in range(10)]
await asyncio.gather(*tasks)
if __name__ == "__main__":
asyncio.run(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment