コンシューマされたメッセージは、Producer->MQ->Consumer を経て正常にコンシューマされるため、3つのステップすべてでメッセージが失われる可能性があります。
メッセージ・プロデューサが MQ にメッセージを正常に送信していません。
業務の遂行
AMQP プロトコルは、メッセージ配信時にトランザクション・サポートをオンにし、メッセージ配信に失敗した場合にトランザクションをロールバックするトランザクション・メカニズムを提供します。
カスタム・トランザクション・マネージャー
@Configuration
public class RabbitTranscation {
@Bean
public RabbitTransactionManager rabbitTransactionManager(ConnectionFactory connectionFactory){
return new RabbitTransactionManager(connectionFactory);
}
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
return new RabbitTemplate(connectionFactory);
}
}
ymlの修正
spring:
rabbitmq:
# メッセージがキューに受信されずに返される
publisher-returns: true
トランザクション・サポートの有効化
rabbitTemplate.setChannelTransacted(true);
メッセージ未受信時の ReturnCallback の呼び出し
rabbitTemplate.setMandatory(true);
プロデューサーの配信メッセージ
@Service
public class ProviderTranscation implements RabbitTemplate.ReturnCallback {
@Autowired
RabbitTemplate rabbitTemplate;
@PostConstruct
public void init(){
// トランザクションを有効にするチャネルを設定する
rabbitTemplate.setChannelTransacted(true);
rabbitTemplate.setReturnCallback(this);
}
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
System.out.println("このメッセージは送信に失敗した"+message+", ");
}
@Transactional(rollbackFor = Exception.class,transactionManager = "rabbitTransactionManager")
public void publishMessage(String message) throws Exception {
rabbitTemplate.setMandatory(true);
rabbitTemplate.convertAndSend("javatrip",message);
}
}
しかし、これは同期的な操作であるため、メッセージを送信すると、送信者は次のメッセージに進む前にRabbitMQ-Serverからの応答を待つ必要があり、メッセージを生成するプロデューサーのスループットとパフォーマンスが大幅に低下するため、ほとんど行われません。
送信者確認メカニズム
メッセージ送信時にチャネルは確認モードに設定されます。 メッセージがチャネルに入ると一意の ID が割り当てられ、メッセージがマッチしたキューに配信されると RabbitMQ はプロデューサーに確認応答を送信します。
メッセージ確認メカニズムの有効化
spring:
rabbitmq:
# メッセージがキューに受信されずに返される
publisher-returns: true
# メッセージ確認応答メカニズムを有効にする
publisher-confirm-type: correlated
メッセージ未受信時の ReturnCallback の呼び出し
rabbitTemplate.setMandatory(true);
プロデューサーの配信メッセージ
@Service
public class ConfirmProvider implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback {
@Autowired
RabbitTemplate rabbitTemplate;
@PostConstruct
public void init() {
rabbitTemplate.setReturnCallback(this);
rabbitTemplate.setConfirmCallback(this);
}
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if(ack){
System.out.println("メッセージを確認した"+correlationData);
}else{
System.out.println("確認応答が失敗する:"+correlationData+"コンシューマがメッセージを正常に消費した後、手動で確認する:"+cause);
}
}
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
System.out.println("このメッセージは送信に失敗した"+message+", ");
}
public void publisMessage(String message){
rabbitTemplate.setMandatory(true);
rabbitTemplate.convertAndSend("javatrip",message);
}
}
メッセージのリトライ・メカニズムであるメッセージ補償は、メッセージの確認に失敗した場合に実行できます。メッセージの再配信は、確認が取れなかった場合に行われます。これは以下のコンフィギュレーションを設定することで可能です。
spring:
rabbitmq:
# メッセージ失敗後のキュー再参加をサポートする
publisher-returns: true
# メッセージ確認応答メカニズムを有効にする
publisher-confirm-type: correlated
listener:
simple:
retry:
# リトライを有効にする
enabled: true
# 最大再試行回数
max-attempts: 5
# リトライ間隔
initial-interval: 3000
II メッセージがMQに送信された後、MQがダウンし、メッセージがメモリから失われます。
MQではメッセージが失われる可能性があり、キューとメッセージの両方を永続化する必要があります。
Queue アノテーションは、キューに関連する以下のプロパティを提供します:
- name: キューの名前
- durable: 永続的かどうか
- exclusive: 排他的、排他的かどうか;
- autoDelete: 自動削除するかどうか;
- arguments: キューのその他の属性パラメータで、以下のオプションがあります:
- x-message-ttl: メッセージの有効期限;
- x-expires: キューの有効期限時間、キューがアクセスされない場合に削除される時間、ミリ秒単位;
- x-max-length: キューの最大長。これを超えた場合、メッセージはキューの先頭から 削除されます;
- x-max-length-bytes: メモリサイズによって制限される、キューのメッセージ内容によって占められる最大スペース;
- x-overflow: キューのオーバフローの挙動を設定します。これは、キューの最大長に達した場合にメッセージがどうなるかを決定します。有効な値は drop-head、reject-publish、reject-publish-dlx のいずれかです。調停キュータイプでは drop-head のみがサポートされています;
- x-dead-letter-exchange: 期限切れまたは削除されたメッセージが送信されるように指定できる dead-letter-exchange の名前;
- x-dead-letter-routing-key: デッド・レター・メッセージのルーティング・キー。メッセージがデッド・レター交換に送られるときに使われます。
- x-single-active-consumer: キューが単一のアクティブなコンシューマであるかどうか。true の場合、登録されたコンシューマグループの中の一つのコンシューマだけがメッセージを消費し、他のコンシューマは無視されます。
- x-max-priority: キューがサポートする優先度の最大数。設定されていない場合、キューはメッセージの優先度をサポートしません;
- x-queue-mode: RAM の使用量を減らすために、可能な限り多くのメッセージを ディスク上に保持するために、キューを遅延モードに設定します;
- x-queue-master-locator: クラスタモードにおける Image キューのマスタノード情報を設定します。
永続キュー
キューを作成する際には、永続属性 durable を true に設定し、autoDelete を false に設定してください。
@Queue(value = "javatrip",durable = "false",autoDelete = "false")
永続的メッセージ
SpringBootのデフォルトでは永続的です。
III コンシューマがメッセージの消費を完了する前に例外を発生させてメッセージを消費
コンシューマはメッセージを消費しただけで、まだビジネスを処理していないため、例外が発生します。このような場合は、自動確認をオフにして、代わりに手動でメッセージを確認する必要があります。
ymlを手動サインオンモードに変更
spring:
rabbitmq:
listener:
simple:
# 手動サインオフ・モード
acknowledge-mode: manual
# 一度に1つのメッセージに署名する
prefetch: 1
消費者マニュアルへの署名
@Component
@RabbitListener(queuesToDeclare = @Queue(value = "javatrip", durable = "true"))
public class Consumer {
@RabbitHandler
public void receive(String message, @Headers Map<String,Object> headers, Channel channel) throws Exception{
System.out.println(message);
// 一意のメッセージID
Long deliverTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
// メッセージを確認する
if(...){
channel.basicAck(deliverTag,false);
}else{
// 消費に失敗した場合、メッセージはキューに戻される
channel.basicNack(deliverTag,false,true);
}
}
}
概要
メッセージ紛失の理由
プロデューサー、MQ、およびコンシューマーはすべて、メッセージの損失を引き起こす可能性があります。
メッセージの信頼性を確保するには?
- 送信者は送信者確認モードを採用
- キューとメッセージの永続化のためのMQ
- 消費者購入成功後の手動確認メッセージ





