• 首页 首页 icon
  • 工具库 工具库 icon
    • IP查询 IP查询 icon
  • 内容库 内容库 icon
    • 快讯库 快讯库 icon
    • 精品库 精品库 icon
    • 问答库 问答库 icon
  • 更多 更多 icon
    • 服务条款 服务条款 icon

Scala Future异步编程

武飞扬头像
大数据搬砖小菜鸟
帮助1

Scala 的Future类主要有两种使用方式,还可以两种同时使用:

首先要清楚的是Await.result(future,Duration.Inf)或者Await.result(Future.sequence(futures), Duration.Inf)是等待future完成并获取所有的future最终的返回值,

一.非阻塞方式(Non-Blocking),也称为回调方式(Callback):父actor或主程序在执行期间启动future,future任务和父actor并行执行,当每个future完成任务,将通知父actor。通过onComplete、onSuccess、onFailure方式使用来获取结果.

1.不设置超时,非阻塞的方式
对于单独的future,直接用回调函数获取结果,优点是可以不阻塞主线程继续执行代码,效率跟高,缺点是是不一定保证能获取到最终future结果,如果一定要获取最终future结果,外层需要通过阻塞的方式Await.result(future,Duration.Inf),单个future一直等待
或者Await.result(Future.sequence(futures), Duration.Inf)多个future一直等待的方式来保证得到最终结果

val future = Future {
            println(s"future逻辑")
            true
          }
          future onComplete {
            case Success(value) =>
              println(s"异步返回值:${value}")
            case Failure(ex: Exception) =>
              println(s"异步错误码:${getExceptionDetail(ex)}")
              ex.printStackTrace()
          }

2.设置超时,非阻塞的方式:

我们可以使用纯scala模式,来模仿实现after的功能
为了更容易使用future的超时设置,我们可以使用隐式类来扩展scala future从而支持超时:
使用时,为了确保在执行前,计时还没有开始,将future设置lazy val。

import scala.concurrent._
import scala.concurrent.duration.FiniteDuration
import ExecutionContext.Implicits.global
import akka.actor.ActorSystem
import akka.pattern.after

implicit class FutureExtensions[T](f: Future[T]) {
  def withTimeout(timeout: => Throwable)(implicit duration: FiniteDuration, system: ActorSystem): Future[T] = {
    Future firstCompletedOf Seq(f, after(duration, system.scheduler)(Future.failed(timeout)))
  }
}
现在,我们可以随时很方便的给future设置超时了:

import scala.concurrent._
import scala.concurrent.duration._
import scala.util.{ Success, Failure }
import ExecutionContext.Implicits.global
import akka.actor.ActorSystem

implicit val system = ActorSystem("theSystem")
implicit val timeout = 1 second

lazy val f = future { Thread.sleep(2000); true }

  f withTimeout new TimeoutException("Future timed out!") onComplete {
  case Success(x) => println(x)
  case Failure(error) => println(error)
}
学新通

这种方式也是不一定保证能获取到最终future结果,只是作为是否超时的一个判断而已,如果超出指定超时时间,会抛出异常,可以通过try catch或者case Failure捕获,或者报错处理。
如果一定要获取最终future结果,外层需要通过阻塞的方式Await.result(future,Duration.Inf),单个future一直等待
或者Await.result(Future.sequence(futures), Duration.Inf)多个future一直等待的方式来保证得到最终结果

回调优化:

def onComplete[U](f: (Try[T]) ⇒ U)(implicit executor: ExecutionContext): Unit 

从上面回调函数的API看出,它们传入了一个隐式参数implicit executor: ExecutionContext,一般只需要引入对应的类就行了,不需要再手动传入

但是在使用过程中发现,默认的ExecutionContext所能开辟的Future线程是受到机器CPU核数影响,那么超过核数的线程将会排队等待前面的线程完成才能开始,所以需要调大线程数。ExecutionContext类中提供了一个自定义的方法

object Implicits {
 val threadPool = Executors.newFixedThreadPool(1000)
 implicit lazy val ec = ExecutionContext.fromExecutor(threadPool)
}

ExecutionContext的线程数为什么受到CPU核数限制呢?从源码中可以看出,在构造的时候传入的执行器是个null
默认创建的是一个ForkJoinPool线程池,而它的线程数大小是通过Runtime.getRuntime.availableProcessors获取的,也就是CPU核数

所以用一个新的线程池创建一个执行器,再作为参数传入构造方法即可。

  private val executor: ThreadPoolExecutor = new ThreadPoolExecutor(
    200,
    500,
    60,
    TimeUnit.SECONDS,
    new ArrayBlockingQueue[Runnable](3000)
  )

二.阻塞方式(Blocking):该方式下,主线程会阻塞住,父actor或主程序停止执行知道所有future完成各自任务。通过scala.concurrent.Await使用

1.设置超时为永久,阻塞的方式

一般情况下,不论是单个future,还是嵌套结构,里层单个future,外层是多个future的List[Future]集合,只要是最终需要等待里层所有的future完成,或者一定要获取结果,最外层都要通过Await.result(Future.sequence(futures), Duration.Inf)或者Await.result(Future.sequence(futures), Duration.Inf)一直等待的方式保证得到最终结果

val future= Future {    
        println(s"future中执行的逻辑代码")
      }
Await.result(future, Duration.Inf)

2.设置超时为指定时间,阻塞的方式
这种适合代码中只有单个查询,指定查询超时时间,如果超时就捕获,不需要进行回调函数了

val future= Future {    
        println(s"future中执行的逻辑代码")
      }
      Await.result(future, 1 second)

这种情况下超过指定时间,会报如下错误:
java.util.concurrent.TimeoutException:
at scala.concurrent.impl.Promise $ DefaultPromise.ready(Promise.scala:219)
at scala.concurrent.impl.Promise $ DefaultPromise.result(Promise.scala:223)
at scala.concurrent.Await $$ anonfun $ result $ 1.apply(package.scala:107)
at scala.concurrent.BlockContext $ DefaultBlockContext $ .blockOn(BlockContext.scala:53)

三. 一种的2种方式和二中的两种方式很多情况下是同时使用的,根据实际场景选择
例子1:就是我想进行超时的判定,超时的打印日志,但是最终就算超时了,我也要获取最终的结果并返回
可以通过第一种方案中设置超时,非阻塞的方式判定超时,但是下面再次设置超时为永久,阻塞的方式来等待最终结果
例子2:内层有5个future,外层 Await.result(Future.sequence(ckFlush), Duration.Inf)可以获取所有内层的最终结果,但是我想获取内层每个future子任务的执行结束时单独输出日志,便于错误排查。就可以在内层加入回调方式来打印日志,最外层的Await.result(Future.sequence(ckFlush), Duration.Inf)是用来等待内层所有任务执行结束,其实是为了获取所有的内层结果和日志,你内层跑了5个任务,你最终肯定是要知道每个任务是否执行成功失败的信息。

        val ckFlush = new mutable.ListBuffer[Future[Boolean]]()
        for (ck <- ckArr) {
            val future = Future {
            println(s"future逻辑")
            true
          }
          future onComplete {
            case Success(value) =>
              println(s"异步返回值:${value}")
            case Failure(ex: Exception) =>
              println(s"异步错误码:${getExceptionDetail(ex)}")
              ex.printStackTrace()
          }
          ckFlush.append(rs)
        }
        Await.result(Future.sequence(ckFlush), Duration.Inf)
学新通

如果回调函数中涉及变量,回调函数虽然是异步执行的的,但是代码是提前封装好的,如果涉及到变量,变量是封装是的值,所以回调返回结果时涉及变量的值是封装时的值,而不是当前的值。下面代码中的st1.getSqlTxt指的是是封装时的sql。

      for(st1 <- configTuple){
        val future = processTask(spark, st1, numPartitions, successAcc)
        future onComplete{
          case Success(value) => {
            logger.info(s"异步返回值:${value}")
          }
          case Failure(ex:Exception) => {
            logger.error(s"异步错误码:${getExceptionDetail(ex)}")
            DingDingAlterUtils.sliceSendDingDing(s"${taskName} SQL跑批失败:${st1.getSqlTxt} 异常错误码:${getExceptionDetail(ex)} ")
            ex.printStackTrace()
          }
        }
        sqlTask.append(future)

        if(sqlTask.size%sqlParallelNums==0){
          Await.result(Future.sequence(sqlTask), Duration.Inf)
          sqlTask.clear()
        }
      }
学新通

这篇好文章是转载于:学新通技术网

  • 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
  • 本站站名: 学新通技术网
  • 本文地址: /boutique/detail/tanhhicecc
系列文章
更多 icon
同类精品
更多 icon
继续加载