RDD是Spark的重要组件,这次来介绍了一下RDD的定义、和DSM的对比、基本操作以及优缺点。
RDD
1. 名称解释
Spark的关键抽象是RDD(Resilient Distributed Dataset),它是弹性分布式数据集的首字母缩略词。
- Resilient:在 DAG 的帮助下容错,因此能够重新计算由于节点故障而丢失或损坏的partitions。
- Distributed:数据驻留在多个节点上
- Dataset:代表用户所使用的数据,可以是json文件、txt文件或者通过jdbc链接的数据库。
2. 特点
它是Spark中数据的基本单位。基本上,它是跨群集节点的分布式元素集合,也执行并行操作。
而且,Spark RDD本质上是不可改变的。尽管它可以通过转换现有的Spark RDD来生成新的RDD。
RDD 可以被缓存和手动分区。缓存可以帮助我们提高多次处理RDD的效率,较小的分区可以帮助任务负载均衡。
3. 为什么需要RDD
RDD概念背后的主要动机是:
- 迭代算法
- 交互式数据挖掘工具
- DSM(Distributed Shared Memory)是一种非常普遍的抽象,但这种普遍性使得在商品集群上很难以高效且容错的方式实现。
- 在分布式计算系统中,数据存储在中间稳定的分布式存储中,例如HDFS或Amazon S3。这会使作业的计算速度变慢,因为它涉及许多IO操作,复制和序列化过程。
为了有效地实现容错,RDD基于粗粒度转换提供限制形式的共享内存,而不是对共享状态进行细粒度更新。
和DSM的对比
DSM(Distributed Shared Memory)分布式共享内存,它是一种通用的内存数据抽象。在DSM中,应用可以向全局地址空间的任意位置进行读写操作。
1. Read
- RDD:粗粒度
- DSM:细粒度
2. Write
- RDD:粗粒度
- DSM:细粒度
3. 一致性
- RDD:天生是不变的
- DSM:如果程序员遵守规则,内存将是一致的
4. 故障恢复机制
- RDD:在任何时候丢失Spark RDD中的数据,都可以使用谱系图轻松地恢复。由于每次转换都会形成新的RDD,并且RDD本质上是不可变的,所以很容易恢复。
- DSM:容错能力通过检查点技术来实现,该技术允许应用程序回滚到最近的检查点而不是重新启动。
5. 落后节点缓解
有时,一些节点比起同期启动节点,处理很慢。
- RDD:使用备用任务缓解
- DSM:相当困难
6. 缺少内存后的行为
- RDD:如果没有足够的空间将RDD存储在RAM中,则将RDD转移到磁盘。
- DSM:在这种类型的系统中,如果RAM的存储空间不足,性能会下降。
创建 RDD
基本上,有三种方法创建RDD。
1. 并行集合
通过在驱动程序中调用并行化方法,我们可以创建并行化的集合。
val data=spark.sparkContext.parallelize(Seq(("maths",52),("english",75),("science",82), ("computer",65),("maths",85)))
val sorted = data.sortByKey()
sorted.foreach(println)
2. 外部数据集
可以通过调用textFile方法来创建Spark RDD。因此,这种方法会获取文件的URL并将其作为行的集合读取。
-
csv
import org.apache.spark.sql.SparkSession def main(args: Array[String]):Unit = { object DataFormat { val spark = SparkSession.builder.appName("AvgAnsTime").master("local").getOrCreate() val dataRDD = spark.read.csv("path/of/csv/file").rdd } }
-
json
val dataRDD = spark.read.json("path/of/json/file").rdd
-
textFile
val dataRDD = spark.read.textFile("path/of/text/file").rdd
3. 现有的RDD
可以通过对现有的RDD进行转换操作(transformation),在spark中创建新的RDD。
val words=spark.sparkContext.parallelize(Seq("the", "quick", "brown", "fox", "jumps", "over", "the", "lazy", "dog"))
val wordPair = words.map(w => (w.charAt(0), w))
wordPair.foreach(println)
对RDD的操作
有两种类型的操作:Transformation 和 Action。
常用函数可以参考这里.
1. Transformation
它从现有的创建一个新的Spark RDD。它接收RDD,返回一个或者多个新的RDD,并且不更改输入RDD。
Transformation 是个lazy操作。只有在执行Action动作时,才会真正进行 transformation。
某些转换可以是流水线处理,这是一种优化方法,Spark用于提高计算性能。
1)窄转换
输出RDD的分区记录,仅来自输入RDD中的单个分区。它也被叫做流水线。
2)宽转换
在输出RDD中单个分区的记录所需的数据,可能存在于父RDD的许多分区中。它也被叫做shuffle转换。
2. Action
在Apache Spark中,Action将最终结果返回给驱动程序或将其写入外部数据存储。
它使用谱系图触发执行,将数据加载到原始RDD中,执行所有中间转换并将最终结果返回到Driver程序或将其写入到文件系统。
First(),take(),reduce(),collect(),count()是spark中的一些Actions。
RDD优缺点
1. 优点
- 内存中计算
- Lazy Evaluation
- 容错
- 不变性
- 持续性
- Partitioning
- 并行
- 本地粘性
- 粗粒度操作
- 类型化
- 没有限制
2. 缺点
- 没有内置的优化引擎
- 处理结构化数据
- 性能限制
- 存储限制
持久RDD的存储级别
1. MEMORY_ONLY
RDD作为反序列化Java对象存储在JVM中。
如果RDD的大小大于内存,它不会缓存某些分区,并且在下次需要它们时,重新计算它们。
在这个级别中,用于存储的空间非常大,CPU计算时间很短,数据存储在内存中。它不使用磁盘。
2. MEMORY_AND_DISK
在此级别中,RDD作为反序列化Java对象存储在JVM中。
当RDD的大小大于内存大小时,它将多余的分区存储在磁盘上,并在需要时从磁盘中检索。
在这个级别中,用于存储的空间很大,CPU计算时间中等,它使用内存和磁盘存储。
3. MEMORY_ONLY_SER
此级别的Spark将RDD存储为序列化的Java对象(每个分区一个字节的阵列)。
与反序列化的对象相比,它更节省空间,尤其是在使用快速序列化器的情况下。但它增加了CPU的开销。
在这个级别中,存储空间很小,CPU计算时间很长,数据存储在内存中。它不使用磁盘。
4. MEMORY_AND_DISK_SER
它类似于MEMORY_ONLY_SER,但它将内存放不下的分区放入磁盘,而不是每次需要时重新计算。
在此存储级别中,用于存储的空间很少,CPU计算时间很长,它使用内存和磁盘存储。
5. DISK_ONLY
在此存储级别中,RDD仅存储在磁盘上。用于存储的空间很小,CPU计算时间很长,并且使用磁盘存储。