package com.example.erp.common; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeoutException; public class RabbitMQUtil { private static final String SEND_QUEUE_NAME = "temperingUsed"; private static final String RECEIVE_QUEUE_NAME = "temperingReturn"; private static final String HOST = "localhost"; private static final String USERNAME = "guest"; private static final String PASSWORD = "guest"; private ConnectionFactory factory; private Connection connection; private Channel sendChannel; private Channel receiveChannel; private BlockingQueue messageQueue; public RabbitMQUtil() throws IOException, TimeoutException { factory = new ConnectionFactory(); factory.setHost(HOST); factory.setUsername(USERNAME); factory.setPassword(PASSWORD); connection = factory.newConnection(); sendChannel = connection.createChannel(); sendChannel.queueDeclare(SEND_QUEUE_NAME, false, false, false, null); receiveChannel = connection.createChannel(); receiveChannel.queueDeclare(RECEIVE_QUEUE_NAME, false, false, false, null); messageQueue = new ArrayBlockingQueue<>(100); // 设置队列大小 startConsuming(); } public void sendMessage(String message) throws IOException { sendChannel.basicPublish("", SEND_QUEUE_NAME, null, message.getBytes()); } public String receiveMessages() throws InterruptedException { return messageQueue.take(); // 阻塞直到有消息可用 } private void startConsuming() throws IOException { DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); messageQueue.offer(message); // 将消息放入队列 }; receiveChannel.basicConsume(RECEIVE_QUEUE_NAME, true, deliverCallback, consumerTag -> { }); } public void close() throws IOException, TimeoutException { if (sendChannel != null) { sendChannel.close(); } if (receiveChannel != null) { receiveChannel.close(); } if (connection != null) { connection.close(); } } }