Skip to content

Instantly share code, notes, and snippets.

@laserson
Created February 11, 2014 18:55
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save laserson/8941547 to your computer and use it in GitHub Desktop.
Save laserson/8941547 to your computer and use it in GitHub Desktop.
Allow streaming of Avro data using the Python client. Simulates a seekable file type.
# The Python avro client expects a seekable Avro data file, which makes it annoying
# to stream bytes through it using HDFS clients that just give you cat (like snakebite).
# The requirement is unnecessary: the client seeks to the end only so that it can call
# tell() to get the file size, which in turn is used only to detect when it reaches EOF.
import snakebite.client
class AvroStreamWrapper(object):
    """Forward-streaming, read-only file object over a single HDFS file.

    An instance can be provided to Avro's DataFileReader in place of a real
    seekable file. Bytes are pulled lazily from ``hdfs_client.cat``. Seeking
    is emulated:

    * forward seeks (``from_what`` 0 or 1) read and discard bytes;
    * backward seeks may step back over at most the 16 most recently
      consumed bytes (kept in ``last16``);
    * ``seek(0, 2)`` is accepted only so that a following ``tell()`` can
      report the file size, which is looked up with ``hdfs_client.ls``.
    """

    def __init__(self, hdfs_client, path):
        """
        hdfs_client: a snakebite.client.Client (anything exposing compatible
            ``cat`` and ``ls`` methods).
        path: a single, non-globbing path string.
        """
        self.hdfs_client = hdfs_client
        self.path = path
        # cat() takes a list of paths; its first item is a generator of byte
        # chunks for our file. Builtin next() works on Python 2 and 3 alike.
        self.source = iter(next(hdfs_client.cat([self.path], check_crc=True)))
        # Bytes fetched from source; buffer[:_pos] is already consumed.
        # Tracking an index avoids re-slicing the buffer on every read.
        self.buffer = b''
        self._pos = 0
        # Logical stream position reported by tell().
        self.cursor = 0
        # Up to the 16 most recently consumed bytes, replayed on backward seeks.
        self.last16 = b''
        # from_what of the most recent accepted seek(); 2 means "at the end".
        self.last_seek_mode = 0

    def read(self, n=-1):
        """Return up to ``n`` bytes from the current position.

        Fewer than ``n`` bytes are returned only once the stream is exhausted.
        ``n`` negative or None reads everything that remains.

        Raises ValueError after ``seek(0, 2)`` until the position is reset
        with a seek relative to the start or the cursor.
        """
        if self.last_seek_mode not in (0, 1):
            raise ValueError("Can't read from the end of a stream.")
        read_all = n is None or n < 0
        unread = len(self.buffer) - self._pos
        if read_all or unread < n:
            # Keep the unread tail and join new chunks once, instead of
            # repeated concatenation (quadratic in the number of chunks).
            pieces = [self.buffer[self._pos:]]
            for chunk in self.source:
                pieces.append(chunk)
                unread += len(chunk)
                if not read_all and unread >= n:
                    break
            self.buffer = b''.join(pieces)
            self._pos = 0
        end = len(self.buffer) if read_all else self._pos + n
        buf = self.buffer[self._pos:end]
        self._pos += len(buf)
        self.cursor += len(buf)
        # Keep a rolling window of the last 16 consumed bytes, even when this
        # read (or the history so far) is shorter than 16 bytes.
        self.last16 = (self.last16 + buf[-16:])[-16:]
        return buf

    def _rewind(self, diff):
        """Step back ``diff`` (> 0) bytes by re-queueing them from ``last16``.

        Raises ValueError, without changing any state, if fewer than ``diff``
        bytes are remembered.
        """
        if diff > len(self.last16):
            raise ValueError("Can't move back that far")
        self.buffer = self.last16[-diff:] + self.buffer[self._pos:]
        self._pos = 0
        self.last16 = self.last16[:-diff]
        self.cursor -= diff

    def seek(self, offset, from_what=0):
        """Emulate file.seek() on a forward-only stream.

        from_what 0: absolute ``offset``; 1: ``offset`` relative to the current
        position; 2: end of file, allowed only with ``offset == 0`` so that a
        subsequent tell() returns the file size. A forward seek past EOF stops
        at EOF.

        Raises ValueError for an unsupported ``from_what``, a non-zero offset
        from the end, or a backward move larger than the remembered bytes. A
        rejected seek leaves the stream state unchanged.
        """
        if from_what == 2:
            # hack: offset == 0 is allowed because avro uses it to determine
            # the size of the file with a subsequent call to tell()
            if offset != 0:
                raise ValueError("Can't read from the end of a stream.")
            self.last_seek_mode = 2
            return
        if from_what == 0:
            target = offset
        elif from_what == 1:
            target = self.cursor + offset
        else:
            raise ValueError("Acceptable values of from_what are 0, 1, or 2.")
        # A backward move may fail, so attempt it before recording the mode.
        if target < self.cursor:
            self._rewind(self.cursor - target)
        # Leave "end" mode before reading forward; read() advances the cursor.
        self.last_seek_mode = from_what
        if target > self.cursor:
            self.read(target - self.cursor)

    def tell(self):
        """Return the current position, or the file length after seek(0, 2)."""
        if self.last_seek_mode == 2:
            # hack: Avro seeks to the end of the file and calls tell() just
            # to find out the file size
            return next(self.hdfs_client.ls([self.path]))['length']
        return self.cursor
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment