弹性分布式数据集
RDD是每个spark程序的核心,本节我们来看看更多细节。
创建
创建RDD有三种方式:从一个内存中的对象集合,被称为并行化(parallelizing) 一个集合;使用一个外部存储(比如HDFS)的数据集;转换已存在的RDD。在对少量的输入数据并行地进行CPU密集型运算时,第一种方式非常有用。例如,下面执行从1到10的独立运算:
1 | val params = sc.parallelize(1 to 10) |
函数performExpensiveComputation并行处理输入数据。并行性的级别由属性spark.default.parallelism决定,该属性的默认值取决于Spark的运行方式。本地运行时,是本地机器的核心数量,集群运行时,是集群中所有执行器(executor)节点的核心总数量。
可以为某个特定运算设置并行性级别,指定parallelize()方法的第二个参数即可:
1 | sc.parallelize(1 to 10, 10) |
创建RDD的第二种方式,是创建一个指向外部数据集的引用。我们已经见过怎样为一个文本文件创建String对象的RDD:
1 | val text:RDD[String] = sc.textFile(inputPath) |
路径inputPath可以是任意的Hadoop文件系统路径,比如本地文件系统或HDFS上的一个文件。内部来看,Spark使用旧的MapReduce API中的TextInputFormat来读取这个文件。这就意味着文件切分行为与Hadoop是一样的,因此在HDFS的情况下,一个Spark分区对应一个HDFS块(block)。这个默认行为可以改变,传入第二个参数来请求一个特殊的切分数量:
1 | sc.textFile(inputPath, 10) |
另外一个方法允许把多个文本文件作为一个整体来处理,返回的RDD中,是成对的string,第一个string是文件的路径,第二个string是文件的内容。因为每个文件都会加载进内存,所以这种方式仅仅适合于小文件:
1 | val files:RDD[(String, String)] = sc.wholeTextFiles(inputPath) |
Spark能够处理文本文件以外的其他文件格式,比如,序列文件可以这样读入:
1 | sc.sequenceFile[IntWritable, Text](inputPath) |
注意这里指定序列文件的键和值的Writable类型的方式。对于常用的Writable类型,Spark能够映射到Java中的等价物,因此我们可以使用等价的方式:
1 | sc.sequenceFile[Int, String](inputPath) |
从任意的Hadoop InputFormat来创建RDD,有两种方式:基于文件的格式,使用hadoopFile(),接收一个路径;其他格式,比如HBase的TableInputFormat,使用hadoopRDD()。这些方法使用的是旧的MapReduce API。如果要用新的MapReduce API,使用newAPIHadoopFile()和newAPIHadoopRDD()。下面是读取Avro数据文件的示例,使用特定的API和一个WeatherRecord类:
1 | val job = new Job() |
除了路径之外,newAPIHadoopFile()方法还需要InputFormat的类型、键的类型、值的类型,再加上Hadoop配置,该配置中带有Avro的模式 schema,在第二行我们使用AvroJob帮助类做了设置。
创建RDD的第三种方式,是转换已存在的RDD。
转换和行动
Spark提供两种类型的操作:转换transformations和行动actions。“转换”从已存在的RDD生成新的RDD,而“行动”会触发运算并输出结果——返回给用户,或者保存到外部存储。
行动会立刻产生影响,而转换不会——它们是lazy的,它们不做任何工作,直到行动被触发。下面的例子,把文本文件中的每一行转为小写:
1 | val text = sc.textFile(inputPath) |
map()方法是个转换操作,Spark内部这样处理:稍晚的时候,一个函数(这里是toLowerCase())会被调用,来处理RDD中的每一个元素。这个函数实际上没有执行,直到foreach()方法(这是个行动)被调用,然后Spark会运行一个作业,读取输入的文件,对文件中的每一行调用toLowerCase(),然后把结果写到控制台。
怎样分辨一个操作究竟是转换还是行动呢?一个方法是看它的返回类型:如果返回类型是RDD,这是个转换;否则就是行动。当你查阅RDD的文档时,这种方法是很有用的。对RDD执行的大多数操作,可以在RDD的文档(org.apache.spark.rdd包)中找到,更多的操作在PairRDDFunctions里,这里包含了处理键值对RDD的转换和行动。
Spark的库中包含了丰富的操作,有转换者诸如映射(mapping)、分组(grouping)、聚合(aggregating)、再分区(repartitioning)、取样(sampling)、连接(joining)多个RDD、把RDDs作为集合(sets)对待。还有行动者诸如把RDDs物化(materializing)为集合(collections)、对RDD进行计算统计、从RDD中取样出固定数目的元素,把RDD保存到外部存储。细节内容,查看文档。
Spark中的MapReduce
尽管名字很有暗示性,Spark中的map()和reduce()操作,与Hadoop MapReduce中相同名字的函数,不是直接对应的。Hadoop MapReduce中的map和reduce的通常形式是:
1 | map: (K1, V1) -> list(K2, V2) |
从list标记可以看出,这两个函数都可以返回多个输出对。这种操作在Spark(Scala)中被实现为flatMap(),与map()很像,但是移除了一层嵌套:
1 | scala> val l = List(1, 2, 3) |
有一种朴素的方式,可以在Spark中模拟Hadoop MapReduce。用两个flatMap()操作,中间用groupByKey()和sortByKey()来执行MapReduce的混洗和排序:
1 | val input: RDD[(K1, V1)] = ... |
这里key的类型K2要继承自Scala的Ordering类型,以满足sortByKey()。
这个例子可以帮助我们理解MapReduce和Spark的关系,但是不能盲目应用。首先,这里的语义和Hadoop的MapReduce有微小的差别,sortByKey()执行的是全量排序。使用repartitionAndSortWithinPartitions()方法来执行部分排序,可以避免这个问题。然而,这样还是无效的,因为Spark有两次混洗的过程(一次groupByKey(),一次sort)。
与其重造MapReduce,不如仅仅使用那些你实际需要的操作。比如,如果不需要按key排序,你可以省略sortByKey(),这在Hadoop MapReduce中是不可能的。
同样的,大多数情况下groupByKey()太普遍了。通常只在聚合数据时需要混洗,因此应该使用reduceByKey(),foldByKey(),或者aggregateByKey(),这些函数比groupByKey()更有效率,因为它们可以在map任务中作为combiner运行。最后,flatMap()可能总是不需要的,如果总有一个返回值,map()是首选,如果有0或1个返回值,使用filter()。
聚合转换
根据key来聚合键值对RDD的三个主要的转换操作是reduceByKey(),foldByKey(),和aggregateByKey()。它们的工作方式稍有不同,但它们都是根据键来聚合值的,为每一个键生成一个单独的值。对应的行动是reduce(),fold()和aggregate(),它们以类似的方式运行,为整个RDD输出一个单独的值。
最简单的是reduceByKey(),它对成对儿的值反复执行一个函数,直到生成一个单独的值。例如:
1 | val pairs: RDD[(String, Int)] = |
键 a 对应的值,使用相加函数(_+_)聚合起来,(3 + 1)+ 5 = 9,而键 b 对应的值只有一个,因此不需要聚合。一般来说,这些操作是分布式的,在RDD的不同分区对应的任务中分别执行,因此这些函数要具有互换性和连接性。换句话说,操作的顺序和分组是不重要的。这种情况下,聚合函数可以这样执行 5 +(3 + 1),或者 3 + (1 + 5),都会返回相同的结果。
在assert语句中使用的三联相等操作符(===),来自ScalaTest,比通常的 == 操作符提供更多有用的失败信息。
下面是用foldByKey()来执行相同的操作:
1 | val sums: RDD[(String, Int)] = pairs.foldByKey(0)(_+_) |
注意到这次我们需要提供一个 零值,整数相加时是0,但如果是别的类型和操作,零值将是其他不同的东西。这一次,键 a 对应的值聚合的方式是((0 + 3)+ 1)+ 5)= 9(也可能是其他的顺序,不过加 0 总是第一个操作)。对于 b 是0 + 7 = 7。
使用foldByKey(),并不比reduceByKey()更强或更弱。特别地,也不能改变聚合结果的值类型。为此我们需要aggregateByKey(),例如,我们可以把那些整数值聚合到一个集合里:
1 | val sets: RDD[(String, HashSet[Int])] = |
集合相加时,零值是空集合,因此我们用new HashSet[Int]来创建一个新的可变集合。我们需要向aggregateByKey()提供两个函数作为参数。第一个函数用来控制怎样把一个Int和一个HashSet[Int]相加,本例中我们用加等函数 _+=_ 把整数加到集合里面(_+_ 会返回一个新集合,旧集合不会改变)。
第二个函数用来控制怎样把两个HashSet[Int]相加(这种情况发生在map任务的combiner执行之后,reduce任务把两个分区聚合之时),这里我们使用 ++= 把第二个集合的所有元素加到第一个集合里。
对于键 a,操作的顺序可能是:
(( ∅ + 3) + 1) + 5) = (1, 3, 5)
或者:
( ∅ + 3) + 1) ++ ( ∅ + 5) = (1, 3) ++ (5) = (1, 3, 5)
如果Spark使用了组合器(combiner)的话。
转换后的RDD可以持久化到内存中,因此后续的操作效率很高。