Scala is Spark's primary programming language, but Spark applications can also be written in Java or Python.
Spark abstracts distributed collections of elements as RDDs (Resilient Distributed Datasets).
RDDs support two types of operations: actions (which run a computation and return a value) and transformations (which produce a new dataset from an existing one).
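A minimal sketch of the difference, as run in spark-shell (set up below; nums and evens are illustrative names, and sc is the SparkContext the shell creates):
> val nums = sc.parallelize(1 to 100) // build an RDD from a local collection
> val evens = nums.filter(_ % 2 == 0) // transformation: lazy, nothing runs yet
> evens.count() // action: triggers the computation and returns 50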
Environment:
CentOS 7.4
java-1.8.0-openjdk
scala-2.11.11
sbt-1.0.3
Download:
wget http://mirror.ox.ac.uk/sites/rsync.apache.org/spark/spark-2.2.0/spark-2.2.0-bin-hadoop2.7.tgz
tar xvf spark-2.2.0-bin-hadoop2.7.tgz
cd spark-2.2.0-bin-hadoop2.7/
Set environment variables:
echo "export PATH=`pwd`/bin:`pwd`/sbin:\$PATH" | sudo tee /etc/profile.d/spark.sh
source /etc/profile.d/spark.sh
Create the configuration files:
cp conf/spark-env.sh.template conf/spark-env.sh
cp conf/log4j.properties.template conf/log4j.properties
Edit conf/log4j.properties to disable INFO-level logging, which is otherwise far too verbose:
# log4j.rootCategory=INFO, console
log4j.rootCategory=WARN, console
Edit conf/spark-env.sh:
export SPARK_DIST_CLASSPATH=$(hadoop classpath) # make the Hadoop classpath visible to Spark
Run an example:
$ run-example SparkPi 2>&1 | grep "Pi is roughly"
Pi is roughly 3.1398356991784957
Or run the Python version:
$ spark-submit examples/src/main/python/pi.py 2>&1 | grep "Pi is roughly"
Pi is roughly 3.146220
$ spark-shell
> val textFile = sc.textFile("file:///home/vagrant/apps/spark-2.2.0-bin-hadoop2.7/README.md") // without the file:// scheme, paths are read from HDFS by default
> textFile.count() // number of elements; for a file, the number of lines
> textFile.first() // first element; for a file, the first line
> val lines = textFile.filter(line => line.contains("Spark")) // a transformation
> lines.count()
> lines.cache() // cache the result in memory on each node
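Transformations are lazy and can be chained; computation only runs when an action is called. A small sketch using the textFile RDD above (wordCounts is an illustrative name):
> val wordCounts = textFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a, b) => a + b) // transformations only, nothing runs yet
> wordCounts.take(5) // action: triggers the job and returns five (word, count) pairs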
When spark-shell starts, it automatically initializes a SparkContext (sc) and a SparkSession (spark).
$ cat examples/src/main/resources/people.json
{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}
$ spark-shell
# Note the two lines of output below
Spark context available as 'sc' (master = local[*], app id = local-1510197720606).
Spark session available as 'spark'.
# Initialize an org.apache.spark.sql.DataFrame
> val d = spark.read.json("file:///home/vagrant/apps/spark-2.2.0-bin-hadoop2.7/examples/src/main/resources/people.json")
> d.show()
+----+-------+
| age| name|
+----+-------+
|null|Michael|
| 30| Andy|
| 19| Justin|
+----+-------+
> d.select("name").show()
+-------+
| name|
+-------+
|Michael|
| Andy|
| Justin|
+-------+
> d.filter(d("age") > 20).show()
+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+
> d.groupBy("age").count().show()
+----+-----+
| age|count|
+----+-----+
| 19| 1|
|null| 1|
| 30| 1|
+----+-----+
# Using SQL
> d.createOrReplaceTempView("people") // registerTempTable is deprecated since Spark 2.0
> val result = spark.sql("SELECT name, age FROM people WHERE age < 20")
> result.show()
+------+---+
| name|age|
+------+---+
|Justin| 19|
+------+---+
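The same query can also be written with the DataFrame API; a sketch equivalent to the SQL above, printing the same single-row result:
> d.filter(d("age") < 20).select("name", "age").show()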
The following example demonstrates real-time word counting.
Create network_wordcount.py (adapted from the examples directory):
from __future__ import print_function
import sys

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("Usage: network_wordcount.py <hostname> <port>", file=sys.stderr)
        exit(-1)
    # With the example's original line below, no output appears in the terminal:
    # Spark Streaming needs at least two threads, one for the receiver and one for processing
    # sc = SparkContext(appName="PythonStreamingNetworkWordCount")
    sc = SparkContext(master="local[2]", appName="PythonStreamingNetworkWordCount")
    ssc = StreamingContext(sc, 1)

    lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))
    counts = lines.flatMap(lambda line: line.split(" "))\
                  .map(lambda word: (word, 1))\
                  .reduceByKey(lambda a, b: a + b)
    counts.pprint()

    ssc.start()
    ssc.awaitTermination()
In terminal 1, run:
$ nc -lk 9999
In terminal 2, run:
$ spark-submit network_wordcount.py localhost 9999
Type a few words in terminal 1 and press Enter; the word counts appear in terminal 2.
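For comparison, a minimal Scala equivalent of the same streaming job might look like this (a sketch: the NetworkWordCount name is illustrative, and building it requires the spark-streaming artifact noted after the sbt section below):
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object NetworkWordCount {
  def main(args: Array[String]): Unit = {
    // local[2]: one thread for the socket receiver, one for processing
    val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
    val ssc = new StreamingContext(conf, Seconds(1))
    val lines = ssc.socketTextStream("localhost", 9999)
    val counts = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
    counts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}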
Create the project skeleton:
sbt new sbt/scala-seed.g8 # prompts for a project name; enter e.g. demo
cd demo
Create src/main/scala/SimpleApp.scala:
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
object SimpleApp {
  def main(args: Array[String]) {
    val logFile = "file:///home/vagrant/apps/spark-2.2.0-bin-hadoop2.7/README.md"
    val conf = new SparkConf().setAppName("Simple Application")
    val sc = new SparkContext(conf)
    val logData = sc.textFile(logFile, 2).cache()
    val numAs = logData.filter(line => line.contains("a")).count()
    val numBs = logData.filter(line => line.contains("b")).count()
    println("Lines with a: %s, Lines with b: %s".format(numAs, numBs))
  }
}
Edit build.sbt:
import Dependencies._

lazy val root = (project in file(".")).
  settings(
    inThisBuild(List(
      organization := "com.example",
      scalaVersion := "2.11.11", // changed to match the installed Scala
      version := "0.1.0-SNAPSHOT"
    )),
    name := "SimpleApp", // changed
    libraryDependencies += scalaTest % Test,
    libraryDependencies += sparkCore // added
  )
Edit project/Dependencies.scala:
import sbt._

object Dependencies {
  lazy val scalaTest = "org.scalatest" %% "scalatest" % "3.0.3"
  lazy val sparkCore = "org.apache.spark" %% "spark-core" % "2.2.0" // added
}
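To also compile the Scala streaming sketch above in this project, a spark-streaming dependency would be needed as well (an assumed addition, not part of the original build):
lazy val sparkStreaming = "org.apache.spark" %% "spark-streaming" % "2.2.0" // hypothetical: only needed for the streaming example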
Package the project and submit it to Spark:
$ sbt package
$ spark-submit --class "SimpleApp" target/scala-2.11/simpleapp_2.11-0.1.0-SNAPSHOT.jar 2>&1 | grep "Lines with a:"
Lines with a: 61, Lines with b: 30
master is set to "local[2]" because Spark Streaming needs at least two local threads, one for the receiver and one for data processing; see the Spark 2.2.0 documentation.