benym的知识笔记 benym的知识笔记
🦮首页
  • Java

    • Java-基础
    • Java-集合
    • Java-多线程与并发
    • Java-JVM
    • Java-IO
  • Python

    • Python-基础
    • Python-机器学习
  • Kafka
  • Redis
  • MySQL
  • 分布式事务
  • Spring

    • SpringIOC
    • SpringAOP
🦌设计模式
  • 剑指Offer
  • LeetCode
  • 排序算法
🐧实践
  • Rpamis

    • Utils
    • Exception
    • Security
  • 归档
  • 标签
  • 目录
🦉里程碑
🐷关于
GitHub (opens new window)

benym

惟其艰难,才更显勇毅🍂惟其笃行,才弥足珍贵
🦮首页
  • Java

    • Java-基础
    • Java-集合
    • Java-多线程与并发
    • Java-JVM
    • Java-IO
  • Python

    • Python-基础
    • Python-机器学习
  • Kafka
  • Redis
  • MySQL
  • 分布式事务
  • Spring

    • SpringIOC
    • SpringAOP
🦌设计模式
  • 剑指Offer
  • LeetCode
  • 排序算法
🐧实践
  • Rpamis

    • Utils
    • Exception
    • Security
  • 归档
  • 标签
  • 目录
🦉里程碑
🐷关于
GitHub (opens new window)
  • Java-基础

    • Java反射获取类对象的三种方式
    • 动态代理
  • Java-集合

    • ArrayList的扩容机制
  • Java-多线程与并发

    • Java多线程实现的几种方式
    • 多线程交替打印数字—多种实现
    • CountDownLatch使用方法
    • CyclicBarrier使用方法
    • Semaphore使用方法
    • CompletableFuture常用用法及踩坑
    • Java-JVM

      • 自定义类加载器
      • JMH-基准测试框架
    • Java-IO

      • 概览
    • Java
    • Java-多线程与并发
    benym
    2022-06-06
    目录

    CompletableFuture常用用法及踩坑

    # CompletableFuture常用用法及踩坑

    作为常用的并发类,CompletableFuture在项目中会经常使用,其作用与Google的ListenableFuture类似;

    总结来说CompletableFuture比Future多出了流式计算,返回值,异步回调,多Future组合的功能。

    # 适用场景

    • 某个接口不好修改,又没有提供批量的方法时
    • 需要异步调用接口时
    • CPU密集型任务,计算场景多,或多个不关联的接口需要同时调用时

    # 场景一

    问题:系统中存量老接口,逻辑复杂,改造成本大。

    解决方案:利用CompletableFuture提交多个任务分别执行逻辑,join等待所有任务执行完毕

    // 模拟功能:根据某个id列表,查询得到与id相关的数据,其中查询得到与id相关数据的过程非常复杂且耗时
    // executor为全局线程池
    
    List<MockDTO> results = Collections.synchronizedList(new ArrayList<>());
    
    List<String> mockIds = new ArrayList<>();
    List<CompletableFuture<Void>> futures = new ArrayList<>();
    mockIds.forEach(mockId -> {
        CompletableFuture<Void> res = CompletableFuture.supplyAsync(() -> {
            // 根据mockId组装查询实体
          	MockDTO mockdto = new MockDTO();
            mockdto.setId(mockId);
            
            // 调用存量老接口
            List<MockDTO> result = mockService
                    .getDataByMockIds(mockdto);
            return result;
        },executor).thenAccept(results::addAll).exceptionally(ex -> {
            throw new RuntimeException(ex.getMessage() + "异步数据获取执行异常");
        });
        futures.add(res);
    });
    futures.forEach(CompletableFuture::join);
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    • 这一场景描述一个典型的问题,当存量接口不好更改,查询速度很慢时,我们可以通过简单的CompletableFuture任务来并行执行。
    • 由于返回值是List的原因,需要注意并发add问题,可采用一个synchronizedList来解决。
    • 对于每一个任务返回之后执行thenAccept将返回数据加入到results中。
    • 同时,主线程需要等待异步线程全部执行完毕才返回结果,即join操作。
    # 如果不join会发生什么?

    主线程会很快就执行完毕,异步线程还没有执行完,主线程就返回了结果,这个结果必然不是我们预期的

    # 场景二

    问题:异步调用接口,比如消息发送接口,不能够阻塞主流程,但又需要获取返回值/知道本次调用是否成功

    解决方案:CompletableFuture异步调用+handle同时处理结果和异常

    handle与whenComplete均可

    以一个更容易踩坑的异步调用第三方接口为例

    log.info("url={}, messageDTO={}", serverUrl, messageDTO);
    String jsonParams = JacksonUtils.toJson(messageDTO);
    HttpHeaders headers = new HttpHeaders();
    headers.set("Content-Type", "application/json");
    HttpEntity<String> entity = new HttpEntity<>(jsonParams, headers);
    AtomicReference<JSONObject> responseJsonObject = new AtomicReference<>();
    log.info("==============Entity=========={}",entity);
    try {
        CompletableFuture future = CompletableFuture.supplyAsync(() -> {
            // 一定要设置超时时间
            ResponseEntity<JSONObject> exchange = restTemplate.exchange(serverUrl, HttpMethod.POST, entity,
                    JSONObject.class);
            return exchange;
        },asyncTaskExecutor)
                .handle((exchange, ex) -> {
                    if (ex == null) {
                        if (exchange.hasBody()) {
                            responseJsonObject.set(exchange.getBody());
                            log.info("=========Message body========={}", responseJsonObject);
                            //当返回对象为空或者不为规定的code时,接口失败
                            if (responseJsonObject.get() == null || !responseJsonObject.get().getString("code").equals("200")) {
                                String errCode = responseJsonObject.get().getString("code");
                                String errMsg = responseJsonObject.get().getString("message");
                                log.warn("消息接口应答码不成功,错误代码:{},返回消息{}",errCode,errMsg);
                            } else {
                                log.info("消息接口返回成功,返回消息{}",exchange.getBody());
                            }
                        } else {
                            log.warn("消息接口返回body不存在");
                        }
                    } else {
                        log.warn("=========消息发送失败,第三方提供接口异常========={}",ex.getMessage());
                    }
                    return null;
                });
    } catch (Exception e) {
        log.warn("异步线程内部异常",e);
    }
    log.info("发送消息主线程执行完毕");
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39

    以上代码逻辑很简单,处理原则就是有异常处理异常,没有异常就正常解析返回值。同时打印足量的日志方便排查。

    # 踩坑场景

    对于调用非主流程接口,如发送消息等,其调用原则不应该阻塞主流程,同时出现错误可不用抛出异常,以免发生主流程正常执行,但发送消息失败,消息模块抛出异常造成主流程回滚。本文不讨论消息如何可靠,只考虑作为生产者,在不引入中间件的情况下,如何简单快速的对接第三方消息接口。

    处理原则:

    1. 对于一般的RPC,如Fegin、Dubbo等。或者外部提供的接口/或需要走RestTemplate的接口。

      设置RPC或者全局RestTemplate的超时时间

      如果不设置超时时间,运行上述代码时会发现,明明主线程执行完毕,异步线程没有直接报错,但异步线程的结果迟迟没有返回(假设调用的接口网络不通,且没有回TCP包,没有快速失败),也没有打印日志。以RestTemplate为例,其默认的超时时间为30s,也就是说其实不是不会打印日志,只是30秒之后才觉得调用的接口网络不通。很久才打印日志,会让我们排查问题时变得疑惑

    2. 对于直接调用的Service服务:即时返回结果,可不做超时设置

    提示

    注意点: CompletableFuture在本地测试的时候会发现,主线程执行完毕了,异步线程一直没有返回,这是因为如果使用java的主线程方法测试,那么运行结束后,程序就退出了,异步线程自然也就没有了。对于Web项目,调用该方法时,只是主线程结束,但程序没有退出,异步线程依旧可以运行

    # 场景三

    问题:多个不相关的任务,并行计算

    解决方案:多个CompletableFuture异步计算,使用allOf+join

    List<CompletableFuture> futures = new ArrayList<>(3);
    List<Double> result = new ArrayList<>();
    // 创建异步执行任务:
    CompletableFuture<List<Double>> cf = CompletableFuture.supplyAsync(() -> {
        System.out.println(
                Thread.currentThread() + " start job1,time->" + System.currentTimeMillis());
        // 执行业务逻辑
        result.add(1.2);
        System.out.println(
                Thread.currentThread() + " exit job1,time->" + System.currentTimeMillis());
        return result;
    });
    CompletableFuture<List<Double>> cf2 = CompletableFuture.supplyAsync(() -> {
        System.out.println(
                Thread.currentThread() + " start job2,time->" + System.currentTimeMillis());
        // 执行业务逻辑
        result.add(3.2);
        System.out.println(
                Thread.currentThread() + " exit job2,time->" + System.currentTimeMillis());
        return result;
    });
    CompletableFuture<List<Double>> cf3 = CompletableFuture.supplyAsync(() -> {
        System.out.println(
                Thread.currentThread() + " start job3,time->" + System.currentTimeMillis());
        // 执行业务逻辑
        result.add(2.2);
        System.out.println(
                Thread.currentThread() + " exit job3,time->" + System.currentTimeMillis());
        return result;
    });
    futures.add(cf);
    futures.add(cf2);
    futures.add(cf3);
    //allof等待所有任务执行完成才执行cf4,如果有一个任务异常终止,则cf4.get时会抛出异常,都是正常执行,cf4.get返回null
    //anyOf是只有一个任务执行完成,无论是正常执行或者执行异常,都会执行cf4,cf4.get的结果就是已执行完成的任务的执行结果
    CompletableFuture<Void> cf4 = CompletableFuture
            .allOf(futures.toArray(new CompletableFuture[0]));
    
    System.out.println("main thread start cf4.get(),time->" + System.currentTimeMillis());
    //等待子任务执行完成
    cf4.join();
    System.out.println("子任务状态" + cf.isDone() + " " + cf2.isDone() + " " + cf3.isDone());
    System.out.println("计算结果" + result);
    System.out.println("main thread exit,time->" + System.currentTimeMillis());
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    # 运行结果
    Thread[ForkJoinPool.commonPool-worker-9,5,main] start job1,time->1654514454630
    Thread[ForkJoinPool.commonPool-worker-9,5,main] exit job1,time->1654514454630
    Thread[ForkJoinPool.commonPool-worker-9,5,main] start job2,time->1654514454631
    Thread[ForkJoinPool.commonPool-worker-9,5,main] exit job2,time->1654514454631
    Thread[ForkJoinPool.commonPool-worker-9,5,main] start job3,time->1654514454631
    Thread[ForkJoinPool.commonPool-worker-9,5,main] exit job3,time->1654514454631
    main thread start cf4.get(),time->1654514454631
    子任务状态true true true
    计算结果[1.2, 3.2, 2.2]
    main thread exit,time->1654514454633
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10

    异步线程依次执行,同时主线程等待所有子任务执行完毕,等到子任务执行完之后汇总结果,最后主线程退出。

    编辑 (opens new window)
    #Java#JUC#CompletableFuture
    上次更新: 2023/05/13, 18:05:21
    Semaphore使用方法
    自定义类加载器

    ← Semaphore使用方法 自定义类加载器→

    最近更新
    01
    SpringCache基本配置类
    05-16
    02
    DSTransactional与Transactional事务混用死锁场景分析
    03-04
    03
    Rpamis-security-原理解析
    12-13
    更多文章>
    Theme by Vdoing | Copyright © 2018-2024 benym | MIT License
     |   |   | 
    渝ICP备18012574号 | 渝公网安备50010902502537号
    • 跟随系统
    • 浅色模式
    • 深色模式
    • 阅读模式