RabbitMQ消息可靠性(二)-- 消费者消息确认

2023-09-15 17:13:13

一、消费者消息确认是什么?

在这种机制下,消费者在接收到消息后,需要向 RabbitMQ 发送确认信息,告知 RabbitMQ 已经接收到该消息,并已经处理完毕。如果 RabbitMQ 没有接收到确认信息,则会将该消息重新加入队列,等待其他消费者继续处理。

消费者消息确认机制能够保证消息不会因为消费者宕机或其他原因而丢失,从而保证了消息的可靠性和稳定性。

RabbitMQ 支持两种消费者消息确认机制:自动确认和手动确认。在自动确认模式下,消费者在接收到消息后,RabbitMQ 会自动将该消息标记为已经确认。在手动确认模式下,消费者需要向 RabbitMQ 显式地发送确认信息,才能完成消息的确认。

二、代码实现

1.修改application.yml 配置

spring:
  rabbitmq:
    listener:
      simple:
        # RabbitMQ开启手动确认
        acknowledge-mode: manual

而SpringAMQP则允许配置三种确认模式:

  1. manual:手动ack,需要在业务代码结束后,调用api发送ack。
  2. auto:自动ack,由spring监测listener代码是否出现异常,没有异常则返回ack;抛出异常则返回nack
  3. none:关闭ack,MQ假定消费者获取消息后会成功处理,因此消息投递后立即被删除

2.消费者确认

生产者发送一笔需要消费的订单到Direct Exchange直连交换机

@GetMapping("/sendDirectMessage")
    @ApiOperation(value = "sendDirectMessage")
    @ApiOperationSupport(order = 1)
    public String sendDirectMessage(@RequestParam String orderNo){
        //设置消息唯一ID
        String uniqueId = "MQ"+ DateUtils.dateTimeNow("yyyyMMddHHmmss")+ RandomUtil.randomNumbers(4);
        CorrelationData correlationData = new CorrelationData(uniqueId);
        log.info("------生产者发送消息,消息唯一id {},订单编号 {}-------",uniqueId,orderNo);
        rabbitTemplate.convertAndSend("TestDirectExchange", "TestDirectRouting",orderNo,correlationData);
        return "ok";
    }

下面是消费者的处理逻辑
这里的消息序号是系统自动生成的,还需要注意的是,在手动确认模式下,如果消费者在处理消息时发生了异常或错误的时候

需要确保将该消息重新加入队列或者删除队列之后将该信息保存至数据库中记录下来,否则该消息将被认为已经成功处理并确认。因此,在编写消费者代码时,需要谨慎处理异常情况,避免因为异常而导致消息丢失或重复处理等问题。

/**
 * 消费者,用于消费队列信息
 */
@Component
@Slf4j
public class DirectConsumer {

    @Resource
    RedisService redisService;

    @RabbitListener(queues = "TestDirectQueue")//监听的队列名称 TestDirectQueue
    public void process(Message message, Channel channel) {
        // 消息序号
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        //取出消息唯一标识
        String messageId = message.getMessageProperties().getHeader("spring_returned_message_correlation");
        // 取出订单编码
        String orderNo = new String(message.getBody(), StandardCharsets.UTF_8);
        log.info("------消费者收到消息,消息唯一id {},订单编号 {}-------",messageId,orderNo);
        try {
            //消费者在消费消息之前,先去redis中查看消息状态是否已被消费
            if (redisService.setCacheMapIfAbsent("rabbit-tag", messageId, Boolean.FALSE)){
                //删除过期订单.......
                //消费完消息后,设置key的值为true
                redisService.setCacheMapValue("rabbit-tag", messageId, Boolean.TRUE);
                channel.basicAck(deliveryTag,false);
                log.info("------订单处理完毕,订单编号 {}--------", orderNo);
            }else {
                //如果从redis中获取消息的value是TRUE,表示已消费,直接发送确认信号,避免重复消费
                if (Boolean.TRUE.equals(redisService.getCacheMapValue("rabbit-tag",messageId))) {
                    /**
                     * TODO 手动确认消息
                     * tag:消息序号
                     * multiple:消息的标识,是否确认多条,false只确认当前一个消息收到,true确认所有consumer获得的消息(成功消费,消息从队列中删除
                     */
                    channel.basicAck(deliveryTag, false);
                    log.info("--------订单已经被消费过了,订单编号 {}-------", orderNo);
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
            try {
                /**
                 * TODO 消费者消费消息异常,手动否认信息,将消息退回到队列中
                 * tag:消息序号
                 * multiple:消息的标识,是否确认多条,false只确认当前一个消息收到,true确认所有consumer获得的消息(成功消费,消息从队列中删除
                 * requeue:是否要退回到队列
                 */
                channel.basicNack(deliveryTag, true, false);
                redisService.setCacheMapValue("rabbit-tag", messageId, Boolean.FALSE);
                log.error("------------订单消费失败,已从队列删除.订单编号 {}, 原因 {}--------",orderNo, e.getMessage());
            } catch (IOException ex) {
                ex.printStackTrace();
            }
        }
        log.info("------消费者处理完毕-------");
    }
}
更多推荐

K8s的网络——Underlay和Overlay网络

0.基础知识1)网络7层基础知识在网络7层协议基础里,第一层物理链路;第二层是数据链路层,在第一层的基础上引入MAC地址做数据转发。MAC地址在局域网内具有唯一性,主机A发送数据时,会向局域网内进行广播,每个主机根据mac地址自动匹配。网桥、交换机都是工作在数据链路层。由MAC地址构建的网络可以叫以太网。在网络中接入设

K8S入门前奏之VMware虚拟机网络配置

为了能在本地搭建K8S的运行服务器,在个人电脑上安装了虚拟机VMware16版本,并且在阿里巴巴开源镜像站下载了CentOS-7操作系统:阿里巴巴开源镜像站做完一些列准备工作后,在虚拟机安装完CentOS-7操作系统后,需要对VMware虚拟机网络进行配置,让其满足我们搭建K8S的要求。查看物理机ip信息命令:ipco

HTML中的<canvas>元素

聚沙成塔·每天进步一点点⭐专栏简介⭐canvas元素⭐用途⭐示例⭐写在最后⭐专栏简介前端入门之旅:探索Web开发的奇妙世界欢迎来到前端入门之旅!感兴趣的可以订阅本专栏哦!这个专栏是为那些对Web开发感兴趣、刚刚踏入前端领域的朋友们量身打造的。无论你是完全的新手还是有一些基础的开发者,这里都将为你提供一个系统而又亲切的学

电脑怎么取消磁盘分区?

有时候,我们的电脑会出现一个磁盘爆满,但另一个却空着,这时我们可以通过取消磁盘分区来进行调整,那么,这该怎么操作呢?下面我们就来了解一下。磁盘管理取消磁盘分区磁盘管理是Windows自带的磁盘管理工具,它位于"计算机管理"控制台中。它包括查错程序和磁盘碎片整理程序以及磁盘整理程序。它可以帮助你完成扩大分区、压缩分区、删

Redis核心原理与应用实操

一、基本概念1、noSQL关系数据库MySQL的IO操作慢!noSQL为内存操作快、高并发。2、Redis基本概念存储形式:K-V键值对优点:对数据高并发读写(直接在内存中操作)单线程操作(所谓的多线程只是多个命令队伍排队CPU处理时仍然是单线程)Redis-----提供缓存服务!!!!Redis定位是缓存,提高数据读

使用JavaScript实现图片的自动轮播

介绍在网站开发中,经常会遇到需要展示多张图片并自动切换的需求,这就需要使用JavaScript来实现图片的自动轮播功能。本文将通过一个简单的例子,演示如何用JavaScript实现图片的自动轮播。实现步骤:HTML结构:首先,创建一个包含图片列表的HTML结构,每个图片都用一个标签表示,并设置一个唯一的id,以便后续操

云原生:构建现代化应用的新篇章

云原生:构建现代化应用的新篇章随着技术的快速发展,我们进入了一个全新的数字化时代。在这个时代,云原生作为一种新兴的技术趋势,正逐渐改变着我们对构建现代化应用的认识。云原生是容器、微服务、DevOps、持续交付等技术的集合,它提供了一种在云环境中构建和运行应用的新方式。本文将深入探讨云原生的技术原理、核心优势以及应用场景

Docker从认识到实践再到底层原理(六-2)|Docker容器操作实例

前言那么这里博主先安利一些干货满满的专栏了!首先是博主的高质量博客的汇总,这个专栏里面的博客,都是博主最最用心写的一部分,干货满满,希望对大家有帮助。高质量博客汇总然后就是博主最近最花时间的一个专栏《Docker从认识到实践再到底层原理》希望大家多多关注!Docker从认识到实践再到底层原理容器的操作案例容器的基本操作

家政小程序开发制作,家政保洁上门维修小程序搭建

家政小程序开发制作,现如今家政上门服务,也越来越普及到我们的生活中,比如家电清洗,水电维修,家政保洁,上门护理等等方面。那么一个合格的家政小程序,需要满足哪些功能呢?今天就带大家一起详细了解一下。第一:邀请师傅入驻小程序:-创建一个师傅入驻的功能,允许师傅填写相关信息并提交申请。﹣设计审核流程,对师傅的资质和信誉进行验

家政小程序源码家政预约小程序独立版,家政上门预约,功能强大

家政服务行业作为一个相对传统的行业,随着互联网的发展迅速,和用户群体的改变,家政服务公司也需要改变一下经营思路了,否则未来很难满足新一代用户群体的个性化需求。核心功能:1、师傅(服务人员)入驻:家政保洁人员可以随时随地在微信小程序上面申请入驻,上传自己的资料,给平台审核。2、顾客下单预约:小区业主可以随时随地在微信小程

Elasticsearch 8.10 中引入查询规则 - query rules

作者:KathleenDeRusso我们很高兴宣布Elasticsearch8.10中的查询规则!查询规则(queryrules)允许你根据正在搜索的查询词或根据作为搜索查询的一部分提供的上下文信息来更改查询。什么是查询规则?查询规则(queryrules)允许自定义搜索相关性之外的搜索结果,这可以根据您提供的上下文信

热文推荐