【技术分享】四、RabbitMQ “延时队列”

前言

延时的含义为 等待一段时间,应用到RabbitMQ 消息 发布/订阅 模型中的概念就是,拿到消息后不想立即消费,等待一段时间再执行。
ex:

  1. 定时任务:十分钟后执行某种操作。
  2. 批量发送短信:用户量过大,一次性发送短信卡死,可以将几万条消息分布在10分钟内随机发送完成。

一、RabbitMQ “延时队列”概念

延迟队列存储的对象是对应的延迟消息,所谓“延迟消息”是指当消息被发送以后,并不想让消费者立刻拿到消息,而是等待特定时间后,消费者才能拿到这个消息进行消费。

RabbitMQ 中并没有延时队列的概念,是通过 延时交换机与 死信队列实现。

二、实现RabbitMQ “延时队列”两种方式

1. 利用两个特性:TTL + DLX [A队列过期->转发给B队列] (此种方式有缺陷)

TTL,全称Time To Live,消息过期时间设置。若没有队列进行消息消费,此消息过期后将被丢弃。
RabbitMq只会检查第一个消息是否过期,如果过期则丢到死信队列。
ex:若有两条消息,第一个消息延迟20秒执行,第二个消息延迟10秒执行,但RabbitMq只会检测队首第一条消息的过期时间。这样就会造成第二条消息延迟30秒执行。

DLX 。Dead Letter Exchange(死信交换机),当消息成为Dead message后,可以被重新发送到另一个交换机,这个交换机就是DLX。

2. 采用 RabbitMq延时插件rabbitmq_delayed_message_exchange的方式。

为了解决 “队列阻塞”问题,新的插件发布了,再消息粒度上实现 消息过期控制。

三、RabbitMQ “延时队列”项目应用

1. 引入pom文件,并配置yml

<dependencies>
        <!--RabbitMQ 依赖-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <!-- web相关依赖-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!--  json相关依赖-->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.47</version>
        </dependency>

2. 下载插件

插件下载官方链接: rabbitmq_delayed_message_exchange
在这里插入图片描述文章来源地址https://uudwc.com/A/DN3md

安装指南(以linux环境为例)
  1. 将rabbitmq_delayed_message_exchange-3.9.0.ez上传指定目录下使用unzip解压即可
    安装目录: /rabbitmq/plugins
    在这里插入图片描述
  2. 完成第一步解压后,执行以下图中安装操作
    开启插件:
	export PATH=$PATH:/opt/middle/rabbitmq/erlang/bin
	cd /opt/middle/rabbitmq/sbin
	./rabbitmq-pluginsenablerabbitmq_delayed_message_exchange
  1. 查询安装状态
    ./rabbitmq-plugins list
	./rabbitmq-plugins list

3. 配置资源信息


import lombok.RequiredArgsConstructor;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Scope;

import java.util.HashMap;
import java.util.Map;

/**
 * @author liuzz
 * @date 2023/5/18 0018下午 4:09
 */
@Configuration("relevancyRabbitMqConfig")
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
public class RelevancyRabbitMqConfig {

    private final CachingConnectionFactory factory;

    /**
     * rabbitmq 下发计划延时队列
     **/
    public static final String RELEVANCY_DELAYED_EXCHANGE = "saas.cbs.relevancy.delayed.exchange";

    /**
     * rabbitmq 下发延时队列订阅路由key
     **/
    public static final String RELEVANCY_DELAYED_ROUTINGKEY = "saas.cbs.relevancy.delayed.routingkey";

    /**
     * rabbitmq 下发延时队列
     **/
    public static final String RELEVANCY_DELAYED_QUEUE = "saas.cbs.relevancy.delayed.queue";

	
    @Bean("relevancyRabbitTemplate")
    @Scope(value = ConfigurableBeanFactory.SCOPE_PROTOTYPE)
    public RabbitTemplate rabbitTemplate(){
        factory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
        RabbitTemplate rabbitTemplate = new RabbitTemplate(factory);
        //开启发送失败退回
        rabbitTemplate.setMandatory(true);
        //消息转换器
        rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
        return rabbitTemplate;
    }

    //插件版本 -- 实现延迟队列
    @Bean("relevancyDelayedQueue")
    public Queue relevancyDelayedQueue() {
        return new Queue(RelevancyDelayedConstant.RELEVANCY_DELAYED_QUEUE);
    }

    //定义延时交换机 -- 插件版本
    //指定交换器类型为 x-delayed-message 
    //设置属性 x-delayed-type 
    @Bean("relevancyDelayedExchange")
    public CustomExchange relevancyDelayedExchange() {
        Map<String, Object> args = new HashMap<>();
        args.put("x-delayed-type", "direct");
        return new CustomExchange(RelevancyDelayedConstant.RELEVANCY_DELAYED_EXCHANGE, "x-delayed-message", true, false, args);
    }
	
	/**
	* 绑定延时队列与交换机信息
	*/
    @Bean
    public Binding bindingNotify(@Qualifier("relevancyDelayedQueue") Queue relevancyDelayedQueue,
                                 @Qualifier("relevancyDelayedExchange") CustomExchange relevancyDelayedExchange) {
        return BindingBuilder
                .bind(relevancyDelayedQueue)
                .to(relevancyDelayedExchange)
                .with(RelevancyDelayedConstant.RELEVANCY_DELAYED_ROUTINGKEY).noargs();
    }
}


4. 发送消息至交换机

@Slf4j
@Component
public class RelevancyExecuteMqConsume {

    @Autowired
    @Qualifier("relevancyRabbitTemplate")
    RabbitTemplate rabbitTemplate;

    /**
     * @Desc: 发送下发计划过期MQ
     * @param relevancyFrsMqSendMsgBo
     * @param finalExpirationTime
     **/
    public void sendSnapshotPlanMsg(RelevancyFrsMqSendMsgBo relevancyFrsMqSendMsgBo, Integer finalExpirationTime) {
        MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                //1.设置message的信息
                message.getMessageProperties().setDelay(finalExpirationTime);//消息的过期时间
                //2.返回该消息
                return message;
            }
        };
        rabbitTemplate.convertAndSend(RelevancyDelayedConstant.RELEVANCY_DELAYED_EXCHANGE,RelevancyDelayedConstant.RELEVANCY_DELAYED_ROUTINGKEY,relevancyFrsMqSendMsgBo,messagePostProcessor);
    }
}

5. 死信队列消费

@Slf4j
@Component
public class RelevancyExecuteMqConsume {

    @Autowired
    @Qualifier("relevancyRabbitTemplate")
    RabbitTemplate rabbitTemplate;

	
    @RabbitListener(bindings = {
            @QueueBinding(value =
            @Queue(RelevancyDelayedConstant.RELEVANCY_DELAYED_QUEUE),
                    exchange = @Exchange(name = RelevancyDelayedConstant.RELEVANCY_DELAYED_EXCHANGE,type ="x-delayed-message" ),
                    key= RelevancyDelayedConstant.RELEVANCY_DELAYED_ROUTINGKEY)
    })
    public void process(Message message, Channel channel) {
    	//消费数据
	}
}

原文地址:https://blog.csdn.net/qq_35511685/article/details/131522162

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请联系站长进行投诉反馈,一经查实,立即删除!

上一篇 2023年09月23日 21:57
【深度学习&图神经网络】Node2Vec +GAT 完成 节点分类任务(含代码) | 附:其它生成节点特征向量的算法:DeepWalk、LINE(具体实现细节)、SDNE、MMDW
下一篇 2023年09月23日 21:58