On Sunday, messages suddenly began to pile up on the consumer group. As a stopgap I created a new consumer group to see whether consumption would recover, but it made no difference. In the end we had to fall back to Redis as the message middleware and leave the investigation for Monday.
1 Errors Encountered and Symptoms
First, I checked the Dashboard and saw that the consumer group sometimes showed one consumer and sometimes none.
Next, I checked the consumer's business logs. Messages were being consumed, but very slowly. The RocketMQ client log for the consumer group, meanwhile, was flooded with the following warning:
2023-01-08 12:26:45,649 WARN RocketmqClient - execute the pull request exception
org.apache.rocketmq.client.exception.MQBrokerException: CODE: 24 DESC: the consumer's group info not exist
See http://rocketmq.apache.org/docs/faq/ for further details. BROKER: 192.168.1.58:10911
at org.apache.rocketmq.client.impl.MQClientAPIImpl.processPullResponse(MQClientAPIImpl.java:803)
at org.apache.rocketmq.client.impl.MQClientAPIImpl.access$200(MQClientAPIImpl.java:175)
at org.apache.rocketmq.client.impl.MQClientAPIImpl$2.operationComplete(MQClientAPIImpl.java:754)
at org.apache.rocketmq.remoting.netty.ResponseFuture.executeInvokeCallback(ResponseFuture.java:54)
at org.apache.rocketmq.remoting.netty.NettyRemotingAbstract$2.run(NettyRemotingAbstract.java:321)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:833)
2023-01-08 12:26:45,649 WARN RocketmqClient - execute the pull request exception
org.apache.rocketmq.client.exception.MQBrokerException: CODE: 24 DESC: the consumer's group info not exist
See http://rocketmq.apache.org/docs/faq/ for further details. BROKER: 192.168.1.58:10911
at org.apache.rocketmq.client.impl.MQClientAPIImpl.processPullResponse(MQClientAPIImpl.java:803)
at org.apache.rocketmq.client.impl.MQClientAPIImpl.access$200(MQClientAPIImpl.java:175)
at org.apache.rocketmq.client.impl.MQClientAPIImpl$2.operationComplete(MQClientAPIImpl.java:754)
at org.apache.rocketmq.remoting.netty.ResponseFuture.executeInvokeCallback(ResponseFuture.java:54)
at org.apache.rocketmq.remoting.netty.NettyRemotingAbstract$2.run(NettyRemotingAbstract.java:321)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:833)
With this warning flooding the log, the next step was to read the RocketMQ consumer client source code.
2 Source Code Tracking
2.1 Consumer Client Source Code
The line in question is org.apache.rocketmq.client.impl.MQClientAPIImpl.processPullResponse(MQClientAPIImpl.java:803).
The response pulled back from the broker appears to be an error rather than normal consumable data, so let's see how the RemotingCommand response object is created.
One level up the call chain there are two callers, one synchronous and one asynchronous. The stack trace shows that the failing pull went through the asynchronous callback, but the synchronous code is simpler to read and both paths end up handling the response the same way, so I followed the synchronous one. RocketMQ uses Netty for communication, so the method name this.remotingClient.invokeSync suggests this is where the client talks to the broker to fetch the messages to consume. Let's go deeper.
The response object is returned from the method invokeSyncImpl, so let's look at that.
Nothing special is done to the object here. You might wonder how the method can return normally with so many if statements along the way. But the message CODE: 24 DESC: the consumer's group info not exist makes it clear that the response object exists and was decoded successfully; otherwise this error could not have been printed.
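To make that reasoning concrete, here is a minimal sketch of how a client might turn a broker response code into either a result or an exception. It is a simplified model, not RocketMQ's code: the class, the method, and the exception type are hypothetical stand-ins, and treating 0 as the success code is an assumption. Only code 24 and its description come from the log above.

```java
// Simplified model of mapping a pull response code to a result or an exception.
// All names are hypothetical stand-ins, not RocketMQ's own classes.
class PullResponseSketch {
    static final int SUCCESS = 0;                  // assumed success code
    static final int CONSUMER_GROUP_MISSING = 24;  // code seen in the log above

    /** Stand-in for the broker exception; carries the code and description. */
    static class BrokerException extends RuntimeException {
        final int code;

        BrokerException(int code, String desc, String brokerAddr) {
            super("CODE: " + code + " DESC: " + desc + " BROKER: " + brokerAddr);
            this.code = code;
        }
    }

    /** Returns a status for a successful pull, throws for any other code. */
    static String process(int code, String remark, String brokerAddr) {
        switch (code) {
            case SUCCESS:
                return "FOUND";
            default:
                // The response arrived and was decoded; its code is simply an error.
                throw new BrokerException(code, remark, brokerAddr);
        }
    }

    public static void main(String[] args) {
        try {
            process(CONSUMER_GROUP_MISSING, "the consumer's group info not exist",
                    "192.168.1.58:10911");
        } catch (BrokerException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

The point of the sketch is that the exception can only be built from a response that was received intact, which is why the search moves to the broker next.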
That was the end of the client source code, and nothing looked wrong there. Since the error is returned by the broker, the next step is to search the broker source code for where it is raised.
2.2 Broker Source Code Tracking
Searching for the error message text leads to line 178 of org.apache.rocketmq.broker.processor.PullMessageProcessor#processRequest(io.netty.channel.Channel, org.apache.rocketmq.remoting.protocol.RemotingCommand, boolean).
Here, this.brokerController.getConsumerManager().getConsumerGroupInfo(requestHeader.getConsumerGroup()); returns null. Let's see why.
It turns out that the lookup in consumerTable returns null.
private final ConcurrentMap<String/* Group */, ConsumerGroupInfo> consumerTable =
new ConcurrentHashMap<String, ConsumerGroupInfo>(1024);
The table is a ConcurrentMap, so a null result means the key is absent, that is, the broker has no information about this consumer group. But I had definitely started the consumer group, so how could it be missing? Something must have removed it. Next, let's find where remove is called.
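The behaviour described here can be modelled in a few lines. The following is a sketch under assumptions, not the broker's implementation: the class, its methods, and the use of plain strings as channel ids are all hypothetical. It shows only how a group entry that is removed once its last channel closes makes the next pull fail with code 24.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Simplified model of a broker-side consumer registry (hypothetical names).
class ConsumerRegistrySketch {
    // group name -> ids of the channels currently connected for that group
    private final ConcurrentMap<String, Set<String>> consumerTable =
            new ConcurrentHashMap<>(1024);

    /** Registers a channel for a group, creating the group entry if needed. */
    void register(String group, String channelId) {
        consumerTable.computeIfAbsent(group, g -> ConcurrentHashMap.newKeySet())
                     .add(channelId);
    }

    /** On channel close: drop the channel, and drop the group once it is empty. */
    void onChannelClose(String channelId) {
        consumerTable.forEach((group, channels) -> {
            if (channels.remove(channelId) && channels.isEmpty()) {
                consumerTable.remove(group, channels);
            }
        });
    }

    /** Returns 0 if the group is known, 24 if there is no entry for it. */
    int handlePull(String group) {
        return consumerTable.get(group) == null ? 24 : 0;
    }

    public static void main(String[] args) {
        ConsumerRegistrySketch broker = new ConsumerRegistrySketch();
        broker.register("demo-group", "channel-1");
        System.out.println(broker.handlePull("demo-group")); // group is registered
        broker.onChannelClose("channel-1");
        System.out.println(broker.handlePull("demo-group")); // group entry is gone
    }
}
```

In this model the consumer process can still be running while the broker has no entry for its group, which matches the symptom of a started consumer being reported as nonexistent.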
org.apache.rocketmq.broker.client.ConsumerManager#doChannelCloseEvent
org.apache.rocketmq.broker.client.ConsumerManager#unregisterConsumer
Both of these methods contain remove logic. At this point it was time to check the broker's logs.
There are many log files, so where to start? Since the errors were flooding, the log volume must be large and the file would not be small, so I started with the biggest ones. By the time of this screenshot the logs had probably already been rolled over: the current broker.log is only 16M, but I remember it being over 100M when the issue occurred. Let's look at broker.log.
In the mass of log output I found this. It appears to show consumer group information being created and then immediately deleted. The only WARN entry is NETTY EVENT: remove not active channel, so let's find where it is logged.
It is logged in org.apache.rocketmq.broker.client.ConsumerManager#doChannelCloseEvent, which rules out org.apache.rocketmq.broker.client.ConsumerManager#unregisterConsumer.
One level up, the callers of this method are:
These methods are all invoked from within a run() method.
this.eventQueue is where NettyEvent objects are stored, and all three methods are called from here. So let's see where a NettyEvent gets added to eventQueue.
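The queue-and-dispatch pattern described above can be sketched as follows. This is a simplified, single-threaded model with hypothetical names; the executor discussed in the text runs its loop inside run() on a separate thread. Routing CLOSE, IDLE, and EXCEPTION to the same cleanup is an assumption made for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;

// Simplified, single-threaded model of a Netty event dispatcher (hypothetical names).
class NettyEventDispatchSketch {
    enum EventType { CONNECT, CLOSE, IDLE, EXCEPTION }

    static final class Event {
        final EventType type;
        final String remoteAddr;

        Event(EventType type, String remoteAddr) {
            this.type = type;
            this.remoteAddr = remoteAddr;
        }
    }

    private final LinkedBlockingQueue<Event> eventQueue = new LinkedBlockingQueue<>();

    // Records which handler ran for which address, so the dispatch is observable.
    final List<String> handled = new ArrayList<>();

    /** The only way an event enters the queue. */
    void putEvent(Event event) {
        eventQueue.add(event);
    }

    /** One pass of the loop body: take queued events and route each by type. */
    void drain() {
        Event event;
        while ((event = eventQueue.poll()) != null) {
            switch (event.type) {
                case CONNECT:
                    handled.add("connect " + event.remoteAddr);
                    break;
                case CLOSE:
                case IDLE:
                case EXCEPTION:
                    // Assumed here: all three end in the same channel-close cleanup.
                    handled.add("channel-close " + event.remoteAddr
                            + " (" + event.type + ")");
                    break;
            }
        }
    }

    public static void main(String[] args) {
        NettyEventDispatchSketch dispatcher = new NettyEventDispatchSketch();
        dispatcher.putEvent(new Event(EventType.CONNECT, "192.168.1.10:51234"));
        dispatcher.putEvent(new Event(EventType.CLOSE, "192.168.1.10:51234"));
        dispatcher.drain();
        dispatcher.handled.forEach(System.out::println);
    }
}
```

Because every event passes through the single put method, finding its callers is enough to enumerate everything that can trigger the cleanup.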
A search shows that only the putNettyEvent method in this class adds events. Let's see where it is called from.
It is called from many places. Since we are looking at the broker, the relevant callers must be on the server side, so let's focus on these four.
The first event type is CONNECT, which can be ruled out immediately. Checking the remaining three one by one shows that each is accompanied by a log statement.
The logs should therefore pinpoint which event caused my consumer group information to be deleted. That raises a question: which log file on the server should I check? I decided to look at which package this code lives in.
It is under remoting, so the corresponding remoting log is the one to check. I then started the consumer and watched the log.
After becoming Active, the channel goes straight to Inactive, so this is clearly the event responsible. Searching for this log statement shows that it triggers a CLOSE event.
At this point I was a bit confused. A CLOSE event means the connection was closed in the normal way after the client side became inactive, which suggests the problem is on the client.
3 Continuing to Check Client Logs
After going around in a circle, I came back to the client logs. The messages flooding the log earlier were all CODE: 24 DESC: the consumer's group info not exist, but those are WARN-level entries. On a hunch I searched for ERROR entries instead, and sure enough, found the real issue.
This looks familiar: it is a Netty decoding class, which throws an error when a frame is too long. So a message is apparently too long, yet none of my messages should exceed 1M. The stack trace shows that the error occurred in org.apache.rocketmq.remoting.netty.NettyDecoder.decode.
The error is at line 42 and indicates that the incoming data is too large to decode. I suspect that multiple messages are packed into one large response, which the consumer client then has to unpack. Since my messages can be close to 1M each and I fetch 100 at a time, the combined response would far exceed the limit of 16777216 bytes (16 MiB).
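The arithmetic behind that suspicion is easy to check. The sketch below assumes a worst case of exactly 1 MiB (1,048,576 bytes) per message and ignores protocol headers, so the real ceiling would be slightly lower; the class and method names are made up for illustration. Only the 16777216 limit and the batch of 100 come from the text.

```java
// Worst-case size of one pull response versus the frame limit.
// Assumes every message is exactly 1 MiB and ignores protocol overhead.
class FrameBudgetSketch {
    static final long FRAME_MAX_LENGTH = 16_777_216L;   // limit from the error
    static final long ASSUMED_MESSAGE_BYTES = 1_048_576L; // assumption: 1 MiB each

    /** Total payload bytes if every message in the batch has the given size. */
    static long worstCaseBatchBytes(int batchSize, long messageBytes) {
        return Math.multiplyExact((long) batchSize, messageBytes);
    }

    /** True if the whole batch would fit into a single frame. */
    static boolean fitsInFrame(int batchSize, long messageBytes, long frameMaxLength) {
        return worstCaseBatchBytes(batchSize, messageBytes) <= frameMaxLength;
    }

    /** Largest batch that still fits, under the same assumptions. */
    static long maxBatchSize(long messageBytes, long frameMaxLength) {
        return frameMaxLength / messageBytes;
    }

    public static void main(String[] args) {
        long total = worstCaseBatchBytes(100, ASSUMED_MESSAGE_BYTES);
        System.out.println("100 messages: " + total + " bytes, fits: "
                + fitsInFrame(100, ASSUMED_MESSAGE_BYTES, FRAME_MAX_LENGTH));
        System.out.println("largest batch that fits: "
                + maxBatchSize(ASSUMED_MESSAGE_BYTES, FRAME_MAX_LENGTH));
    }
}
```

Under these assumptions 100 messages add up to 104,857,600 bytes, more than six times the limit, and no more than 16 messages of that size could share one frame.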
So I raised the limit tenfold with the JVM parameter -Dcom.rocketmq.remoting.frameMaxLength=167772160 and restarted. This helped to some extent: the constant flood of errors stopped. But things were still not normal. The errors occasionally flooded again, and the Dashboard still showed the consumer appearing and disappearing. So I also had to reduce the number of messages fetched per pull, lowering it to 10 with consumer.pullBatchSize = 10.
With that, the error finally went away. As far as my current skills go, I consider the problem solved, but I still have not pinned down the root cause. RemotingUtil.closeChannel(ctx.channel()); is called after the decoding error occurs, and I suspect that is what drops the connection. I will look into this further later.
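To illustrate the suspected mechanism, here is a minimal sketch of a length-prefixed decoder that closes its channel when a frame exceeds the limit. It is a model built on assumptions, not the RocketMQ or Netty code: the Channel class, the decode method, and the 4-byte length prefix are hypothetical, and the sketch assumes the buffer already holds a complete frame.

```java
import java.nio.ByteBuffer;

// Simplified model of a length-checked frame decoder (hypothetical names).
class LengthCheckedDecoderSketch {
    /** Minimal stand-in for a network channel that can only be open or closed. */
    static final class Channel {
        boolean open = true;

        void close() {
            open = false;
        }
    }

    /**
     * Reads a 4-byte length prefix followed by the payload.
     * Returns the payload, or null after closing the channel if the frame is too long.
     * Assumes the buffer contains the complete frame.
     */
    static byte[] decode(ByteBuffer in, int frameMaxLength, Channel channel) {
        int length = in.getInt();
        if (length < 0 || length > frameMaxLength) {
            // The oversized frame is not read at all; the connection is dropped.
            channel.close();
            return null;
        }
        byte[] payload = new byte[length];
        in.get(payload);
        return payload;
    }

    public static void main(String[] args) {
        Channel channel = new Channel();
        ByteBuffer oversized = ByteBuffer.allocate(4);
        oversized.putInt(104_857_600); // announces a frame far above the limit
        oversized.flip();
        byte[] result = decode(oversized, 16_777_216, channel);
        System.out.println("payload: " + result + ", channel open: " + channel.open);
    }
}
```

If the client drops its connection this way, the broker would see the channel go inactive, which would be consistent with the CLOSE event found in the broker's remoting log. That link remains a suspicion here rather than something verified.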
4 Summary
Tracing through the source code to troubleshoot this issue taught me that RocketMQ messages must not be too large, or all sorts of strange problems appear. However, sending our business records one at a time would produce far too many messages, so we have to merge them and send them to the MQ in batches, which is how a single message can end up close to 1M. Consumers, in turn, must not fetch too many messages at once; they could not process that many anyway, so a lower fetch rate is not a problem. The issue is resolved, and the next step is to migrate the producer side to the MQ, hoping there won't be any more problems...