Skip to content

Instantly share code, notes, and snippets.

@paulwinex
Created March 29, 2023 05:41
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save paulwinex/2fce7752b55b4d10298c5470d5be6020 to your computer and use it in GitHub Desktop.
Save paulwinex/2fce7752b55b4d10298c5470d5be6020 to your computer and use it in GitHub Desktop.
Test copy speed with ramdisk on linux
import time
import os
from pathlib import Path
from datetime import timedelta
CMD_FILE_COPY = 'cp {} {}'
CMD_DIR_COPY = 'cp -r {} {}'
CMD_DEL = 'rm -fr {}'
SSD_SINGLE_FILE = Path('/path/to/big/file.ext') # REPLACE THIS
SSD_MULTIPLE_FILES = Path('/path/to/big/folder') # REPLACE THIS
RAM_MOUNT_POINT = Path('/mnt/ram')
SSD_TEMP = Path('/tmp')
class timer:
def __enter__(self):
self.start = time.perf_counter()
self.duration = 0
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.duration = round(time.perf_counter() - self.start, 3)
class ramdisk:
def __enter__(self):
mount()
return RAM_MOUNT_POINT
def __exit__(self, exc_type, exc_val, exc_tb):
umount()
def delete_path(path):
os.system(CMD_DEL.format(path))
def copy(src, trg, delete_source=False, delete_target=True):
cmd = (CMD_FILE_COPY if Path(src).is_file() else CMD_DIR_COPY).format(src, trg)
print(cmd)
with timer() as t:
os.system(cmd)
if delete_source:
delete_path(src)
if delete_target:
delete_path(trg)
time.sleep(1)
return t.duration
def mount():
print('Mount...')
RAM_MOUNT_POINT.mkdir(exist_ok=True)
os.system('mount -t tmpfs -o size=70G tmpfs {}'.format(RAM_MOUNT_POINT))
def umount():
print('Unmount...')
os.system('umount {}'.format(RAM_MOUNT_POINT))
def copy_ssd2ssd_single_test():
print('SSD -> SSD single')
return copy(SSD_SINGLE_FILE, '/tmp/deleteme')
def copy_sdd2ram_single_test():
print('SSD -> RAM single')
return copy(SSD_SINGLE_FILE, SSD_TEMP/'deleteme')
def copy_ram2ram_single_test():
print('RAM -> RAM single')
copy(SSD_SINGLE_FILE, SSD_TEMP/'deleteme', delete_target=False)
Path(SSD_TEMP/'subdir').mkdir(parents=True, exist_ok=True)
res = copy(SSD_TEMP/'deleteme', SSD_TEMP/'subdir'/'file', delete_source=True, delete_target=True)
return res
def copy_ram2sdd_single_test():
print('RAM -> SSD single')
copy(SSD_SINGLE_FILE, RAM_MOUNT_POINT/'deleteme', delete_target=False)
return copy(RAM_MOUNT_POINT/'deleteme', SSD_TEMP/'deleteme')
def copy_ssd2ssd_multi_test():
print('SSD -> SSD multi')
return copy(SSD_MULTIPLE_FILES, SSD_TEMP/'deletedir')
def copy_sdd2ram_multi_test():
print('SSD -> RAM multi')
return copy(SSD_MULTIPLE_FILES, RAM_MOUNT_POINT/'deletedir')
def copy_ram2ram_multi_test():
print('RAM -> RAM multi')
copy(SSD_MULTIPLE_FILES, RAM_MOUNT_POINT/'deletedir', delete_target=False)
return copy(RAM_MOUNT_POINT/'deletedir', RAM_MOUNT_POINT/'deletedir2', delete_source=True)
def copy_ram2sdd_multi_test():
print('RAM -> SSD multi')
copy(SSD_MULTIPLE_FILES, RAM_MOUNT_POINT/'deletedir', delete_target=False)
return copy(RAM_MOUNT_POINT/'deletedir', SSD_TEMP/'deletedir', delete_source=True)
def sizeof_fmt(num, suffix="b"):
for unit in ["", "K", "M", "G", "T", "P", "E", "Z"]:
if abs(num) < 1024.0:
return f"{num:3.1f}{unit}{suffix}"
num /= 1024.0
return f"{num:.1f}Y{suffix}"
def main():
print('Start testing...')
with ramdisk():
stat = {}
stat['ssd > ssd single'] = copy(SSD_SINGLE_FILE, SSD_TEMP/'deleteme.ext')
stat['sdd > ram single'] = copy(SSD_SINGLE_FILE, RAM_MOUNT_POINT/'deleteme.ext', delete_target=False)
stat['ram > ram single'] = copy(RAM_MOUNT_POINT/'deleteme.ext', RAM_MOUNT_POINT/'deleteme2.ext')
stat['ram > sdd single'] = copy(RAM_MOUNT_POINT/'deleteme.ext', SSD_TEMP/'deleteme2.ext', delete_target=True)
with ramdisk():
stat['ssd > ssd multi '] = copy(SSD_MULTIPLE_FILES, SSD_TEMP/'deletedir')
stat['sdd > ram multi '] = copy(SSD_MULTIPLE_FILES, RAM_MOUNT_POINT/'deletedir', delete_target=False)
stat['ram > ram multi '] = copy(RAM_MOUNT_POINT/'deletedir', RAM_MOUNT_POINT/'deletedir2')
delete_path(RAM_MOUNT_POINT/'deletedir2')
stat['ram > sdd multi '] = copy(RAM_MOUNT_POINT/'deletedir', SSD_TEMP/'deletedir2')
single_size = Path(SSD_SINGLE_FILE).stat().st_size
multi_size = sum([file.stat().st_size for file in SSD_MULTIPLE_FILES.rglob('*.*')])
print('Single File Size:', sizeof_fmt(single_size))
print(f"Multiple files dir size: {sizeof_fmt(multi_size)}, {len(tuple(SSD_MULTIPLE_FILES.rglob('*.*')))} files")
for name, duration in stat.items():
if name.endswith('single'):
speed = single_size/duration
else:
speed = multi_size / duration
print(f"{name}: {timedelta(seconds=duration)} / {sizeof_fmt(speed)}/s")
if __name__ == '__main__':
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment