周日的時候,突然消費者組就開始堆積了,然後臨時解決方案想着重新起一個消費者組看看能不能恢復,然而並不能,無奈之下只能遷回使用 redis 作為消息中間件的方式,周一再細查一下
1 遇到的報錯以及現象#
首先是看的是 Dashboard,發現一會消費者組裡邊有一個消費者,一會沒有
其次就該去看消費者組的業務日誌打印了,雖然再消費,但是很慢
接下來就是看了消費組的 RocketMQ 日誌打印,發現開始報錯刷屏
2023-01-08 12:26:45,649 WARN RocketmqClient - execute the pull request exception
org.apache.rocketmq.client.exception.MQBrokerException: CODE: 24 DESC: the consumer's group info not exist
See http://rocketmq.apache.org/docs/faq/ for further details. BROKER: 192.168.1.58:10911
at org.apache.rocketmq.client.impl.MQClientAPIImpl.processPullResponse(MQClientAPIImpl.java:803)
at org.apache.rocketmq.client.impl.MQClientAPIImpl.access$200(MQClientAPIImpl.java:175)
at org.apache.rocketmq.client.impl.MQClientAPIImpl$2.operationComplete(MQClientAPIImpl.java:754)
at org.apache.rocketmq.remoting.netty.ResponseFuture.executeInvokeCallback(ResponseFuture.java:54)
at org.apache.rocketmq.remoting.netty.NettyRemotingAbstract$2.run(NettyRemotingAbstract.java:321)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:833)
2023-01-08 12:26:45,649 WARN RocketmqClient - execute the pull request exception
org.apache.rocketmq.client.exception.MQBrokerException: CODE: 24 DESC: the consumer's group info not exist
See http://rocketmq.apache.org/docs/faq/ for further details. BROKER: 192.168.1.58:10911
at org.apache.rocketmq.client.impl.MQClientAPIImpl.processPullResponse(MQClientAPIImpl.java:803)
at org.apache.rocketmq.client.impl.MQClientAPIImpl.access$200(MQClientAPIImpl.java:175)
at org.apache.rocketmq.client.impl.MQClientAPIImpl$2.operationComplete(MQClientAPIImpl.java:754)
at org.apache.rocketmq.remoting.netty.ResponseFuture.executeInvokeCallback(ResponseFuture.java:54)
at org.apache.rocketmq.remoting.netty.NettyRemotingAbstract$2.run(NettyRemotingAbstract.java:321)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:833)
這個報錯卡卡刷屏,於是乎只能去找 RocketMQ 的消費者 client 的源碼了
2 源碼的追蹤#
2.1 消費者 client 端源碼#
org.apache.rocketmq.client.impl.MQClientAPIImpl.processPullResponse(MQClientAPIImpl.java:803)
也就是這一行
這看著應該是從 broker 端拉回來的數據有問題,不是正常能消費的數據,那就再往上層看看RemotingCommand response
這個對象是怎麼來的吧
再往上一步點的話,會看到有兩個地方一個是同步一個是異步,那看的話自然是看同步,畢竟同步代碼更簡單明瞭一些,最終的功能肯定是一樣的,因為之前學了學 netty 以及知道 RocketMQ 是使用 Netty 通信的,所以看這個方法名字this.remotingClient.invokeSync
大概能猜出來它是和 broker 端進行通信然後獲取到要消費的消息,那接著點進去
可以看到response
對象是從這個方法invokeSyncImpl
返回出來的,那就再進去看看
從這裡看的話,並沒有對這個對象做特殊的處理,這個時候一定有人有疑問,為什麼他就是能正常返回呢,裡邊還有這麼多的 if,因為根據CODE: 24 DESC: the consumer's group info not exist
這個報錯就能看出來,response
對象一定是有的,並且順利封裝了,要不然不可能有這個報錯打印,
消費者 client 的源碼算是看完了,並沒有發現有什麼問題,而這個報錯是 broker 端返回給消費者 client 的,那就去搜搜 broker 的源碼裡哪裡有這個報錯吧
2.2 broker 端源碼追蹤#
首先搜了一下這個報錯的內容,org.apache.rocketmq.broker.processor.PullMessageProcessor#processRequest(io.netty.channel.Channel, org.apache.rocketmq.remoting.protocol.RemotingCommand, boolean)
在這個方法的 178 行
那這裡就是this.brokerController.getConsumerManager().getConsumerGroupInfo(requestHeader.getConsumerGroup());
返回的結果為 null,點進去看看
發現是這個consumerTable
取出來為 null
private final ConcurrentMap<String/* Group */, ConsumerGroupInfo> consumerTable =
new ConcurrentHashMap<String, ConsumerGroupInfo>(1024);
這個對象是一個 ConcurrentMap,那如果取出來為 null 的話,肯定是沒有這個 key,也就是沒有這個消費者組的信息,我明明啟動了消費者組怎麼可能沒有呢,那肯定是在某個地方給 remove 掉了,接下來查一下調用 remove 的方法都在哪裡
org.apache.rocketmq.broker.client.ConsumerManager#doChannelCloseEvent
org.apache.rocketmq.broker.client.ConsumerManager#unregisterConsumer
就是這兩個方法的邏輯裡有 remove 方法
查到這裡我感覺需要看一下 broker 端的日誌了
這麼多日誌,從何看起呢,先挑著日誌多的看吧,畢竟卡卡刷屏,肯定日誌量大,文件也不小
這裡的截圖應該是已經歸檔過了的,現在 broker.log 才 16M,記得出問題的時候 100 多 M
先看broker.log
吧
我從茫茫的打印日誌中找到了這個
看著大概就是說新創建了一個消費者組信息,然後直接就被刪掉了
只有NETTY EVENT: remove not active channel
這個是 WARN,先查查這個在哪裡報的吧
正好在org.apache.rocketmq.broker.client.ConsumerManager#doChannelCloseEvent
直接把org.apache.rocketmq.broker.client.ConsumerManager#unregisterConsumer
這個排除了
那麼這個方法的上一層就是
這些方法都是在一個 run () 方法裡邊跑的
看的出來this.eventQueue
這個是存放 NettyEvent 的地方,而這三個方法都是從這裡調用的
那就看什麼地方會把 NettyEvent 放到 eventQueue 中
搜一下只有這個類中的putNettyEvent
方法才會 add
那再看看這個方法是從哪裡調用的吧
一共有這麼多地方調用,想來咱們看的是 broker 的代碼,那肯定是 server 吧,那就鎖定在這四個了
那也可以看到第一個 type 類型是CONNECT
直接排除,剩下的三個,
分別都點進去看看,會發現這三個方法都伴隨著日誌打印
那咱們就去看看日誌打印,然後定位出來具體是哪種事件導致了刪除我的消費者組信息
這裡就有個問題,server 端那麼多日誌文件應該看哪個哇?
我想到了看下這個代碼是在哪個包下
可以看到是 remoting,那就看對應名字的 log,此時打開消費者,觀察日誌
可以看到 Active 之後,直接 Inactive,那肯定是這個事件導致的
接下來搜這個日誌打印,然後就會發現是觸發了 CLOSE 事件
看到這裡就有點懵了,首先是 CLOSE 事件,這就說明是正常關閉,是因為客戶端不活躍導致的,那怎麼想都是客戶端有問題哇
3 繼續查看客戶端日誌#
兜兜轉轉一圈,又回到了客戶端日誌,因為之前刷屏的都是CODE: 24 DESC: the consumer's group info not exist
,但是這個是 WARN 類型的日誌,我突發奇想,搜搜 ERROR 看看,果然找到了真正的問題
這個東西熟啊,這個是 Netty 的一個解碼類,如果消息太長的話會報錯,那這個想來應該是消息太長,但是我的消息肯定不會超過 1M 哇,通過堆棧可以看到是org.apache.rocketmq.remoting.netty.NettyDecoder.decode
這裡報錯的,
報錯的是 42 行,那這個就是說當前消息太大了,解碼出了問題,我猜想是把多個消息包裝成一個大的消息,然後消費者 client 在進行解包處理,因為我的消息可能存才接近 1M 的,然後我一次獲取 100 個消息,這樣合併的話肯定會超過這個16777216
於是我在消費者啟動上增加 - Dcom.rocketmq.remoting.frameMaxLength=167772160,增加 10 倍看看,重新啟動發現有效果,比之前好點了,起碼不是刷屏報錯了,但是還是異常,偶爾還是有刷屏,Dashboard 上也是一會有消費者一會沒有,這樣子還是異常,那這樣子的话只能把一次獲取消息的個數減少了,於是我減少到了 10,也就是這個參數consumer.pullBatchSize = 10
到這裡這個報錯總算是沒有了,以我目前的技術水平,算是解決了問題,但是還是沒有找到問題出在哪裡
RemotingUtil.closeChannel(ctx.channel());
其中報錯之後會調用這個關閉 channel,估計是因為這個,具體之後在學習學習吧.
4 總結#
通過這次的源碼跟蹤查問題,體會到了 RocketMQ 的消息真的不能太大,不然會出現各種各樣奇怪的問題,但是我們業務那邊的消息如果按一個一個發送那真的是太多了,只能是業務那邊合併成一個消息,然後再批量發到 MQ,這就導致了單個消息可能接近 1M,同時消費者的話一次拉取消息也不能太多... 畢竟太多也處理不過來,索性減低一下速度,也是沒有問題的,問題解決了,接下來就是把生產的東西往 MQ 中遷移了,希望不會再有問題.....