Skip to content

Instantly share code, notes, and snippets.

@smerritt
Created June 14, 2014 01:39
Show Gist options
  • Save smerritt/3006c57a64f8ba3effd4 to your computer and use it in GitHub Desktop.
Save smerritt/3006c57a64f8ba3effd4 to your computer and use it in GitHub Desktop.
# NOTE: all this code is completely untested. At best, it's syntactically
# valid, but don't bet on it.
import errno
import fcntl
import os
import splicetee
# Linux-specific fcntl() commands that set / query a pipe's capacity. Only
# newer Python versions expose them on the fcntl module, so fall back to the
# raw Linux values (F_LINUX_SPECIFIC_BASE 1024 + 7 and + 8 respectively).
F_SETPIPE_SZ = getattr(fcntl, 'F_SETPIPE_SZ', 1031)
F_GETPIPE_SZ = getattr(fcntl, 'F_GETPIPE_SZ', 1032)
class SpliceChainLink(object):
    """
    Moves data with splice() from source to sink.

    Links can be chained: every batch of bytes this link pushes into its
    sink is reported to ``next_link.move_bytes()`` so the next link can pull
    it onward from its own source.
    """

    def __init__(self, sourcefd, sinkfd, next_link=None,
                 close_source_on_completion=True,
                 close_sink_on_completion=True):
        """
        :param sourcefd: file descriptor to splice from
        :param sinkfd: file descriptor to splice to
        :param next_link: (optional) next link in the splice chain; it is
            told how many bytes this link has moved into its sink
        :param close_source_on_completion: close sourcefd when .close() is
            called
        :param close_sink_on_completion: close sinkfd when .close() is called
        """
        self.sourcefd = sourcefd
        self.sinkfd = sinkfd
        self.next_link = next_link
        self.close_source_on_completion = close_source_on_completion
        self.close_sink_on_completion = close_sink_on_completion

    def _forward(self, nbytes):
        """Tell the next link (if any) that nbytes are waiting for it."""
        if self.next_link is not None and nbytes > 0:
            self.next_link.move_bytes(nbytes)

    def move_bytes(self, nbytes):
        """
        Called when data has been written to the source. Writes it out to the
        sink and hands it on to the next link, if there is one.

        :param nbytes: number of bytes to attempt to take from the source
        :returns: number of bytes moved to the sink. Might be less than
            nbytes if splice() reports 0 bytes (end-of-file on the source).
        :raises OSError: for any splice() failure other than EWOULDBLOCK
        """
        bytes_moved = 0
        # Bytes that have reached our sink but have not yet been reported to
        # the next link. Must be reset every time they are forwarded so the
        # next link is never told about the same bytes twice.
        pending = 0
        while nbytes > 0:
            try:
                _junk, _junk, moved = splicetee.splice(
                    self.sourcefd, 0, self.sinkfd, 0, nbytes, 0)
            except OSError as err:
                # It's annoying that a simple EWOULDBLOCK turns into a whole
                # exception object instead of just returning -1 and setting
                # errno or something, but that's how splicetee does it.
                if err.errno != errno.EWOULDBLOCK:
                    raise
                # Hand what we've already pushed to the next link before
                # waiting: when our sink is a pipe that the next link reads,
                # that is what frees up room in it.
                self._forward(pending)
                pending = 0
                # NOTE(review): `trampoline` is not defined or imported in
                # this file; presumably eventlet.hubs.trampoline -- TODO.
                trampoline(self.sinkfd, write=True)
            else:
                if moved == 0:
                    break
                bytes_moved += moved
                pending += moved
                nbytes -= moved
        self._forward(pending)
        return bytes_moved

    def close(self):
        """
        Takes care of any necessary finalization, e.g. closing filehandles. It
        is the responsibility of the user to call close(); failure to do so
        may result in resource leaks. Also closes the rest of the chain.
        """
        # When we're splicing stuff from disk <---> socket, there's at least
        # one intermediate pipe. Typically, we want to close all those pipes,
        # but leave the disk filehandle and the socket filehandle sitting open
        # because there's other code that handles closing them. Thus, we need
        # two separate flags for fine-grained control.
        #
        # Also, recall that a pipe has two file descriptors, so we don't need
        # to coordinate who-closes-what with our next link. We'll close the
        # write end of the pipe (our sink), and they'll close the read end
        # (their source).
        #
        # The try/finally nesting makes sure one failing close() doesn't leak
        # the other descriptor or the rest of the chain.
        try:
            try:
                if self.close_source_on_completion:
                    os.close(self.sourcefd)
            finally:
                if self.close_sink_on_completion:
                    os.close(self.sinkfd)
        finally:
            if self.next_link is not None:
                self.next_link.close()
class ThreadpooledSpliceChainLink(SpliceChainLink):
    """
    A link in a splice chain that does things in a threadpool. Useful for disk IO.

    Requires a ``threadpool`` keyword argument: an object providing
    ``run_in_thread(func, *args)``. All other arguments are passed through
    to SpliceChainLink.
    """

    def __init__(self, *args, **kwargs):
        self.threadpool = kwargs.pop('threadpool')
        super(ThreadpooledSpliceChainLink, self).__init__(*args, **kwargs)

    def move_bytes(self, nbytes):
        """
        Run SpliceChainLink.move_bytes(nbytes) via the threadpool.

        :returns: whatever ``threadpool.run_in_thread`` returns -- assumed
            to be the wrapped call's result (bytes moved); TODO confirm
            against the threadpool implementation in use
        """
        # Propagate the result; callers expect move_bytes() to report how
        # many bytes were moved.
        return self.threadpool.run_in_thread(
            super(ThreadpooledSpliceChainLink, self).move_bytes, nbytes)
class Md5SpliceChainLink(SpliceChainLink):
    """
    SpliceChainLink that MD5-hashes the bytes that go through it. Uses an
    in-kernel MD5 socket so that data doesn't enter user-space.

    The MD5 checksum (lowercase hex string) will be in the 'md5sum'
    attribute after .close() is called.
    """

    def __init__(self, *args, **kwargs):
        """
        Accepts the same arguments as SpliceChainLink. Additionally creates a
        private pipe (for tee()ing data out of the source) and an MD5 socket.
        """
        super(Md5SpliceChainLink, self).__init__(*args, **kwargs)
        self.md5sum = None
        self.total_bytes_hashed = 0
        self.md5_sockfd = None
        self.hash_rpipe, self.hash_wpipe = os.pipe()
        try:
            # NOTE(review): placeholder from the original sketch; nothing in
            # this file defines it. Expected to return a file descriptor for
            # an in-kernel MD5 socket -- TODO implement.
            self.md5_sockfd = some_jiggery_pokery_yielding_an_md5_socket()
            # Make sure our pipe is the same size as the source pipe so that
            # we can tee() all the data over in a single syscall.
            fcntl.fcntl(self.hash_rpipe, F_SETPIPE_SZ,
                        fcntl.fcntl(self.sourcefd, F_GETPIPE_SZ, 0))
        except Exception:
            # Don't leak the descriptors created above.
            self._close_hash_fds()
            raise

    def _close_hash_fds(self):
        """Close the hashing pipe and MD5 socket; re-raise the first error."""
        fds = [self.hash_rpipe, self.hash_wpipe]
        if self.md5_sockfd is not None:
            fds.append(self.md5_sockfd)
        first_error = None
        for fd in fds:
            try:
                os.close(fd)
            except OSError as err:
                if first_error is None:
                    first_error = err
        if first_error is not None:
            raise first_error

    def move_bytes(self, nbytes):
        """
        Hash the nbytes waiting in the source, then move them on as usual.

        :param nbytes: number of bytes currently waiting in the source
        :returns: number of bytes moved to the sink (from
            SpliceChainLink.move_bytes)
        :raises Exception: if tee() or the MD5 socket takes fewer than
            nbytes bytes
        """
        # At this point, the source has $nbytes bytes in it. Tee it over to
        # the hash pipe; the data also stays in the source so the parent
        # move_bytes() below can still splice it to the sink.
        bytes_copied = splicetee.tee(self.sourcefd, self.hash_wpipe,
                                     nbytes, 0)
        if bytes_copied != nbytes:
            # We teed data between two pipes of equal size, and the
            # destination pipe was empty, but somehow there wasn't enough
            # room...? This should never happen.
            raise Exception("tee() failed: tried to copy %d bytes, "
                            "but only moved %d" %
                            (nbytes, bytes_copied))
        # Take the data and feed it into an in-kernel MD5 socket. The
        # MD5 socket hashes data that is written to it. Reading from
        # it yields the MD5 checksum of the written data.
        hashed = splicetee.splice(self.hash_rpipe, 0, self.md5_sockfd, 0,
                                  nbytes, splicetee.SPLICE_F_MORE)[2]
        if hashed != nbytes:
            # It's a data sink. It doesn't get full, so this should never
            # happen.
            raise Exception("md5 socket didn't take all the data? "
                            "(tried to write %d, but wrote %d)" %
                            (nbytes, hashed))
        self.total_bytes_hashed += hashed
        return super(Md5SpliceChainLink, self).move_bytes(nbytes)

    def close(self):
        """
        Store the final checksum in self.md5sum, release the hashing
        descriptors, then do the normal SpliceChainLink cleanup. The
        descriptors are released even if reading the checksum fails.
        """
        try:
            # Linux MD5 sockets return '00000000000000000000000000000000'
            # for the checksum if you didn't write any bytes to them, which
            # is incorrect.
            if self.total_bytes_hashed > 0:
                bin_checksum = os.read(self.md5_sockfd, 16)
                # bytearray iteration yields ints on both Python 2 and 3.
                self.md5sum = ''.join(
                    '%02x' % byte for byte in bytearray(bin_checksum))
            else:
                self.md5sum = 'd41d8cd98f00b204e9800998ecf8427e'  # md5("")
        finally:
            try:
                self._close_hash_fds()
            finally:
                super(Md5SpliceChainLink, self).close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment