翻译:Hadoop权威指南之Spark-3

弹性分布式数据集

RDD是每个spark程序的核心,本节我们来看看更多细节。

创建

创建RDD有三种方式:从一个内存中的对象集合,被称为并行化(parallelizing) 一个集合;使用一个外部存储(比如HDFS)的数据集;转换已存在的RDD。在对少量的输入数据并行地进行CPU密集型运算时,第一种方式非常有用。例如,下面执行从1到10的独立运算:

1
2
val params = sc.parallelize(1 to 10)
val result = params.map(performExpensiveComputation)

函数performExpensiveComputation并行处理输入数据。并行性的级别由属性spark.default.parallelism决定,该属性的默认值取决于Spark的运行方式。本地运行时,是本地机器的核心数量,集群运行时,是集群中所有执行器(executor)节点的核心总数量。

可以为某个特定运算设置并行性级别,指定parallelize()方法的第二个参数即可:

1
sc.parallelize(1 to 10, 10)

创建RDD的第二种方式,是创建一个指向外部数据集的引用。我们已经见过怎样为一个文本文件创建String对象的RDD:

1
val text:RDD[String] = sc.textFile(inputPath)

路径inputPath可以是任意的Hadoop文件系统路径,比如本地文件系统或HDFS上的一个文件。内部来看,Spark使用旧的MapReduce API中的TextInputFormat来读取这个文件。这就意味着文件切分行为与Hadoop是一样的,因此在HDFS的情况下,一个Spark分区对应一个HDFS块(block)。这个默认行为可以改变,传入第二个参数来请求一个特殊的切分数量:

1
sc.textFile(inputPath, 10)

另外一个方法允许把多个文本文件作为一个整体来处理,返回的RDD中,是成对的string,第一个string是文件的路径,第二个string是文件的内容。因为每个文件都会加载进内存,所以这种方式仅仅适合于小文件:

1
val files:RDD[(String, String)] = sc.wholeTextFiles(inputPath)

Spark能够处理文本文件以外的其他文件格式,比如,序列文件可以这样读入:

1
sc.sequenceFile[IntWritable, Text](inputPath)

注意这里指定序列文件的键和值的Writable类型的方式。对于常用的Writable类型,Spark能够映射到Java中的等价物,因此我们可以使用等价的方式:

1
sc.sequenceFile[Int, String](inputPath)

从任意的Hadoop InputFormat来创建RDD,有两种方式:基于文件的格式,使用hadoopFile(),接收一个路径;其他格式,比如HBase的TableInputFormat,使用hadoopRDD()。这些方法使用的是旧的MapReduce API。如果要用新的MapReduce API,使用newAPIHadoopFile()和newAPIHadoopRDD()。下面是读取Avro数据文件的示例,使用特定的API和一个WeatherRecord类:

1
2
3
4
5
6
val job = new Job()
AvroJob.setInputKeySchema(job, WeatherRecord.getClassSchema)
val data = sc.newAPIHadoopFile(inputPath,
classOf[AvroKeyInputFormat[WeatherRecord]],
classOf[AvroKey[WeatherRecord]], classOf[NullWritable],
job.getConfiguration)

除了路径之外,newAPIHadoopFile()方法还需要InputFormat的类型、键的类型、值的类型,再加上Hadoop配置,该配置中带有Avro的模式 schema,在第二行我们使用AvroJob帮助类做了设置。

创建RDD的第三种方式,是转换已存在的RDD。

转换和行动

Spark提供两种类型的操作:转换transformations和行动actions。“转换”从已存在的RDD生成新的RDD,而“行动”会触发运算并输出结果——返回给用户,或者保存到外部存储。

行动会立刻产生影响,而转换不会——它们是lazy的,它们不做任何工作,直到行动被触发。下面的例子,把文本文件中的每一行转为小写:

1
2
3
val text = sc.textFile(inputPath)
val lower: RDD[String] = text.map(_.toLowerCase())
lower.foreach(println(_))

map()方法是个转换操作,Spark内部这样处理:稍晚的时候,一个函数(这里是toLowerCase())会被调用,来处理RDD中的每一个元素。这个函数实际上没有执行,直到foreach()方法(这是个行动)被调用,然后Spark会运行一个作业,读取输入的文件,对文件中的每一行调用toLowerCase(),然后把结果写到控制台。

怎样分辨一个操作究竟是转换还是行动呢?一个方法是看它的返回类型:如果返回类型是RDD,这是个转换;否则就是行动。当你查阅RDD的文档时,这种方法是很有用的。对RDD执行的大多数操作,可以在RDD的文档(org.apache.spark.rdd包)中找到,更多的操作在PairRDDFunctions里,这里包含了处理键值对RDD的转换和行动。

Spark的库中包含了丰富的操作,有转换者诸如映射(mapping)、分组(grouping)、聚合(aggregating)、再分区(repartitioning)、取样(sampling)、连接(joining)多个RDD、把RDDs作为集合(sets)对待。还有行动者诸如把RDDs物化(materializing)为集合(collections)、对RDD进行计算统计、从RDD中取样出固定数目的元素,把RDD保存到外部存储。细节内容,查看文档。

Spark中的MapReduce

尽管名字很有暗示性,Spark中的map()和reduce()操作,与Hadoop MapReduce中相同名字的函数,不是直接对应的。Hadoop MapReduce中的map和reduce的通常形式是:

1
2
map: (K1, V1) -> list(K2, V2)
reduce: (K2, list(V2)) -> list(K3, V3)

从list标记可以看出,这两个函数都可以返回多个输出对。这种操作在Spark(Scala)中被实现为flatMap(),与map()很像,但是移除了一层嵌套:

1
2
3
4
5
6
7
8
scala> val l = List(1, 2, 3)
l: List[Int] = List(1, 2, 3)

scala> l.map(a => List(a))
res0: List[List[Int]] = List(List(1), List(2), List(3))

scala> l.flatMap(a => List(a))
res1: List[Int] = List(1, 2, 3)

有一种朴素的方式,可以在Spark中模拟Hadoop MapReduce。用两个flatMap()操作,中间用groupByKey()和sortByKey()来执行MapReduce的混洗和排序:

1
2
3
4
val input: RDD[(K1, V1)] = ...
val mapOutput: RDD[(K2, V2)] = input.flatMap(mapFn)
val shuffled: RDD[(K2, Iterable[V2])] = mapOutput.groupByKey().sortByKey()
val output: RDD[(K3, V3)] = shuffled.flatMap(reduceFn)

这里key的类型K2要继承自Scala的Ordering类型,以满足sortByKey()。

这个例子可以帮助我们理解MapReduce和Spark的关系,但是不能盲目应用。首先,这里的语义和Hadoop的MapReduce有微小的差别,sortByKey()执行的是全量排序。使用repartitionAndSortWithinPartitions()方法来执行部分排序,可以避免这个问题。然而,这样还是无效的,因为Spark有两次混洗的过程(一次groupByKey(),一次sort)。

与其重造MapReduce,不如仅仅使用那些你实际需要的操作。比如,如果不需要按key排序,你可以省略sortByKey(),这在Hadoop MapReduce中是不可能的。

同样的,大多数情况下groupByKey()太普遍了。通常只在聚合数据时需要混洗,因此应该使用reduceByKey(),foldByKey(),或者aggregateByKey(),这些函数比groupByKey()更有效率,因为它们可以在map任务中作为combiner运行。最后,flatMap()可能总是不需要的,如果总有一个返回值,map()是首选,如果有0或1个返回值,使用filter()。

聚合转换

根据key来聚合键值对RDD的三个主要的转换操作是reduceByKey(),foldByKey(),和aggregateByKey()。它们的工作方式稍有不同,但它们都是根据键来聚合值的,为每一个键生成一个单独的值。对应的行动是reduce(),fold()和aggregate(),它们以类似的方式运行,为整个RDD输出一个单独的值。

最简单的是reduceByKey(),它对成对儿的值反复执行一个函数,直到生成一个单独的值。例如:

1
2
3
4
val pairs: RDD[(String, Int)] =
sc.parallelize(Array(("a", 3), ("a", 1), ("b", 7), ("a", 5)))
val sums: RDD[(String, Int)] = pairs.reduceByKey(_+_)
assert(sums.collect().toSet === Set(("a", 9), ("b", 7)))

键 a 对应的值,使用相加函数(_+_)聚合起来,(3 + 1)+ 5 = 9,而键 b 对应的值只有一个,因此不需要聚合。一般来说,这些操作是分布式的,在RDD的不同分区对应的任务中分别执行,因此这些函数要具有互换性和连接性。换句话说,操作的顺序和分组是不重要的。这种情况下,聚合函数可以这样执行 5 +(3 + 1),或者 3 + (1 + 5),都会返回相同的结果。

在assert语句中使用的三联相等操作符(===),来自ScalaTest,比通常的 == 操作符提供更多有用的失败信息。

下面是用foldByKey()来执行相同的操作:

1
2
val sums: RDD[(String, Int)] = pairs.foldByKey(0)(_+_)
assert(sums.collect().toSet === Set(("a", 9), ("b", 7)))

注意到这次我们需要提供一个 零值,整数相加时是0,但如果是别的类型和操作,零值将是其他不同的东西。这一次,键 a 对应的值聚合的方式是((0 + 3)+ 1)+ 5)= 9(也可能是其他的顺序,不过加 0 总是第一个操作)。对于 b 是0 + 7 = 7。

使用foldByKey(),并不比reduceByKey()更强或更弱。特别地,也不能改变聚合结果的值类型。为此我们需要aggregateByKey(),例如,我们可以把那些整数值聚合到一个集合里:

1
2
3
val sets: RDD[(String, HashSet[Int])] =
pairs.aggregateByKey(new HashSet[Int])(_+=_, _++=_)
assert(sets.collect.toSet === Set(("a", Set(1, 3, 5)), ("b", Set(7))))

集合相加时,零值是空集合,因此我们用new HashSet[Int]来创建一个新的可变集合。我们需要向aggregateByKey()提供两个函数作为参数。第一个函数用来控制怎样把一个Int和一个HashSet[Int]相加,本例中我们用加等函数 _+=_ 把整数加到集合里面(_+_ 会返回一个新集合,旧集合不会改变)。

第二个函数用来控制怎样把两个HashSet[Int]相加(这种情况发生在map任务的combiner执行之后,reduce任务把两个分区聚合之时),这里我们使用 ++= 把第二个集合的所有元素加到第一个集合里。

对于键 a,操作的顺序可能是:
(( ∅ + 3) + 1) + 5) = (1, 3, 5)
或者:
( ∅ + 3) + 1) ++ ( ∅ + 5) = (1, 3) ++ (5) = (1, 3, 5)
如果Spark使用了组合器(combiner)的话。

转换后的RDD可以持久化到内存中,因此后续的操作效率很高。