Created
June 14, 2014 01:39
-
-
Save smerritt/3006c57a64f8ba3effd4 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# NOTE: all this code is completely untested. At best, it's syntactically
# valid, but don't bet on it.
import errno | |
import fcntl | |
import os | |
import splicetee | |
# Linux fcntl() commands for setting / querying a pipe's buffer size. The
# fcntl module only exposes these names on newer Pythons (3.10+), so fall back
# to the raw values (F_LINUX_SPECIFIC_BASE 1024 + 7 and + 8).
F_SETPIPE_SZ = getattr(fcntl, 'F_SETPIPE_SZ', 1031)
F_GETPIPE_SZ = getattr(fcntl, 'F_GETPIPE_SZ', 1032)
class SpliceChainLink(object):
    """
    Moves data with splice() from source to sink.

    Links may be chained through ``next_link``: whatever this link moves into
    its sink is then handed to ``next_link.move_bytes()``. Typically our sink
    is the write end of a pipe whose read end is the next link's source.
    """
    def __init__(self, sourcefd, sinkfd, next_link=None,
                 close_source_on_completion=True,
                 close_sink_on_completion=True):
        """
        :param sourcefd: file descriptor to splice from
        :param sinkfd: file descriptor to splice to
        :param next_link: (optional) next link in the splice chain
        :param close_source_on_completion: close sourcefd in .close()
        :param close_sink_on_completion: close sinkfd in .close()
        """
        self.sourcefd = sourcefd
        self.sinkfd = sinkfd
        self.next_link = next_link
        self.close_source_on_completion = close_source_on_completion
        self.close_sink_on_completion = close_sink_on_completion

    def move_bytes(self, nbytes):
        """
        Called when data has been written to the source. Writes it out to the
        sink, then lets the next link (if any) move it further along.

        :param nbytes: number of bytes to attempt to take from the source
        :returns: number of bytes moved to the sink. Might be less than
            nbytes if splice() moves 0 bytes (the source has no more data).
        """
        bytes_moved = 0
        # Bytes pushed into our sink that have not yet been handed to the
        # next link.
        next_link_bytes = 0
        while nbytes > 0:
            try:
                # NOTE(review): offsets are passed as 0, as in the original;
                # for pipe fds the kernel requires "no offset" — confirm how
                # splicetee maps 0 before relying on this.
                _junk, _junk, moved = splicetee.splice(
                    self.sourcefd, 0, self.sinkfd, 0, nbytes, 0)
            except OSError as err:
                # It's annoying that a simple EWOULDBLOCK turns into a whole
                # exception object instead of just returning -1 and setting
                # errno or something, but that's how splicetee does it.
                if err.errno != errno.EWOULDBLOCK:
                    raise
                # Our sink is full. Let the next link drain what we have
                # pushed so far (only the not-yet-forwarded bytes), then wait
                # until the sink is writable again.
                if self.next_link and next_link_bytes:
                    self.next_link.move_bytes(next_link_bytes)
                    next_link_bytes = 0
                # NOTE(review): `trampoline` is not imported in this file;
                # presumably eventlet.hubs.trampoline — confirm.
                trampoline(self.sinkfd, write=True)
            else:
                if moved == 0:
                    break
                bytes_moved += moved
                next_link_bytes += moved
                nbytes -= moved
        if self.next_link and next_link_bytes:
            self.next_link.move_bytes(next_link_bytes)
        return bytes_moved

    def close(self):
        """
        Takes care of any necessary finalization, e.g. closing filehandles. It
        is the responsibility of the user to call close(); failure to do so
        may result in resource leaks. Also closes every following link.
        """
        # When we're splicing stuff from disk <---> socket, there's at least
        # one intermediate pipe. Typically, we want to close all those pipes,
        # but leave the disk filehandle and the socket filehandle sitting open
        # because there's other code that handles closing them. Thus, we need
        # two separate flags for fine-grained control.
        #
        # Also, recall that a pipe has two file descriptors, so we don't need
        # to coordinate who-closes-what with our next link. We'll close the
        # write end of the pipe (our sink), and they'll close the read end
        # (their source).
        try:
            if self.close_source_on_completion:
                os.close(self.sourcefd)
            if self.close_sink_on_completion:
                os.close(self.sinkfd)
        finally:
            # Tear down the rest of the chain even if closing one of our own
            # fds failed, so downstream pipes are not leaked.
            if self.next_link:
                self.next_link.close()
class ThreadpooledSpliceChainLink(SpliceChainLink):
    """
    A link in a splice chain that does things in a threadpool. Useful for disk IO.

    Requires a ``threadpool`` keyword argument; every other argument is passed
    through to SpliceChainLink.
    """
    def __init__(self, *args, **kwargs):
        # threadpool: any object exposing run_in_thread(func, *args).
        self.threadpool = kwargs.pop('threadpool')
        super(ThreadpooledSpliceChainLink, self).__init__(*args, **kwargs)

    def move_bytes(self, nbytes):
        """
        Runs SpliceChainLink.move_bytes() via the threadpool.

        :param nbytes: number of bytes to attempt to take from the source
        :returns: whatever threadpool.run_in_thread() returns. This assumes it
            blocks and returns the wrapped call's result (the number of bytes
            moved to the sink) — confirm against the threadpool in use.
        """
        return self.threadpool.run_in_thread(
            super(ThreadpooledSpliceChainLink, self).move_bytes, nbytes)
class Md5SpliceChainLink(SpliceChainLink):
    """
    SpliceChainLink that MD5-hashes the bytes that go through it. Uses an
    in-kernel MD5 socket so that data doesn't enter user-space.

    The MD5 checksum (lowercase hex string) will be in the 'md5sum' attribute
    after .close() is called.
    """
    def __init__(self, *args, **kwargs):
        super(Md5SpliceChainLink, self).__init__(*args, **kwargs)
        self.md5sum = None
        self.total_bytes_hashed = 0
        self.hash_rpipe, self.hash_wpipe = os.pipe()
        try:
            # NOTE(review): placeholder helper, not defined in this file; it
            # must return an fd for a kernel MD5 hashing socket — confirm.
            self.md5_sockfd = some_jiggery_pokery_yielding_an_md5_socket()
            # Make sure our pipe is the same size as the source pipe so that
            # we can tee() all the data over in a single syscall. This
            # requires sourcefd to be a pipe.
            fcntl.fcntl(self.hash_rpipe, F_SETPIPE_SZ,
                        fcntl.fcntl(self.sourcefd, F_GETPIPE_SZ, 0))
        except Exception:
            # Don't leak our private pipe if setup fails part-way.
            os.close(self.hash_rpipe)
            os.close(self.hash_wpipe)
            raise

    def move_bytes(self, nbytes):
        """
        Hashes the nbytes sitting in the source pipe, then moves them to the
        sink exactly like a plain SpliceChainLink.

        :param nbytes: number of bytes present in the source pipe
        :returns: number of bytes moved to the sink
        :raises Exception: if tee() or the MD5 socket accepts fewer bytes than
            requested
        """
        # At this point, the source has nbytes bytes in it. tee() duplicates
        # them into the hash pipe without consuming them from the source.
        bytes_copied = splicetee.tee(self.sourcefd, self.hash_wpipe, nbytes, 0)
        if bytes_copied != nbytes:
            # We teed data between two pipes of equal size, and the
            # destination pipe was empty, but somehow there wasn't enough
            # room...? This should never happen.
            raise Exception("tee() failed: tried to copy %d bytes, "
                            "but only moved %d" %
                            (nbytes, bytes_copied))
        # Take the data and feed it into an in-kernel MD5 socket. The
        # MD5 socket hashes data that is written to it. Reading from
        # it yields the MD5 checksum of the written data.
        hashed = splicetee.splice(self.hash_rpipe, 0, self.md5_sockfd, 0,
                                  nbytes, splicetee.SPLICE_F_MORE)[2]
        if hashed != nbytes:
            # It's a data sink. It doesn't get full, so this should never
            # happen.
            raise Exception("md5 socket didn't take all the data? "
                            "(tried to write %d, but wrote %d)" %
                            (nbytes, hashed))
        self.total_bytes_hashed += hashed
        return super(Md5SpliceChainLink, self).move_bytes(nbytes)

    def close(self):
        """
        Stores the hex MD5 checksum in self.md5sum, closes the hashing fds,
        and then finalizes this link (and the rest of the chain) through
        SpliceChainLink.close().
        """
        try:
            # Linux MD5 sockets return '00000000000000000000000000000000' for
            # the checksum if you didn't write any bytes to them, which is
            # incorrect.
            if self.total_bytes_hashed > 0:
                bin_checksum = os.read(self.md5_sockfd, 16)
                # bytearray() yields ints on both Python 2 and 3, unlike
                # iterating over a bytes object and calling ord().
                hex_checksum = ''.join(
                    "%02x" % b for b in bytearray(bin_checksum))
            else:
                hex_checksum = 'd41d8cd98f00b204e9800998ecf8427e'  # md5("")
            self.md5sum = hex_checksum
        finally:
            # Release the hashing fds and the rest of the chain even if
            # reading the checksum failed.
            os.close(self.hash_rpipe)
            os.close(self.hash_wpipe)
            os.close(self.md5_sockfd)
            super(Md5SpliceChainLink, self).close()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment