wuyouming666
2024-02-28 1fdd85d07cf1ed861e8efaf685674049d6c1284e
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
package com.mes.common;
 
import com.fasterxml.jackson.databind.ObjectMapper;
import com.rabbitmq.client.*;
 
import java.util.*;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
 
public class RabbitMQUtils {
    // ObjectMapper对象,用于序列化和反序列化JSON
    private static ObjectMapper objectMapper = new ObjectMapper();
    private static String host = "10.153.19.150"; // RabbitMQ 主机名
    private static Map<String, Object> args = new HashMap<>(); // 队列参数
 
    static {
        // 设置队列参数
        args.put("x-max-length-bytes", 1024 * 1024);
    }
 
    //    private static  Map<String, Object> args = new HashMap<>();
//     args.put("x-max-length-bytes",1024 * 1024);
    // 发送消息到 RabbitMQ 队列中
    public String sendMessage(String message, String queueName) throws Exception {
        // 创建连接工厂并设置主机名
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(host);
 
        // 使用 try-with-resources 语句创建连接和通道,并发送消息
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            // 声明队列
            channel.queueDeclare(queueName, false, false, false, args);
 
            // 发布消息到队列
            channel.basicPublish("", queueName, null, message.getBytes("UTF-8"));
            System.out.println(" [x] Sent '" + message + "' to queue '" + queueName + "'");
        }
 
        return message;
    }
 
 
    // 从 RabbitMQ 队列中接收消息
    public String receiveMessage(String queueName) throws Exception {
        // 创建连接工厂并设置主机名
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(host);
 
        // 创建阻塞队列
        BlockingQueue<String> messageQueue = new ArrayBlockingQueue<>(1);
 
        // 使用 try-with-resources 语句创建连接和通道,并接收消息
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            // 声明队列
            channel.queueDeclare(queueName, false, false, false, args);
 
            // 设置消息接收回调
            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                String receivedMessage = new String(delivery.getBody(), "UTF-8");
               // System.out.println(" [x] Received '" + receivedMessage + "'");
 
                // 将接收到的消息放入阻塞队列
                try {
                    messageQueue.put(receivedMessage);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            };
 
            // 消费队列中的消息
            channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
 
            });
 
            // 阻塞并等待获取消息
            return messageQueue.take();
        }
    }
 
 
//消费消息
    public List<String> readMessages(String queueName,boolean is) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(host);
        List<String> messages = new ArrayList<>();
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            boolean autoAck = false;
 
            GetResponse response = channel.basicGet(queueName, autoAck);
            if (response != null) {
                String message = new String(response.getBody(), "UTF-8");
                messages.add(message);
                // 手动确认消息处理完成
                if(is){
                    long deliveryTag = response.getEnvelope().getDeliveryTag();
                    channel.basicAck(deliveryTag, false);
                }
 
            }
        }
 
        return messages;
    }
 
 
 
    private static Set<String> sentMessageIds = new HashSet<>();
//根据id发送消息
    public static boolean sendMessageWithId(String queueName, String message, String messageId) throws Exception {
        if (sentMessageIds.contains(messageId)) {
            System.err.println("Message with ID " + messageId + " has already been sent.");
            return false; // 消息重复,发送失败
        }
 
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(host);
 
        try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) {
            channel.queueDeclare(queueName, false, false, false, args);
 
            AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
                    .messageId(messageId)
                    .build();
 
            channel.basicPublish("", queueName, properties, message.getBytes("UTF-8"));
            System.out.println("Sent message with ID: " + messageId);
 
            sentMessageIds.add(messageId); // 将 messageId 添加到已发送集合中
 
            return true; // 消息成功发送
        } catch (Exception e) {
            System.err.println("Failed to send message: " + e.getMessage());
            return false; // 消息发送失败
        }
    }
//根据id消费消息
    public static String consumeMessageById( String messageId,String queueName) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(host);
 
        try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) {
            channel.queueDeclare(queueName, false, false, false, args);
 
            GetResponse response;
            while ((response = channel.basicGet(queueName, false)) != null) {
                String receivedMessage = new String(response.getBody(), "UTF-8");
                if (response.getProps().getMessageId().equals(messageId)) {
                    long deliveryTag = response.getEnvelope().getDeliveryTag();
                    channel.basicAck(deliveryTag, false);
                    System.out.println("Selected message: "+messageId+ receivedMessage);
                    return receivedMessage; // 返回选定的消息内容
                } else {
                    // 对于不符合条件的消息,进行 Nack 操作
//                    long deliveryTag = response.getEnvelope().getDeliveryTag();
//                    channel.basicNack(deliveryTag, false, true);
                    return "Specified message not found in the queue.";
                }
            }
 
            return "Specified message not found in the queue.";
        }
    }
 
//消费指定消息
    public static String consumeSelectedMessage(int messageToConsume, String queueName) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(host);
 
        try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) {
            channel.queueDeclare(queueName, false, false, false, args);
 
            for (int i = 0; i < messageToConsume; i++) {
                GetResponse response = channel.basicGet(queueName, false);
                if (response == null) {
                    return "Queue does not have enough messages.";
                }
//                long deliveryTag = response.getEnvelope().getDeliveryTag();
//                channel.basicAck(deliveryTag, false);
            }
 
            GetResponse selectedResponse = channel.basicGet(queueName, false);
            if (selectedResponse != null) {
                byte[] body = selectedResponse.getBody();
                String selectedMessage = new String(body, "UTF-8");
                long deliveryTag = selectedResponse.getEnvelope().getDeliveryTag();
                channel.basicAck(deliveryTag, false);
                return selectedMessage;
            } else {
                return "Specified message not found in the queue.";
            }
        }
    }
 
    // 发送 JSON 消息到队列
    public static boolean sendJsonMessage(Object message, String queueName) throws Exception {
        // 创建连接工厂并设置主机名
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(host);
 
        // 使用 try-with-resources 语句创建连接和通道,并发送 JSON 消息
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            // 声明队列
            channel.queueDeclare(queueName, false, false, false, args);
 
            // 将对象转换为 JSON 字符串
            String jsonMessage = objectMapper.writeValueAsString(message);
 
            // 发布 JSON 消息到队列
            channel.basicPublish("", queueName, null, jsonMessage.getBytes());
            System.out.println(" [x] Sent JSON message: '" + jsonMessage + "' to queue '" + queueName + "'");
 
            return true; // 发送消息成功
        } catch (Exception e) {
            e.printStackTrace();
            return false; // 发送消息失败
        }
    }
 
    // 接收 JSON 消息并转换为对象
    public static <T> T receiveJsonMessage(Class<T> valueType, String queueName) throws Exception {
        // 创建连接工厂并设置主机名
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(host);
 
        // 使用 try-with-resources 语句创建连接和通道,并接收 JSON 消息
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            // 声明队列
            channel.queueDeclare(queueName, false, false, false, args);
 
            // 获取队列中的消息
            GetResponse response = channel.basicGet(queueName, true);
            if (response != null) {
                byte[] body = response.getBody();
                String jsonMessage = new String(body, "UTF-8");
                System.out.println(" [x] Received JSON message: '" + jsonMessage + "' from queue '" + queueName + "'");
 
                // 将 JSON 消息转换为对象
                return objectMapper.readValue(jsonMessage, valueType);
            } else {
                return null;
            }
        } catch (Exception e) {
            e.printStackTrace();
            return null; // 接收消息失败
        }
    }
 
}
 
//    RabbitMQUtils.sendMessage("Hello, world!", "module_queue");
//RabbitMQUtils.receiveMessage("module_queue");
//RabbitMQUtils.sendJsonMessage(someObject, "another_queue");
//RabbitMQUtils.receiveJsonMessage(SomeClass.class, "another_queue");
// 需要将 someObject 替换为你要发送的对象,并将 SomeClass 替换为你要接收并转换的对象类型。