import time

import pyspark.sql.functions as F

# `dbutils` and `spark` are provided as notebook globals on a Databricks cluster,
# so they are used below without being imported or constructed explicitly.



def get_dir_content(path):
    """
    get_dir_content:
        For a folder in the data lake, get the list of files it contains, including all subfolders. 
        Return Full File Name as well as Last Modified Date time as a generator object. 
        Output requires conversion into list for consumption.
    """
    #This for loop checks all directories and files inside the provided path
    #For each file it finds, it yields a two-element list with the file path and the last modified date time
    #The consuming code will need to convert the generator object this returns into a list to consume it
    #yield is used so the entire directory tree is scanned; a plain return would stop after the first object encountered
    for dir_path in dbutils.fs.ls(path):
        if dir_path.isFile():
            #modificationTime is reported in milliseconds since the epoch, so divide by 1000 before formatting
            yield [dir_path.path, time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(dir_path.modificationTime / 1000))]
        elif dir_path.isDir() and path != dir_path.path:
            #if the path is a directory, call the function on it again to check its contents
            yield from get_dir_content(dir_path.path)
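

# Example usage (a minimal sketch): get_dir_content returns a generator, so the caller has to
# materialize it with list(). The helper below is not part of the original gist; pass it any
# registered mount point, e.g. the '/mnt/datalake_rawdata' example mentioned in the docstrings.
def print_dir_content(path):
    #Materialize the generator and print each [filePath, lastModifiedDateTime] pair
    for file_path, modified_at in list(get_dir_content(path)):
        print(file_path, modified_at)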


def get_latest_modified_file_from_directory(path):
    """
    get_latest_modified_file_from_directory:
        For a given path to a directory in the data lake, return the file that was last modified. 
        Uses the get_dir_content function as well.
        Input path format expectation: '/mnt/datalake_rawdata'
            You can add sub directories as well, as long as you use a registered mount point
        Performance: With 588 files, it returns in less than 10 seconds on the lowest cluster size. 
    """
    #Call get_dir_content to get a list of all files in this directory and the last modified date time of each
    path_list = list(get_dir_content(path))
    print(path_list)

    #Convert the list returned from get_dir_content into a dataframe so we can manipulate the data easily. Provide it with column headings.
    #You can alternatively sort the list by lastModifiedDateTime and take the top record instead (see the sketch after this function).
    df = spark.createDataFrame(path_list, ['filePath', 'lastModifiedDateTime'])
    df.display()

    #Get the latest modified date time scalar value
    maxLatestModifiedDateTime = df.agg({"lastModifiedDateTime": "max"}).collect()[0][0]
    print(maxLatestModifiedDateTime)

    #Filter the data frame to the record with the latest modified date time value retrieved
    df_filtered = df.filter(df.lastModifiedDateTime == maxLatestModifiedDateTime)
    df_filtered.display()
    
    #return the file name that was last modified in the given directory
    return df_filtered.first()['filePath']
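

# Alternative approach (a minimal sketch of the "sort the list" option mentioned in the comments
# above): because lastModifiedDateTime is formatted as 'YYYY-MM-DD HH:MM:SS', a plain string
# comparison orders the entries chronologically, so the DataFrame round-trip can be skipped.
# This function name is hypothetical and not part of the original gist.
def get_latest_modified_file_from_directory_sorted(path):
    path_list = list(get_dir_content(path))
    #max() over the timestamp element returns the [filePath, lastModifiedDateTime] pair with the latest value
    latest = max(path_list, key=lambda entry: entry[1])
    return latest[0]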

Load the latest modified file from the tdtrans storage account

env = "prodfix"
workspace_type = "t"
short_code = "dtrans"


sa = f"{env}dseus2{workspace_type}{short_code}sa01"
container = 'ds-tdtrans-landing'

path = f"abfss://{container}@{sa}.dfs.core.windows.net/"
file_path = get_latest_modified_file_from_directory(path)
print(file_path)

# Load spi into dataframe
df = spark.read.format("csv").load(file_path, inferSchema=True, header=True)

# Adopt business user format
df = df.withColumnRenamed('EID', 'consumer_src_mid').select(F.col('consumer_src_mid'))
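
# Optional variant (a sketch, not taken from the original gist): inferSchema=True makes Spark read
# the CSV an extra time to guess column types. If the layout of the landing files is stable, an
# explicit schema avoids that pass. The helper and its field list are hypothetical; only 'EID' is
# known from the rename above, so the real schema would need the file's full column list.
from pyspark.sql.types import StructType, StructField, StringType

def load_landing_file_with_schema(path):
    #Hypothetical schema: replace with the actual columns of the landing file
    schema = StructType([
        StructField('EID', StringType(), True),
    ])
    return spark.read.format("csv").schema(schema).load(path, header=True)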