Skip to content

Instantly share code, notes, and snippets.

@rgkimball
Last active August 9, 2021 03:11
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save rgkimball/1726190c6bfad1b66a679b447e1175a4 to your computer and use it in GitHub Desktop.
Save rgkimball/1726190c6bfad1b66a679b447e1175a4 to your computer and use it in GitHub Desktop.
File rowcount estimation
def count_file_iter(*ar, **kw):
    """
    Small utility to iterate through individual rows of a file given a generator,
    used only to count rows.

    :param ar: args for open()
    :param kw: keyword args for open()
    :return: yields 1 for each row
    """
    # Use a context manager so the file handle is released when the
    # generator is exhausted or closed (the original leaked the handle).
    with open(*ar, **kw) as fh:
        for _ in fh:
            yield 1
def estimate_row_count(
    fp: str,
    max_rows: int = 10_000,
    estimate_large_files: bool = True,
    **kwargs,
) -> int:
    """
    Efficient file row estimator that stops when it reaches the specified cap;
    extremely large files will not require full calculations since we already
    know they meet a certain threshold.

    :param fp: str; file path to flat file e.g. 'large_file.csv'
    :param max_rows: int; cap on the number of rows counted exactly
    :param estimate_large_files:
        boolean; if the row count exceeds max_rows, determines whether the
        function should return max_rows or try to guess the number of rows
        based on the original file size on disk.
    :param kwargs: extra keyword args forwarded to open() (e.g. encoding)
    :return: int; exact count if below the cap, otherwise an estimate (or the cap)
    """
    import tempfile  # local import: only needed on the estimation path

    # Count rows directly with a context manager (the original delegated to a
    # generator whose file handle was never closed when the cap was hit).
    c = 0
    with open(fp, 'r', **kwargs) as fo:
        for _ in fo:
            if c >= max_rows:
                break
            c += 1

    # If we hit the row count limit, perform an estimate from file size; a
    # 100-row sample is typically accurate within ±1%. Since size on disk and
    # size in memory differ, we write a small file back to disk to produce an
    # accurate estimate by extrapolating from the size of the first N rows.
    if estimate_large_files and c == max_rows and max_rows > 0:
        # Never sample more rows than we know the file contains.
        sample_size = min(100, max_rows)
        full_size_on_disk = os.path.getsize(fp)
        with open(fp, 'r', **kwargs) as fo:
            sample = [next(fo) for _ in range(sample_size)]
        # mkstemp avoids the name-collision / cwd-permission issues of a
        # timestamp-named file; try/finally guarantees cleanup.
        fd, tmp_fn = tempfile.mkstemp()
        try:
            with os.fdopen(fd, 'w', **kwargs) as tmp:
                tmp.writelines(sample)
            sample_size_on_disk = os.path.getsize(tmp_fn)
        finally:
            os.remove(tmp_fn)
        bytes_per_row = sample_size_on_disk / sample_size
        # round() without ndigits returns int, matching the -> int annotation
        # (round(x, 0) in the original returned a float).
        c = round(full_size_on_disk / bytes_per_row)
    return c
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment