wuyouming666
2024-02-28 1fdd85d07cf1ed861e8efaf685674049d6c1284e
增加mq 根据id 、队列名、消息内容 发送和消费消息方法,
3个文件已修改
166 ■■■■■ 已修改文件
UnLoadGlassModule/src/main/java/com/mes/common/RabbitMQUtils.java 96 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
UnLoadGlassModule/src/main/java/com/mes/service/ModuleA.java 63 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
UnLoadGlassModule/src/main/java/com/mes/service/ModuleB.java 7 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
UnLoadGlassModule/src/main/java/com/mes/common/RabbitMQUtils.java
@@ -3,10 +3,7 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.rabbitmq.client.*;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.*;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
@@ -83,7 +80,7 @@
    }
//消费消息
    public List<String> readMessages(String queueName,boolean is) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(host);
@@ -108,6 +105,95 @@
        return messages;
    }
    private static Set<String> sentMessageIds = new HashSet<>();
//根据id发送消息
    public static boolean sendMessageWithId(String queueName, String message, String messageId) throws Exception {
        if (sentMessageIds.contains(messageId)) {
            System.err.println("Message with ID " + messageId + " has already been sent.");
            return false; // 消息重复,发送失败
        }
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(host);
        try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) {
            channel.queueDeclare(queueName, false, false, false, args);
            AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
                    .messageId(messageId)
                    .build();
            channel.basicPublish("", queueName, properties, message.getBytes("UTF-8"));
            System.out.println("Sent message with ID: " + messageId);
            sentMessageIds.add(messageId); // 将 messageId 添加到已发送集合中
            return true; // 消息成功发送
        } catch (Exception e) {
            System.err.println("Failed to send message: " + e.getMessage());
            return false; // 消息发送失败
        }
    }
//根据id消费消息
    public static String consumeMessageById( String messageId,String queueName) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(host);
        try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) {
            channel.queueDeclare(queueName, false, false, false, args);
            GetResponse response;
            while ((response = channel.basicGet(queueName, false)) != null) {
                String receivedMessage = new String(response.getBody(), "UTF-8");
                if (response.getProps().getMessageId().equals(messageId)) {
                    long deliveryTag = response.getEnvelope().getDeliveryTag();
                    channel.basicAck(deliveryTag, false);
                    System.out.println("Selected message: "+messageId+ receivedMessage);
                    return receivedMessage; // 返回选定的消息内容
                } else {
                    // 对于不符合条件的消息,进行 Nack 操作
//                    long deliveryTag = response.getEnvelope().getDeliveryTag();
//                    channel.basicNack(deliveryTag, false, true);
                    return "Specified message not found in the queue.";
                }
            }
            return "Specified message not found in the queue.";
        }
    }
//消费指定消息
    public static String consumeSelectedMessage(int messageToConsume, String queueName) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(host);
        try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) {
            channel.queueDeclare(queueName, false, false, false, args);
            for (int i = 0; i < messageToConsume; i++) {
                GetResponse response = channel.basicGet(queueName, false);
                if (response == null) {
                    return "Queue does not have enough messages.";
                }
//                long deliveryTag = response.getEnvelope().getDeliveryTag();
//                channel.basicAck(deliveryTag, false);
            }
            GetResponse selectedResponse = channel.basicGet(queueName, false);
            if (selectedResponse != null) {
                byte[] body = selectedResponse.getBody();
                String selectedMessage = new String(body, "UTF-8");
                long deliveryTag = selectedResponse.getEnvelope().getDeliveryTag();
                channel.basicAck(deliveryTag, false);
                return selectedMessage;
            } else {
                return "Specified message not found in the queue.";
            }
        }
    }
    // 发送 JSON 消息到队列
    public static boolean sendJsonMessage(Object message, String queueName) throws Exception {
        // 创建连接工厂并设置主机名
UnLoadGlassModule/src/main/java/com/mes/service/ModuleA.java
@@ -1,62 +1,25 @@
package com.mes.service;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.util.HashMap;
import java.util.Map;
import com.mes.common.RabbitMQUtils;
public class ModuleA {
    private static String QUEUE_NAME = "hangzhou2";
    private static RabbitMQUtils receiver;
    public ModuleA(String QUEUENAME){
        QUEUE_NAME=QUEUENAME;
    public ModuleA(String QUEUENAME) {
        QUEUE_NAME = QUEUENAME;
        receiver = new RabbitMQUtils(); // 实例化 RabbitMQUtils 对象
    }
    public static void main(String[] argv) throws Exception {
//        ModuleA moduleA = new ModuleA("hangzhou2"); // 实例化 ModuleA 对象
//        String message = "Your message to send666";
//        String messageId = "5"; // 消息ID
//        receiver.sendMessageWithId(QUEUE_NAME, message, messageId); // 调用 sendMessageWithId 方法发送消息
        receiver.sendMessageWithId("hangzhou2", "Hello RabbitMQ!", "1");
        receiver.sendMessageWithId("hangzhou2", "Another message", "2");
        receiver.sendMessageWithId("hangzhou2", "Yet another message", "1"); // 这条消息会打印重复消息的错误信息
   public static void main(String[] argv) throws Exception {
        // 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
      factory.setHost("10.153.19.150");
    //factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             //channel 是通过 connection 创建的一个 AMQP 信道对象
             Channel channel = connection.createChannel()) {
            // `channel.queueDeclare(QUEUE_NAME, false, false, false, null);` 是用来声明一个队列的方法
            //该方法有五个参数:
            //1. `QUEUE_NAME`:指定队列的名称,这里使用了之前声明的常量`"module_queue"`作为队列名称。
            //2. `false`:指定队列是否为持久化的。在这里设置为`false`,表示不将队列持久化到磁盘,一旦RabbitMQ服务停止或崩溃,队列将会丢失。
            //3. `false`:指定是否只允许当前连接声明此队列。在这里设置为`false`,表示可允许其他连接也声明同名的队列。
            //4. `false`:指定是否在不再使用时自动删除队列。在这里设置为`false`,表示当没有消费者或者所有消费者断开连接后,队列不会自动删除。
            //5. `null`:指定其他队列属性的参数。在这里设置为`null`,表示没有其他属性需要设置。
            //执行`channel.queueDeclare()`方法后,如果队列不存在,将会创建一个新的队列,如果队列已经存在,则不做任何操作。
            Map<String, Object> args = new HashMap<>();
            args.put("x-max-length-bytes",1024 * 1024);
//            args.put("x-max-length",5000);
            channel.queueDeclare(QUEUE_NAME, false, false, false, args);
            String message = "你";
            for ( int i=1;i< 20000;i++)
            {
                message+="你";
            }
            message+="b";
            //  将消息发布到指定的队列中。空字符串""表示默认的交换机,QUEUE_NAME指定了目标队列名称,null表示没有指定其他属性,message.getBytes()将消息内容转换为字节数组进行传输。
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
            System.out.println(" [x] Sent '" + message + "'");
//            DownGlassInfo downGlassInfo = new DownGlassInfo();
//            downGlassInfo.setId(1);
//            System.out.println(" id'" + downGlassInfo.getId() + "'");
        }
    }
}
UnLoadGlassModule/src/main/java/com/mes/service/ModuleB.java
@@ -12,8 +12,11 @@
        RabbitMQUtils receiver = new RabbitMQUtils();
        try {
            String receivedMessage = String.valueOf(receiver.readMessages(QUEUE_NAME,false));
            System.out.println("Received message: " + receivedMessage);
            receiver.consumeMessageById("2",QUEUE_NAME);
           // String receivedMessage = receiver.consumeSelectedMessage(1,QUEUE_NAME);
//            String receivedMessage = String.valueOf(receiver.readMessages(QUEUE_NAME,false));
          // System.out.println("Received message: " + receivedMessage);
        } catch (Exception e) {
            e.printStackTrace();
        }