| | |
| | | }
|
| | |
|
| | |
|
| | | //接收队列中所有消息,不消费
|
| | | public static List<String> browseMessages(String queueName) throws Exception {
|
| | | ConnectionFactory factory = new ConnectionFactory();
|
| | | factory.setHost(host);
|
| | | List<String> messages = new ArrayList<>();
|
| | | try (Connection connection = factory.newConnection();
|
| | | Channel channel = connection.createChannel()) {
|
| | | channel.queueDeclare(queueName, false, false, false, args);
|
| | | // 获取队列中的消息
|
| | | GetResponse response;
|
| | | while ((response = channel.basicGet(queueName, false)) != null) {
|
| | | String message = new String(response.getBody(), "UTF-8");
|
| | | messages.add(message);
|
| | | }
|
| | | }
|
| | | return messages;
|
| | | }
|
| | |
|
| | |
|
| | |
|
| | | private static Set<String> sentMessageIds = new HashSet<>();
|
| | | //根据id发送消息
|
| | |
| | | }
|
| | | }
|
| | | //根据id消费消息
|
| | | public static String consumeMessageById( String messageId,String queueName) throws Exception {
|
| | | ConnectionFactory factory = new ConnectionFactory();
|
| | | factory.setHost(host);
|
| | | public static String consumeMessageById(String messageId, String queueName) throws Exception {
|
| | | ConnectionFactory factory = new ConnectionFactory();
|
| | | factory.setHost(host);
|
| | |
|
| | | try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) {
|
| | | channel.queueDeclare(queueName, false, false, false, args);
|
| | | try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) {
|
| | | channel.queueDeclare(queueName, false, false, false, args);
|
| | |
|
| | | GetResponse response;
|
| | | while ((response = channel.basicGet(queueName, false)) != null) {
|
| | | String receivedMessage = new String(response.getBody(), "UTF-8");
|
| | | if (response.getProps().getMessageId().equals(messageId)) {
|
| | | long deliveryTag = response.getEnvelope().getDeliveryTag();
|
| | | channel.basicAck(deliveryTag, false);
|
| | | System.out.println("Selected message: "+messageId+ receivedMessage);
|
| | | return receivedMessage; // 返回选定的消息内容
|
| | | } else {
|
| | | // 对于不符合条件的消息,进行 Nack 操作
|
| | | // long deliveryTag = response.getEnvelope().getDeliveryTag();
|
| | | // channel.basicNack(deliveryTag, false, true);
|
| | | return "Specified message not found in the queue.";
|
| | | }
|
| | | GetResponse response;
|
| | | boolean found = false;
|
| | |
|
| | | while ((response = channel.basicGet(queueName, false)) != null) {
|
| | | String receivedMessage = new String(response.getBody(), "UTF-8");
|
| | | if (response.getProps().getMessageId().equals(messageId)) {
|
| | | long deliveryTag = response.getEnvelope().getDeliveryTag();
|
| | | channel.basicAck(deliveryTag, false);
|
| | | System.out.println("Selected message: " + messageId + " " + receivedMessage);
|
| | | return receivedMessage;
|
| | | } else {
|
| | | // 未找到指定消息,继续查找
|
| | | channel.basicReject(response.getEnvelope().getDeliveryTag(), false);
|
| | | }
|
| | |
|
| | | return "Specified message not found in the queue.";
|
| | | }
|
| | |
|
| | | return "Specified message not found in the queue.";
|
| | | }
|
| | | }
|
| | |
|
| | | //消费指定消息
|
| | | public static String consumeSelectedMessage(int messageToConsume, String queueName) throws Exception {
|