@alculquicondor
Created October 1, 2017 20:39

#!/usr/bin/env python3
import argparse
import collections
import multiprocessing
import os
import string
import sys
import time

BUFFER_SIZE = 20 << 20  # Read the file in 20 MB chunks.
MAX_THREADS = len(os.sched_getaffinity(0))  # CPUs available to this process.
# Translation table that deletes every ASCII punctuation character.
PUNCTUATION_REMOVAL_TRANSLATOR = str.maketrans("", "", string.punctuation)


class RangeDescriptor(object):
    def __init__(self, start, first_word="", last_word=""):
        self.start = start
        self.first_word = first_word
        self.last_word = last_word

    def __str__(self):
        return "(%s, %s)" % (self.first_word or "<>", self.last_word or "<>")


def get_words(text, descriptor, first):
    """Yield complete words in `text`, deferring fragments that straddle
    buffer or range boundaries via `descriptor`."""
    text = text.translate(PUNCTUATION_REMOVAL_TRANSLATOR)
    # A buffer that starts with whitespace means the fragment pending from
    # the previous buffer was actually a complete word: emit it now.
    if text and text[0] in string.whitespace and descriptor.last_word:
        yield descriptor.last_word
        descriptor.last_word = ""
    last_whitespace = -1
    for i in range(len(text)):
        if text[i] in string.whitespace:
            if last_whitespace < i - 1:
                word = text[last_whitespace + 1:i]
                if last_whitespace == -1 and descriptor.last_word:
                    # Glue the fragment from the previous buffer on.
                    word = descriptor.last_word + word
                word = word.lower()
                if last_whitespace == -1 and first:
                    # The range's first word may straddle the range
                    # boundary: park it for main() to stitch, don't count it.
                    descriptor.first_word = word
                else:
                    yield word
            last_whitespace = i
    if last_whitespace == -1:
        # No whitespace at all: the whole buffer extends the pending fragment.
        descriptor.last_word += text.lower()
    elif last_whitespace < len(text) - 1:
        descriptor.last_word = text[last_whitespace + 1:].lower()
    else:
        descriptor.last_word = ""
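
# Worked example of the hand-off between consecutive buffer reads within one
# range (first=False); the strings here are made up for illustration:
#
#   read 1 ends "... big wor"  -> yields "big"; descriptor.last_word = "wor"
#   read 2 is   "ld is here "  -> yields "world" ("wor" + "ld"), "is", "here"
#
# For the first buffer of a range the leading word is parked in
# descriptor.first_word instead, since it may be the tail of a word owned by
# the previous range; main() stitches those fragments back together.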


def task(filename, start, end, queue):
    """Count words in the byte range [start, end) and report via `queue`."""
    counter = collections.Counter()
    descriptor = RangeDescriptor(start)
    # latin-1 maps one byte to one character, so the byte offsets from
    # os.stat() line up exactly with seek()/read() character counts.
    with open(filename, "r", encoding="latin-1") as f:
        f.seek(start)
        first = True
        while start < end:
            char_count = min(end - start, BUFFER_SIZE)
            text = f.read(char_count)
            start += char_count
            counter.update(get_words(text, descriptor, first))
            first = False
    queue.put((descriptor, counter))


def main(filename, process_count):
    time_start = time.perf_counter()
    try:
        file_size = os.stat(filename).st_size
    except FileNotFoundError as exc:
        print(exc, file=sys.stderr)
        sys.exit(exc.errno)
    queue = multiprocessing.Queue()
    # Split the file into process_count ranges, spreading the remainder
    # bytes over the first few ranges so sizes differ by at most one.
    pieces = [file_size // process_count for _ in range(process_count)]
    for i in range(file_size % process_count):
        pieces[i] += 1
    start = 0
    processes = []
    for i in range(process_count):
        process = multiprocessing.Process(
            target=task, args=(filename, start, start + pieces[i], queue))
        process.start()
        processes.append(process)
        start += pieces[i]
    # Drain the queue before joining: a child process does not exit until
    # its queued data is flushed, so joining first can deadlock.
    results = []
    for _ in range(process_count):
        results.append(queue.get())
    for process in processes:
        process.join()
    # Merge the per-range counts in file order.
    results.sort(key=lambda r: r[0].start)
    counter = collections.Counter()
    descriptors = []
    for descriptor, local_counter in results:
        descriptors.append(descriptor)
        counter.update(local_counter)
    # Stitch together the words that straddle range boundaries; skip pairs
    # where both fragments are empty, which would count an empty string.
    for i in range(1, process_count):
        word = descriptors[i - 1].last_word + descriptors[i].first_word
        if word:
            counter[word] += 1
    # The leading and trailing fragments of the whole file are complete words.
    if descriptors[0].first_word:
        counter[descriptors[0].first_word] += 1
    if descriptors[-1].last_word:
        counter[descriptors[-1].last_word] += 1
    time_end = time.perf_counter()
    for word, count in counter.items():
        print("%s: %d" % (word, count))
    print("=> Processing time: %.4f" % (time_end - time_start))


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Does a word count on the provided file")
    parser.add_argument("file", help="The file to process")
    parser.add_argument("--processes", "-t", type=int, dest="processes", default=MAX_THREADS)
    args = parser.parse_args()
    main(args.file, args.processes)
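
Usage: python3 wordcount.py somefile.txt --processes 4 (the script and input
file names are placeholders; any text file works, though the byte-offset
splitting assumes single-byte text such as ASCII).

To sanity-check the parallel output, here is a minimal sequential
cross-check written against the same rules as the script above (strip
punctuation, lowercase, split on whitespace). sequential_count is a
hypothetical helper, not part of the gist, and it assumes the whole file
fits in memory:

    import collections
    import string

    def sequential_count(filename):
        # Same tokenization as the parallel version, applied in one pass.
        translator = str.maketrans("", "", string.punctuation)
        with open(filename, "r", encoding="latin-1") as f:
            text = f.read().translate(translator).lower()
        return collections.Counter(text.split())

For ASCII input, sequential_count(path) should produce the same counts the
parallel version prints, which makes it a quick regression check when
tweaking BUFFER_SIZE or the boundary-stitching logic.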