package com.mes.connect.modbus;
|
|
import com.mes.connect.IndustrialInterface.AddressParser;
|
import com.mes.connect.IndustrialInterface.IndustrialDataHandler;
|
import com.mes.connect.IndustrialInterface.IndustrialServer;
|
import com.mes.connect.addressParser.ModbusAddressParser;
|
|
import java.io.DataInputStream;
|
import java.io.DataOutputStream;
|
import java.io.IOException;
|
import java.net.ServerSocket;
|
import java.net.Socket;
|
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.Executors;
|
import java.util.logging.Logger;
|
|
/**
|
* Modbus TCP服务器实现
|
*/
|
public class ModbusTcpServer implements IndustrialServer {
|
private static final Logger logger = Logger.getLogger(ModbusTcpServer.class.getName());
|
private ServerSocket serverSocket;
|
private Thread serverThread;
|
private boolean running;
|
private IndustrialDataHandler handler;
|
private final int port;
|
private final ExecutorService threadPool = Executors.newFixedThreadPool(10);
|
private final AddressParser addressParser = new ModbusAddressParser();
|
|
public ModbusTcpServer(int port) {
|
this.port = port;
|
}
|
|
@Override
|
public synchronized void start() throws IOException {
|
if (!running) {
|
serverSocket = new ServerSocket(port);
|
serverThread = new Thread(this::serverLoop);
|
serverThread.setDaemon(true);
|
serverThread.start();
|
running = true;
|
logger.info("Modbus TCP server started on port " + port);
|
}
|
}
|
|
@Override
|
public synchronized void stop() {
|
if (running) {
|
running = false;
|
try {
|
if (serverSocket != null) {
|
serverSocket.close();
|
}
|
if (serverThread != null) {
|
serverThread.interrupt();
|
serverThread.join(1000);
|
}
|
threadPool.shutdownNow();
|
} catch (IOException | InterruptedException e) {
|
logger.warning("Error stopping Modbus TCP server: " + e.getMessage());
|
} finally {
|
serverSocket = null;
|
serverThread = null;
|
logger.info("Modbus TCP server stopped");
|
}
|
}
|
}
|
|
@Override
|
public boolean isRunning() {
|
return running;
|
}
|
|
@Override
|
public void setDataHandler(IndustrialDataHandler handler) {
|
this.handler = handler;
|
}
|
|
@Override
|
public void close() throws IOException {
|
stop();
|
}
|
|
private void serverLoop() {
|
while (running) {
|
try {
|
Socket clientSocket = serverSocket.accept();
|
threadPool.submit(() -> handleClient(clientSocket));
|
} catch (IOException e) {
|
if (running) {
|
logger.warning("Error accepting client connection: " + e.getMessage());
|
}
|
}
|
}
|
}
|
|
private void handleClient(Socket clientSocket) {
|
try (DataInputStream input = new DataInputStream(clientSocket.getInputStream());
|
DataOutputStream output = new DataOutputStream(clientSocket.getOutputStream())) {
|
|
byte[] request = readRequest(input);
|
byte[] response = processRequest(request);
|
output.write(response);
|
|
} catch (IOException e) {
|
logger.warning("Error handling client: " + e.getMessage());
|
}
|
}
|
|
private byte[] readRequest(DataInputStream input) throws IOException {
|
// 读取Modbus TCP请求
|
byte[] header = new byte[6];
|
int bytesRead = 0;
|
|
// 读取头部
|
while (bytesRead < 6) {
|
int count = input.read(header, bytesRead, 6 - bytesRead);
|
if (count == -1) {
|
throw new IOException("Connection closed while reading request header");
|
}
|
bytesRead += count;
|
}
|
|
int length = ((header[4] & 0xFF) << 8) | (header[5] & 0xFF);
|
byte[] request = new byte[6 + length];
|
|
// 复制头部
|
System.arraycopy(header, 0, request, 0, 6);
|
|
// 读取剩余部分
|
bytesRead = 0;
|
while (bytesRead < length) {
|
int count = input.read(request, 6 + bytesRead, length - bytesRead);
|
if (count == -1) {
|
throw new IOException("Connection closed while reading request body");
|
}
|
bytesRead += count;
|
}
|
|
return request;
|
}
|
|
private byte[] processRequest(byte[] request) {
|
int transactionId = ((request[0] & 0xFF) << 8) | (request[1] & 0xFF);
|
int unitId = request[6] & 0xFF;
|
int functionCode = request[7] & 0xFF;
|
|
try {
|
switch (functionCode) {
|
case 1: // 读线圈
|
return handleReadCoils(request, transactionId, unitId);
|
case 2: // 读离散输入
|
return handleReadDiscreteInputs(request, transactionId, unitId);
|
case 3: // 读保持寄存器
|
return handleReadHoldingRegisters(request, transactionId, unitId);
|
case 4: // 读输入寄存器
|
return handleReadInputRegisters(request, transactionId, unitId);
|
case 5: // 写单个线圈
|
return handleWriteSingleCoil(request, transactionId, unitId);
|
case 6: // 写单个寄存器
|
return handleWriteSingleRegister(request, transactionId, unitId);
|
case 15: // 写多个线圈
|
return handleWriteMultipleCoils(request, transactionId, unitId);
|
case 16: // 写多个寄存器
|
return handleWriteMultipleRegisters(request, transactionId, unitId);
|
default:
|
return buildExceptionResponse(transactionId, unitId, functionCode, 1); // 不支持的功能码
|
}
|
} catch (Exception e) {
|
logger.warning("Error processing request: " + e.getMessage());
|
return buildExceptionResponse(transactionId, unitId, functionCode, 4); // 服务器设备故障
|
}
|
}
|
|
private byte[] handleReadCoils(byte[] request, int transactionId, int unitId) {
|
int address = ((request[8] & 0xFF) << 8) | (request[9] & 0xFF);
|
int quantity = ((request[10] & 0xFF) << 8) | (request[11] & 0xFF);
|
|
if (quantity > 2000) {
|
return buildExceptionResponse(transactionId, unitId, 1, 3); // 请求数量过大
|
}
|
|
byte[] response = new byte[9 + (quantity + 7) / 8];
|
response[0] = (byte) (transactionId >> 8);
|
response[1] = (byte) transactionId;
|
response[2] = 0; // 协议标识符高字节
|
response[3] = 0; // 协议标识符低字节
|
response[4] = 0; // 长度高字节
|
response[5] = (byte) (response.length - 6); // 长度低字节
|
response[6] = (byte) unitId;
|
response[7] = 1; // 功能码
|
response[8] = (byte) ((quantity + 7) / 8); // 字节数
|
|
// 填充线圈数据
|
for (int i = 0; i < quantity; i++) {
|
String addressStr = String.format("MB.0x%04d.%d", address + i, 0);
|
boolean value = handler.handleReadBit(addressStr);
|
int byteIndex = 9 + i / 8;
|
int bitIndex = i % 8;
|
if (value) {
|
response[byteIndex] |= (1 << bitIndex);
|
}
|
}
|
|
return response;
|
}
|
|
private byte[] handleReadDiscreteInputs(byte[] request, int transactionId, int unitId) {
|
int address = ((request[8] & 0xFF) << 8) | (request[9] & 0xFF);
|
int quantity = ((request[10] & 0xFF) << 8) | (request[11] & 0xFF);
|
|
if (quantity > 2000) {
|
return buildExceptionResponse(transactionId, unitId, 2, 3); // 请求数量过大
|
}
|
|
byte[] response = new byte[9 + (quantity + 7) / 8];
|
response[0] = (byte) (transactionId >> 8);
|
response[1] = (byte) transactionId;
|
response[2] = 0; // 协议标识符高字节
|
response[3] = 0; // 协议标识符低字节
|
response[4] = 0; // 长度高字节
|
response[5] = (byte) (response.length - 6); // 长度低字节
|
response[6] = (byte) unitId;
|
response[7] = 2; // 功能码
|
response[8] = (byte) ((quantity + 7) / 8); // 字节数
|
|
// 填充离散输入数据
|
for (int i = 0; i < quantity; i++) {
|
String addressStr = String.format("MB.1x%04d.%d", address + i, 0);
|
boolean value = handler.handleReadBit(addressStr);
|
int byteIndex = 9 + i / 8;
|
int bitIndex = i % 8;
|
if (value) {
|
response[byteIndex] |= (1 << bitIndex);
|
}
|
}
|
|
return response;
|
}
|
|
private byte[] handleReadHoldingRegisters(byte[] request, int transactionId, int unitId) {
|
int address = ((request[8] & 0xFF) << 8) | (request[9] & 0xFF);
|
int quantity = ((request[10] & 0xFF) << 8) | (request[11] & 0xFF);
|
|
if (quantity > 125) {
|
return buildExceptionResponse(transactionId, unitId, 3, 3); // 请求数量过大
|
}
|
|
byte[] response = new byte[9 + quantity * 2];
|
response[0] = (byte) (transactionId >> 8);
|
response[1] = (byte) transactionId;
|
response[2] = 0; // 协议标识符高字节
|
response[3] = 0; // 协议标识符低字节
|
response[4] = 0; // 长度高字节
|
response[5] = (byte) (response.length - 6); // 长度低字节
|
response[6] = (byte) unitId;
|
response[7] = 3; // 功能码
|
response[8] = (byte) (quantity * 2); // 字节数
|
|
// 填充保持寄存器数据
|
for (int i = 0; i < quantity; i++) {
|
String addressStr = String.format("MB.4x%04d", address + i);
|
int value = handler.handleReadRegister(addressStr);
|
int byteIndex = 9 + i * 2;
|
response[byteIndex] = (byte) (value >> 8);
|
response[byteIndex + 1] = (byte) value;
|
}
|
|
return response;
|
}
|
|
private byte[] handleReadInputRegisters(byte[] request, int transactionId, int unitId) {
|
int address = ((request[8] & 0xFF) << 8) | (request[9] & 0xFF);
|
int quantity = ((request[10] & 0xFF) << 8) | (request[11] & 0xFF);
|
|
if (quantity > 125) {
|
return buildExceptionResponse(transactionId, unitId, 4, 3); // 请求数量过大
|
}
|
|
byte[] response = new byte[9 + quantity * 2];
|
response[0] = (byte) (transactionId >> 8);
|
response[1] = (byte) transactionId;
|
response[2] = 0; // 协议标识符高字节
|
response[3] = 0; // 协议标识符低字节
|
response[4] = 0; // 长度高字节
|
response[5] = (byte) (response.length - 6); // 长度低字节
|
response[6] = (byte) unitId;
|
response[7] = 4; // 功能码
|
response[8] = (byte) (quantity * 2); // 字节数
|
|
// 填充输入寄存器数据
|
for (int i = 0; i < quantity; i++) {
|
String addressStr = String.format("MB.3x%04d", address + i);
|
int value = handler.handleReadRegister(addressStr);
|
int byteIndex = 9 + i * 2;
|
response[byteIndex] = (byte) (value >> 8);
|
response[byteIndex + 1] = (byte) value;
|
}
|
|
return response;
|
}
|
|
private byte[] handleWriteSingleCoil(byte[] request, int transactionId, int unitId) {
|
int address = ((request[8] & 0xFF) << 8) | (request[9] & 0xFF);
|
int value = ((request[10] & 0xFF) << 8) | (request[11] & 0xFF);
|
|
boolean coilValue = (value == 0xFF00);
|
String addressStr = String.format("MB.0x%04d.%d", address, 0);
|
|
handler.handleWriteBit(addressStr, coilValue);
|
|
// 返回与请求相同的数据作为确认
|
byte[] response = new byte[12];
|
System.arraycopy(request, 0, response, 0, 12);
|
return response;
|
}
|
|
private byte[] handleWriteSingleRegister(byte[] request, int transactionId, int unitId) {
|
int address = ((request[8] & 0xFF) << 8) | (request[9] & 0xFF);
|
int value = ((request[10] & 0xFF) << 8) | (request[11] & 0xFF);
|
|
String addressStr = String.format("MB.4x%04d", address);
|
handler.handleWriteRegister(addressStr, value);
|
|
// 返回与请求相同的数据作为确认
|
byte[] response = new byte[12];
|
System.arraycopy(request, 0, response, 0, 12);
|
return response;
|
}
|
|
private byte[] handleWriteMultipleCoils(byte[] request, int transactionId, int unitId) {
|
int address = ((request[8] & 0xFF) << 8) | (request[9] & 0xFF);
|
int quantity = ((request[10] & 0xFF) << 8) | (request[11] & 0xFF);
|
int byteCount = request[12] & 0xFF;
|
|
if (quantity > 1968) {
|
return buildExceptionResponse(transactionId, unitId, 15, 3); // 请求数量过大
|
}
|
|
// 处理线圈值
|
for (int i = 0; i < quantity; i++) {
|
int byteIndex = 13 + i / 8;
|
int bitIndex = i % 8;
|
boolean value = ((request[byteIndex] & (1 << bitIndex)) != 0);
|
String addressStr = String.format("MB.0x%04d.%d", address + i, 0);
|
handler.handleWriteBit(addressStr, value);
|
}
|
|
// 构建响应
|
byte[] response = new byte[12];
|
response[0] = (byte) (transactionId >> 8);
|
response[1] = (byte) transactionId;
|
response[2] = 0; // 协议标识符高字节
|
response[3] = 0; // 协议标识符低字节
|
response[4] = 0; // 长度高字节
|
response[5] = 6; // 长度低字节
|
response[6] = (byte) unitId;
|
response[7] = 15; // 功能码
|
response[8] = (byte) (address >> 8); // 起始地址高字节
|
response[9] = (byte) address; // 起始地址低字节
|
response[10] = (byte) (quantity >> 8); // 数量高字节
|
response[11] = (byte) quantity; // 数量低字节
|
|
return response;
|
}
|
|
private byte[] handleWriteMultipleRegisters(byte[] request, int transactionId, int unitId) {
|
int address = ((request[8] & 0xFF) << 8) | (request[9] & 0xFF);
|
int quantity = ((request[10] & 0xFF) << 8) | (request[11] & 0xFF);
|
int byteCount = request[12] & 0xFF;
|
|
if (quantity > 123) {
|
return buildExceptionResponse(transactionId, unitId, 16, 3); // 请求数量过大
|
}
|
|
int[] values = new int[quantity];
|
|
// 解析寄存器值
|
for (int i = 0; i < quantity; i++) {
|
int byteIndex = 13 + i * 2;
|
values[i] = ((request[byteIndex] & 0xFF) << 8) | (request[byteIndex + 1] & 0xFF);
|
}
|
|
// 处理写入请求
|
String addressStr = String.format("MB.4x%04d", address);
|
handler.handleWriteRegisters(addressStr, values);
|
|
// 构建响应
|
byte[] response = new byte[12];
|
response[0] = (byte) (transactionId >> 8);
|
response[1] = (byte) transactionId;
|
response[2] = 0; // 协议标识符高字节
|
response[3] = 0; // 协议标识符低字节
|
response[4] = 0; // 长度高字节
|
response[5] = 6; // 长度低字节
|
response[6] = (byte) unitId;
|
response[7] = 16; // 功能码
|
response[8] = (byte) (address >> 8); // 起始地址高字节
|
response[9] = (byte) address; // 起始地址低字节
|
response[10] = (byte) (quantity >> 8); // 数量高字节
|
response[11] = (byte) quantity; // 数量低字节
|
|
return response;
|
}
|
|
private byte[] buildExceptionResponse(int transactionId, int unitId, int functionCode, int exceptionCode) {
|
byte[] response = new byte[9];
|
response[0] = (byte) (transactionId >> 8);
|
response[1] = (byte) transactionId;
|
response[2] = 0; // 协议标识符高字节
|
response[3] = 0; // 协议标识符低字节
|
response[4] = 0; // 长度高字节
|
response[5] = 3; // 长度低字节
|
response[6] = (byte) unitId;
|
response[7] = (byte) (functionCode + 0x80); // 异常功能码
|
response[8] = (byte) exceptionCode; // 异常码
|
return response;
|
}
|
}
|