
@apeletz512
Created March 12, 2017 21:41
Generate Hive DDL string from pyspark.sql.DataFrame.schema object
def build_hive_ddl(
        table_name, object_schema, location, file_format, partition_schema=None, verbose=False):
    """
    :param table_name: the name of the table you want to register in the Hive metastore
    :param object_schema: an instance of pyspark.sql.DataFrame.schema
    :param location: the storage location for this data (an S3 or HDFS filepath)
    :param file_format: a string compatible with the 'STORED AS <format>' Hive DDL syntax
    :param partition_schema: an optional instance of pyspark.sql.DataFrame.schema that stores
        the columns used for partitioning on disk
    :param verbose: if True, print the generated DDL
    :return: the generated DDL string
    """
    # StructField.simpleString() yields 'name:type'; swapping the colon for a space
    # gives Hive's 'name type' column syntax. Note this also replaces colons inside
    # nested types such as struct<a:int> (see the comment thread below for a fix).
    columns = (','.join([field.simpleString() for field in object_schema])).replace(':', ' ')
    ddl = 'CREATE EXTERNAL TABLE ' + table_name + ' (' \
        + columns + ')' \
        + (
            ' PARTITIONED BY ('
            + (','.join([field.simpleString() for field in partition_schema])).replace(':', ' ')
            + ')'
            if partition_schema else ''
        ) \
        + ' STORED AS ' + file_format \
        + ' LOCATION "' + location + '"'
    if verbose:
        print('Generated Hive DDL:\n' + ddl)
    return ddl
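A minimal usage sketch. The `_Field` class below is a stand-in that mimics the `'name:type'` output of pyspark's `StructField.simpleString()`, so the example runs without a Spark session; the table name, columns, and S3 path are hypothetical:

```python
def build_hive_ddl(
        table_name, object_schema, location, file_format, partition_schema=None, verbose=False):
    # (same function as above, repeated so this sketch is self-contained)
    columns = (','.join([field.simpleString() for field in object_schema])).replace(':', ' ')
    ddl = 'CREATE EXTERNAL TABLE ' + table_name + ' (' \
        + columns + ')' \
        + (
            ' PARTITIONED BY ('
            + (','.join([field.simpleString() for field in partition_schema])).replace(':', ' ')
            + ')'
            if partition_schema else ''
        ) \
        + ' STORED AS ' + file_format \
        + ' LOCATION "' + location + '"'
    if verbose:
        print('Generated Hive DDL:\n' + ddl)
    return ddl


# Stand-in for pyspark.sql.types.StructField: simpleString() returns 'name:type'.
class _Field:
    def __init__(self, name, dtype):
        self.name, self.dtype = name, dtype

    def simpleString(self):
        return self.name + ':' + self.dtype


schema = [_Field('id', 'int'), _Field('name', 'string')]
partitions = [_Field('dt', 'string')]

ddl = build_hive_ddl(
    'events', schema,
    location='s3://my-bucket/events',  # hypothetical path
    file_format='PARQUET',
    partition_schema=partitions,
)
print(ddl)
# CREATE EXTERNAL TABLE events (id int,name string) PARTITIONED BY (dt string)
#   STORED AS PARQUET LOCATION "s3://my-bucket/events"
```

With a real DataFrame you would pass `df.schema` as `object_schema` and run the returned string through `spark.sql(ddl)`.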
@ericbellet commented Jan 5, 2023

from pyspark.sql import functions as f


def build_hive_ddl(
        table_name, df, location, file_format, partition_col=None, verbose=False):
    """
    :param table_name: the name of the table you want to register in the Hive metastore
    :param df: the source pyspark.sql.DataFrame
    :param location: the storage location for this data (an S3 or HDFS filepath)
    :param file_format: a string compatible with the 'USING <format>' Spark SQL DDL syntax
    :param partition_col: an optional comma-separated string of columns to partition by
    :param verbose: if True, print the generated DDL
    :return: the generated DDL string
    """
    # Suffix every column with '_column' so that replacing '<name>_column:' only
    # touches top-level column names, leaving colons inside nested types
    # (e.g. struct<a:int>) intact; the suffix is stripped again before returning.
    df_tmp = df.select([f.col(c).alias(c + '_column') for c in df.columns])
    object_schema = df_tmp.schema
    df_columns = df_tmp.columns

    columns = (','.join([field.simpleString() for field in object_schema]))
    for col in df_columns:
        columns = columns.replace(f"{col}:", f"{col} ")

    ddl = 'CREATE EXTERNAL TABLE ' + table_name + ' (' \
          + columns + ')' \
          + ' USING ' + file_format \
          + (
              ' PARTITIONED BY (' + partition_col + ')'
              if partition_col else ''
          ) \
          + ' LOCATION "' + location + '"'
    if verbose:
        print('Generated Hive DDL:\n' + ddl)
    return ddl.replace("_column", "")
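The `_column` suffix trick in this variant matters for nested types, where the original gist's blanket `.replace(':', ' ')` also destroys the colons that the DDL needs inside `struct<...>`. A minimal sketch of just that string manipulation, with no Spark session (the field strings mimic `StructField.simpleString()` output for hypothetical columns `id` and `payload`):

```python
# Field strings as StructField.simpleString() would return them.
fields = ['id:int', 'payload:struct<a:int,b:string>']

# Naive approach (original gist): replaces every colon, corrupting the nested type.
naive = ','.join(fields).replace(':', ' ')
assert naive == 'id int,payload struct<a int,b string>'  # struct colons lost

# Suffix-targeted approach: alias each column with '_column' first, replace only
# '<name>_column:', then strip the suffix; nested colons survive.
suffixed = ['id_column:int', 'payload_column:struct<a:int,b:string>']
columns = ','.join(suffixed)
for col in ['id_column', 'payload_column']:
    columns = columns.replace(f'{col}:', f'{col} ')
columns = columns.replace('_column', '')
assert columns == 'id int,payload struct<a:int,b:string>'
```

The final `.replace("_column", "")` in the function is what strips the suffix; a column whose real name ends in `_column` would still be mangled, so treat this as a heuristic rather than a general parser.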

@sunmoon4ever

Is it possible to segregate DataFrame columns into partitioned and non-partitioned sets, i.e. derive object_schema and partition_schema from the above example?
