本文是对[Spark Guides]的要点总结,会持续更新。
Resilient Distributed Datasets (RDDs)
可并行、容错的元素集合,建立RDDs的两种方式:
在driver program中并行化已存在的数据集合
使用SparkContext的parallelize将一个scala seq转化为并行集合。该方法的一个重要参数是数据集的分区数(partitions number)。Spark会为每个partition起一个task。通常,每个CPU分配2-4 partitions。一般地,Spark会根据集群自动设置partitions数,当然你也可以手动设置(e.g. sc.parallelize(data, 10))读取外部数据集
Spark可以从来自于任何支持Hadoop的存储来源建立为分布式数据集,包括你的本地文件系统、HDFS、Cassandra、HBase等。输入格式支持text files, SequenceFiles, 和其他任何Hadoop InputFormat。注意:
- Spark所有文件输入方法支持目录、压缩文件和正则匹配
- Spark一般为文件每个block创建一个分区(HDFS默认128M)。注意partitions数不能小于blocks数
RDD操作
RDDs支持两种类型的操作:
- transformations 转换
基于已有数据集建立新的数据集 - actions 动作
在数据集上执行计算后,返回给driver program一个值
注意,spark所有的transformations都是惰性(lazy)的,即它们并不立即计算结果。它们仅记忆应用到数据集上的transformations,直到一个要求返回结果的action执行时,才会开始计算,这种设计使得spark更高效。
默认的,transformed RDD在每次跑一个action时都会重新计算。但是,你可以使用persist (or cache) 方法把一个RDD持久化到内存中,在这种情况下,下次再访问该RDD时会非常快。当然,也支持硬盘持久化,或者在多个结点复制。
理解闭包(closures)
为了执行任务,Spark会把RDD操作的过程拆分成tasks,每一个任务被一个executor执行。在执行前,Spark会计算任务的闭包。closure指的是那些必须对executor可见的变量和方法,以用来执行在RDD上的计算。closure会被序列化然后送到每个executor。
为了确保well-defined的行为,应当使用Accumulator。Accumulators是当execution被拆分到集群工作节点间时,特意用来提供安全更新变量的机制。
Transformations
- map(func)
- filter(func)
- flatMap(func)
- mapPartitions(func)
- mapPartitionsWithIndex(func)
- sample(withReplacement, fraction, seed)
- union(otherDataset)
- intersection(otherDataset)
- distinct([numTasks]))
- groupByKey([numTasks])
- reduceByKey(func, [numTasks])
- aggregateByKey(zeroValue)(seqOp, combOp, [numTasks]
- sortByKey([ascending], [numTasks])
- join(otherDataset, [numTasks])
- cogroup(otherDataset, [numTasks])
- cartesian(otherDataset)
- pipe(command, [envVars])
- coalesce(numPartitions)
- repartition(numPartitions)
- repartitionAndSortWithinPartitions(partitioner)
Actions
- reduce(func)
- collect()
- count()
- first()
- take(n)
- takeSample(withReplacement, num, [seed])
- takeOrdered(n, [ordering])
- saveAsTextFile(path)
- saveAsSequenceFile(path)
- saveAsObjectFile(path)
- countByKey()
- foreach(func)
Shuffle operations
shuffle用于重分布(re-distributing)数据,以便数据在分区之间被不同地分组(grouped differently across partitions)。这一般会导致在executors和机器间复制,使得shuffle成为复杂、costly的操作。
在Spark中,数据一般不会跨区分布,以便对于特定操作能够处在必要的位置。因此,对于reduceByKey操作,Spark需要执行一次all-to-all操作。它必须从所有分区读取,以查找所有键的所有values,然后将分区的值汇总在一起以计算每个键的最终结果—这就是所谓的shuffle。
触发shuffle的操作有
- repartition操作如 repartition 和 coalesce
- ByKey操作如 groupByKey 和 reduceByKey
- join操作如 cogroup 和 join
RDD持久化
在操作间持久化一个数据集是Spark的重要特性。当你持久化一个RDD,每个节点都会存储它计算的任何分区至memory中,在此数据集上(或从它衍生的数据集)的actions中都会重利用之。这使得之后的actions变得更快(通常多于10X提速)。Caching是迭代算法和快速交互式应用(fast interactive use)的重要工具。
可以使用 persist() 或者 cache() 方法进行持久化。当它在一次action中第一次计算出来后,它就会保存在节点memory中。Spark的cache是容错的,如果一个RDD的任何分区丢失后,它会自动重计算之,通过使用原来建立它的transformations。
此外,有不同的持久化等级:
- MEMORY_ONLY
- MEMORY_AND_DISK
- MEMORY_ONLY_SER
- MEMORY_AND_DISK_SER
- DISK_ONLY
- MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc
- OFF_HEAP (experimental)
如何选择持久化等级
这是关于memory使用状况和CPU效率间的trade-off,建议通过以下过程来选择一个:
如果你的RDDs在默认存储等级 (MEMORY_ONLY)下fit comfortably,就保持这样的设定。这种是最高CPU效率的选择。
如果不是,试着使用MEMORY_ONLY_SER并选择一个快速的序列化包来使得对象们存储更空间高效(space-efficient)
除非计算你的数据集的函数集(functions)非常expensive,或者它们过滤了大量数据,否则不要将之溢出到磁盘(spill to disk)。除此以外,重计算一个分区和把它从磁盘读取速度是一样的。
- 如果你想拥有快速的错误恢复(fault recovery),使用冗余(replicated)存储级别。所有的存储级别通过重计算损失的数据来提供容错,但是冗余备份使得你可以继续在RDDs上运行,而不需要等待重新计算丢失的分区。
Shared Variables(共享变量)
Broadcast Variables 与 Accumulators