wu
2024-03-26 e9b0059e5ee9551f7b9b81c9358ce3d42886ae9a
UnLoadGlassModule/src/main/java/com/mes/common/RabbitMQUtils.java
New file
@@ -0,0 +1,259 @@
package com.mes.common;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.rabbitmq.client.*;
import java.util.*;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
public class RabbitMQUtils {
    // ObjectMapper对象,用于序列化和反序列化JSON
    private static ObjectMapper objectMapper = new ObjectMapper();
    private static String host = "10.153.19.150"; // RabbitMQ 主机名
    private static Map<String, Object> args = new HashMap<>(); // 队列参数
    static {
        // 设置队列参数
        args.put("x-max-length-bytes", 1024 * 1024);
    }
    //    private static  Map<String, Object> args = new HashMap<>();
//     args.put("x-max-length-bytes",1024 * 1024);
    // 发送消息到 RabbitMQ 队列中
    public String sendMessage(String message, String queueName) throws Exception {
        // 创建连接工厂并设置主机名
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(host);
        // 使用 try-with-resources 语句创建连接和通道,并发送消息
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            // 声明队列
            channel.queueDeclare(queueName, false, false, false, args);
            // 发布消息到队列
            channel.basicPublish("", queueName, null, message.getBytes("UTF-8"));
            System.out.println(" [x] Sent '" + message + "' to queue '" + queueName + "'");
        }
        return message;
    }
    // 从 RabbitMQ 队列中接收消息
    public String receiveMessage(String queueName) throws Exception {
        // 创建连接工厂并设置主机名
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(host);
        // 创建阻塞队列
        BlockingQueue<String> messageQueue = new ArrayBlockingQueue<>(1);
        // 使用 try-with-resources 语句创建连接和通道,并接收消息
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            // 声明队列
            channel.queueDeclare(queueName, false, false, false, args);
            // 设置消息接收回调
            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                String receivedMessage = new String(delivery.getBody(), "UTF-8");
               // System.out.println(" [x] Received '" + receivedMessage + "'");
                // 将接收到的消息放入阻塞队列
                try {
                    messageQueue.put(receivedMessage);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            };
            // 消费队列中的消息
            channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
            });
            // 阻塞并等待获取消息
            return messageQueue.take();
        }
    }
//消费消息
    public List<String> readMessages(String queueName,boolean is) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(host);
        List<String> messages = new ArrayList<>();
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            boolean autoAck = false;
            GetResponse response = channel.basicGet(queueName, autoAck);
            if (response != null) {
                String message = new String(response.getBody(), "UTF-8");
                messages.add(message);
                // 手动确认消息处理完成
                if(is){
                    long deliveryTag = response.getEnvelope().getDeliveryTag();
                    channel.basicAck(deliveryTag, false);
                }
            }
        }
        return messages;
    }
    private static Set<String> sentMessageIds = new HashSet<>();
//根据id发送消息
    public static boolean sendMessageWithId(String queueName, String message, String messageId) throws Exception {
        if (sentMessageIds.contains(messageId)) {
            System.err.println("Message with ID " + messageId + " has already been sent.");
            return false; // 消息重复,发送失败
        }
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(host);
        try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) {
            channel.queueDeclare(queueName, false, false, false, args);
            AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
                    .messageId(messageId)
                    .build();
            channel.basicPublish("", queueName, properties, message.getBytes("UTF-8"));
            System.out.println("Sent message with ID: " + messageId);
            sentMessageIds.add(messageId); // 将 messageId 添加到已发送集合中
            return true; // 消息成功发送
        } catch (Exception e) {
            System.err.println("Failed to send message: " + e.getMessage());
            return false; // 消息发送失败
        }
    }
//根据id消费消息
    public static String consumeMessageById( String messageId,String queueName) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(host);
        try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) {
            channel.queueDeclare(queueName, false, false, false, args);
            GetResponse response;
            while ((response = channel.basicGet(queueName, false)) != null) {
                String receivedMessage = new String(response.getBody(), "UTF-8");
                if (response.getProps().getMessageId().equals(messageId)) {
                    long deliveryTag = response.getEnvelope().getDeliveryTag();
                    channel.basicAck(deliveryTag, false);
                    System.out.println("Selected message: "+messageId+ receivedMessage);
                    return receivedMessage; // 返回选定的消息内容
                } else {
                    // 对于不符合条件的消息,进行 Nack 操作
//                    long deliveryTag = response.getEnvelope().getDeliveryTag();
//                    channel.basicNack(deliveryTag, false, true);
                    return "Specified message not found in the queue.";
                }
            }
            return "Specified message not found in the queue.";
        }
    }
//消费指定消息
    public static String consumeSelectedMessage(int messageToConsume, String queueName) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(host);
        try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) {
            channel.queueDeclare(queueName, false, false, false, args);
            for (int i = 0; i < messageToConsume; i++) {
                GetResponse response = channel.basicGet(queueName, false);
                if (response == null) {
                    return "Queue does not have enough messages.";
                }
//                long deliveryTag = response.getEnvelope().getDeliveryTag();
//                channel.basicAck(deliveryTag, false);
            }
            GetResponse selectedResponse = channel.basicGet(queueName, false);
            if (selectedResponse != null) {
                byte[] body = selectedResponse.getBody();
                String selectedMessage = new String(body, "UTF-8");
                long deliveryTag = selectedResponse.getEnvelope().getDeliveryTag();
                channel.basicAck(deliveryTag, false);
                return selectedMessage;
            } else {
                return "Specified message not found in the queue.";
            }
        }
    }
    // 发送 JSON 消息到队列
    public static boolean sendJsonMessage(Object message, String queueName) throws Exception {
        // 创建连接工厂并设置主机名
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(host);
        // 使用 try-with-resources 语句创建连接和通道,并发送 JSON 消息
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            // 声明队列
            channel.queueDeclare(queueName, false, false, false, args);
            // 将对象转换为 JSON 字符串
            String jsonMessage = objectMapper.writeValueAsString(message);
            // 发布 JSON 消息到队列
            channel.basicPublish("", queueName, null, jsonMessage.getBytes());
            System.out.println(" [x] Sent JSON message: '" + jsonMessage + "' to queue '" + queueName + "'");
            return true; // 发送消息成功
        } catch (Exception e) {
            e.printStackTrace();
            return false; // 发送消息失败
        }
    }
    // 接收 JSON 消息并转换为对象
    public static <T> T receiveJsonMessage(Class<T> valueType, String queueName) throws Exception {
        // 创建连接工厂并设置主机名
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(host);
        // 使用 try-with-resources 语句创建连接和通道,并接收 JSON 消息
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            // 声明队列
            channel.queueDeclare(queueName, false, false, false, args);
            // 获取队列中的消息
            GetResponse response = channel.basicGet(queueName, true);
            if (response != null) {
                byte[] body = response.getBody();
                String jsonMessage = new String(body, "UTF-8");
                System.out.println(" [x] Received JSON message: '" + jsonMessage + "' from queue '" + queueName + "'");
                // 将 JSON 消息转换为对象
                return objectMapper.readValue(jsonMessage, valueType);
            } else {
                return null;
            }
        } catch (Exception e) {
            e.printStackTrace();
            return null; // 接收消息失败
        }
    }
}
//    RabbitMQUtils.sendMessage("Hello, world!", "module_queue");
//RabbitMQUtils.receiveMessage("module_queue");
//RabbitMQUtils.sendJsonMessage(someObject, "another_queue");
//RabbitMQUtils.receiveJsonMessage(SomeClass.class, "another_queue");
// 需要将 someObject 替换为你要发送的对象,并将 SomeClass 替换为你要接收并转换的对象类型。