Skip to content

Instantly share code, notes, and snippets.

@Nuttymoon
Last active October 10, 2018 11:12
Show Gist options
  • Save Nuttymoon/d8f44d1fbb61d7218f75bf3ebf024aca to your computer and use it in GitHub Desktop.
Save Nuttymoon/d8f44d1fbb61d7218f75bf3ebf024aca to your computer and use it in GitHub Desktop.
Python scripts to manipulate Avro files
import argparse
from avro.datafile import DataFileReader
from avro.io import DatumReader
# Command-line interface of the cat script: one positional argument naming
# the Avro file to dump.
parser = argparse.ArgumentParser(
    description='Equivalent of cat command for avro files.',
)
parser.add_argument(
    'avropath',
    nargs=1,  # stored as a one-element list, hence args.avropath[0] in cat()
    type=str,
    help='the avro file to cat',
)
# Parsed at module level so that cat() can read the result from `args`.
args = parser.parse_args()
def cat():
    """Print the metadata and every record of the Avro file given on the CLI.

    The path is read from the module-level ``args.avropath`` (a one-element
    list, because the argument is declared with ``nargs=1``). The reader's
    ``meta`` attribute is printed first, then each record on its own line.
    """
    # `with` releases the file handle even if DataFileReader rejects the
    # input while it is being constructed.
    with open(args.avropath[0], 'rb') as avro_file:
        avro_reader = DataFileReader(avro_file, DatumReader())
        # try/finally so the reader is closed even when decoding or
        # printing a record fails part-way through the file.
        try:
            print(avro_reader.meta)
            for row in avro_reader:
                print(row)
        finally:
            avro_reader.close()


if __name__ == '__main__':
    cat()
import csv
import json
import argparse
import avro.schema as sc
from datetime import datetime
from avro.datafile import DataFileWriter
from avro.io import DatumWriter
# Avro logicalType names that convert() treats as date/time columns and
# passes to to_avro_time().
AVRO_TIME_FORMATS = [
    'date',
    'time-millis', 'time-micros',
    'timestamp-millis', 'timestamp-micros',
]
def to_avro_time(python_time, avro_format):
    """Convert a datetime to the integer encoding of an Avro logical type.

    Args:
        python_time: The ``datetime.datetime`` value to convert.
        avro_format: The Avro logicalType name: 'date', 'time-millis',
            'time-micros', 'timestamp-millis' or 'timestamp-micros'.

    Returns:
        The integer value for the requested logical type, or ``False`` when
        ``avro_format`` is not one of the names above.
    """
    # 'date' and 'time-*' carry no timezone, so they are computed from the
    # wall-clock fields of the datetime rather than from its epoch
    # timestamp (which, for a naive datetime, is shifted by the local UTC
    # offset and can land on the wrong day).
    epoch_ordinal = 719163  # proleptic Gregorian ordinal of 1970-01-01
    days_since_epoch = python_time.toordinal() - epoch_ordinal
    seconds_of_day = (python_time.hour * 3600
                      + python_time.minute * 60
                      + python_time.second)
    micros_of_day = seconds_of_day * 1000000 + python_time.microsecond

    # timestamp() returns a float; strip the sub-second part first so the
    # int() conversion is exact, then add the microseconds back as integers.
    epoch_seconds = int(python_time.replace(microsecond=0).timestamp())
    epoch_micros = epoch_seconds * 1000000 + python_time.microsecond

    time_formats = {
        'date': days_since_epoch,
        'time-millis': micros_of_day // 1000,
        'time-micros': micros_of_day,
        'timestamp-millis': epoch_micros // 1000,
        'timestamp-micros': epoch_micros,
    }
    return time_formats.get(avro_format, False)
def _parse_bool(text):
    """Convert a command-line string to a bool for the --header option.

    ``type=bool`` cannot be used for this: bool() of any non-empty string,
    including 'False', is True.
    """
    lowered = text.strip().lower()
    if lowered in ('1', 'true', 't', 'yes', 'y', 'on'):
        return True
    if lowered in ('', '0', 'false', 'f', 'no', 'n', 'off'):
        return False
    raise argparse.ArgumentTypeError(
        'expected a boolean value, got %r' % text)


# Command-line interface of the csv-to-avro script.
parser = argparse.ArgumentParser(description='Convert a csv file to avro.')
# nargs=1 stores the path as a one-element list (read as args.csvpath[0]).
parser.add_argument('csvpath', type=str, nargs=1,
                    help='The csv file to convert')
parser.add_argument('-d', '--delimiter', default=',', type=str,
                    help='The delimiter used in the csv')
parser.add_argument('-q', '--quotechar', default='|', type=str,
                    help='The quotechar used in the csv')
parser.add_argument('-a', '--avropath', required=True,
                    type=str, help='the destination file path')
parser.add_argument('-s', '--schema', required=True,
                    type=str, help='The schema file path')
parser.add_argument('-n', '--nullvalue', type=str, default='N/A',
                    help='Value meaning NULL in the dataset')
# A bare `--header` yields const=True; `--header <value>` goes through
# _parse_bool so that `--header False` really means False.
parser.add_argument('--header', type=_parse_bool, default=False, const=True,
                    nargs='?',
                    help='Add this option if the csv file contains a header')
parser.add_argument('-t', '--timeformat', type=str,
                    default='%d/%m/%Y %H:%M:%S',
                    help='The date format used in the csv '
                    '(using strftime format http://strftime.org/). '
                    'A column will be converted only if the Avro logicalType '
                    'is a time format.')
# Left as None when the option is omitted.
parser.add_argument('--hivetime', nargs='+', type=int,
                    help='The indexes of the columns that have to be '
                    'converted to Hive Timestamp format. It uses --timeformat '
                    'to parse times.')
# Parsed at module level so that convert() can read the result from `args`.
args = parser.parse_args()
def convert():
    """Convert the CSV file given on the command line to an Avro file.

    All inputs come from the module-level ``args``: the CSV path, delimiter
    and quote character, the schema path, the destination path, the NULL
    marker, the header flag, the time format and the optional ``--hivetime``
    column numbers.

    For every schema field, taken positionally from the CSV row:

    * a cell equal to ``args.nullvalue`` becomes ``None``;
    * a non-empty cell whose 1-based column number is listed in
      ``--hivetime`` is parsed with ``args.timeformat`` and rewritten as ISO
      text with a space between date and time;
    * otherwise, a non-empty cell whose schema type is a dict carrying a
      logicalType listed in ``AVRO_TIME_FORMATS`` is parsed with
      ``args.timeformat`` and encoded by ``to_avro_time``;
    * any other cell is written unchanged. Union (list) schema types are
      not inspected for a logicalType.

    Raises:
        IndexError: If a row has fewer usable cells than the schema has
            fields.
        ValueError: If a time cell does not match ``args.timeformat``.
    """
    # Read the schema file once and feed the same text to both parsers,
    # so no file handle is left open.
    with open(args.schema, 'r') as sch:
        schema_text = sch.read()
    avro_schema = sc.Parse(schema_text)
    fields = json.loads(schema_text)['fields']

    # --hivetime is optional and is None when omitted; normalise it so the
    # membership test below cannot raise TypeError.
    hive_columns = set(args.hivetime or ())

    # newline='' is what the csv module documentation asks for, so that
    # newlines embedded in quoted cells are handled by the reader.
    with open(args.csvpath[0], 'r', newline='') as csv_file:
        with open(args.avropath, 'wb+') as avro_file:
            avro_writer = DataFileWriter(avro_file, DatumWriter(), avro_schema)
            # try/finally so the writer is closed even if a row fails.
            try:
                reader = csv.reader(csv_file, delimiter=args.delimiter,
                                    quotechar=args.quotechar)
                if args.header:
                    # Discard the header line; tolerate an empty file.
                    next(reader, None)
                for raw_row in reader:
                    # NOTE(review): the last cell of every row is dropped,
                    # exactly as before; presumably the input lines end
                    # with a trailing delimiter. Confirm against real data.
                    row = raw_row[:-1]
                    avro_row = {}
                    for i, field in enumerate(fields):
                        value = row[i] if row[i] != args.nullvalue else None
                        field_type = field['type']
                        if value and i + 1 in hive_columns:
                            value = datetime.strptime(
                                value, args.timeformat).isoformat(' ')
                        elif value and isinstance(field_type, dict):
                            logical_type = field_type.get('logicalType')
                            if logical_type in AVRO_TIME_FORMATS:
                                value = to_avro_time(
                                    datetime.strptime(value, args.timeformat),
                                    logical_type)
                        avro_row[field['name']] = value
                    avro_writer.append(avro_row)
            finally:
                avro_writer.close()


if __name__ == '__main__':
    convert()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment