Walt You - 知行合一

Spark中RDD的介绍

2018-06-19

RDD是Spark的重要组件,这次来介绍了一下RDD的定义、和DSM的对比、基本操作以及优缺点。



RDD

1. 名称解释

Spark的关键抽象是RDD(Resilient Distributed Dataset),它是弹性分布式数据集的首字母缩略词。

  • Resilient:在 DAG 的帮助下容错,因此能够重新计算由于节点故障而丢失或损坏的partitions。
  • Distributed:数据驻留在多个节点上
  • Dataset:代表用户所使用的数据,可以是json文件、txt文件或者通过jdbc链接的数据库。

2. 特点

它是Spark中数据的基本单位。基本上,它是跨群集节点的分布式元素集合,也执行并行操作。

而且,Spark RDD本质上是不可改变的。尽管它可以通过转换现有的Spark RDD来生成新的RDD。

RDD 可以被缓存和手动分区。缓存可以帮助我们提高多次处理RDD的效率,较小的分区可以帮助任务负载均衡。

3. 为什么需要RDD

RDD概念背后的主要动机是:

  • 迭代算法
  • 交互式数据挖掘工具
  • DSM(Distributed Shared Memory)是一种非常普遍的抽象,但这种普遍性使得在商品集群上很难以高效且容错的方式实现。
  • 在分布式计算系统中,数据存储在中间稳定的分布式存储中,例如HDFS或Amazon S3。这会使作业的计算速度变慢,因为它涉及许多IO操作,复制和序列化过程。

为了有效地实现容错,RDD基于粗粒度转换提供限制形式的共享内存,而不是对共享状态进行细粒度更新。


和DSM的对比

DSM(Distributed Shared Memory)分布式共享内存,它是一种通用的内存数据抽象。在DSM中,应用可以向全局地址空间的任意位置进行读写操作。

1. Read

  • RDD:粗粒度
  • DSM:细粒度

2. Write

  • RDD:粗粒度
  • DSM:细粒度

3. 一致性

  • RDD:天生是不变的
  • DSM:如果程序员遵守规则,内存将是一致的

4. 故障恢复机制

  • RDD:在任何时候丢失Spark RDD中的数据,都可以使用谱系图轻松地恢复。由于每次转换都会形成新的RDD,并且RDD本质上是不可变的,所以很容易恢复。
  • DSM:容错能力通过检查点技术来实现,该技术允许应用程序回滚到最近的检查点而不是重新启动。

5. 落后节点缓解

有时,一些节点比起同期启动节点,处理很慢。

  • RDD:使用备用任务缓解
  • DSM:相当困难

6. 缺少内存后的行为

  • RDD:如果没有足够的空间将RDD存储在RAM中,则将RDD转移到磁盘。
  • DSM:在这种类型的系统中,如果RAM的存储空间不足,性能会下降。

创建 RDD

基本上,有三种方法创建RDD。

1. 并行集合

通过在驱动程序中调用并行化方法,我们可以创建并行化的集合。

val data=spark.sparkContext.parallelize(Seq(("maths",52),("english",75),("science",82), ("computer",65),("maths",85)))
val sorted = data.sortByKey()
sorted.foreach(println)

2. 外部数据集

可以通过调用textFile方法来创建Spark RDD。因此,这种方法会获取文件的URL并将其作为行的集合读取。

  1. csv

     import org.apache.spark.sql.SparkSession
    
     def main(args: Array[String]):Unit = {
         object DataFormat {
             val spark =  SparkSession.builder.appName("AvgAnsTime").master("local").getOrCreate()
             val dataRDD = spark.read.csv("path/of/csv/file").rdd
         }
     }
    
  2. json

     val dataRDD = spark.read.json("path/of/json/file").rdd
    
  3. textFile

     val dataRDD = spark.read.textFile("path/of/text/file").rdd
    

3. 现有的RDD

可以通过对现有的RDD进行转换操作(transformation),在spark中创建新的RDD。

val words=spark.sparkContext.parallelize(Seq("the", "quick", "brown", "fox", "jumps", "over", "the", "lazy", "dog"))
val wordPair = words.map(w => (w.charAt(0), w))
wordPair.foreach(println)

对RDD的操作

有两种类型的操作:Transformation 和 Action。

常用函数可以参考这里.

1. Transformation

它从现有的创建一个新的Spark RDD。它接收RDD,返回一个或者多个新的RDD,并且不更改输入RDD。

Transformation 是个lazy操作。只有在执行Action动作时,才会真正进行 transformation。

某些转换可以是流水线处理,这是一种优化方法,Spark用于提高计算性能。

1)窄转换

输出RDD的分区记录,仅来自输入RDD中的单个分区。它也被叫做流水线。

2)宽转换

在输出RDD中单个分区的记录所需的数据,可能存在于父RDD的许多分区中。它也被叫做shuffle转换。

2. Action

在Apache Spark中,Action将最终结果返回给驱动程序或将其写入外部数据存储。

它使用谱系图触发执行,将数据加载到原始RDD中,执行所有中间转换并将最终结果返回到Driver程序或将其写入到文件系统。

First(),take(),reduce(),collect(),count()是spark中的一些Actions。


RDD优缺点

1. 优点

  1. 内存中计算
  2. Lazy Evaluation
  3. 容错
  4. 不变性
  5. 持续性
  6. Partitioning
  7. 并行
  8. 本地粘性
  9. 粗粒度操作
  10. 类型化
  11. 没有限制

2. 缺点

  1. 没有内置的优化引擎
  2. 处理结构化数据
  3. 性能限制
  4. 存储限制

持久RDD的存储级别

1. MEMORY_ONLY

RDD作为反序列化Java对象存储在JVM中。

如果RDD的大小大于内存,它不会缓存某些分区,并且在下次需要它们时,重新计算它们。

在这个级别中,用于存储的空间非常大,CPU计算时间很短,数据存储在内存中。它不使用磁盘。

2. MEMORY_AND_DISK

在此级别中,RDD作为反序列化Java对象存储在JVM中。

当RDD的大小大于内存大小时,它将多余的分区存储在磁盘上,并在需要时从磁盘中检索。

在这个级别中,用于存储的空间很大,CPU计算时间中等,它使用内存和磁盘存储。

3. MEMORY_ONLY_SER

此级别的Spark将RDD存储为序列化的Java对象(每个分区一个字节的阵列)。

与反序列化的对象相比,它更节省空间,尤其是在使用快速序列化器的情况下。但它增加了CPU的开销。

在这个级别中,存储空间很小,CPU计算时间很长,数据存储在内存中。它不使用磁盘。

4. MEMORY_AND_DISK_SER

它类似于MEMORY_ONLY_SER,但它将内存放不下的分区放入磁盘,而不是每次需要时重新计算。

在此存储级别中,用于存储的空间很少,CPU计算时间很长,它使用内存和磁盘存储。

5. DISK_ONLY

在此存储级别中,RDD仅存储在磁盘上。用于存储的空间很小,CPU计算时间很长,并且使用磁盘存储。


参考链接

  1. Spark RDD – Introduction, Features & Operations of RDD

Similar Posts

Content