ZengTao
2024-05-09 63574bf6cf94613385cb8d0e2c2beb1829f644b5
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
package com.example.springboot.component;
 
import com.rabbitmq.client.*;
import com.fasterxml.jackson.databind.ObjectMapper;
 
import java.util.ArrayList;
import java.util.List;
 
public class RabbitMQUtils {
    // ObjectMapper对象,用于序列化和反序列化JSON
    private static ObjectMapper objectMapper = new ObjectMapper();
 
    // 发送消息到 RabbitMQ 队列中
    public static void sendMessage(String message, String queueName) throws Exception {
        // 创建连接工厂并设置主机名
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
 
        // 使用 try-with-resources 语句创建连接和通道,并发送消息
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            // 声明队列
            channel.queueDeclare(queueName, false, false, false, null);
 
            // 发布消息到队列
            channel.basicPublish("", queueName, null, message.getBytes());
            System.out.println(" [x] Sent '" + message + "' to queue '" + queueName + "'");
        }
    }
 
 
    public List<String> readMessages(String queueName) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        List<String> messages = new ArrayList<>();
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            boolean autoAck = false;
 
            GetResponse response = channel.basicGet(queueName, autoAck);
            if (response != null) {
                String message = new String(response.getBody(), "UTF-8");
                messages.add(message);
                // 手动确认消息处理完成
                long deliveryTag = response.getEnvelope().getDeliveryTag();
                channel.basicAck(deliveryTag, false);
            }
        }
 
        return messages;
    }
 
    // 从 RabbitMQ 队列中接收消息
    public static void receiveMessage(String queueName) throws Exception {
        // 创建连接工厂并设置主机名
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
 
        // 使用 try-with-resources 语句创建连接和通道,并接收消息
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            // 声明队列
            channel.queueDeclare(queueName, false, false, false, null);
 
            // 设置消息接收回调
            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                String receivedMessage = new String(delivery.getBody(), "UTF-8");
                System.out.println(" [x] Received '" + receivedMessage + "'");
            };
 
            // 消费队列中的消息
            channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
            });
        }
    }
 
    // 发送 JSON 消息到队列
    public static void sendJsonMessage(Object message, String queueName) throws Exception {
        // 创建连接工厂并设置主机名
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
 
        // 使用 try-with-resources 语句创建连接和通道,并发送 JSON 消息
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            // 声明队列
            channel.queueDeclare(queueName, false, false, false, null);
 
            // 将对象转换为 JSON 字符串
            String jsonMessage = objectMapper.writeValueAsString(message);
 
            // 发布 JSON 消息到队列
            channel.basicPublish("", queueName, null, jsonMessage.getBytes());
            System.out.println(" [x] Sent JSON message: '" + jsonMessage + "' to queue '" + queueName + "'");
        }
    }
 
    // 接收 JSON 消息并转换为对象
    public static <T> T receiveJsonMessage(Class<T> valueType, String queueName) throws Exception {
        // 创建连接工厂并设置主机名
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
 
        // 使用 try-with-resources 语句创建连接和通道,并接收 JSON 消息
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            // 声明队列
            channel.queueDeclare(queueName, false, false, false, null);
 
            // 获取队列中的消息
            GetResponse response = channel.basicGet(queueName, true);
            if (response != null) {
                byte[] body = response.getBody();
                String jsonMessage = new String(body, "UTF-8");
                System.out.println(" [x] Received JSON message: '" + jsonMessage + "' from queue '" + queueName + "'");
 
                // 将 JSON 消息转换为对象
                return objectMapper.readValue(jsonMessage, valueType);
            } else {
                return null;
            }
        }
    }
}
 
 
//    RabbitMQUtils.sendMessage("Hello, world!", "module_queue");
//RabbitMQUtils.receiveMessage("module_queue");
//RabbitMQUtils.sendJsonMessage(someObject, "another_queue");
//RabbitMQUtils.receiveJsonMessage(SomeClass.class, "another_queue");
// 需要将 someObject 替换为你要发送的对象,并将 SomeClass 替换为你要接收并转换的对象类型。