RabbitMQ 动态创建队列并消费消息

try (Connection connection = masterRabbitTemplate.getConnectionFactory().createConnection();
     Channel channel = connection.createChannel(true)) {
    // 死信: 队列 交换机 绑定consumer 绑定队列
    AMQP.Queue.DeclareOk deadQueueDeclareOk = channel.queueDeclare(deadQueueName,
            true, false, false, null);
    int consumerCount = deadQueueDeclareOk.getConsumerCount();
    channel.exchangeDeclare("deadExchange", BuiltinExchangeType.DIRECT);
    if (consumerCount == 0) {
        channel.basicConsume(deadQueueName, false, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag,
                                       Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                String routingKey = envelope.getRoutingKey();
                String slaveRoutingKey = routingKey.substring(0, routingKey.length() - "_dead".length());
                try (Connection slaveConnection = slaveConnectionFactory.createConnection();
                     Channel slaveChannel = slaveConnection.createChannel(true)){
                    slaveChannel.queueDeclare(slaveRoutingKey, true, false, false, null);
                    slaveChannel.exchangeDeclare("exchange", BuiltinExchangeType.DIRECT);
                    slaveChannel.queueBind(slaveRoutingKey, "exchange", slaveRoutingKey);
                    slaveChannel.basicPublish("exchange", slaveRoutingKey, properties, body);
                    slaveChannel.txCommit();
                } catch (TimeoutException e) {
                    e.printStackTrace();
                    channel.basicNack(envelope.getDeliveryTag(), false, true);
                    channel.txCommit();
                    return;
                }
                channel.basicAck(envelope.getDeliveryTag(), false);
                channel.txCommit();
            }
        });
    }
    channel.queueBind(deadQueueName, "deadExchange", deadQueueName);
    // 延迟: 队列 交换机 绑定队列
    Map<String, Object> args = new HashMap<>();
    //设置延迟队列绑定的死信交换机
    args.put("x-dead-letter-exchange", "deadExchange");
    //设置延迟队列绑定的死信路由键
    args.put("x-dead-letter-routing-key", deadQueueName);
    //设置延迟队列的 TTL 消息存活时间
    args.put("x-message-ttl", 10 * 1000);
    channel.queueDeclare(delayQueueName, true, false, false, args);
    channel.exchangeDeclare("delayExchange", BuiltinExchangeType.DIRECT);
    channel.queueBind(delayQueueName, "delayExchange", delayQueueName);
    Phone phone = new Phone();
    phone.setId("one-plus");
    phone.setName("st");
    phone.setUserList(new ArrayList<User>(){{
        add(new User("steven", new Date()));
        add(new User("russell", new Date()));
    }});
    channel.basicPublish("delayExchange", delayQueueName, null,
            JSONObject.toJSONString(phone).getBytes(StandardCharsets.UTF_8));
    channel.txCommit();
}

String queueName = "testQueueName";
try (Connection slaveConnection = slaveRabbitTemplate.getConnectionFactory().createConnection();
     Channel slaveChannel = slaveConnection.createChannel(true)) {
    AMQP.Queue.DeclareOk queueDeclareOk = slaveChannel.queueDeclare(queueName,
            true, false, false, null);
    int consumerCount = queueDeclareOk.getConsumerCount();
    slaveChannel.exchangeDeclare("exchange", BuiltinExchangeType.DIRECT);
    // 绑定consumer 绑定队列
    if (consumerCount == 0) {
        slaveChannel.basicConsume(queueName, false, new DefaultConsumer(slaveChannel) {
            @Override
            public void handleDelivery(String consumerTag,
                                       Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {

                Phone phone = JSONObject.parseObject(new String(body), Phone.class);
                System.out.println("slaveMqConsume ==> " + phone);

                slaveChannel.basicAck(envelope.getDeliveryTag(), false);
                slaveChannel.txCommit();
            }
        });
    }
    slaveChannel.queueBind(queueName, "exchange", queueName);
}

文章来源地址https://uudwc.com/A/wo356

原文地址:https://blog.csdn.net/weixin_43317111/article/details/133106940

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处。如若内容造成侵权/违法违规/事实不符,请联系站长进行投诉反馈,一经查实,立即删除!

上一篇 2023年09月24日 11:16
Java核心知识点整理大全5-笔记
下一篇 2023年09月24日 11:16