利用spring WebFlux構建LLM網關的重試機制
在構建LLM網關時,需要處理服務間的通信,并確保當某個服務不可用時,能夠無縫切換到備用服務。本文將探討如何使用Spring WebFlux實現這一目標,尤其是在網關到Server B通信失敗時,如何重試并連接到Server C。
場景描述
我們的LLM網關調用鏈路為:客戶端 -> 網關 -> Server B。如果網關到Server B的連接失敗,我們希望網關能夠重試并連接到Server C。這需要網關能夠捕獲到Server B的錯誤響應碼,并在失敗時自動切換到Server C。
代碼分析及改進方案
我們先來看原始的ssehttp方法,它處理網關到Server B或Server C的請求:
Flux<Response> responseFlux = webClient.create(url) .post() .headers(httpHeaders -> setHeaders(httpHeaders, headers)) .contentType(MediaType.APPLICATION_JSON) .bodyValue(jsonBody) .retrieve() .onStatus(status -> status != HttpStatus.OK, response -> { // 錯誤處理邏輯 }) // ...其他邏輯...
為了實現重試策略,我們需要捕獲Server B的錯誤響應碼,并在發生錯誤時切換到Server C。之前的嘗試存在一些問題:簡單的try-catch無法捕獲Flux內部的錯誤;subscribe方法是非阻塞的,導致錯誤處理邏輯無法及時生效。
最佳實踐:利用retryWhen和onErrorResume
為了解決上述問題,我們應該利用Spring WebFlux提供的retryWhen和onErrorResume操作符。
首先,修改sseHttp方法,加入重試邏輯:
Flux<Response> sseHttp(String url) { return webClient.create(url) .post() .headers(httpHeaders -> setHeaders(httpHeaders, headers)) .contentType(MediaType.APPLICATION_JSON) .bodyValue(jsonBody) .retrieve() .onStatus(HttpStatus::isError, clientResponse -> { // 記錄錯誤日志,方便調試 return Mono.error(new WebClientResponseException("Server returned error status: " + clientResponse.rawStatusCode(), clientResponse.rawStatusCode(), clientResponse.headers().asHttpHeaders(), clientResponse.bodyToMono(String.class).block(), null)); }) .bodyToFlux(typeRef) .retryWhen(Retry.backoff(3, Duration.ofSeconds(1)) .Filter(throwable -> throwable instanceof WebClientResponseException) .onRetryExhaustedThrow((spec, signal) -> new gatewayException("Failed to connect to both Server B and Server C after multiple retries."))); }
這段代碼使用onStatus處理HTTP錯誤狀態碼,并使用retryWhen進行重試,最多重試3次,每次間隔1秒。 filter確保只重試WebClientResponseException類型的異常。如果重試次數耗盡,則拋出GatewayException。
然后,在調用sseHttp的地方,使用onErrorResume處理Server B的失敗,并切換到Server C:
Mono<Response> responseMono = sseHttp(serverBUrl) .onErrorResume(WebClientResponseException.class, ex -> { log.warn("Failed to connect to Server B: {}", ex.getMessage()); // 記錄錯誤日志 return sseHttp(serverCUrl); }) .next();
這段代碼先嘗試連接Server B,如果發生WebClientResponseException,則嘗試連接Server C。 next()方法確保只返回一個結果。
處理多個成功響應
如果Server B和Server C都成功返回數據,我們需要確保只處理一個響應。 可以使用一個AtomicBoolean變量來跟蹤是否已經成功處理過響應:
AtomicBoolean success = new AtomicBoolean(false); Flux<Response> sseHttp(String url) { // ... (previous code) ... .doOnNext(response -> { if (success.compareAndSet(false, true)) { // 處理成功的響應 } }) // ... (rest of the code) ... }
通過以上改進,我們實現了更健壯的重試機制,能夠有效處理服務間的通信故障,并確保LLM網關的高可用性。 記住添加充分的日志記錄,方便排查問題。