Scala Future异步编程
Scala 的Future类主要有两种使用方式,还可以两种同时使用:
首先要清楚的是Await.result(future,Duration.Inf)或者Await.result(Future.sequence(futures), Duration.Inf)是等待future完成并获取所有的future最终的返回值,
一.非阻塞方式(Non-Blocking),也称为回调方式(Callback):父actor或主程序在执行期间启动future,future任务和父actor并行执行,当每个future完成任务,将通知父actor。通过onComplete、onSuccess、onFailure方式使用来获取结果.
1.不设置超时,非阻塞的方式
对于单独的future,直接用回调函数获取结果,优点是可以不阻塞主线程继续执行代码,效率跟高,缺点是是不一定保证能获取到最终future结果,如果一定要获取最终future结果,外层需要通过阻塞的方式Await.result(future,Duration.Inf),单个future一直等待
或者Await.result(Future.sequence(futures), Duration.Inf)多个future一直等待的方式来保证得到最终结果
val future = Future {
println(s"future逻辑")
true
}
future onComplete {
case Success(value) =>
println(s"异步返回值:${value}")
case Failure(ex: Exception) =>
println(s"异步错误码:${getExceptionDetail(ex)}")
ex.printStackTrace()
}
2.设置超时,非阻塞的方式:
我们可以使用纯scala模式,来模仿实现after的功能
为了更容易使用future的超时设置,我们可以使用隐式类来扩展scala future从而支持超时:
使用时,为了确保在执行前,计时还没有开始,将future设置lazy val。
import scala.concurrent._
import scala.concurrent.duration.FiniteDuration
import ExecutionContext.Implicits.global
import akka.actor.ActorSystem
import akka.pattern.after
implicit class FutureExtensions[T](f: Future[T]) {
def withTimeout(timeout: => Throwable)(implicit duration: FiniteDuration, system: ActorSystem): Future[T] = {
Future firstCompletedOf Seq(f, after(duration, system.scheduler)(Future.failed(timeout)))
}
}
现在,我们可以随时很方便的给future设置超时了:
import scala.concurrent._
import scala.concurrent.duration._
import scala.util.{ Success, Failure }
import ExecutionContext.Implicits.global
import akka.actor.ActorSystem
implicit val system = ActorSystem("theSystem")
implicit val timeout = 1 second
lazy val f = future { Thread.sleep(2000); true }
f withTimeout new TimeoutException("Future timed out!") onComplete {
case Success(x) => println(x)
case Failure(error) => println(error)
}
这种方式也是不一定保证能获取到最终future结果,只是作为是否超时的一个判断而已,如果超出指定超时时间,会抛出异常,可以通过try catch或者case Failure捕获,或者报错处理。
如果一定要获取最终future结果,外层需要通过阻塞的方式Await.result(future,Duration.Inf),单个future一直等待
或者Await.result(Future.sequence(futures), Duration.Inf)多个future一直等待的方式来保证得到最终结果
回调优化:
def onComplete[U](f: (Try[T]) ⇒ U)(implicit executor: ExecutionContext): Unit
从上面回调函数的API看出,它们传入了一个隐式参数implicit executor: ExecutionContext,一般只需要引入对应的类就行了,不需要再手动传入
但是在使用过程中发现,默认的ExecutionContext所能开辟的Future线程是受到机器CPU核数影响,那么超过核数的线程将会排队等待前面的线程完成才能开始,所以需要调大线程数。ExecutionContext类中提供了一个自定义的方法
object Implicits {
val threadPool = Executors.newFixedThreadPool(1000)
implicit lazy val ec = ExecutionContext.fromExecutor(threadPool)
}
ExecutionContext的线程数为什么受到CPU核数限制呢?从源码中可以看出,在构造的时候传入的执行器是个null
默认创建的是一个ForkJoinPool线程池,而它的线程数大小是通过Runtime.getRuntime.availableProcessors获取的,也就是CPU核数
所以用一个新的线程池创建一个执行器,再作为参数传入构造方法即可。
private val executor: ThreadPoolExecutor = new ThreadPoolExecutor(
200,
500,
60,
TimeUnit.SECONDS,
new ArrayBlockingQueue[Runnable](3000)
)
二.阻塞方式(Blocking):该方式下,主线程会阻塞住,父actor或主程序停止执行知道所有future完成各自任务。通过scala.concurrent.Await使用
1.设置超时为永久,阻塞的方式
一般情况下,不论是单个future,还是嵌套结构,里层单个future,外层是多个future的List[Future]集合,只要是最终需要等待里层所有的future完成,或者一定要获取结果,最外层都要通过Await.result(Future.sequence(futures), Duration.Inf)或者Await.result(Future.sequence(futures), Duration.Inf)一直等待的方式保证得到最终结果
val future= Future {
println(s"future中执行的逻辑代码")
}
Await.result(future, Duration.Inf)
2.设置超时为指定时间,阻塞的方式
这种适合代码中只有单个查询,指定查询超时时间,如果超时就捕获,不需要进行回调函数了
val future= Future {
println(s"future中执行的逻辑代码")
}
Await.result(future, 1 second)
这种情况下超过指定时间,会报如下错误:
java.util.concurrent.TimeoutException:
at scala.concurrent.impl.Promise $ DefaultPromise.ready(Promise.scala:219)
at scala.concurrent.impl.Promise $ DefaultPromise.result(Promise.scala:223)
at scala.concurrent.Await $$ anonfun $ result $ 1.apply(package.scala:107)
at scala.concurrent.BlockContext $ DefaultBlockContext $ .blockOn(BlockContext.scala:53)
三. 一种的2种方式和二中的两种方式很多情况下是同时使用的,根据实际场景选择
例子1:就是我想进行超时的判定,超时的打印日志,但是最终就算超时了,我也要获取最终的结果并返回
可以通过第一种方案中设置超时,非阻塞的方式判定超时,但是下面再次设置超时为永久,阻塞的方式来等待最终结果
例子2:内层有5个future,外层 Await.result(Future.sequence(ckFlush), Duration.Inf)可以获取所有内层的最终结果,但是我想获取内层每个future子任务的执行结束时单独输出日志,便于错误排查。就可以在内层加入回调方式来打印日志,最外层的Await.result(Future.sequence(ckFlush), Duration.Inf)是用来等待内层所有任务执行结束,其实是为了获取所有的内层结果和日志,你内层跑了5个任务,你最终肯定是要知道每个任务是否执行成功失败的信息。
val ckFlush = new mutable.ListBuffer[Future[Boolean]]()
for (ck <- ckArr) {
val future = Future {
println(s"future逻辑")
true
}
future onComplete {
case Success(value) =>
println(s"异步返回值:${value}")
case Failure(ex: Exception) =>
println(s"异步错误码:${getExceptionDetail(ex)}")
ex.printStackTrace()
}
ckFlush.append(rs)
}
Await.result(Future.sequence(ckFlush), Duration.Inf)
如果回调函数中涉及变量,回调函数虽然是异步执行的的,但是代码是提前封装好的,如果涉及到变量,变量是封装是的值,所以回调返回结果时涉及变量的值是封装时的值,而不是当前的值。下面代码中的st1.getSqlTxt指的是是封装时的sql。
for(st1 <- configTuple){
val future = processTask(spark, st1, numPartitions, successAcc)
future onComplete{
case Success(value) => {
logger.info(s"异步返回值:${value}")
}
case Failure(ex:Exception) => {
logger.error(s"异步错误码:${getExceptionDetail(ex)}")
DingDingAlterUtils.sliceSendDingDing(s"${taskName} SQL跑批失败:${st1.getSqlTxt} 异常错误码:${getExceptionDetail(ex)} ")
ex.printStackTrace()
}
}
sqlTask.append(future)
if(sqlTask.size%sqlParallelNums==0){
Await.result(Future.sequence(sqlTask), Duration.Inf)
sqlTask.clear()
}
}
这篇好文章是转载于:学新通技术网
- 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
- 本站站名: 学新通技术网
- 本文地址: /boutique/detail/tanhhicecc
-
photoshop保存的图片太大微信发不了怎么办
PHP中文网 06-15 -
word里面弄一个表格后上面的标题会跑到下面怎么办
PHP中文网 06-20 -
《学习通》视频自动暂停处理方法
HelloWorld317 07-05 -
photoshop扩展功能面板显示灰色怎么办
PHP中文网 06-14 -
Android 11 保存文件到外部存储,并分享文件
Luke 10-12 -
TikTok加速器哪个好免费的TK加速器推荐
TK小达人 10-01 -
微信公众号没有声音提示怎么办
PHP中文网 03-31 -
excel下划线不显示怎么办
PHP中文网 06-23 -
excel打印预览压线压字怎么办
PHP中文网 06-22 -
微信运动停用后别人还能看到步数吗
PHP中文网 07-22