Using PySpark to handle HDFS operations such as list (ls), rename (mv), and delete (rm)
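
These helpers drive HDFS through Hadoop's Java FileSystem API, reached from Python via Spark's py4j gateway (spark._jvm). They assume an active SparkSession bound to the name `spark`. A minimal sketch of that setup and access pattern (the app name and the /tmp probe are illustrative only):

# Sketch only: the access pattern every helper below relies on.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('hdfs-utils').getOrCreate()  # hypothetical app name
fs = spark._jvm.org.apache.hadoop.fs.FileSystem.get(spark._jsc.hadoopConfiguration())
Path = spark._jvm.org.apache.hadoop.fs.Path
print(fs.exists(Path('/tmp')))  # True if /tmp exists on the default filesystem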

import math


def hdfs_list(path, subtract_one=True):
    '''List an HDFS directory; return (total size in MB, number of files).

    The path is expected to be a directory.
    '''
    fs = spark._jvm.org.apache.hadoop.fs.FileSystem.get(spark._jsc.hadoopConfiguration())
    list_status = fs.listStatus(spark._jvm.org.apache.hadoop.fs.Path(path))
    # Each FileStatus also exposes getPath().getName() and getBlockSize()
    files_size = [file.getLen() for file in list_status]
    total_size_in_MB = sum(files_size) / 1024.0 / 1024.0
    # Optionally skip the _SUCCESS marker file that Spark writes alongside its output
    total_num_files = len(files_size) - 1 if subtract_one else len(files_size)
    return total_size_in_MB, total_num_files
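
# Usage sketch for hdfs_list (the directory path is hypothetical):
size_mb, num_files = hdfs_list('/user/hive/warehouse/db.db/my_table')
print('%.1f MB across %d files, avg %.1f MB' % (size_mb, num_files, size_mb / max(num_files, 1)))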

def hdfs_rename(old_path, new_path):
    fs = spark._jvm.org.apache.hadoop.fs.FileSystem.get(spark._jsc.hadoopConfiguration())
    # FileSystem.rename returns False on failure rather than raising
    return fs.rename(
        spark._jvm.org.apache.hadoop.fs.Path(old_path),
        spark._jvm.org.apache.hadoop.fs.Path(new_path)
    )

def hdfs_delete(path):
    fs = spark._jvm.org.apache.hadoop.fs.FileSystem.get(spark._jsc.hadoopConfiguration())
    # Second argument enables recursive deletion; returns False on failure
    return fs.delete(spark._jvm.org.apache.hadoop.fs.Path(path), True)
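
# Usage sketch for the mv/rm helpers (paths are hypothetical):
hdfs_rename('/tmp/staging/output', '/tmp/final/output')  # like `hdfs dfs -mv`
hdfs_delete('/tmp/old_output')  # recursive delete, like `hdfs dfs -rm -r`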

def smart_overwrite(path, fmt='parquet', compression='gzip', block_size_in_MB=128, min_max_scale=8.0):
    # Only parquet is supported (the fmt argument is currently unused)
    total_size_in_MB, total_num_files = hdfs_list(path)
    avg_file_size_in_MB = total_size_in_MB / total_num_files
    # Average file sizes inside [block_size / scale, block_size * scale] count as normal
    min_file_size = block_size_in_MB / min_max_scale
    max_file_size = block_size_in_MB * min_max_scale
    # Target file count so each output file lands just under one HDFS block
    n_files = int(math.ceil(total_size_in_MB / (block_size_in_MB * 0.9)))
    bak_path = path.rstrip('/') + '.bak'
    if min_file_size <= avg_file_size_in_MB <= max_file_size:
        print("INFO: file size is normal. Don't overwrite")
        return False
    elif avg_file_size_in_MB < min_file_size:
        if total_num_files <= 10:
            print("INFO: file size is too small, but number of files <= 10. Don't overwrite")
            return False
        else:
            print("WARN: file size is too small, will read data, coalesce and overwrite")
            spark.read.load(path).coalesce(max(10, n_files)).write.parquet(
                bak_path, compression=compression, mode='overwrite'
            )
            hdfs_delete(path)
            hdfs_rename(bak_path, path)
            return True
    else:
        if total_num_files >= 10000:
            print("INFO: file size is too large, but number of files >= 10000. Don't overwrite")
            return False
        else:
            print("WARN: file size is too large, will read data, repartition and overwrite")
            spark.read.load(path).repartition(min(10000, n_files)).write.parquet(
                bak_path, compression=compression, mode='overwrite'
            )
            hdfs_delete(path)
            hdfs_rename(bak_path, path)
            return True
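
# Usage sketch for smart_overwrite (hypothetical path). With the defaults
# (block_size_in_MB=128, min_max_scale=8.0), directories whose average file
# size falls inside [16 MB, 1024 MB] are left untouched. Note the rewrite is
# not atomic: the original directory is deleted before the .bak rename.
if smart_overwrite('/user/hive/warehouse/db.db/my_table'):
    print('directory was compacted/split in place')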