wuyouming666
2024-02-28 d765f491fe415a218975892c6f4651c13764e1f3
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
package com.mes.common;
 
import com.fasterxml.jackson.databind.ObjectMapper;
import com.rabbitmq.client.*;
 
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
 
public class RabbitMQUtils {
    // ObjectMapper对象,用于序列化和反序列化JSON
    private static ObjectMapper objectMapper = new ObjectMapper();
    private static String host = "10.153.19.150"; // RabbitMQ 主机名
    private static Map<String, Object> args = new HashMap<>(); // 队列参数
 
    static {
        // 设置队列参数
        args.put("x-max-length-bytes", 1024 * 1024);
    }
 
    //    private static  Map<String, Object> args = new HashMap<>();
//     args.put("x-max-length-bytes",1024 * 1024);
    // 发送消息到 RabbitMQ 队列中
    public String sendMessage(String message, String queueName) throws Exception {
        // 创建连接工厂并设置主机名
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(host);
 
        // 使用 try-with-resources 语句创建连接和通道,并发送消息
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            // 声明队列
            channel.queueDeclare(queueName, false, false, false, args);
 
            // 发布消息到队列
            channel.basicPublish("", queueName, null, message.getBytes("UTF-8"));
            System.out.println(" [x] Sent '" + message + "' to queue '" + queueName + "'");
        }
 
        return message;
    }
 
 
    // 从 RabbitMQ 队列中接收消息
    public String receiveMessage(String queueName) throws Exception {
        // 创建连接工厂并设置主机名
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(host);
 
        // 创建阻塞队列
        BlockingQueue<String> messageQueue = new ArrayBlockingQueue<>(1);
 
        // 使用 try-with-resources 语句创建连接和通道,并接收消息
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            // 声明队列
            channel.queueDeclare(queueName, false, false, false, args);
 
            // 设置消息接收回调
            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                String receivedMessage = new String(delivery.getBody(), "UTF-8");
               // System.out.println(" [x] Received '" + receivedMessage + "'");
 
                // 将接收到的消息放入阻塞队列
                try {
                    messageQueue.put(receivedMessage);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            };
 
            // 消费队列中的消息
            channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
 
            });
 
            // 阻塞并等待获取消息
            return messageQueue.take();
        }
    }
 
 
 
    public List<String> readMessages(String queueName,boolean is) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(host);
        List<String> messages = new ArrayList<>();
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            boolean autoAck = false;
 
            GetResponse response = channel.basicGet(queueName, autoAck);
            if (response != null) {
                String message = new String(response.getBody(), "UTF-8");
                messages.add(message);
                // 手动确认消息处理完成
                if(is){
                    long deliveryTag = response.getEnvelope().getDeliveryTag();
                    channel.basicAck(deliveryTag, false);
                }
 
            }
        }
 
        return messages;
    }
 
    // 发送 JSON 消息到队列
    public static boolean sendJsonMessage(Object message, String queueName) throws Exception {
        // 创建连接工厂并设置主机名
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(host);
 
        // 使用 try-with-resources 语句创建连接和通道,并发送 JSON 消息
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            // 声明队列
            channel.queueDeclare(queueName, false, false, false, args);
 
            // 将对象转换为 JSON 字符串
            String jsonMessage = objectMapper.writeValueAsString(message);
 
            // 发布 JSON 消息到队列
            channel.basicPublish("", queueName, null, jsonMessage.getBytes());
            System.out.println(" [x] Sent JSON message: '" + jsonMessage + "' to queue '" + queueName + "'");
 
            return true; // 发送消息成功
        } catch (Exception e) {
            e.printStackTrace();
            return false; // 发送消息失败
        }
    }
 
    // 接收 JSON 消息并转换为对象
    public static <T> T receiveJsonMessage(Class<T> valueType, String queueName) throws Exception {
        // 创建连接工厂并设置主机名
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(host);
 
        // 使用 try-with-resources 语句创建连接和通道,并接收 JSON 消息
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            // 声明队列
            channel.queueDeclare(queueName, false, false, false, args);
 
            // 获取队列中的消息
            GetResponse response = channel.basicGet(queueName, true);
            if (response != null) {
                byte[] body = response.getBody();
                String jsonMessage = new String(body, "UTF-8");
                System.out.println(" [x] Received JSON message: '" + jsonMessage + "' from queue '" + queueName + "'");
 
                // 将 JSON 消息转换为对象
                return objectMapper.readValue(jsonMessage, valueType);
            } else {
                return null;
            }
        } catch (Exception e) {
            e.printStackTrace();
            return null; // 接收消息失败
        }
    }
 
}
 
//    RabbitMQUtils.sendMessage("Hello, world!", "module_queue");
//RabbitMQUtils.receiveMessage("module_queue");
//RabbitMQUtils.sendJsonMessage(someObject, "another_queue");
//RabbitMQUtils.receiveJsonMessage(SomeClass.class, "another_queue");
// 需要将 someObject 替换为你要发送的对象,并将 SomeClass 替换为你要接收并转换的对象类型。