Skip to content

Instantly share code, notes, and snippets.

@8enmann
Last active June 22, 2019 20:14
Show Gist options
  • Save 8enmann/31f15cc1616baf7c6acc117745871408 to your computer and use it in GitHub Desktop.
Benchmark reading files with threading and multiprocessing
"""Read files using threading and multiprocessing.
Execute on https://coderpad.io/sandbox
"""
from collections import Counter
from concurrent.futures import ThreadPoolExecutor, as_completed
from functools import reduce
from multiprocessing.dummy import Pool
from typing import Iterable, List, Optional
import glob
import multiprocessing
import os
import time

import numpy as np
class Timer:
    """Context manager that prints the elapsed wall-clock time on exit.

    Usage::

        with Timer():
            do_work()
    """

    def __enter__(self):
        # Record the start time. Return self so `with Timer() as t:` binds
        # the timer (the original returned None, making `as t` useless).
        self.t = time.time()
        return self

    def __exit__(self, *args):
        print(f'{time.time() - self.t:.4f}s elapsed')
def header(text):
    """Print *text* framed by '=' bars to mark a new output section."""
    bar = '=' * 10
    print(bar, text, bar)
def counts_from_file(filename: str, sleep_seconds=0):
    """Compute counts of whitespace-separated tokens in a file.

    Args:
        filename: Path of the file to read.
        sleep_seconds: Artificial delay before reading (benchmarking aid).

    Returns:
        Counter mapping token -> occurrence count. Empty Counter if the
        file can't be opened or read.
    """
    time.sleep(sleep_seconds)
    try:
        with open(filename) as f:
            text = f.read()
    except (UnicodeDecodeError, OSError):
        # UnicodeDecodeError: binary files. OSError covers PermissionError,
        # FileNotFoundError, IsADirectoryError, etc. — the original caught
        # only PermissionError, so a missing file crashed despite the
        # docstring's promise.
        return Counter()
    return Counter(text.split())
# Get a list of non-directory files under /etc. Without recursive=True,
# glob treats '**' like a plain '*', so the original only matched one
# directory level despite the recursive-looking pattern.
files = [p for p in glob.glob('/etc/**/*', recursive=True)
         if not os.path.isdir(p)]
print('found', len(files), 'files')
def print_sizes(files: List[str], first: int = 10):
    """Print the `first` largest files and their sizes in bytes.

    Args:
        files: Paths to stat; each must exist and be accessible.
        first: How many of the largest files to show.
    """
    sizes = map(os.path.getsize, files)
    # Sort (file, size) pairs by size descending. The original hard-coded
    # [:10] here, silently ignoring the `first` parameter.
    print(sorted(zip(files, sizes), key=lambda pair: -pair[1])[:first])
# Sanity-check tokenization on a single file before benchmarking.
print(counts_from_file(files[0]).most_common(10))

header('single thread')
with Timer():
    c = Counter()
    for name in files:
        c.update(counts_from_file(name))
print(c.most_common(10))
header('thread pool')
with Timer():
    c.clear()
    with ThreadPoolExecutor() as executor:
        pending = [executor.submit(counts_from_file, name) for name in files]
        # Merge each file's counts as soon as its worker finishes.
        for done in as_completed(pending):
            c.update(done.result())
print(c.most_common(10))
header('map reduce thread pool')
with Timer():
    c.clear()
    # multiprocessing.dummy.Pool exposes the Pool API but is backed by
    # threads, so this compares the Pool interface at thread cost.
    with Pool() as workers:
        for counted in workers.imap(counts_from_file, files):
            c.update(counted)
print(c.most_common(10))
header('map reduce proc pool')
with Timer():
    c.clear()
    # True process parallelism; results are pickled back in input order.
    with multiprocessing.Pool() as workers:
        for counted in workers.imap(counts_from_file, files):
            c.update(counted)
print(c.most_common(10))
print(f'vocab size {len(c)}, total tokens {sum(c.values())}')
# Add special marker tokens to the counts so they get vocab ids too.
EOF = '<<EOF>>'
UNK = '<<UNK>>'
c.update([EOF, UNK])
# Token -> integer id, in the Counter's insertion order.
vocab = {token: idx for idx, token in enumerate(c)}
# Inverse mapping: id -> token.
reverse = {idx: token for token, idx in vocab.items()}
EOF_EMB = vocab[EOF]
UNK_EMB = vocab[UNK]
import json

# Persist the vocab so later runs can reuse it.
with open('vocab.json', 'w') as f:
    json.dump(vocab, f)
def encode(filename: str, sleep_seconds=0) -> Optional[List[int]]:
    """Embed a file's tokens as ids according to the global `vocab`.

    Words absent from the vocab map to UNK_EMB; EOF_EMB is appended to
    mark the end of the file.

    Args:
        filename: Path of the file to encode.
        sleep_seconds: Artificial delay before reading (benchmarking aid).

    Returns:
        List of vocab ids, or None if the file can't be opened or read.
        (The original annotated Iterable[int] despite returning None on
        failure.)
    """
    time.sleep(sleep_seconds)
    try:
        with open(filename) as f:
            text = f.read()
    except (UnicodeDecodeError, OSError):
        # OSError covers PermissionError, FileNotFoundError, etc.;
        # UnicodeDecodeError covers binary files — now consistent with
        # counts_from_file.
        return None
    return [vocab.get(x, UNK_EMB) for x in text.split()] + [EOF_EMB]
def decode(arr: Iterable[int]) -> List[str]:
    """Map vocab ids back to tokens and split the text on EOF markers."""
    words = [reverse[idx] for idx in arr]
    return ' '.join(words).split(EOF)
print('\n' * 3)
header('encode')
with Timer():
    # Drop files that failed to read (encode returned None).
    res = [emb for emb in map(encode, files) if emb]
    combined = np.concatenate(res)
    print(len(res), len(combined))
    np.savez('data.npz', combined)
header('thread pool')
with Timer():
    with ThreadPoolExecutor() as executor:
        futures = [executor.submit(encode, name) for name in files]
        # Collect in submission order, dropping unreadable files.
        results = [fut.result() for fut in futures]
    res = [r for r in results if r]
    combined = np.concatenate(res)
    print(len(res), len(combined))
# Round-trip: decode the full encoded corpus back to text.
with Timer():
    out = decode(combined)
print(len(out), out[0][:1000])
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment