Last active
October 10, 2018 11:12
-
-
Save Nuttymoon/d8f44d1fbb61d7218f75bf3ebf024aca to your computer and use it in GitHub Desktop.
Python scripts to manipulate Avro files
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import argparse | |
from avro.datafile import DataFileReader | |
from avro.io import DatumReader | |
# Command-line interface: a single positional argument naming the Avro file.
# ``nargs=1`` makes ``args.avropath`` a one-element list, which is why cat()
# indexes it with ``[0]``.
parser = argparse.ArgumentParser(
    description='Equivalent of cat command for avro files.')
parser.add_argument('avropath', type=str, nargs=1,
                    help='the avro file to cat')
args = parser.parse_args()
def cat():
    """Print the metadata of the Avro file, then every record it contains.

    The file path is taken from the module-level ``args.avropath``. The
    metadata mapping is printed first, followed by one ``print`` per record.
    The reader and the underlying file are closed even if decoding or
    printing raises.
    """
    with open(args.avropath[0], 'rb') as avro_file:
        avro_reader = DataFileReader(avro_file, DatumReader())
        try:
            print(avro_reader.meta)
            for row in avro_reader:
                print(row)
        finally:
            avro_reader.close()
# Run the dump only when executed as a script.
if __name__ == '__main__':
    cat()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import csv | |
import json | |
import argparse | |
import avro.schema as sc | |
from datetime import datetime | |
from avro.datafile import DataFileWriter | |
from avro.io import DatumWriter | |
# Avro ``logicalType`` names that mark a schema field as a date/time column;
# values of such columns are parsed and re-encoded as integers on conversion.
AVRO_TIME_FORMATS = [
    'date',
    'time-millis',
    'time-micros',
    'timestamp-millis',
    'timestamp-micros',
]
def to_avro_time(python_time, avro_format):
    """Encode a datetime as the integer value of an Avro time logical type.

    Args:
        python_time: The ``datetime`` to encode. Its POSIX timestamp is
            obtained with ``datetime.timestamp()`` (which interprets naive
            datetimes as local time) and truncated to whole seconds, so
            sub-second precision is not preserved.
        avro_format: The Avro ``logicalType`` name: 'date', 'time-millis',
            'time-micros', 'timestamp-millis' or 'timestamp-micros'.

    Returns:
        An int: days since the epoch for 'date', milliseconds/microseconds
        after midnight for 'time-*', milliseconds/microseconds since the
        epoch for 'timestamp-*'. Returns ``False`` when *avro_format* is not
        one of the names above.
    """
    seconds = int(python_time.timestamp())
    # Floor-based split so that the day count and the time of day stay
    # consistent with each other, including for instants before the epoch.
    days, seconds_of_day = divmod(seconds, 86400)
    encodings = {
        'date': days,
        'time-millis': seconds_of_day * 1000,
        'time-micros': seconds_of_day * 1000000,
        'timestamp-millis': seconds * 1000,
        'timestamp-micros': seconds * 1000000,
    }
    return encodings.get(avro_format, False)
def _str_to_bool(text):
    """Interpret a command-line string as a boolean.

    ``type=bool`` would turn any non-empty string (including 'False') into
    True; here the usual negative spellings and the empty string are False
    and every other value is True.
    """
    return text.strip().lower() not in ('', '0', 'false', 'f', 'no', 'n')


# Command-line interface of the csv -> avro converter. ``csvpath`` uses
# ``nargs=1`` so ``args.csvpath`` is a one-element list.
parser = argparse.ArgumentParser(description='Convert a csv file to avro.')
parser.add_argument('csvpath', type=str, nargs=1,
                    help='The csv file to convert')
parser.add_argument('-d', '--delimiter', default=',', type=str,
                    help='The delimiter used in the csv')
parser.add_argument('-q', '--quotechar', default='|', type=str,
                    help='The quotechar used in the csv')
parser.add_argument('-a', '--avropath', required=True,
                    type=str, help='the destination file path')
parser.add_argument('-s', '--schema', required=True,
                    type=str, help='The schema file path')
parser.add_argument('-n', '--nullvalue', type=str, default='N/A',
                    help='Value meaning NULL in the dataset')
# A bare ``--header`` yields True (const); an explicit value is parsed by
# _str_to_bool so that ``--header False`` really disables the option.
parser.add_argument('--header', type=_str_to_bool, default=False, const=True,
                    nargs='?',
                    help='Add this option if the csv file contains a header')
parser.add_argument('-t', '--timeformat', type=str,
                    default='%d/%m/%Y %H:%M:%S',
                    help='The date format used in the csv '
                    '(using strftime format http://strftime.org/). '
                    'A column will be converted only if the Avro logicalType '
                    'is a time format.')
# Defaults to an empty list so membership tests work when the option is
# omitted.
parser.add_argument('--hivetime', nargs='+', type=int, default=[],
                    help='The indexes of the columns that have to be '
                    'converted to Hive Timestamp format. It uses --timeformat '
                    'to parse times.')
args = parser.parse_args()
def _time_logical_type(field_type):
    """Return the time logicalType declared by a schema field type, or None.

    *field_type* is the JSON value of a field's ``type``: either a single
    type, or a list (an Avro union such as ``["null", {...}]``). The first
    member that is a dict whose ``logicalType`` is in AVRO_TIME_FORMATS wins.
    """
    members = field_type if isinstance(field_type, list) else [field_type]
    for member in members:
        if isinstance(member, dict):
            logical_type = member.get('logicalType')
            if logical_type in AVRO_TIME_FORMATS:
                return logical_type
    return None


def convert():
    """Convert the csv file named on the command line into an Avro file.

    All settings come from the module-level ``args``: the schema file is
    parsed once (both as an Avro schema and as JSON to walk its fields), the
    csv is streamed row by row, and one Avro record is appended per row.

    Per field, in schema order:
      * a cell equal to ``args.nullvalue`` becomes ``None``;
      * if the 1-based column index is listed in ``args.hivetime``, the cell
        is parsed with ``args.timeformat`` and written as
        ``'YYYY-MM-DD HH:MM:SS'`` text;
      * otherwise, if the field declares a time logicalType, the cell is
        parsed with ``args.timeformat`` and encoded with ``to_avro_time``;
      * anything else is passed through unchanged.

    The output writer is closed even when a row fails to convert.
    """
    # Read the schema text once and derive both representations from it.
    with open(args.schema, 'r') as schema_file:
        schema_text = schema_file.read()
    avro_schema = sc.Parse(schema_text)
    fields = json.loads(schema_text)['fields']

    # --hivetime is optional; treat "not given" as "no columns".
    hive_columns = set(args.hivetime or ())

    with open(args.avropath, 'wb+') as avro_file:
        avro_writer = DataFileWriter(avro_file, DatumWriter(), avro_schema)
        try:
            # newline='' lets the csv module handle line endings itself.
            with open(args.csvpath[0], 'r', newline='') as csv_file:
                reader = csv.reader(csv_file, delimiter=args.delimiter,
                                    quotechar=args.quotechar)
                if args.header:
                    next(reader, None)
                for raw_row in reader:
                    if not raw_row:
                        # Blank line in the csv: nothing to convert.
                        continue
                    # NOTE(review): the last column of every row is dropped,
                    # presumably because the input lines end with a trailing
                    # delimiter -- confirm against the expected csv layout.
                    row = raw_row[:-1]
                    avro_row = {}
                    for i, field in enumerate(fields):
                        value = row[i] if row[i] != args.nullvalue else None
                        if value:
                            if i + 1 in hive_columns:
                                value = datetime.strptime(
                                    value, args.timeformat).isoformat(' ')
                            else:
                                logical_type = _time_logical_type(
                                    field['type'])
                                if logical_type:
                                    value = to_avro_time(
                                        datetime.strptime(
                                            value, args.timeformat),
                                        logical_type)
                        avro_row[field['name']] = value
                    avro_writer.append(avro_row)
        finally:
            avro_writer.close()
# Run the conversion only when executed as a script.
if __name__ == '__main__':
    convert()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment