Created
February 11, 2014 18:55
-
-
Save laserson/8941547 to your computer and use it in GitHub Desktop.
Allow streaming of Avro data using the Python client. Simulates a seekable file type.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# The Python avro client expects a seekable Avro data file, which makes it annoying
# to stream bytes through it using HDFS clients that just give you cat (like snakebite).
# It's idiotic because the client only seeks to the end in order to call tell() to get
# the file size, which in turn is only used to determine when you get to EOF.
import snakebite.client | |
class AvroStreamWrapper(object):
    """File-like facade over a streamed HDFS file, seekable just enough for Avro.

    Bytes are pulled lazily from the chunk generator returned by
    ``hdfs_client.cat``. Only three kinds of movement are supported:

    * forward seeks, implemented by reading and discarding bytes;
    * short backward seeks, limited to the most recently consumed bytes
      (at most ``_HISTORY_SIZE``) kept in ``self.last16``;
    * ``seek(0, 2)``, which moves nothing and only makes the next ``tell()``
      report the file length as returned by ``hdfs_client.ls``.

    An instance can be handed to avro's ``DataFileReader`` in place of a real
    seekable file object.
    """

    # Number of already-consumed bytes retained to serve backward seeks.
    _HISTORY_SIZE = 16

    def __init__(self, hdfs_client, path):
        """Open a stream over ``path``.

        hdfs_client -- a snakebite.client.Client (needs ``cat`` and ``ls``).
        path        -- a non-globbing path string.
        """
        self.hdfs_client = hdfs_client
        self.path = path
        # cat() yields one chunk generator per requested path; exactly one
        # path is requested, so take the first (and only) generator.
        self.source = next(hdfs_client.cat([self.path], check_crc=True))
        self.buffer = b''      # bytes fetched from the source but not yet consumed
        self.cursor = 0        # absolute offset of the next byte read() returns
        self.last16 = b''      # tail of the bytes already consumed
        self.last_seek_mode = 0

    def read(self, n):
        """Return up to ``n`` bytes, fewer only when the stream is exhausted.

        Raises ValueError if the last successful seek was ``seek(0, 2)``,
        because the stream cannot actually be positioned at its end.
        """
        if self.last_seek_mode not in (0, 1):
            raise ValueError("Can't read from the end of a stream.")

        if len(self.buffer) < n:
            # Gather chunks in a list and join once rather than growing the
            # buffer with repeated concatenation.
            pieces = [self.buffer]
            available = len(self.buffer)
            try:
                while available < n:
                    piece = next(self.source)
                    pieces.append(piece)
                    available += len(piece)
            except StopIteration:
                # Source exhausted: hand back whatever is left.
                pass
            self.buffer = b''.join(pieces)

        buf = self.buffer[:n]
        self.buffer = self.buffer[n:]
        self.cursor += len(buf)
        # Keep a sliding window over everything consumed so far, so that
        # several small reads still add up to a usable rewind history.
        self.last16 = (self.last16 + buf)[-self._HISTORY_SIZE:]
        return buf

    def _rewind(self, count):
        """Push the last ``count`` consumed bytes back in front of the buffer."""
        if count > len(self.last16):
            raise ValueError("Can't move back that far")
        self.buffer = self.last16[-count:] + self.buffer
        self.last16 = self.last16[:-count]
        self.cursor -= count

    def seek(self, offset, from_what=0):
        """Move the cursor.

        from_what -- 0: ``offset`` is absolute; 1: relative to the cursor;
                     2: relative to the end (only ``offset == 0`` is accepted).

        Raises ValueError for an unknown ``from_what``, for a non-zero offset
        from the end, and for a backward move larger than the retained
        history. A rejected ``from_what``/offset combination leaves the
        wrapper's state untouched.
        """
        if from_what == 2:
            if offset != 0:
                # hack: only offset == 0 is allowed, because avro uses it to
                # determine the file size with a subsequent call to tell().
                raise ValueError("Can't read from the end of a stream.")
            self.last_seek_mode = 2
            return

        if from_what == 0:
            delta = offset - self.cursor
        elif from_what == 1:
            delta = offset
        else:
            raise ValueError("Acceptable values of from_what are 0, 1, or 2.")

        # Must be set before read() below, which refuses to run in mode 2.
        self.last_seek_mode = from_what
        if delta < 0:
            self._rewind(-delta)
        elif delta > 0:
            # read() advances the cursor by the number of bytes it actually
            # consumed, so no further adjustment is needed here.
            self.read(delta)

    def tell(self):
        """Return the cursor, or the file length right after ``seek(0, 2)``."""
        if self.last_seek_mode in (0, 1):
            return self.cursor
        # hack: avro seeks to the end of the file and calls tell() just to
        # find out the file size, so answer with the length reported by ls.
        listing = next(self.hdfs_client.ls([self.path]))
        return listing['length']
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment