package com.example.springboot.component; import com.rabbitmq.client.*; import com.fasterxml.jackson.databind.ObjectMapper; import java.util.ArrayList; import java.util.List; public class RabbitMQUtils { // ObjectMapper对象,用于序列化和反序列化JSON private static ObjectMapper objectMapper = new ObjectMapper(); // 发送消息到 RabbitMQ 队列中 public static void sendMessage(String message, String queueName) throws Exception { // 创建连接工厂并设置主机名 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); // 使用 try-with-resources 语句创建连接和通道,并发送消息 try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { // 声明队列 channel.queueDeclare(queueName, false, false, false, null); // 发布消息到队列 channel.basicPublish("", queueName, null, message.getBytes()); System.out.println(" [x] Sent '" + message + "' to queue '" + queueName + "'"); } } public List readMessages(String queueName) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); List messages = new ArrayList<>(); try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { boolean autoAck = false; GetResponse response = channel.basicGet(queueName, autoAck); if (response != null) { String message = new String(response.getBody(), "UTF-8"); messages.add(message); // 手动确认消息处理完成 long deliveryTag = response.getEnvelope().getDeliveryTag(); channel.basicAck(deliveryTag, false); } } return messages; } // 从 RabbitMQ 队列中接收消息 public static void receiveMessage(String queueName) throws Exception { // 创建连接工厂并设置主机名 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); // 使用 try-with-resources 语句创建连接和通道,并接收消息 try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { // 声明队列 channel.queueDeclare(queueName, false, false, false, null); // 设置消息接收回调 DeliverCallback deliverCallback = (consumerTag, delivery) -> { String receivedMessage = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] Received '" + receivedMessage + "'"); }; // 消费队列中的消息 channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { }); } } // 发送 JSON 消息到队列 public static void sendJsonMessage(Object message, String queueName) throws Exception { // 创建连接工厂并设置主机名 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); // 使用 try-with-resources 语句创建连接和通道,并发送 JSON 消息 try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { // 声明队列 channel.queueDeclare(queueName, false, false, false, null); // 将对象转换为 JSON 字符串 String jsonMessage = objectMapper.writeValueAsString(message); // 发布 JSON 消息到队列 channel.basicPublish("", queueName, null, jsonMessage.getBytes()); System.out.println(" [x] Sent JSON message: '" + jsonMessage + "' to queue '" + queueName + "'"); } } // 接收 JSON 消息并转换为对象 public static T receiveJsonMessage(Class valueType, String queueName) throws Exception { // 创建连接工厂并设置主机名 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); // 使用 try-with-resources 语句创建连接和通道,并接收 JSON 消息 try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { // 声明队列 channel.queueDeclare(queueName, false, false, false, null); // 获取队列中的消息 GetResponse response = channel.basicGet(queueName, true); if (response != null) { byte[] body = response.getBody(); String jsonMessage = new String(body, "UTF-8"); System.out.println(" [x] Received JSON message: '" + jsonMessage + "' from queue '" + queueName + "'"); // 将 JSON 消息转换为对象 return objectMapper.readValue(jsonMessage, valueType); } else { return null; } } } } // RabbitMQUtils.sendMessage("Hello, world!", "module_queue"); //RabbitMQUtils.receiveMessage("module_queue"); //RabbitMQUtils.sendJsonMessage(someObject, "another_queue"); //RabbitMQUtils.receiveJsonMessage(SomeClass.class, "another_queue"); // 需要将 someObject 替换为你要发送的对象,并将 SomeClass 替换为你要接收并转换的对象类型。