wang
2024-03-28 04adb88a2ed54cdf4c2958c79972c30109b9b5b6
UnLoadGlassModule/src/main/java/com/mes/common/RabbitMQUtils.java
@@ -3,10 +3,7 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.rabbitmq.client.*;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.*;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
@@ -83,7 +80,7 @@
    }
//消费消息
    public List<String> readMessages(String queueName,boolean is) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(host);
@@ -108,6 +105,114 @@
        return messages;
    }
//接收队列中所有消息,不消费
    public static List<String> browseMessages(String queueName) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(host);
        List<String> messages = new ArrayList<>();
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.queueDeclare(queueName, false, false, false, args);
            // 获取队列中的消息
            GetResponse response;
            while ((response = channel.basicGet(queueName, false)) != null) {
                String message = new String(response.getBody(), "UTF-8");
                messages.add(message);
            }
        }
        return messages;
    }
    private static Set<String> sentMessageIds = new HashSet<>();
//根据id发送消息
    public static boolean sendMessageWithId(String queueName, String message, String messageId) throws Exception {
        if (sentMessageIds.contains(messageId)) {
            System.err.println("Message with ID " + messageId + " has already been sent.");
            return false; // 消息重复,发送失败
        }
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(host);
        try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) {
            channel.queueDeclare(queueName, false, false, false, args);
            AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
                    .messageId(messageId)
                    .build();
            channel.basicPublish("", queueName, properties, message.getBytes("UTF-8"));
            System.out.println("Sent message with ID: " + messageId);
            sentMessageIds.add(messageId); // 将 messageId 添加到已发送集合中
            return true; // 消息成功发送
        } catch (Exception e) {
            System.err.println("Failed to send message: " + e.getMessage());
            return false; // 消息发送失败
        }
    }
//根据id消费消息
public static String consumeMessageById(String messageId, String queueName) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost(host);
    try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) {
        channel.queueDeclare(queueName, false, false, false, args);
        GetResponse response;
        boolean found = false;
        while ((response = channel.basicGet(queueName, false)) != null) {
            String receivedMessage = new String(response.getBody(), "UTF-8");
            if (response.getProps().getMessageId().equals(messageId)) {
                long deliveryTag = response.getEnvelope().getDeliveryTag();
                channel.basicAck(deliveryTag, false);
                System.out.println("Selected message: " + messageId + " " + receivedMessage);
                return receivedMessage;
            } else {
                // 未找到指定消息,继续查找
                channel.basicReject(response.getEnvelope().getDeliveryTag(), false);
            }
        }
        return "Specified message not found in the queue.";
    }
}
//消费指定消息
    public static String consumeSelectedMessage(int messageToConsume, String queueName) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(host);
        try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) {
            channel.queueDeclare(queueName, false, false, false, args);
            for (int i = 0; i < messageToConsume; i++) {
                GetResponse response = channel.basicGet(queueName, false);
                if (response == null) {
                    return "Queue does not have enough messages.";
                }
//                long deliveryTag = response.getEnvelope().getDeliveryTag();
//                channel.basicAck(deliveryTag, false);
            }
            GetResponse selectedResponse = channel.basicGet(queueName, false);
            if (selectedResponse != null) {
                byte[] body = selectedResponse.getBody();
                String selectedMessage = new String(body, "UTF-8");
                long deliveryTag = selectedResponse.getEnvelope().getDeliveryTag();
                channel.basicAck(deliveryTag, false);
                return selectedMessage;
            } else {
                return "Specified message not found in the queue.";
            }
        }
    }
    // 发送 JSON 消息到队列
    public static boolean sendJsonMessage(Object message, String queueName) throws Exception {
        // 创建连接工厂并设置主机名