package com.mes.connect.s7;

import com.mes.connect.IndustrialInterface.AddressParser;
import com.mes.connect.IndustrialInterface.IndustrialDataHandler;
import com.mes.connect.IndustrialInterface.IndustrialServer;
import com.mes.connect.addressParser.S7AddressParser;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * Simplified S7 protocol server.
 *
 * <p>Accepts TCP clients on the configured port, runs a two-step handshake and then answers
 * read (function 0x04) and write (function 0x05) requests by delegating every element access to
 * the configured {@link IndustrialDataHandler}.
 *
 * <p>NOTE(review): the frame layout implemented here is a simplification and does not look like
 * the standard S7comm layout. In particular the item-count byte (offset 22) lies inside the field
 * range of the first item (which starts at offset 16), and the same cursor is used both to parse
 * request items and to write response items. Both are preserved unchanged because the intended
 * wire format cannot be determined from this class alone; confirm against the matching client.
 *
 * <p>{@link #start()} and {@link #stop()} are synchronized; the running flag and the data handler
 * are volatile so the accept thread and worker threads observe updates.
 */
public class S7Server implements IndustrialServer {
    private static final Logger logger = Logger.getLogger(S7Server.class.getName());

    /** Number of worker threads serving client connections. */
    private static final int WORKER_THREADS = 10;
    /** Maximum time {@link #stop()} waits for the accept thread to finish. */
    private static final int JOIN_TIMEOUT_MS = 1000;

    /** Number of leading request bytes echoed into every response. */
    private static final int HEADER_LENGTH = 12;
    /** Offset of the function code byte. */
    private static final int FUNCTION_OFFSET = 12;
    /** Offset at which the first item starts (request and response). */
    private static final int FIRST_ITEM_OFFSET = 16;
    /** Offset of the item count byte in read/write requests. */
    private static final int ITEM_COUNT_OFFSET = 22;
    /** Bytes of a read item that are parsed (fields end at item offset + 10). */
    private static final int READ_ITEM_HEADER_LENGTH = 11;
    /** Bytes preceding the payload of a write item (payload starts at item offset + 12). */
    private static final int WRITE_ITEM_HEADER_LENGTH = 12;

    private static final int FUNCTION_READ = 0x04;
    private static final int FUNCTION_WRITE = 0x05;
    /** Per-item return code: parameter error. */
    private static final int ITEM_PARAMETER_ERROR = 0x81;

    private final int port;
    // Not referenced by this class; kept so construction behaves exactly as before.
    private final AddressParser addressParser = new S7AddressParser();
    /** Sockets of accepted clients, tracked so that stop() can disconnect them. */
    private final Set<Socket> clientSockets = ConcurrentHashMap.newKeySet();

    private ServerSocket serverSocket;
    private Thread serverThread;
    private ExecutorService threadPool;
    private volatile boolean running;
    private volatile IndustrialDataHandler handler;

    /**
     * Creates a server that will listen on the given port once {@link #start()} is called.
     *
     * @param port TCP port to bind
     */
    public S7Server(int port) {
        this.port = port;
    }

    /**
     * Binds the listening socket and starts the accept thread. Does nothing if already running.
     *
     * @throws IOException if the port cannot be bound
     */
    @Override
    public synchronized void start() throws IOException {
        if (running) {
            return;
        }
        ServerSocket listener = new ServerSocket(port);
        // A fresh pool per start: a pool that was shut down by stop() cannot accept new tasks.
        ExecutorService workers = Executors.newFixedThreadPool(WORKER_THREADS);
        serverSocket = listener;
        threadPool = workers;
        // Must be set before the accept thread starts, otherwise its loop may exit immediately.
        running = true;

        Thread acceptor = new Thread(() -> serverLoop(listener, workers));
        acceptor.setDaemon(true);
        serverThread = acceptor;
        acceptor.start();
        logger.info("S7 server started on port " + port);
    }

    /**
     * Stops accepting clients, shuts the worker pool down and disconnects connected clients.
     * Every cleanup step is attempted even if an earlier one fails. Does nothing if not running.
     */
    @Override
    public synchronized void stop() {
        if (!running) {
            return;
        }
        running = false;

        try {
            if (serverSocket != null) {
                serverSocket.close(); // unblocks accept()
            }
        } catch (IOException e) {
            logger.warning("Error stopping S7 server: " + e.getMessage());
        }

        if (serverThread != null) {
            serverThread.interrupt();
            try {
                serverThread.join(JOIN_TIMEOUT_MS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the caller's interrupt status
                logger.warning("Error stopping S7 server: " + e.getMessage());
            }
        }

        if (threadPool != null) {
            threadPool.shutdownNow();
        }
        // Blocking socket reads ignore thread interruption; closing the sockets ends the workers
        // and also releases connections whose handler task was still queued.
        for (Socket client : clientSockets) {
            closeQuietly(client);
        }
        clientSockets.clear();

        serverSocket = null;
        serverThread = null;
        threadPool = null;
        logger.info("S7 server stopped");
    }

    /** @return {@code true} between a successful {@link #start()} and the next {@link #stop()} */
    @Override
    public boolean isRunning() {
        return running;
    }

    /**
     * Sets the handler that serves register reads and writes.
     *
     * @param handler handler to delegate to; while it is {@code null}, requests that need it are
     *                answered with an error response
     */
    @Override
    public void setDataHandler(IndustrialDataHandler handler) {
        this.handler = handler;
    }

    /** Equivalent to {@link #stop()}. */
    @Override
    public void close() throws IOException {
        stop();
    }

    /** Accepts clients until the server is stopped or the listening socket is closed. */
    private void serverLoop(ServerSocket listener, ExecutorService workers) {
        while (running && !listener.isClosed()) {
            try {
                Socket clientSocket = listener.accept();
                dispatch(workers, clientSocket);
            } catch (IOException e) {
                if (running) {
                    logger.warning("Error accepting client connection: " + e.getMessage());
                }
            }
        }
    }

    /** Hands an accepted client to the worker pool, closing it if the pool refuses the task. */
    private void dispatch(ExecutorService workers, Socket clientSocket) {
        clientSockets.add(clientSocket);
        try {
            workers.submit(() -> handleClient(clientSocket));
        } catch (RejectedExecutionException e) {
            clientSockets.remove(clientSocket);
            closeQuietly(clientSocket);
            if (running) {
                logger.warning("Rejected client connection: " + e.getMessage());
            }
        }
    }

    /**
     * Serves one client: COTP handshake, ISO handshake, then S7 request/response exchange.
     * The socket is always closed when this method returns.
     */
    private void handleClient(Socket clientSocket) {
        try (Socket socket = clientSocket;
             DataInputStream input = new DataInputStream(socket.getInputStream());
             DataOutputStream output = new DataOutputStream(socket.getOutputStream())) {
            // 1. COTP connection request
            if (!handleCotpConnection(input, output)) {
                return;
            }
            // 2. ISO connection request
            if (!handleIsoConnection(input, output)) {
                return;
            }
            // 3. S7 protocol data
            handleS7Protocol(input, output);
        } catch (IOException e) {
            // I/O errors are expected while stop() is closing the sockets.
            if (running) {
                logger.warning("Error handling client: " + e.getMessage());
            }
        } finally {
            clientSockets.remove(clientSocket);
        }
    }

    /**
     * Reads the first handshake message and answers it with a fixed 6-byte confirmation.
     *
     * @return {@code true} if at least 6 bytes were read and they start with 0x02 0xF0 0x80
     * @throws IOException on socket errors
     */
    private boolean handleCotpConnection(DataInputStream input, DataOutputStream output) throws IOException {
        byte[] buffer = new byte[256];
        int bytesRead = input.read(buffer);

        // Bytes are signed in Java: mask before comparing with values above 0x7F, otherwise the
        // comparison with 0xF0 / 0x80 can never succeed.
        if (bytesRead < 6
                || (buffer[0] & 0xFF) != 0x02
                || (buffer[1] & 0xFF) != 0xF0
                || (buffer[2] & 0xFF) != 0x80) {
            logger.warning("Invalid COTP connection request");
            return false;
        }

        byte[] response = new byte[6];
        response[0] = 0x02;        // length
        response[1] = (byte) 0xF0; // TPDU type
        response[2] = (byte) 0xD0; // TPDU number
        // response[3..5] stay 0x00 (parameters)
        output.write(response);
        return true;
    }

    /**
     * Reads the second handshake message and answers it with a fixed 10-byte confirmation.
     *
     * @return {@code true} if at least 10 bytes were read and they start with 0x03 0x00
     * @throws IOException on socket errors
     */
    private boolean handleIsoConnection(DataInputStream input, DataOutputStream output) throws IOException {
        byte[] buffer = new byte[256];
        int bytesRead = input.read(buffer);

        if (bytesRead < 10 || buffer[0] != 0x03 || buffer[1] != 0x00) {
            logger.warning("Invalid ISO connection request");
            return false;
        }

        byte[] response = new byte[10];
        response[0] = 0x03;        // protocol id
        response[1] = 0x00;
        response[2] = 0x00;
        response[3] = 0x0A;        // length
        response[4] = 0x02;        // COTP header length
        response[5] = (byte) 0xF0; // COTP TPDU type
        response[6] = (byte) 0x81; // parameters
        // response[7..9] stay 0x00
        output.write(response);
        return true;
    }

    /**
     * Request/response loop: every successful read is treated as one complete request.
     * Ends when the peer closes the connection.
     *
     * @throws IOException on socket errors
     */
    private void handleS7Protocol(DataInputStream input, DataOutputStream output) throws IOException {
        byte[] buffer = new byte[1024];
        while (true) {
            int bytesRead = input.read(buffer);
            if (bytesRead <= 0) {
                break;
            }
            byte[] response = processS7Request(buffer, bytesRead);
            if (response != null) {
                output.write(response);
            }
        }
    }

    /**
     * Validates the request header and dispatches on the function code.
     *
     * @param request buffer holding the request; only the first {@code length} bytes are valid
     * @param length  number of valid bytes in {@code request}
     * @return the response to send, or {@code null} if the request is ignored (too short, wrong
     *         protocol id, or not a Job request)
     */
    private byte[] processS7Request(byte[] request, int length) {
        // The buffer is reused between requests, so bytes at or beyond 'length' are stale.
        if (length <= FUNCTION_OFFSET) {
            logger.warning("Truncated S7 request: " + length + " bytes");
            return null;
        }

        int protocolId = request[0] & 0xFF;
        int rosctr = request[1] & 0xFF; // 1 = Job, 3 = Ack_Data
        if (protocolId != 0x32 || rosctr != 1) {
            logger.warning("Unsupported S7 request type");
            return null;
        }

        int function = request[FUNCTION_OFFSET] & 0xFF;
        try {
            switch (function) {
                case FUNCTION_READ:
                    return handleS7ReadRequest(request, length);
                case FUNCTION_WRITE:
                    return handleS7WriteRequest(request, length);
                default:
                    logger.warning("Unsupported S7 function: " + function);
                    return buildS7ErrorResponse(request, 0x01); // function not supported
            }
        } catch (RuntimeException e) {
            // Malformed requests, a missing handler and handler failures all end up here.
            logger.log(Level.WARNING, "Error processing S7 request: " + e.getMessage(), e);
            return buildS7ErrorResponse(request, 0x03); // processing error
        }
    }

    /**
     * Builds the response to a read request. The response has a fixed size of
     * {@code 24 + itemCount * 12} bytes; items are written from offset 16 onwards.
     *
     * <p>NOTE(review): each request item is parsed at the same offset at which the previous
     * response item ended (shared cursor, as in the original implementation).
     *
     * @throws IllegalArgumentException if the request is shorter than the fields it must contain
     *                                  or the requested data does not fit into the response
     * @throws IllegalStateException    if data must be read but no handler is configured
     */
    private byte[] handleS7ReadRequest(byte[] request, int length) {
        requireReadable(length, ITEM_COUNT_OFFSET, 1);
        int itemCount = request[ITEM_COUNT_OFFSET] & 0xFF;

        int responseLength = 24 + itemCount * 12;
        byte[] response = new byte[responseLength];
        writeAckHeader(request, response, FUNCTION_READ, responseLength - FIRST_ITEM_OFFSET);
        response[15] = 0x00; // return code: success

        int offset = FIRST_ITEM_OFFSET;
        for (int i = 0; i < itemCount; i++) {
            requireReadable(length, offset, READ_ITEM_HEADER_LENGTH);
            int area = request[offset + 2] & 0xFF;
            int dbNumber = readBigEndian(request, offset + 3, 2);
            int wordLen = request[offset + 5] & 0xFF;
            int startAddress = readBigEndian(request, offset + 6, 3);
            int count = readBigEndian(request, offset + 9, 2);

            String addressStr = buildAddress(area, dbNumber, startAddress);
            int elementSize = elementSize(wordLen);
            if (addressStr == null || elementSize < 0) {
                // Unknown area or data type: 3-byte error item.
                requireWritable(response, offset, 3);
                response[offset] = (byte) ITEM_PARAMETER_ERROR;
                response[offset + 1] = 0x00;
                response[offset + 2] = 0x00;
                offset += 3;
                continue;
            }

            int dataLength = count * elementSize;
            requireWritable(response, offset, 4 + dataLength);
            response[offset] = 0x00;                  // return code: success
            response[offset + 1] = 0x01;              // variable specification length
            response[offset + 2] = (byte) wordLen;    // data type
            response[offset + 3] = (byte) dataLength; // data length (single byte, low 8 bits)
            for (int j = 0; j < count; j++) {
                String elementAddress = String.format("%s.%d", addressStr, j);
                int value = requireHandler().handleReadRegister(elementAddress);
                writeBigEndian(response, offset + 4 + j * elementSize, value, elementSize);
            }
            offset += 4 + dataLength;
        }
        return response;
    }

    /**
     * Applies a write request and builds the response: {@code 24 + itemCount * 3} bytes with one
     * return code per item, written at offset 16, 19, 22, ...
     *
     * <p>NOTE(review): each request item is parsed at the offset of its response item (shared
     * cursor advancing by 3, as in the original implementation), so the headers of consecutive
     * request items overlap.
     *
     * @throws IllegalArgumentException if the request is shorter than the fields or payload it
     *                                  must contain
     * @throws IllegalStateException    if data must be written but no handler is configured
     */
    private byte[] handleS7WriteRequest(byte[] request, int length) {
        requireReadable(length, ITEM_COUNT_OFFSET, 1);
        int itemCount = request[ITEM_COUNT_OFFSET] & 0xFF;

        byte[] response = new byte[24 + itemCount * 3];
        writeAckHeader(request, response, FUNCTION_WRITE, itemCount * 3);

        int offset = FIRST_ITEM_OFFSET;
        for (int i = 0; i < itemCount; i++) {
            requireReadable(length, offset, WRITE_ITEM_HEADER_LENGTH);
            int area = request[offset + 2] & 0xFF;
            int dbNumber = readBigEndian(request, offset + 3, 2);
            int wordLen = request[offset + 5] & 0xFF;
            int startAddress = readBigEndian(request, offset + 6, 3);
            int count = readBigEndian(request, offset + 9, 2);
            // The byte at offset + 11 (a data length in the request) is not used.

            String addressStr = buildAddress(area, dbNumber, startAddress);
            int elementSize = elementSize(wordLen);
            if (addressStr == null || elementSize < 0) {
                response[offset] = (byte) ITEM_PARAMETER_ERROR;
                offset += 3;
                continue;
            }

            // Validate the whole payload first so a truncated item is never partially applied.
            int payloadStart = offset + WRITE_ITEM_HEADER_LENGTH;
            requireReadable(length, payloadStart, count * elementSize);
            for (int j = 0; j < count; j++) {
                int value = readBigEndian(request, payloadStart + j * elementSize, elementSize);
                String elementAddress = String.format("%s.%d", addressStr, j);
                requireHandler().handleWriteRegister(elementAddress, value);
            }
            response[offset] = 0x00; // return code: success
            offset += 3;
        }
        return response;
    }

    /**
     * Builds a 24-byte error response that echoes the request header and function code.
     *
     * @param request   request being answered; must hold at least 13 valid bytes
     * @param errorCode return code stored at offset 15
     */
    private byte[] buildS7ErrorResponse(byte[] request, int errorCode) {
        byte[] response = new byte[24];
        writeAckHeader(request, response, request[FUNCTION_OFFSET], 0x03);
        response[15] = (byte) errorCode;
        return response;
    }

    /**
     * Fills response bytes 0..14: the first 12 request bytes with ROSCTR set to Ack_Data and the
     * parameter length cleared, then the function code and the big-endian data length.
     */
    private static void writeAckHeader(byte[] request, byte[] response, int function, int dataLength) {
        System.arraycopy(request, 0, response, 0, HEADER_LENGTH);
        response[1] = 0x03;  // ROSCTR = Ack_Data
        response[10] = 0x00; // parameter length
        response[11] = 0x00;
        response[12] = (byte) function;
        response[13] = (byte) (dataLength >> 8); // data length, high byte
        response[14] = (byte) dataLength;        // data length, low byte
    }

    /**
     * Maps an area code to the address prefix passed to the data handler.
     *
     * @return the address string, or {@code null} if the area code is not supported
     */
    private static String buildAddress(int area, int dbNumber, int startAddress) {
        switch (area) {
            case 0x84: // data block
                return String.format("S7.DB%d.DBW%d", dbNumber, startAddress);
            case 0x81: // inputs
                return String.format("S7.I%d", startAddress);
            case 0x82: // outputs
                return String.format("S7.Q%d", startAddress);
            case 0x83: // memory
                return String.format("S7.M%d", startAddress);
            default:
                return null;
        }
    }

    /**
     * @return the element size in bytes for a word-length code (0x04 byte, 0x06 word,
     *         0x08 double word), or {@code -1} if the code is not supported
     */
    private static int elementSize(int wordLen) {
        switch (wordLen) {
            case 0x04:
                return 1;
            case 0x06:
                return 2;
            case 0x08:
                return 4;
            default:
                return -1;
        }
    }

    /** Reads {@code size} bytes starting at {@code index} as one big-endian value. */
    private static int readBigEndian(byte[] data, int index, int size) {
        int value = 0;
        for (int k = 0; k < size; k++) {
            value = (value << 8) | (data[index + k] & 0xFF);
        }
        return value;
    }

    /** Writes the low {@code size} bytes of {@code value} in big-endian order at {@code index}. */
    private static void writeBigEndian(byte[] data, int index, int value, int size) {
        for (int k = 0; k < size; k++) {
            data[index + k] = (byte) (value >> (8 * (size - 1 - k)));
        }
    }

    /** Ensures that {@code count} bytes starting at {@code start} were actually received. */
    private static void requireReadable(int length, int start, int count) {
        if (start < 0 || count < 0 || start + count > length) {
            throw new IllegalArgumentException("Request too short: need " + count + " byte(s) at offset "
                    + start + " but only " + length + " byte(s) were received");
        }
    }

    /** Ensures that {@code count} bytes starting at {@code start} fit into the response. */
    private static void requireWritable(byte[] response, int start, int count) {
        if (start < 0 || count < 0 || start + count > response.length) {
            throw new IllegalArgumentException("Response overflow: need " + count + " byte(s) at offset "
                    + start + " but the response holds " + response.length + " byte(s)");
        }
    }

    /** @return the configured handler; throws {@link IllegalStateException} if none is set */
    private IndustrialDataHandler requireHandler() {
        IndustrialDataHandler current = handler;
        if (current == null) {
            throw new IllegalStateException("No data handler configured");
        }
        return current;
    }

    /** Closes a socket, ignoring failures: used on shutdown paths where nothing can be done. */
    private static void closeQuietly(Socket socket) {
        try {
            socket.close();
        } catch (IOException ignored) {
            // best-effort cleanup
        }
    }
}