From e208aa1ef8e861dba168e8d83ab3066fc9f1e02d Mon Sep 17 00:00:00 2001 From: 严智鑫 <test> Date: 星期二, 16 四月 2024 13:30:45 +0800 Subject: [PATCH] Merge branch 'master' of http://10.153.19.25:10101/r/HangZhouMes --- UnLoadGlassModule/src/main/java/com/mes/common/RabbitMQUtils.java | 115 +++++++++++++++++++++++++++++++++++++++++++++++++++++++-- 1 files changed, 110 insertions(+), 5 deletions(-) diff --git a/UnLoadGlassModule/src/main/java/com/mes/common/RabbitMQUtils.java b/UnLoadGlassModule/src/main/java/com/mes/common/RabbitMQUtils.java index 7c563e8..f381270 100644 --- a/UnLoadGlassModule/src/main/java/com/mes/common/RabbitMQUtils.java +++ b/UnLoadGlassModule/src/main/java/com/mes/common/RabbitMQUtils.java @@ -3,10 +3,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.rabbitmq.client.*; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import java.util.*; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; @@ -83,7 +80,7 @@ } - +//娑堣垂娑堟伅 public List<String> readMessages(String queueName,boolean is) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost(host); @@ -108,6 +105,114 @@ return messages; } + +//鎺ユ敹闃熷垪涓墍鏈夋秷鎭紝涓嶆秷璐� + public static List<String> browseMessages(String queueName) throws Exception { + ConnectionFactory factory = new ConnectionFactory(); + factory.setHost(host); + List<String> messages = new ArrayList<>(); + try (Connection connection = factory.newConnection(); + Channel channel = connection.createChannel()) { + channel.queueDeclare(queueName, false, false, false, args); + // 鑾峰彇闃熷垪涓殑娑堟伅 + GetResponse response; + while ((response = channel.basicGet(queueName, false)) != null) { + String message = new String(response.getBody(), "UTF-8"); + messages.add(message); + } + } + return messages; + } + + + + private static Set<String> sentMessageIds = new HashSet<>(); +//鏍规嵁id鍙戦�佹秷鎭� + public static boolean sendMessageWithId(String queueName, String message, String messageId) throws Exception { + if (sentMessageIds.contains(messageId)) { + System.err.println("Message with ID " + messageId + " has already been sent."); + return false; // 娑堟伅閲嶅锛屽彂閫佸け璐� + } + + ConnectionFactory factory = new ConnectionFactory(); + factory.setHost(host); + + try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { + channel.queueDeclare(queueName, false, false, false, args); + + AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder() + .messageId(messageId) + .build(); + + channel.basicPublish("", queueName, properties, message.getBytes("UTF-8")); + System.out.println("Sent message with ID: " + messageId); + + sentMessageIds.add(messageId); // 灏� messageId 娣诲姞鍒板凡鍙戦�侀泦鍚堜腑 + + return true; // 娑堟伅鎴愬姛鍙戦�� + } catch (Exception e) { + System.err.println("Failed to send message: " + e.getMessage()); + return false; // 娑堟伅鍙戦�佸け璐� + } + } +//鏍规嵁id娑堣垂娑堟伅 +public static String consumeMessageById(String messageId, String queueName) throws Exception { + ConnectionFactory factory = new ConnectionFactory(); + factory.setHost(host); + + try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { + channel.queueDeclare(queueName, false, false, false, args); + + GetResponse response; + boolean found = false; + + while ((response = channel.basicGet(queueName, false)) != null) { + String receivedMessage = new String(response.getBody(), "UTF-8"); + if (response.getProps().getMessageId().equals(messageId)) { + long deliveryTag = response.getEnvelope().getDeliveryTag(); + channel.basicAck(deliveryTag, false); + System.out.println("Selected message: " + messageId + " " + receivedMessage); + return receivedMessage; + } else { + // 鏈壘鍒版寚瀹氭秷鎭紝缁х画鏌ユ壘 + channel.basicReject(response.getEnvelope().getDeliveryTag(), false); + } + } + + return "Specified message not found in the queue."; + } +} + +//娑堣垂鎸囧畾娑堟伅 + public static String consumeSelectedMessage(int messageToConsume, String queueName) throws Exception { + ConnectionFactory factory = new ConnectionFactory(); + factory.setHost(host); + + try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { + channel.queueDeclare(queueName, false, false, false, args); + + for (int i = 0; i < messageToConsume; i++) { + GetResponse response = channel.basicGet(queueName, false); + if (response == null) { + return "Queue does not have enough messages."; + } +// long deliveryTag = response.getEnvelope().getDeliveryTag(); +// channel.basicAck(deliveryTag, false); + } + + GetResponse selectedResponse = channel.basicGet(queueName, false); + if (selectedResponse != null) { + byte[] body = selectedResponse.getBody(); + String selectedMessage = new String(body, "UTF-8"); + long deliveryTag = selectedResponse.getEnvelope().getDeliveryTag(); + channel.basicAck(deliveryTag, false); + return selectedMessage; + } else { + return "Specified message not found in the queue."; + } + } + } + // 鍙戦�� JSON 娑堟伅鍒伴槦鍒� public static boolean sendJsonMessage(Object message, String queueName) throws Exception { // 鍒涘缓杩炴帴宸ュ巶骞惰缃富鏈哄悕 -- Gitblit v1.8.0