Spark Guides

本文是对[Spark Guides]的要点总结,会持续更新。

Resilient Distributed Datasets (RDDs)

可并行、容错的元素集合,建立RDDs的两种方式:

  • 在driver program中并行化已存在的数据集合
    使用SparkContext的parallelize将一个scala seq转化为并行集合。该方法的一个重要参数是数据集的分区数(partitions number)。Spark会为每个partition起一个task。通常,每个CPU分配2-4 partitions。一般地,Spark会根据集群自动设置partitions数,当然你也可以手动设置(e.g. sc.parallelize(data, 10))

  • 读取外部数据集
    Spark可以从来自于任何支持Hadoop的存储来源建立为分布式数据集,包括你的本地文件系统、HDFS、Cassandra、HBase等。输入格式支持text files, SequenceFiles, 和其他任何Hadoop InputFormat。

    注意:

    • Spark所有文件输入方法支持目录、压缩文件和正则匹配
    • Spark一般为文件每个block创建一个分区(HDFS默认128M)。注意partitions数不能小于blocks数

RDD操作

RDDs支持两种类型的操作:

  • transformations 转换
    基于已有数据集建立新的数据集
  • actions 动作
    在数据集上执行计算后,返回给driver program一个值

注意,spark所有的transformations都是惰性(lazy)的,即它们并不立即计算结果。它们仅记忆应用到数据集上的transformations,直到一个要求返回结果的action执行时,才会开始计算,这种设计使得spark更高效。

默认的,transformed RDD在每次跑一个action时都会重新计算。但是,你可以使用persist (or cache) 方法把一个RDD持久化到内存中,在这种情况下,下次再访问该RDD时会非常快。当然,也支持硬盘持久化,或者在多个结点复制。

理解闭包(closures)

为了执行任务,Spark会把RDD操作的过程拆分成tasks,每一个任务被一个executor执行。在执行前,Spark会计算任务的闭包。closure指的是那些必须对executor可见的变量和方法,以用来执行在RDD上的计算。closure会被序列化然后送到每个executor。

为了确保well-defined的行为,应当使用Accumulator。Accumulators是当execution被拆分到集群工作节点间时,特意用来提供安全更新变量的机制。

Transformations

  • map(func)
  • filter(func)
  • flatMap(func)
  • mapPartitions(func)
  • mapPartitionsWithIndex(func)
  • sample(withReplacement, fraction, seed)
  • union(otherDataset)
  • intersection(otherDataset)
  • distinct([numTasks]))
  • groupByKey([numTasks])
  • reduceByKey(func, [numTasks])
  • aggregateByKey(zeroValue)(seqOp, combOp, [numTasks]
  • sortByKey([ascending], [numTasks])
  • join(otherDataset, [numTasks])
  • cogroup(otherDataset, [numTasks])
  • cartesian(otherDataset)
  • pipe(command, [envVars])
  • coalesce(numPartitions)
  • repartition(numPartitions)
  • repartitionAndSortWithinPartitions(partitioner)

Actions

  • reduce(func)
  • collect()
  • count()
  • first()
  • take(n)
  • takeSample(withReplacement, num, [seed])
  • takeOrdered(n, [ordering])
  • saveAsTextFile(path)
  • saveAsSequenceFile(path)
  • saveAsObjectFile(path)
  • countByKey()
  • foreach(func)

Shuffle operations

shuffle用于重分布(re-distributing)数据,以便数据在分区之间被不同地分组(grouped differently across partitions)。这一般会导致在executors和机器间复制,使得shuffle成为复杂、costly的操作。

在Spark中,数据一般不会跨区分布,以便对于特定操作能够处在必要的位置。因此,对于reduceByKey操作,Spark需要执行一次all-to-all操作。它必须从所有分区读取,以查找所有键的所有values,然后将分区的值汇总在一起以计算每个键的最终结果—这就是所谓的shuffle。

触发shuffle的操作有

  • repartition操作如 repartition 和 coalesce
  • ByKey操作如 groupByKey 和 reduceByKey
  • join操作如 cogroup 和 join

RDD持久化

在操作间持久化一个数据集是Spark的重要特性。当你持久化一个RDD,每个节点都会存储它计算的任何分区至memory中,在此数据集上(或从它衍生的数据集)的actions中都会重利用之。这使得之后的actions变得更快(通常多于10X提速)。Caching是迭代算法和快速交互式应用(fast interactive use)的重要工具。

可以使用 persist() 或者 cache() 方法进行持久化。当它在一次action中第一次计算出来后,它就会保存在节点memory中。Spark的cache是容错的,如果一个RDD的任何分区丢失后,它会自动重计算之,通过使用原来建立它的transformations。

此外,有不同的持久化等级:

  • MEMORY_ONLY
  • MEMORY_AND_DISK
  • MEMORY_ONLY_SER
  • MEMORY_AND_DISK_SER
  • DISK_ONLY
  • MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc
  • OFF_HEAP (experimental)

如何选择持久化等级

这是关于memory使用状况和CPU效率间的trade-off,建议通过以下过程来选择一个:

  • 如果你的RDDs在默认存储等级 (MEMORY_ONLY)下fit comfortably,就保持这样的设定。这种是最高CPU效率的选择。

  • 如果不是,试着使用MEMORY_ONLY_SER并选择一个快速的序列化包来使得对象们存储更空间高效(space-efficient)

  • 除非计算你的数据集的函数集(functions)非常expensive,或者它们过滤了大量数据,否则不要将之溢出到磁盘(spill to disk)。除此以外,重计算一个分区和把它从磁盘读取速度是一样的。

  • 如果你想拥有快速的错误恢复(fault recovery),使用冗余(replicated)存储级别。所有的存储级别通过重计算损失的数据来提供容错,但是冗余备份使得你可以继续在RDDs上运行,而不需要等待重新计算丢失的分区。

Shared Variables(共享变量)

Broadcast Variables 与 Accumulators