Java并發(fā)編程:使用CompletableFuture高效有序處理批量接口請(qǐng)求
高并發(fā)訪問第三方接口能顯著提升數(shù)據(jù)處理效率,但如果不控制線程執(zhí)行順序,最終結(jié)果可能與原始數(shù)據(jù)順序不符,導(dǎo)致后續(xù)處理錯(cuò)誤。本文介紹如何利用Java的CompletableFuture在多線程環(huán)境下,確保接口請(qǐng)求及結(jié)果處理的有序性。
問題:直接使用CompletableFuture.runAsync進(jìn)行異步調(diào)用,由于線程執(zhí)行順序不可預(yù)測(cè),導(dǎo)致返回結(jié)果與原始數(shù)據(jù)列表順序不一致。CompletableFuture.allOf僅能保證所有任務(wù)完成,無(wú)法保證順序。
解決方案:使用CompletableFuture.supplyAsync,它能返回結(jié)果。將每個(gè)異步任務(wù)的返回結(jié)果存儲(chǔ)在CompletableFuture
改進(jìn)后的代碼示例:
public static void main(String[] args) { List<String> dataList = new ArrayList<>(); // 原始數(shù)據(jù)列表 // ... 初始化dataList ... ExecutorService executorService = new ThreadPoolExecutor( 10, // corePoolSize 20, // maximumPoolSize 60L, // keepAliveTime TimeUnit.SECONDS, new LinkedBlockingQueue<>(1024), // workQueue new ThreadPoolExecutor.CallerRunsPolicy() // handler ); List<CompletableFuture<String>> futures = new ArrayList<>(); for (String data : dataList) { futures.add(CompletableFuture.supplyAsync(() -> { logger.info("Processing data: {}", data); // 調(diào)用第三方接口,處理data // ... 接口調(diào)用及結(jié)果處理邏輯 ... return data + " - Processed Result"; // 返回處理結(jié)果 }, executorService)); } CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).thenRun(() -> { List<String> results = futures.stream() .map(CompletableFuture::join) .collect(Collectors.toList()); logger.info("All tasks completed. Results: {}", JSON.toJSONString(results)); // 后續(xù)處理結(jié)果 }).thenRun(() -> executorService.shutdown()); }
改進(jìn)后的代碼使用CompletableFuture.supplyAsync返回每個(gè)接口調(diào)用的結(jié)果,并存儲(chǔ)在futures列表中。CompletableFuture.allOf確保所有任務(wù)完成后再處理結(jié)果,stream().map(CompletableFuture::join).collect(Collectors.toList())保證結(jié)果按原始順序收集,從而解決了并發(fā)導(dǎo)致結(jié)果順序錯(cuò)亂的問題。 代碼還使用了自定義線程池,以便更好地控制資源使用。