Spark Installation and Usage

Introduction

Scala is Spark's main programming language, but Spark applications can also be written in Java or Python.

Spark abstracts distributed collections of elements as RDDs (Resilient Distributed Datasets).

RDDs support two kinds of operations: actions (which run a computation and return a value) and transformations (which produce a new dataset).
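
A minimal Scala sketch of the two kinds of operations (assuming sc is a SparkContext, such as the one provided by spark-shell later in this note):

val nums  = sc.parallelize(Seq(1, 2, 3, 4))   // create an RDD from a local collection
val big   = nums.filter(_ > 2)                // transformation: lazy, returns a new RDD
val total = big.reduce(_ + _)                 // action: triggers computation, returns 7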

Installation

Environment:

CentOS 7.4
java-1.8.0-openjdk
scala-2.11.11
sbt-1.0.3

Download:

wget http://mirror.ox.ac.uk/sites/rsync.apache.org/spark/spark-2.2.0/spark-2.2.0-bin-hadoop2.7.tgz
tar xvf spark-2.2.0-bin-hadoop2.7.tgz
cd spark-2.2.0-bin-hadoop2.7/

Set environment variables:

echo "export PATH=`pwd`/bin:`pwd`/sbin:\$PATH" | sudo tee /etc/profile.d/spark.sh
source /etc/profile.d/spark.sh

Create the configuration files:

cp conf/spark-env.sh.template conf/spark-env.sh
cp conf/log4j.properties.template  conf/log4j.properties

Edit conf/log4j.properties to suppress INFO logging, which otherwise floods the output:

# log4j.rootCategory=INFO, console
log4j.rootCategory=WARN, console

Edit conf/spark-env.sh (hadoop classpath assumes a Hadoop installation is on the PATH):

SPARK_DIST_CLASSPATH=$(hadoop classpath)

Run an example:

$ run-example SparkPi 2>&1 | grep "Pi is roughly"
Pi is roughly 3.1398356991784957

Or run the Python version:

$ spark-submit examples/src/main/python/pi.py 2>&1 | grep "Pi is roughly"
Pi is roughly 3.146220

Getting Started

Spark Shell

spark-shell
> val textFile = sc.textFile("file:///home/vagrant/apps/spark-2.2.0-bin-hadoop2.7/README.md")  // reads from HDFS by default
> textFile.count()     // number of elements; for a file, the number of lines
> textFile.first()     // first element; for a file, the first line
> val lines = textFile.filter(line => line.contains("Spark"))   // transformation
> lines.count()
> lines.cache()   // cache the result in memory on each node
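
A classic word count over the same file, chaining transformations and ending with an action:

> val wordCounts = textFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a, b) => a + b)
> wordCounts.collect()   // action: computes the counts and returns them to the driver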

Spark SQL and DataFrames

Starting spark-shell automatically initializes a SparkContext (sc) and a SparkSession (spark).

$ cat examples/src/main/resources/people.json 
{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}

$ spark-shell
# Note the following two lines of output
Spark context available as 'sc' (master = local[*], app id = local-1510197720606).
Spark session available as 'spark'.
# Initialize an org.apache.spark.sql.DataFrame
> val d = spark.read.json("file:///home/vagrant/apps/spark-2.2.0-bin-hadoop2.7/examples/src/main/resources/people.json")
> d.show()
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+
> d.select("name").show()
+-------+
|   name|
+-------+
|Michael|
|   Andy|
| Justin|
+-------+
> d.filter(d("age") > 20).show()
+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+
> d.groupBy("age").count().show()
+----+-----+                                                                    
| age|count|
+----+-----+
|  19|    1|
|null|    1|
|  30|    1|
+----+-----+
# Use SQL (registerTempTable is deprecated in Spark 2.x)
> d.createOrReplaceTempView("people")
> val result = spark.sql("SELECT name, age FROM people WHERE age < 20")
> result.show()
+------+---+
|  name|age|
+------+---+
|Justin| 19|
+------+---+
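
The same JSON can also be loaded as a strongly typed Dataset; a minimal sketch (spark-shell imports spark.implicits._ automatically, which provides the needed encoders):

> case class Person(name: String, age: Option[Long])
> val people = spark.read.json("file:///home/vagrant/apps/spark-2.2.0-bin-hadoop2.7/examples/src/main/resources/people.json").as[Person]
> people.filter(p => p.age.exists(_ > 20)).show()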

Spark Streaming

The following example demonstrates real-time word counting.

Create the file network_wordcount.py (adapted from the examples directory):

from __future__ import print_function

import sys

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("Usage: network_wordcount.py <hostname> <port>", file=sys.stderr)
        exit(-1)
    # With the default line below, no output appears in the terminal
    # sc = SparkContext(appName="PythonStreamingNetworkWordCount")
    sc = SparkContext(master="local[2]", appName="PythonStreamingNetworkWordCount")
    ssc = StreamingContext(sc, 1)

    lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))
    counts = lines.flatMap(lambda line: line.split(" "))\
                  .map(lambda word: (word, 1))\
                  .reduceByKey(lambda a, b: a+b)
    counts.pprint()

    ssc.start()
    ssc.awaitTermination()

Run in terminal 1:

$ nc -lk 9999

Run in terminal 2:

$ spark-submit network_wordcount.py localhost 9999

Type some words in terminal 1 and press Enter; the word counts appear in terminal 2.
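
For reference, a Scala sketch of the same streaming word count; building it as a standalone application would also require the spark-streaming dependency (e.g. "org.apache.spark" %% "spark-streaming" % "2.2.0"):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object NetworkWordCount {
  def main(args: Array[String]): Unit = {
    // At least 2 local threads: one for the receiver, one for processing
    val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
    val ssc = new StreamingContext(conf, Seconds(1))   // 1-second batches

    val lines = ssc.socketTextStream("localhost", 9999)
    val counts = lines.flatMap(_.split(" "))
                      .map(word => (word, 1))
                      .reduceByKey(_ + _)
    counts.print()

    ssc.start()
    ssc.awaitTermination()
  }
}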

Spark Programming

Create the project skeleton:

sbt new sbt/scala-seed.g8       # prompts for a project name, e.g. demo
cd demo

Create src/main/scala/SimpleApp.scala:

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf

object SimpleApp {
  def main(args: Array[String]) {
    val logFile = "file:///home/vagrant/apps/spark-2.2.0-bin-hadoop2.7/README.md"
    val conf = new SparkConf().setAppName("Simple Application")
    val sc = new SparkContext(conf)
    val logData = sc.textFile(logFile, 2).cache()
    val numAs = logData.filter(line => line.contains("a")).count()
    val numBs = logData.filter(line => line.contains("b")).count()
    println("Lines with a: %s, Lines with b: %s".format(numAs, numBs))
  }
}

Edit build.sbt:

import Dependencies._

lazy val root = (project in file(".")).
  settings(
    inThisBuild(List(
      organization := "com.example",
      scalaVersion := "2.11.11",          // changed
      version      := "0.1.0-SNAPSHOT"
    )),
    name := "SimpleApp",     // 修改
    libraryDependencies += scalaTest % Test,
    libraryDependencies += sparkCore            // added
  )

Edit project/Dependencies.scala:

import sbt._

object Dependencies {
  lazy val scalaTest = "org.scalatest" %% "scalatest" % "3.0.3"
  lazy val sparkCore = "org.apache.spark" %% "spark-core" % "2.2.0"    // added
}

Package the project and submit it to Spark:

$ sbt package
$ spark-submit --class "SimpleApp" target/scala-2.11/simpleapp_2.11-0.1.0-SNAPSHOT.jar  2>&1 | grep "Lines with a:"
Lines with a: 61, Lines with b: 30

Troubleshooting

No output appears in the terminal when running the Spark Streaming example

Set master="local[2]": when running locally, the receiver occupies one thread, so at least two threads are needed or no data gets processed. See the Spark 2.2.0 documentation.
