Redis Stream監(jiān)聽(tīng)器斷連后失效:如何避免監(jiān)聽(tīng)器因網(wǎng)絡(luò)問(wèn)題停止工作?

redis Stream監(jiān)聽(tīng)器斷連后自動(dòng)恢復(fù):保障持續(xù)消息監(jiān)聽(tīng)

使用spring data redis監(jiān)聽(tīng)redis stream消息時(shí),存在一個(gè)常見(jiàn)問(wèn)題:監(jiān)聽(tīng)器在長(zhǎng)時(shí)間運(yùn)行后(例如數(shù)周)停止接收消息,這通常是由于網(wǎng)絡(luò)或連接問(wèn)題導(dǎo)致程序與redis服務(wù)器斷開連接。本文分析此問(wèn)題,并提供一種解決方案,確保監(jiān)聽(tīng)器在連接中斷后自動(dòng)恢復(fù)。

問(wèn)題描述:

應(yīng)用使用StreamMessageListenerContainer監(jiān)聽(tīng)Redis Stream消息,但會(huì)間歇性地失效,無(wú)法接收新消息。懷疑是網(wǎng)絡(luò)連接或連接數(shù)問(wèn)題導(dǎo)致連接中斷。

解決方案:

問(wèn)題在于StreamMessageListenerContainer在默認(rèn)情況下,遇到錯(cuò)誤會(huì)停止監(jiān)聽(tīng)。我們需要修改配置,使其在連接中斷后能夠自動(dòng)重連并繼續(xù)監(jiān)聽(tīng)。

核心改進(jìn):

通過(guò)StreamReadRequest.builder().cancelOnError()方法自定義錯(cuò)誤處理邏輯。將cancelOnError設(shè)置為返回false,即可在發(fā)生異常時(shí)阻止監(jiān)聽(tīng)器停止工作,并嘗試重新連接。

改進(jìn)后的代碼片段:

@Bean public List<Subscription> subscription(RedisConnectionFactory factory) {     List<Subscription> resultList = new ArrayList<>();     for (String redisStreamName : redisStreamNames) {         initStream(redisStreamName, groups[0]);         StreamReadRequest options = StreamReadRequest.builder(StreamOffset.create(redisStreamName, ReadOffset.lastConsumed()))                 .cancelOnError(throwable -> {                     System.err.println("Redis連接錯(cuò)誤: " + throwable.getMessage()); // 記錄錯(cuò)誤信息到日志                     return false; // 不取消監(jiān)聽(tīng),嘗試重新連接                 })                 .consumer(Consumer.from(groups[0], this.getClass().getName()))                 .autoAck(true) // 自動(dòng)確認(rèn)消息                 .build();          StreamMessageListenerContainer<String, Map<String, String>> listenerContainer =              StreamMessageListenerContainer.create(factory, options); // 使用泛型指定消息類型         Subscription subscription = listenerContainer.receive(options, streamListener);         resultList.add(subscription);         listenerContainer.start();     }     return resultList; }

改進(jìn)說(shuō)明:

  • 使用StreamReadRequest.builder()構(gòu)建StreamReadRequest對(duì)象,并設(shè)置cancelOnError函數(shù),在發(fā)生異常時(shí)返回false,避免監(jiān)聽(tīng)器停止。
  • 添加了錯(cuò)誤日志記錄,方便排查問(wèn)題。
  • 使用了泛型StreamMessageListenerContainer>,假設(shè)消息鍵為String,消息體為Map,請(qǐng)根據(jù)實(shí)際情況修改。 這更清晰地定義了消息類型。
  • receiveAutoAck 方法已被替換為 receive 方法,并顯式設(shè)置 autoAck 為 true,確保消息自動(dòng)確認(rèn)。

額外建議:

雖然此方法可防止監(jiān)聽(tīng)器停止,但建議結(jié)合其他監(jiān)控手段,例如監(jiān)控Redis連接狀態(tài),以便及時(shí)發(fā)現(xiàn)并處理更深層次的網(wǎng)絡(luò)或Redis服務(wù)問(wèn)題。 定期檢查日志中的錯(cuò)誤信息,有助于診斷潛在問(wèn)題。

Redis Stream監(jiān)聽(tīng)器斷連后失效:如何避免監(jiān)聽(tīng)器因網(wǎng)絡(luò)問(wèn)題停止工作?

? 版權(quán)聲明
THE END
喜歡就支持一下吧
點(diǎn)贊12 分享