Skip to content

Instantly share code, notes, and snippets.

@ruimaranhao
Last active January 16, 2018 11:22
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ruimaranhao/bdf980dbef041c2d9abf2553b726c780 to your computer and use it in GitHub Desktop.
Save ruimaranhao/bdf980dbef041c2d9abf2553b726c780 to your computer and use it in GitHub Desktop.
Read a large dataset
import multiprocessing
import time
import os
import mmap
from math import ceil
def size_and_chunks(file_name, processes):
    """Return the size of a file and the chunk length for each process.

    Args:
        file_name: Path of the file to measure.
        processes: Number of chunks the file is to be divided into.

    Returns:
        A ``(size, chunk)`` tuple: ``size`` is the file length in bytes and
        ``chunk`` is ``size / processes + 1``, a float produced by true
        division.
    """
    total_bytes = os.stat(file_name).st_size
    chunk_length = total_bytes / processes + 1
    return total_bytes, chunk_length
def to_number(cad, field_index=15, field_count=19):
    """Extract one numeric column from a raw CSV line.

    Args:
        cad: One line of the file as ASCII-encoded bytes (a trailing newline
            is fine).
        field_index: Zero-based position of the column to read.  Defaults to
            15, the column the caller in this file sums as the tip amount.
        field_count: Number of comma-separated fields a well-formed row has;
            lines with any other field count are ignored.

    Returns:
        The selected column as a float, or 0 when the line cannot be decoded,
        has the wrong number of fields, or the column is not numeric (for
        example the header row).
    """
    try:
        fields = cad.decode('ascii').split(",")
    except (AttributeError, ValueError):
        # AttributeError: the input has no decode() (e.g. it is already text).
        # ValueError: covers UnicodeDecodeError for non-ASCII bytes.
        return 0
    if len(fields) != field_count:
        return 0
    try:
        return float(fields[field_index])
    except (IndexError, ValueError):
        # Only malformed data is ignored; KeyboardInterrupt, SystemExit and
        # genuine programming errors are allowed to propagate.
        return 0
def count_lines(queue, pid, processes, file_name):
    """Tally the lines and tip amounts in one worker's slice of the file.

    The file is divided into ``processes`` consecutive byte ranges and this
    worker handles every line that *starts* inside range number ``pid``.
    Assigning lines by their start offset means each line is parsed whole by
    exactly one worker, including lines that straddle a range boundary.

    Args:
        queue: Object with a ``put`` method; on success it receives one
            ``(line_count, tip_sum)`` tuple.
        pid: Zero-based index of the byte range this worker handles.
        processes: Total number of byte ranges the file is divided into.
        file_name: Path of the CSV file to scan.
    """
    lines = 0
    tip_amount = 0
    sz, chunk = size_and_chunks(file_name, processes)
    # Clamp both ends to the file size: on small files the computed offsets
    # can run past EOF, and mmap.seek() raises ValueError for those.
    start_position = min(ceil(chunk * pid), sz)
    end_position = min(ceil(chunk * (pid + 1)), sz)

    # An empty range has nothing to scan; this also covers the empty file,
    # which mmap refuses to map.
    if start_position < end_position:
        with open(file_name, "rb") as fo:
            with mmap.mmap(fo.fileno(), 0, access=mmap.ACCESS_READ) as mm:
                mm.seek(start_position)
                lands_mid_line = (
                    start_position > 0
                    and mm[start_position - 1:start_position] != b"\n"
                )
                if lands_mid_line:
                    # This line began in the previous range and is handled by
                    # that worker; skip the remainder of it.
                    mm.readline()
                while mm.tell() < end_position:
                    line = mm.readline()
                    if not line:
                        break
                    tip_amount += to_number(line)
                    lines += 1

    # Always report, even for an empty range, so the parent's queue.get()
    # for this worker does not block forever.
    queue.put((lines, tip_amount))
def main(file_name='yellow_tripdata_2016-01.csv'):
    """Scan a CSV file with one worker per CPU and print the average tip.

    Starts ``multiprocessing.cpu_count()`` processes running ``count_lines``,
    adds up the ``(line_count, tip_sum)`` tuple each one posts on a shared
    queue, and prints the elapsed time, the total line count and the average
    tip per data row.

    Args:
        file_name: Path of the CSV file to process.
    """
    # perf_counter is monotonic, so the elapsed time cannot be skewed by a
    # wall-clock adjustment.
    started = time.perf_counter()
    processes = multiprocessing.cpu_count()
    queue = multiprocessing.Queue()
    jobs = [
        multiprocessing.Process(target=count_lines, args=(queue, pid, processes, file_name))
        for pid in range(processes)
    ]
    for job in jobs:
        job.start()

    lines, tip_total = 0, 0
    # Drain the queue before joining: a process that has put items on a
    # queue may not exit until they have been consumed.
    for _ in jobs:
        worker_lines, worker_tips = queue.get()
        lines += worker_lines
        tip_total += worker_tips
    for job in jobs:
        job.join()

    data_rows = lines - 1  # do not count header
    # Guard against a file that holds only a header (or nothing at all).
    avg_tip_amount = tip_total / data_rows if data_rows > 0 else 0.0
    print('{:0.2f} seconds\nLines: {}\nAvg Tip Amount: {:0.4f}'.format(time.perf_counter() - started, lines, avg_tip_amount))


if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment