Four solutions

I first worked with the file downloaded locally to make sure I got the result right. Those drafts are not included here.

After that we have four solutions:

  1. A simple one that fetches the entire payload in one request.
  2. A streamed one, same idea, but processing by chunks.
  3. A parallel approach that speeds up the download using HTTP range requests (a minimal sketch of a range request follows this list).
  4. A variant of the previous one that parallelizes the computation as well.
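
For reference, here is a minimal sketch of the range request machinery the parallel solutions rely on: a HEAD request to learn the total size, and a GET with a Range header for one slice. The slice boundaries are arbitrary example values; the real solutions below hard-code the content length instead of issuing the HEAD request.

    import http.client

    host = 's3.amazonaws.com'
    path = '/carto-1000x/data/yellow_tripdata_2016-01.csv'

    # Total size of the object; the solutions below hard-code this value.
    conn = http.client.HTTPSConnection(host)
    conn.request('HEAD', path)
    content_length = int(conn.getresponse().getheader('Content-Length'))

    # Fetch only the first 4096 bytes (an arbitrary example slice).
    conn = http.client.HTTPSConnection(host)
    conn.request('GET', path, None, {'Range': 'bytes=0-4095'})
    resp = conn.getresponse()  # 206 Partial Content
    chunk = resp.read()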

On my MacBook Pro with fiber via WiFi (~280 Mbps) I get these times:

Solution           Runtime      Memory
simple             ~5 min       High
streamed           2 min 55 s   Low
parallel           1 min 20 s   High
parallel streamed  53 s         Low

Since this is dominated by I/O, times vary, but they stay in those ballparks.

General performance considerations

  • S3 does not support compression, so there is no need to send Accept-Encoding.
  • The average of a collection of numbers can be computed on the fly, incrementally; there is no need to collect them all, sum them, and divide. The first three solutions compute it that way. This is relevant since there are 10,906,858 data points. The fourth solution does that per process, and then computes the grand total from the partials. (A short sketch of this update follows the list.)
  • Those formulas have divisions. Division is not exact even with arbitrary-precision decimals, but it is with rational numbers. I compared the performance of rationals and floats on this problem: the average with rationals needed about 3 minutes, while floats needed 20 seconds. The exact result computed with rationals is 1.7506631158120882, and floats yield 1.7506631158119936, which is not that bad (1.75066311581206 in the 4th solution). A real use case should decide whether those decimals are important. Given this tiny error, the solutions use floats. (A tiny rationals-vs-floats illustration also follows the list.)
  • In general it is better to reuse buffers where possible.
  • As a rule of thumb, it is better to delegate as much as possible to the built-in functions, since oftentimes they are written in C. This is particularly important in buffered solutions.
  • I have played a bit with buffer sizes: 4096 is a typical one that works well, 8192 made no measurable difference, and 1 MB slowed things down.
  • Concatenation of bytearrays has terrible performance; the simple parallel downloader used it in some initial versions and I could not understand why downloads were so slow. Collecting the chunks in a list is much faster for that use case.
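
To make the incremental average concrete, this is the update the solutions use (a small standalone sketch, not part of the gist files):

    def incremental_avg(xs):
        # Running mean: after processing n values, avg is their exact mean,
        # with no need to keep the values around or to sum them all first.
        avg = 0.0
        for n, x in enumerate(xs, start=1):
            avg += (x - avg)/n
        return avg

    print(incremental_avg([1.0, 2.0, 6.0]))  # 3.0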
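
And a tiny illustration of the exactness trade-off mentioned above, using Python's fractions module (sample values, not the dataset):

    from fractions import Fraction

    values = [Fraction(1, 3), Fraction(1, 3), Fraction(1, 3)]
    avg = Fraction(0)
    for n, x in enumerate(values, start=1):
        avg += (x - avg)/n
    print(avg)         # 1/3, exact
    print(float(avg))  # 0.3333333333333333, rounded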

It's been a long time

It's been about 14 years since I did any Python. If the dog that flies the helicopter had a Twitter account, he would tweet a picture of me captioned "I have no idea what I am doing".

# Solution 1: simple, fetches the entire payload in one request.
import http.client

host = 's3.amazonaws.com'
path = '/carto-1000x/data/yellow_tripdata_2016-01.csv'

conn = http.client.HTTPSConnection(host)
conn.request('GET', path)
payload = conn.getresponse().read()

avg = 0
for i, line in enumerate(payload.splitlines()):
    if i == 0:
        continue # skip the CSV header
    tip_amount = float(line.split(b',')[-4])
    avg += (tip_amount - avg)/i # incremental average, i data points so far
print(avg)
# Solution 2: streamed, same idea, but processing the response by chunks.
import http.client

host = 's3.amazonaws.com'
path = '/carto-1000x/data/yellow_tripdata_2016-01.csv'
bsize = 4096

class Averager:
    def __init__(self):
        self.headers = False # have the headers been consumed?
        self.ndata = 0       # number of data points so far
        self.avg = 0         # incremental average
        self.pline = None    # partial line at end of buffer, if any

    def process(self, buffer):
        lines = buffer.splitlines(True)
        if self.pline:
            lines[0] = self.pline + lines[0]
            self.pline = None
        if not lines[-1].endswith(b'\n'):
            self.pline = lines.pop()
        if not self.headers:
            if len(lines) > 0:
                lines = lines[1:]
                self.headers = True
        for line in lines:
            tip_amount = float(line.split(b',')[-4])
            self.ndata += 1
            self.avg += (tip_amount - self.avg)/self.ndata

conn = http.client.HTTPSConnection(host)
conn.request('GET', path)
resp = conn.getresponse()

averager = Averager()
buffer = bytearray(bsize)
while not resp.closed:
    nbytes = resp.readinto(buffer)
    if nbytes > 0:
        averager.process(buffer[0:nbytes])
    else:
        break
print(averager.avg)
# Solution 3: parallel download using HTTP range requests, one process per CPU core.
import http.client
import multiprocessing

method = 'GET'
host = 's3.amazonaws.com'
path = '/carto-1000x/data/yellow_tripdata_2016-01.csv'
body = None
bsize = 4096
nprocs = multiprocessing.cpu_count()

def headers(byte_range):
    return {'Range': 'bytes=' + str(byte_range[0]) + '-' + str(byte_range[1])}

def fetch(byte_range, writer):
    conn = http.client.HTTPSConnection(host)
    conn.request(method, path, body, headers(byte_range))
    resp = conn.getresponse()
    buffer = bytearray(bsize)
    # We are going to collect this in a list because concatenating bytearrays
    # has horrible performance.
    chunks = []
    while True:
        nbytes = resp.readinto(buffer)
        if nbytes > 0:
            chunks.append(buffer[0:nbytes])
        else:
            break
    for chunk in chunks:
        writer.send_bytes(chunk)
    writer.close()
    return

class Averager:
    def __init__(self):
        self.headers = False # have the headers been consumed?
        self.ndata = 0       # number of data points so far
        self.avg = 0         # incremental average
        self.pline = None    # partial line at end of buffer, if any

    def process(self, buffer):
        lines = buffer.splitlines(True)
        if self.pline:
            lines[0] = self.pline + lines[0]
            self.pline = None
        if not lines[-1].endswith(b'\n'):
            self.pline = lines.pop()
        if not self.headers:
            if len(lines) > 0:
                lines = lines[1:]
                self.headers = True
        for line in lines:
            tip_amount = float(line.split(b',')[-4])
            self.ndata += 1
            self.avg += (tip_amount - self.avg)/self.ndata

# Would be computed with a HEAD request; hard-coded to simplify the code, it is
# not going to make a difference in performance.
content_length = 1708674492

byte_range_width = int(content_length/nprocs)
byte_ranges = []
from_byte = 0
while from_byte <= content_length:
    # Range requests may have limits greater than the file size. In such a case
    # the server returns what is left.
    to_byte = from_byte + byte_range_width
    byte_ranges.append([from_byte, to_byte])
    from_byte = to_byte + 1

averager = Averager()
readers = []
for byte_range in byte_ranges:
    reader, writer = multiprocessing.Pipe(False)
    process = multiprocessing.Process(target=fetch, args=(byte_range, writer))
    process.start()
    readers.append(reader)

buffer = bytearray(bsize)
for reader in readers:
    while True:
        nbytes = reader.recv_bytes_into(buffer)
        averager.process(buffer[0:nbytes])
        # There is no guarantee that the chunks are of this size, though
        # in practice it is normally the case except for maybe the last one.
        # This should check a different condition and trap EOFError. The problem
        # is that recv_bytes_into() blocked, likely because the parent still
        # holds its copy of the writer end, so EOF never arrives.
        # In real code this break condition should be fixed.
        if nbytes < bsize:
            break
print(averager.avg)
# Solution 4: parallel download that also parallelizes the computation,
# combining per-range partial averages at the end.
import http.client
from multiprocessing import cpu_count, Process, Queue

method = 'GET'
host = 's3.amazonaws.com'
path = '/carto-1000x/data/yellow_tripdata_2016-01.csv'
body = None
bsize = 4096
nprocs = cpu_count()

def headers(byte_range):
    return {'Range': 'bytes=' + str(byte_range[0]) + '-' + str(byte_range[1])}

class Averager:
    def __init__(self):
        self.ndata = 0    # number of data points so far
        self.avg = 0      # incremental average
        self.pline = None # partial line at end of buffer, if any

    def process(self, lines):
        if self.pline:
            lines[0] = self.pline + lines[0]
            self.pline = None
        if not lines[-1].endswith(b'\n'):
            self.pline = lines.pop()
        for line in lines:
            tip_amount = float(line.split(b',')[-4])
            self.ndata += 1
            self.avg += (tip_amount - self.avg)/self.ndata

# Computes the average of tip_amount over the whole lines in the byte range,
# except for the first and (perhaps) last ones, which could be partial.
# Puts in the queue the first line and an averager in a dictionary.
def byte_range_avg(byte_range, queue):
    averager = Averager()
    first_line = None
    conn = http.client.HTTPSConnection(host)
    conn.request(method, path, body, headers(byte_range))
    resp = conn.getresponse()
    buffer = bytearray(bsize)
    while True:
        nbytes = resp.readinto(buffer)
        if nbytes > 0:
            lines = buffer[0:nbytes].splitlines(True)
            # There is no guarantee that we read a whole line even if there are
            # lines in the file to be read in this range. A real program would
            # implement seek_line() or some such to loop until at least one
            # whole line is read, or whatever payload is left until end of
            # stream. For the purposes of the exercise we'll leave it here,
            # it works in practice.
            if not first_line: first_line = lines.pop(0)
            if lines: averager.process(lines)
        else:
            break
    queue.put({ 'first_line': first_line, 'averager': averager })
    queue.close()

#
# --- Compute the HTTP ranges, as many as CPU cores ----------------------------
#

# Would be computed with a HEAD request; hard-coded to simplify the code, it is
# not going to make a difference in performance.
content_length = 1708674492

byte_range_width = int(content_length/nprocs)
byte_ranges = []
from_byte = 0
while from_byte <= content_length:
    # Range requests may have limits greater than the file size. In such a case
    # the server returns what is left.
    to_byte = from_byte + byte_range_width
    byte_ranges.append([from_byte, to_byte])
    from_byte = to_byte + 1

#
# --- Fork one process per range and capture the results in order --------------
#

queues = []
for byte_range in byte_ranges:
    queue = Queue()
    process = Process(target=byte_range_avg, args=(byte_range, queue))
    process.start()
    queues.append(queue)
results = [ queue.get(True) for queue in queues ]

#
# --- Additional averager for the first and last lines per range ---------------
#

pending = []
for result in results:
    pending.append(result['first_line'])
    pending.append(result['averager'].pline)
pending.pop(0) # remove CSV header
pending = list(filter(bool, pending)) # edge cases

averager = Averager()
averager.process(b''.join(pending).splitlines(True))

#
# --- Final computations --------------------------------------------------------
#

averagers = [ result['averager'] for result in results ]
averagers.append(averager)

avg = 0
ndata = 0
for averager in averagers:
    agg_ndata = ndata + averager.ndata
    if agg_ndata:
        avg = avg*ndata/agg_ndata + averager.avg*averager.ndata/agg_ndata
    ndata += averager.ndata
print(avg)