Spark入门
一.简介
Spark是一种基于内存的快速、通用、可扩展的大数据分析计算引擎。
(1)官网地址:Apache Spark™ - Unified Engine for large-scale data analytics
(2)文档查看地址:Overview - Spark 3.0.0 Documentation
(3)下载地址:Downloads | Apache Spark
https://archive.apache.org/dist/spark/
1.Hadoop与Spark框架对比
2.Spark目前支持的部署模式
(1)Local模式:在本地部署单个Spark服务
(2)Standalone模式:Spark自带的任务调度模式。
(3)YARN模式:Spark使用Hadoop的YARN组件进行资源与任务调度。
(4)Mesos模式:Spark使用Mesos平台进行资源与任务的调度。
3.常用端口号
(1)Spark查看当前Spark-shell运行任务情况端口号:4040
(2)Spark Master内部通信服务端口号:7077 (类比于yarn的8032(RM和NM的内部通信)端口)
(3)Spark Standalone模式Master Web端口号:8080(类比于Hadoop YARN任务运行情况查看端口号:8088)
(4)Spark历史服务器端口号:18080 (类比于Hadoop历史服务器端口号:19888)
4.集群角色简单介绍
1.Master和Worker集群资源管理
Master和Worker是Spark的守护进程、集群资源管理者,即Spark在特定模式(Standalone)下正常运行必须要有的后台常驻进程。
Master:Spark特有资源调度系统的Leader,掌管着整个集群的资源信息,类似于Yarn框架中的ResourceManager。
Worker:Spark特有资源调度系统的Slave,有多个,每个Slave掌管着所在节点的资源信息,类似于Yarn框架中的NodeManager。
2.Driver和Executor任务的管理者
Driver和Executor是临时程序,当有具体任务提交到Spark集群才会开启的程序。
Driver:执行程序时Spark Shell中会预加载一个叫做sc的SparkContext对象,Driver此时的作用:①把用户程序转为作业(job) ②跟踪Executor的任务运行状况 ③为执行器节点调度任务 ④WebUI界面展示应用运行情况。
Executor:负责执行Spark的具体任务。
二.Yarn模式的基本使用
Spark客户端直接连接Yarn,不需要额外构建Spark集群。
1.运行流程
Spark有yarn-client和yarn-cluster两种模式,主要区别在于:Driver程序的运行节点。
yarn-client:Driver程序运行在客户端,适用于交互、调试,希望立即看到app的输出。
yarn-cluster:Driver程序运行在由ResourceManager启动的APPMaster。
yarn-client运行模式介绍
yarn-cluster运行模式介绍
spark的几种模式对比:
模式 |
Spark安装机器数 |
需启动的进程 |
所属者 |
Local |
1 |
无 |
Spark |
Standalone |
3 |
Master及Worker |
Spark |
Yarn |
1 |
Yarn及HDFS |
Hadoop |
三.RDD
1.RDD概述
RDD(Resilient Distributed Dataset)弹性分布式数据集,是Spark中最基本的数据抽象。它代表一个弹性的、不可变、可分区、里面的元素可并行计算的集合。
弹性:①存储的弹性:内存与磁盘的自动切换②容错的弹性:数据丢失可以自动恢复③计算的弹性:计算出错重试机制④分片的弹性:可根据需要重新分片
分布式:数据存储在大数据集群不同节点上
数据集:封装了计算逻辑,但不保存数据
数据抽象:代码中是一个抽象类(abstract class RDD),需要子类具体实现
不可变:封装了具体的计算逻辑,是不可以改变的,想要改变只能生成新的RDD进行封装
注意:在Spark中只有遇到action等行动算子才会执行RDD的计算(延迟计算),所有RDD算子相关操作都在Executor端执行,RDD算子之外的操作都在Driver端执行。
五大特性:
(1)一组分区,是数据集的基本组成单位,标记数据是哪个分区的
protect def getPartitions:Array[Partition]
(2)一个计算每个分区的函数
def compute(split:Partition,context TaskContext):Interator[T]
(3)RDD之间的依赖关系
protected def getDependencies:Seq[Dependency[ _ ]]=deps
(4)一个Partitioner,RDD的分片函数,控制分区的数据流向(键值对)
val partitioner:scala.Option[org.apache.spark.Partitioner]
(5)一个列表,存储存取每个Partition的优先位置
protected def getPreferredLocations(split:Partition):scala.Seq[String]
2.RDD编程
2.1RDD的创建
RDD的创建方式可以分为三种:从集合中创建RDD、从外部存储创建RDD、从其他RDD创建。
2.1.1从集合中创建
主要使用两种函数:parallelize和makeRDD
-
def main(args: Array[String]): Unit = {
-
-
//1.创建SparkConf并设置App名称
-
val conf: SparkConf = new
-
SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
-
-
//2.创建SparkContext,该对象是提交Spark App的入口
-
val sc: SparkContext = new SparkContext(conf)
-
-
//3.使用parallelize()创建rdd
-
val rdd: RDD[Int] = sc.parallelize(Array(1, 2, 3, 4, 5, 6, 7, 8))
-
-
rdd.collect().foreach(println)
-
-
//4.使用makeRDD()创建rdd
-
val rdd1: RDD[Int] = sc.makeRDD(Array(1, 2, 3, 4, 5, 6, 7, 8))
-
-
rdd1.collect().foreach(println)
-
-
sc.stop()
-
}
2.1.2从外部存储创建
由外部存储系统的数据集创建RDD包括:本地的文件系统,还有所有Hadoop支持的数据集,比如HDFS、HBase等。
-
def main(args: Array[String]): Unit = {
-
-
//1.创建SparkConf并设置App名称
-
val conf: SparkConf = new
-
SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
-
-
//2.创建SparkContext,该对象是提交Spark App的入口
-
val sc: SparkContext = new SparkContext(conf)
-
-
//3.读取文件。此处举例集群路径:hdfs://hadoop102:8020/input
-
val lineWordRdd: RDD[String] = sc.textFile("input")
-
-
//4.打印
-
lineWordRdd.foreach(println)
-
-
//5.关闭
-
sc.stop()
-
}
2.1.3从其他RDD创建
2.2分区规则
2.2.1数据从集合中创建
分区数量:不手动填写分区数的情况下,默认分区数跟本地模式的cpu核数有关
local : 1个 local[*] : 笔记本所有核心数 local[K]:K个
数据分区规则:整数除,多余的数放到后面
例:List(1,2,3,4,5),设置了2个分区,0号分区数据为(1,2);1号分区数据为(3,4,5)
左闭右开
分区的开始位置 = 分区号 * 数据总长度/分区总数
分区的结束位置 =(分区号 1)* 数据总长度/分区总数
例:List(1,2,3,4,5),设置了3个分区,最终0号分区数据为(1);1号分区数据为(2,3);2号分区数据为(4,5)
2.2.1数据从文件中创建
分区数量:默认填写数值 math.min(defaultParallelism,2)
环境的核数和2比较取最小值,一般为2,但最终分区数不一定是2
最终分区切分规则:
计算文件长度除以填写的分区数:
goalSize = length / partitions
splitSize = Math.max(1,Math.min(goalSize,128m))
拿splitSize按照1.1倍切分规则去切分文件,得到最终的分区数.
例:设一个文件长度为10,手动设置分区数为3,goalSize=10/3=3
splitSize=3 分区数=10(length)/3(splitSize) =>3,3,4 4大于3的1.1倍,符合hadoop切片1.1倍的策略,因此会多创建一个分区,即一共有4个分区 3,3,3,1
数据分区规则:
Spark读取文件,采用的是hadoop的方式读取,所以一行一行读取,跟字节数没有关系,数据读取位置计算是以偏移量为单位来进行计算的。
如果切分大小的时候,出现在一行的中间,默认将这整行的数据放到前一个分区中。
2.3Transformation转换算子
RDD整体上分为Value类型、双Value类型和Key-Value类型
2.3.1 Value类型
2.3.1.1 map()映射
(1)函数签名:def map[U: Class Tag](f : T => U): RDD[U]
(2)功能说明:参数f是一个函数,它可以接收一个参数。当某个RDD执行map方法时,会遍历该RDD中的每一个数据项,并依次应用f函数,从而产生一个新的RDD。这个新RDD中的每一个元素都是由原来RDD中的元素依次应用f函数而得到的。
例:
-
-
def main(args: Array[String]): Unit = {
-
-
//1.创建SparkConf并设置App名称
-
val conf = new SparkConf().setAppName("SparkCore").setMaster("local[*]")
-
-
//2.创建SparkContext,该对象是提交Spark App的入口
-
val sc = new SparkContext(conf)
-
-
//3.具体业务逻辑
-
// 3.1 创建一个RDD
-
val rdd: RDD[Int] = sc.makeRDD(1 to 4, 2)
-
-
// 3.2 调用map方法,每个元素乘以2
-
val mapRdd: RDD[Int] = rdd.map(_ * 2)
-
-
// 3.3 打印修改后的RDD中数据
-
mapRdd.collect().foreach(println)
-
-
//4.关闭连接
-
sc.stop()
-
}
2.3.1.2 mapPartitions()以分区为单位执行Map
(1)函数签名:def mapPartitions[U : ClassTag](
f : iterator[T] => iterator[U], //f函数把每一个分区的数据分别放入到迭代器中,进行批处理
preservesPartitioning:Boolean = false) : RDD[U] //是否保留上游RDD的分区信息,默认false
(2)功能说明:mapPartitions一次处理一个分区数据
2.3.1.3 map()和mapPartitions()区别
map():每次处理一条数据,一条数据调用一次函数
mapPartitions():每次处理一个分区的数据,一个分区调用一次函数,效率更高,但这个分区的数据处理完后原RDD分区的数据才能释放
2.3.1.4 mapPartitionsWithIndex()带分区号
1)函数签名:
def mapPartitionsWithIndex[U: ClassTag](
f: (Int, Iterator[T]) => Iterator[U], // Int表示分区编号
preservesPartitioning: Boolean = false): RDD[U]
2)功能说明:类似于mapPartitions,比mapPartitions多一个整数参数表示分区号
2.3.1.5 flatMap()扁平化
1)函数签名:def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U]
2)功能说明:与map操作类似,将RDD中的每一个元素通过应用f函数依次转换为新的元素,并封装到RDD中。
区别:在flatMap操作中,f函数的返回值是一个集合,并且会将每一个该集合中的元素拆分出来放到新的RDD中。
-
def main(args: Array[String]): Unit = {
-
//1.创建SparkConf并设置APP名称
-
val conf: SparkConf = new SparkConf().setAppName("SparkCore").setMaster("local[*]")
-
-
//2.创建SparkContext对象
-
val sc = new SparkContext(conf)
-
-
//3.创建RDD
-
-
val arrayRDD: RDD[List[Int]] = sc.makeRDD(List(List(1, 2), List(3, 4)))
-
-
val result: RDD[Int] = arrayRDD.flatMap(list => list)
-
-
result.collect().foreach(println)
-
-
-
//4.关闭sc
-
sc.stop()
-
}
输出结果:
-
1
-
2
-
3
-
4
2.3.1.6 groupBy()分组
(1)函数签名:def groupBy[K](f:T => K)(implicit kt : ClassTag[K]) : RDD[(k,iterable[T])]
(2)功能说明:按照传入函数的返回值进行分组,将相同key的对应值放入一个迭代器。存在shuffle过程。
-
def main(args: Array[String]): Unit = {
-
//1.创建SparkConf并设置APP名称
-
val conf: SparkConf = new SparkConf().setAppName("SparkCore").setMaster("local[*]")
-
-
//2.创建SparkContext对象
-
val sc = new SparkContext(conf)
-
-
//3.创建RDD
-
val rdd1: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4, 5, 6), 2)
-
//将数据按奇偶分开
-
val result: RDD[(Int, Iterable[Int])] = rdd1.groupBy(i => i % 2)
-
-
result.collect().foreach(println)
-
-
//4.关闭sc
-
sc.stop()
-
}
输出结果:
-
(0,CompactBuffer(2, 4, 6))
-
(1,CompactBuffer(1, 3, 5))
2.3.1.7 filter()过滤
(1)函数签名: def filter(f: T => Boolean): RDD[T]
(2)功能说明:接收一个返回值为布尔类型的函数作为参数。当某个RDD调用filter方法时,会对该RDD中每一个元素应用f函数,如果返回值类型为true,则该元素会被添加到新的RDD中。
-
def main(args: Array[String]): Unit = {
-
//1.创建SparkConf并设置APP名称
-
val conf: SparkConf = new SparkConf().setAppName("SparkCore").setMaster("local[*]")
-
-
//2.创建SparkContext对象
-
val sc = new SparkContext(conf)
-
-
//3.创建RDD
-
val rdd1: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4, 5, 6), 2)
-
-
//过滤出为偶数的数据
-
val result: RDD[Int] = rdd1.filter(i => i % 2 == 0)
-
-
result.collect().foreach(println)
-
-
//4.关闭sc
-
sc.stop()
-
}
输出结果:
-
2
-
4
-
6
2.3.1.8 distinct()去重
(1)函数签名:def distinct():RDD[T] //默认情况下,distinct会生成与原RDD分区个数一致的分区数
def distinct(num Partitions:Int)(implict ord:Ordering[T]=null):RDD[T] //可以去重后修改分区个数
(2)功能说明:对内部的元素去重,并将去重后的元素放到新的RDD中。存在shuffle过程。是分布式的,相比HashSet集合方式不容易OOM。
-
def main(args: Array[String]): Unit = {
-
//1.创建SparkConf并设置APP名称
-
val conf: SparkConf = new SparkConf().setAppName("SparkCore").setMaster("local[*]")
-
-
//2.创建SparkContext对象
-
val sc = new SparkContext(conf)
-
-
//3.创建RDD
-
val rdd1: RDD[Int] = sc.makeRDD(List(1, 3, 4, 3, 1, 4, 3, 7),2)
-
-
val result: RDD[Int] = rdd1.distinct(3)
-
-
result.mapPartitionsWithIndex(
-
(index,list) => list.map((index,_))).collect().foreach(println)
-
-
//4.关闭sc
-
sc.stop()
-
}
输出结果:
-
(0,3)
-
(1,4)
-
(1,1)
-
(1,7)
2.3.1.9 coalesce()合并分区
Coalesce算子包括:配置执行Shuffle和配置不执行Shuffle两种方式。
(1)函数签名:
def coalesce(numPartitions: Int, shuffle: Boolean = false, //默认false不执行shuffle
partitionCoalescer: Option[PartitionCoalescer] = Option.empty)(implicit ord: Ordering[T] = null) : RDD[T]
(2)功能说明:缩减分区数,用于大数据集过滤后,提高小数据集的执行效率。
1.不执行shuffle方式
-
def main(args: Array[String]): Unit = {
-
//1.创建SparkConf并设置APP名称
-
val conf: SparkConf = new SparkConf().setAppName("SparkCore").setMaster("local[*]")
-
-
//2.创建SparkContext对象
-
val sc = new SparkContext(conf)
-
-
//3.创建RDD
-
val rdd1: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 4)
-
-
//缩减分区
-
val result: RDD[Int] = rdd1.coalesce(2)
-
-
result.mapPartitionsWithIndex(
-
(index,list) => list.map((index,_))).collect().foreach(println)
-
-
//4.关闭sc
-
sc.stop()
-
}
输出结果:
-
(0,1)
-
(0,2)
-
(1,3)
-
(1,4)
2.执行shuffle方式
-
//缩减分区
-
val result: RDD[Int] = rdd1.coalesce(2,true)
-
-
result.mapPartitionsWithIndex(
-
(index,list) => list.map((index,_))).collect().foreach(println)
输出结果:
-
(0,1)
-
(0,3)
-
(0,4)
-
(1,2)
2.3.1.10 repartition()重新分区
(1)函数签名: def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T]
(2)功能说明:该操作内部其实执行的是coalesce操作,参数shuffle的默认值为true。无论是将分区数多的RDD转换为分区数少的RDD,还是将分区数少的RDD转换为分区数多的RDD,repartition操作都可以完成,因为无论如何都会经shuffle过程。
coalesce和repartition区别:
(1)coalesce重新分区,可以选择是否进行shuffle过程。由参数shuffle: Boolean = false/true决定。
(2)repartition实际上是调用的coalesce,进行shuffle。源码如下:
def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
coalesce(numPartitions, shuffle = true)}
(3)coalesce一般为缩减分区,如果扩大分区,不使用shuffle是没有意义的,repartition扩大分区执行shuffle。
2.3.1.11 sortBy()排序
(1)函数签名:def sortBy[K]( f: (T) => K,ascending: Boolean = true, // 默认为正序排列
numPartitions: Int = this.partitions.length) (implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T]
(2)功能说明:该操作用于排序数据。在排序之前,可以将数据通过f函数进行处理,之后按照f函数处理的结果进行排序,默认为正序排列。排序后新产生的RDD的分区数与原RDD的分区数一致。存在shuffle过程。
-
def main(args: Array[String]): Unit = {
-
//1.创建SparkConf并设置APP名称
-
val conf: SparkConf = new SparkConf().setAppName("SparkCore").setMaster("local[*]")
-
-
//2.创建SparkContext对象
-
val sc = new SparkContext(conf)
-
-
//3.创建RDD
-
val rdd1: RDD[Int] = sc.makeRDD(List(2, 5, 1, 3, 6, 4),2)
-
-
//默认升序排列
-
val result: RDD[Int] = rdd1.sortBy(i => i)
-
-
//改为降序排列
-
val result1: RDD[Int] = rdd1.sortBy(i => i, false)
-
-
result.mapPartitionsWithIndex(
-
(index,list) => list.map((index,_))).collect().foreach(println)
-
-
println("=====================================")
-
-
result1.mapPartitionsWithIndex(
-
(index,list) => list.map((index,_))).collect().foreach(println)
-
//4.关闭sc
-
sc.stop()
-
}
输出结果:
-
(0,1)
-
(0,2)
-
(0,3)
-
(1,4)
-
(1,5)
-
(1,6)
-
=====================================
-
(0,6)
-
(0,5)
-
(0,4)
-
(1,3)
-
(1,2)
-
(1,1)
注意:Spark的排序能够保证全局有序,根据分区号的大小来排序,分区号小的在前面,默认情况下0号分区的数据统一都小于1号分区的数据。使用range分区,每一个分区有对应自己的值范围大小,将同一个范围内的key放入到同一个分区内。
2.3.2 双Value类型交互
2.3.2.1 intersection()交集
(1)函数签名:def intersection(other: RDD[T]): RDD[T]
(2)功能说明:对源RDD和参数RDD求交集后返回一个新的RDD。数据会打散重新分区,存在shuffle过程。
-
def main(args: Array[String]): Unit = {
-
//1.创建SparkConf并设置APP名称
-
val conf: SparkConf = new SparkConf().setAppName("SparkCore").setMaster("local[*]")
-
-
//2.创建SparkContext对象
-
val sc = new SparkContext(conf)
-
-
//3.创建RDD
-
val rdd1: RDD[Int] = sc.makeRDD(List(4, 3, 2, 1), 2)
-
-
val rdd2: RDD[Int] = sc.makeRDD(List(3, 4, 5, 6), 2)
-
-
//交集
-
val result: RDD[Int] = rdd1.intersection(rdd2)
-
-
result.mapPartitionsWithIndex(
-
(index,list) => list.map((index,_))).collect().foreach(println)
-
-
//4.关闭sc
-
sc.stop()
-
}
输出结果:
-
(0,4)
-
(1,3)
2.3.2.2 union()并集不去重
(1)函数签名:def union(other: RDD[T]): RDD[T]
(2)功能说明:对源RDD和参数RDD求并集后返回一个新的RDD。数据不会打散重新分区,最终的分区个数就是rdd的分区数之和,简单地将数据一个分区一个分区拿过来。
-
//创建RDD
-
val rdd1: RDD[Int] = sc.makeRDD(List(4, 3, 2, 1), 2)
-
-
val rdd2: RDD[Int] = sc.makeRDD(List(3, 4, 5, 6), 2)
-
-
-
//并集
-
val result: RDD[Int] = rdd1.union(rdd2)
-
-
result.mapPartitionsWithIndex(
-
(index,list) => list.map((index,_))).collect().foreach(println)
输出结果:
-
(0,4)
-
(0,3)
-
(1,2)
-
(1,1)
-
(2,3)
-
(2,4)
-
(3,5)
-
(3,6)
2.3.2.3 subtract()差集
(1)函数签名:def subtract(other: RDD[T]): RDD[T]
(2)功能说明:计算差的一种函数,去除两个RDD中相同元素,不同的RDD将保留下来。数据需要打散重新分区,存在shuffle过程。
-
//创建RDD
-
val rdd1: RDD[Int] = sc.makeRDD(List(4, 3, 2, 1), 2)
-
-
val rdd2: RDD[Int] = sc.makeRDD(List(3, 4, 5, 6), 2)
-
-
-
//差集
-
val result: RDD[Int] = rdd1.subtract(rdd2)
-
-
result.mapPartitionsWithIndex(
-
(index,list) => list.map((index,_))).collect().foreach(println)
输出结果:
-
(0,2)
-
(1,1)
2.3.2.4 zip()拉链
(1)函数签名:def zip[U: ClassTag](other: RDD[U]): RDD[(T, U)]
(2)功能说明:该操作可以将两个RDD中的元素以键值对的形式进行合并。键值对中的Key为第1个RDD中的元素,Value为第2个RDD中的元素。
将两个RDD组合成Key-Value形式的RDD,这里默认两个RDD的partition数量以及元素数量都相同,否则会抛出异常。
-
//创建RDD
-
val rdd1: RDD[Int] = sc.makeRDD(List(1, 2, 3), 3)
-
-
val rdd2: RDD[String] = sc.makeRDD(List("a", "b", "c"),3)
-
-
//拉链
-
val result: RDD[(Int, String)] = rdd1.zip(rdd2)
-
-
result.mapPartitionsWithIndex(
-
(index,list) => list.map((index,_))).collect().foreach(println)
输出结果:
-
(0,(1,a))
-
(1,(2,b))
-
(2,(3,c))
2.3.3 Key-Value类型
2.3.3.1 partitionBy()按照K重新分区
(1)函数签名:def partitionBy(partitioner: Partitioner): RDD[(K, V)]
(2)功能说明:将RDD[K,V]中的K按照指定Partitioner(分区器)重新进行分区;如果原有的RDD和新的RDD是一致的话就不进行分区,否则会产生Shuffle过程。不能针对单value数据进行调用,必须使用二元组。
-
//创建RDD
-
val rdd1: RDD[(Int, String)] = sc.makeRDD(List((1, "aaa"), (2, "bbb"), (3, "ccc")),3)
-
-
//使用分区器进行分区,此处使用hash分区器
-
//系统的hash分区器会对key进行hashcode,之后对填写的分区个数取模,得到分区号
-
val result: RDD[(Int, String)] = rdd1.partitionBy(new HashPartitioner(2))
-
-
result.mapPartitionsWithIndex(
-
(index,list) => list.map((index,_))).collect().foreach(println)
输出结果:
-
(0,(2,bbb))
-
(1,(1,aaa))
-
(1,(3,ccc))
2.3.3.2 自定义分区
要实现自定义分区器,需要继承org.apache.spark.Partitioner类,并实现下面三个方法:
(1)numPartitions: Int:返回创建出来的分区数。
(2)getPartition(key: Any): Int:返回给定键的分区编号(0到numPartitions-1)。
(3)equals():Java 判断相等性的标准方法。这个方法的实现非常重要,Spark需要用这个方法来检查你的分区器对象是否和其他分区器实例相同,这样Spark才可以判断两个RDD的分区方式是否相同。
注意:Spark的分区器只能根据key进行分区。
-
object customPartitioner {
-
def main(args: Array[String]): Unit = {
-
//1.创建SparkConf并设置APP名称
-
val conf: SparkConf = new SparkConf().setAppName("SparkCore").setMaster("local[*]")
-
-
//2.创建SparkContext对象
-
val sc = new SparkContext(conf)
-
-
//3.创建RDD
-
val rdd1: RDD[(Int, String)] = sc.makeRDD(List((1, "aaa"), (2, "bbb"), (3, "ccc"), (4, "ddd")), 4)
-
-
//使用自定义分区器进行分区
-
val result: RDD[(Int, String)] = rdd1.partitionBy(new myPartitioner(3))
-
-
result.mapPartitionsWithIndex(
-
(index, list) => list.map((index, _))).collect().foreach(println)
-
-
println("====================================================")
-
-
//实现匿名子类自定义分区器
-
//需求:根据key的不同类型进行分区,将int值放入1号分区,str值放入2号分区,其余放入0号分区
-
val rdd2: RDD[(Any, Int)] = sc.makeRDD(List((1, 1), (2, 2), (3.1, 3), ("a", 4), ("b", 5)), 1)
-
-
val result1: RDD[(Any, Int)] = rdd2.partitionBy(new Partitioner {
-
override def numPartitions = 3
-
-
override def getPartition(key: Any) = {
-
key match {
-
case i: Int => 1
-
case s: String => 2
-
case _ => 0
-
}
-
}
-
})
-
result1.mapPartitionsWithIndex(
-
(index, list) => list.map((index, _))).collect().foreach(println)
-
-
//4.关闭sc
-
sc.stop()
-
}
-
}
-
-
class myPartitioner(num: Int) extends Partitioner {
-
//该分区器有几个分区
-
override def numPartitions: Int = num
-
-
//具体分区逻辑:根据传入数据的key返回对应的分区号
-
override def getPartition(key: Any): Int = {
-
key match {
-
case i: Int => i % num
-
case s: String => s.hashCode % num
-
//所有返回的分区号都不能超过分区个数的值
-
case _ => 0
-
}
-
}
-
}
输出结果:
-
(0,(3,ccc))
-
(1,(1,aaa))
-
(1,(4,ddd))
-
(2,(2,bbb))
-
=======================
-
(0,(3.1,3))
-
(1,(1,1))
-
(1,(2,2))
-
(2,(a,4))
-
(2,(b,5))
2.3.3.3 reduceByKey()按照K聚合V
(1)函数签名:def reduceByKey(func: (V, V) => V): RDD[(K, V)]
def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)]
(2)功能说明:该操作可以将RDD[K,V]中的元素按照相同的K对V进行聚合。其存在多种重载形式,还可以设置新RDD的分区数。按照key进行聚合,在shuffle之前有combine(预聚合)操作,返回结果是RDD[K,V]。
-
//创建RDD
-
val rdd1: RDD[(String, Int)] = sc.makeRDD(List(("a", 1), ("b", 5), ("a", 5), ("b", 2)))
-
-
//需求:计算相同key对应值的相加结果
-
//res初始值为第一个元素
-
val result: RDD[(String, Int)] = rdd1.reduceByKey((res, elem) => res elem)
-
-
result.collect().foreach(println)
输出结果:
-
(a,6)
-
(b,7)
2.3.3.4 groupByKey()按照K重新分组
(1)函数签名:def groupByKey(): RDD[(K, Iterable[V])]
(2)功能说明:对每个key进行操作,但只生成一个seq,并不进行聚合。按照key进行分组,直接进行shuffle,后面的集合中只有单独一个value值。该操作可以指定分区器或者分区数(默认使用HashPartitioner)。
-
val rdd1: RDD[(String, Int)] = sc.makeRDD(List(("a", 1), ("b", 5), ("a", 5), ("b", 2)))
-
-
val result: RDD[(String, Iterable[Int])] = rdd1.groupByKey()
-
-
result.collect().foreach(println)
输出结果:
-
(a,CompactBuffer(1, 5))
-
(b,CompactBuffer(5, 2))
2.3.3.5 aggregateByKey()按照K处理分区内和分区间逻辑
(1)函数签名:def aggregateByKey[U: Class Tag](zeroValue:U)(seqOp:(U,V)=>U,combOp:(U,U)=>U):RDD[(K,U)]
①zeroValue(初始值):给每一个分区中的每一种key一个初始值,只在分区内使用
②seqOp(分区内):函数用于在每一个分区中用初始值逐步迭代value
③combOp(分区间):函数用于合并每个分区中的结果,初始值为第一个元素
-
val rdd1: RDD[(String, Int)] = sc.makeRDD(List(("a", 1), ("b", 2), ("a", 3), ("b", 4), ("a", 5), ("b", 6)), 2)
-
-
//需求:取出每个分区相同key对应值的最大值然后相加
-
val result: RDD[(String, Int)] = rdd1.aggregateByKey(0)((res, elem) => math.max(res, elem), (res, elem) => (res elem))
-
//使用匿名函数化简后可写为:rdd1.aggregateByKey(0)(math.max(_,_),_ _)
-
-
result.collect().foreach(println)
输出结果:
-
(b,8)
-
(a,8)
2.3.3.6 foldByKey()分区内和分区间相同的aggregateByKey()
(1)函数签名: def foldByKey(zeroVlaue:V)(func:(V,V) => V):RDD[(K,V)]
zeroVlaue:初始化值,可以是任意类型
func:函数,两个输入参数相同
(2)功能说明:是aggreegateByKey的简化操作,seqOp和combOp相同,即分区内逻辑和分区间逻辑相同。
-
val rdd1: RDD[(String, Int)] = sc.makeRDD(List(("a", 1), ("b", 2), ("a", 3), ("b", 4), ("a", 5), ("b", 6)), 2)
-
-
val result: RDD[(String, Int)] = rdd1.foldByKey(10)(_ _)
-
-
result.collect().foreach(println)
输出结果:
-
(b,32)
-
(a,29)
2.3.3.7 combineByKey()转换结构后分区内和分区间操作
(1)函数签名: def combineByKey[C]( createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C): RDD[(K, C)]
(1)createCombiner(转换数据的结构): combineByKey() 会遍历分区中的所有元素,因此每个元素的键要么还没有遇到过,要么就和之前的某个元素的键相同。如果这是一个新的元素,combineByKey()会使用一个叫作createCombiner()的函数来创建那个键对应的累加器的初始值
(2)mergeValue(分区内): 如果这是一个在处理当前分区之前已经遇到的键,它会使用mergeValue()方法将该键的累加器对应的当前值与这个新的值进行合并
(3)mergeCombiners(分区间): 由于每个分区都是独立处理的,因此对于同一个键可以有多个累加器。如果有两个或者更多的分区都有对应同一个键的累加器,就需要使用用户提供的mergeCombiners()方法将各个分区的结果进行合并。
(2)功能说明:针对相同K,将V合并成一个集合。
-
val rdd1: RDD[(String, Int)] = sc.makeRDD(List(("a", 88), ("b", 95), ("a", 91), ("b", 93), ("a", 95), ("b", 98)),2)
-
-
//需求:计算每种key对应值的和,再根据key计算每种key的平均值
-
//先进行分区内累加 ("a",88) ("a",91) => "a",(179,2)
-
val result: RDD[(String, (Int, Int))] = rdd1.combineByKey(
-
//转换结构 "a",88 => (88,1)
-
i => (i, 1),
-
// "a",(88,1) (91,1) => (179,2)
-
(c: (Int, Int), elem) => (c._1 elem, c._2 1),
-
// "a",(179,2) (95,1) => (274,3)
-
(c1: (Int, Int), c2: (Int, Int)) => (c1._1 c2._1, c1._2 c2._2)
-
)
-
-
result.collect().foreach(println)
-
-
println("=============================")
-
//偏函数写法求平均值
-
val value1: RDD[(String, Double)] = result.map({
-
case (key, value) => (key, value._1.toDouble / value._2)
-
})
-
-
//其他写法
-
val value2: RDD[(String, Double)] = result.map(tuple => (tuple._1, tuple._2._1.toDouble / tuple._2._2))
-
-
value1.collect().foreach(println)
-
println("==========================")
-
value2.collect().foreach(println)
输出结果:
-
(b,(286,3))
-
(a,(274,3))
-
======================
-
(b,95.33333333333333)
-
(a,91.33333333333333)
-
======================
-
(b,95.33333333333333)
-
(a,91.33333333333333)
2.3.3.8 reduceByKey、foldByKey、aggregateByKey、combineByKey的联系
reduceByKey:底层调用combineByKey,没有初始值,使用第一个元素作为初始值,分区内和分区间逻辑一致
foldByKey:底层调用combineByKey,有初始值,分区内和分区间计算逻辑一致
aggregateByKey:底层调用combineByKey,有初始值,分区内和分区间计算逻辑可以不一致,更灵活
combineByKey:有初始值,并且初始值还支持改变数据结构,最灵活
2.3.3.9 sortByKey()按照K进行排序
(1)函数签名: def sortByKey(ascending: Boolean = true, // 默认,升序
numPartitions: Int = self.partitions.length) : RDD[(K, V)]
(2)功能说明:在一个(K,V)的RDD上调用,K必须实现Ordered接口,返回一个按照key进行排序的(K,V)的RDD
-
val rdd1: RDD[(Int, String)] = sc.makeRDD(List((3, "aa"), (6, "cc"), (2, "bb"), (1, "dd")))
-
-
//默认正序排序
-
val result: RDD[(Int, String)] = rdd1.sortByKey()
-
-
//使用倒序排序
-
val result1: RDD[(Int, String)] = rdd1.sortByKey(false)
-
-
result.collect().foreach(println)
-
-
println("====================")
-
-
result1.collect().foreach(println)
输出结果:
-
(1,dd)
-
(2,bb)
-
(3,aa)
-
(6,cc)
-
==========
-
(6,cc)
-
(3,aa)
-
(2,bb)
-
(1,dd)
2.3.3.10 mapValues()只对V进行操作
(1)函数签名:def mapValues[U](f: V => U): RDD[(K, U)]
(2)功能说明:针对于(K,V)形式的类型只对V进行操作
-
val rdd1: RDD[(String, Int)] = sc.makeRDD(List(("a", 1), ("b", 2), ("c", 3), ("d", 4)))
-
-
//将元素值以二倍值返回
-
val result: RDD[(String, Int)] = rdd1.mapValues(_ * 2)
-
-
result.collect().foreach(println)
输出结果:
-
(a,2)
-
(b,4)
-
(c,6)
-
(d,8)
2.3.3.11 join()等同于sql里的内连接,关联上的要,关联不上的舍弃
(1)函数签名: def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))]
def join[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, W))]
(2)功能说明:在类型为(K,V)和(K,W)的RDD上调用,返回一个相同key对应的所有元素对在一起的(K,(V,W))的RDD
-
val rdd1: RDD[(String, Int)] = sc.makeRDD(List(("a", 1), ("b", 5), ("c", 2), ("d", 6)))
-
-
val rdd2: RDD[(String, Int)] = sc.makeRDD(List(("a", 1), ("b", 5), ("e", 2), ("f", 6)))
-
-
//join等同于sql中的内连接
-
//能够join上的值保留,反之删除
-
val result: RDD[(String, (Int, Int))] = rdd1.join(rdd2)
-
-
result.collect().foreach(println)
输出结果:
-
(a,(1,1))
-
(b,(5,5))
2.3.3.12 cogroup()类似于sql的全连接,但是在同一个RDD中对key聚合
(1)函数签名:def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))]
(2)功能说明:在类型为(K,V)和(K,W)的RDD上调用,返回一个(K,(Iterable<V>,Iterable<W>))类型的RDD。操作两个RDD中的KV元素,每个RDD中相同key中的元素分别聚合成一个集合。
-
val rdd1: RDD[(String, Int)] = sc.makeRDD(List(("a", 1), ("b", 5), ("c", 2), ("d", 6)))
-
-
val rdd2: RDD[(String, Int)] = sc.makeRDD(List(("a", 1), ("b", 5), ("e", 3), ("f", 4)))
-
-
//类似于sql的满外连接
-
//先将当前rdd中的同一个key数据聚合为一个集合,之后用二元组形式表现出来
-
val result: RDD[(String, (Iterable[Int], Iterable[Int]))] = rdd1.cogroup(rdd2)
-
-
result.collect().foreach(println)
输出结果:
-
(a,(CompactBuffer(1),CompactBuffer(1)))
-
(b,(CompactBuffer(5),CompactBuffer(5)))
-
(c,(CompactBuffer(2),CompactBuffer()))
-
(d,(CompactBuffer(6),CompactBuffer()))
-
(e,(CompactBuffer(),CompactBuffer(3)))
-
(f,(CompactBuffer(),CompactBuffer(4)))
2.4 Action行动算子
行动算子是触发了整个作业的执行。因为转换算子都是懒加载,并不会立即执行。
2.4.1 collect()以数组的形式返回数据集
(1)函数签名:def collect(): Array[T]
(2)功能说明:在驱动程序中,以数组Array的形式返回数据集的所有元素。Executor端所有的数据都会被拉取到Drive端,会按照分区的编号按顺序收集数据,最终结果全局有序。
2.4.2 count()返回RDD中元素个数
(1)函数签名:def count(): Long
(2)功能说明:返回RDD中元素的个数
-
val rdd1: RDD[Int] = sc.makeRDD(1 to 100, 16)
-
-
val result: Long = rdd1.count()
-
-
println(result)
输出结果:
100
2.4.3 first()返回RDD中的第一个元素
(1)函数签名:def first(): T
(2)功能说明:返回RDD中的第一个元素
-
val rdd1: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4))
-
-
val i: Int = rdd1.first()
-
-
println(i)
输出结果:
1
2.4.4 take()返回由RDD前n个元素组成的数组
(1)函数签名:def take(num: Int): Array[T]
(2)功能说明:返回一个由RDD的前n个元素组成的数组,默认按照全局有序进行拉取数据。
-
val rdd1: RDD[(String, Int)] = sc.makeRDD(List(("a", 1), ("b", 2), ("c", 3), ("d", 4)), 2)
-
-
val result: Array[(String, Int)] = rdd1.take(2)
-
-
result.foreach(println)
输出结果:
-
(a,1)
-
(b,2)
2.4.5 takeOrdered()返回该RDD排序后前n个元素组成的数组
1)函数签名:def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T]
2)功能说明:返回该RDD排序后的前n个元素组成的数组,默认按照升序排序。
-
val rdd1: RDD[Int] = sc.makeRDD(List(5, 8, 3, 1, 6, 7, 2))
-
-
//默认升序
-
val result: Array[Int] = rdd1.takeOrdered(3)
-
-
//降序排列 rdd1.takeOrdered(3)(Ordering[Int].reverse)
-
-
result.foreach(println)
输出结果:
-
1
-
2
-
3
2.4.6 countByKey()统计每种key的个数
(1)函数签名:def countByKey(): Map[K, Long]
(2)功能说明:统计每种key的个数,需要RDD中的数据是二元组,返回结果为map。
-
val rdd1: RDD[(String, Int)] = sc.makeRDD(List(("a", 1), ("b", 2), ("a", 3), ("a", 4)))
-
-
val result: collection.Map[String, Long] = rdd1.countByKey()
-
-
result.foreach(println)
输出结果:
-
(a,3)
-
(b,1)
2.4.7 save相关算子
(1)saveAsTextFile(path)保存成Text文件
功能说明:将数据集的元素以textfile的形式保存到HDFS文件系统或者其他支持的文件系统,对于每个元素,Spark将会调用toString方法,将它装换为文件中的文本。
(2)saveAsSequenceFile(path) 保存成Sequencefile文件
功能说明:将数据集中的元素以Hadoop Sequencefile的格式保存到指定的目录下,可以使HDFS或者其他Hadoop支持的文件系统。只有kv类型RDD有该操作,单值的没有。
(3)saveAsObjectFile(path) 序列化成对象保存到文件
功能说明:用于将RDD中的元素序列化成对象,存储到文件中。
2.4.8 foreach()遍历RDD中每一个元素
(1)函数签名:def foreach(f:T => Unit) : Unit
(2)功能说明:遍历RDD中的每一个元素,并依次应用f函数。
注意:rdd.collect().foreach(println),在Driver端执行foreach打印,此处的foreach是方法,打印结果是全局有序的。
rdd.foreach(println),在Executor端执行foreach打印,此处的foreach是行动算子,是分布式的,打印结果整体无序。
2.5 RDD序列化
初始化工作是在Driver端进行的,而实际运行程序是在Executor端进行的,涉及到了跨进程通信,对象需要经过序列化变成byte数组才能传输,(不序列化会报错,无法运行)。
Spark自带闭包检查,有闭包也是需要序列化的。算子以外的代码都是在Driver端执行,算子里面的代码都是在Executor端执行。
序列化的方法:(1)类继承scala.Serializable接口(2)把类变成样例类(case class),样例类默认是序列化的。
当RDD在Shuffle数据的时候,简单数据类型、数组和字符串类型已经在Spark内部使用Kryo来序列化。即使使用Kryo序列化,也要继承Serializable接口或者使用样例类。
使用Kryo序列化的方法:
new SparkConf().set("spark.serializer","org.apache.spark.serializer.KryoSerializer").registerKryoClasses(Array(classOf[自定义类名]))
2.6 RDD依赖关系
2.6.1 血缘关系
RDD只支持粗粒度转换,即在大量记录上执行的单个操作。将创建RDD的一系列Lineage(血统)记录下来,以便恢复丢失的分区。RDD的Lineage会记录RDD的元数据信息和转换行为,当该RDD的部分分区数据丢失时,它可以根据这些信息来重新运算和恢复丢失的数据分区。
查看血缘关系的方法:toDebugString
-
val rdd: RDD[String] = sc.textFile("input/1.txt")
-
-
//查看血缘关系
-
println(rdd.toDebugString)
-
-
println("=========================")
-
-
val rdd1: RDD[String] = rdd.flatMap(_.split(""))
-
-
//查看血缘关系
-
println(rdd1.toDebugString)
-
-
println("=========================")
-
-
val rdd2: RDD[(String, Int)] = rdd1.map((_, 1))
-
-
//查看血缘关系
-
println(rdd2.toDebugString)
-
-
println("=========================")
-
-
val rdd3: RDD[(String, Int)] = rdd2.reduceByKey(_ _)
-
-
//查看血缘关系
-
println(rdd3.toDebugString)
输出结果:会输出当前RDD往上所有的血缘关系。
-
(2) input/1.txt MapPartitionsRDD[1] at textFile at Test01_Dependency.scala:19 []
-
| input/1.txt HadoopRDD[0] at textFile at Test01_Dependency.scala:19 []
-
=========================
-
(2) MapPartitionsRDD[2] at flatMap at Test01_Dependency.scala:26 []
-
| input/1.txt MapPartitionsRDD[1] at textFile at Test01_Dependency.scala:19 []
-
| input/1.txt HadoopRDD[0] at textFile at Test01_Dependency.scala:19 []
-
=========================
-
(2) MapPartitionsRDD[3] at map at Test01_Dependency.scala:33 []
-
| MapPartitionsRDD[2] at flatMap at Test01_Dependency.scala:26 []
-
| input/1.txt MapPartitionsRDD[1] at textFile at Test01_Dependency.scala:19 []
-
| input/1.txt HadoopRDD[0] at textFile at Test01_Dependency.scala:19 []
-
=========================
-
(2) ShuffledRDD[4] at reduceByKey at Test01_Dependency.scala:40 []
-
-(2) MapPartitionsRDD[3] at map at Test01_Dependency.scala:33 []
-
| MapPartitionsRDD[2] at flatMap at Test01_Dependency.scala:26 []
-
| input/1.txt MapPartitionsRDD[1] at textFile at Test01_Dependency.scala:19 []
-
| input/1.txt HadoopRDD[0] at textFile at Test01_Dependency.scala:19 []
2.6.2 依赖关系
查看依赖关系的方法:dependencies
-
val rdd: RDD[String] = sc.textFile("input/1.txt")
-
-
//查看依赖关系
-
println(rdd.dependencies)
-
-
println("=========================")
-
-
val rdd1: RDD[String] = rdd.flatMap(_.split(""))
-
-
//查看依赖关系
-
println(rdd1.dependencies)
-
-
println("=========================")
-
-
val rdd2: RDD[(String, Int)] = rdd1.map((_, 1))
-
-
//查看依赖关系
-
println(rdd2.dependencies)
-
-
println("=========================")
-
-
val rdd3: RDD[(String, Int)] = rdd2.reduceByKey(_ _)
-
-
//查看依赖关系
-
println(rdd3.dependencies)
输出结果:
-
List(org.apache.spark.OneToOneDependency@80bfdc6)
-
=========================
-
List(org.apache.spark.OneToOneDependency@46963479)
-
=========================
-
List(org.apache.spark.OneToOneDependency@3dffc764)
-
=========================
-
List(org.apache.spark.ShuffleDependency@6ed043d3)
RDD之间的关系可以从两个维度来理解:一个是RDD是从哪些RDD转换而来,也就是RDD的parent RDD(s)是什么(血缘); 另一个就是RDD依赖于parent RDD(s)的哪些Partition(s),这种关系就是RDD之间的依赖。
RDD和它依赖的父RDD(s)的依赖关系有两种不同的类型,即窄依赖(NarrowDependency)和宽依赖(ShuffleDependency)。
窄依赖:表示每一个父RDD的Partition最多被子RDD的一个Partition使用(一对一or多对一),比喻为独生子女。
宽依赖:表示同一个父RDD的Partition被多个子RDD的Partition依赖(只能是一对多),会引起Shuffle,比喻为超生。
具有宽依赖的transformations包括:sort、reduceByKey、groupByKey、join和调用rePartition函数的任何操作。
宽依赖对Spark去评估一个transformations有更加重要的影响,比如对性能的影响。在不影响业务要求的情况下,要尽量避免使用有宽依赖的转换算子,因为有宽依赖,就一定会走shuffle,影响性能。
2.6.3 Stage任务划分
2.6.3.1 DAG(Directed Acyclic Graph)有向无环图是由点和线组成的拓扑图形,该图形具有方向,不会闭环。例如,DAG记录了RDD的转换过程和任务的阶段。
2.6.3.2 RDD任务切分中间分为:Application、Job、Stage和Task
(1)Application:初始化一个SparkContext即生成一个Application;
(2)Job:一个Action算子就会生成一个Job;
(3)Stage:Stage等于宽依赖的个数加1;
(4)Task:一个Stage阶段中,最后一个RDD的分区个数就是Task的个数。
注意:Application->Job->Stage->Task每一层都是1对n的关系。
2.7 RDD持久化
2.7.1 RDD Cache缓存
RDD通过Cache或者Persist方法将前面的计算结果缓存,默认情况下会把数据以序列化的形式缓存在JVM的堆内存中。但是并不是这两个方法被调用时立即缓存,而是触发后面的action算子时,该RDD将会被缓存在计算节点的内存中,并供后面重用。cache操作会增加血缘关系,不改变原有的血缘关系。
缓存级别介绍
注意:默认的存储级别都是仅在内存存储一份。在存储级别的末尾加上“_2”表示持久化的数据存为两份。SER:表示序列化。
缓存有可能丢失,或者存储于内存的数据由于内存不足而被删除,RDD的缓存容错机制保证了即使缓存丢失也能保证计算的正确执行。通过基于RDD的一系列转换,丢失的数据会被重算,由于RDD的各个Partition是相对独立的,因此只需要计算丢失的部分即可,并不需要重算全部Partition。
Spark会自动对一些Shuffle操作的中间数据做持久化操作(比如:reduceByKey)。这样做的目的是为了当一个节点Shuffle失败了避免重新计算整个输入。但是,在实际使用的时候,如果想重用数据,仍然建议调用persist或cache。
2.7.2 RDD CheckPoint检查点
(1)检查点:通过将RDD中间结果写入磁盘。
(2)为什么要做检查点?由于血缘依赖过长会造成容错成本过高,这样就不如在中间阶段做检查点容错,如果检查点之后有节点出现问题,可以从检查点开始重做血缘,减少了开销。
(3)检查点存储路径:Checkpoint的数据通常是存储在HDFS等容错、高可用的文件系统
(4)检查点数据存储格式为:二进制的文件
(5)检查点切断血缘:在Checkpoint的过程中,该RDD的所有依赖于父RDD中的信息将全部被移除。
(6)检查点触发时间:对RDD进行Checkpoint操作并不会马上被执行,必须执行Action操作才能触发。但是检查点为了数据安全,会从血缘关系的最开始执行一遍。
(7)设置检查点步骤
(1)设置检查点数据存储路径:sc.setCheckpointDir("./checkpoint1")
(2)调用检查点方法:wordToOneRdd.checkpoint()
2.7.3 缓存和检查点区别
(1)Cache缓存只是将数据保存起来,不切断血缘依赖。Checkpoint检查点切断血缘依赖。
(2)Cache缓存的数据通常存储在磁盘、内存等地方,可靠性低。Checkpoint的数据通常存储在HDFS等容错、高可用的文件系统,可靠性高。
(3)建议对checkpoint()的RDD使用Cache缓存,这样checkpoint的job只需从Cache缓存中读取数据即可,否则需要再从头计算一次RDD。
(4)如果使用完了缓存,可以通过unpersist()方法释放缓存
2.8 键值对RDD数据分区
Spark目前支持Hash分区、Range分区和用户自定义分区。Hash分区为当前的默认分区。分区器直接决定了RDD中分区的个数、RDD中每条数据经过Shuffle后进入哪个分区和Reduce的个数。
注意:(1)只有Key-Value类型的RDD才有分区器,非Key-Value类型的RDD分区的值是None
(2)每个RDD的分区ID范围:0~numPartitions-1,决定这个值是属于那个分区的。
(3)需要使用到分区器时才会往下游RDD添加分区器。涉及到分组时使用Hash分区器,涉及到排序时使用Range分区器。
2.8.1 Hash分区
HashPartitioner分区的原理:对于给定的key,计算其hashCode,并除以分区的个数取余,如果余数小于0,则用余数 分区的个数(否则加0),最后返回的值就是这个key所属的分区ID。
HashPartitioner分区弊端:可能导致每个分区中数据量的不均匀,极端情况下会导致某些分区拥有RDD的全部数据(数据倾斜)。可以使用预聚合减轻这种情况。
2.8.2 Ranger分区
RangePartitioner作用:将一定范围内的数映射到某一个分区内,尽量保证每个分区中数据量均匀,而且分区与分区之间是有序的,一个分区中的元素肯定都是比另一个分区内的元素小或者大,但是分区内的元素是不能保证顺序的。简单的说就是将一定范围内的数映射到某一个分区内。
实现过程为:第一步:先从整个RDD中采用水塘抽样算法,抽取出样本数据,将样本数据排序,计算出每个分区的最大key值,形成一个Array[KEY]类型的数组变量rangeBounds;第二步:判断key在rangeBounds中所处的范围,给出该key值在下一个RDD中的分区id下标;该分区器要求RDD中的KEY类型必须是可以排序的。
2.9 累加器(和RDD平级)
累加器:分布式共享只写变量。(Executor和Executor之间不能读数据)累加器用来把Executor端变量信息聚合到Driver端。在Driver中定义的一个变量,在Executor端的每个task都会得到这个变量的一份新的副本,每个task更新这些副本的值后,传回Driver端进行合并计算。
2.9.1 系统累加器
(1)累加器定义(SparkContext.accumulator(initialValue)方法)
val sum: LongAccumulator = sc.longAccumulator("sum")
(2)累加器添加数据(累加器.add方法)
sum.add(count)
(3)累加器获取数据(累加器.value)
sum.value
注意:①Executor端的任务不能读取累加器的值(例如:在Executor端调用sum.value,获取的值不是累加器最终的值)。因此我们说,累加器是一个分布式共享只写变量。②累加器要放在行动算子中,因为转换算子执行的次数取决于job的数量,如果一个spark应用有多个行动算子,那么转换算子中的累加器可能会发生不止一次更新,导致结果错误。所以,如果想要一个无论在失败还是重复计算时都绝对可靠的累加器,我们必须把它放在foreach()这样的行动算子中。对于在行动算子中使用的累加器,Spark只会把每个Job对各累加器的修改应用一次。因为行动算子只会执行一次,而转换算子的执行次数不确定。
2.9.2 自定义累加器
自定义累加器使用步骤
(1)继承AccumulatorV2,设定输入、输出泛型
(2)重写6个抽象方法
(3)使用自定义累加器需要注册::sc.register(累加器,"累加器名字")
2.10 广播变量
广播变量:分布式共享只读变量。广播变量用来高效分发较大的对象。向所有工作节点发送一个较大的只读值,以供一个或多个Spark Task操作使用。比如,如果你的应用需要向所有节点发送一个较大的只读查询表,广播变量用起来会很顺手。在多个Task并行操作中使用同一个变量,但是Spark会为每个Task任务分别发送。
使用步骤:
(1)调用SparkContext.broadcast(广播变量)创建出一个广播对象,任何可序列化的类型都可以这么实现。
(2)通过广播变量.value,访问该对象的值。
(3)广播变量只会被发到各个节点一次,作为只读值处理(修改这个值不会影响到别的节点)。
这篇好文章是转载于:学新通技术网
- 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
- 本站站名: 学新通技术网
- 本文地址: /boutique/detail/tanhiaccee
-
photoshop保存的图片太大微信发不了怎么办
PHP中文网 06-15 -
word里面弄一个表格后上面的标题会跑到下面怎么办
PHP中文网 06-20 -
photoshop扩展功能面板显示灰色怎么办
PHP中文网 06-14 -
《学习通》视频自动暂停处理方法
HelloWorld317 07-05 -
Android 11 保存文件到外部存储,并分享文件
Luke 10-12 -
TikTok加速器哪个好免费的TK加速器推荐
TK小达人 10-01 -
微信公众号没有声音提示怎么办
PHP中文网 03-31 -
excel下划线不显示怎么办
PHP中文网 06-23 -
excel打印预览压线压字怎么办
PHP中文网 06-22 -
微信运动停用后别人还能看到步数吗
PHP中文网 07-22