package com.mes.common; import com.fasterxml.jackson.databind.ObjectMapper; import com.rabbitmq.client.*; import java.util.*; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; public class RabbitMQUtils { // ObjectMapper对象,用于序列化和反序列化JSON private static ObjectMapper objectMapper = new ObjectMapper(); private static String host = "10.153.19.150"; // RabbitMQ 主机名 private static Map args = new HashMap<>(); // 队列参数 static { // 设置队列参数 args.put("x-max-length-bytes", 1024 * 1024); } // private static Map args = new HashMap<>(); // args.put("x-max-length-bytes",1024 * 1024); // 发送消息到 RabbitMQ 队列中 public String sendMessage(String message, String queueName) throws Exception { // 创建连接工厂并设置主机名 ConnectionFactory factory = new ConnectionFactory(); factory.setHost(host); // 使用 try-with-resources 语句创建连接和通道,并发送消息 try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { // 声明队列 channel.queueDeclare(queueName, false, false, false, args); // 发布消息到队列 channel.basicPublish("", queueName, null, message.getBytes("UTF-8")); System.out.println(" [x] Sent '" + message + "' to queue '" + queueName + "'"); } return message; } // 从 RabbitMQ 队列中接收消息 public String receiveMessage(String queueName) throws Exception { // 创建连接工厂并设置主机名 ConnectionFactory factory = new ConnectionFactory(); factory.setHost(host); // 创建阻塞队列 BlockingQueue messageQueue = new ArrayBlockingQueue<>(1); // 使用 try-with-resources 语句创建连接和通道,并接收消息 try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { // 声明队列 channel.queueDeclare(queueName, false, false, false, args); // 设置消息接收回调 DeliverCallback deliverCallback = (consumerTag, delivery) -> { String receivedMessage = new String(delivery.getBody(), "UTF-8"); // System.out.println(" [x] Received '" + receivedMessage + "'"); // 将接收到的消息放入阻塞队列 try { messageQueue.put(receivedMessage); } catch (InterruptedException e) { e.printStackTrace(); } }; // 消费队列中的消息 channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { }); // 阻塞并等待获取消息 return messageQueue.take(); } } //消费消息 public List readMessages(String queueName,boolean is) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost(host); List messages = new ArrayList<>(); try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { boolean autoAck = false; GetResponse response = channel.basicGet(queueName, autoAck); if (response != null) { String message = new String(response.getBody(), "UTF-8"); messages.add(message); // 手动确认消息处理完成 if(is){ long deliveryTag = response.getEnvelope().getDeliveryTag(); channel.basicAck(deliveryTag, false); } } } return messages; } //接收队列中所有消息,不消费 public static List browseMessages(String queueName) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost(host); List messages = new ArrayList<>(); try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { channel.queueDeclare(queueName, false, false, false, args); // 获取队列中的消息 GetResponse response; while ((response = channel.basicGet(queueName, false)) != null) { String message = new String(response.getBody(), "UTF-8"); messages.add(message); } } return messages; } private static Set sentMessageIds = new HashSet<>(); //根据id发送消息 public static boolean sendMessageWithId(String queueName, String message, String messageId) throws Exception { if (sentMessageIds.contains(messageId)) { System.err.println("Message with ID " + messageId + " has already been sent."); return false; // 消息重复,发送失败 } ConnectionFactory factory = new ConnectionFactory(); factory.setHost(host); try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { channel.queueDeclare(queueName, false, false, false, args); AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder() .messageId(messageId) .build(); channel.basicPublish("", queueName, properties, message.getBytes("UTF-8")); System.out.println("Sent message with ID: " + messageId); sentMessageIds.add(messageId); // 将 messageId 添加到已发送集合中 return true; // 消息成功发送 } catch (Exception e) { System.err.println("Failed to send message: " + e.getMessage()); return false; // 消息发送失败 } } //根据id消费消息 public static String consumeMessageById(String messageId, String queueName) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost(host); try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { channel.queueDeclare(queueName, false, false, false, args); GetResponse response; boolean found = false; while ((response = channel.basicGet(queueName, false)) != null) { String receivedMessage = new String(response.getBody(), "UTF-8"); if (response.getProps().getMessageId().equals(messageId)) { long deliveryTag = response.getEnvelope().getDeliveryTag(); channel.basicAck(deliveryTag, false); System.out.println("Selected message: " + messageId + " " + receivedMessage); return receivedMessage; } else { // 未找到指定消息,继续查找 channel.basicReject(response.getEnvelope().getDeliveryTag(), false); } } return "Specified message not found in the queue."; } } //消费指定消息 public static String consumeSelectedMessage(int messageToConsume, String queueName) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost(host); try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { channel.queueDeclare(queueName, false, false, false, args); for (int i = 0; i < messageToConsume; i++) { GetResponse response = channel.basicGet(queueName, false); if (response == null) { return "Queue does not have enough messages."; } // long deliveryTag = response.getEnvelope().getDeliveryTag(); // channel.basicAck(deliveryTag, false); } GetResponse selectedResponse = channel.basicGet(queueName, false); if (selectedResponse != null) { byte[] body = selectedResponse.getBody(); String selectedMessage = new String(body, "UTF-8"); long deliveryTag = selectedResponse.getEnvelope().getDeliveryTag(); channel.basicAck(deliveryTag, false); return selectedMessage; } else { return "Specified message not found in the queue."; } } } // 发送 JSON 消息到队列 public static boolean sendJsonMessage(Object message, String queueName) throws Exception { // 创建连接工厂并设置主机名 ConnectionFactory factory = new ConnectionFactory(); factory.setHost(host); // 使用 try-with-resources 语句创建连接和通道,并发送 JSON 消息 try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { // 声明队列 channel.queueDeclare(queueName, false, false, false, args); // 将对象转换为 JSON 字符串 String jsonMessage = objectMapper.writeValueAsString(message); // 发布 JSON 消息到队列 channel.basicPublish("", queueName, null, jsonMessage.getBytes()); System.out.println(" [x] Sent JSON message: '" + jsonMessage + "' to queue '" + queueName + "'"); return true; // 发送消息成功 } catch (Exception e) { e.printStackTrace(); return false; // 发送消息失败 } } // 接收 JSON 消息并转换为对象 public static T receiveJsonMessage(Class valueType, String queueName) throws Exception { // 创建连接工厂并设置主机名 ConnectionFactory factory = new ConnectionFactory(); factory.setHost(host); // 使用 try-with-resources 语句创建连接和通道,并接收 JSON 消息 try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { // 声明队列 channel.queueDeclare(queueName, false, false, false, args); // 获取队列中的消息 GetResponse response = channel.basicGet(queueName, true); if (response != null) { byte[] body = response.getBody(); String jsonMessage = new String(body, "UTF-8"); System.out.println(" [x] Received JSON message: '" + jsonMessage + "' from queue '" + queueName + "'"); // 将 JSON 消息转换为对象 return objectMapper.readValue(jsonMessage, valueType); } else { return null; } } catch (Exception e) { e.printStackTrace(); return null; // 接收消息失败 } } } // RabbitMQUtils.sendMessage("Hello, world!", "module_queue"); //RabbitMQUtils.receiveMessage("module_queue"); //RabbitMQUtils.sendJsonMessage(someObject, "another_queue"); //RabbitMQUtils.receiveJsonMessage(SomeClass.class, "another_queue"); // 需要将 someObject 替换为你要发送的对象,并将 SomeClass 替换为你要接收并转换的对象类型。