blog

基本的な RocketMQ メッセージ送信と消費の例

この信頼性が高く同期的な送信方法は、重要メッセージの通知、SMS通知など、広く使用されています。 非同期メッセージは通常、応答時間に敏感なビジネス・シナリオで使用されます。つまり、送信者はブローカーか...

Dec 18, 2020 · 5 min. read
シェア

全体的な手順

  • MQ クライアントの依存関係のインポート
<dependency>
 <groupId>org.apache.rocketmq</groupId>
 <artifactId>rocketmq-client</artifactId>
 <version>4.4.0</version>
</dependency>
  • メッセージ送信者のステップ・バイ・ステップ分析
1.メッセージ・プロデューサーとプロデューサー・グループ名を作成する。
2.ネームサーバーのアドレスを指定する
3.プロデューサーの開始
4.メッセージ・オブジェクトを作成し、トピック、タグ、メッセージ・ボディを指定する。
5.メッセージの送信
6.クローズ・プロデューサー
  • メッセージ消費者の段階的分析
1.コンシューマ・グループ名を持つコンシューマConsumerを作成する。
2.ネームサーバーのアドレスを指定する
3.トピックとタグを購読する
4.メッセージを処理するコールバック関数を設定する
5.コンシューマの開始

2、基本サンプルのRocketMQメッセージ送信と消費

メッセージ配信

)同期メッセージの送信

この種の信頼性の高い同期送信方法は、重要なメッセージの通知、SMS通知など、広く使用されています。

public class SyncProducer {
 public static void main(String[] args) throws Exception {
 // メッセージ・プロデューサーをインスタンス化する プロデューサー
 DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
 // NameServerのアドレスを指定する
 producer.setNamesrvAddr("localhost:9876");
 // プロデューサ・インスタンスを開始する
 producer.start();
 for (int i = 0; i < 100; i++) {
 // メッセージを作成し、Topic、Tag、メッセージ・ボディを指定する。
 Message msg = new Message("TopicTest" /* Topic */,
 "TagA" /* Tag */,
 ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
 );
 // ブローカーにメッセージを送る
 SendResult sendResult = producer.send(msg);
 // sendResultは、メッセージが正常に配信されたかどうかを返す。
 System.out.printf("%s%n", sendResult);
 }
 // これ以上メッセージが送信されない場合、Producerインスタンスはクローズされる。
 producer.shutdown();
 }
}

)非同期メッセージの送信

非同期メッセージは通常、レスポンス・タイムに敏感なビジネス・シナリオで使用されます。

public class AsyncProducer {
 public static void main(String[] args) throws Exception {
 // メッセージ・プロデューサーをインスタンス化する プロデューサー
 DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
 // NameServerのアドレスを設定する
 producer.setNamesrvAddr("localhost:9876");
 // プロデューサ・インスタンスを開始する
 producer.start();
 producer.setRetryTimesWhenSendAsyncFailed(0);
 for (int i = 0; i < 100; i++) {
 final int index = i;
 // メッセージを作成し、Topic、Tag、メッセージ・ボディを指定する。
 Message msg = new Message("TopicTest",
 "TagA",
 "OrderID188",
 "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
 // SendCallbackコールバックで非同期結果を受け取る
 producer.send(msg, new SendCallback() {
 @Override
 public void onSuccess(SendResult sendResult) {
 System.out.printf("%-10d OK %s %n", index,
 sendResult.getMsgId());
 }
 @Override
 public void onException(Throwable e) {
 System.out.printf("%-10d Exception %s %n", index, e);
 e.printStackTrace();
 }
 });
 }
 // これ以上メッセージが送信されない場合、Producerインスタンスはクローズされる。
 producer.shutdown();
 }
}

)メッセージは一方向に送信されます

この方法は、ログ送信など、送信結果を特に気にしない場面で主に使われます。

public class OnewayProducer {
 public static void main(String[] args) throws Exception{
 // メッセージ・プロデューサーをインスタンス化する プロデューサー
 DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
 // NameServerのアドレスを設定する
 producer.setNamesrvAddr("localhost:9876");
 // プロデューサ・インスタンスを開始する
 producer.start();
 for (int i = 0; i < 100; i++) {
 // メッセージを作成し、Topic、Tag、メッセージ・ボディを指定する。
 Message msg = new Message("TopicTest" /* Topic */,
 "TagA" /* Tag */,
 ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
 );
 // 一方向メッセージは、結果を返すことなく送信される
 producer.sendOneway(msg);
 }
 // これ以上メッセージが送信されない場合、Producerインスタンスはクローズされる。
 producer.shutdown();
 }
}

メッセージの消費

)負荷分散モード

コンシューマは、複数のコンシューマがキューに入れられたメッセージを一緒に消費し、各コンシューマが異なるメッセージを処理する、負荷分散された方法でメッセージを消費します。

public static void main(String[] args) throws Exception {
 // メッセージ・プロデューサをインスタンス化し、グループ名を指定する。
 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
 // Namesrvアドレス情報を指定する.
 consumer.setNamesrvAddr("localhost:9876");
 // トピックを購読する
 consumer.subscribe("Test", "*");
 //負荷分散モード消費
 consumer.setMessageModel(MessageModel.CLUSTERING);
 // メッセージを処理するコールバック関数を登録する
 consumer.registerMessageListener(new MessageListenerConcurrently() {
 @Override
 public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
 ConsumeConcurrentlyContext context) {
 System.out.printf("%s Receive New Messages: %s %n", 
 Thread.currentThread().getName(), msgs);
 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
 }
 });
 //イニシエータ
 consumer.start();
 System.out.printf("Consumer Started.%n");
}

)ブロードキャスト・モード

消費者はブロードキャストを使用してメッセージを消費し、各消費者は同じメッセージを消費します。

public static void main(String[] args) throws Exception {
 // メッセージ・プロデューサをインスタンス化し、グループ名を指定する。
 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
 // Namesrvアドレス情報を指定する.
 consumer.setNamesrvAddr("localhost:9876");
 // トピックを購読する
 consumer.subscribe("Test", "*");
 //ブロードキャスト・モードの消費
 consumer.setMessageModel(MessageModel.BROADCASTING);
 // メッセージを処理するコールバック関数を登録する
 consumer.registerMessageListener(new MessageListenerConcurrently() {
 @Override
 public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
 ConsumeConcurrentlyContext context) {
 System.out.printf("%s Receive New Messages: %s %n", 
 Thread.currentThread().getName(), msgs);
 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
 }
 });
 //イニシエータ
 consumer.start();
 System.out.printf("Consumer Started.%n");
}
Read next

dubbo-goのbroadcastClusterについて語る

この記事では dubbo-go の -go に焦点を当てています。

Dec 18, 2020 · 2 min read