From 1fdd85d07cf1ed861e8efaf685674049d6c1284e Mon Sep 17 00:00:00 2001 From: wuyouming666 <2265557248@qq.com> Date: 星期三, 28 二月 2024 15:53:56 +0800 Subject: [PATCH] 增加mq 根据id 、队列名、消息内容 发送和消费消息方法, --- UnLoadGlassModule/src/main/java/com/mes/common/RabbitMQUtils.java | 96 +++++++++++++++++++++++++++++++++++++++++++++-- 1 files changed, 91 insertions(+), 5 deletions(-) diff --git a/UnLoadGlassModule/src/main/java/com/mes/common/RabbitMQUtils.java b/UnLoadGlassModule/src/main/java/com/mes/common/RabbitMQUtils.java index 7c563e8..1da8717 100644 --- a/UnLoadGlassModule/src/main/java/com/mes/common/RabbitMQUtils.java +++ b/UnLoadGlassModule/src/main/java/com/mes/common/RabbitMQUtils.java @@ -3,10 +3,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.rabbitmq.client.*; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import java.util.*; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; @@ -83,7 +80,7 @@ } - +//娑堣垂娑堟伅 public List<String> readMessages(String queueName,boolean is) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost(host); @@ -108,6 +105,95 @@ return messages; } + + + private static Set<String> sentMessageIds = new HashSet<>(); +//鏍规嵁id鍙戦�佹秷鎭� + public static boolean sendMessageWithId(String queueName, String message, String messageId) throws Exception { + if (sentMessageIds.contains(messageId)) { + System.err.println("Message with ID " + messageId + " has already been sent."); + return false; // 娑堟伅閲嶅锛屽彂閫佸け璐� + } + + ConnectionFactory factory = new ConnectionFactory(); + factory.setHost(host); + + try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { + channel.queueDeclare(queueName, false, false, false, args); + + AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder() + .messageId(messageId) + .build(); + + channel.basicPublish("", queueName, properties, message.getBytes("UTF-8")); + System.out.println("Sent message with ID: " + messageId); + + sentMessageIds.add(messageId); // 灏� messageId 娣诲姞鍒板凡鍙戦�侀泦鍚堜腑 + + return true; // 娑堟伅鎴愬姛鍙戦�� + } catch (Exception e) { + System.err.println("Failed to send message: " + e.getMessage()); + return false; // 娑堟伅鍙戦�佸け璐� + } + } +//鏍规嵁id娑堣垂娑堟伅 + public static String consumeMessageById( String messageId,String queueName) throws Exception { + ConnectionFactory factory = new ConnectionFactory(); + factory.setHost(host); + + try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { + channel.queueDeclare(queueName, false, false, false, args); + + GetResponse response; + while ((response = channel.basicGet(queueName, false)) != null) { + String receivedMessage = new String(response.getBody(), "UTF-8"); + if (response.getProps().getMessageId().equals(messageId)) { + long deliveryTag = response.getEnvelope().getDeliveryTag(); + channel.basicAck(deliveryTag, false); + System.out.println("Selected message: "+messageId+ receivedMessage); + return receivedMessage; // 杩斿洖閫夊畾鐨勬秷鎭唴瀹� + } else { + // 瀵逛簬涓嶇鍚堟潯浠剁殑娑堟伅锛岃繘琛� Nack 鎿嶄綔 +// long deliveryTag = response.getEnvelope().getDeliveryTag(); +// channel.basicNack(deliveryTag, false, true); + return "Specified message not found in the queue."; + } + } + + return "Specified message not found in the queue."; + } + } + +//娑堣垂鎸囧畾娑堟伅 + public static String consumeSelectedMessage(int messageToConsume, String queueName) throws Exception { + ConnectionFactory factory = new ConnectionFactory(); + factory.setHost(host); + + try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { + channel.queueDeclare(queueName, false, false, false, args); + + for (int i = 0; i < messageToConsume; i++) { + GetResponse response = channel.basicGet(queueName, false); + if (response == null) { + return "Queue does not have enough messages."; + } +// long deliveryTag = response.getEnvelope().getDeliveryTag(); +// channel.basicAck(deliveryTag, false); + } + + GetResponse selectedResponse = channel.basicGet(queueName, false); + if (selectedResponse != null) { + byte[] body = selectedResponse.getBody(); + String selectedMessage = new String(body, "UTF-8"); + long deliveryTag = selectedResponse.getEnvelope().getDeliveryTag(); + channel.basicAck(deliveryTag, false); + return selectedMessage; + } else { + return "Specified message not found in the queue."; + } + } + } + // 鍙戦�� JSON 娑堟伅鍒伴槦鍒� public static boolean sendJsonMessage(Object message, String queueName) throws Exception { // 鍒涘缓杩炴帴宸ュ巶骞惰缃富鏈哄悕 -- Gitblit v1.8.0