This gist is a collection of simple Spark tests for Jupyter and spark-submit.
Simple Spark Tests: simple demos to check that Spark is installed correctly and working.
# Demo 1: Init Hello-world to Check Spark in Parallel
# bash:
pyspark
# Note 1: usually this is the one in /usr/lib/bin/pyspark
# Note 2: the startup banner should show the Python version and the Spark version
# py:
sc.defaultParallelism  # check that enough CPUs are used in parallel
pF = sc.textFile('/usr/lib/spark/README.md')
pF.count()
pF.first()
sc.stop()
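The same sanity check can also run non-interactively. Below is a minimal sketch of Demo 1 as a standalone script for spark-submit (not part of the original gist); the file name, app name, and README path are assumptions, so adjust them to your install. Run it with e.g. spark-submit demo1_submit.py.

# demo1_submit.py: a minimal sketch of Demo 1 for spark-submit (assumed file name).
# Assumes /usr/lib/spark/README.md exists on the node(s) running the executors.
from pyspark import SparkContext

sc = SparkContext(appName='demo1_submit')  # the master comes from spark-submit's --master flag
print('defaultParallelism:', sc.defaultParallelism)  # check that enough CPUs are used in parallel
pF = sc.textFile('/usr/lib/spark/README.md')
print('line count:', pF.count())
print('first line:', pF.first())
sc.stop()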
# Demo 2: Check one Function in Parallel
## usage: wget goo.gl/kRQhTE -O demo2.py && spark-submit demo2.py
## this script takes one optional argument, the spark master host/ip; otherwise it uses localhost.
import sys
import socket
import linecache

from pyspark import SparkContext, SparkConf

if len(sys.argv) > 1:
    master_host = sys.argv[1]
else:
    master_host = 'localhost'

distributed_input_file_path = '/etc/passwd'  # or /usr/lib/spark/README.md, which is usually > 100 lines
local_output_file_name = 'all_result_listing_demo2.txt'  # written to the current folder
appName = 'demo2'
master = 'spark://' + master_host + ':7077'

conf = SparkConf().setAppName(appName).setMaster(master)
sc = SparkContext(conf=conf)

def get_one_node_one_line(line_index):
    return_str = 'Line: '
    return_str += str(line_index)
    return_str += ', Host: '
    return_str += socket.gethostname()
    return_str += ', Content: '
    return_str += linecache.getline(distributed_input_file_path, line_index)  # TO-IMPROVE: sub-str for long lines.
    return_str += '\n'
    return return_str

line_nr = sum(1 for _ in open(distributed_input_file_path))
main_data = list(range(1, line_nr + 1))  # 1-based: linecache.getline counts lines from 1
partition_nr = min(int(3.5 * sc.defaultParallelism), 1000, len(main_data))
my_rdd = sc.parallelize(main_data, partition_nr)
map_result = my_rdd.map(get_one_node_one_line)
all_result_listing = map_result.collect()
print(all_result_listing)
with open(local_output_file_name, 'w') as f:
    for item in all_result_listing:
        f.write("%s\n" % item)
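To confirm the mapping really ran on more than one machine, a quick follow-up check (a sketch, not in the original gist; the file name matches local_output_file_name above) is to count the distinct 'Host:' values in demo2's output file. On a single-machine install it prints one host; on a real cluster several should appear.

# check_hosts_demo2.py: count distinct hosts in demo2's output (a sketch; assumed file name).
import re

hosts = set()
with open('all_result_listing_demo2.txt') as f:
    for line in f:
        m = re.search(r'Host: ([^,]+),', line)  # output lines look like: Line: 5, Host: nodename, Content: ...
        if m:
            hosts.add(m.group(1))
print('distinct executor hosts:', sorted(hosts))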
# Demo 3: Check Spark Tasks Submitted to k8s (compatible with demo2)
## usage 1 (in console): wget <this file> -O demo3.py && \
##   spark-submit <other spark-submit-options> demo3.py [spark://...:7077 or k8s://https://...:443, default is spark://localhost:7077]
## usage 2 (in spark-jupyter): copy and paste all and run the cell (a spark session or context is already available).
## usage 3 (in pure-jupyter): copy and paste all, modify the master host/ip, and run the cell.
## make sure the k8s server is >= 1.6 (check with: kubectl version)
## make sure spark is >= 2.1 (check with: pyspark --version) (tested on ...)
## this script takes one optional argument, the spark or k8s master / api server; otherwise it uses localhost.
import sys
import socket
import linecache

from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

app_name = 'demo3'
distributed_input_file_path = '/etc/passwd'  # or /usr/lib/spark/README.md, which is usually > 100 lines
local_output_file_name = 'all_result_listing_demo2.txt'  # current folder; same output file as demo2, since demo3 is compatible with it
use_k8s = True  # else, use spark w/o k8s.

############################### start auto-config ###############################
if len(sys.argv) > 1:
    master = sys.argv[1]
else:
    master = 'spark://localhost:7077'

defined_spark = False
try:
    spark
except NameError:
    print('spark is NOT defined, creating a SparkSession...')
    spark = SparkSession.builder \
        .master(master) \
        .appName(app_name) \
        .config("spark.some.config.option", "some-value") \
        .getOrCreate()
    defined_spark = True
else:
    print('ok, spark is defined')
    defined_spark = True

try:
    sc
except NameError:
    print('sc is NOT defined, defining...')
    if defined_spark:
        sc = spark.sparkContext
    else:
        conf = SparkConf().setAppName(app_name).setMaster(master)
        sc = SparkContext(conf=conf)
    print('ok, sc is newly defined.')
else:
    print('ok, sc is defined.')
############################### end auto-config ###############################

print("Current python interpreter host:")
print(socket.gethostname())
print("\n\n################# distributed processing of /etc/passwd file: #################\n\n")

def get_one_node_one_line(line_index):
    return_str = 'Line: '
    return_str += str(line_index)
    return_str += ', Host: '
    return_str += socket.gethostname()
    return_str += ', Content: '
    return_str += linecache.getline(distributed_input_file_path, line_index)  # TO-IMPROVE: sub-str for long lines.
    return_str += '\n'
    return return_str

line_nr = sum(1 for _ in open(distributed_input_file_path))
main_data = list(range(1, line_nr + 1))  # 1-based: linecache.getline counts lines from 1
partition_nr = min(int(3.5 * sc.defaultParallelism), 1000, len(main_data))
my_rdd = sc.parallelize(main_data, partition_nr)
map_result = my_rdd.map(get_one_node_one_line)
all_result_listing = map_result.collect()
print(all_result_listing)
with open(local_output_file_name, 'w') as f:
    for item in all_result_listing:
        f.write("%s\n" % item)
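For a quick look at how the rows were spread over the executors without parsing the text output, the sketch below (not in the original gist) reuses the my_rdd built above and reports which host handled each partition; glom() collects each partition's elements into one list.

# A sketch: report which host processed each partition of my_rdd.
# Reuses my_rdd and socket from the demo3 script above.
partition_hosts = (my_rdd
                   .map(lambda _: socket.gethostname())  # record the executor host per element
                   .glom()                               # one list of hostnames per partition
                   .map(lambda names: names[0] if names else None)
                   .collect())
for i, host in enumerate(partition_hosts):
    print('partition %d ran on %s' % (i, host))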