[mapPartitions RDD transformation] mapPartitions RDD transformation #pyspark #pyspark101
from pyspark.sql import SparkSession

# When using mapPartitions, it appears the supplied function has to return a
# generator (or another iterable) over its results.
def process_partition(partition):
    yield sum(partition)
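
# Added sketch (not part of the original gist): mapPartitions only needs the
# function to return an iterable, so returning a plain list works as well as
# yielding from a generator. The name process_partition_as_list is hypothetical.
def process_partition_as_list(partition):
    return [sum(partition)]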

def process_partition_y_sum(partition):
    number_sum = 0
    for element in partition:
        number_sum += element
    yield number_sum

def process_partition_y_size(partition):
    # Collect the length of each element (here, each line of text) in the
    # partition and yield them as a single list.
    element_list = []
    for element in partition:
        element_list.append(len(element))
    yield element_list

if __name__ == "__main__":
    print("PySpark 101 Tutorial")
    print("Part 6 - How to use mapPartitions RDD transformation in PySpark")

    spark = SparkSession \
        .builder \
        .appName("Part 6 - How to use mapPartitions RDD transformation in PySpark") \
        .master("local[*]") \
        .enableHiveSupport() \
        .getOrCreate()
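    # Note (added): enableHiveSupport() assumes Hive classes are on the classpath;
    # without them, getOrCreate() can raise an error, and the call can simply be
    # dropped for this example.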

    py_number_list = [1, 2, 1, 1, 2, 1, 1, 2, 1]
    print("Printing Python Number List: ")
    print(py_number_list)

    print("Creating First RDD from Python Number List")
    number_rdd = spark.sparkContext.parallelize(py_number_list, 3)
    print("Get Partition Count: ")
    print(number_rdd.getNumPartitions())

    # mapPartitions() appears to run the computation once per partition rather
    # than once per element.
    number_processed_rdd = number_rdd.mapPartitions(process_partition)
    print(number_processed_rdd.collect())
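    # Expected output (assuming the 9 numbers are split evenly across the 3
    # partitions as [1, 2, 1] each): [4, 4, 4]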

    input_file_path = "file:///g:/WS/data/pyspark101/input"
    tech_overview_rdd = spark.sparkContext.textFile(input_file_path)
    print("Get Partition Count: ")
    print(tech_overview_rdd.getNumPartitions())
    print("Printing tech_overview_rdd(collect()): ")
    print(tech_overview_rdd.collect())

    # Returns the data gathered per partition: one list per partition holding the
    # length of each line in that partition.
    tech_overview_rdd = tech_overview_rdd.mapPartitions(process_partition_y_size)
    tech_overview_list = tech_overview_rdd.collect()
    print("Printing tech_overview_list: ")
    for element in tech_overview_list:
        print(element)
print("Stopping the SparkSession object")
spark.stop()
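
# --- Supplementary sketch (added, not part of the original gist) ---
# For comparison: map() calls its function once per element, while mapPartitions()
# calls it once per partition with an iterator over that partition's elements.
# Given a SparkContext sc:
#
#   sc.parallelize([1, 2, 3, 4], 2).map(lambda x: x * 10).collect()
#   # -> [10, 20, 30, 40]   (applied element by element)
#
#   sc.parallelize([1, 2, 3, 4], 2).mapPartitions(lambda it: [sum(it)]).collect()
#   # -> [3, 7]             (one result per partition: [1, 2] and [3, 4])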