Convert a CSV to Hive DDL + AVRO Schema (with type inference)
#!/usr/bin/env python3
# Requires Python 3.6+ (uses f-strings)
import argparse
import json
import re
import subprocess
import sys
import textwrap

import pandas

DEBUG = True


def debug(msg):
    # Flag renamed to DEBUG so this function no longer shadows it
    if DEBUG:
        print(msg)


def error(msg):
    print(msg, file=sys.stderr)
def convNumpy2Avro(dtype):
    # Map pandas/numpy dtype names to Avro primitive types;
    # anything unrecognised falls back to string
    dataTypes = {
        'object': 'string',
        'bool': 'boolean',
        'int32': 'int',
        'int64': 'long',
        'float32': 'float',
        'float64': 'double'
    }
    return dataTypes.get(dtype, 'string')
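
# Quick sanity check of the mapping (hypothetical values; assumes pandas
# reports the usual numpy dtype names):
#   convNumpy2Avro('int64')    -> 'long'
#   convNumpy2Avro('float64')  -> 'double'
#   convNumpy2Avro('category') -> 'string'  (the fallback for unknown dtypes)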
def convNumpy2Hive(dtype):
    # Map pandas/numpy dtype names to Hive column types;
    # anything unrecognised falls back to STRING
    dataTypes = {
        'object': 'STRING',
        'bool': 'BOOLEAN',
        'int32': 'INT',
        'int64': 'BIGINT',
        'float32': 'FLOAT',
        'float64': 'DOUBLE'
    }
    return dataTypes.get(dtype, 'STRING')
def genAvroSchema(name, colMetadata, ns="com.example.test.avro"):
    # Build an Avro record schema from (column, dtype) pairs
    fields = [{"name": x[0], "type": convNumpy2Avro(x[1])} for x in colMetadata]
    schemaJson = {
        "namespace": ns,
        "type": "record",
        "name": name,
        "fields": fields
    }
    schema = json.dumps(schemaJson, indent=2)
    debug(schema)
    return schema
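
# For illustration, genAvroSchema('sample_data', [('ItemID', 'int64'),
# ('Name', 'object')]) should produce roughly (field layout may differ):
# {
#   "namespace": "com.example.test.avro",
#   "type": "record",
#   "name": "sample_data",
#   "fields": [
#     {"name": "ItemID", "type": "long"},
#     {"name": "Name", "type": "string"}
#   ]
# }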
def genCsvDDL(db, table, colMetadata, sep):
    # External table over the raw CSV files; reads args.hdfsCSVDir from the
    # global argparse namespace
    pattern = re.compile(r"[.:\s]")  # chars not allowed in Hive column names
    cols = ",\n      ".join(["`%s` %s COMMENT ''" % (pattern.sub("_", x[0]), convNumpy2Hive(x[1]))
                             for x in colMetadata])
    ddl = f"""
    CREATE EXTERNAL TABLE {db}.{table}_csv
    ( {cols} )
    COMMENT 'This table is ...'
    ROW FORMAT DELIMITED
    FIELDS TERMINATED BY '{sep}'
    STORED AS TEXTFILE
    LOCATION '{args.hdfsCSVDir}';
    """
    ddl = textwrap.dedent(ddl).strip()
    debug(ddl)
    return ddl
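
# With the same two sample columns and ',' as separator, genCsvDDL should emit
# something like (the location depends on the --hdfsCSVDir argument):
#   CREATE EXTERNAL TABLE default.sample_data_csv
#   ( `ItemID` BIGINT COMMENT '',
#     `Name` STRING COMMENT '' )
#   COMMENT 'This table is ...'
#   ROW FORMAT DELIMITED
#   FIELDS TERMINATED BY ','
#   STORED AS TEXTFILE
#   LOCATION 'hdfs://default/user/user1/samples/csv/';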
def genOrcDDL(db, table, colMetadata, sep):
    # Managed ORC table at args.hdfsTableDir (read from the global argparse
    # namespace); the delimiter clause is accepted but has no effect on ORC
    pattern = re.compile(r"[.:\s]")
    cols = ",\n      ".join(["`%s` %s COMMENT ''" % (pattern.sub("_", x[0]), convNumpy2Hive(x[1]))
                             for x in colMetadata])
    ddl = f"""
    CREATE TABLE {db}.{table}
    ( {cols} )
    COMMENT 'This table is ...'
    ROW FORMAT DELIMITED
    FIELDS TERMINATED BY '{sep}'
    STORED AS ORC
    LOCATION '{args.hdfsTableDir}'
    TBLPROPERTIES ("orc.compress"="SNAPPY");
    """
    ddl = textwrap.dedent(ddl).strip()
    debug(ddl)
    return ddl
def genCsv2AvroMergeDDL(db, table, colMetadata):
    # TODO: still work in progress
    # Upsert pattern: keep the existing rows whose primary key is absent from
    # the fresh CSV, then append every CSV row (the CSV wins on conflicts)
    cols = ",\n      ".join(['`%s` %s' % (x[0], convNumpy2Hive(x[1]))
                             for x in colMetadata])
    sel_join_cols = ", \n".join(['gt.%s' % x[0] for x in colMetadata])
    sel_cols = ", \n".join(['%s' % x[0] for x in colMetadata])
    joincond = " and ".join(['st.%s = gt.%s ' % (x, x)
                             for x in args.primarykeys.split(',')])
    filt = " and ".join([' st.%s is null ' % x
                         for x in args.primarykeys.split(',')])
    ddl = f"""
    CREATE EXTERNAL TABLE IF NOT EXISTS {db}.{table}
    ( {cols} )
    ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
    STORED AS
      INPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
      OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
    LOCATION '{args.hdfsTableDir}'
    TBLPROPERTIES ('avro.schema.url'='{args.hdfsSchemaPath}');

    INSERT OVERWRITE TABLE {db}.{table}
    SELECT * FROM (
      SELECT {sel_join_cols}
      FROM {db}.{table} gt
      LEFT OUTER JOIN {db}.{table}_csv st
        ON {joincond}
      WHERE {filt}
      UNION ALL
      SELECT {sel_cols}
      FROM {db}.{table}_csv) T;

    USE {db};
    ALTER TABLE {table} SET LOCATION '{args.hdfsTableDir}';
    """
    ddl = textwrap.dedent(ddl).strip()
    debug(ddl)

    # Older %-style variant that merges into a staging '_merged' table;
    # it is only printed for debugging and never returned
    ddl2 = """
    DROP TABLE staging.%s_merged;
    CREATE EXTERNAL TABLE staging.%s_merged
    (%s)
    ROW FORMAT SERDE
      'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
    STORED AS INPUTFORMAT
      'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
    OUTPUTFORMAT
      'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
    LOCATION '%s'
    TBLPROPERTIES (
      'avro.schema.url'='%s');

    INSERT OVERWRITE TABLE staging.%s_merged
    SELECT * FROM (
      SELECT %s
      FROM %s.%s gt
      LEFT OUTER JOIN staging.%s_csv st
        ON %s
      WHERE %s
      UNION ALL
      SELECT %s
      FROM staging.%s_csv) T;

    USE %s;
    ALTER TABLE %s SET LOCATION '%s';
    DROP TABLE staging.%s_merged;
    """ % (args.table, args.table, cols, args.hdfsTableDir, args.hdfsSchemaPath,
           args.table, sel_join_cols, args.db, args.table, args.table, joincond,
           filt, sel_cols, args.table, args.db, args.table, args.hdfsTableDir,
           args.table)
    debug(textwrap.dedent(ddl2).strip())
    return ddl
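
# Net effect of the INSERT OVERWRITE above, sketched on hypothetical data with
# primary key ItemID: existing rows with no match in the fresh CSV are kept,
# and every CSV row is taken as-is, so the CSV wins on conflicting keys:
#   existing table: (1, 'old'), (2, 'old')
#   _csv table:     (2, 'new'), (3, 'new')
#   result:         (1, 'old'), (2, 'new'), (3, 'new')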
def streamStdOut2Hdfs(data, hdfsPath):
    # Pipe a string straight into an HDFS file, replacing any existing one
    subprocess.call(['hdfs', 'dfs', '-rm', hdfsPath], shell=False)
    p = subprocess.Popen(
        ['hdfs', 'dfs', '-put', '-', hdfsPath], stdin=subprocess.PIPE)
    p.communicate(input=data.encode('utf-8'))  # Popen expects bytes on stdin
    if p.returncode != 0:
        error(f"Failed streaming to HDFS: {hdfsPath}")
        sys.exit(1)
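
# Equivalent to the shell pipeline `echo "$data" | hdfs dfs -put - <hdfsPath>`;
# this avoids a local temp file for small payloads like schemas and DDLs.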
def uploadHdfs(localPath, hdfsPath):
    # Upload a local file to HDFS, replacing any existing one
    subprocess.call(['hdfs', 'dfs', '-rm', hdfsPath], shell=False)
    p = subprocess.Popen(['hdfs', 'dfs', '-put', localPath, hdfsPath])
    p.wait(timeout=600)  # wait up to 10 minutes
    if p.returncode != 0:
        error(f"Failed uploading file to HDFS: {localPath} --> {hdfsPath}")
        sys.exit(1)
############################################
# Parse the CMD Args
parser = argparse.ArgumentParser(
    fromfile_prefix_chars='@',
    description='Generate an Avro schema and Hive DDLs from a CSV')
parser.add_argument('--filename', help='Path to the CSV file', required=True)
parser.add_argument('--delimiter', help='Delimiter used in the CSV file', default=',')
parser.add_argument('--table', help='Base name of the Hive table', default='sample')
parser.add_argument('--db', help='Database where the Hive tables will be located', default='default')
parser.add_argument('--primarykeys', help='Comma-separated list of primary-key columns', default='id')
parser.add_argument('--csvDDL', help='Output path for the external CSV table DDL', default='csv_tbl.hql')
parser.add_argument('--avroDDL', help='Output path for the Avro table DDL', default='avro_tbl.hql')
parser.add_argument('--hdfsSchemaPath', help='HDFS path for the generated Avro schema file',
                    default='~/hive/staging/schema.avsc')
parser.add_argument('--hdfsCSVDir', help='HDFS location of the CSV file to load into Hive',
                    default='~/hive/staging/sample')
parser.add_argument('--hdfsTableDir', help='HDFS location of the final table', default='~/hive/dw/sample')
args = parser.parse_args()
# Data samples: override the CLI args with hard-coded test values
# (comment this block out to use the real command-line arguments)
args = parser.parse_args([
    '--filename', 'sample_data.csv',
    '--delimiter', ',',
    '--db', 'default',  # an empty db name would generate invalid '.<table>' DDL
    '--table', 'sample_data',
    '--primarykeys', 'ItemID',
    '--hdfsCSVDir', 'hdfs://default/user/user1/samples/csv/',
    '--hdfsTableDir', 'hdfs://default/user/user1/samples/hive'])
# Read the data
# Using the standard csv parser (no type inference)
# f = open(args.filename,'r')
# data = f.readlines()
# debug(data[0:5])
# csv.field_size_limit(sys.maxsize)
# reader = csv.DictReader(data)
# debug(reader.fieldnames)
# colNames = [x.replace('/','_') for x in reader.fieldnames]
# colMetadata = [(x, 'string') for x in colNames]
# Using pandas (basic type inference)
df = pandas.read_csv(args.filename, sep=args.delimiter, nrows=1000)
colNames = list(df.columns)
colTypes = [x.name for x in df.dtypes]
colMetadata = list(zip(colNames, colTypes))
debug(colMetadata)
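
# With the sample file, colMetadata would look something like
# [('ItemID', 'int64'), ('Name', 'object'), ...] -- the exact dtypes depend on
# the first 1000 rows pandas sees (hence nrows=1000 above)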
# Generate an Avro schema
#schema = genAvroSchema(args.table, colMetadata)
#streamStdOut2Hdfs(schema, args.hdfsSchemaPath)
# Generate the DDL for the CSV table
ddl = genCsvDDL(args.db, args.table, colMetadata, args.delimiter)
# Generate the DDL for the final table in avro format
#genCsv2AvroMergeDDL(args.db, args.table, colMetadata)
print(ddl)  # already dedented and stripped inside genCsvDDL
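
# Example invocation once the sample override above is removed (hypothetical
# script and file names):
#   python3 csv2hive.py --filename sample_data.csv --delimiter ',' \
#       --db default --table sample_data --primarykeys ItemID \
#       --hdfsCSVDir hdfs:///user/user1/samples/csv --hdfsTableDir hdfs:///user/user1/samples/hive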
# requirements.txt
numpy==1.14.3
pandas==0.22.0
python-dateutil==2.7.2
pytz==2018.4
six==1.11.0