package com.example.erp.common;
|
|
import com.rabbitmq.client.*;
|
|
import java.io.IOException;
|
import java.util.concurrent.ArrayBlockingQueue;
|
import java.util.concurrent.BlockingQueue;
|
import java.util.concurrent.TimeoutException;
|
|
public class RabbitMQUtil {
|
|
private static final String SEND_QUEUE_NAME = "temperingUsed";
|
private static final String RECEIVE_QUEUE_NAME = "temperingReturn";
|
private static final String HOST = "localhost";
|
private static final String USERNAME = "guest";
|
private static final String PASSWORD = "guest";
|
|
private ConnectionFactory factory;
|
private Connection connection;
|
private Channel sendChannel;
|
private Channel receiveChannel;
|
private BlockingQueue<String> messageQueue;
|
|
public RabbitMQUtil() throws IOException, TimeoutException {
|
factory = new ConnectionFactory();
|
factory.setHost(HOST);
|
factory.setUsername(USERNAME);
|
factory.setPassword(PASSWORD);
|
connection = factory.newConnection();
|
|
sendChannel = connection.createChannel();
|
sendChannel.queueDeclare(SEND_QUEUE_NAME, false, false, false, null);
|
|
receiveChannel = connection.createChannel();
|
receiveChannel.queueDeclare(RECEIVE_QUEUE_NAME, false, false, false, null);
|
|
messageQueue = new ArrayBlockingQueue<>(100); // 设置队列大小
|
startConsuming();
|
}
|
|
public void sendMessage(String message) throws IOException {
|
sendChannel.basicPublish("", SEND_QUEUE_NAME, null, message.getBytes());
|
}
|
|
public String receiveMessages() throws InterruptedException {
|
return messageQueue.take(); // 阻塞直到有消息可用
|
}
|
|
private void startConsuming() throws IOException {
|
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
|
String message = new String(delivery.getBody(), "UTF-8");
|
messageQueue.offer(message); // 将消息放入队列
|
};
|
receiveChannel.basicConsume(RECEIVE_QUEUE_NAME, true, deliverCallback, consumerTag -> { });
|
}
|
|
public void close() throws IOException, TimeoutException {
|
if (sendChannel != null) {
|
sendChannel.close();
|
}
|
if (receiveChannel != null) {
|
receiveChannel.close();
|
}
|
if (connection != null) {
|
connection.close();
|
}
|
}
|
}
|