Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save aeroevan/988dde466a17b70ff4ae to your computer and use it in GitHub Desktop.
Save aeroevan/988dde466a17b70ff4ae to your computer and use it in GitHub Desktop.
Fixup for concatenated avro files
From 28938c5e63262d69f3c670e3cc7de9963318d239 Mon Sep 17 00:00:00 2001
From: Evan McClain <aeroevan@gmail.com>
Date: Mon, 7 Dec 2015 00:53:39 -0500
Subject: [PATCH] Add optional offset to _read_header.
---
src/avro/datafile.py | 21 ++++++++++++---------
1 file changed, 12 insertions(+), 9 deletions(-)
diff --git a/src/avro/datafile.py b/src/avro/datafile.py
index e12a68f..208e965 100644
--- a/src/avro/datafile.py
+++ b/src/avro/datafile.py
@@ -98,7 +98,7 @@ class DataFileWriter(object):
else:
# open writer for reading to collect metadata
dfr = DataFileReader(writer, io.DatumReader())
-
+
# TODO(hammer): collect arbitrary metadata
# collect metadata
self._sync_marker = dfr.sync_marker
@@ -179,7 +179,7 @@ class DataFileWriter(object):
# Write block
self.writer.write(compressed_data)
-
+
# Write CRC32 checksum for Snappy
if self.get_meta(CODEC_KEY) == 'snappy':
self.encoder.write_crc32(uncompressed_data)
@@ -188,7 +188,7 @@ class DataFileWriter(object):
self.writer.write(self.sync_marker)
# reset buffer
- self.buffer_writer.truncate(0)
+ self.buffer_writer.truncate(0)
self.block_count = 0
def append(self, datum):
@@ -223,14 +223,14 @@ class DataFileReader(object):
"""Read files written by DataFileWriter."""
# TODO(hammer): allow user to specify expected schema?
# TODO(hammer): allow user to specify the encoder
- def __init__(self, reader, datum_reader):
+ def __init__(self, reader, datum_reader, offset=0):
self._reader = reader
self._raw_decoder = io.BinaryDecoder(reader)
self._datum_decoder = None # Maybe reset at every block.
self._datum_reader = datum_reader
-
+
# read the header: magic, meta, sync
- self._read_header()
+ self._read_header(offset)
# ensure codec is valid
self.codec = self.get_meta('avro.codec')
@@ -290,9 +290,10 @@ class DataFileReader(object):
def is_EOF(self):
return self.reader.tell() == self.file_length
- def _read_header(self):
+ def _read_header(self, offset):
# seek to the beginning of the file to get magic block
- self.reader.seek(0, 0)
+ pos = self.reader.tell()
+ self.reader.seek(offset, 0)
# read header into a dict
header = self.datum_reader.read_data(
@@ -309,6 +310,8 @@ class DataFileReader(object):
# set sync marker
self._sync_marker = header['sync']
+ if pos > 0:
+ self.reader.seek(pos)
def _read_block_header(self):
self.block_count = self.raw_decoder.read_long()
@@ -359,7 +362,7 @@ class DataFileReader(object):
else:
self._read_block_header()
- datum = self.datum_reader.read(self.datum_decoder)
+ datum = self.datum_reader.read(self.datum_decoder)
self.block_count -= 1
return datum
--
2.6.3
#!/usr/bin/env python
import os
import avro.schema
from avro.datafile import DataFileReader, DataFileWriter
from avro.io import DatumReader, DatumWriter
def fix_file(schemafile, infile, outfile):
    """Rewrite a concatenated Avro file as a single container file.

    The input is read as a sequence of Avro container files glued together:
    a reader is opened at the current offset, every row it yields is appended
    to the output, and when reading fails the next reader is started 16 bytes
    past the last position that was read successfully.

    Args:
        schemafile: Path to the JSON formatted Avro schema used for the output.
        infile: Path to the (possibly concatenated) Avro file to read.
        outfile: Path of the Avro file to write; it is truncated first.

    Note:
        Requires a DataFileReader that accepts a header offset as its third
        argument (see the HACK comment below).
    """
    # Close the schema file as soon as it has been parsed.
    with open(schemafile) as schema_fh:
        schema = avro.schema.parse(schema_fh.read())
    writer = DataFileWriter(open(outfile, "wb"), DatumWriter(), schema,
                            codec='deflate')
    # The context manager closes the input even when the input is empty (no
    # reader is ever created in that case) or when an error escapes the loop.
    with open(infile, "rb") as f:
        # The input size does not change while we read it; stat it once.
        total_size = os.fstat(f.fileno()).st_size
        while f.tell() != total_size:
            print("{0:6.2f}%".format(
                100.0 * float(f.tell()) / float(total_size)))
            prev = f.tell()
            # HACK: had to modify DataFileReader's _read_header() to seek to
            # the given offset.
            reader = DataFileReader(f, DatumReader(), prev)
            try:
                for row in reader:
                    writer.append(row)
                    # remember the last position that was read successfully
                    prev = f.tell()
                # once we have finished looping, we are done with the file.
                break
            except Exception:
                # Reading failed part-way through: flush what we have, reopen
                # the output in append mode and retry from the next segment.
                writer.close()
                writer = DataFileWriter(open(outfile, "ab+"), DatumWriter())
                # need to skip 16 byte sync
                f.seek(prev + 16)
                continue
    writer.close()
if __name__ == '__main__':
    import argparse

    # All three options are mandatory paths, so declare them in one table:
    # (short flag, long flag, help text).
    option_specs = (
        ("-s", "--schema", "JSON formatted Avro schema"),
        ("-i", "--input", "Input Avro file to read"),
        ("-o", "--output", "Output Avro file to write"),
    )
    cli = argparse.ArgumentParser(
        description="Fix avro file that has its schema defined multiple times.")
    for short_flag, long_flag, help_text in option_specs:
        cli.add_argument(short_flag, long_flag, help=help_text, required=True)

    options = cli.parse_args()
    fix_file(options.schema, options.input, options.output)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment