@SunnyBingoMe
Last active March 5, 2019 22:34
Simple Spark Tests: simple demos to check whether Spark is installed correctly and working
# Demo 1: Initial Hello-World to Check That Spark Runs in Parallel
# bash:
pyspark
# Note 1: this is usually the one at /usr/lib/bin/pyspark
# Note 2: starting the shell should print the Python version and the Spark version
# py:
sc.defaultParallelism # check that enough parallel CPU cores are being used
pF = sc.textFile('/usr/lib/spark/README.md')
pF.count()
pF.first()
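# Optional extra check (a sketch, not part of the original demo): a tiny word count
# on the same file, to exercise a shuffle stage in parallel as well.
wc = pF.flatMap(lambda line: line.split()).map(lambda w: (w, 1)).reduceByKey(lambda a, b: a + b)
wc.takeOrdered(5, key=lambda kv: -kv[1]) # top-5 most frequent words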
sc.stop()
# Demo 2: Check one Function in Parallel
## usage: wget goo.gl/kRQhTE -O demo2.py && spark-submit demo2.py
## this script takes one optional argument, the Spark master host/IP; otherwise localhost is used.
import sys
import socket
import linecache
from pyspark import SparkContext, SparkConf
if len(sys.argv) > 1:
    master_host = sys.argv[1]
else:
    master_host = 'localhost'
distributed_input_file_path = '/etc/passwd' # or /usr/lib/spark/README.md, which usually has > 100 lines
local_output_file_name = 'all_result_listing_demo2.txt' # current folder
appName = 'demo2'
master = 'spark://' + master_host + ':7077'
conf = SparkConf().setAppName(appName).setMaster(master)
sc = SparkContext(conf=conf)
def get_one_node_one_line(line_index):
    return_str = 'Line: '
    return_str += str(line_index)
    return_str += ', Host: '
    return_str += socket.gethostname()
    return_str += ', Content: '
    return_str += linecache.getline(distributed_input_file_path, line_index + 1) # linecache is 1-based, line_index is 0-based. TO-IMPROVE: truncate long lines.
    return_str += '\n'
    return return_str
line_nr = sum(1 for line in open(distributed_input_file_path))
main_data = list(range(line_nr))
partition_nr = min(int(3.5 * sc.defaultParallelism), 1000, len(main_data)) # oversubscribe ~3.5x the default parallelism, capped at 1000 partitions and at the number of lines
my_rdd = sc.parallelize(main_data, partition_nr)
map_result = my_rdd.map(get_one_node_one_line)
all_result_listing = map_result.collect()
print(all_result_listing)
with open(local_output_file_name, 'w') as f:
    for item in all_result_listing:
        f.write("%s\n" % item)
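## Optional sanity check (a sketch, not part of the original demo): count the distinct
## worker hosts seen in the collected strings; more than one host confirms the map
## really ran on several machines.
hosts = set(s.split(', Host: ')[1].split(', Content: ')[0] for s in all_result_listing)
print('Distinct worker hosts observed: %d' % len(hosts))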
# Demo 3: Check Spark Tasks Submitted to k8s (compatible with demo2)
## usage 1 (in console): wget <this file> -O demo3.py && \
## spark-submit <other spark-submit-options> demo3.py [spark://...:7077 or k8s://https://...:443, default is spark://localhost:7077]
## usage 2 (in spark-jupyter): copy and paste everything into a cell and run it (a spark session or context is already available).
## usage 3 (in pure-jupyter): copy and paste everything, modify the master host/ip, and run the cell.
## make sure the k8s server is >= 1.6 (check with: kubectl version)
## make sure spark is >= 2.1 (check with: pyspark --version) (tested on ...)
## this script takes one optional argument, the spark or k8s master / api server; otherwise localhost is used.
import sys
import socket
import linecache
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
app_name = 'demo3'
distributed_input_file_path = '/etc/passwd' # /usr/lib/spark/README.md usually > 100 lines
local_output_file_name = 'all_result_listing_demo2.txt' # current folder
use_k8s = True # else, use spark w/o k8s. (note: informational only in this script; the master URL passed in actually decides.)
############################### start auto-config ###############################
if len(sys.argv) > 1:
    master = sys.argv[1]
else:
    master = 'spark://localhost:7077'
defined_spark = False
try:
    spark
except NameError:
    print('spark is NOT defined, creating a new SparkSession...')
    spark = SparkSession.builder \
        .master(master) \
        .appName(app_name) \
        .config("spark.some.config.option", "some-value") \
        .getOrCreate()
    defined_spark = True
else:
    print('ok, spark is defined')
    defined_spark = True
try:
    sc
except NameError:
    print('sc is NOT defined, defining...')
    if defined_spark:
        sc = spark.sparkContext
    else:
        conf = SparkConf().setAppName(app_name).setMaster(master)
        sc = SparkContext(conf=conf)
    print('ok, sc is newly defined.')
else:
    print('ok, sc is defined.')
############################### end auto-config ###############################
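## Optional check (not part of the original script): print which master the auto-config
## ended up with; SparkContext exposes it as sc.master.
print('Effective master: %s' % sc.master)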
print("Current python interpreter host:")
socket.gethostname()
print("\n\n################# distributed processing of /etc/passwd file: #################n\n")
def get_one_node_one_line(line_index):
    return_str = 'Line: '
    return_str += str(line_index)
    return_str += ', Host: '
    return_str += socket.gethostname()
    return_str += ', Content: '
    return_str += linecache.getline(distributed_input_file_path, line_index + 1) # linecache is 1-based, line_index is 0-based. TO-IMPROVE: truncate long lines.
    return_str += '\n'
    return return_str
line_nr = sum(1 for line in open(distributed_input_file_path))
main_data = list(range(line_nr))
partition_nr = min(int(3.5 * sc.defaultParallelism), 1000, len(main_data))
my_rdd = sc.parallelize(main_data, partition_nr)
map_result = my_rdd.map(get_one_node_one_line)
all_result_listing = map_result.collect()
print(all_result_listing)
with open(local_output_file_name, 'w') as f:
    for item in all_result_listing:
        f.write("%s\n" % item)
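## Example submission against a Kubernetes master (a sketch, assuming Spark 2.3+ with the
## built-in k8s scheduler and an image that already contains this script; <api-server>,
## <namespace> and <image> are placeholders, not values from this gist):
## spark-submit \
##   --master k8s://https://<api-server>:443 \
##   --deploy-mode cluster \
##   --conf spark.kubernetes.namespace=<namespace> \
##   --conf spark.kubernetes.container.image=<image> \
##   local:///opt/demo3.py k8s://https://<api-server>:443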

This gist is a collection of simple Spark tests for Jupyter and spark-submit.
