mapPartitions RDD transformation #pyspark #pyspark101
from pyspark.sql import SparkSession


# mapPartitions expects a function that takes an iterator over one partition
# and returns an iterable, so each helper below yields its results.
def process_partition(partition):
    # Sum every element in the partition in a single pass.
    yield sum(partition)


def process_partition_y_sum(partition):
    # Equivalent to process_partition, but with an explicit accumulation loop.
    number_sum = 0
    for element in partition:
        number_sum = number_sum + element
    yield number_sum


def process_partition_y_size(partition):
    # Collect the length of every element (e.g. every text line) in the
    # partition and yield them as a single list.
    element_list = []
    for element in partition:
        element_list.append(len(element))
    yield element_list
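# mapPartitions accepts any function that returns an iterable, so a plain
# list works too; a generator simply avoids materializing everything at once.
# A minimal alternative sketch (process_partition_as_list is hypothetical,
# not part of the original gist):
def process_partition_as_list(partition):
    return [sum(partition)]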
if __name__ == "__main__":
    print("PySpark 101 Tutorial")
    print("Part 6 - How to use mapPartitions RDD transformation in PySpark")

    spark = SparkSession \
        .builder \
        .appName("Part 6 - How to use mapPartitions RDD transformation in PySpark") \
        .master("local[*]") \
        .enableHiveSupport() \
        .getOrCreate()

    py_number_list = [1, 2, 1, 1, 2, 1, 1, 2, 1]
    print("Printing Python Number List: ")
    print(py_number_list)

    print("Creating First RDD from Python Number List")
    number_rdd = spark.sparkContext.parallelize(py_number_list, 3)

    print("Get Partition Count: ")
    print(number_rdd.getNumPartitions())

    # mapPartitions() applies the function once per partition rather than
    # once per element, so this produces one sum per partition.
    number_processed_rdd = number_rdd.mapPartitions(process_partition)
    print(number_processed_rdd.collect())
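    # With 9 elements split across 3 partitions, each partition holds
    # [1, 2, 1], so the collect() above is expected to print [4, 4, 4].
    # glom() groups each partition into a list, which makes the
    # partitioning visible:
    print(number_rdd.glom().collect())  # expected: [[1, 2, 1], [1, 2, 1], [1, 2, 1]]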
    input_file_path = "file:///g:/WS/data/pyspark101/input"
    tech_overview_rdd = spark.sparkContext.textFile(input_file_path)

    print("Get Partition Count: ")
    print(tech_overview_rdd.getNumPartitions())

    print("Printing tech_overview_rdd(collect()): ")
    print(tech_overview_rdd.collect())
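    # Note: textFile() derives the partition count from the input splits; for
    # a small local file this is typically 2 (the default minPartitions). To
    # request more partitions, pass minPartitions explicitly, e.g.
    # spark.sparkContext.textFile(input_file_path, minPartitions=4).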
    # Each partition yields a single list holding the length of every line in
    # that partition, so collect() returns one list per partition.
    tech_overview_rdd = tech_overview_rdd.mapPartitions(process_partition_y_size)
    tech_overview_list = tech_overview_rdd.collect()

    print("Printing tech_overview_list: ")
    for element in tech_overview_list:
        print(element)
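    # A minimal sketch of the closely related mapPartitionsWithIndex, which
    # also receives the partition index. The helper below is hypothetical
    # (not part of the original gist) and assumes number_rdd still has its
    # three [1, 2, 1] partitions from above.
    def tag_with_partition_index(index, partition):
        yield (index, sum(partition))

    print(number_rdd.mapPartitionsWithIndex(tag_with_partition_index).collect())
    # expected: [(0, 4), (1, 4), (2, 4)]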
print("Stopping the SparkSession object") | |
spark.stop() |