From e9b0059e5ee9551f7b9b81c9358ce3d42886ae9a Mon Sep 17 00:00:00 2001 From: wu <731351411@qq.com> Date: 星期二, 26 三月 2024 13:29:25 +0800 Subject: [PATCH] Merge branch 'master' of http://10.153.19.25:10101/r/HangZhouMes --- UnLoadGlassModule/src/main/java/com/mes/common/RabbitMQUtils.java | 259 +++++++++++++++++++++++++++++++++++++++++++++++++++ 1 files changed, 259 insertions(+), 0 deletions(-) diff --git a/UnLoadGlassModule/src/main/java/com/mes/common/RabbitMQUtils.java b/UnLoadGlassModule/src/main/java/com/mes/common/RabbitMQUtils.java new file mode 100644 index 0000000..1da8717 --- /dev/null +++ b/UnLoadGlassModule/src/main/java/com/mes/common/RabbitMQUtils.java @@ -0,0 +1,259 @@ +package com.mes.common; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.rabbitmq.client.*; + +import java.util.*; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; + +public class RabbitMQUtils { + // ObjectMapper瀵硅薄锛岀敤浜庡簭鍒楀寲鍜屽弽搴忓垪鍖朖SON + private static ObjectMapper objectMapper = new ObjectMapper(); + private static String host = "10.153.19.150"; // RabbitMQ 涓绘満鍚� + private static Map<String, Object> args = new HashMap<>(); // 闃熷垪鍙傛暟 + + static { + // 璁剧疆闃熷垪鍙傛暟 + args.put("x-max-length-bytes", 1024 * 1024); + } + + // private static Map<String, Object> args = new HashMap<>(); +// args.put("x-max-length-bytes",1024 * 1024); + // 鍙戦�佹秷鎭埌 RabbitMQ 闃熷垪涓� + public String sendMessage(String message, String queueName) throws Exception { + // 鍒涘缓杩炴帴宸ュ巶骞惰缃富鏈哄悕 + ConnectionFactory factory = new ConnectionFactory(); + factory.setHost(host); + + // 浣跨敤 try-with-resources 璇彞鍒涘缓杩炴帴鍜岄�氶亾锛屽苟鍙戦�佹秷鎭� + try (Connection connection = factory.newConnection(); + Channel channel = connection.createChannel()) { + // 澹版槑闃熷垪 + channel.queueDeclare(queueName, false, false, false, args); + + // 鍙戝竷娑堟伅鍒伴槦鍒� + channel.basicPublish("", queueName, null, message.getBytes("UTF-8")); + System.out.println(" [x] Sent '" + message + "' to queue '" + queueName + "'"); + } + + return message; + } + + + // 浠� RabbitMQ 闃熷垪涓帴鏀舵秷鎭� + public String receiveMessage(String queueName) throws Exception { + // 鍒涘缓杩炴帴宸ュ巶骞惰缃富鏈哄悕 + ConnectionFactory factory = new ConnectionFactory(); + factory.setHost(host); + + // 鍒涘缓闃诲闃熷垪 + BlockingQueue<String> messageQueue = new ArrayBlockingQueue<>(1); + + // 浣跨敤 try-with-resources 璇彞鍒涘缓杩炴帴鍜岄�氶亾锛屽苟鎺ユ敹娑堟伅 + try (Connection connection = factory.newConnection(); + Channel channel = connection.createChannel()) { + // 澹版槑闃熷垪 + channel.queueDeclare(queueName, false, false, false, args); + + // 璁剧疆娑堟伅鎺ユ敹鍥炶皟 + DeliverCallback deliverCallback = (consumerTag, delivery) -> { + String receivedMessage = new String(delivery.getBody(), "UTF-8"); + // System.out.println(" [x] Received '" + receivedMessage + "'"); + + // 灏嗘帴鏀跺埌鐨勬秷鎭斁鍏ラ樆濉為槦鍒� + try { + messageQueue.put(receivedMessage); + } catch (InterruptedException e) { + e.printStackTrace(); + } + }; + + // 娑堣垂闃熷垪涓殑娑堟伅 + channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { + + }); + + // 闃诲骞剁瓑寰呰幏鍙栨秷鎭� + return messageQueue.take(); + } + } + + +//娑堣垂娑堟伅 + public List<String> readMessages(String queueName,boolean is) throws Exception { + ConnectionFactory factory = new ConnectionFactory(); + factory.setHost(host); + List<String> messages = new ArrayList<>(); + try (Connection connection = factory.newConnection(); + Channel channel = connection.createChannel()) { + boolean autoAck = false; + + GetResponse response = channel.basicGet(queueName, autoAck); + if (response != null) { + String message = new String(response.getBody(), "UTF-8"); + messages.add(message); + // 鎵嬪姩纭娑堟伅澶勭悊瀹屾垚 + if(is){ + long deliveryTag = response.getEnvelope().getDeliveryTag(); + channel.basicAck(deliveryTag, false); + } + + } + } + + return messages; + } + + + + private static Set<String> sentMessageIds = new HashSet<>(); +//鏍规嵁id鍙戦�佹秷鎭� + public static boolean sendMessageWithId(String queueName, String message, String messageId) throws Exception { + if (sentMessageIds.contains(messageId)) { + System.err.println("Message with ID " + messageId + " has already been sent."); + return false; // 娑堟伅閲嶅锛屽彂閫佸け璐� + } + + ConnectionFactory factory = new ConnectionFactory(); + factory.setHost(host); + + try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { + channel.queueDeclare(queueName, false, false, false, args); + + AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder() + .messageId(messageId) + .build(); + + channel.basicPublish("", queueName, properties, message.getBytes("UTF-8")); + System.out.println("Sent message with ID: " + messageId); + + sentMessageIds.add(messageId); // 灏� messageId 娣诲姞鍒板凡鍙戦�侀泦鍚堜腑 + + return true; // 娑堟伅鎴愬姛鍙戦�� + } catch (Exception e) { + System.err.println("Failed to send message: " + e.getMessage()); + return false; // 娑堟伅鍙戦�佸け璐� + } + } +//鏍规嵁id娑堣垂娑堟伅 + public static String consumeMessageById( String messageId,String queueName) throws Exception { + ConnectionFactory factory = new ConnectionFactory(); + factory.setHost(host); + + try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { + channel.queueDeclare(queueName, false, false, false, args); + + GetResponse response; + while ((response = channel.basicGet(queueName, false)) != null) { + String receivedMessage = new String(response.getBody(), "UTF-8"); + if (response.getProps().getMessageId().equals(messageId)) { + long deliveryTag = response.getEnvelope().getDeliveryTag(); + channel.basicAck(deliveryTag, false); + System.out.println("Selected message: "+messageId+ receivedMessage); + return receivedMessage; // 杩斿洖閫夊畾鐨勬秷鎭唴瀹� + } else { + // 瀵逛簬涓嶇鍚堟潯浠剁殑娑堟伅锛岃繘琛� Nack 鎿嶄綔 +// long deliveryTag = response.getEnvelope().getDeliveryTag(); +// channel.basicNack(deliveryTag, false, true); + return "Specified message not found in the queue."; + } + } + + return "Specified message not found in the queue."; + } + } + +//娑堣垂鎸囧畾娑堟伅 + public static String consumeSelectedMessage(int messageToConsume, String queueName) throws Exception { + ConnectionFactory factory = new ConnectionFactory(); + factory.setHost(host); + + try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { + channel.queueDeclare(queueName, false, false, false, args); + + for (int i = 0; i < messageToConsume; i++) { + GetResponse response = channel.basicGet(queueName, false); + if (response == null) { + return "Queue does not have enough messages."; + } +// long deliveryTag = response.getEnvelope().getDeliveryTag(); +// channel.basicAck(deliveryTag, false); + } + + GetResponse selectedResponse = channel.basicGet(queueName, false); + if (selectedResponse != null) { + byte[] body = selectedResponse.getBody(); + String selectedMessage = new String(body, "UTF-8"); + long deliveryTag = selectedResponse.getEnvelope().getDeliveryTag(); + channel.basicAck(deliveryTag, false); + return selectedMessage; + } else { + return "Specified message not found in the queue."; + } + } + } + + // 鍙戦�� JSON 娑堟伅鍒伴槦鍒� + public static boolean sendJsonMessage(Object message, String queueName) throws Exception { + // 鍒涘缓杩炴帴宸ュ巶骞惰缃富鏈哄悕 + ConnectionFactory factory = new ConnectionFactory(); + factory.setHost(host); + + // 浣跨敤 try-with-resources 璇彞鍒涘缓杩炴帴鍜岄�氶亾锛屽苟鍙戦�� JSON 娑堟伅 + try (Connection connection = factory.newConnection(); + Channel channel = connection.createChannel()) { + // 澹版槑闃熷垪 + channel.queueDeclare(queueName, false, false, false, args); + + // 灏嗗璞¤浆鎹负 JSON 瀛楃涓� + String jsonMessage = objectMapper.writeValueAsString(message); + + // 鍙戝竷 JSON 娑堟伅鍒伴槦鍒� + channel.basicPublish("", queueName, null, jsonMessage.getBytes()); + System.out.println(" [x] Sent JSON message: '" + jsonMessage + "' to queue '" + queueName + "'"); + + return true; // 鍙戦�佹秷鎭垚鍔� + } catch (Exception e) { + e.printStackTrace(); + return false; // 鍙戦�佹秷鎭け璐� + } + } + + // 鎺ユ敹 JSON 娑堟伅骞惰浆鎹负瀵硅薄 + public static <T> T receiveJsonMessage(Class<T> valueType, String queueName) throws Exception { + // 鍒涘缓杩炴帴宸ュ巶骞惰缃富鏈哄悕 + ConnectionFactory factory = new ConnectionFactory(); + factory.setHost(host); + + // 浣跨敤 try-with-resources 璇彞鍒涘缓杩炴帴鍜岄�氶亾锛屽苟鎺ユ敹 JSON 娑堟伅 + try (Connection connection = factory.newConnection(); + Channel channel = connection.createChannel()) { + // 澹版槑闃熷垪 + channel.queueDeclare(queueName, false, false, false, args); + + // 鑾峰彇闃熷垪涓殑娑堟伅 + GetResponse response = channel.basicGet(queueName, true); + if (response != null) { + byte[] body = response.getBody(); + String jsonMessage = new String(body, "UTF-8"); + System.out.println(" [x] Received JSON message: '" + jsonMessage + "' from queue '" + queueName + "'"); + + // 灏� JSON 娑堟伅杞崲涓哄璞� + return objectMapper.readValue(jsonMessage, valueType); + } else { + return null; + } + } catch (Exception e) { + e.printStackTrace(); + return null; // 鎺ユ敹娑堟伅澶辫触 + } + } + +} + +// RabbitMQUtils.sendMessage("Hello, world!", "module_queue"); +//RabbitMQUtils.receiveMessage("module_queue"); +//RabbitMQUtils.sendJsonMessage(someObject, "another_queue"); +//RabbitMQUtils.receiveJsonMessage(SomeClass.class, "another_queue"); +// 闇�瑕佸皢 someObject 鏇挎崲涓轰綘瑕佸彂閫佺殑瀵硅薄锛屽苟灏� SomeClass 鏇挎崲涓轰綘瑕佹帴鏀跺苟杞崲鐨勫璞$被鍨嬨�� \ No newline at end of file -- Gitblit v1.8.0