日曜日の時、突然消費者グループが積み上がり始め、仮の解決策として新しい消費者グループを作成してみることにしたが、うまくいかなかった。仕方なく、redis をメッセージミドルウェアとして使用する方法に戻ることにし、月曜日に再度詳しく調査することにした。
1 発生したエラーと現象#
まずは Dashboard を見て、消費者グループの中に消費者がいる時といない時があることに気づいた。
次に消費者グループの業務ログを確認したが、消費はしているものの非常に遅かった。
その後、消費グループの RocketMQ ログを確認すると、エラーが連続して表示されていた。
2023-01-08 12:26:45,649 WARN RocketmqClient - execute the pull request exception
org.apache.rocketmq.client.exception.MQBrokerException: CODE: 24 DESC: the consumer's group info not exist
See http://rocketmq.apache.org/docs/faq/ for further details. BROKER: 192.168.1.58:10911
at org.apache.rocketmq.client.impl.MQClientAPIImpl.processPullResponse(MQClientAPIImpl.java:803)
at org.apache.rocketmq.client.impl.MQClientAPIImpl.access$200(MQClientAPIImpl.java:175)
at org.apache.rocketmq.client.impl.MQClientAPIImpl$2.operationComplete(MQClientAPIImpl.java:754)
at org.apache.rocketmq.remoting.netty.ResponseFuture.executeInvokeCallback(ResponseFuture.java:54)
at org.apache.rocketmq.remoting.netty.NettyRemotingAbstract$2.run(NettyRemotingAbstract.java:321)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:833)
2023-01-08 12:26:45,649 WARN RocketmqClient - execute the pull request exception
org.apache.rocketmq.client.exception.MQBrokerException: CODE: 24 DESC: the consumer's group info not exist
See http://rocketmq.apache.org/docs/faq/ for further details. BROKER: 192.168.1.58:10911
at org.apache.rocketmq.client.impl.MQClientAPIImpl.processPullResponse(MQClientAPIImpl.java:803)
at org.apache.rocketmq.client.impl.MQClientAPIImpl.access$200(MQClientAPIImpl.java:175)
at org.apache.rocketmq.client.impl.MQClientAPIImpl$2.operationComplete(MQClientAPIImpl.java:754)
at org.apache.rocketmq.remoting.netty.ResponseFuture.executeInvokeCallback(ResponseFuture.java:54)
at org.apache.rocketmq.remoting.netty.NettyRemotingAbstract$2.run(NettyRemotingAbstract.java:321)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:833)
このエラーが連続して表示されるため、RocketMQ の消費者クライアントのソースコードを探すことにした。
2 ソースコードの追跡#
2.1 消費者クライアント側のソースコード#
org.apache.rocketmq.client.impl.MQClientAPIImpl.processPullResponse(MQClientAPIImpl.java:803)
が問題の行である。
ここから見ると、broker 側から引き戻されたデータに問題があるようで、正常に消費できるデータではない。次に上層のRemotingCommand response
オブジェクトがどのように生成されているかを確認する。
さらに上に進むと、同期と非同期の 2 つの場所があり、同期の方がコードがシンプルで明確なので、そちらを確認することにした。最終的な機能は同じである。以前に netty を学び、RocketMQ が Netty を使用して通信していることを知っていたので、このメソッド名this.remotingClient.invokeSync
から、broker 側と通信して消費するメッセージを取得していることが推測できる。次に進む。
response
オブジェクトはこのメソッドinvokeSyncImpl
から返されていることがわかるので、さらに確認する。
ここから見ると、このオブジェクトに特別な処理は行われていない。ここで疑問が生じるのは、なぜ正常に返されるのかということだ。内部に多くの if 文があるが、CODE: 24 DESC: the consumer's group info not exist
というエラーから、response
オブジェクトは確実に存在し、正常に封装されているはずである。そうでなければ、このエラーログは出力されないはずだ。
消費者クライアントのソースコードは確認できたが、特に問題は見つからなかった。このエラーは broker 側から消費者クライアントに返されたものであるため、broker のソースコードを探してこのエラーがどこで発生しているかを調べることにした。
2.2 broker 側のソースコード追跡#
まず、このエラーの内容を検索したところ、org.apache.rocketmq.broker.processor.PullMessageProcessor#processRequest(io.netty.channel.Channel, org.apache.rocketmq.remoting.protocol.RemotingCommand, boolean)
の 178 行目にたどり着いた。
ここではthis.brokerController.getConsumerManager().getConsumerGroupInfo(requestHeader.getConsumerGroup());
の結果が null であることがわかる。詳細を確認する。
consumerTable
から null が取得されていることがわかる。
private final ConcurrentMap<String/* Group */, ConsumerGroupInfo> consumerTable =
new ConcurrentHashMap<String, ConsumerGroupInfo>(1024);
このオブジェクトは ConcurrentMap であり、null が取得される場合は、キーが存在しないことを意味する。つまり、消費者グループの情報が存在しないことになる。明らかに消費者グループを起動したのに、なぜ存在しないのか。どこかで remove されてしまったのだろう。次に remove メソッドが呼び出されている場所を調べることにした。
org.apache.rocketmq.broker.client.ConsumerManager#doChannelCloseEvent
org.apache.rocketmq.broker.client.ConsumerManager#unregisterConsumer
この 2 つのメソッドのロジックに remove メソッドがあることがわかった。
ここまで来たら、broker 側のログを確認する必要があると感じた。
これだけのログがある中で、どこから見始めればよいのか。まずはログが多いものを選んでみる。エラーが頻発しているため、ログの量が多く、ファイルも大きいはずだ。
このスクリーンショットはすでにアーカイブされているようで、現在の broker.log は 16M で、問題が発生したときは 100M を超えていた。
まずはbroker.log
を確認する。
ログの中からこの情報を見つけた。
新しい消費者グループ情報が作成されたが、すぐに削除されたようだ。
NETTY EVENT: remove not active channel
というのは WARN であり、これがどこで発生しているのかを調べることにした。
ちょうどorg.apache.rocketmq.broker.client.ConsumerManager#doChannelCloseEvent
の中にあり、org.apache.rocketmq.broker.client.ConsumerManager#unregisterConsumer
は除外できる。
このメソッドの上層は以下の通り。
これらのメソッドはすべて run () メソッドの中で実行されている。
this.eventQueue
は NettyEvent を格納する場所であり、これらの 3 つのメソッドはすべてここから呼び出されている。
次に、NettyEvent が eventQueue に追加される場所を確認する。
このクラスのputNettyEvent
メソッドだけが add を行うことがわかる。
このメソッドがどこから呼び出されているかを確認する。
合計でこれだけの場所から呼び出されている。broker のコードを見ているので、サーバー側だろう。これらの 4 つに絞り込むことにした。
最初の type タイプはCONNECT
であるため、これを除外し、残りの 3 つを確認する。
これらのメソッドはすべてログ出力を伴っているため、ログ出力を確認し、具体的にどのイベントが消費者グループ情報の削除を引き起こしたのかを特定することにした。
ここで問題が発生する。サーバー側の多くのログファイルの中で、どれを見ればよいのか。
このコードがどのパッケージに属しているかを確認することにした。
remoting に属していることがわかるので、対応する名前のログを確認し、消費者を起動してログを観察する。
Active の後、すぐに Inactive になっていることがわかる。これが原因であることは明らかである。
次にこのログ出力を検索すると、CLOSE イベントがトリガーされたことがわかる。
ここまで来ると少し混乱してしまった。まず CLOSE イベントは、正常にクローズされたことを示しており、クライアントが非アクティブであるために発生したものである。つまり、クライアントに問題があると考えられる。
3 クライアントログの確認を続ける#
一周回って、再びクライアントログに戻ってきた。以前はCODE: 24 DESC: the consumer's group info not exist
が頻発していたが、これは WARN タイプのログである。ふと思いついて ERROR を検索してみると、真の問題を見つけることができた。
これは見覚えがある。Netty のデコーダークラスであり、メッセージが長すぎるとエラーが発生する。これはメッセージが長すぎることを示唆しているが、私のメッセージは確実に 1M を超えないはずである。スタックトレースからorg.apache.rocketmq.remoting.netty.NettyDecoder.decode
でエラーが発生していることがわかる。
エラーが発生しているのは 42 行目で、これは現在のメッセージが大きすぎてデコードに問題があることを示している。私は、複数のメッセージが 1 つの大きなメッセージにパッケージ化されていると推測し、消費者クライアントがそれを解包処理しているのだろう。私のメッセージはおそらく 1M に近いものであり、100 個のメッセージを一度に取得しているため、合計が16777216
を超える可能性がある。
そこで、消費者の起動時に-Dcom.rocketmq.remoting.frameMaxLength=167772160
を追加し、10 倍に増やしてみた。再起動すると効果があり、以前よりは改善された。少なくともエラーログは頻発しなくなったが、まだ正常ではなく、時折エラーが発生し、Dashboard 上でも消費者がいる時といない時があった。これは正常ではないため、取得するメッセージの数を減らすことにした。最終的に 10 に減らし、consumer.pullBatchSize = 10
というパラメータを設定した。
これでこのエラーは解消され、私の技術レベルでは問題を解決できたが、問題の根本原因はまだわからなかった。
RemotingUtil.closeChannel(ctx.channel());
の中でエラーが発生した後、このチャネルを閉じることが呼び出されるため、これが原因であると推測される。具体的には、今後さらに学んでいく必要がある。
4 まとめ#
今回のソースコード追跡を通じて、RocketMQ のメッセージは大きすぎてはいけないことを実感した。そうでないと、さまざまな奇妙な問題が発生する。しかし、私たちのビジネス側のメッセージを 1 つずつ送信すると非常に多くなってしまうため、ビジネス側でメッセージを 1 つにまとめてから MQ にバッチ送信する必要がある。これにより、単一のメッセージが 1M に近づく可能性がある。また、消費者側も一度に取得するメッセージの数を多くしすぎてはいけない。多すぎると処理できなくなるため、速度を落とすことも問題ない。問題は解決したので、次は生産物を MQ に移行することにし、今後問題が発生しないことを願っている。