| | |
| | | import com.fasterxml.jackson.databind.ObjectMapper;
|
| | | import com.rabbitmq.client.*;
|
| | |
|
| | | import java.util.ArrayList;
|
| | | import java.util.HashMap;
|
| | | import java.util.List;
|
| | | import java.util.Map;
|
| | | import java.util.*;
|
| | | import java.util.concurrent.ArrayBlockingQueue;
|
| | | import java.util.concurrent.BlockingQueue;
|
| | |
|
| | |
| | | }
|
| | |
|
| | |
|
| | |
|
| | | //消费消息
|
| | | public List<String> readMessages(String queueName,boolean is) throws Exception {
|
| | | ConnectionFactory factory = new ConnectionFactory();
|
| | | factory.setHost(host);
|
| | |
| | | return messages;
|
| | | }
|
| | |
|
| | |
|
| | | //接收队列中所有消息,不消费
|
| | | public static List<String> browseMessages(String queueName) throws Exception {
|
| | | ConnectionFactory factory = new ConnectionFactory();
|
| | | factory.setHost(host);
|
| | | List<String> messages = new ArrayList<>();
|
| | | try (Connection connection = factory.newConnection();
|
| | | Channel channel = connection.createChannel()) {
|
| | | channel.queueDeclare(queueName, false, false, false, args);
|
| | | // 获取队列中的消息
|
| | | GetResponse response;
|
| | | while ((response = channel.basicGet(queueName, false)) != null) {
|
| | | String message = new String(response.getBody(), "UTF-8");
|
| | | messages.add(message);
|
| | | }
|
| | | }
|
| | | return messages;
|
| | | }
|
| | |
|
| | |
|
| | |
|
| | | private static Set<String> sentMessageIds = new HashSet<>();
|
| | | //根据id发送消息
|
| | | public static boolean sendMessageWithId(String queueName, String message, String messageId) throws Exception {
|
| | | if (sentMessageIds.contains(messageId)) {
|
| | | System.err.println("Message with ID " + messageId + " has already been sent.");
|
| | | return false; // 消息重复,发送失败
|
| | | }
|
| | |
|
| | | ConnectionFactory factory = new ConnectionFactory();
|
| | | factory.setHost(host);
|
| | |
|
| | | try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) {
|
| | | channel.queueDeclare(queueName, false, false, false, args);
|
| | |
|
| | | AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
|
| | | .messageId(messageId)
|
| | | .build();
|
| | |
|
| | | channel.basicPublish("", queueName, properties, message.getBytes("UTF-8"));
|
| | | System.out.println("Sent message with ID: " + messageId);
|
| | |
|
| | | sentMessageIds.add(messageId); // 将 messageId 添加到已发送集合中
|
| | |
|
| | | return true; // 消息成功发送
|
| | | } catch (Exception e) {
|
| | | System.err.println("Failed to send message: " + e.getMessage());
|
| | | return false; // 消息发送失败
|
| | | }
|
| | | }
|
| | | //根据id消费消息
|
| | | public static String consumeMessageById(String messageId, String queueName) throws Exception {
|
| | | ConnectionFactory factory = new ConnectionFactory();
|
| | | factory.setHost(host);
|
| | |
|
| | | try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) {
|
| | | channel.queueDeclare(queueName, false, false, false, args);
|
| | |
|
| | | GetResponse response;
|
| | | boolean found = false;
|
| | |
|
| | | while ((response = channel.basicGet(queueName, false)) != null) {
|
| | | String receivedMessage = new String(response.getBody(), "UTF-8");
|
| | | if (response.getProps().getMessageId().equals(messageId)) {
|
| | | long deliveryTag = response.getEnvelope().getDeliveryTag();
|
| | | channel.basicAck(deliveryTag, false);
|
| | | System.out.println("Selected message: " + messageId + " " + receivedMessage);
|
| | | return receivedMessage;
|
| | | } else {
|
| | | // 未找到指定消息,继续查找
|
| | | channel.basicReject(response.getEnvelope().getDeliveryTag(), false);
|
| | | }
|
| | | }
|
| | |
|
| | | return "Specified message not found in the queue.";
|
| | | }
|
| | | }
|
| | |
|
| | | //消费指定消息
|
| | | public static String consumeSelectedMessage(int messageToConsume, String queueName) throws Exception {
|
| | | ConnectionFactory factory = new ConnectionFactory();
|
| | | factory.setHost(host);
|
| | |
|
| | | try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) {
|
| | | channel.queueDeclare(queueName, false, false, false, args);
|
| | |
|
| | | for (int i = 0; i < messageToConsume; i++) {
|
| | | GetResponse response = channel.basicGet(queueName, false);
|
| | | if (response == null) {
|
| | | return "Queue does not have enough messages.";
|
| | | }
|
| | | // long deliveryTag = response.getEnvelope().getDeliveryTag();
|
| | | // channel.basicAck(deliveryTag, false);
|
| | | }
|
| | |
|
| | | GetResponse selectedResponse = channel.basicGet(queueName, false);
|
| | | if (selectedResponse != null) {
|
| | | byte[] body = selectedResponse.getBody();
|
| | | String selectedMessage = new String(body, "UTF-8");
|
| | | long deliveryTag = selectedResponse.getEnvelope().getDeliveryTag();
|
| | | channel.basicAck(deliveryTag, false);
|
| | | return selectedMessage;
|
| | | } else {
|
| | | return "Specified message not found in the queue.";
|
| | | }
|
| | | }
|
| | | }
|
| | |
|
| | | // 发送 JSON 消息到队列
|
| | | public static boolean sendJsonMessage(Object message, String queueName) throws Exception {
|
| | | // 创建连接工厂并设置主机名
|