sparkstreaming 和 kafka重分区的场景应用
sparkstreaming 与 kafka重分区的场景应用
昨天线上发现个bug,导致数据的重复,原因如下
线上场景是二个sparkstreaming程序。程序1主要是用来接收数据放入kafka集群,程序2读取数据进行处理,redis数据缓存。因为数据量很大,所以在程序1上先用reduceByKey去重。
程序1发送使用的是Avro序列化对象,要把固定条数一批数据都放在一个Avro对象然后传输到Kafka
对于kafka的分区器:
1 如果消息自带key则对key可以hash然后选择目标分区;
2 如果消息无key则采用RoundRobin轮询算法,这样可以最大限度确保消息在所有分区的均匀性;
3 特别的,生产者API赋予用户自行指定分区的权利,在发送消息时如果指定了分区则可以跳过以上分区法则。
程序一开始没有指定key进行发送的情况
(https://img-blog.csdnimg.cn/c8075a7606f949d5be566ba492038458.png
这里的程序二的场景是通过redis存储用户的信息 更新信息,如果sparkStreaming同一批次 存在同一个用户的多条记录去get redis ,因为redis更新不在这一阶段,如果满足业务条件的记录有多条那就可能会出现业务数据的冗余。
bug的原因找到了,解决的方法非常直观 就是程序二的sparkStreaming一批次在这一个rdd中同一个用户的数据只能有一个。
这里满足条件可以分为二个方面
1.一个用户消息在只能存在同一个分区
先来看怎么满足第一个条件,程序一通过reduceByKey将一个用户消息放置在同一个分区,之前因为发送时Producer没有指定key所以导致发送时类轮询发送。
此时可能很容易想到指定用户唯一标识为key就行,但发送采用的是Avro对象,必定会存在一个对象存储多个用户信息,reduceByKey的分区策略和Kafka默认的分区策略肯定不相同。
思考片刻后发现如果要把一个对象里所有的用户数据都发到一个分区,也就是他们分区算法要相同 ,且分区也要相同
reduceByKey的可以指定分区策略(默认HashPartitioner),KafkaProducer也可以
这二个分区算法是我这边可控的,但还有一个因素就是分区数,程序一的分区要和kafka的分区保持一致,所以如果kafka增加了分区,那么我们这边程序应该实时的感知到,然后改变分区
程序一自定义reduceByKey和kafka分区器代码如下
这里的用socket流演示下
import BroadcastUtil.{parFunc, parFunc1}
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.common.serialization.StringSerializer
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.{Partitioner, SparkConf, SparkContext}
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.{Durations, StreamingContext}
import java.util
import java.util.Properties
import java.util.concurrent.{LinkedBlockingDeque, ThreadPoolExecutor, TimeUnit}
import scala.util.Random
object Hash_Partition {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("SparkStreamingSocket").setMaster("local[4]")
val ssc = new StreamingContext(conf, Durations.seconds(5))
ssc.sparkContext.setLogLevel("ERROR")
BroadcastUtil.creatInstance(ssc.sparkContext)
val properties = new util.HashMap[java.lang.String,Object]()
properties.put("bootstrap.servers", "localhost:9092")
properties.put("key.serializer", classOf[StringSerializer])
properties.put("value.serializer", classOf[StringSerializer])
val kafkaProducer:Broadcast[KafkaSink[String,String]] = ssc.sparkContext.broadcast(KafkaSink(properties))
val inputDS: ReceiverInputDStream[String] = ssc.socketTextStream("192.168.21.160", 6666)
inputDS.foreachRDD(rdd => {
ThreadPoolUtil.threadPoolExecutor.execute(new Runnable {
override def run(): Unit = BroadcastUtil.update(rdd.sparkContext)
})
val rdd1 = rdd.map((_, Random.nextInt(10))).reduceByKey(new Partitioner {
override def getPartition(key: Any): Int = {
parFunc(key.toString, numPartitions)
}
override def numPartitions: Int = BroadcastUtil.getInstance().value
}, (v1, v2) => {
if (v1 > v2) v1 else v2
})
rdd1.mapPartitionsWithIndex((index, it) => {
it.map(f => {
println(s"key:${f._1} 当前分区: ${index} ")
f
})
}).collect()
rdd1.foreachPartition(partition => {
//var Num = 0
partition.foreach(f => {
/**
* //这里一般会将多条数据写到一个Avro对象里
*
* GenericRecord.read.(ByteArrayInputStream(f))
* if(Num%一批数量){
* kafkaProducer.value.send(GenericRecord)
* }
* Num =1
*/
val producer:KafkaSink[String,String] = kafkaProducer.value
producer.send("topic_hash", parFunc1(f._1, BroadcastUtil.getInstance().value), null, f._1)
})
})
})
ssc.start()
ssc.awaitTermination()
}
}
线程池类
object ThreadPoolUtil{
var threadPool:ThreadPoolExecutor = null
// 创建单线程-线程池,任务依次执行
def threadPoolExecutor:ThreadPoolExecutor = {
if (threadPool==null) {
threadPool = new ThreadPoolExecutor(2, 2, 60, TimeUnit.SECONDS, new LinkedBlockingDeque[Runnable](2), new ThreadPoolExecutor.CallerRunsPolicy)
}
threadPool
}
}
广播变量单例类
object BroadcastUtil extends Serializable {
@volatile private var instance: Broadcast[Int] = null
def creatInstance(sc: SparkContext): Unit = {
if (instance == null) {
synchronized {
if (instance == null) {
instance = sc.broadcast(getKafkaPartitionNums("topic_hash"))
}
}
}
}
def getInstance(): Broadcast[Int] = {
instance
}
def update(sc: SparkContext): Unit = {
val newPartitionNum = getKafkaPartitionNums("topic_hash")
if (instance != null && newPartitionNum != instance) {
instance.unpersist()
instance = sc.broadcast(newPartitionNum)
}
}
private[this] def getKafkaPartitionNums(topic: String): Int = {
val partitionNums = getProducer().partitionsFor(topic).size()
// println(s"${new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(System.currentTimeMillis())} upadte:newPartitionNum ${partitionNums}")
getProducer().close()
producer = null
partitionNums
}
private var producer: KafkaProducer[String, String] = null
private def getProducer(): KafkaProducer[String, String] = {
if (producer == null) {
val properties = new Properties()
properties.put("bootstrap.servers", "localhost:9092")
properties.put("key.serializer", classOf[StringSerializer])
properties.put("value.serializer", classOf[StringSerializer])
producer = new KafkaProducer[String, String](properties)
}
producer
}
def parFunc(key: String, numPartitions: Int): Int = {
val partition = key.hashCode().abs % numPartitions
partition
}
def parFunc1(key: String, numPartitions: Int): Int = {
val partition = parFunc(key, numPartitions)
println(s"key:${key} 指定分区:${partition} 总分区:${numPartitions}")
partition
}
}
封装的kafkaProducer类
import org.apache.kafka.clients.producer.{Callback, KafkaProducer, ProducerRecord, RecordMetadata}
import java.util.concurrent.Future
class KafkaSink[K,V](createProducer: () => KafkaProducer[K,V]) extends Serializable {
lazy val producer: KafkaProducer[K,V] = createProducer()
def send(topic: String, partition:Int,key:K, value: V): Future[RecordMetadata] =
{
producer.send(new ProducerRecord[K,V](topic,partition,key,value),new Callback {
override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit = {
println("发送成功, 累加器 1 & 入库 1")
}
})
}
def flush() = producer.flush()
}
object KafkaSink {
def apply[K, V](config:java.util.Map[java.lang.String, Object]): KafkaSink[K, V] = {
val createProducerFunc: () => KafkaProducer[K, V] = () => {
val producer = new KafkaProducer[K, V](config)
sys.addShutdownHook {
producer.close()
}
producer
}
new KafkaSink[K, V](createProducerFunc)
}
}
创建topic 5个分区
.\bin\windows\kafka-topics.bat --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 5 --topic topic_hash
启动程序
依次输入 a b c d e f g h i j k l
订阅0分区,j和k是0分区
![在这里插入图片描述](https://img-blog.csdnimg.cn/f582ca3cf690464b98c98770504e50f8.png
当前分区代表spark分区,指定分区代表Kafka的分区
consumer代码
import org.apache.kafka.clients.consumer.{ConsumerRecords, KafkaConsumer}
import org.apache.kafka.common.{ TopicPartition}
import org.apache.kafka.common.serialization.{StringDeserializer}
import java.util.{Properties}
object KafkaConsumer {
def main(args: Array[String]): Unit = {
val properties = new Properties()
properties.put("bootstrap.servers", "localhost:9092")
properties.put("key.deserializer", classOf[StringDeserializer])
properties.put("value.deserializer", classOf[StringDeserializer])
properties.put("auto.offset.reset", "earliest")
properties.put("enable.auto.commit", "true")
properties.put("group.id", "group0231")
val consumer = new KafkaConsumer[String, String](properties)
consumer.assign(java.util.Arrays.asList(new TopicPartition("topic_hash",0)))
while (true){
val records: ConsumerRecords[String, String] = consumer.poll(100)
import scala.collection.convert.wrapAll._
for (record <- records){
val topicName: String = record.topic()
val partition: Int = record.partition()
val offset: Long = record.offset()
val msg: String = record.value()
println(s"receive: ${msg}\t${topicName}\t${partition}\t${offset}")
}
}
}
}
通过改变kafka集群分区,sparkStreaming分区会感知到(使用通过更新广播变量)
2.同一个分区内用户消息只能存在一个
现在从程序一上看似乎是每一批次同一个用户只存在一个
但从程序二上看,因为接的批次不同,尽管同一用户不再分布在各个分区,但是同一分区会存在同一用户多条记录
这个看似可以在程序二上通过去重解决,但是忽略了业务场景,以下简略列出使用 简单的去重/不使用去重 出现的一些问题
这里假设在极端情况下 取一个用户在某个分区的情况
结果一 :代表使用使用如reduceByKey取一批时间戳最大的去重,线上的场景是根据用户自身的时间戳取数据,隔一段固定时间取粉色那条数据, 存在的问题是数据取少了
还存在一个问题是分组产生shuffle,落地大量文件,影响程序性能
(程序一的reduceByKey数据的间隔很小 ,不会出现过滤跨度超过业务需求)
结果二:代表不去重,很明显有大量数据冗余
这个可以通过用scala的集合类划分数据解决
Dstream_w.foreachRDD(rdd=>{
rdd.foreachPartition(partition=>{
val list:List[ReceiveDataObject] = partition.toList
//person_code List[ReceiveDataObject]
val person_code:Map[String,List[ReceiveDataObject]] = list.groupBy(_.person_code)
var timestamp= 0L
//这里基于拿到需要的数据
val iterable:Iterable[List[ReceiveDataObject]] = person_code.map(entry => {
entry._2.reduce((op1: ReceiveDataObject, op2: ReceiveDataObject) => {
if (timestamp == 0L) timestamp = op1.timestamp
if (op1.timestamp - op2.timestamp > new Properties().get("").toString.toLong) {
timestamp = op1.timestamp
op1.load = true
} else {
if (op2.timestamp - timestamp > new Properties().get("").toString.toLong) {
op2.load = true
}
}
})
entry._2.filter(_.load)
})
val iterable1:Iterable[List[ReceiveDataObject]] = person_code.map(entry => {
entry._2.filter(f=>{
if (timestamp == 0L) timestamp = {
f.timestamp
} else {
if (f.timestamp - timestamp > new Properties().get("").toString.toLong) {
return true
}
}
return false
})
})
})
})
这个根据业务逻辑可以简单划分后就没有问题了
这篇好文章是转载于:学新通技术网
- 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
- 本站站名: 学新通技术网
- 本文地址: /boutique/detail/tanhgchbbe
-
photoshop保存的图片太大微信发不了怎么办
PHP中文网 06-15 -
Android 11 保存文件到外部存储,并分享文件
Luke 10-12 -
word里面弄一个表格后上面的标题会跑到下面怎么办
PHP中文网 06-20 -
《学习通》视频自动暂停处理方法
HelloWorld317 07-05 -
photoshop扩展功能面板显示灰色怎么办
PHP中文网 06-14 -
微信公众号没有声音提示怎么办
PHP中文网 03-31 -
excel下划线不显示怎么办
PHP中文网 06-23 -
怎样阻止微信小程序自动打开
PHP中文网 06-13 -
excel打印预览压线压字怎么办
PHP中文网 06-22 -
TikTok加速器哪个好免费的TK加速器推荐
TK小达人 10-01