严智鑫
2024-04-19 04b841aa1661693e68f5dea1a80e7c97a209cbeb
UnLoadGlassModule/src/main/java/com/mes/common/RabbitMQUtils.java
@@ -106,6 +106,25 @@
    }
//接收队列中所有消息,不消费
    public static List<String> browseMessages(String queueName) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(host);
        List<String> messages = new ArrayList<>();
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.queueDeclare(queueName, false, false, false, args);
            // 获取队列中的消息
            GetResponse response;
            while ((response = channel.basicGet(queueName, false)) != null) {
                String message = new String(response.getBody(), "UTF-8");
                messages.add(message);
            }
        }
        return messages;
    }
    private static Set<String> sentMessageIds = new HashSet<>();
//根据id发送消息
@@ -137,32 +156,32 @@
        }
    }
//根据id消费消息
    public static String consumeMessageById( String messageId,String queueName) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(host);
public static String consumeMessageById(String messageId, String queueName) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost(host);
        try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) {
            channel.queueDeclare(queueName, false, false, false, args);
    try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) {
        channel.queueDeclare(queueName, false, false, false, args);
            GetResponse response;
            while ((response = channel.basicGet(queueName, false)) != null) {
                String receivedMessage = new String(response.getBody(), "UTF-8");
                if (response.getProps().getMessageId().equals(messageId)) {
                    long deliveryTag = response.getEnvelope().getDeliveryTag();
                    channel.basicAck(deliveryTag, false);
                    System.out.println("Selected message: "+messageId+ receivedMessage);
                    return receivedMessage; // 返回选定的消息内容
                } else {
                    // 对于不符合条件的消息,进行 Nack 操作
//                    long deliveryTag = response.getEnvelope().getDeliveryTag();
//                    channel.basicNack(deliveryTag, false, true);
                    return "Specified message not found in the queue.";
                }
        GetResponse response;
        boolean found = false;
        while ((response = channel.basicGet(queueName, false)) != null) {
            String receivedMessage = new String(response.getBody(), "UTF-8");
            if (response.getProps().getMessageId().equals(messageId)) {
                long deliveryTag = response.getEnvelope().getDeliveryTag();
                channel.basicAck(deliveryTag, false);
                System.out.println("Selected message: " + messageId + " " + receivedMessage);
                return receivedMessage;
            } else {
                // 未找到指定消息,继续查找
                channel.basicReject(response.getEnvelope().getDeliveryTag(), false);
            }
            return "Specified message not found in the queue.";
        }
        return "Specified message not found in the queue.";
    }
}
//消费指定消息
    public static String consumeSelectedMessage(int messageToConsume, String queueName) throws Exception {