翻译:Hadoop权威指南之Spark-1

本文翻译自O’Reilly出版Tom White所著《Hadoop: The Definitive Guide》第4版第19章,向作者致敬。该书英文第4版已于2015年4月出版,至今已近15个月,而市面上中文第3版还在大行其道。Spark一章是第4版新增的内容,笔者在学习过程中顺便翻译记录。由于笔者也在学习,难免翻译不妥或出错,欢迎方家斧正。翻译纯属兴趣,不做商业用途。


Apache Spark 是一个大规模数据处理的集群计算框架。和本书中讨论的大多数其他处理框架不同,Spark不使用MapReduce作为执行引擎,作为替代,Spark使用自己的分布式运行环境(distributed runtime)来执行集群上的工作。然而,Spark与MapReduce在API和runtime方面有许多相似,本章中我们将会看到。Spark和Hadoop紧密集成:它可以运行在YARN上,处理Hadoop的文件格式,后端存储采用HDFS。

Spark最著名的是它拥有把大量的工作数据集保持在内存中的能力。这种能力使得Spark胜过对应的MapReduce工作流(某些情况下差别显著),在MapReduce中数据集总是要从磁盘加载。两种类型的应用从Spark这种处理模型中受益巨大:1)迭代算法,一个函数在某数据集上反复执行直到满足退出条件。2)交互式分析,用户在某数据集上执行一系列的特定查询。

即使你不需要内存缓存,Spark依然有充满魅力的理由:它的DAG引擎和用户体验。与MapReduce不同,Spark的DAG引擎能够处理任意的多个操作组成的管道(pipelines of operators)并翻译为单个Job。

Spark的用户体验也是首屈一指的(second to none),它有丰富的API用来执行很多常见的数据处理任务,比如join。行文之时,Spark提供三种语言的API:Scala,Java和Python。本章中的大多数例子将采用Scala API,但翻译为别的语言也是容易的。Spark还带有一个基于Scala或Python的REPL(read-eval-print loop)环境,可以快速简便的查看数据集。

Spark是个构建分析工具的好平台,为达此目的,Apache Spark项目包含了众多的模块:机器学习(MLlib),图形处理(GraphX),流式处理(Spark Streaming),还有SQL(Spark SQL)。本章内容不涉及这些模块,感兴趣的读者可以访问 Apache Spark 网站

安装Spark

下载页面 下载Spark二进制分发包的稳定版本(选择和你正在使用的Hadoop版本相匹配的)。在合适的地方解压这个tar包。

1
% tar xzf spark-x.y.z-bin-distro.tgz

把Spark加入到PATH环境变量中

1
2
% export SPARK_HOME=~/sw/spark-x.y.z-bin-distro
% export PATH=$PATH:$SPARK_HOME/bin

我们现在可以运行Spark的例子了。

示例

为了介绍Spark,我们使用spark-shell来运行一个交互式会话,这是带有Spark附加组件的Scala REPL,用下面的命令启动shell:

1
2
3
% spark-shell
Spark context available as sc.
scala>

从控制台的输出,我们可以看到shell创建了一个Scala变量,sc,用来存储SparkContext实例。这是Spark的入口,我们可以这样加载一个文本文件:

1
2
scala> val lines = sc.textFile("input/ncdc/micro-tab/sample.txt")
lines: org.apache.spark.rdd.RDD[String] = MappedRDD[1] at textFile at <console>:12

lines变量是对一个弹性数据集(RDD)的引用,RDD是Spark的核心抽象:分区在集群中多台机器上的只读的对象集合。在典型的Spark程序中,一个或多个RDD被加载进来作为输入,经过一系列的转换操作(transformation),成为一组目标RDD,可以对其执行行动(action)(比如计算结果或者写入持久化存储) 。“弹性数据集”中的“弹性”是指,Spark会通过从源RDD中重新计算的方式,来自动重建一个丢失的分区。

加载RDD和执行转换操作不会触发数据处理,仅仅是创建一个执行计算的计划。当行动(比如 foreach())执行的时候,才会触发计算。

我们要做的第一个转换操作,是把lines拆分为fields:

1
2
scala> val records = lines.map(_.split("\t"))
records: org.apache.spark.rdd.RDD[Array[String]] = MappedRDD[2] at map at <console>:14

这里使用了RDD的map()方法,对RDD中的每一个元素,执行一个函数。本例中,我们把每一行(字符串String)拆分为 Scala 的字符串数组(Array of Strings)。

接下来,我们使用过滤器(filter)来去掉可能存在的坏记录:

1
2
scala> val filtered = records.filter(rec => (rec(1) != "9999" && rec(2).matches("[01459]")))
filtered: org.apache.spark.rdd.RDD[Array[String]] = FilteredRDD[3] at filter at <console>:16

RDD的filter方法接收一个返回布尔值的函数作为参数。这个函数过滤掉那些温度缺失的(由9999表示)或者质量不好的记录。

为了找到每一年的最高气温,我们需要在year字段上执行分组操作,这样才能处理每一年的所有温度值。Spark提供reduceByKey()方法来做这件事情,但它需要一个键值对RDD,因此我们需要通过另一个map来把现有的RDD转变为正确的形式:

1
2
scala> val tuples = filtered.map(rec => (rec(0).toInt, rec(1).toInt))
tuples: org.apache.spark.rdd.RDD[(Int, Int)] = MappedRDD[4] at map at <console>:18

现在可以执行聚合了。reduceByKey()方法的参数是一个函数,这个函数接受两个数值并联合为一个单独的数值。这里我们使用Java的Math.max函数:

1
2
scala> val maxTemps = tuples.reduceByKey((a, b) => Math.max(a, b))
maxTemps: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[7] at reduceByKey at <console>:21

现在可以展示maxTemps的内容了,调用foreach()方法并传入println(),把每个元素打印到控制台:

1
2
3
scala> maxTemps.foreach(println(_))
(1950,22)
(1949,111)

这个foreach()方法,与标准Scala集合(比如List)中的等价物相同,对RDD中的每个元素应用一个函数(此函数具有副作用)。正是这个操作,促使Spark运行一个作业来计算RDD中的数据,使之能够跑步通过println()方法:-)

或者,也可以把RDD保存到文件系统:

1
scala> maxTemps.saveAsTextFile("output")

这样会创建一个output目录,包含分区文件:

1
2
3
% cat output/part-*
(1950,22)
(1949,111)

这个saveAsTextFile()方法也会触发一个Spark作业。主要的区别是没有返回值,而是把RDD的计算结果及其分区文件写入output目录中。

Spark应用、作业、阶段、任务

示例中我们看到,和MapReduce一样,Spark也有作业的概念。然而,Spark作业比MapReduce作业更通用,因为它是由任意的阶段组成的有向无环图(DAG)。每个阶段大致等同于MapReduce中的map或者reduce阶段。

阶段被Spark运行时拆分为任务,并行地运行在RDD的分区之上,就像MapReduce的任务一样。

一个作业总是运行于一个应用的上下文中,由SparkContext实例表示,应用的作用是分组RDD和共享变量。一个应用可以运行多个作业,串行或者并行,并且提供一种机制,使得一个作业可以访问同一应用中前一个作业缓存的RDD。一个交互式的Spark会话,比如spark-shell会话,就是一个应用的实例。