Bucket sort for a large file
"""
演示外部排序-桶排序
拆分大文件到固定尺寸的桶中,桶内进行排序之后,再合并成排序好的大文件。
"""
import os
import shutil
import random
from math import ceil
from tqdm import tqdm

# Tunables: how many lines to generate, the per-bucket line cap, and the value range.
NUMBER_OF_LINES = 200_000
BUCKET_SIZE = 10_000
MIN_VAL = 10_000
MAX_VAL = 100_000

os.makedirs("tmp", exist_ok=True)  # scratch directory for bucket files

def count_and_find_min_max(filename):
    """Count the lines and find the value range of the data."""
    count = 0
    min_val = float("inf")
    max_val = float("-inf")
    with open(filename) as fp:
        for data in fp:
            count += 1
            value = int(data)
            # Two independent checks: with an `elif`, the very first line read
            # would update min_val only and max_val could end up wrong.
            if value < min_val:
                min_val = value
            if value > max_val:
                max_val = value
    return count, min_val, max_val

def split_to_buckets(filename, mode="lines"):
    """Split the file into buckets based on the value range and the per-file size."""
    lines, min_val, max_val = count_and_find_min_max(filename)
    if lines <= BUCKET_SIZE:
        return [filename]
    if filename.startswith("tmp"):
        bucket_prefix = f"{filename}_bucket"
    else:
        bucket_prefix = f"tmp/{filename}_bucket"
    if mode == "value":
        # Each bucket spans a fixed value range, so a bucket may end up holding
        # more than BUCKET_SIZE lines; the recursion below splits it again.
        # ceil(... + 1) guarantees at least one bucket and that max_val is covered.
        bucket_num = ceil((max_val - min_val + 1) / BUCKET_SIZE)
        bucket_min_max = [
            (
                open(f"{bucket_prefix}_{i}", "w"),
                min_val + i * BUCKET_SIZE,
                min_val + (i + 1) * BUCKET_SIZE,
            )
            for i in range(bucket_num)
        ]
    else:
        # max - min is small, so split by line count instead.
        bucket_num = ceil(lines / BUCKET_SIZE)
        bucket_range = (max_val - min_val) / bucket_num
        # Buckets cover [min_val, min_val + range) ... [xxx, max_val + 1).
        bucket_min_max = []
        for i in range(bucket_num):
            # Ranges are half-open, so the last upper bound is max_val + 1
            # to make sure max_val itself falls into a bucket.
            upper = max_val + 1 if i == bucket_num - 1 else min_val + (i + 1) * bucket_range
            bucket_min_max.append(
                (
                    open(f"{bucket_prefix}_{i}", "w"),
                    min_val + i * bucket_range,
                    upper,
                )
            )
    with open(filename) as fp:
        for value in fp:
            value = int(value)
            for fp_b, mmi, mmx in bucket_min_max:
                if mmi <= value < mmx:
                    fp_b.write(f"{value}\n")
                    fp_b.flush()
                    print(f"📦 {mmi}<={value}<{mmx} -> {fp_b.name}")
                    break
    buckets = [fp.name for fp, _, _ in bucket_min_max]
    for fp, _, _ in bucket_min_max:
        fp.close()
    # Recursively split any bucket that still exceeds BUCKET_SIZE lines.
    sub_buckets = []
    for bucket in buckets:
        sub_buckets.extend(split_to_buckets(bucket))
    return sub_buckets

def sort_and_merge_buckets(bucket_files):
    """Sort each bucket, then merge the data."""
    # The buckets are range-partitioned in ascending order, so sorting each
    # one and appending them in list order yields a globally sorted file.
    for bucket_file in bucket_files:
        with open(bucket_file) as fpr, open("sorted", "a") as fpw:
            data = fpr.read().strip().split("\n")
            numbers = sorted(int(d) for d in data if d)
            fpw.writelines(f"{d}\n" for d in numbers)
            fpw.flush()
            print(f"👌 {fpr.name} -> {fpw.name}")

def generate_large_file(
    filename="data.txt",
    min_val=MIN_VAL,
    max_val=MAX_VAL,
    number_of_lines=NUMBER_OF_LINES,
):
    """Generate a large file of random integers, one per line."""
    with open(filename, "w") as fp:
        for _ in tqdm(range(number_of_lines), total=number_of_lines):
            fp.write(f"{random.randrange(min_val, max_val)}\n")

def sort_large_file(filename):
    bucket_files = split_to_buckets(filename, mode="value")
    sort_and_merge_buckets(bucket_files)

if __name__ == "__main__":
filename = "data"
generate_large_file(filename)
sort_large_file(filename)
shutil.rmtree('tmp', ignore_errors=True)
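    # Optional sanity check, a minimal sketch assuming the "sorted" output file
    # produced by sort_and_merge_buckets above: the result should be
    # non-decreasing and contain exactly the generated number of lines.
    with open("sorted") as fp:
        values = [int(line) for line in fp]
    assert values == sorted(values), "output is not globally sorted"
    assert len(values) == NUMBER_OF_LINES, "lines were lost or duplicated"
    print(f"✅ {len(values)} values verified sorted")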