// Declares the dead-letter and delay topology on the master broker, attaches a forwarding
// consumer to the dead-letter queue when it has none, and publishes one sample message to the
// delay queue. Both channels are created with createChannel(true) and committed explicitly
// with txCommit().
try (Connection connection = masterRabbitTemplate.getConnectionFactory().createConnection();
     Channel channel = connection.createChannel(true)) {

    // Dead-letter side: queue, exchange, consumer, binding.
    AMQP.Queue.DeclareOk deadQueueDeclareOk = channel.queueDeclare(deadQueueName, true, false, false, null);
    int consumerCount = deadQueueDeclareOk.getConsumerCount();
    channel.exchangeDeclare("deadExchange", BuiltinExchangeType.DIRECT);

    // Register the forwarding consumer only when the broker reports no consumer on the queue.
    // NOTE(review): the consumer is registered on a channel that this try-with-resources closes
    // on exit; whether it keeps receiving deliveries depends on how the connection factory
    // implements close() (e.g. channel caching) - confirm.
    if (consumerCount == 0) {
        channel.basicConsume(deadQueueName, false, new DefaultConsumer(channel) {
            /**
             * Republishes a dead-lettered message to the slave broker, then acks it on the
             * master channel. On a TimeoutException the message is nacked with requeue instead.
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                String routingKey = envelope.getRoutingKey();
                // Strip the trailing "_dead" to get the slave-side queue / routing key.
                // NOTE(review): assumes the routing key ends with "_dead"; a shorter key makes
                // substring throw, and a key without the suffix is silently truncated - confirm.
                String slaveRoutingKey = routingKey.substring(0, routingKey.length() - "_dead".length());

                try (Connection slaveConnection = slaveConnectionFactory.createConnection();
                     Channel slaveChannel = slaveConnection.createChannel(true)) {
                    slaveChannel.queueDeclare(slaveRoutingKey, true, false, false, null);
                    slaveChannel.exchangeDeclare("exchange", BuiltinExchangeType.DIRECT);
                    slaveChannel.queueBind(slaveRoutingKey, "exchange", slaveRoutingKey);
                    slaveChannel.basicPublish("exchange", slaveRoutingKey, properties, body);
                    slaveChannel.txCommit();
                } catch (TimeoutException e) {
                    e.printStackTrace();
                    // Put the message back on the dead-letter queue so it is delivered again.
                    channel.basicNack(envelope.getDeliveryTag(), false, true);
                    channel.txCommit();
                    return;
                }

                channel.basicAck(envelope.getDeliveryTag(), false);
                channel.txCommit();
            }
        });
    }
    channel.queueBind(deadQueueName, "deadExchange", deadQueueName);

    // Delay side: queue, exchange, binding.
    Map<String, Object> args = new HashMap<>();
    // Dead-letter exchange that the delay queue hands its dead-lettered messages to.
    args.put("x-dead-letter-exchange", "deadExchange");
    // Routing key used when a message is dead-lettered.
    args.put("x-dead-letter-routing-key", deadQueueName);
    // Per-queue message TTL (time to live).
    args.put("x-message-ttl", 10 * 1000);
    channel.queueDeclare(delayQueueName, true, false, false, args);
    channel.exchangeDeclare("delayExchange", BuiltinExchangeType.DIRECT);
    channel.queueBind(delayQueueName, "delayExchange", delayQueueName);

    // Build the sample payload with a plain ArrayList rather than double-brace initialization,
    // which would create an anonymous ArrayList subclass just to hold two elements.
    ArrayList<User> users = new ArrayList<>();
    users.add(new User("steven", new Date()));
    users.add(new User("russell", new Date()));

    Phone phone = new Phone();
    phone.setId("one-plus");
    phone.setName("st");
    phone.setUserList(users);

    channel.basicPublish("delayExchange", delayQueueName, null,
            JSONObject.toJSONString(phone).getBytes(StandardCharsets.UTF_8));
    channel.txCommit();
}
// Article source: https://uudwc.com/A/wo356
// Declares the slave-side queue and exchange, attaches a printing consumer when the queue has
// none, and binds the queue to the exchange under its own name.
String queueName = "testQueueName";
try (Connection slaveConnection = slaveRabbitTemplate.getConnectionFactory().createConnection();
     Channel slaveChannel = slaveConnection.createChannel(true)) {

    AMQP.Queue.DeclareOk queueDeclareOk = slaveChannel.queueDeclare(queueName, true, false, false, null);
    int consumerCount = queueDeclareOk.getConsumerCount();
    slaveChannel.exchangeDeclare("exchange", BuiltinExchangeType.DIRECT);

    // Attach the consumer only when the broker reports none on the queue.
    // NOTE(review): the consumer is registered on a channel that this try-with-resources closes
    // on exit; whether it keeps receiving deliveries depends on how the connection factory
    // implements close() (e.g. channel caching) - confirm.
    if (consumerCount == 0) {
        slaveChannel.basicConsume(queueName, false, new DefaultConsumer(slaveChannel) {
            /** Parses the body as a JSON Phone, prints it, then acks and commits. */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                // Decode explicitly as UTF-8 instead of relying on the platform default charset.
                String json = new String(body, java.nio.charset.StandardCharsets.UTF_8);
                Phone phone = JSONObject.parseObject(json, Phone.class);
                System.out.println("slaveMqConsume ==> " + phone);
                slaveChannel.basicAck(envelope.getDeliveryTag(), false);
                slaveChannel.txCommit();
            }
        });
    }

    // Bind the queue to the exchange using the queue name as routing key.
    slaveChannel.queueBind(queueName, "exchange", queueName);
}
// Article source URL: https://uudwc.com/A/wo356