• 首页 首页 icon
  • 工具库 工具库 icon
    • IP查询 IP查询 icon
  • 内容库 内容库 icon
    • 快讯库 快讯库 icon
    • 精品库 精品库 icon
    • 问答库 问答库 icon
  • 更多 更多 icon
    • 服务条款 服务条款 icon

Spark入门

武飞扬头像
白小脑电
帮助2

一.简介

        Spark是一种基于内存的快速、通用、可扩展的大数据分析计算引擎

        (1)官网地址:Apache Spark™ - Unified Engine for large-scale data analytics

        (2)文档查看地址:Overview - Spark 3.0.0 Documentation

        (3)下载地址:Downloads | Apache Spark

                https://archive.apache.org/dist/spark/

1.Hadoop与Spark框架对比

学新通

 2.Spark目前支持的部署模式

(1)Local模式:在本地部署单个Spark服务

(2)Standalone模式:Spark自带的任务调度模式。

(3)YARN模式:Spark使用Hadoop的YARN组件进行资源与任务调度。

(4)Mesos模式:Spark使用Mesos平台进行资源与任务的调度。

3.常用端口号

  (1)Spark查看当前Spark-shell运行任务情况端口号:4040

  (2)Spark Master内部通信服务端口号:7077 (类比于yarn的8032(RM和NM的内部通信)端口)

  (3)Spark Standalone模式Master Web端口号:8080(类比于Hadoop YARN任务运行情况查看端口号:8088)

  (4)Spark历史服务器端口号:18080 (类比于Hadoop历史服务器端口号:19888)

4.集群角色简单介绍

     1.Master和Worker集群资源管理

        Master和Worker是Spark的守护进程、集群资源管理者,即Spark在特定模式(Standalone)下正常运行必须要有的后台常驻进程。

        Master:Spark特有资源调度系统的Leader,掌管着整个集群的资源信息,类似于Yarn框架中的ResourceManager。

        Worker:Spark特有资源调度系统的Slave,有多个,每个Slave掌管着所在节点的资源信息,类似于Yarn框架中的NodeManager。

    2.Driver和Executor任务的管理者

        Driver和Executor是临时程序,当有具体任务提交到Spark集群才会开启的程序。

        Driver:执行程序时Spark Shell中会预加载一个叫做sc的SparkContext对象,Driver此时的作用:①把用户程序转为作业(job) ②跟踪Executor的任务运行状况 ③为执行器节点调度任务 ④WebUI界面展示应用运行情况。

        Executor:负责执行Spark的具体任务。

二.Yarn模式的基本使用

        Spark客户端直接连接Yarn,不需要额外构建Spark集群。

1.运行流程     

    Spark有yarn-client和yarn-cluster两种模式,主要区别在于:Driver程序的运行节点。

    yarn-client:Driver程序运行在客户端,适用于交互、调试,希望立即看到app的输出。

    yarn-cluster:Driver程序运行在由ResourceManager启动的APPMaster。

yarn-client运行模式介绍

学新通

yarn-cluster运行模式介绍

学新通

spark的几种模式对比:

模式

Spark安装机器数

需启动的进程

所属者

Local

1

Spark

Standalone

3

Master及Worker

Spark

Yarn

1

Yarn及HDFS

Hadoop

三.RDD

 1.RDD概述       

   RDD(Resilient Distributed Dataset)弹性分布式数据集,是Spark中最基本的数据抽象。它代表一个弹性的、不可变、可分区、里面的元素可并行计算的集合。

弹性:①存储的弹性:内存与磁盘的自动切换②容错的弹性:数据丢失可以自动恢复③计算的弹性:计算出错重试机制④分片的弹性:可根据需要重新分片

分布式:数据存储在大数据集群不同节点上

数据集:封装了计算逻辑,但不保存数据

数据抽象:代码中是一个抽象类(abstract class RDD),需要子类具体实现

不可变:封装了具体的计算逻辑,是不可以改变的,想要改变只能生成新的RDD进行封装

 注意:在Spark中只有遇到action等行动算子才会执行RDD的计算(延迟计算),所有RDD算子相关操作都在Executor端执行,RDD算子之外的操作都在Driver端执行。

五大特性:

(1)一组分区,是数据集的基本组成单位,标记数据是哪个分区的

        protect def getPartitions:Array[Partition]

(2)一个计算每个分区的函数

        def compute(split:Partition,context TaskContext):Interator[T]

(3)RDD之间的依赖关系

        protected def getDependencies:Seq[Dependency[ _ ]]=deps

(4)一个Partitioner,RDD的分片函数,控制分区的数据流向(键值对)

        val partitioner:scala.Option[org.apache.spark.Partitioner]

(5)一个列表,存储存取每个Partition的优先位置

        protected def getPreferredLocations(split:Partition):scala.Seq[String]

2.RDD编程

2.1RDD的创建

        RDD的创建方式可以分为三种:从集合中创建RDD、从外部存储创建RDD、从其他RDD创建。

2.1.1从集合中创建

        主要使用两种函数:parallelize和makeRDD

  1.  
    def main(args: Array[String]): Unit = {
  2.  
     
  3.  
    //1.创建SparkConf并设置App名称
  4.  
    val conf: SparkConf = new
  5.  
    SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
  6.  
     
  7.  
    //2.创建SparkContext,该对象是提交Spark App的入口
  8.  
    val sc: SparkContext = new SparkContext(conf)
  9.  
     
  10.  
    //3.使用parallelize()创建rdd
  11.  
    val rdd: RDD[Int] = sc.parallelize(Array(1, 2, 3, 4, 5, 6, 7, 8))
  12.  
     
  13.  
    rdd.collect().foreach(println)
  14.  
     
  15.  
    //4.使用makeRDD()创建rdd
  16.  
    val rdd1: RDD[Int] = sc.makeRDD(Array(1, 2, 3, 4, 5, 6, 7, 8))
  17.  
     
  18.  
    rdd1.collect().foreach(println)
  19.  
     
  20.  
    sc.stop()
  21.  
    }
学新通

2.1.2从外部存储创建

        由外部存储系统的数据集创建RDD包括:本地的文件系统,还有所有Hadoop支持的数据集,比如HDFS、HBase等。

  1.  
    def main(args: Array[String]): Unit = {
  2.  
     
  3.  
    //1.创建SparkConf并设置App名称
  4.  
    val conf: SparkConf = new
  5.  
    SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
  6.  
     
  7.  
    //2.创建SparkContext,该对象是提交Spark App的入口
  8.  
    val sc: SparkContext = new SparkContext(conf)
  9.  
     
  10.  
    //3.读取文件。此处举例集群路径:hdfs://hadoop102:8020/input
  11.  
    val lineWordRdd: RDD[String] = sc.textFile("input")
  12.  
     
  13.  
    //4.打印
  14.  
    lineWordRdd.foreach(println)
  15.  
     
  16.  
    //5.关闭
  17.  
    sc.stop()
  18.  
    }
学新通

2.1.3从其他RDD创建

2.2分区规则

2.2.1数据从集合中创建

分区数量:不手动填写分区数的情况下,默认分区数跟本地模式的cpu核数有关

local : 1个   local[*] : 笔记本所有核心数    local[K]:K个

数据分区规则:整数除,多余的数放到后面

例:List(1,2,3,4,5),设置了2个分区,0号分区数据为(1,2);1号分区数据为(3,4,5)

左闭右开

分区的开始位置 = 分区号 * 数据总长度/分区总数

分区的结束位置 =(分区号 1)* 数据总长度/分区总数

例:List(1,2,3,4,5),设置了3个分区,最终0号分区数据为(1);1号分区数据为(2,3);2号分区数据为(4,5)

2.2.1数据从文件中创建

分区数量:默认填写数值 math.min(defaultParallelism,2)

环境的核数和2比较取最小值,一般为2,但最终分区数不一定是2

最终分区切分规则:

计算文件长度除以填写的分区数:

goalSize = length / partitions

splitSize = Math.max(1,Math.min(goalSize,128m))

拿splitSize按照1.1倍切分规则去切分文件,得到最终的分区数.

例:设一个文件长度为10,手动设置分区数为3,goalSize=10/3=3

splitSize=3  分区数=10(length)/3(splitSize) =>3,3,4    4大于3的1.1倍,符合hadoop切片1.1倍的策略,因此会多创建一个分区,即一共有4个分区  3,3,3,1
数据分区规则:

Spark读取文件,采用的是hadoop的方式读取,所以一行一行读取,跟字节数没有关系,数据读取位置计算是以偏移量为单位来进行计算的。

如果切分大小的时候,出现在一行的中间,默认将这整行的数据放到前一个分区中。

2.3Transformation转换算子

RDD整体上分为Value类型、双Value类型和Key-Value类型

2.3.1 Value类型

2.3.1.1 map()映射

(1)函数签名:def map[U: Class Tag](f : T => U): RDD[U]

(2)功能说明:参数f是一个函数,它可以接收一个参数。当某个RDD执行map方法时,会遍历该RDD中的每一个数据项,并依次应用f函数,从而产生一个新的RDD。这个新RDD中的每一个元素都是由原来RDD中的元素依次应用f函数而得到的。

例:

  1.  
     
  2.  
    def main(args: Array[String]): Unit = {
  3.  
     
  4.  
    //1.创建SparkConf并设置App名称
  5.  
    val conf = new SparkConf().setAppName("SparkCore").setMaster("local[*]")
  6.  
     
  7.  
    //2.创建SparkContext,该对象是提交Spark App的入口
  8.  
    val sc = new SparkContext(conf)
  9.  
     
  10.  
    //3.具体业务逻辑
  11.  
    // 3.1 创建一个RDD
  12.  
    val rdd: RDD[Int] = sc.makeRDD(1 to 4, 2)
  13.  
     
  14.  
    // 3.2 调用map方法,每个元素乘以2
  15.  
    val mapRdd: RDD[Int] = rdd.map(_ * 2)
  16.  
     
  17.  
    // 3.3 打印修改后的RDD中数据
  18.  
    mapRdd.collect().foreach(println)
  19.  
     
  20.  
    //4.关闭连接
  21.  
    sc.stop()
  22.  
    }
学新通

2.3.1.2 mapPartitions()以分区为单位执行Map

(1)函数签名:def mapPartitions[U : ClassTag](

f : iterator[T] => iterator[U],                             //f函数把每一个分区的数据分别放入到迭代器中,进行批处理

preservesPartitioning:Boolean = false) : RDD[U]  //是否保留上游RDD的分区信息,默认false

(2)功能说明:mapPartitions一次处理一个分区数据

2.3.1.3 map()和mapPartitions()区别

map():每次处理一条数据,一条数据调用一次函数

mapPartitions():每次处理一个分区的数据,一个分区调用一次函数,效率更高,但这个分区的数据处理完后原RDD分区的数据才能释放

2.3.1.4 mapPartitionsWithIndex()带分区号

1)函数签名:

      def mapPartitionsWithIndex[U: ClassTag](

      f: (Int, Iterator[T]) => Iterator[U],                     // Int表示分区编号

      preservesPartitioning: Boolean = false): RDD[U]

2)功能说明:类似于mapPartitions,比mapPartitions多一个整数参数表示分区号

2.3.1.5 flatMap()扁平化

1)函数签名:def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U]

2)功能说明:与map操作类似,将RDD中的每一个元素通过应用f函数依次转换为新的元素,并封装到RDD中。

区别:在flatMap操作中,f函数的返回值是一个集合,并且会将每一个该集合中的元素拆分出来放到新的RDD中。

  1.  
    def main(args: Array[String]): Unit = {
  2.  
    //1.创建SparkConf并设置APP名称
  3.  
    val conf: SparkConf = new SparkConf().setAppName("SparkCore").setMaster("local[*]")
  4.  
     
  5.  
    //2.创建SparkContext对象
  6.  
    val sc = new SparkContext(conf)
  7.  
     
  8.  
    //3.创建RDD
  9.  
     
  10.  
    val arrayRDD: RDD[List[Int]] = sc.makeRDD(List(List(1, 2), List(3, 4)))
  11.  
     
  12.  
    val result: RDD[Int] = arrayRDD.flatMap(list => list)
  13.  
     
  14.  
    result.collect().foreach(println)
  15.  
     
  16.  
     
  17.  
    //4.关闭sc
  18.  
    sc.stop()
  19.  
    }
学新通

输出结果:

  1.  
    1
  2.  
    2
  3.  
    3
  4.  
    4

2.3.1.6 groupBy()分组

(1)函数签名:def groupBy[K](f:T => K)(implicit kt : ClassTag[K]) : RDD[(k,iterable[T])]

(2)功能说明:按照传入函数的返回值进行分组,将相同key的对应值放入一个迭代器。存在shuffle过程。

  1.  
    def main(args: Array[String]): Unit = {
  2.  
    //1.创建SparkConf并设置APP名称
  3.  
    val conf: SparkConf = new SparkConf().setAppName("SparkCore").setMaster("local[*]")
  4.  
     
  5.  
    //2.创建SparkContext对象
  6.  
    val sc = new SparkContext(conf)
  7.  
     
  8.  
    //3.创建RDD
  9.  
    val rdd1: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4, 5, 6), 2)
  10.  
    //将数据按奇偶分开
  11.  
    val result: RDD[(Int, Iterable[Int])] = rdd1.groupBy(i => i % 2)
  12.  
     
  13.  
    result.collect().foreach(println)
  14.  
     
  15.  
    //4.关闭sc
  16.  
    sc.stop()
  17.  
    }
学新通

输出结果:

  1.  
    (0,CompactBuffer(2, 4, 6))
  2.  
    (1,CompactBuffer(1, 3, 5))

2.3.1.7 filter()过滤

(1)函数签名: def filter(f: T => Boolean): RDD[T]

(2)功能说明:接收一个返回值为布尔类型的函数作为参数。当某个RDD调用filter方法时,会对该RDD中每一个元素应用f函数,如果返回值类型为true,则该元素会被添加到新的RDD中。

  1.  
    def main(args: Array[String]): Unit = {
  2.  
    //1.创建SparkConf并设置APP名称
  3.  
    val conf: SparkConf = new SparkConf().setAppName("SparkCore").setMaster("local[*]")
  4.  
     
  5.  
    //2.创建SparkContext对象
  6.  
    val sc = new SparkContext(conf)
  7.  
     
  8.  
    //3.创建RDD
  9.  
    val rdd1: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4, 5, 6), 2)
  10.  
     
  11.  
    //过滤出为偶数的数据
  12.  
    val result: RDD[Int] = rdd1.filter(i => i % 2 == 0)
  13.  
     
  14.  
    result.collect().foreach(println)
  15.  
     
  16.  
    //4.关闭sc
  17.  
    sc.stop()
  18.  
    }
学新通

输出结果:

  1.  
    2
  2.  
    4
  3.  
    6

2.3.1.8 distinct()去重

(1)函数签名:def distinct():RDD[T]           //默认情况下,distinct会生成与原RDD分区个数一致的分区数

def distinct(num Partitions:Int)(implict ord:Ordering[T]=null):RDD[T]      //可以去重后修改分区个数

(2)功能说明:对内部的元素去重,并将去重后的元素放到新的RDD中。存在shuffle过程。是分布式的,相比HashSet集合方式不容易OOM。

  1.  
    def main(args: Array[String]): Unit = {
  2.  
    //1.创建SparkConf并设置APP名称
  3.  
    val conf: SparkConf = new SparkConf().setAppName("SparkCore").setMaster("local[*]")
  4.  
     
  5.  
    //2.创建SparkContext对象
  6.  
    val sc = new SparkContext(conf)
  7.  
     
  8.  
    //3.创建RDD
  9.  
    val rdd1: RDD[Int] = sc.makeRDD(List(1, 3, 4, 3, 1, 4, 3, 7),2)
  10.  
     
  11.  
    val result: RDD[Int] = rdd1.distinct(3)
  12.  
     
  13.  
    result.mapPartitionsWithIndex(
  14.  
    (index,list) => list.map((index,_))).collect().foreach(println)
  15.  
     
  16.  
    //4.关闭sc
  17.  
    sc.stop()
  18.  
    }
学新通

输出结果:

  1.  
    (0,3)
  2.  
    (1,4)
  3.  
    (1,1)
  4.  
    (1,7)

2.3.1.9 coalesce()合并分区

Coalesce算子包括:配置执行Shuffle和配置不执行Shuffle两种方式。

(1)函数签名:

def coalesce(numPartitions: Int, shuffle: Boolean = false,        //默认false不执行shuffle

    partitionCoalescer: Option[PartitionCoalescer] = Option.empty)(implicit ord: Ordering[T] = null) : RDD[T]

(2)功能说明:缩减分区数,用于大数据集过滤后,提高小数据集的执行效率。

1.不执行shuffle方式

  1.  
    def main(args: Array[String]): Unit = {
  2.  
    //1.创建SparkConf并设置APP名称
  3.  
    val conf: SparkConf = new SparkConf().setAppName("SparkCore").setMaster("local[*]")
  4.  
     
  5.  
    //2.创建SparkContext对象
  6.  
    val sc = new SparkContext(conf)
  7.  
     
  8.  
    //3.创建RDD
  9.  
    val rdd1: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 4)
  10.  
     
  11.  
    //缩减分区
  12.  
    val result: RDD[Int] = rdd1.coalesce(2)
  13.  
     
  14.  
    result.mapPartitionsWithIndex(
  15.  
    (index,list) => list.map((index,_))).collect().foreach(println)
  16.  
     
  17.  
    //4.关闭sc
  18.  
    sc.stop()
  19.  
    }
学新通

输出结果:

  1.  
    (0,1)
  2.  
    (0,2)
  3.  
    (1,3)
  4.  
    (1,4)

2.执行shuffle方式

  1.  
    //缩减分区
  2.  
    val result: RDD[Int] = rdd1.coalesce(2,true)
  3.  
     
  4.  
    result.mapPartitionsWithIndex(
  5.  
    (index,list) => list.map((index,_))).collect().foreach(println)

输出结果:

  1.  
    (0,1)
  2.  
    (0,3)
  3.  
    (0,4)
  4.  
    (1,2)

2.3.1.10 repartition()重新分区

(1)函数签名: def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T]

(2)功能说明:该操作内部其实执行的是coalesce操作,参数shuffle的默认值为true。无论是将分区数多的RDD转换为分区数少的RDD,还是将分区数少的RDD转换为分区数多的RDD,repartition操作都可以完成,因为无论如何都会经shuffle过程。

coalesce和repartition区别:

(1)coalesce重新分区,可以选择是否进行shuffle过程。由参数shuffle: Boolean = false/true决定。

(2)repartition实际上是调用的coalesce,进行shuffle。源码如下:

def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
    coalesce(numPartitions, shuffle = true)}

(3)coalesce一般为缩减分区,如果扩大分区,不使用shuffle是没有意义的,repartition扩大分区执行shuffle。

2.3.1.11 sortBy()排序

(1)函数签名:def sortBy[K]( f: (T) => K,ascending: Boolean = true,       // 默认为正序排列

    numPartitions: Int = this.partitions.length) (implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T]

(2)功能说明:该操作用于排序数据。在排序之前,可以将数据通过f函数进行处理,之后按照f函数处理的结果进行排序,默认为正序排列。排序后新产生的RDD的分区数与原RDD的分区数一致。存在shuffle过程。

  1.  
    def main(args: Array[String]): Unit = {
  2.  
    //1.创建SparkConf并设置APP名称
  3.  
    val conf: SparkConf = new SparkConf().setAppName("SparkCore").setMaster("local[*]")
  4.  
     
  5.  
    //2.创建SparkContext对象
  6.  
    val sc = new SparkContext(conf)
  7.  
     
  8.  
    //3.创建RDD
  9.  
    val rdd1: RDD[Int] = sc.makeRDD(List(2, 5, 1, 3, 6, 4),2)
  10.  
     
  11.  
    //默认升序排列
  12.  
    val result: RDD[Int] = rdd1.sortBy(i => i)
  13.  
     
  14.  
    //改为降序排列
  15.  
    val result1: RDD[Int] = rdd1.sortBy(i => i, false)
  16.  
     
  17.  
    result.mapPartitionsWithIndex(
  18.  
    (index,list) => list.map((index,_))).collect().foreach(println)
  19.  
     
  20.  
    println("=====================================")
  21.  
     
  22.  
    result1.mapPartitionsWithIndex(
  23.  
    (index,list) => list.map((index,_))).collect().foreach(println)
  24.  
    //4.关闭sc
  25.  
    sc.stop()
  26.  
    }
学新通

输出结果:

  1.  
    (0,1)
  2.  
    (0,2)
  3.  
    (0,3)
  4.  
    (1,4)
  5.  
    (1,5)
  6.  
    (1,6)
  7.  
    =====================================
  8.  
    (0,6)
  9.  
    (0,5)
  10.  
    (0,4)
  11.  
    (1,3)
  12.  
    (1,2)
  13.  
    (1,1)

注意:Spark的排序能够保证全局有序,根据分区号的大小来排序,分区号小的在前面,默认情况下0号分区的数据统一都小于1号分区的数据。使用range分区,每一个分区有对应自己的值范围大小,将同一个范围内的key放入到同一个分区内。

2.3.2 双Value类型交互

2.3.2.1 intersection()交集

(1)函数签名:def intersection(other: RDD[T]): RDD[T]

(2)功能说明:对源RDD和参数RDD求交集后返回一个新的RDD。数据会打散重新分区,存在shuffle过程。

  1.  
    def main(args: Array[String]): Unit = {
  2.  
    //1.创建SparkConf并设置APP名称
  3.  
    val conf: SparkConf = new SparkConf().setAppName("SparkCore").setMaster("local[*]")
  4.  
     
  5.  
    //2.创建SparkContext对象
  6.  
    val sc = new SparkContext(conf)
  7.  
     
  8.  
    //3.创建RDD
  9.  
    val rdd1: RDD[Int] = sc.makeRDD(List(4, 3, 2, 1), 2)
  10.  
     
  11.  
    val rdd2: RDD[Int] = sc.makeRDD(List(3, 4, 5, 6), 2)
  12.  
     
  13.  
    //交集
  14.  
    val result: RDD[Int] = rdd1.intersection(rdd2)
  15.  
     
  16.  
    result.mapPartitionsWithIndex(
  17.  
    (index,list) => list.map((index,_))).collect().foreach(println)
  18.  
     
  19.  
    //4.关闭sc
  20.  
    sc.stop()
  21.  
    }
学新通

输出结果:

  1.  
    (0,4)
  2.  
    (1,3)

2.3.2.2 union()并集不去重

(1)函数签名:def union(other: RDD[T]): RDD[T]

(2)功能说明:对源RDD和参数RDD求并集后返回一个新的RDD。数据不会打散重新分区,最终的分区个数就是rdd的分区数之和,简单地将数据一个分区一个分区拿过来。

  1.  
    //创建RDD
  2.  
    val rdd1: RDD[Int] = sc.makeRDD(List(4, 3, 2, 1), 2)
  3.  
     
  4.  
    val rdd2: RDD[Int] = sc.makeRDD(List(3, 4, 5, 6), 2)
  5.  
     
  6.  
     
  7.  
    //并集
  8.  
    val result: RDD[Int] = rdd1.union(rdd2)
  9.  
     
  10.  
    result.mapPartitionsWithIndex(
  11.  
    (index,list) => list.map((index,_))).collect().foreach(println)

输出结果:

  1.  
    (0,4)
  2.  
    (0,3)
  3.  
    (1,2)
  4.  
    (1,1)
  5.  
    (2,3)
  6.  
    (2,4)
  7.  
    (3,5)
  8.  
    (3,6)

2.3.2.3 subtract()差集

(1)函数签名:def subtract(other: RDD[T]): RDD[T]

(2)功能说明:计算差的一种函数,去除两个RDD中相同元素,不同的RDD将保留下来。数据需要打散重新分区,存在shuffle过程。

  1.  
    //创建RDD
  2.  
    val rdd1: RDD[Int] = sc.makeRDD(List(4, 3, 2, 1), 2)
  3.  
     
  4.  
    val rdd2: RDD[Int] = sc.makeRDD(List(3, 4, 5, 6), 2)
  5.  
     
  6.  
     
  7.  
    //差集
  8.  
    val result: RDD[Int] = rdd1.subtract(rdd2)
  9.  
     
  10.  
    result.mapPartitionsWithIndex(
  11.  
    (index,list) => list.map((index,_))).collect().foreach(println)

输出结果:

  1.  
    (0,2)
  2.  
    (1,1)

2.3.2.4 zip()拉链

(1)函数签名:def zip[U: ClassTag](other: RDD[U]): RDD[(T, U)]

(2)功能说明:该操作可以将两个RDD中的元素以键值对的形式进行合并。键值对中的Key为第1个RDD中的元素,Value为第2个RDD中的元素。

将两个RDD组合成Key-Value形式的RDD,这里默认两个RDD的partition数量以及元素数量都相同,否则会抛出异常。

  1.  
    //创建RDD
  2.  
    val rdd1: RDD[Int] = sc.makeRDD(List(1, 2, 3), 3)
  3.  
     
  4.  
    val rdd2: RDD[String] = sc.makeRDD(List("a", "b", "c"),3)
  5.  
     
  6.  
    //拉链
  7.  
    val result: RDD[(Int, String)] = rdd1.zip(rdd2)
  8.  
     
  9.  
    result.mapPartitionsWithIndex(
  10.  
    (index,list) => list.map((index,_))).collect().foreach(println)

输出结果:

  1.  
    (0,(1,a))
  2.  
    (1,(2,b))
  3.  
    (2,(3,c))

2.3.3 Key-Value类型

2.3.3.1 partitionBy()按照K重新分区

(1)函数签名:def partitionBy(partitioner: Partitioner): RDD[(K, V)]

(2)功能说明:将RDD[K,V]中的K按照指定Partitioner(分区器)重新进行分区;如果原有的RDD和新的RDD是一致的话就不进行分区,否则会产生Shuffle过程。不能针对单value数据进行调用,必须使用二元组。

  1.  
    //创建RDD
  2.  
    val rdd1: RDD[(Int, String)] = sc.makeRDD(List((1, "aaa"), (2, "bbb"), (3, "ccc")),3)
  3.  
     
  4.  
    //使用分区器进行分区,此处使用hash分区器
  5.  
    //系统的hash分区器会对key进行hashcode,之后对填写的分区个数取模,得到分区号
  6.  
    val result: RDD[(Int, String)] = rdd1.partitionBy(new HashPartitioner(2))
  7.  
     
  8.  
    result.mapPartitionsWithIndex(
  9.  
    (index,list) => list.map((index,_))).collect().foreach(println)

输出结果:

  1.  
    (0,(2,bbb))
  2.  
    (1,(1,aaa))
  3.  
    (1,(3,ccc))

2.3.3.2 自定义分区

要实现自定义分区器,需要继承org.apache.spark.Partitioner类,并实现下面三个方法:

(1)numPartitions: Int:返回创建出来的分区数。

(2)getPartition(key: Any): Int:返回给定键的分区编号(0到numPartitions-1)。

(3)equals():Java 判断相等性的标准方法。这个方法的实现非常重要,Spark需要用这个方法来检查你的分区器对象是否和其他分区器实例相同,这样Spark才可以判断两个RDD的分区方式是否相同。

注意:Spark的分区器只能根据key进行分区。

  1.  
    object customPartitioner {
  2.  
    def main(args: Array[String]): Unit = {
  3.  
    //1.创建SparkConf并设置APP名称
  4.  
    val conf: SparkConf = new SparkConf().setAppName("SparkCore").setMaster("local[*]")
  5.  
     
  6.  
    //2.创建SparkContext对象
  7.  
    val sc = new SparkContext(conf)
  8.  
     
  9.  
    //3.创建RDD
  10.  
    val rdd1: RDD[(Int, String)] = sc.makeRDD(List((1, "aaa"), (2, "bbb"), (3, "ccc"), (4, "ddd")), 4)
  11.  
     
  12.  
    //使用自定义分区器进行分区
  13.  
    val result: RDD[(Int, String)] = rdd1.partitionBy(new myPartitioner(3))
  14.  
     
  15.  
    result.mapPartitionsWithIndex(
  16.  
    (index, list) => list.map((index, _))).collect().foreach(println)
  17.  
     
  18.  
    println("====================================================")
  19.  
     
  20.  
    //实现匿名子类自定义分区器
  21.  
    //需求:根据key的不同类型进行分区,将int值放入1号分区,str值放入2号分区,其余放入0号分区
  22.  
    val rdd2: RDD[(Any, Int)] = sc.makeRDD(List((1, 1), (2, 2), (3.1, 3), ("a", 4), ("b", 5)), 1)
  23.  
     
  24.  
    val result1: RDD[(Any, Int)] = rdd2.partitionBy(new Partitioner {
  25.  
    override def numPartitions = 3
  26.  
     
  27.  
    override def getPartition(key: Any) = {
  28.  
    key match {
  29.  
    case i: Int => 1
  30.  
    case s: String => 2
  31.  
    case _ => 0
  32.  
    }
  33.  
    }
  34.  
    })
  35.  
    result1.mapPartitionsWithIndex(
  36.  
    (index, list) => list.map((index, _))).collect().foreach(println)
  37.  
     
  38.  
    //4.关闭sc
  39.  
    sc.stop()
  40.  
    }
  41.  
    }
  42.  
     
  43.  
    class myPartitioner(num: Int) extends Partitioner {
  44.  
    //该分区器有几个分区
  45.  
    override def numPartitions: Int = num
  46.  
     
  47.  
    //具体分区逻辑:根据传入数据的key返回对应的分区号
  48.  
    override def getPartition(key: Any): Int = {
  49.  
    key match {
  50.  
    case i: Int => i % num
  51.  
    case s: String => s.hashCode % num
  52.  
    //所有返回的分区号都不能超过分区个数的值
  53.  
    case _ => 0
  54.  
    }
  55.  
    }
  56.  
    }
学新通

输出结果:

  1.  
    (0,(3,ccc))
  2.  
    (1,(1,aaa))
  3.  
    (1,(4,ddd))
  4.  
    (2,(2,bbb))
  5.  
    =======================
  6.  
    (0,(3.1,3))
  7.  
    (1,(1,1))
  8.  
    (1,(2,2))
  9.  
    (2,(a,4))
  10.  
    (2,(b,5))

2.3.3.3 reduceByKey()按照K聚合V

(1)函数签名:def reduceByKey(func: (V, V) => V): RDD[(K, V)]

   def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)]

(2)功能说明:该操作可以将RDD[K,V]中的元素按照相同的K对V进行聚合。其存在多种重载形式,还可以设置新RDD的分区数。按照key进行聚合,在shuffle之前有combine(预聚合)操作,返回结果是RDD[K,V]。

  1.  
    //创建RDD
  2.  
    val rdd1: RDD[(String, Int)] = sc.makeRDD(List(("a", 1), ("b", 5), ("a", 5), ("b", 2)))
  3.  
     
  4.  
    //需求:计算相同key对应值的相加结果
  5.  
    //res初始值为第一个元素
  6.  
    val result: RDD[(String, Int)] = rdd1.reduceByKey((res, elem) => res elem)
  7.  
     
  8.  
    result.collect().foreach(println)

输出结果:

  1.  
    (a,6)
  2.  
    (b,7)

2.3.3.4 groupByKey()按照K重新分组

(1)函数签名:def groupByKey(): RDD[(K, Iterable[V])]

(2)功能说明:对每个key进行操作,但只生成一个seq,并不进行聚合。按照key进行分组,直接进行shuffle,后面的集合中只有单独一个value值。该操作可以指定分区器或者分区数(默认使用HashPartitioner)。

  1.  
    val rdd1: RDD[(String, Int)] = sc.makeRDD(List(("a", 1), ("b", 5), ("a", 5), ("b", 2)))
  2.  
     
  3.  
    val result: RDD[(String, Iterable[Int])] = rdd1.groupByKey()
  4.  
     
  5.  
    result.collect().foreach(println)

输出结果:

  1.  
    (a,CompactBuffer(1, 5))
  2.  
    (b,CompactBuffer(5, 2))

2.3.3.5 aggregateByKey()按照K处理分区内和分区间逻辑

(1)函数签名:def aggregateByKey[U: Class Tag](zeroValue:U)(seqOp:(U,V)=>U,combOp:(U,U)=>U):RDD[(K,U)]

     ①zeroValue(初始值):给每一个分区中的每一种key一个初始值,只在分区内使用

     ②seqOp(分区内):函数用于在每一个分区中用初始值逐步迭代value

     ③combOp(分区间):函数用于合并每个分区中的结果,初始值为第一个元素

  1.  
    val rdd1: RDD[(String, Int)] = sc.makeRDD(List(("a", 1), ("b", 2), ("a", 3), ("b", 4), ("a", 5), ("b", 6)), 2)
  2.  
     
  3.  
    //需求:取出每个分区相同key对应值的最大值然后相加
  4.  
    val result: RDD[(String, Int)] = rdd1.aggregateByKey(0)((res, elem) => math.max(res, elem), (res, elem) => (res elem))
  5.  
    //使用匿名函数化简后可写为:rdd1.aggregateByKey(0)(math.max(_,_),_ _)
  6.  
     
  7.  
    result.collect().foreach(println)

输出结果:

  1.  
    (b,8)
  2.  
    (a,8)

2.3.3.6 foldByKey()分区内和分区间相同的aggregateByKey()

(1)函数签名: def foldByKey(zeroVlaue:V)(func:(V,V) => V):RDD[(K,V)]

     zeroVlaue:初始化值,可以是任意类型

     func:函数,两个输入参数相同

(2)功能说明:是aggreegateByKey的简化操作,seqOp和combOp相同,即分区内逻辑和分区间逻辑相同。

  1.  
    val rdd1: RDD[(String, Int)] = sc.makeRDD(List(("a", 1), ("b", 2), ("a", 3), ("b", 4), ("a", 5), ("b", 6)), 2)
  2.  
     
  3.  
    val result: RDD[(String, Int)] = rdd1.foldByKey(10)(_ _)
  4.  
     
  5.  
    result.collect().foreach(println)

输出结果:

  1.  
    (b,32)
  2.  
    (a,29)

2.3.3.7 combineByKey()转换结构后分区内和分区间操作

(1)函数签名: def combineByKey[C]( createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C): RDD[(K, C)]

(1)createCombiner(转换数据的结构): combineByKey() 会遍历分区中的所有元素,因此每个元素的键要么还没有遇到过,要么就和之前的某个元素的键相同。如果这是一个新的元素,combineByKey()会使用一个叫作createCombiner()的函数来创建那个键对应的累加器的初始值

(2)mergeValue(分区内): 如果这是一个在处理当前分区之前已经遇到的键,它会使用mergeValue()方法将该键的累加器对应的当前值与这个新的值进行合并

(3)mergeCombiners(分区间): 由于每个分区都是独立处理的,因此对于同一个键可以有多个累加器。如果有两个或者更多的分区都有对应同一个键的累加器,就需要使用用户提供的mergeCombiners()方法将各个分区的结果进行合并。

(2)功能说明:针对相同K,将V合并成一个集合。

  1.  
    val rdd1: RDD[(String, Int)] = sc.makeRDD(List(("a", 88), ("b", 95), ("a", 91), ("b", 93), ("a", 95), ("b", 98)),2)
  2.  
     
  3.  
    //需求:计算每种key对应值的和,再根据key计算每种key的平均值
  4.  
    //先进行分区内累加 ("a",88) ("a",91) => "a",(179,2)
  5.  
    val result: RDD[(String, (Int, Int))] = rdd1.combineByKey(
  6.  
    //转换结构 "a",88 => (88,1)
  7.  
    i => (i, 1),
  8.  
    // "a",(88,1) (91,1) => (179,2)
  9.  
    (c: (Int, Int), elem) => (c._1 elem, c._2 1),
  10.  
    // "a",(179,2) (95,1) => (274,3)
  11.  
    (c1: (Int, Int), c2: (Int, Int)) => (c1._1 c2._1, c1._2 c2._2)
  12.  
    )
  13.  
     
  14.  
    result.collect().foreach(println)
  15.  
     
  16.  
    println("=============================")
  17.  
    //偏函数写法求平均值
  18.  
    val value1: RDD[(String, Double)] = result.map({
  19.  
    case (key, value) => (key, value._1.toDouble / value._2)
  20.  
    })
  21.  
     
  22.  
    //其他写法
  23.  
    val value2: RDD[(String, Double)] = result.map(tuple => (tuple._1, tuple._2._1.toDouble / tuple._2._2))
  24.  
     
  25.  
    value1.collect().foreach(println)
  26.  
    println("==========================")
  27.  
    value2.collect().foreach(println)
学新通

输出结果:

  1.  
    (b,(286,3))
  2.  
    (a,(274,3))
  3.  
    ======================
  4.  
    (b,95.33333333333333)
  5.  
    (a,91.33333333333333)
  6.  
    ======================
  7.  
    (b,95.33333333333333)
  8.  
    (a,91.33333333333333)

2.3.3.8 reduceByKey、foldByKey、aggregateByKey、combineByKey的联系

reduceByKey:底层调用combineByKey,没有初始值,使用第一个元素作为初始值,分区内和分区间逻辑一致

foldByKey:底层调用combineByKey,有初始值,分区内和分区间计算逻辑一致

aggregateByKey:底层调用combineByKey,有初始值,分区内和分区间计算逻辑可以不一致,更灵活

combineByKey:有初始值,并且初始值还支持改变数据结构,最灵活

2.3.3.9 sortByKey()按照K进行排序

(1)函数签名: def sortByKey(ascending: Boolean = true, // 默认,升序

     numPartitions: Int = self.partitions.length)  : RDD[(K, V)]

(2)功能说明:在一个(K,V)的RDD上调用,K必须实现Ordered接口,返回一个按照key进行排序的(K,V)的RDD

  1.  
    val rdd1: RDD[(Int, String)] = sc.makeRDD(List((3, "aa"), (6, "cc"), (2, "bb"), (1, "dd")))
  2.  
     
  3.  
    //默认正序排序
  4.  
    val result: RDD[(Int, String)] = rdd1.sortByKey()
  5.  
     
  6.  
    //使用倒序排序
  7.  
    val result1: RDD[(Int, String)] = rdd1.sortByKey(false)
  8.  
     
  9.  
    result.collect().foreach(println)
  10.  
     
  11.  
    println("====================")
  12.  
     
  13.  
    result1.collect().foreach(println)

输出结果:

  1.  
    (1,dd)
  2.  
    (2,bb)
  3.  
    (3,aa)
  4.  
    (6,cc)
  5.  
    ==========
  6.  
    (6,cc)
  7.  
    (3,aa)
  8.  
    (2,bb)
  9.  
    (1,dd)

2.3.3.10 mapValues()只对V进行操作

(1)函数签名:def mapValues[U](f: V => U): RDD[(K, U)]

(2)功能说明:针对于(K,V)形式的类型只对V进行操作

  1.  
    val rdd1: RDD[(String, Int)] = sc.makeRDD(List(("a", 1), ("b", 2), ("c", 3), ("d", 4)))
  2.  
     
  3.  
    //将元素值以二倍值返回
  4.  
    val result: RDD[(String, Int)] = rdd1.mapValues(_ * 2)
  5.  
     
  6.  
    result.collect().foreach(println)

输出结果:

  1.  
    (a,2)
  2.  
    (b,4)
  3.  
    (c,6)
  4.  
    (d,8)

2.3.3.11 join()等同于sql里的内连接,关联上的要,关联不上的舍弃

(1)函数签名: def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))]

def join[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, W))]

(2)功能说明:在类型为(K,V)和(K,W)的RDD上调用,返回一个相同key对应的所有元素对在一起的(K,(V,W))的RDD

  1.  
    val rdd1: RDD[(String, Int)] = sc.makeRDD(List(("a", 1), ("b", 5), ("c", 2), ("d", 6)))
  2.  
     
  3.  
    val rdd2: RDD[(String, Int)] = sc.makeRDD(List(("a", 1), ("b", 5), ("e", 2), ("f", 6)))
  4.  
     
  5.  
    //join等同于sql中的内连接
  6.  
    //能够join上的值保留,反之删除
  7.  
    val result: RDD[(String, (Int, Int))] = rdd1.join(rdd2)
  8.  
     
  9.  
    result.collect().foreach(println)

输出结果:

  1.  
    (a,(1,1))
  2.  
    (b,(5,5))

2.3.3.12 cogroup()类似于sql的全连接,但是在同一个RDD中对key聚合

(1)函数签名:def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))]

(2)功能说明:在类型为(K,V)和(K,W)的RDD上调用,返回一个(K,(Iterable<V>,Iterable<W>))类型的RDD。操作两个RDD中的KV元素,每个RDD中相同key中的元素分别聚合成一个集合。

  1.  
    val rdd1: RDD[(String, Int)] = sc.makeRDD(List(("a", 1), ("b", 5), ("c", 2), ("d", 6)))
  2.  
     
  3.  
    val rdd2: RDD[(String, Int)] = sc.makeRDD(List(("a", 1), ("b", 5), ("e", 3), ("f", 4)))
  4.  
     
  5.  
    //类似于sql的满外连接
  6.  
    //先将当前rdd中的同一个key数据聚合为一个集合,之后用二元组形式表现出来
  7.  
    val result: RDD[(String, (Iterable[Int], Iterable[Int]))] = rdd1.cogroup(rdd2)
  8.  
     
  9.  
    result.collect().foreach(println)

输出结果:

  1.  
    (a,(CompactBuffer(1),CompactBuffer(1)))
  2.  
    (b,(CompactBuffer(5),CompactBuffer(5)))
  3.  
    (c,(CompactBuffer(2),CompactBuffer()))
  4.  
    (d,(CompactBuffer(6),CompactBuffer()))
  5.  
    (e,(CompactBuffer(),CompactBuffer(3)))
  6.  
    (f,(CompactBuffer(),CompactBuffer(4)))

2.4 Action行动算子

  行动算子是触发了整个作业的执行。因为转换算子都是懒加载,并不会立即执行。

2.4.1 collect()以数组的形式返回数据集

(1)函数签名:def collect(): Array[T]

(2)功能说明:在驱动程序中,以数组Array的形式返回数据集的所有元素。Executor端所有的数据都会被拉取到Drive端,会按照分区的编号按顺序收集数据,最终结果全局有序。

2.4.2 count()返回RDD中元素个数

(1)函数签名:def count(): Long

(2)功能说明:返回RDD中元素的个数

  1.  
    val rdd1: RDD[Int] = sc.makeRDD(1 to 100, 16)
  2.  
     
  3.  
    val result: Long = rdd1.count()
  4.  
     
  5.  
    println(result)

输出结果:

100

2.4.3 first()返回RDD中的第一个元素

(1)函数签名:def first(): T

(2)功能说明:返回RDD中的第一个元素

  1.  
    val rdd1: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4))
  2.  
     
  3.  
    val i: Int = rdd1.first()
  4.  
     
  5.  
    println(i)

输出结果:

1

2.4.4 take()返回由RDD前n个元素组成的数组

(1)函数签名:def take(num: Int): Array[T]

(2)功能说明:返回一个由RDD的前n个元素组成的数组,默认按照全局有序进行拉取数据。

  1.  
    val rdd1: RDD[(String, Int)] = sc.makeRDD(List(("a", 1), ("b", 2), ("c", 3), ("d", 4)), 2)
  2.  
     
  3.  
    val result: Array[(String, Int)] = rdd1.take(2)
  4.  
     
  5.  
    result.foreach(println)

输出结果:

  1.  
    (a,1)
  2.  
    (b,2)

2.4.5 takeOrdered()返回该RDD排序后前n个元素组成的数组

1)函数签名:def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T]

2)功能说明:返回该RDD排序后的前n个元素组成的数组,默认按照升序排序。

  1.  
    val rdd1: RDD[Int] = sc.makeRDD(List(5, 8, 3, 1, 6, 7, 2))
  2.  
     
  3.  
    //默认升序
  4.  
    val result: Array[Int] = rdd1.takeOrdered(3)
  5.  
     
  6.  
    //降序排列 rdd1.takeOrdered(3)(Ordering[Int].reverse)
  7.  
     
  8.  
    result.foreach(println)

输出结果:

  1.  
    1
  2.  
    2
  3.  
    3

2.4.6 countByKey()统计每种key的个数

(1)函数签名:def countByKey(): Map[K, Long]

(2)功能说明:统计每种key的个数,需要RDD中的数据是二元组,返回结果为map。

  1.  
    val rdd1: RDD[(String, Int)] = sc.makeRDD(List(("a", 1), ("b", 2), ("a", 3), ("a", 4)))
  2.  
     
  3.  
    val result: collection.Map[String, Long] = rdd1.countByKey()
  4.  
     
  5.  
    result.foreach(println)

输出结果:

  1.  
    (a,3)
  2.  
    (b,1)

2.4.7 save相关算子

(1)saveAsTextFile(path)保存成Text文件

 功能说明:将数据集的元素以textfile的形式保存到HDFS文件系统或者其他支持的文件系统,对于每个元素,Spark将会调用toString方法,将它装换为文件中的文本。

(2)saveAsSequenceFile(path) 保存成Sequencefile文件

 功能说明:将数据集中的元素以Hadoop Sequencefile的格式保存到指定的目录下,可以使HDFS或者其他Hadoop支持的文件系统。只有kv类型RDD有该操作,单值的没有。

(3)saveAsObjectFile(path) 序列化成对象保存到文件

  功能说明:用于将RDD中的元素序列化成对象,存储到文件中。

2.4.8 foreach()遍历RDD中每一个元素

(1)函数签名:def foreach(f:T => Unit) : Unit

(2)功能说明:遍历RDD中的每一个元素,并依次应用f函数。

注意:rdd.collect().foreach(println),在Driver端执行foreach打印,此处的foreach是方法,打印结果是全局有序的。

rdd.foreach(println),在Executor端执行foreach打印,此处的foreach是行动算子,是分布式的,打印结果整体无序。

2.5 RDD序列化

初始化工作是在Driver端进行的,而实际运行程序是在Executor端进行的,涉及到了跨进程通信,对象需要经过序列化变成byte数组才能传输,(不序列化会报错,无法运行)。

Spark自带闭包检查,有闭包也是需要序列化的。算子以外的代码都是在Driver端执行,算子里面的代码都是在Executor端执行。

序列化的方法:(1)类继承scala.Serializable接口(2)把类变成样例类(case class),样例类默认是序列化的。

当RDD在Shuffle数据的时候,简单数据类型、数组和字符串类型已经在Spark内部使用Kryo来序列化。即使使用Kryo序列化,也要继承Serializable接口或者使用样例类。

使用Kryo序列化的方法:

new SparkConf().set("spark.serializer","org.apache.spark.serializer.KryoSerializer").registerKryoClasses(Array(classOf[自定义类名]))

2.6 RDD依赖关系

2.6.1 血缘关系

RDD只支持粗粒度转换,即在大量记录上执行的单个操作。将创建RDD的一系列Lineage(血统)记录下来,以便恢复丢失的分区。RDD的Lineage会记录RDD的元数据信息和转换行为,当该RDD的部分分区数据丢失时,它可以根据这些信息来重新运算和恢复丢失的数据分区。

查看血缘关系的方法:toDebugString

  1.  
    val rdd: RDD[String] = sc.textFile("input/1.txt")
  2.  
     
  3.  
    //查看血缘关系
  4.  
    println(rdd.toDebugString)
  5.  
     
  6.  
    println("=========================")
  7.  
     
  8.  
    val rdd1: RDD[String] = rdd.flatMap(_.split(""))
  9.  
     
  10.  
    //查看血缘关系
  11.  
    println(rdd1.toDebugString)
  12.  
     
  13.  
    println("=========================")
  14.  
     
  15.  
    val rdd2: RDD[(String, Int)] = rdd1.map((_, 1))
  16.  
     
  17.  
    //查看血缘关系
  18.  
    println(rdd2.toDebugString)
  19.  
     
  20.  
    println("=========================")
  21.  
     
  22.  
    val rdd3: RDD[(String, Int)] = rdd2.reduceByKey(_ _)
  23.  
     
  24.  
    //查看血缘关系
  25.  
    println(rdd3.toDebugString)
学新通

 输出结果:会输出当前RDD往上所有的血缘关系。

  1.  
    (2) input/1.txt MapPartitionsRDD[1] at textFile at Test01_Dependency.scala:19 []
  2.  
    | input/1.txt HadoopRDD[0] at textFile at Test01_Dependency.scala:19 []
  3.  
    =========================
  4.  
    (2) MapPartitionsRDD[2] at flatMap at Test01_Dependency.scala:26 []
  5.  
    | input/1.txt MapPartitionsRDD[1] at textFile at Test01_Dependency.scala:19 []
  6.  
    | input/1.txt HadoopRDD[0] at textFile at Test01_Dependency.scala:19 []
  7.  
    =========================
  8.  
    (2) MapPartitionsRDD[3] at map at Test01_Dependency.scala:33 []
  9.  
    | MapPartitionsRDD[2] at flatMap at Test01_Dependency.scala:26 []
  10.  
    | input/1.txt MapPartitionsRDD[1] at textFile at Test01_Dependency.scala:19 []
  11.  
    | input/1.txt HadoopRDD[0] at textFile at Test01_Dependency.scala:19 []
  12.  
    =========================
  13.  
    (2) ShuffledRDD[4] at reduceByKey at Test01_Dependency.scala:40 []
  14.  
    -(2) MapPartitionsRDD[3] at map at Test01_Dependency.scala:33 []
  15.  
    | MapPartitionsRDD[2] at flatMap at Test01_Dependency.scala:26 []
  16.  
    | input/1.txt MapPartitionsRDD[1] at textFile at Test01_Dependency.scala:19 []
  17.  
    | input/1.txt HadoopRDD[0] at textFile at Test01_Dependency.scala:19 []
学新通

2.6.2 依赖关系

查看依赖关系的方法:dependencies

  1.  
    val rdd: RDD[String] = sc.textFile("input/1.txt")
  2.  
     
  3.  
    //查看依赖关系
  4.  
    println(rdd.dependencies)
  5.  
     
  6.  
    println("=========================")
  7.  
     
  8.  
    val rdd1: RDD[String] = rdd.flatMap(_.split(""))
  9.  
     
  10.  
    //查看依赖关系
  11.  
    println(rdd1.dependencies)
  12.  
     
  13.  
    println("=========================")
  14.  
     
  15.  
    val rdd2: RDD[(String, Int)] = rdd1.map((_, 1))
  16.  
     
  17.  
    //查看依赖关系
  18.  
    println(rdd2.dependencies)
  19.  
     
  20.  
    println("=========================")
  21.  
     
  22.  
    val rdd3: RDD[(String, Int)] = rdd2.reduceByKey(_ _)
  23.  
     
  24.  
    //查看依赖关系
  25.  
    println(rdd3.dependencies)
学新通

输出结果:

  1.  
    List(org.apache.spark.OneToOneDependency@80bfdc6)
  2.  
    =========================
  3.  
    List(org.apache.spark.OneToOneDependency@46963479)
  4.  
    =========================
  5.  
    List(org.apache.spark.OneToOneDependency@3dffc764)
  6.  
    =========================
  7.  
    List(org.apache.spark.ShuffleDependency@6ed043d3)

RDD之间的关系可以从两个维度来理解:一个是RDD是从哪些RDD转换而来,也就是RDD的parent RDD(s)是什么(血缘); 另一个就是RDD依赖于parent RDD(s)的哪些Partition(s),这种关系就是RDD之间的依赖。

RDD和它依赖的父RDD(s)的依赖关系有两种不同的类型,即窄依赖(NarrowDependency)和宽依赖(ShuffleDependency)。

窄依赖:示每一个父RDD的Partition最多被子RDD的一个Partition使用(一对一or多对一),比喻为独生子女。

宽依赖:表示同一个父RDD的Partition被多个子RDD的Partition依赖(只能是一对多),会引起Shuffle,比喻为超生。

具有宽依赖的transformations包括:sort、reduceByKey、groupByKey、join和调用rePartition函数的任何操作。
宽依赖对Spark去评估一个transformations有更加重要的影响,比如对性能的影响。在不影响业务要求的情况下,要尽量避免使用有宽依赖的转换算子,因为有宽依赖,就一定会走shuffle,影响性能。

2.6.3 Stage任务划分

2.6.3.1  DAG(Directed Acyclic Graph)有向无环图是由点和线组成的拓扑图形,该图形具有方向,不会闭环。例如,DAG记录了RDD的转换过程和任务的阶段。

2.6.3.2  RDD任务切分中间分为:Application、Job、Stage和Task

(1)Application:初始化一个SparkContext即生成一个Application;

(2)Job:一个Action算子就会生成一个Job;

(3)Stage:Stage等于宽依赖的个数加1;

(4)Task:一个Stage阶段中,最后一个RDD的分区个数就是Task的个数。

注意:Application->Job->Stage->Task每一层都是1对n的关系。

2.7 RDD持久化

2.7.1 RDD Cache缓存

RDD通过Cache或者Persist方法将前面的计算结果缓存,默认情况下会把数据以序列化的形式缓存在JVM的堆内存中。但是并不是这两个方法被调用时立即缓存,而是触发后面的action算子时,该RDD将会被缓存在计算节点的内存中,并供后面重用。cache操作会增加血缘关系,不改变原有的血缘关系。

缓存级别介绍学新通

 注意:默认的存储级别都是仅在内存存储一份。在存储级别的末尾加上“_2”表示持久化的数据存为两份。SER:表示序列化。

缓存有可能丢失,或者存储于内存的数据由于内存不足而被删除,RDD的缓存容错机制保证了即使缓存丢失也能保证计算的正确执行。通过基于RDD的一系列转换,丢失的数据会被重算,由于RDD的各个Partition是相对独立的,因此只需要计算丢失的部分即可,并不需要重算全部Partition。

Spark会自动对一些Shuffle操作的中间数据做持久化操作(比如:reduceByKey)。这样做的目的是为了当一个节点Shuffle失败了避免重新计算整个输入。但是,在实际使用的时候,如果想重用数据,仍然建议调用persist或cache。

2.7.2 RDD CheckPoint检查点

(1)检查点:通过将RDD中间结果写入磁盘。

(2)为什么要做检查点?由于血缘依赖过长会造成容错成本过高,这样就不如在中间阶段做检查点容错,如果检查点之后有节点出现问题,可以从检查点开始重做血缘,减少了开销。

(3)检查点存储路径:Checkpoint的数据通常是存储在HDFS等容错、高可用的文件系统

(4)检查点数据存储格式为:二进制的文件

(5)检查点切断血缘:在Checkpoint的过程中,该RDD的所有依赖于父RDD中的信息将全部被移除。

(6)检查点触发时间:对RDD进行Checkpoint操作并不会马上被执行,必须执行Action操作才能触发。但是检查点为了数据安全,会从血缘关系的最开始执行一遍。

(7)设置检查点步骤

(1)设置检查点数据存储路径:sc.setCheckpointDir("./checkpoint1")

(2)调用检查点方法:wordToOneRdd.checkpoint()

2.7.3 缓存和检查点区别

(1)Cache缓存只是将数据保存起来,不切断血缘依赖。Checkpoint检查点切断血缘依赖。

(2)Cache缓存的数据通常存储在磁盘、内存等地方,可靠性低。Checkpoint的数据通常存储在HDFS等容错、高可用的文件系统,可靠性高。

(3)建议对checkpoint()的RDD使用Cache缓存,这样checkpoint的job只需从Cache缓存中读取数据即可,否则需要再从头计算一次RDD。

(4)如果使用完了缓存,可以通过unpersist()方法释放缓存

2.8 键值对RDD数据分区

Spark目前支持Hash分区、Range分区和用户自定义分区。Hash分区为当前的默认分区。分区器直接决定了RDD中分区的个数、RDD中每条数据经过Shuffle后进入哪个分区和Reduce的个数。

注意:(1)只有Key-Value类型的RDD才有分区器,非Key-Value类型的RDD分区的值是None

(2)每个RDD的分区ID范围:0~numPartitions-1,决定这个值是属于那个分区的。

(3)需要使用到分区器时才会往下游RDD添加分区器。涉及到分组时使用Hash分区器,涉及到排序时使用Range分区器。

2.8.1 Hash分区

HashPartitioner分区的原理:对于给定的key,计算其hashCode,并除以分区的个数取余,如果余数小于0,则用余数 分区的个数(否则加0),最后返回的值就是这个key所属的分区ID。

HashPartitioner分区弊端:可能导致每个分区中数据量的不均匀,极端情况下会导致某些分区拥有RDD的全部数据(数据倾斜)。可以使用预聚合减轻这种情况。

2.8.2 Ranger分区

RangePartitioner作用:将一定范围内的数映射到某一个分区内,尽量保证每个分区中数据量均匀,而且分区与分区之间是有序的,一个分区中的元素肯定都是比另一个分区内的元素小或者大,但是分区内的元素是不能保证顺序的。简单的说就是将一定范围内的数映射到某一个分区内。

实现过程为:第一步:先从整个RDD中采用水塘抽样算法,抽取出样本数据,将样本数据排序,计算出每个分区的最大key值,形成一个Array[KEY]类型的数组变量rangeBounds;第二步:判断key在rangeBounds中所处的范围,给出该key值在下一个RDD中的分区id下标;该分区器要求RDD中的KEY类型必须是可以排序的。

2.9 累加器(和RDD平级)

累加器:分布式共享只写变量。(Executor和Executor之间不能读数据)累加器用来把Executor端变量信息聚合到Driver端。在Driver中定义的一个变量,在Executor端的每个task都会得到这个变量的一份新的副本,每个task更新这些副本的值后,传回Driver端进行合并计算。

2.9.1 系统累加器

(1)累加器定义(SparkContext.accumulator(initialValue)方法)

           val sum: LongAccumulator = sc.longAccumulator("sum")

(2)累加器添加数据(累加器.add方法)

          sum.add(count)

(3)累加器获取数据(累加器.value)

         sum.value

注意:①Executor端的任务不能读取累加器的值(例如:在Executor端调用sum.value,获取的值不是累加器最终的值)。因此我们说,累加器是一个分布式共享只写变量。②累加器要放在行动算子中,因为转换算子执行的次数取决于job的数量,如果一个spark应用有多个行动算子,那么转换算子中的累加器可能会发生不止一次更新,导致结果错误。所以,如果想要一个无论在失败还是重复计算时都绝对可靠的累加器,我们必须把它放在foreach()这样的行动算子中。对于在行动算子中使用的累加器,Spark只会把每个Job对各累加器的修改应用一次。因为行动算子只会执行一次,而转换算子的执行次数不确定。

2.9.2 自定义累加器

自定义累加器使用步骤

(1)继承AccumulatorV2,设定输入、输出泛型

(2)重写6个抽象方法

(3)使用自定义累加器需要注册::sc.register(累加器,"累加器名字")

2.10 广播变量

广播变量:分布式共享只读变量。广播变量用来高效分发较大的对象。向所有工作节点发送一个较大的只读值,以供一个或多个Spark Task操作使用。比如,如果你的应用需要向所有节点发送一个较大的只读查询表,广播变量用起来会很顺手。在多个Task并行操作中使用同一个变量,但是Spark会为每个Task任务分别发送。

使用步骤:

(1)调用SparkContext.broadcast(广播变量)创建出一个广播对象,任何可序列化的类型都可以这么实现。

(2)通过广播变量.value,访问该对象的值。

(3)广播变量只会被发到各个节点一次,作为只读值处理(修改这个值不会影响到别的节点)。

这篇好文章是转载于:学新通技术网

  • 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
  • 本站站名: 学新通技术网
  • 本文地址: /boutique/detail/tanhiaccee
系列文章
更多 icon
同类精品
更多 icon
继续加载