import yaml
from pyspark.dbutils import DBUtils
import pyspark.sql.functions as F
import time

# Note: dbutils is available as a builtin in Databricks notebooks, so it does not need to be instantiated here.
def get_dir_content(path):
""" get_dir_content: For a folder in the data lake, get the list of files it contains, including all subfolders. Return Full File Name as well as Last Modified Date time as a generator object. Output requires conversion into list for consumption. """#This for loop will check all directories and files inside the provided path#For each file it contains, return a 2-D array with the file name and the last modified date time#The consuming code will need to convert the generator object this returns to a list to consume it#The yield function is used to ensure the entire directory contents is scanned. If you used return it would stop after the first object encountered. fordir_pathindbutils.fs.ls(path):
ifdir_path.isFile():
#os.stat gets statistics on a path. st_mtime gets the most recent content modification date timeyield [dir_path.path, time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(dir_path.modificationTime/1000))]
elifdir_path.isDir() andpPath!=dir_path.path:
#if the path is a directory, call the function on it again to check its contentsyieldfromget_dir_content(dir_path.path)
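For reference, a minimal usage sketch of the generator; '/mnt/datalake_rawdata' is the hypothetical example path from the docstring below, not a real location:

# Minimal usage sketch; '/mnt/datalake_rawdata' is a hypothetical mount point
for file_path, modified_at in get_dir_content('/mnt/datalake_rawdata'):
    print(file_path, modified_at)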
def get_latest_modified_file_from_directory(path):
    """
    get_latest_modified_file_from_directory:
    For a given path to a directory in the data lake, return the file that was last modified.
    Uses the get_dir_content function as well.
    Input path format expectation: '/mnt/datalake_rawdata'
    You can add subdirectories as well, as long as you use a registered mount point.
    Performance: with 588 files, it returns in less than 10 seconds on the lowest cluster size.
    """
    # Call get_dir_content to get a list of all files in this directory and the last modified date time of each
    path_list = list(get_dir_content(path))
print(path_list)
    # Convert the list returned from get_dir_content into a dataframe so we can manipulate the data easily, and give it column headings.
    # You can alternatively sort the list by lastModifiedDateTime and take the top record instead.
    df = spark.createDataFrame(path_list, ['filePath', 'lastModifiedDateTime'])
df.display()
    # Get the latest modified date time as a scalar value
    maxLatestModifiedDateTime = df.agg({"lastModifiedDateTime": "max"}).collect()[0][0]
print(maxLatestModifiedDateTime)
    # Filter the dataframe to the record with the latest modified date time value retrieved
    df_filtered = df.filter(df.lastModifiedDateTime == maxLatestModifiedDateTime)
df_filtered.display()
    # Return the file path that was last modified in the given directory
    return df_filtered.first()['filePath']
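As the comment above notes, the dataframe step can be skipped by working on the list directly; a minimal sketch of that alternative, reusing get_dir_content:

def get_latest_modified_file_sorted(path):
    # Alternative sketch: take the max over the [filePath, lastModifiedDateTime] pairs directly
    path_list = list(get_dir_content(path))
    # The "%Y-%m-%d %H:%M:%S" format sorts lexicographically in chronological order
    return max(path_list, key=lambda entry: entry[1])[0]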
Load the latest modified file from the tdtrans storage account
env="prodfix"workspace_type="t"short_code="dtrans"sa=f"{env}dseus2{workspace_type}{short_code}sa01"container='ds-tdtrans-landing'path=f"abfss://{container}@{sa}.dfs.core.windows.net/"file_path=get_latest_modified_file_from_directory(path)
print(file_path)
# Load spi into a dataframe
df = spark.read.format("csv").load(file_path, inferSchema=True, header=True)
# Adopt business user format
df = df.withColumnRenamed('EID', 'consumer_src_mid').select(F.col('consumer_src_mid'))
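The rename plus select above can equivalently be written as a single select with an alias; a minimal sketch:

# Equivalent one-step form of the rename-and-select above
df = df.select(F.col('EID').alias('consumer_src_mid'))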