fang7

fang7

coder
tg_channel

RocketMQのオンライン消費者の積み残し問題

日曜日の時、突然消費者グループが積み上がり始め、仮の解決策として新しい消費者グループを作成してみることにしたが、うまくいかなかった。仕方なく、redis をメッセージミドルウェアとして使用する方法に戻ることにし、月曜日に再度詳しく調査することにした。

1 発生したエラーと現象#

まずは Dashboard を見て、消費者グループの中に消費者がいる時といない時があることに気づいた。

Pasted image 20230109210346

次に消費者グループの業務ログを確認したが、消費はしているものの非常に遅かった。
その後、消費グループの RocketMQ ログを確認すると、エラーが連続して表示されていた。

2023-01-08 12:26:45,649 WARN RocketmqClient - execute the pull request exception
org.apache.rocketmq.client.exception.MQBrokerException: CODE: 24  DESC: the consumer's group info not exist
See http://rocketmq.apache.org/docs/faq/ for further details. BROKER: 192.168.1.58:10911
        at org.apache.rocketmq.client.impl.MQClientAPIImpl.processPullResponse(MQClientAPIImpl.java:803)
        at org.apache.rocketmq.client.impl.MQClientAPIImpl.access$200(MQClientAPIImpl.java:175)
        at org.apache.rocketmq.client.impl.MQClientAPIImpl$2.operationComplete(MQClientAPIImpl.java:754)
        at org.apache.rocketmq.remoting.netty.ResponseFuture.executeInvokeCallback(ResponseFuture.java:54)
        at org.apache.rocketmq.remoting.netty.NettyRemotingAbstract$2.run(NettyRemotingAbstract.java:321)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
        at java.base/java.lang.Thread.run(Thread.java:833)

2023-01-08 12:26:45,649 WARN RocketmqClient - execute the pull request exception
org.apache.rocketmq.client.exception.MQBrokerException: CODE: 24  DESC: the consumer's group info not exist
See http://rocketmq.apache.org/docs/faq/ for further details. BROKER: 192.168.1.58:10911
        at org.apache.rocketmq.client.impl.MQClientAPIImpl.processPullResponse(MQClientAPIImpl.java:803)
        at org.apache.rocketmq.client.impl.MQClientAPIImpl.access$200(MQClientAPIImpl.java:175)
        at org.apache.rocketmq.client.impl.MQClientAPIImpl$2.operationComplete(MQClientAPIImpl.java:754)
        at org.apache.rocketmq.remoting.netty.ResponseFuture.executeInvokeCallback(ResponseFuture.java:54)
        at org.apache.rocketmq.remoting.netty.NettyRemotingAbstract$2.run(NettyRemotingAbstract.java:321)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
        at java.base/java.lang.Thread.run(Thread.java:833)

このエラーが連続して表示されるため、RocketMQ の消費者クライアントのソースコードを探すことにした。

2 ソースコードの追跡#

2.1 消費者クライアント側のソースコード#

org.apache.rocketmq.client.impl.MQClientAPIImpl.processPullResponse(MQClientAPIImpl.java:803)が問題の行である。

Pasted image 20230109210903

ここから見ると、broker 側から引き戻されたデータに問題があるようで、正常に消費できるデータではない。次に上層のRemotingCommand responseオブジェクトがどのように生成されているかを確認する。

Pasted image 20230109211100

さらに上に進むと、同期と非同期の 2 つの場所があり、同期の方がコードがシンプルで明確なので、そちらを確認することにした。最終的な機能は同じである。以前に netty を学び、RocketMQ が Netty を使用して通信していることを知っていたので、このメソッド名this.remotingClient.invokeSyncから、broker 側と通信して消費するメッセージを取得していることが推測できる。次に進む。

Pasted image 20230109211718

responseオブジェクトはこのメソッドinvokeSyncImplから返されていることがわかるので、さらに確認する。

Pasted image 20230109211858

ここから見ると、このオブジェクトに特別な処理は行われていない。ここで疑問が生じるのは、なぜ正常に返されるのかということだ。内部に多くの if 文があるが、CODE: 24 DESC: the consumer's group info not existというエラーから、responseオブジェクトは確実に存在し、正常に封装されているはずである。そうでなければ、このエラーログは出力されないはずだ。
消費者クライアントのソースコードは確認できたが、特に問題は見つからなかった。このエラーは broker 側から消費者クライアントに返されたものであるため、broker のソースコードを探してこのエラーがどこで発生しているかを調べることにした。

2.2 broker 側のソースコード追跡#

まず、このエラーの内容を検索したところ、org.apache.rocketmq.broker.processor.PullMessageProcessor#processRequest(io.netty.channel.Channel, org.apache.rocketmq.remoting.protocol.RemotingCommand, boolean)の 178 行目にたどり着いた。

Pasted image 20230109212642

ここではthis.brokerController.getConsumerManager().getConsumerGroupInfo(requestHeader.getConsumerGroup());の結果が null であることがわかる。詳細を確認する。

Pasted image 20230109212857

consumerTableから null が取得されていることがわかる。

private final ConcurrentMap<String/* Group */, ConsumerGroupInfo> consumerTable =  
    new ConcurrentHashMap<String, ConsumerGroupInfo>(1024);

このオブジェクトは ConcurrentMap であり、null が取得される場合は、キーが存在しないことを意味する。つまり、消費者グループの情報が存在しないことになる。明らかに消費者グループを起動したのに、なぜ存在しないのか。どこかで remove されてしまったのだろう。次に remove メソッドが呼び出されている場所を調べることにした。

org.apache.rocketmq.broker.client.ConsumerManager#doChannelCloseEvent
org.apache.rocketmq.broker.client.ConsumerManager#unregisterConsumer

この 2 つのメソッドのロジックに remove メソッドがあることがわかった。
ここまで来たら、broker 側のログを確認する必要があると感じた。

Pasted image 20230109214021

これだけのログがある中で、どこから見始めればよいのか。まずはログが多いものを選んでみる。エラーが頻発しているため、ログの量が多く、ファイルも大きいはずだ。
このスクリーンショットはすでにアーカイブされているようで、現在の broker.log は 16M で、問題が発生したときは 100M を超えていた。
まずはbroker.logを確認する。

Pasted image 20230109215921

ログの中からこの情報を見つけた。
新しい消費者グループ情報が作成されたが、すぐに削除されたようだ。
NETTY EVENT: remove not active channelというのは WARN であり、これがどこで発生しているのかを調べることにした。

Pasted image 20230109220238

ちょうどorg.apache.rocketmq.broker.client.ConsumerManager#doChannelCloseEventの中にあり、org.apache.rocketmq.broker.client.ConsumerManager#unregisterConsumerは除外できる。
このメソッドの上層は以下の通り。

Pasted image 20230109220433

Pasted image 20230109220519

これらのメソッドはすべて run () メソッドの中で実行されている。

Pasted image 20230109220642

this.eventQueueは NettyEvent を格納する場所であり、これらの 3 つのメソッドはすべてここから呼び出されている。
次に、NettyEvent が eventQueue に追加される場所を確認する。

Pasted image 20230109221024

このクラスのputNettyEventメソッドだけが add を行うことがわかる。
このメソッドがどこから呼び出されているかを確認する。

Pasted image 20230109221159

合計でこれだけの場所から呼び出されている。broker のコードを見ているので、サーバー側だろう。これらの 4 つに絞り込むことにした。
最初の type タイプはCONNECTであるため、これを除外し、残りの 3 つを確認する。

Pasted image 20230109221444

これらのメソッドはすべてログ出力を伴っているため、ログ出力を確認し、具体的にどのイベントが消費者グループ情報の削除を引き起こしたのかを特定することにした。
ここで問題が発生する。サーバー側の多くのログファイルの中で、どれを見ればよいのか。
このコードがどのパッケージに属しているかを確認することにした。

Pasted image 20230109221733

remoting に属していることがわかるので、対応する名前のログを確認し、消費者を起動してログを観察する。

Pasted image 20230109222002

Active の後、すぐに Inactive になっていることがわかる。これが原因であることは明らかである。
次にこのログ出力を検索すると、CLOSE イベントがトリガーされたことがわかる。

Pasted image 20230109222125

ここまで来ると少し混乱してしまった。まず CLOSE イベントは、正常にクローズされたことを示しており、クライアントが非アクティブであるために発生したものである。つまり、クライアントに問題があると考えられる。

3 クライアントログの確認を続ける#

一周回って、再びクライアントログに戻ってきた。以前はCODE: 24 DESC: the consumer's group info not existが頻発していたが、これは WARN タイプのログである。ふと思いついて ERROR を検索してみると、真の問題を見つけることができた。

Pasted image 20230109222603

これは見覚えがある。Netty のデコーダークラスであり、メッセージが長すぎるとエラーが発生する。これはメッセージが長すぎることを示唆しているが、私のメッセージは確実に 1M を超えないはずである。スタックトレースからorg.apache.rocketmq.remoting.netty.NettyDecoder.decodeでエラーが発生していることがわかる。

Pasted image 20230109222945

エラーが発生しているのは 42 行目で、これは現在のメッセージが大きすぎてデコードに問題があることを示している。私は、複数のメッセージが 1 つの大きなメッセージにパッケージ化されていると推測し、消費者クライアントがそれを解包処理しているのだろう。私のメッセージはおそらく 1M に近いものであり、100 個のメッセージを一度に取得しているため、合計が16777216を超える可能性がある。
そこで、消費者の起動時に-Dcom.rocketmq.remoting.frameMaxLength=167772160を追加し、10 倍に増やしてみた。再起動すると効果があり、以前よりは改善された。少なくともエラーログは頻発しなくなったが、まだ正常ではなく、時折エラーが発生し、Dashboard 上でも消費者がいる時といない時があった。これは正常ではないため、取得するメッセージの数を減らすことにした。最終的に 10 に減らし、consumer.pullBatchSize = 10というパラメータを設定した。
これでこのエラーは解消され、私の技術レベルでは問題を解決できたが、問題の根本原因はまだわからなかった。
RemotingUtil.closeChannel(ctx.channel());の中でエラーが発生した後、このチャネルを閉じることが呼び出されるため、これが原因であると推測される。具体的には、今後さらに学んでいく必要がある。

4 まとめ#

今回のソースコード追跡を通じて、RocketMQ のメッセージは大きすぎてはいけないことを実感した。そうでないと、さまざまな奇妙な問題が発生する。しかし、私たちのビジネス側のメッセージを 1 つずつ送信すると非常に多くなってしまうため、ビジネス側でメッセージを 1 つにまとめてから MQ にバッチ送信する必要がある。これにより、単一のメッセージが 1M に近づく可能性がある。また、消費者側も一度に取得するメッセージの数を多くしすぎてはいけない。多すぎると処理できなくなるため、速度を落とすことも問題ない。問題は解決したので、次は生産物を MQ に移行することにし、今後問題が発生しないことを願っている。

読み込み中...
文章は、創作者によって署名され、ブロックチェーンに安全に保存されています。