Bucket sort for a large file
""" | |
演示外部排序-桶排序 | |
拆分大文件到固定尺寸的桶中,桶内进行排序之后,再合并成排序好的大文件。 | |
""" | |
import os
import random
import shutil
from math import ceil

from tqdm import tqdm

NUMBER_OF_LINES = 200_000
BUCKET_SIZE = 10_000
MIN_VAL = 10_000
MAX_VAL = 100_000

os.makedirs("tmp", exist_ok=True)
def count_and_find_min_max(filename):
    """Count the lines and find the min/max of the data."""
    count = 0
    min_val = float("inf")
    max_val = float("-inf")
    with open(filename) as fp:
        for data in fp:
            count += 1
            value = int(data)
            if value < min_val:
                min_val = value
            if value > max_val:  # not elif: the first value must update both bounds
                max_val = value
    return count, min_val, max_val
def split_to_buckets(filename, mode="lines"):
    """Split the file into buckets based on the value range and per-bucket size."""
    lines, min_val, max_val = count_and_find_min_max(filename)
    if lines <= BUCKET_SIZE:
        return [filename]
    if filename.startswith("tmp"):
        bucket_prefix = f"{filename}_bucket"
    else:
        bucket_prefix = f"tmp/{filename}_bucket"
    if mode == "value":
        # Each bucket covers a fixed slice of the value range, so a bucket
        # may end up holding more than BUCKET_SIZE lines.
        bucket_num = ceil((max_val - min_val + 1) / BUCKET_SIZE)
        bucket_min_max = [
            (
                open(f"{bucket_prefix}_{i}", "w"),
                min_val + i * BUCKET_SIZE,
                min_val + (i + 1) * BUCKET_SIZE,
            )
            for i in range(bucket_num)
        ]
    else:
        # max - min is small, so split the file by line count instead.
        bucket_num = ceil(lines / BUCKET_SIZE)
        bucket_range = (max_val - min_val) / bucket_num
        # Buckets cover [min_val, min_val + range) ... [xxx, max_val].
        bucket_min_max = []
        for i in range(bucket_num):
            bucket_min_max.append(
                (
                    open(f"{bucket_prefix}_{i}", "w"),
                    min_val + i * bucket_range,
                    min_val + (i + 1) * bucket_range,
                )
            )
        # One extra bucket so that max_val itself always falls into a bucket.
        bucket_min_max.append(
            (
                open(f"{bucket_prefix}_{bucket_num}", "w"),
                min_val + bucket_num * bucket_range,
                max_val + 1,
            )
        )
    with open(filename) as fp:
        for value in fp:
            value = int(value)
            for fp_b, mmi, mmx in bucket_min_max:
                if mmi <= value < mmx:
                    fp_b.write(f"{value}\n")
                    fp_b.flush()
                    print(f"📦 {mmi}<={value}<{mmx} -> {fp_b.name}")
                    break
    buckets = [fp.name for fp, _, _ in bucket_min_max]
    for fp, _, _ in bucket_min_max:
        fp.close()
    # Any bucket that is still too large is split again, by line count.
    sub_buckets = []
    for bucket in buckets:
        sub_buckets.extend(split_to_buckets(bucket))
    return sub_buckets
def sort_and_merge_buckets(bucket_files):
    """Sort each bucket, then append its data to the merged output file."""
    for bucket_file in bucket_files:
        with open(bucket_file) as fpr, open("sorted", "a") as fpw:
            data = fpr.read().strip().split("\n")
            numbers = sorted(int(d) for d in data if d)
            fpw.writelines(f"{d}\n" for d in numbers)
            fpw.flush()
            print(f"👌 {fpr.name} -> {fpw.name}")
def generate_large_file(
    filename="data.txt",
    min_val=MIN_VAL,
    max_val=MAX_VAL,
    number_of_lines=NUMBER_OF_LINES,
):
    """Generate a file of random integers, one per line."""
    with open(filename, "w") as fp:
        for _ in tqdm(range(number_of_lines), total=number_of_lines):
            fp.write(f"{random.randrange(min_val, max_val)}\n")
def sort_large_file(filename):
    """Split the file into buckets, then sort and merge them."""
    bucket_files = split_to_buckets(filename, mode="value")
    sort_and_merge_buckets(bucket_files)


if __name__ == "__main__":
    filename = "data"
    generate_large_file(filename)
    sort_large_file(filename)
    shutil.rmtree("tmp", ignore_errors=True)
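
The merged output ends up globally sorted only because the buckets cover disjoint, ascending value ranges, so their individually sorted contents can simply be concatenated in order. If you want to sanity-check the result, here is a minimal sketch (the helper names are hypothetical, and it assumes the script above has just been run, so `data` and `sorted` exist in the working directory) that verifies the output is in order and that no lines were lost:

# check_sorted.py -- hypothetical verification helper, not part of the gist above.
def is_non_decreasing(path):
    """Return True if the integers in `path` appear in non-decreasing order."""
    previous = float("-inf")
    with open(path) as fp:
        for line in fp:
            value = int(line)
            if value < previous:
                return False
            previous = value
    return True


def line_count(path):
    """Count the lines in `path`."""
    with open(path) as fp:
        return sum(1 for _ in fp)


assert is_non_decreasing("sorted")
assert line_count("sorted") == line_count("data")
print("sorted output verified")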