如何利用CompletableFuture保證批量接口請(qǐng)求的順序并高效處理結(jié)果?

如何利用CompletableFuture保證批量接口請(qǐng)求的順序并高效處理結(jié)果?

Java并發(fā)編程:使用CompletableFuture高效有序處理批量接口請(qǐng)求

并發(fā)訪問第三方接口能顯著提升數(shù)據(jù)處理效率,但如果不控制線程執(zhí)行順序,最終結(jié)果可能與原始數(shù)據(jù)順序不符,導(dǎo)致后續(xù)處理錯(cuò)誤。本文介紹如何利用Java的CompletableFuture在多線程環(huán)境下,確保接口請(qǐng)求及結(jié)果處理的有序性。

問題:直接使用CompletableFuture.runAsync進(jìn)行異步調(diào)用,由于線程執(zhí)行順序不可預(yù)測(cè),導(dǎo)致返回結(jié)果與原始數(shù)據(jù)列表順序不一致。CompletableFuture.allOf僅能保證所有任務(wù)完成,無(wú)法保證順序。

解決方案:使用CompletableFuture.supplyAsync,它能返回結(jié)果。將每個(gè)異步任務(wù)的返回結(jié)果存儲(chǔ)在CompletableFuture列表中,所有任務(wù)完成后,通過stream().map(CompletableFuture::join).collect(Collectors.toList())按原始順序收集結(jié)果。

改進(jìn)后的代碼示例:

public static void main(String[] args) {     List<String> dataList = new ArrayList<>(); // 原始數(shù)據(jù)列表     // ... 初始化dataList ...      ExecutorService executorService = new ThreadPoolExecutor(             10, // corePoolSize             20, // maximumPoolSize             60L, // keepAliveTime             TimeUnit.SECONDS,             new LinkedBlockingQueue<>(1024), // workQueue             new ThreadPoolExecutor.CallerRunsPolicy() // handler     );      List<CompletableFuture<String>> futures = new ArrayList<>();     for (String data : dataList) {         futures.add(CompletableFuture.supplyAsync(() -> {             logger.info("Processing data: {}", data);             // 調(diào)用第三方接口,處理data             // ...  接口調(diào)用及結(jié)果處理邏輯 ...             return data + " - Processed Result"; // 返回處理結(jié)果         }, executorService));     }      CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).thenRun(() -> {         List<String> results = futures.stream()                 .map(CompletableFuture::join)                 .collect(Collectors.toList());         logger.info("All tasks completed. Results: {}", JSON.toJSONString(results));         // 后續(xù)處理結(jié)果     }).thenRun(() -> executorService.shutdown()); }

改進(jìn)后的代碼使用CompletableFuture.supplyAsync返回每個(gè)接口調(diào)用的結(jié)果,并存儲(chǔ)在futures列表中。CompletableFuture.allOf確保所有任務(wù)完成后再處理結(jié)果,stream().map(CompletableFuture::join).collect(Collectors.toList())保證結(jié)果按原始順序收集,從而解決了并發(fā)導(dǎo)致結(jié)果順序錯(cuò)亂的問題。 代碼還使用了自定義線程池,以便更好地控制資源使用。

? 版權(quán)聲明
THE END
喜歡就支持一下吧
點(diǎn)贊13 分享