Skip to content

Instantly share code, notes, and snippets.

@okumura
Created November 24, 2019 22:51
Show Gist options
  • Save okumura/f72e977ea7ab199018533e6c8ad8c72d to your computer and use it in GitHub Desktop.
Save okumura/f72e977ea7ab199018533e6c8ad8c72d to your computer and use it in GitHub Desktop.
wc -l
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import multiprocessing
import os
import sys
import time
import threading
import concurrent.futures as futures
from concurrent.futures import ProcessPoolExecutor
def get_chunk_line_count(params):
    """Count the newline bytes inside one byte range of a file.

    The arguments arrive packed in a single tuple so the function can be
    handed directly to an executor together with one work item.

    Args:
        params: A ``(name, start, stop, blocksize)`` tuple. ``name`` is the
            file path, ``start`` and ``stop`` are byte offsets delimiting the
            half-open range ``[start, stop)``, and ``blocksize`` is the
            maximum number of bytes requested per read.

    Returns:
        The number of LF bytes found in the range.
    """
    # Tuple parameters in the signature are a SyntaxError on Python 3,
    # so unpack explicitly; callers still pass one 4-tuple.
    name, start, stop, blocksize = params
    remaining = stop - start
    count = 0
    # Binary mode: start/stop are byte offsets, so the data must be read as
    # bytes. In text mode read(n) counts characters and seek() may land in
    # the middle of a multi-byte character.
    with open(name, 'rb') as f:
        f.seek(start)
        while remaining > 0:
            block = f.read(min(remaining, blocksize))
            if not block:
                # The file ended before `stop`; nothing more to count.
                break
            count += block.count(b'\n')
            remaining -= len(block)
    return count
def get_file_offset_ranges(name, blocksize=65536, m=1):
    """Split a file into contiguous byte ranges for parallel processing.

    The base chunk size is the file size divided by the CPU count (floor
    division), multiplied by ``m``. The last range is shortened so that it
    ends exactly at the end of the file.

    Args:
        name: Path of the file to split.
        blocksize: Read size carried along unchanged in every range tuple.
        m: Positive integer multiplier applied to the base chunk size.

    Returns:
        A list of ``(name, start, stop, blocksize)`` tuples whose half-open
        ``[start, stop)`` ranges cover the file in order without gaps. An
        empty file yields a single empty range ``(name, 0, 0, blocksize)``
        so callers always receive at least one work item.

    Raises:
        ValueError: If ``m`` is smaller than 1.
    """
    if m < 1:
        raise ValueError("m must be a positive integer")
    fsize = os.stat(name).st_size
    if fsize == 0:
        return [(name, 0, 0, blocksize)]
    # Files smaller than the CPU count would otherwise get a chunk size of
    # zero (division by zero below); fall back to one byte per chunk.
    chunksize = max(1, (fsize // multiprocessing.cpu_count()) * m)
    # Stepping by chunksize and clamping the end also covers the case where
    # chunksize exceeds the file size (one range spanning the whole file).
    return [
        (name, start, min(start + chunksize, fsize), blocksize)
        for start in range(0, fsize, chunksize)
    ]
def wc_proc_pool_exec(name, blocksize=65536):
    """Count the newlines in a file using a pool of worker processes.

    The file is split into byte ranges by ``get_file_offset_ranges`` and each
    range is counted by ``get_chunk_line_count`` in a separate process; the
    per-range counts are then summed.

    Args:
        name: Path of the file to count.
        blocksize: Maximum number of bytes each worker requests per read.

    Returns:
        The total number of newlines found across all ranges.
    """
    ranges = get_file_offset_ranges(name, blocksize)
    if not ranges:
        # ProcessPoolExecutor rejects max_workers=0; no ranges means no lines.
        return 0
    # There can be more ranges than CPUs (full chunks plus a remainder), so
    # cap the pool size instead of starting one process per range.
    workers = min(len(ranges), multiprocessing.cpu_count())
    with ProcessPoolExecutor(max_workers=workers) as executor:
        # The sum does not depend on completion order, so map() suffices.
        return sum(executor.map(get_chunk_line_count, ranges))
# Guard the entry point: worker processes started with the "spawn" method
# re-import this module, and an unguarded call would re-run the pool there.
if __name__ == "__main__":
    if len(sys.argv) != 2:
        sys.exit("usage: %s FILE" % sys.argv[0])
    print(wc_proc_pool_exec(sys.argv[1]))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment