blog

超詳しいRabbitMQ初心者は、これだけ読めば十分だ!

メッセージとは、2つのアプリケーション間でやり取りされるデータのことです。データには、テキスト文字列から埋め込みオブジェクトまで、さまざまなタイプがあります。 メッセージ・キュー」は、送信中のメッセー...

Nov 3, 2020 · 22 min. read
シェア

マインドマップ

I. メッセージキューとは

メッセージとは、2 つのアプリケーション間でやり取りされるデータのことです。データの種類にはさまざまな形があり、テキスト文字列や埋め込みオブジェクトだけが含まれることもあります。

メッセージキュー "とは、メッセージの送信中にメッセージを保持するコンテナのことです。メッセージ・キューには、通常、プロデューサとコンシューマの2つの役割があります。プロデューサは、メッセージキューにデータを送信することだけに責任を持ち、誰がメッセージキューからデータを取り出して処理するかは気にしません。コンシューマは、メッセージキューからデータを取り出して処理することにのみ責任があり、誰がデータを送ったかは気にしません。

第二に、なぜメッセージキューを使うのでしょうか。

主な役割は3つあります:

  • デカップリング。図のように。システムAからデータを必要とするシステムB、C、Dがあるとします。そのため、システムAはB、C、Dにデータを送信するために3つのメソッドを呼び出します。そのデータを必要とする新しいシステムEがあるとすると、システムAは再びシステムEを呼び出すコードを追加しなければなりません。この強力な結合を減らすために、あなたはMQを使用することができます、システムAはMQにデータを送信する必要があります、他のシステムは、データが必要な場合は、MQから取得することができます

  • 非同期。図に示すように。クライアントの要求が送信され、システムAは、システムB、C、Dの3つのシステム、同期リクエストを呼び出します、応答時間は、システムA、B、C、Dの合計、つまり、800ミリ秒です。もしMQを使用すると、システムAは、MQにデータを送信し、クライアントへの応答を返すことができます、システムB、C、Dの応答を待つ必要はありません、あなたは大幅にパフォーマンスを向上させることができます。このようなSMSの送信、電子メールの送信など、いくつかの非本質的なビジネスでは、MQを使用することができます。

  • ピーククリッピング。図のように。これはMQの非常に重要なアプリケーションです。システムAは、時間のサージ要求の数の特定の期間で、そこに5000の要求が送信されるとすると、システムAは、実行のためにMySQLに5000のSQLを送信します、MySQLはもちろん、このような巨大な要求を処理することはできませんオーバー、MySQLがクラッシュし、その結果、システムの麻痺になります。あなたはMQを使用する場合は、システムAは、もはやデータベースに直接SQLを送信しますが、MQにデータを送信し、MQは、バックログデータの短い期間のために許容され、その後、消費者が処理するために2000をプルするたびに、リクエストのピーク期間を防ぐために、直接MySQLにシステムクラッシュにつながる送信要求の数が多い

III.RabbitMQの特徴

RabbitMQはErlang言語を使って開発されたオープンソースのメッセージングミドルウェアで、AMQP(Advanced Message Queuing Protocol)を実装しています。まず最初に、RabbitMQの特徴を知る必要があります公式サイト

  • 信頼性。永続性、トランスポート検証、パブリッシュ検証などのサポートにより、MQの信頼性が保証されます。
  • 柔軟なメッセージ配信戦略。これはRabbitMQの大きな特徴です。メッセージのルーティングは、メッセージがMQに入る前にExchange(スイッチ)によって行われます。メッセージの分散戦略には、シンプルモード、ワークキューモード、パブリッシュサブスクライブモード、ルーティングモード、ワイルドカードモードがあります。
  • クラスタリングのサポート。複数のRabbitMQサーバをクラスタ化し、論理的なBrokerを形成することができます。
  • RabbitMQはSTOMP、MQTTなど様々なメッセージキューイングプロトコルをサポートしています。
  • RabbitMQは、Java、.NET、Rubyなど、ほぼすべての一般的なプログラミング言語をサポートしています。
  • ビジュアル管理インターフェース RabbitMQは、メッセージブローカーを監視および管理できる使いやすいユーザーインターフェースを提供します。
  • プラグインメカニズム。RabbitMQは多くのプラグインを提供しており、プラグインを通じて拡張することができます。

第四に、RabbitMQの最初の経験

RabbitMQのインストール

私はWin10システムにインストールしているので、VMを開く必要はありません。Linuxシステムにインストールする場合は、DockerでRabbitMQ Imageをプルダウンすることをお勧めします。

erLang言語のインストールと環境変数の設定

まず、erlangの公式サイトサイトからwin10版のインストーラーをダウンロードしてください。

ダウンロードすると、このようになります:

その後、ダブルクリックしてインストールし、常に行の次(次)を指し、インストールした後、環境変数を設定します。

cmdコマンドでerl -versionと入力して確認してください:

RabbitMQサーバのインストール

RabbitMQgitHub、ウィンドウ版のサーバーインストールパッケージをダウンロードします。

ダウンロードしてください:

ダブルクリックでインストールする場合は、必ず「次へ」をタップし、インストールが完了したら、インストールディレクトリを探します:

次にrabbitmq-server.batをダブルクリックしてスクリプトを起動し、サービスマネージャを開いてRabbitMQが実行されていることを確認します:

この時点で、ブラウザを開き、"http://localhost:15672 "を入力します。アカウントのパスワードはデフォルトでguest/ゲストです。

この時点でインストールは完了です!

ハロー・ワード・フォーエバー

良いサーバーサイドを構築した後、確かに動作するようにクライアントを使用して、Javaの次の使用は、単純なHelloWordのデモを行います。

SpringBootを使っているので、対応するスターターの依存関係をプロデューサー側に追加するだけです:

<dependency>
 <groupId>org.springframework.boot</groupId>
 <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

一般に、キュー・トピック、スイッチ名、ルート・マッチ・キー名など、いくつかの設定を共有するために、パブリック・プロジェクト・コモンを作成する必要があります。

まず、application.ymlファイルにRabbitMQの設定情報を追加します:

spring:
 rabbitmq:
 host: .1
 port: 5672
 username: guest
 password: guest

次にプロデューサー側で、共通パッケージの maven 依存関係を追加し、Direct スイッチとキュー用のコンフィギュレーション・クラスを作成します:

@Configuration
public class DirectRabbitConfig {
 @Bean
 public Queue rabbitmqDemoDirectQueue() {
 /**
 * 1 name: キュー名
 * 2 durable: 永続性
 * 3 exclusive: 排他的・排他的かどうか。trueに設定すると 排他的キューとして定義される。そして、作成者だけがこのキューを使うことができる。つまり、非公開だ。
 * 4、autoDelete: 自動削除をするかどうか。一時キューとも呼ばれる。最後の消費者が切断すると自動的に削除される。
 * */
 return new Queue(RabbitMQConfig.RABBITMQ_DEMO_TOPIC, true, false, false);
 }
 
 @Bean
 public DirectExchange rabbitmqDemoDirectExchange() {
 //Direct 
 return new DirectExchange(RabbitMQConfig.RABBITMQ_DEMO_DIRECT_EXCHANGE, true, false);
 }
 @Bean
 public Binding bindDirect() {
 //チェーン、スイッチとキューのバインド、マッチングキーの設定
 return BindingBuilder
 //キューをバインドする
 .bind(rabbitmqDemoDirectQueue())
 //スイッチへ
 .to(rabbitmqDemoDirectExchange())
 //そして、マッチキーを設定する
 .with(RabbitMQConfig.RABBITMQ_DEMO_DIRECT_ROUTING);
 }
}

次に、メッセージを送信する Service クラスを作成します:

@Service
public class RabbitMQServiceImpl implements RabbitMQService {
 //日付のフォーマット
 private static SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
 @Resource
 private RabbitTemplate rabbitTemplate;
 @Override
 public String sendMsg(String msg) throws Exception {
 try {
 String msgId = UUID.randomUUID().toString().replace("-", "").substring(0, 32);
 String sendTime = sdf.format(new Date());
 Map<String, Object> map = new HashMap<>();
 map.put("msgId", msgId);
 map.put("sendTime", sendTime);
 map.put("msg", msg);
 rabbitTemplate.convertAndSend(RabbitMQConfig.RABBITMQ_DEMO_DIRECT_EXCHANGE, RabbitMQConfig.RABBITMQ_DEMO_DIRECT_ROUTING, map);
 return "ok";
 } catch (Exception e) {
 e.printStackTrace();
 return "error";
 }
 }
}

そして、定時タスクやインターフェイスのように、ビジネスに応じて使用する必要がある場所に配置します。ここではシンプルに、Controllerレイヤーを送信に使うことにします:

@RestController
@RequestMapping("/mall/rabbitmq")
public class RabbitMQController {
 @Resource
 private RabbitMQService rabbitMQService;
 /**
 * メッセージを送信する
 * 
 */
 @PostMapping("/sendMsg")
 public String sendMsg(@RequestParam(name = "msg") String msg) throws Exception {
 return rabbitMQService.sendMsg(msg);
 }
}

mavenの依存関係、ymlファイルの設定とプロデューサーと同じです。図のようにクラスを作成し、@RabbitListenerアノテーションでリスニングキューの名前を書くだけです:

RabbitMQサーバにキューが作成されていないという小さな落とし穴があります:

この時点でコンシューマーを起動すると、エラーが報告されます:

まずプロデューサーを始めるには、メッセージを送ってください:

最後に、消費者は活性化され、消費します:

この時点では、常にキューをリッスンしていて、プロデューサーがMQにメッセージを送信するたびに、コンシューマーがメッセージを消費します。ここでは4つ送信しようとしています:

この問題は、キューが存在しないためにコンシューマがエラーを報告するというものです。これを行うための最善の方法は、プロデューサとコンシューマの双方がキューの作成を試みるようにすることです。これを行うにはどのように書けばよいのでしょうか。これを行う方法はいくつかありますが、ここでは比較的単純なものを使用することにします:

プロデューサーのコンフィギュレーション・クラスは何かを追加します:

//BeanPostProcessorクラスを実装して、Beanのライフサイクル機能を使用する。
@Component
public class DirectRabbitConfig implements BeanPostProcessor {
 //これがスイッチとキューの作成に使われるrabbitAdminオブジェクトだ。
 @Resource
 private RabbitAdmin rabbitAdmin;
 
 //rabbitAdminオブジェクトを初期化する
 @Bean
 public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
 RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
 // SpringはRabbitAdminクラスをtrueに設定した場合のみロードする。
 rabbitAdmin.setAutoStartup(true);
 return rabbitAdmin;
 }
 
 //Beanをインスタンス化した後、つまりBeanの後処理である
 @Override
 public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
 //スイッチを作る
 rabbitAdmin.declareExchange(rabbitmqDemoDirectExchange());
 //キューを作る
 rabbitAdmin.declareQueue(rabbitmqDemoDirectQueue());
 return null;
 }
}

このようにして、スタートアップ・プロデューサーは、メッセージが送信されるまで待つことなく、スイッチとキューを自動的に作成します。

消費者はちょっとしたコードを追加する必要があります:

@Component
//queuesToDeclareプロパティを使って、キューが存在しない場合にキューを作成する。
@RabbitListener(queuesToDeclare = @Queue(RabbitMQConfig.RABBITMQ_DEMO_TOPIC))
public class RabbitDemoConsumer {
 //... 
}

これなら、生産者が先でも消費者が先でも問題はないでしょう

コードアドレスgithub.com/m

V. RabbitMQのコンポーネント

上記のHelloWordの例からも、その一端を体感できると思いますが、RabbitMQの構成には、このような部分があります:

  • ブローカー:メッセージキューサービスプロセス。このプロセスはExchangeとQueueの2つの部分から構成されます。
  • Exchange: メッセージキュースイッチ。特定のルールに従ってメッセージをルーティングし、キューに転送します。
  • Queue: メッセージ・キュー、メッセージを格納するキュー。
  • Producer: メッセージ・プロデューサー。プロデューサ・クライアントは、スイッチを持つキューにメッセージをルーティングします。
  • Consumer: メッセージ消費者。キューに格納されたメッセージを消費します。

これらのコンポーネントはどのように連動しているのでしょうか? おおよそのプロセスは以下の通りです:

  • メッセージプロデューサは RabbitMQ Broker に接続し、接続を作成し、チャネルを開きます。
  • プロデューサーは、スイッチのタイプ、名前、永続的かどうかなどを宣言します。
  • プロデューサはメッセージを送信し、メッセージが永続的かどうかやルーティング・キーなどの属性を指定します。
  • エクスチェンジがメッセージを受信すると、ルーティング・キーに基づいて現在のスイッチにマッチするキューにルーティングされます。
  • コンシューマ・リスナーはメッセージを受信し、ビジネス・プロセスを開始します。

VI.4種類の交換とその使用法

メッセージがRabbitMQに送信された後、対応するキューを見つけるためにまずExchangeを経由する必要があるからです。

Exchange には実際には 4 つのタイプがあり、タイプによって動作が異なります。HelloWord の例では、より単純なDirect Exchange が使用されています。他の 3 つは、ファンアウト交換、トピック交換、ヘッダー交換です。

直接交換

直接接続されたスイッチの意味は、このスイッチが、メッセージが特定のルーティングキーに正確に一致することを要求するキューにバインドする必要があることを意味します。簡単に言うと、1対1のポイント・ツー・ポイントの送信です。

完全なコードは上記のHelloWordの例で、コードの繰り返しはありません。

ファンアウト交換

このタイプのスイッチでは、キューがスイッチにバインドされる必要があります。あるスイッチに送信されたメッセージは、そのスイッチにバインドされたすべてのキューに転送されます。サブネットのブロードキャストと同様に、サブネット内の各ホストはメッセージのコピーを取得します。より簡単な言葉で言えば、これは publish subscribe です。

コードはどのように書かれていますか?

最初のステップは、まずスイッチとキュー名を設定することです:

public class RabbitMQConfig {
 /**
 * RabbitMQ FANOUT_EXCHANGスイッチ型のキューAの名前
 */
 public static final String FANOUT_EXCHANGE_QUEUE_TOPIC_A = "fanout.A";
 /**
 * RabbitMQ FANOUT_EXCHANGスイッチタイプのキューBの名前
 */
 public static final String FANOUT_EXCHANGE_QUEUE_TOPIC_B = "fanout.B";
 /**
 * RabbitMQ FANOUT_EXCHANGスイッチ・タイプの名前
 */
 public static final String FANOUT_EXCHANGE_DEMO_NAME = "fanout.exchange.demo.name";
}

次に、FanoutExchangeタイプのスイッチと2つのキューAおよびBを設定し、バインドします。このタイプでは、ルーティングキーを設定する必要はありません:

@Component
public class DirectRabbitConfig implements BeanPostProcessor {
 @Resource
 private RabbitAdmin rabbitAdmin;
 
 @Bean
 public Queue fanoutExchangeQueueA() {
 // A
 return new Queue(RabbitMQConfig.FANOUT_EXCHANGE_QUEUE_TOPIC_A, true, false, false);
 }
 @Bean
 public Queue fanoutExchangeQueueB() {
 // B
 return new Queue(RabbitMQConfig.FANOUT_EXCHANGE_QUEUE_TOPIC_B, true, false, false);
 }
 @Bean
 public FanoutExchange rabbitmqDemoFanoutExchange() {
 //FanoutExchangeスイッチを作る
 return new FanoutExchange(RabbitMQConfig.FANOUT_EXCHANGE_DEMO_NAME, true, false);
 }
 @Bean
 public Binding bindFanoutA() {
 //キューAをFanoutExchangeスイッチにバインドする
 return BindingBuilder.bind(fanoutExchangeQueueA()).to(rabbitmqDemoFanoutExchange());
 }
 @Bean
 public Binding bindFanoutB() {
 //キューBをFanoutExchangeスイッチにバインドする
 return BindingBuilder.bind(fanoutExchangeQueueB()).to(rabbitmqDemoFanoutExchange());
 }
 
 @Override
 public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
 //プロジェクトの開始、つまりスイッチとキューの作成
 rabbitAdmin.declareExchange(rabbitmqDemoFanoutExchange());
 rabbitAdmin.declareQueue(fanoutExchangeQueueB());
 rabbitAdmin.declareQueue(fanoutExchangeQueueA());
 return null;
 }
}

サービスがメッセージを公開するためのメソッドを作成します:

@Service
public class RabbitMQServiceImpl implements RabbitMQService {
 private static SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
 @Resource
 private RabbitTemplate rabbitTemplate;
 
 //リリース
 @Override
 public String sendMsgByFanoutExchange(String msg) throws Exception {
 Map<String, Object> message = getMessage(msg);
 try {
 rabbitTemplate.convertAndSend(RabbitMQConfig.FANOUT_EXCHANGE_DEMO_NAME, "", message);
 return "ok";
 } catch (Exception e) {
 e.printStackTrace();
 return "error";
 }
 }
 //メッセージ・ボディを組み立てる
 private Map<String, Object> getMessage(String msg) {
 String msgId = UUID.randomUUID().toString().replace("-", "").substring(0, 32);
 String sendTime = sdf.format(new Date());
 Map<String, Object> map = new HashMap<>();
 map.put("msgId", msgId);
 map.put("sendTime", sendTime);
 map.put("msg", msg);
 return map;
 }
}

コントローラのインターフェイス:

@RestController
@RequestMapping("/mall/rabbitmq")
public class RabbitMQController {
 /**
 * リリース
 *
 * 
 */
 @PostMapping("/publish")
 public String publish(@RequestParam(name = "msg") String msg) throws Exception {
 return rabbitMQService.sendMsgByFanoutExchange(msg);
 }
}

次に、このプロジェクトのコンシューマ側で、消費用のキューをリッスンする 2 つのキューリスナークラスを作成します:

@Component
@RabbitListener(queuesToDeclare = @Queue(RabbitMQConfig.FANOUT_EXCHANGE_QUEUE_TOPIC_A))
public class FanoutExchangeConsumerA {
 @RabbitHandler
 public void process(Map<String, Object> map) {
 System.out.println("キューAがメッセージを受信する:" + map.toString());
 }
}
@Component
@RabbitListener(queuesToDeclare = @Queue(RabbitMQConfig.FANOUT_EXCHANGE_QUEUE_TOPIC_B))
public class FanoutExchangeConsumerB {
 @RabbitHandler
 public void process(Map<String, Object> map) {
 System.out.println("キューBがメッセージを受信する:" + map.toString());
 }
}

次にプロデューサプロジェクトとコンシューマプロジェクトの両方を起動すると、管理インターフェイスがFanoutExchangeスイッチと2つのキューを作成し、それらをバインドしていることがわかります:

メッセージの送信とテストには POSTMAN を使ってください:

コンソール上では、両方のキューが同時に同じメッセージを受信し、パブリッシュ・サブスクライブ効果が生じていることが確認できます:

トピック交換

直訳するとトピックスイッチと呼ばれますが、用法から訳すとワイルドカードスイッチと 呼ぶ方が適切かもしれません。このタイプのスイッチは、ワイルドカードを使用してマッチを行い、対応するキューにルーティングします。ワイルドカードには、"*"と "#"の2種類があります。ワイルドカードの前には". "記号を付けなければならないことに注意してください。記号が必要です。

* :: 記号: 1つの単語のみにマッチします。例えば、a.*は "a.b"、"a.c "にはマッチしますが、"a.b.c "にはマッチしません。

# 記号:1つ以上の単語にマッチします。例えば、"rabbit.#"は "rabbit.a.b"、"rabbit.a"、"rabbit.a.b.c "のいずれかにマッチします。

早速、コードを見てみましょう:

まだ、TopicExchange 名と 3 つのキューの名前を設定しています:

 /**
 * RabbitMQ TOPIC_EXCHANGEスイッチ名
 */
 public static final String TOPIC_EXCHANGE_DEMO_NAME = "topic.exchange.demo.name";
 /**
 * RabbitMQ TOPIC_EXCHANGEスイッチのキューの名前A
 */
 public static final String TOPIC_EXCHANGE_QUEUE_A = "topic.queue.a";
 /**
 * RabbitMQ TOPIC_EXCHANGEスイッチのキューの名前B
 */
 public static final String TOPIC_EXCHANGE_QUEUE_B = "topic.queue.b";
 /**
 * RabbitMQ TOPIC_EXCHANGEスイッチのキューの名前 C
 */
 public static final String TOPIC_EXCHANGE_QUEUE_C = "topic.queue.c";

それからスイッチとキューを設定し、バインドして作成するという、昔ながらのレシピです:

@Component
public class DirectRabbitConfig implements BeanPostProcessor {
 // ...
 
 @Bean
 public TopicExchange rabbitmqDemoTopicExchange() {
 //TopicExchangeスイッチの設定
 return new TopicExchange(RabbitMQConfig.TOPIC_EXCHANGE_DEMO_NAME, true, false);
 }
 @Bean
 public Queue topicExchangeQueueA() {
 //キューを作る1
 return new Queue(RabbitMQConfig.TOPIC_EXCHANGE_QUEUE_A, true, false, false);
 }
 @Bean
 public Queue topicExchangeQueueB() {
 //キューを作る2
 return new Queue(RabbitMQConfig.TOPIC_EXCHANGE_QUEUE_B, true, false, false);
 }
 @Bean
 public Queue topicExchangeQueueC() {
 //キューを作る3
 return new Queue(RabbitMQConfig.TOPIC_EXCHANGE_QUEUE_C, true, false, false);
 }
 @Bean
 public Binding bindTopicA() {
 //キューAをFanoutExchangeスイッチにバインドする
 return BindingBuilder.bind(topicExchangeQueueB())
 .to(rabbitmqDemoTopicExchange())
 .with("a.*");
 }
 @Bean
 public Binding bindTopicB() {
 //キューAをFanoutExchangeスイッチにバインドする
 return BindingBuilder.bind(topicExchangeQueueC())
 .to(rabbitmqDemoTopicExchange())
 .with("a.*");
 }
 @Bean
 public Binding bindTopicC() {
 //キューAをFanoutExchangeスイッチにバインドする
 return BindingBuilder.bind(topicExchangeQueueA())
 .to(rabbitmqDemoTopicExchange())
 .with("rabbit.#");
 }
 
 @Override
 public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
 rabbitAdmin.declareExchange(rabbitmqDemoTopicExchange());
 rabbitAdmin.declareQueue(topicExchangeQueueA());
 rabbitAdmin.declareQueue(topicExchangeQueueB());
 rabbitAdmin.declareQueue(topicExchangeQueueC());
 return null;
 }
}

次に、メッセージを送信するサービス・メソッドを書きます:

@Service
public class RabbitMQServiceImpl implements RabbitMQService {
 @Override
 public String sendMsgByTopicExchange(String msg, String routingKey) throws Exception {
 Map<String, Object> message = getMessage(msg);
 try {
 //メッセージを送信する
 rabbitTemplate.convertAndSend(RabbitMQConfig.TOPIC_EXCHANGE_DEMO_NAME, routingKey, message);
 return "ok";
 } catch (Exception e) {
 e.printStackTrace();
 return "error";
 }
 }
}

コントローラインターフェイスを記述します:

@RestController
@RequestMapping("/mall/rabbitmq")
public class RabbitMQController {
 @Resource
 private RabbitMQService rabbitMQService;
 
 /**
 * ワイルドカード・スイッチからメッセージを送信する
 *
 * 
 */
 @PostMapping("/topicSend")
 public String topicSend(@RequestParam(name = "msg") String msg, @RequestParam(name = "routingKey") String routingKey) throws Exception {
 return rabbitMQService.sendMsgByTopicExchange(msg, routingKey);
 }
}

書き込みのプロデューサー側は、消費者側を書き込み、消費者側は比較的単純で、3つのリスナークラスを書き込みます:

@Component
@RabbitListener(queuesToDeclare = @Queue(RabbitMQConfig.TOPIC_EXCHANGE_QUEUE_A))
public class TopicExchangeConsumerA {
 @RabbitHandler
 public void process(Map<String, Object> map) {
 System.out.println(" [" + RabbitMQConfig.TOPIC_EXCHANGE_QUEUE_A + "]ニュースがある:" + map.toString());
 }
}
@Component
@RabbitListener(queuesToDeclare = @Queue(RabbitMQConfig.TOPIC_EXCHANGE_QUEUE_B))
public class TopicExchangeConsumerB {
 @RabbitHandler
 public void process(Map<String, Object> map) {
 System.out.println(" [" + RabbitMQConfig.TOPIC_EXCHANGE_QUEUE_B+ "]ニュースがある:" + map.toString());
 }
}
@Component
@RabbitListener(queuesToDeclare = @Queue(RabbitMQConfig.TOPIC_EXCHANGE_QUEUE_C))
public class TopicExchangeConsumerC {
 @RabbitHandler
 public void process(Map<String, Object> map) {
 System.out.println(" [" + RabbitMQConfig.TOPIC_EXCHANGE_QUEUE_C + "]ニュースがある:" + map.toString());
 }
}

では、プロジェクトを起動してデバッグを開始します。起動に成功すると、キューとルーティング・キー・バインディングの関係を見ることができます:

POSTMANでテストを実行し、rabbit.#ルーティングキーが正常にマッチするかどうかをテストします:

テストは成功し、キュー A はメッセージを消費します:

次に、routingKey = a.b を送信して、a.*ルーティングキーをテストします:

より一般的に使用されるのは、ダイレクト・コネクト、パブリッシュ・サブスクライブ、ワイルドカードの3つです。これら3種類のスイッチを上手に使いこなすことで、基本的にほとんどのビジネス・シナリオを解決することができます。

実際、少し考えてみると、このワイルドカードのパターンによって、直接接続とパブリッシュ購読の両方の効果が得られることがわかります。

ヘッダーの交換

この種のスイッチはあまり使われません。上の3つとは少し違っていて、経路のマッチングにroutingKeyを使うのではなく、リクエストヘッダにあるキーの値をマッチングして経路を決定します。図に示すように

キューを作成するには、バインディングのヘッダ情報を設定する必要があり、完全一致と部分一致の2つのモードがあります。上の図に示すように、スイッチはプロデューサーから送られたヘッダ情報に基づいてキューバインディングのキー値をマッチさせ、対応するキューにルーティングします。どのように実装するかは、以下のデモコードを参照してください:

まず、スイッチ名とキュー名を定義する必要があります:

 /**
 * HEADERS_EXCHANGEスイッチ名
 */
 public static final String HEADERS_EXCHANGE_DEMO_NAME = "headers.exchange.demo.name";
 /**
 * RabbitMQ HEADERS_EXCHANGEスイッチのキューの名前A
 */
 public static final String HEADERS_EXCHANGE_QUEUE_A = "headers.queue.a";
 /**
 * RabbitMQ HEADERS_EXCHANGEスイッチのキューの名前B
 */
 public static final String HEADERS_EXCHANGE_QUEUE_B = "headers.queue.b";

それからスイッチとキューをセットアップし、バインディングを行います:

@Component
public class DirectRabbitConfig implements BeanPostProcessor {
 @Bean
 public Queue headersQueueA() {
 return new Queue(RabbitMQConfig.HEADERS_EXCHANGE_QUEUE_A, true, false, false);
 }
 @Bean
 public Queue headersQueueB() {
 return new Queue(RabbitMQConfig.HEADERS_EXCHANGE_QUEUE_B, true, false, false);
 }
 @Bean
 public HeadersExchange rabbitmqDemoHeadersExchange() {
 return new HeadersExchange(RabbitMQConfig.HEADERS_EXCHANGE_DEMO_NAME, true, false);
 }
 @Bean
 public Binding bindHeadersA() {
 Map<String, Object> map = new HashMap<>();
 map.put("key_one", "java");
 map.put("key_two", "rabbit");
 // 
 return BindingBuilder.bind(headersQueueA())
 .to(rabbitmqDemoHeadersExchange())
 .whereAll(map).match();
 }
 @Bean
 public Binding bindHeadersB() {
 Map<String, Object> map = new HashMap<>();
 map.put("headers_A", "coke");
 map.put("headers_B", "sky");
 //部分一致
 return BindingBuilder.bind(headersQueueB())
 .to(rabbitmqDemoHeadersExchange())
 .whereAny(map).match();
 }
 @Override
 public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
 rabbitAdmin.declareExchange(rabbitmqDemoHeadersExchange());
 rabbitAdmin.declareQueue(headersQueueA());
 rabbitAdmin.declareQueue(headersQueueB());
 return null;
 }
}

メッセージを送信する別のServiceメソッドを書きます。

@Service
public class RabbitMQServiceImpl implements RabbitMQService {
 @Resource
 private RabbitTemplate rabbitTemplate;
 
 @Override
 public String sendMsgByHeadersExchange(String msg, Map<String, Object> map) throws Exception {
 try {
 MessageProperties messageProperties = new MessageProperties();
 //メッセージの永続性
 messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
 messageProperties.setContentType("UTF-8");
 //メッセージを追加する
 messageProperties.getHeaders().putAll(map);
 Message message = new Message(msg.getBytes(), messageProperties);
 rabbitTemplate.convertAndSend(RabbitMQConfig.HEADERS_EXCHANGE_DEMO_NAME, null, message);
 return "ok";
 } catch (Exception e) {
 e.printStackTrace();
 return "error";
 }
 }
}

別のコントローラインターフェイスを記述します:

@RestController
@RequestMapping("/mall/rabbitmq")
public class RabbitMQController {
 @Resource
 private RabbitMQService rabbitMQService;
 
 @PostMapping("/headersSend")
 @SuppressWarnings("unchecked")
 public String headersSend(@RequestParam(name = "msg") String msg,
 @RequestParam(name = "json") String json) throws Exception {
 ObjectMapper mapper = new ObjectMapper();
 Map<String, Object> map = mapper.readValue(json, Map.class);
 return rabbitMQService.sendMsgByHeadersExchange(msg, map);
 }
}

プロデューサー・サイドが書かれると、消費用にさらに2つのキュー・リスナー・クラスが書かれます:

@Component
public class HeadersExchangeConsumerA {
 @RabbitListener(queuesToDeclare = @Queue(RabbitMQConfig.HEADERS_EXCHANGE_QUEUE_A))
 public void process(Message message) throws Exception {
 MessageProperties messageProperties = message.getMessageProperties();
 String contentType = messageProperties.getContentType();
 System.out.println(" [" + RabbitMQConfig.HEADERS_EXCHANGE_QUEUE_A + "]ニュースがある:" + new String(message.getBody(), contentType));
 }
}
@Component
public class HeadersExchangeConsumerB {
 @RabbitListener(queuesToDeclare = @Queue(RabbitMQConfig.HEADERS_EXCHANGE_QUEUE_B))
 public void process(Message message) throws Exception {
 MessageProperties messageProperties = message.getMessageProperties();
 String contentType = messageProperties.getContentType();
 System.out.println(" [" + RabbitMQConfig.HEADERS_EXCHANGE_QUEUE_B + "]ニュースがある:" + new String(message.getBody(), contentType));
 }
}

プロジェクトを開始し、管理インターフェイスを開くと、スイッチのバインドキューの情報を見ることができます:

上の回路図と同じです。POSTMANを使用して、フルマッチのキューAをテストするために送信:

部分的にマッチしたキューBを再テスト:

概要

今回はここまで。学んだことを復習しましょう:

  • メッセージキューとは?なぜメッセージキューを使うのですか?
  • RabbitMQの機能、コンポーネント、ワークフロー
  • RabbitMQをインストールし、HelloWordのケーススタディを完了します。
  • 4種類のRabbitMQスイッチの特徴と使い方

実はRabbitMQにはトランザクションの仕組みやロードバランシングの仕組みもあるのですが、これについてはまだ触れていません。というわけで、次号に載せますのでお楽しみに。

上記のすべての例のコードはgithubにアップロードされています:

github.com/m...

この記事が役に立ったら、クリックしてください。

皆さんからのお褒めの言葉が、私にとって創作への最大のモチベーションです。

能力には限りがありますので、間違いや不適切な点がありましたら、ご批判・ご指摘いただき、共に学び、交流してください!

Read next

HTTPステータスコード

HTTP はシンプルなリクエスト-レスポンスプロトコルで、通常は TCP の上で動作します。 RFC では、HTTP のステータスコードは「3 桁の数字」であり、最初の桁はレスポンスのカテゴリを定義し、5 つのカテゴリに分類されると規定しています: 1XX: リクエストがサーバーに受け入れられ、サービス中であることを表します。

Nov 3, 2020 · 3 min read