From 04adb88a2ed54cdf4c2958c79972c30109b9b5b6 Mon Sep 17 00:00:00 2001 From: wang <3597712270@qq.com> Date: 星期四, 28 三月 2024 16:54:56 +0800 Subject: [PATCH] 内容调整 --- UnLoadGlassModule/src/main/java/com/mes/common/RabbitMQUtils.java | 61 ++++++++++++++++++++---------- 1 files changed, 40 insertions(+), 21 deletions(-) diff --git a/UnLoadGlassModule/src/main/java/com/mes/common/RabbitMQUtils.java b/UnLoadGlassModule/src/main/java/com/mes/common/RabbitMQUtils.java index 1da8717..f381270 100644 --- a/UnLoadGlassModule/src/main/java/com/mes/common/RabbitMQUtils.java +++ b/UnLoadGlassModule/src/main/java/com/mes/common/RabbitMQUtils.java @@ -106,6 +106,25 @@ } +//鎺ユ敹闃熷垪涓墍鏈夋秷鎭紝涓嶆秷璐� + public static List<String> browseMessages(String queueName) throws Exception { + ConnectionFactory factory = new ConnectionFactory(); + factory.setHost(host); + List<String> messages = new ArrayList<>(); + try (Connection connection = factory.newConnection(); + Channel channel = connection.createChannel()) { + channel.queueDeclare(queueName, false, false, false, args); + // 鑾峰彇闃熷垪涓殑娑堟伅 + GetResponse response; + while ((response = channel.basicGet(queueName, false)) != null) { + String message = new String(response.getBody(), "UTF-8"); + messages.add(message); + } + } + return messages; + } + + private static Set<String> sentMessageIds = new HashSet<>(); //鏍规嵁id鍙戦�佹秷鎭� @@ -137,32 +156,32 @@ } } //鏍规嵁id娑堣垂娑堟伅 - public static String consumeMessageById( String messageId,String queueName) throws Exception { - ConnectionFactory factory = new ConnectionFactory(); - factory.setHost(host); +public static String consumeMessageById(String messageId, String queueName) throws Exception { + ConnectionFactory factory = new ConnectionFactory(); + factory.setHost(host); - try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { - channel.queueDeclare(queueName, false, false, false, args); + try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { + channel.queueDeclare(queueName, false, false, false, args); - GetResponse response; - while ((response = channel.basicGet(queueName, false)) != null) { - String receivedMessage = new String(response.getBody(), "UTF-8"); - if (response.getProps().getMessageId().equals(messageId)) { - long deliveryTag = response.getEnvelope().getDeliveryTag(); - channel.basicAck(deliveryTag, false); - System.out.println("Selected message: "+messageId+ receivedMessage); - return receivedMessage; // 杩斿洖閫夊畾鐨勬秷鎭唴瀹� - } else { - // 瀵逛簬涓嶇鍚堟潯浠剁殑娑堟伅锛岃繘琛� Nack 鎿嶄綔 -// long deliveryTag = response.getEnvelope().getDeliveryTag(); -// channel.basicNack(deliveryTag, false, true); - return "Specified message not found in the queue."; - } + GetResponse response; + boolean found = false; + + while ((response = channel.basicGet(queueName, false)) != null) { + String receivedMessage = new String(response.getBody(), "UTF-8"); + if (response.getProps().getMessageId().equals(messageId)) { + long deliveryTag = response.getEnvelope().getDeliveryTag(); + channel.basicAck(deliveryTag, false); + System.out.println("Selected message: " + messageId + " " + receivedMessage); + return receivedMessage; + } else { + // 鏈壘鍒版寚瀹氭秷鎭紝缁х画鏌ユ壘 + channel.basicReject(response.getEnvelope().getDeliveryTag(), false); } - - return "Specified message not found in the queue."; } + + return "Specified message not found in the queue."; } +} //娑堣垂鎸囧畾娑堟伅 public static String consumeSelectedMessage(int messageToConsume, String queueName) throws Exception { -- Gitblit v1.8.0