• 首页 首页 icon
  • 工具库 工具库 icon
    • IP查询 IP查询 icon
  • 内容库 内容库 icon
    • 快讯库 快讯库 icon
    • 精品库 精品库 icon
    • 问答库 问答库 icon
  • 更多 更多 icon
    • 服务条款 服务条款 icon

SparkRDD

武飞扬头像
程序消消乐
帮助2

前置介绍:

本文的代码示例全部使用Scala语言进行编写

RDD简介

RDD 其实就是分布式的元素集合(Spark 中的 RDD 就是一个不可变的分布式对象集合和Spar运算的一个基本单位)。在 Spark 中,对数据的所有操作不外乎创建 RDD、转化已有 RDD 以及调用 RDD 操作进行求值。而在这一切背后,Spark 会自动将 RDD 中的数据分发到集群上,并将操作并行化执行。

RDD创建方式

Spark支持两种模式的RDD创建方式:
1)读取一个外部数据集(最常用)
如从比如HDFS、Hive、HBase中创建RDD。
代码示例:

//准备环境
    val SparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
    val sc = new SparkContext(SparkConf)
 	val rdd=sc.textFile("hdfs://master:9999/users/hadoop-twq/word.txt");

2)驱动器程序里分发驱动器程序中的对象集合(比如 list 和 set)

 val SparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
    val sc = new SparkContext(SparkConf)

    // 创建RDD(使用内存的方法逆行创建RDD,有两个方法)
    /**
     * 方法一:
     * val seq = Seq[Int](1, 2, 3, 4)
     * val rdd: RDD[Int] = sc.parallelize(seq)
     * 方法二:
     *  val rdd = sc.makeRDD(seq)[该方法的底层仍然是调用了parallelize()方法]
     */
    val seq=Seq[Int](1,2,3,4)
    //    val rdd: RDD[Int] = sc.parallelize(seq)
    val rdd = sc.makeRDD(seq)

在该方法二种,makeRDD其内部也是使用了parallelize方法

RDD操作

RDD 支持两种类型的操作:转化操作(transformation) 和行动操作(action)
RDD的转化操作仅仅只是对RDD进行数据的转化和处理,每次转换时,并不是对数据的执行。而行动操作才是对数据的执行

RDD操作分为转化和行动的目的和原因(重点理解):

在理解目的和原因之前,我们需要理解以下关于Spark的核心概念:
1)Spark 框架的核心是一个计算引擎,整体来说,它采用了标准 master-slave 的结构,如Spark应用大致运行机制概图所示:图形中的 Driver 表示 master,负责管理整个集群中的作业任务调度。图形中的 Executor 则是 slave,负责实际执行任务。
2)Spark中的Diver对job执行前会进行任务划分(即stage划分)形成一个任务执行计划(即 有向无环图),然后将计划分配给Executor执行(划分的依据就是程序员自己写的
RDD转化操作
)如Spark统一运行流程概述图所示。
3) 有向无环图:
下面的图片只是一个例子:
学新通
上述图片就是一个任务计划,是Spark应用程序执行的步骤,从中可以看出,makeRDD和map操作均被划分到了一个Stage0中,而reduceByKey被单独划分到了Stage1中。其中makeRDD和map操作是转化操作,educeByKey是行动操作。该程序的执行逻辑为:
1)首先通过makeRDD创建了一个Rdd1
2)对于Rdd1进行了一个map转换操作生成了Rdd2
3)对Rdd2进行了一次根据Key的聚合,出来结果
上述中的第二步就是一个一次转换,将Rdd1转成了Rdd2
注意:
1)只有程序执行到了action算子时,才会创建有向无环图
2)有向无环图仅仅只是一个程序执行逻辑,该逻辑是用于后续任务的分配
3)关于中间stage的划分并不是单纯的按照程序员自己定义的转换顺序来进行,Spark在执行的时候,可能会对其进行优化,即可能会省略一些转化步骤。
通过上述的讲解,大家应该对于RDD的转化操作和行动操作有了一个大致的印象和概念。对于转化操作实际上就是对RDD进行封装,如上面的Rdd2说Rdd1的一次逻辑封装。在进行逻辑封装时,程序并没有对数据进行执行处理,只是进行了数据得到格式转化,生成了一个执行逻辑,用于告诉Executor如何执行,当进行到行动操作时,Executor才开始进行了实际上的数据执行。
有了上述的基础之后,我们再来看刚才提出的问题 :RDD操作分为转化和行动的目的:
举例:比如目前有ITB的数据需要进行处理,而我们的单个服务器只能容纳128G的数据,这个时候,数据需要进行划分,即对1TB的数据分割成8个数据块,分别基于8个服务器,每个服务器得到仅仅只是128G的原始数据,对于数据的处理,需要按照上述的有向无环图的逻辑进行转化操作,转化完成之后,对数据进行行动操作得到的是目标数据,然后在返回。
大家通过刚才的例子应该已经发现了,由于我们是分布式运算,需要对数据进行分割,然后分配给不同的节点进行处理,而转化的操作相当与告诉该节点,如何去做,好比一道复杂的数学题,告诉了节点做题的步骤,而行动操作就是将解题步骤串联起来进行最终的执行。
即:Spark 只会惰性计算这些 RDD。它们只有第一次在一个行动操作中用到时,才会真正计算。
这样做的好处;
1) 通过转化操作,从已有的 RDD 中派生出新的 RDD,Spark 会使用谱系图(lineage graph)来记录这些不同 RDD 之间的依赖关系。Spark 需要用这些信息来按需计算每个 RDD,也可以依靠谱系图在持久化的 RDD 丢失部分数据时恢复所丢失的数据。
2)这样就可以把一些操作合并到一起来减少计算数据的步骤。在类似 Hadoop MapReduce 的系统中,开发者常常花费大量时间考虑如何把操作组合到一起,以减少 MapReduce 的周期数。而在 Spark 中,写出一个非常复杂的映射并不见得能比使用很多简单的连续操作获得好很多的性能。因此,用户可以用更小的操作来组织他们的程序, 这样也使这些操作更容易管理。

Spark应用大致运行机制概图

学新通
在Spark中运行应用(暂时不谈各种集群资源的管理和分配)时,主要有两个核心组件起作用。如上图所示,分别是Driver(驱动器)和Executor(执行器)。
Driver简介:
Spark 驱动器节点,用于执行 Spark 任务中的 main 方法,负责实际代码的执行工作。
Driver 在 Spark 作业执行时主要负责:

  1. 将用户程序转化为作业(Job);
  2. 在 Executor 之间调度任务(Task);
  3. 跟踪 Executor 的执行情况;
  4. 通过 UI 展示查询运行情况;
    Executor简介:
    Spark Executor 对象是负责在 Spark 作业中运行具体任务,任务彼此之间相互独立。Spark 应用启动时,ExecutorBackend 节点被同时启动,并且始终伴随着整个 Spark 应用的生命周期而存在。如果有 ExecutorBackend 节点发生了故障或崩溃,Spark 应用也可以继续执行,会将出错节点上的任务调度到其他 Executor 节点上继续运行。
    Executor 有两个核心功能:
  5. 负责运行组成 Spark 应用的任务,并将结果返回给驱动器(Driver);
  6. 它们通过自身的块管理器(Block Manager)为用户程序中要求缓存的 RDD 提供内存式存储。RDD 是直接缓存在 Executor 进程内的,因此任务可以在运行时充分利用缓存数据加速运算。
    两者之间的关系(现在笔者没有谈集群资源的管理,目前只是概述),大致可以总结如下。
    用户将Spark应用程序提交给Spark集群。其中的Driver负责对这个应用程序进行注册和划分,将划分好的结果(可以理解为程序的执行步骤)分给Executor执行。

Spark统一运行流程概述

学新通
这个流程是按照如下的核心步骤进行工作的:

  1. 任务提交后,都会先启动 Driver 程序;
  2. 随后 Driver 向集群管理器注册应用程序;
  3. 之后集群管理器根据此任务的配置文件分配 Executor 并启动;
  4. Driver 开始执行 main 函数,Spark 查询为懒执行,当执行到 Action 算子时开始反向推
    算,根据宽依赖进行 Stage 的划分,随后每一个 Stage 对应一个 Taskset,Taskset 中有多个 Task,查找可用资源 Executor 进行调度;
  5. 根据本地化原则,Task 会被分发到指定的 Executor 去执行,在任务执行的过程中,
    Executor 也会不断与 Driver 进行通信,报告任务运行情况

RDD转化操作

RDD 的转化操作是返回新 RDD 的操作。有上述的讲解我们可以知道 转化出来的 RDD 是惰性求值的,只有在行动操作中用到这些 RDD 时才会被计算。许多转化操作都是针对各个元素的,也就是说,这些转化操作每次只会操作 RDD 中的一个元素。
常用的转化方法:
对一个数据为{1, 2, 3, 3}的RDD进行基本的RDD转化操作
学新通
对数据分别为{1, 2, 3}和{3, 4, 5}的RDD进行针对两个RDD的转化操作
学新通
Pair RDD的转化操作(以键值对集合{(1, 2), (3, 4), (3, 6)}为例)
学新通
学新通:针对两个pair RDD的转化操作(rdd = {(1, 2), (3, 4), (3, 6)}other = {(3, 9)})
学新通

RDD行动操作

对一个数据为{1, 2, 3, 3}的RDD进行基本的RDD行动操作

学新通
学新通
Pair RDD的行动操作(以键值对集合{(1, 2), (3, 4), (3, 6)}为例)
学新通
学新通

这篇好文章是转载于:学新通技术网

  • 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
  • 本站站名: 学新通技术网
  • 本文地址: /boutique/detail/tanhgbhjce
系列文章
更多 icon
同类精品
更多 icon
继续加载