在構建LLM gateway時,如何使用Spring WebFlux實現從serverB到serverC的重試策略?

在構建LLM gateway時,如何使用Spring WebFlux實現從serverB到serverC的重試策略?

利用spring WebFlux構建LLM網關的重試機制

在構建LLM網關時,需要處理服務間的通信,并確保當某個服務不可用時,能夠無縫切換到備用服務。本文將探討如何使用Spring WebFlux實現這一目標,尤其是在網關到Server B通信失敗時,如何重試并連接到Server C。

場景描述

我們的LLM網關調用鏈路為:客戶端 -> 網關 -> Server B。如果網關到Server B的連接失敗,我們希望網關能夠重試并連接到Server C。這需要網關能夠捕獲到Server B的錯誤響應碼,并在失敗時自動切換到Server C。

代碼分析及改進方案

我們先來看原始的ssehttp方法,它處理網關到Server B或Server C的請求:

Flux<Response> responseFlux = webClient.create(url)                 .post()                 .headers(httpHeaders -> setHeaders(httpHeaders, headers))                 .contentType(MediaType.APPLICATION_JSON)                 .bodyValue(jsonBody)                 .retrieve()                 .onStatus(status -> status != HttpStatus.OK, response -> {                     // 錯誤處理邏輯                 })                 // ...其他邏輯...

為了實現重試策略,我們需要捕獲Server B的錯誤響應碼,并在發生錯誤時切換到Server C。之前的嘗試存在一些問題:簡單的try-catch無法捕獲Flux內部的錯誤;subscribe方法是非阻塞的,導致錯誤處理邏輯無法及時生效。

最佳實踐:利用retryWhen和onErrorResume

為了解決上述問題,我們應該利用Spring WebFlux提供的retryWhen和onErrorResume操作符。

首先,修改sseHttp方法,加入重試邏輯:

Flux<Response> sseHttp(String url) {     return webClient.create(url)             .post()             .headers(httpHeaders -> setHeaders(httpHeaders, headers))             .contentType(MediaType.APPLICATION_JSON)             .bodyValue(jsonBody)             .retrieve()             .onStatus(HttpStatus::isError, clientResponse -> {                 // 記錄錯誤日志,方便調試                 return Mono.error(new WebClientResponseException("Server returned error status: " + clientResponse.rawStatusCode(), clientResponse.rawStatusCode(), clientResponse.headers().asHttpHeaders(), clientResponse.bodyToMono(String.class).block(), null));             })             .bodyToFlux(typeRef)             .retryWhen(Retry.backoff(3, Duration.ofSeconds(1))                     .Filter(throwable -> throwable instanceof WebClientResponseException)                     .onRetryExhaustedThrow((spec, signal) -> new gatewayException("Failed to connect to both Server B and Server C after multiple retries."))); }

這段代碼使用onStatus處理HTTP錯誤狀態碼,并使用retryWhen進行重試,最多重試3次,每次間隔1秒。 filter確保只重試WebClientResponseException類型的異常。如果重試次數耗盡,則拋出GatewayException。

然后,在調用sseHttp的地方,使用onErrorResume處理Server B的失敗,并切換到Server C:

Mono<Response> responseMono = sseHttp(serverBUrl)         .onErrorResume(WebClientResponseException.class, ex -> {             log.warn("Failed to connect to Server B: {}", ex.getMessage()); // 記錄錯誤日志             return sseHttp(serverCUrl);         })         .next();

這段代碼先嘗試連接Server B,如果發生WebClientResponseException,則嘗試連接Server C。 next()方法確保只返回一個結果。

處理多個成功響應

如果Server B和Server C都成功返回數據,我們需要確保只處理一個響應。 可以使用一個AtomicBoolean變量來跟蹤是否已經成功處理過響應:

AtomicBoolean success = new AtomicBoolean(false);  Flux<Response> sseHttp(String url) {     // ... (previous code) ...     .doOnNext(response -> {         if (success.compareAndSet(false, true)) {             // 處理成功的響應         }     })     // ... (rest of the code) ... }

通過以上改進,我們實現了更健壯的重試機制,能夠有效處理服務間的通信故障,并確保LLM網關的高可用性。 記住添加充分的日志記錄,方便排查問題。

? 版權聲明
THE END
喜歡就支持一下吧
點贊13 分享