Skip to content

Instantly share code, notes, and snippets.

@TransparentLC
Last active March 22, 2021 15:56
Show Gist options
  • Save TransparentLC/b563955031db9c22152ea51fabb8f07d to your computer and use it in GitHub Desktop.
Save TransparentLC/b563955031db9c22152ea51fabb8f07d to your computer and use it in GitHub Desktop.
使用多进程生成随机数据填充的文件。
import argparse
import multiprocessing
import re
import os
import sys
import time
def parseFileSize(string: str) -> int:
    """Convert a human-readable size such as ``'1.5mb'`` into a number of bytes.

    The input is a non-negative decimal number optionally followed by a unit
    (``b``, ``kb``, ``mb``, ``gb`` or ``tb``, case-insensitive). Units are
    powers of 1024. A missing unit means bytes. Surrounding whitespace and
    whitespace between the number and the unit are ignored.

    Args:
        string: The size to parse. Non-string values (e.g. an ``int`` default
            coming from argparse) are converted with ``str()`` first.

    Returns:
        The size in bytes, rounded to an integer with ``round()``.

    Raises:
        ValueError: If the input does not match the expected format.
    """
    text = string if isinstance(string, str) else str(string)
    match = re.fullmatch(
        r'([0-9]*\.?[0-9]+)\s*(b|kb|mb|gb|tb)?',
        text.strip().lower(),
    )
    # Check the match explicitly instead of relying on a bare ``except`` that
    # would also swallow KeyboardInterrupt and SystemExit.
    if match is None:
        raise ValueError(f'Failed to parse size: {string}')
    number, unit = match.groups()
    multipliers = {
        'b': 1,
        'kb': 1024,
        'mb': 1024 ** 2,
        'gb': 1024 ** 3,
        'tb': 1024 ** 4,
    }
    # ``unit`` is None when no suffix was given; treat that as plain bytes.
    return round(float(number) * multipliers[unit or 'b'])
def formatFileSize(size: int) -> str:
    """Render a byte count as a string with two decimals and a unit.

    The value is divided by 1024 for as long as it is at least 1024 and a
    larger unit is still available, stepping through KB, MB, GB and TB.
    Values that never reach 1024 are labelled ``Bytes``; values beyond the TB
    range stay expressed in TB.

    Args:
        size: The number of bytes (a float is accepted as well).

    Returns:
        A string such as ``'1.50 KB'``.
    """
    label = 'Bytes'
    for larger in ('KB', 'MB', 'GB', 'TB'):
        if size < 1024:
            break
        size /= 1024
        label = larger
    return f'{size:.2f} {label}'
def writeFile(path: str, size: int, queue: multiprocessing.Queue, counter: multiprocessing.Value) -> None:
    """Write chunks taken from ``queue`` into ``path`` until ``size`` bytes are stored.

    The shared ``counter`` is reset to 0 on entry and afterwards holds the
    number of bytes written so far. After every chunk a single-line progress
    bar is redrawn on stdout.

    Args:
        path: Destination file; opened in ``'wb'`` mode.
        size: Total number of bytes expected.
        queue: Source of ``bytes`` chunks.
        counter: Shared value (with a lock) tracking the bytes written.

    Returns:
        None. A ``KeyboardInterrupt`` ends the function silently; an I/O error
        is reported on stderr instead of being swallowed without a trace.
    """
    with counter.get_lock():
        counter.value = 0
    print('', end='')
    try:
        with open(path, 'wb') as f:
            while True:
                with counter.get_lock():
                    if counter.value >= size:
                        return None
                data = queue.get()
                f.write(data)
                with counter.get_lock():
                    counter.value += len(data)
                    written = counter.value
                # Render the progress line outside the lock so that other
                # users of the counter are not blocked by string formatting.
                progress = written / size
                print(
                    '\r' + ' '.join((
                        f'[{round(32 * progress) * "█"}{round(32 * (1 - progress)) * " "}]',
                        f'{progress * 1e2:.2f}%',
                        f'{formatFileSize(written)} / {formatFileSize(size)}',
                        f'Queue={queue.qsize()}',
                        ''
                    )),
                    end=''
                )
    except KeyboardInterrupt:
        # Interrupted by the user: stop quietly, the file keeps what was written.
        pass
    except IOError as error:
        # Surface failures such as a full disk or a missing directory instead
        # of ending with no explanation.
        print(f'\nWriting {path} aborted: {error}', file=sys.stderr)
    finally:
        # Terminate the progress line.
        print('')
def generateRandomChunk(maxBuffer: int, size: int, queue: multiprocessing.Queue) -> None:
    """Put ``size`` bytes of ``os.urandom`` data on ``queue`` in chunks.

    Every chunk holds at most ``maxBuffer`` bytes; the last one may be shorter.
    ``KeyboardInterrupt`` and ``BrokenPipeError`` end the function silently.

    Args:
        maxBuffer: Maximum number of bytes per chunk. Must be positive when
            ``size`` is positive.
        size: Total number of random bytes to produce.
        queue: Destination queue for the generated ``bytes`` objects.

    Raises:
        ValueError: If ``size`` is positive but ``maxBuffer`` is not. Without
            this check the loop would enqueue empty chunks forever.
    """
    if size > 0 and maxBuffer <= 0:
        raise ValueError(f'maxBuffer must be greater than 0, got {maxBuffer}')
    try:
        remaining = size
        while remaining > 0:
            chunkSize = min(maxBuffer, remaining)
            queue.put(os.urandom(chunkSize))
            remaining -= chunkSize
    except (KeyboardInterrupt, BrokenPipeError):
        pass
if __name__ == '__main__':
    # Command-line entry point: parse the options, start one writer process
    # and one random-data producer process, wait for them and print a summary.
    parser = argparse.ArgumentParser(
        epilog='\n'.join((
            'A script to generate dummy file with random bytes.',
            'Uses multiprocessing. Each process will generate at least 256 KB random bytes.',
        )),
        formatter_class=argparse.RawTextHelpFormatter
    )
    parser.add_argument(
        '-s',
        '--size',
        required=True,
        type=str,
        help='The size of the dummy file. You can input number of bytes or use units (kb, mb, gb).'
    )
    parser.add_argument(
        '-o',
        '--output',
        required=False,
        type=str,
        help='The path of the dummy file. Leave it empty to generate a random one.'
    )
    parser.add_argument(
        '-b',
        '--buffer',
        required=False,
        default=262144,
        type=str,
        help='The size of the random bytes\' buffer in each process.'
    )
    parser.add_argument(
        '-q',
        '--queue-length',
        required=False,
        default=128,
        type=int,
        help='The max length of the writing queue. Related to max memory usage of the writing process. Set 0 to disable the limit.'
    )
    args = parser.parse_args()

    if not args.output:
        # No path given: use a random 32-character hex file name.
        args.output = os.urandom(16).hex()
    args.output = os.path.realpath(args.output)

    # Report malformed sizes as a normal usage error instead of a traceback.
    try:
        args.size = parseFileSize(args.size)
        args.buffer = parseFileSize(args.buffer)
    except ValueError as error:
        parser.error(str(error))
    # A buffer of 0 bytes (e.g. "-b 0" or "-b 0.4", which rounds to 0) would
    # make the producer loop forever without generating any data.
    if args.buffer <= 0:
        parser.error('The buffer size must be greater than 0.')

    print(f'The generated file will be saved in {args.output}')

    ts = time.perf_counter()

    manager = multiprocessing.Manager()
    queue = manager.Queue(args.queue_length)
    counter = multiprocessing.Value('Q', 0)
    processList = (
        multiprocessing.Process(target=writeFile, args=(args.output, args.size, queue, counter)),
        multiprocessing.Process(target=generateRandomChunk, args=(args.buffer, args.size, queue)),
    )
    writer, producer = processList
    for p in processList:
        p.start()

    try:
        writer.join()
        # Once the writer has consumed everything the producer has nothing
        # left to do, so a short grace period is enough here.
        producer.join(1)
    except KeyboardInterrupt:
        pass
    # If the writer stopped early (e.g. on an I/O error) the producer may be
    # blocked on a full queue forever; stop it so the script can exit.
    if producer.is_alive():
        producer.terminate()
        producer.join()

    te = time.perf_counter()
    with counter.get_lock():
        print(f'Written {formatFileSize(counter.value)} in {te - ts:.3f}s, {formatFileSize(counter.value / (te - ts))}/s')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment