SpringBoot+RabbitMQ (保證消息100%投遞成功并被消費)
閱讀本文大概需要 8.5?分鐘。
來自:網(wǎng)絡
一、先扔一張圖

消息發(fā)送確認機制
消費確認機制
消息的重新投遞
消費冪等性, 等等
簡略介紹163郵箱授權(quán)碼的獲取
編寫發(fā)送郵件工具類
編寫RabbitMQ配置文件
生產(chǎn)者發(fā)起調(diào)用
消費者發(fā)送郵件
定時任務定時拉取投遞失敗的消息, 重新投遞
各種異常情況的測試驗證
springboot版本2.1.5.RELEASE, 舊版本可能有些配置屬性不能使用, 需要以代碼形式進行配置
RabbitMQ版本3.7.15
MailUtil: 發(fā)送郵件工具類
RabbitConfig: rabbitmq相關配置
TestServiceImpl: 生產(chǎn)者, 發(fā)送消息
MailConsumer: 消費者, 消費消息, 發(fā)送郵件
ResendMsg: 定時任務, 重新投遞發(fā)送失敗的消息

org.springframework.boot spring-boot-starter-amqp org.springframework.boot spring-boot-starter-mail
# rabbitmq
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
# 開啟confirms回調(diào) P -> Exchange
spring.rabbitmq.publisher-confirms=true
# 開啟returnedMessage回調(diào) Exchange -> Queue
spring.rabbitmq.publisher-returns=true
# 設置手動確認(ack) Queue -> C
spring.rabbitmq.listener.simple.acknowledge-mode=manual
spring.rabbitmq.listener.simple.prefetch=100
spring.mail.host=smtp.163.com
spring.mail.username=18621142249@163.com
spring.mail.password=123456wangzai
spring.mail.from=18621142249@163.com
spring.mail.properties.mail.smtp.auth=true
spring.mail.properties.mail.smtp.starttls.enable=true
spring.mail.properties.mail.smtp.starttls.required=true
CREATE TABLE `msg_log` (
?`msg_id` varchar(255) NOT NULL DEFAULT '' COMMENT '消息唯一標識',
?`msg` text COMMENT '消息體, json格式化',
?`exchange` varchar(255) NOT NULL DEFAULT '' COMMENT '交換機',
?`routing_key` varchar(255) NOT NULL DEFAULT '' COMMENT '路由鍵',
?`status` int(11) NOT NULL DEFAULT '0' COMMENT '狀態(tài): 0投遞中 1投遞成功 2投遞失敗 3已消費',
?`try_count` int(11) NOT NULL DEFAULT '0' COMMENT '重試次數(shù)',
?`next_try_time` datetime DEFAULT NULL COMMENT '下一次重試時間',
?`create_time` datetime DEFAULT NULL COMMENT '創(chuàng)建時間',
?`update_time` datetime DEFAULT NULL COMMENT '更新時間',
?PRIMARY KEY (`msg_id`),
?UNIQUE KEY `unq_msg_id` (`msg_id`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='消息投遞日志';
@Component
@Slf4j
public class MailUtil {
? ?@Value("${spring.mail.from}")
? ?private String from;
? ?@Autowired
? ?private JavaMailSender mailSender;
? ?/**
? ? * 發(fā)送簡單郵件
? ? *
? ? * @param mail
? ? */
? ?public boolean send(Mail mail) {
? ? ? ?String to = mail.getTo();// 目標郵箱
? ? ? ?String title = mail.getTitle();// 郵件標題
? ? ? ?String content = mail.getContent();// 郵件正文
? ? ? ?SimpleMailMessage message = new SimpleMailMessage();
? ? ? ?message.setFrom(from);
? ? ? ?message.setTo(to);
? ? ? ?message.setSubject(title);
? ? ? ?message.setText(content);
? ? ? ?try {
? ? ? ? ? ?mailSender.send(message);
? ? ? ? ? ?log.info("郵件發(fā)送成功");
? ? ? ? ? ?return true;
? ? ? ?} catch (MailException e) {
? ? ? ? ? ?log.error("郵件發(fā)送失敗, to: {}, title: {}", to, title, e);
? ? ? ? ? ?return false;
? ? ? ?}
? ?}
}@Configuration
@Slf4j
public class RabbitConfig {
? ?@Autowired
? ?private CachingConnectionFactory connectionFactory;
? ?@Autowired
? ?private MsgLogService msgLogService;
? ?@Bean
? ?public RabbitTemplate rabbitTemplate() {
? ? ? ?RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
? ? ? ?rabbitTemplate.setMessageConverter(converter());
? ? ? ?// 消息是否成功發(fā)送到Exchange
? ? ? ?rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
? ? ? ? ? ?if (ack) {
? ? ? ? ? ? ? ?log.info("消息成功發(fā)送到Exchange");
? ? ? ? ? ? ? ?String msgId = correlationData.getId();
? ? ? ? ? ? ? ?msgLogService.updateStatus(msgId, Constant.MsgLogStatus.DELIVER_SUCCESS);
? ? ? ? ? ?} else {
? ? ? ? ? ? ? ?log.info("消息發(fā)送到Exchange失敗, {}, cause: {}", correlationData, cause);
? ? ? ? ? ?}
? ? ? ?});
? ? ? ?// 觸發(fā)setReturnCallback回調(diào)必須設置mandatory=true, 否則Exchange沒有找到Queue就會丟棄掉消息, 而不會觸發(fā)回調(diào)
? ? ? ?rabbitTemplate.setMandatory(true);
? ? ? ?// 消息是否從Exchange路由到Queue, 注意: 這是一個失敗回調(diào), 只有消息從Exchange路由到Queue失敗才會回調(diào)這個方法
? ? ? ?rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
? ? ? ? ? ?log.info("消息從Exchange路由到Queue失敗: exchange: {}, route: {}, replyCode: {}, replyText: {}, message: {}", exchange, routingKey, replyCode, replyText, message);
? ? ? ?});
? ? ? ?return rabbitTemplate;
? ?}
? ?@Bean
? ?public Jackson2JsonMessageConverter converter() {
? ? ? ?return new Jackson2JsonMessageConverter();
? ?}
? ?// 發(fā)送郵件
? ?public static final String MAIL_QUEUE_NAME = "mail.queue";
? ?public static final String MAIL_EXCHANGE_NAME = "mail.exchange";
? ?public static final String MAIL_ROUTING_KEY_NAME = "mail.routing.key";
? ?@Bean
? ?public Queue mailQueue() {
? ? ? ?return new Queue(MAIL_QUEUE_NAME, true);
? ?}
? ?@Bean
? ?public DirectExchange mailExchange() {
? ? ? ?return new DirectExchange(MAIL_EXCHANGE_NAME, true, false);
? ?}
? ?@Bean
? ?public Binding mailBinding() {
? ? ? ?return BindingBuilder.bind(mailQueue()).to(mailExchange()).with(MAIL_ROUTING_KEY_NAME);
? ?}
}
@Service
public class TestServiceImpl implements TestService {
? ?@Autowired
? ?private MsgLogMapper msgLogMapper;
? ?@Autowired
? ?private RabbitTemplate rabbitTemplate;
? ?@Override
? ?public ServerResponse send(Mail mail) {
? ? ? ?String msgId = RandomUtil.UUID32();
? ? ? ?mail.setMsgId(msgId);
? ? ? ?MsgLog msgLog = new MsgLog(msgId, mail, RabbitConfig.MAIL_EXCHANGE_NAME, RabbitConfig.MAIL_ROUTING_KEY_NAME);
? ? ? ?msgLogMapper.insert(msgLog);// 消息入庫
? ? ? ?CorrelationData correlationData = new CorrelationData(msgId);
? ? ? ?rabbitTemplate.convertAndSend(RabbitConfig.MAIL_EXCHANGE_NAME, RabbitConfig.MAIL_ROUTING_KEY_NAME, MessageHelper.objToMsg(mail), correlationData);// 發(fā)送消息
? ? ? ?return ServerResponse.success(ResponseCode.MAIL_SEND_SUCCESS.getMsg());
? ?}
}@Component
@Slf4j
public class MailConsumer {
? ?@Autowired
? ?private MsgLogService msgLogService;
? ?@Autowired
? ?private MailUtil mailUtil;
? ?@RabbitListener(queues = RabbitConfig.MAIL_QUEUE_NAME)
? ?public void consume(Message message, Channel channel) throws IOException {
? ? ? ?Mail mail = MessageHelper.msgToObj(message, Mail.class);
? ? ? ?log.info("收到消息: {}", mail.toString());
? ? ? ?String msgId = mail.getMsgId();
? ? ? ?MsgLog msgLog = msgLogService.selectByMsgId(msgId);
? ? ? ?if (null == msgLog || msgLog.getStatus().equals(Constant.MsgLogStatus.CONSUMED_SUCCESS)) {// 消費冪等性
? ? ? ? ? ?log.info("重復消費, msgId: {}", msgId);
? ? ? ? ? ?return;
? ? ? ?}
? ? ? ?MessageProperties properties = message.getMessageProperties();
? ? ? ?long tag = properties.getDeliveryTag();
? ? ? ?boolean success = mailUtil.send(mail);
? ? ? ?if (success) {
? ? ? ? ? ?msgLogService.updateStatus(msgId, Constant.MsgLogStatus.CONSUMED_SUCCESS);
? ? ? ? ? ?channel.basicAck(tag, false);// 消費確認
? ? ? ?} else {
? ? ? ? ? ?channel.basicNack(tag, false, true);
? ? ? ?}
? ?}
}9、ResendMsg定時任務重新投遞發(fā)送失敗的消息
@Component
@Slf4j
public class ResendMsg {
? ?@Autowired
? ?private MsgLogService msgLogService;
? ?@Autowired
? ?private RabbitTemplate rabbitTemplate;
? ?// 最大投遞次數(shù)
? ?private static final int MAX_TRY_COUNT = 3;
? ?/**
? ? * 每30s拉取投遞失敗的消息, 重新投遞
? ? */
? ?@Scheduled(cron = "0/30 * * * * ?")
? ?public void resend() {
? ? ? ?log.info("開始執(zhí)行定時任務(重新投遞消息)");
? ? ? ?ListmsgLogs = msgLogService.selectTimeoutMsg();
? ? ? ?msgLogs.forEach(msgLog -> {
? ? ? ? ? ?String msgId = msgLog.getMsgId();
? ? ? ? ? ?if (msgLog.getTryCount() >= MAX_TRY_COUNT) {
? ? ? ? ? ? ? ?msgLogService.updateStatus(msgId, Constant.MsgLogStatus.DELIVER_FAIL);
? ? ? ? ? ? ? ?log.info("超過最大重試次數(shù), 消息投遞失敗, msgId: {}", msgId);
? ? ? ? ? ?} else {
? ? ? ? ? ? ? ?msgLogService.updateTryCount(msgId, msgLog.getNextTryTime());// 投遞次數(shù)+1
? ? ? ? ? ? ? ?CorrelationData correlationData = new CorrelationData(msgId);
? ? ? ? ? ? ? ?rabbitTemplate.convertAndSend(msgLog.getExchange(), msgLog.getRoutingKey(), MessageHelper.objToMsg(msgLog.getMsg()), correlationData);// 重新投遞
? ? ? ? ? ? ? ?log.info("第 " + (msgLog.getTryCount() + 1) + " 次重新投遞消息");
? ? ? ? ? ?}
? ? ? ?});
? ? ? ?log.info("定時任務執(zhí)行結(jié)束(重新投遞消息)");
? ?}
} 











七、拓展: 使用動態(tài)代理實現(xiàn)消費端冪等性驗證和消費確認(ack)

八、總結(jié)
推薦閱讀:
微信掃描二維碼,關注我的公眾號
朕已閱?
評論
圖片
表情

