全体的な手順
- MQ クライアントの依存関係のインポート
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.4.0</version>
</dependency>
- メッセージ送信者のステップ・バイ・ステップ分析
1.メッセージ・プロデューサーとプロデューサー・グループ名を作成する。
2.ネームサーバーのアドレスを指定する
3.プロデューサーの開始
4.メッセージ・オブジェクトを作成し、トピック、タグ、メッセージ・ボディを指定する。
5.メッセージの送信
6.クローズ・プロデューサー
- メッセージ消費者の段階的分析
1.コンシューマ・グループ名を持つコンシューマConsumerを作成する。
2.ネームサーバーのアドレスを指定する
3.トピックとタグを購読する
4.メッセージを処理するコールバック関数を設定する
5.コンシューマの開始
2、基本サンプルのRocketMQメッセージ送信と消費
メッセージ配信
)同期メッセージの送信
この種の信頼性の高い同期送信方法は、重要なメッセージの通知、SMS通知など、広く使用されています。
public class SyncProducer {
public static void main(String[] args) throws Exception {
// メッセージ・プロデューサーをインスタンス化する プロデューサー
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
// NameServerのアドレスを指定する
producer.setNamesrvAddr("localhost:9876");
// プロデューサ・インスタンスを開始する
producer.start();
for (int i = 0; i < 100; i++) {
// メッセージを作成し、Topic、Tag、メッセージ・ボディを指定する。
Message msg = new Message("TopicTest" /* Topic */,
"TagA" /* Tag */,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
// ブローカーにメッセージを送る
SendResult sendResult = producer.send(msg);
// sendResultは、メッセージが正常に配信されたかどうかを返す。
System.out.printf("%s%n", sendResult);
}
// これ以上メッセージが送信されない場合、Producerインスタンスはクローズされる。
producer.shutdown();
}
}
)非同期メッセージの送信
非同期メッセージは通常、レスポンス・タイムに敏感なビジネス・シナリオで使用されます。
public class AsyncProducer {
public static void main(String[] args) throws Exception {
// メッセージ・プロデューサーをインスタンス化する プロデューサー
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
// NameServerのアドレスを設定する
producer.setNamesrvAddr("localhost:9876");
// プロデューサ・インスタンスを開始する
producer.start();
producer.setRetryTimesWhenSendAsyncFailed(0);
for (int i = 0; i < 100; i++) {
final int index = i;
// メッセージを作成し、Topic、Tag、メッセージ・ボディを指定する。
Message msg = new Message("TopicTest",
"TagA",
"OrderID188",
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
// SendCallbackコールバックで非同期結果を受け取る
producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.printf("%-10d OK %s %n", index,
sendResult.getMsgId());
}
@Override
public void onException(Throwable e) {
System.out.printf("%-10d Exception %s %n", index, e);
e.printStackTrace();
}
});
}
// これ以上メッセージが送信されない場合、Producerインスタンスはクローズされる。
producer.shutdown();
}
}
)メッセージは一方向に送信されます
この方法は、ログ送信など、送信結果を特に気にしない場面で主に使われます。
public class OnewayProducer {
public static void main(String[] args) throws Exception{
// メッセージ・プロデューサーをインスタンス化する プロデューサー
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
// NameServerのアドレスを設定する
producer.setNamesrvAddr("localhost:9876");
// プロデューサ・インスタンスを開始する
producer.start();
for (int i = 0; i < 100; i++) {
// メッセージを作成し、Topic、Tag、メッセージ・ボディを指定する。
Message msg = new Message("TopicTest" /* Topic */,
"TagA" /* Tag */,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
// 一方向メッセージは、結果を返すことなく送信される
producer.sendOneway(msg);
}
// これ以上メッセージが送信されない場合、Producerインスタンスはクローズされる。
producer.shutdown();
}
}
メッセージの消費
)負荷分散モード
コンシューマは、複数のコンシューマがキューに入れられたメッセージを一緒に消費し、各コンシューマが異なるメッセージを処理する、負荷分散された方法でメッセージを消費します。
public static void main(String[] args) throws Exception {
// メッセージ・プロデューサをインスタンス化し、グループ名を指定する。
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
// Namesrvアドレス情報を指定する.
consumer.setNamesrvAddr("localhost:9876");
// トピックを購読する
consumer.subscribe("Test", "*");
//負荷分散モード消費
consumer.setMessageModel(MessageModel.CLUSTERING);
// メッセージを処理するコールバック関数を登録する
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n",
Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//イニシエータ
consumer.start();
System.out.printf("Consumer Started.%n");
}
)ブロードキャスト・モード
消費者はブロードキャストを使用してメッセージを消費し、各消費者は同じメッセージを消費します。
public static void main(String[] args) throws Exception {
// メッセージ・プロデューサをインスタンス化し、グループ名を指定する。
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
// Namesrvアドレス情報を指定する.
consumer.setNamesrvAddr("localhost:9876");
// トピックを購読する
consumer.subscribe("Test", "*");
//ブロードキャスト・モードの消費
consumer.setMessageModel(MessageModel.BROADCASTING);
// メッセージを処理するコールバック関数を登録する
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n",
Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//イニシエータ
consumer.start();
System.out.printf("Consumer Started.%n");
}