UnLoadGlassModule/src/main/java/com/mes/common/RabbitMQUtils.java | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
UnLoadGlassModule/src/main/java/com/mes/service/ModuleA.java | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
UnLoadGlassModule/src/main/java/com/mes/service/ModuleB.java | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 |
UnLoadGlassModule/src/main/java/com/mes/common/RabbitMQUtils.java
@@ -3,10 +3,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.rabbitmq.client.*; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.*; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; @@ -83,7 +80,7 @@ } //消费消息 public List<String> readMessages(String queueName,boolean is) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost(host); @@ -108,6 +105,95 @@ return messages; } private static Set<String> sentMessageIds = new HashSet<>(); //根据id发送消息 public static boolean sendMessageWithId(String queueName, String message, String messageId) throws Exception { if (sentMessageIds.contains(messageId)) { System.err.println("Message with ID " + messageId + " has already been sent."); return false; // 消息重复,发送失败 } ConnectionFactory factory = new ConnectionFactory(); factory.setHost(host); try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { channel.queueDeclare(queueName, false, false, false, args); AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder() .messageId(messageId) .build(); channel.basicPublish("", queueName, properties, message.getBytes("UTF-8")); System.out.println("Sent message with ID: " + messageId); sentMessageIds.add(messageId); // 将 messageId 添加到已发送集合中 return true; // 消息成功发送 } catch (Exception e) { System.err.println("Failed to send message: " + e.getMessage()); return false; // 消息发送失败 } } //根据id消费消息 public static String consumeMessageById( String messageId,String queueName) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost(host); try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { channel.queueDeclare(queueName, false, false, false, args); GetResponse response; while ((response = channel.basicGet(queueName, false)) != null) { String receivedMessage = new String(response.getBody(), "UTF-8"); if (response.getProps().getMessageId().equals(messageId)) { long deliveryTag = response.getEnvelope().getDeliveryTag(); channel.basicAck(deliveryTag, false); System.out.println("Selected message: "+messageId+ receivedMessage); return receivedMessage; // 返回选定的消息内容 } else { // 对于不符合条件的消息,进行 Nack 操作 // long deliveryTag = response.getEnvelope().getDeliveryTag(); // channel.basicNack(deliveryTag, false, true); return "Specified message not found in the queue."; } } return "Specified message not found in the queue."; } } //消费指定消息 public static String consumeSelectedMessage(int messageToConsume, String queueName) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost(host); try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { channel.queueDeclare(queueName, false, false, false, args); for (int i = 0; i < messageToConsume; i++) { GetResponse response = channel.basicGet(queueName, false); if (response == null) { return "Queue does not have enough messages."; } // long deliveryTag = response.getEnvelope().getDeliveryTag(); // channel.basicAck(deliveryTag, false); } GetResponse selectedResponse = channel.basicGet(queueName, false); if (selectedResponse != null) { byte[] body = selectedResponse.getBody(); String selectedMessage = new String(body, "UTF-8"); long deliveryTag = selectedResponse.getEnvelope().getDeliveryTag(); channel.basicAck(deliveryTag, false); return selectedMessage; } else { return "Specified message not found in the queue."; } } } // 发送 JSON 消息到队列 public static boolean sendJsonMessage(Object message, String queueName) throws Exception { // 创建连接工厂并设置主机名 UnLoadGlassModule/src/main/java/com/mes/service/ModuleA.java
@@ -1,62 +1,25 @@ package com.mes.service; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.util.HashMap; import java.util.Map; import com.mes.common.RabbitMQUtils; public class ModuleA { private static String QUEUE_NAME = "hangzhou2"; private static RabbitMQUtils receiver; public ModuleA(String QUEUENAME){ QUEUE_NAME=QUEUENAME; public ModuleA(String QUEUENAME) { QUEUE_NAME = QUEUENAME; receiver = new RabbitMQUtils(); // 实例化 RabbitMQUtils 对象 } public static void main(String[] argv) throws Exception { // ModuleA moduleA = new ModuleA("hangzhou2"); // 实例化 ModuleA 对象 // String message = "Your message to send666"; // String messageId = "5"; // 消息ID // receiver.sendMessageWithId(QUEUE_NAME, message, messageId); // 调用 sendMessageWithId 方法发送消息 receiver.sendMessageWithId("hangzhou2", "Hello RabbitMQ!", "1"); receiver.sendMessageWithId("hangzhou2", "Another message", "2"); receiver.sendMessageWithId("hangzhou2", "Yet another message", "1"); // 这条消息会打印重复消息的错误信息 public static void main(String[] argv) throws Exception { // 创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("10.153.19.150"); //factory.setHost("localhost"); try (Connection connection = factory.newConnection(); //channel 是通过 connection 创建的一个 AMQP 信道对象 Channel channel = connection.createChannel()) { // `channel.queueDeclare(QUEUE_NAME, false, false, false, null);` 是用来声明一个队列的方法 //该方法有五个参数: //1. `QUEUE_NAME`:指定队列的名称,这里使用了之前声明的常量`"module_queue"`作为队列名称。 //2. `false`:指定队列是否为持久化的。在这里设置为`false`,表示不将队列持久化到磁盘,一旦RabbitMQ服务停止或崩溃,队列将会丢失。 //3. `false`:指定是否只允许当前连接声明此队列。在这里设置为`false`,表示可允许其他连接也声明同名的队列。 //4. `false`:指定是否在不再使用时自动删除队列。在这里设置为`false`,表示当没有消费者或者所有消费者断开连接后,队列不会自动删除。 //5. `null`:指定其他队列属性的参数。在这里设置为`null`,表示没有其他属性需要设置。 //执行`channel.queueDeclare()`方法后,如果队列不存在,将会创建一个新的队列,如果队列已经存在,则不做任何操作。 Map<String, Object> args = new HashMap<>(); args.put("x-max-length-bytes",1024 * 1024); // args.put("x-max-length",5000); channel.queueDeclare(QUEUE_NAME, false, false, false, args); String message = "你"; for ( int i=1;i< 20000;i++) { message+="你"; } message+="b"; // 将消息发布到指定的队列中。空字符串""表示默认的交换机,QUEUE_NAME指定了目标队列名称,null表示没有指定其他属性,message.getBytes()将消息内容转换为字节数组进行传输。 channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); // DownGlassInfo downGlassInfo = new DownGlassInfo(); // downGlassInfo.setId(1); // System.out.println(" id'" + downGlassInfo.getId() + "'"); } } } UnLoadGlassModule/src/main/java/com/mes/service/ModuleB.java
@@ -12,8 +12,11 @@ RabbitMQUtils receiver = new RabbitMQUtils(); try { String receivedMessage = String.valueOf(receiver.readMessages(QUEUE_NAME,false)); System.out.println("Received message: " + receivedMessage); receiver.consumeMessageById("2",QUEUE_NAME); // String receivedMessage = receiver.consumeSelectedMessage(1,QUEUE_NAME); // String receivedMessage = String.valueOf(receiver.readMessages(QUEUE_NAME,false)); // System.out.println("Received message: " + receivedMessage); } catch (Exception e) { e.printStackTrace(); }