编程

如何在 CompletableFuture 中实现多个 REST 调用

134 2024-09-08 19:20:00

1. 介绍

在创建软件功能时,其中一项日常工作是从不同来源检索数据并将其聚合在响应中。在微服务中,这些源通常是外部 REST API。

本文中,我们将使用 Java 的 CompletableFuture 来从多个外部 REST API 并行检索数据。 

2. 为什么在 REST 调用中使用并行

想象一个场景,我们需要更新对象中的各个字段,每个字段值都来自外部 REST 调用。一种替代方法是顺序调用每个 API 来更新每个字段。

然而,等待一个 REST 调用完成以启动另一个调用会增加我们服务的响应时间。例如,如果我们调用两个 API,每个 API 需要 5 秒,那么总时间至少为 10 秒,因为第二次调用需要等待第一次调用完成。

相反,我们可以并行调用所有 API,这样总时间就是最慢的 REST 调用的时间。例如,其中一个调用需要 7 秒,另一个需要 5 秒。在这种情况下,我们将等待 7 秒,因为我们已经并行处理了所有内容,必须等待所有结果完成。

因此,并行是减少服务响应时间的绝佳选择,使其更具可扩展性并改善用户体验。

3. 使用用 CompletableFuture 实现并发

Java 中的 CompletableFuture 类是一个便捷的工具,用于组合和运行不同的并行任务以及处理单个任务错误。

在后续内容中,我们将用它来组合并运行 3 个 REST 调用,用于输入列表中的每个对象。

3.1. 创建 Demo 应用

首先定义用于更新的目标 POJO:

public class Purchase {
    String orderDescription;
    String paymentDescription;
    String buyerName;
    String orderId;
    String paymentId;
    String userId;

    // all-arg constructor, getters and setters
}

Purchase 类有三个需要更新的字段,每个字段通过 ID 调用不同的 REST 查询。

我们首先创建一个定义 RestTemplate Bean 及域名 URL 的类,用于 REST 调用:

@Component
public class PurchaseRestCallsAsyncExecutor {
    RestTemplate restTemplate;
    static final String BASE_URL = "https://internal-api.com";

    // all-arg constructor
}

然后,定义 /orders API 调用:

public String getOrderDescription(String orderId) {
    ResponseEntity<String> result = restTemplate.getForEntity(String.format("%s/orders/%s", BASE_URL, orderId),
        String.class);

    return result.getBody();
}

接下来,定义 /payments API 调用:

public String getPaymentDescription(String paymentId) {
    ResponseEntity<String> result = restTemplate.getForEntity(String.format("%s/payments/%s", BASE_URL, paymentId),
        String.class);

    return result.getBody();
}

最后定义 /users API 调用:

public String getUserName(String userId) {
    ResponseEntity<String> result = restTemplate.getForEntity(String.format("%s/users/%s", BASE_URL, userId),
        String.class);

    return result.getBody();
}

这三个方法都使用 getForEntity() 方法进行 REST 调用,并将结果包装在 ResponseEntity 对象中。

然后,我们调用 getBody() 从 REST 调用中获取响应体。

3.2. 使用 CompletableFuture 进行多个 REST 调用:

接下来,我们创建一个方法,来创建并运行一套 3 个 CompletableFuture

public void updatePurchase(Purchase purchase) {
    CompletableFuture.allOf(
      CompletableFuture.supplyAsync(() -> getOrderDescription(purchase.getOrderId()))
        .thenAccept(purchase::setOrderDescription),
      CompletableFuture.supplyAsync(() -> getPaymentDescription(purchase.getPaymentId()))
        .thenAccept(purchase::setPaymentDescription),
      CompletableFuture.supplyAsync(() -> getUserName(purchase.getUserId()))
        .thenAccept(purchase::setBuyerName)
    ).join();
}

我们使用 allOf() 方法来实现 CompletableFuture 的创建步骤。每个参数都是一个并行任务,它们是使用 REST 调用及其结果构建的另一个 CompletableFuture

为创建并行任务,首先使用 supplyAsync() 方法,用以从我们将要检索的数据中提供 Supplier。然后使用 thenAccept() 方法消费 supplyAsync() 提供的结果,并在 Purchase 类中设置对应的字段。

allOf() 的结尾,我们只是在那里的创建任务。没有采取任何行动。

最后,我们调用 join() 方法,以并行运行所有任务,并收集它们的结果。由于 join() 是一个线程阻塞操作,我们只在其结尾处调用它,而不是在每个任务步骤中调用。这是为了减少阻塞线程优化应用性能。

由于我们没有提供自定义的 ExecutorServicesupplyAsync() 方法,所有的任务都在同一个执行器中运行。默认情况下,Java 使用 ForkJoinPool.commonPool()

一般来说,向 supplyAsync() 指定一个自定义的 ExecutorService 是好的实际,这样我们通过线程池参数,我们拥有了更多的控制权。

3.3. 为列表(List)中的每个元素执行多个 Rest 调用

要在集合中应用 updatePurchase 方法,我们只需在 forEach() 循环中调用它:

public void updatePurchases(List<Purchase> purchases) {
    purchases.forEach(this::updatePurchase);
}

updatePurchases() 方法检索 Purchase 列表并将之前创建的 updatePurchase 方法应用到每一个元素中。

每个对 updatePurchases() 的调用运行三个我们在 CompletableFuture 中定义的并行任务。因此,每个 purchase 都有它自己的 CompletableFuture 对象,用来运行三个并行的 REST 调用。

4. 处理错误

在分布式系统中,服务不可用或网络故障很常见。这些故障可能发生在外部 REST API 中,而作为 API 的客户端,我们并不知道这些故障。例如,如果应用关闭,通过网络发送的请求永远不会完成。

4.1. 使用 handle() 优雅地处理错误

REST 调用执行期间可能会出现异常。例如,如果 API 服务关闭,或者如果我们输入了无效的参数,我们将得到错误。

因此,我们可以使用 handle() 方法单独处理每个 REST 调用异常:

public <U> CompletableFuture<U> handle(BiFunction<? super T, Throwable, ? extends U> fn)

方法参数是一个 BiFunction,其中包含上一个任务的结果和异常作为参数。

为了说明,让我们将 handle() 步骤添加到 CompletableFuture 的一个步骤中:

public void updatePurchaseHandlingExceptions(Purchase purchase) {
    CompletableFuture.allOf(
        CompletableFuture.supplyAsync(() -> getPaymentDescription(purchase.getPaymentId()))
          .thenAccept(purchase::setPaymentDescription)
          .handle((result, exception) -> {
              if (exception != null) {
                  // handle exception
                  return null;
              }
              return result;
          })
    ).join();
}

本例中,handle()thenAccept() 调用的 setPaymentDescription() 中获取一个 Void 类型。

然后,它将 thenAccept() 操作中抛出的任何错误存储在异常中。因此,我们使用它来检查错误,并在if语句中正确处理它。

最后,如果没有抛出异常,handle() 返回作为参数传递的值。否则,它将返回 null

4.2. 处理 REST 调用超时

使用 CompletableFuture 时,我们可以指定一个类似于我们在 REST 调用中定义的任务超时时间。因此,如果任务未在指定时间内完成,Java 将以 TimeoutException 结束任务执行。

为此,让我们修改 CompletableFuture 的一个任务来处理超时:

public void updatePurchaseHandlingExceptions(Purchase purchase) {
    CompletableFuture.allOf(
        CompletableFuture.supplyAsync(() -> getOrderDescription(purchase.getOrderId()))
          .thenAccept(purchase::setOrderDescription)
          .orTimeout(5, TimeUnit.SECONDS)
          .handle((result, exception) -> {
              if (exception instanceof TimeoutException) {
                  // handle exception
                  return null;
              }
              return result;
          })
    ).join();
}

我们在 CompletableFuture 构建器中添加了 orTimeout() 行,如果任务在 5 秒内未完成,则会停止执行。

我们还在 handle() 方法中添加了一个 if 语句,以单独处理 TimeoutException

CompletableFuture 添加超时时间以保证任务始终完成。这对于避免线程无限期挂起,等待可能永远无法完成的操作结果非常重要。因此,它减少了处于长时间运行状态的线程数量,并提高了应用的运行状况。

5. 结论

在使用分布式系统时,其中一个常见的任务是对不同的 API 进行 REST 调用,以构建适当的响应。

本文中,我们看到了如何使用 CompletableFuture 为集合中的每个对象构建一组并行 REST 调用任务。

我们还看到了如何使用 handle() 方法优雅地处理超时和一般异常。