package com.mes.connect.s7;
|
|
import com.mes.connect.IndustrialInterface.AddressParser;
|
import com.mes.connect.IndustrialInterface.IndustrialDataHandler;
|
import com.mes.connect.IndustrialInterface.IndustrialServer;
|
import com.mes.connect.addressParser.S7AddressParser;
|
|
import java.io.DataInputStream;
|
import java.io.DataOutputStream;
|
import java.io.IOException;
|
import java.net.ServerSocket;
|
import java.net.Socket;
|
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.Executors;
|
import java.util.logging.Logger;
|
|
/**
|
* S7协议服务器实现(简化版)
|
*/
|
public class S7Server implements IndustrialServer {
|
private static final Logger logger = Logger.getLogger(S7Server.class.getName());
|
private ServerSocket serverSocket;
|
private Thread serverThread;
|
private boolean running;
|
private IndustrialDataHandler handler;
|
private final int port;
|
private final ExecutorService threadPool = Executors.newFixedThreadPool(10);
|
private final AddressParser addressParser = new S7AddressParser();
|
|
public S7Server(int port) {
|
this.port = port;
|
}
|
|
@Override
|
public synchronized void start() throws IOException {
|
if (!running) {
|
serverSocket = new ServerSocket(port);
|
serverThread = new Thread(this::serverLoop);
|
serverThread.setDaemon(true);
|
serverThread.start();
|
running = true;
|
logger.info("S7 server started on port " + port);
|
}
|
}
|
|
@Override
|
public synchronized void stop() {
|
if (running) {
|
running = false;
|
try {
|
if (serverSocket != null) {
|
serverSocket.close();
|
}
|
if (serverThread != null) {
|
serverThread.interrupt();
|
serverThread.join(1000);
|
}
|
threadPool.shutdownNow();
|
} catch (IOException | InterruptedException e) {
|
logger.warning("Error stopping S7 server: " + e.getMessage());
|
} finally {
|
serverSocket = null;
|
serverThread = null;
|
logger.info("S7 server stopped");
|
}
|
}
|
}
|
|
@Override
|
public boolean isRunning() {
|
return running;
|
}
|
|
@Override
|
public void setDataHandler(IndustrialDataHandler handler) {
|
this.handler = handler;
|
}
|
|
@Override
|
public void close() throws IOException {
|
stop();
|
}
|
|
private void serverLoop() {
|
while (running) {
|
try {
|
Socket clientSocket = serverSocket.accept();
|
threadPool.submit(() -> handleClient(clientSocket));
|
} catch (IOException e) {
|
if (running) {
|
logger.warning("Error accepting client connection: " + e.getMessage());
|
}
|
}
|
}
|
}
|
|
private void handleClient(Socket clientSocket) {
|
try (DataInputStream input = new DataInputStream(clientSocket.getInputStream());
|
DataOutputStream output = new DataOutputStream(clientSocket.getOutputStream())) {
|
|
// S7通信流程:COTP连接建立 -> ISO连接建立 -> S7协议数据交换
|
|
// 1. 处理COTP连接请求
|
if (!handleCotpConnection(input, output)) {
|
return;
|
}
|
|
// 2. 处理ISO连接请求
|
if (!handleIsoConnection(input, output)) {
|
return;
|
}
|
|
// 3. 处理S7协议数据
|
handleS7Protocol(input, output);
|
|
} catch (IOException e) {
|
logger.warning("Error handling client: " + e.getMessage());
|
}
|
}
|
|
private boolean handleCotpConnection(DataInputStream input, DataOutputStream output) throws IOException {
|
// 读取COTP连接请求
|
byte[] buffer = new byte[256];
|
int bytesRead = input.read(buffer);
|
|
// 简单验证COTP连接请求
|
if (bytesRead < 6 || buffer[0] != 0x02 || buffer[1] != 0xF0 || buffer[2] != 0x80) {
|
logger.warning("Invalid COTP connection request");
|
return false;
|
}
|
|
// 构建COTP连接确认
|
byte[] response = new byte[6];
|
response[0] = 0x02; // Length
|
response[1] = (byte) 0xF0; // COTP TPDU type: DT
|
response[2] = (byte) 0xD0; // TPDU number
|
response[3] = 0x00; // Parameters
|
response[4] = 0x00;
|
response[5] = 0x00;
|
|
output.write(response);
|
return true;
|
}
|
|
private boolean handleIsoConnection(DataInputStream input, DataOutputStream output) throws IOException {
|
// 读取ISO连接请求
|
byte[] buffer = new byte[256];
|
int bytesRead = input.read(buffer);
|
|
// 简单验证ISO连接请求
|
if (bytesRead < 10 || buffer[0] != 0x03 || buffer[1] != 0x00) {
|
logger.warning("Invalid ISO connection request");
|
return false;
|
}
|
|
// 构建ISO连接确认
|
byte[] response = new byte[10];
|
response[0] = 0x03; // Protocol ID
|
response[1] = 0x00;
|
response[2] = 0x00;
|
response[3] = 0x0A; // Length
|
response[4] = 0x02; // COTP header length
|
response[5] = (byte) 0xF0; // COTP TPDU type: CC
|
response[6] = (byte) 0x81; // Parameters
|
response[7] = 0x00;
|
response[8] = 0x00;
|
response[9] = 0x00;
|
|
output.write(response);
|
return true;
|
}
|
|
private void handleS7Protocol(DataInputStream input, DataOutputStream output) throws IOException {
|
byte[] buffer = new byte[1024];
|
|
while (true) {
|
int bytesRead = input.read(buffer);
|
if (bytesRead <= 0) {
|
break;
|
}
|
|
// 解析S7请求
|
byte[] response = processS7Request(buffer, bytesRead);
|
if (response != null) {
|
output.write(response);
|
}
|
}
|
}
|
|
private byte[] processS7Request(byte[] request, int length) {
|
// 解析S7请求PDU
|
int protocolId = request[0] & 0xFF;
|
int rosctr = request[1] & 0xFF; // 1 = Job, 3 = Ack_Data
|
|
if (protocolId != 0x32 || rosctr != 1) {
|
logger.warning("Unsupported S7 request type");
|
return null;
|
}
|
|
int function = request[12] & 0xFF;
|
|
try {
|
switch (function) {
|
case 0x04: // 读请求
|
return handleS7ReadRequest(request, length);
|
case 0x05: // 写请求
|
return handleS7WriteRequest(request, length);
|
default:
|
logger.warning("Unsupported S7 function: " + function);
|
return buildS7ErrorResponse(request, 0x01); // 功能不支持
|
}
|
} catch (Exception e) {
|
logger.warning("Error processing S7 request: " + e.getMessage());
|
return buildS7ErrorResponse(request, 0x03); // 处理错误
|
}
|
}
|
|
private byte[] handleS7ReadRequest(byte[] request, int length) {
|
// 解析请求参数
|
int itemCount = request[22] & 0xFF;
|
|
// 构建响应
|
int responseLength = 24 + itemCount * 12;
|
byte[] response = new byte[responseLength];
|
|
// 复制通用头部
|
System.arraycopy(request, 0, response, 0, 12);
|
|
// 设置响应特定字段
|
response[1] = 0x03; // ROSCTR = Ack_Data
|
response[10] = 0x00; // 参数长度
|
response[11] = 0x00;
|
response[12] = 0x04; // 功能码:读
|
|
// 设置数据部分
|
response[13] = 0x00; // 数据长度高字节
|
response[14] = (byte) (responseLength - 16); // 数据长度低字节
|
response[15] = 0x00; // 返回代码:成功
|
|
// 处理每个读取项
|
int offset = 16;
|
for (int i = 0; i < itemCount; i++) {
|
int area = request[offset + 2] & 0xFF;
|
int dbNumber = ((request[offset + 3] & 0xFF) << 8) | (request[offset + 4] & 0xFF);
|
int wordLen = request[offset + 5] & 0xFF;
|
int startAddress = ((request[offset + 6] & 0xFF) << 16) |
|
((request[offset + 7] & 0xFF) << 8) |
|
(request[offset + 8] & 0xFF);
|
int count = ((request[offset + 9] & 0xFF) << 8) | (request[offset + 10] & 0xFF);
|
|
// 构建地址字符串
|
String addressStr;
|
if (area == 0x84) { // DB
|
addressStr = String.format("S7.DB%d.DBW%d", dbNumber, startAddress);
|
} else if (area == 0x81) { // 输入
|
addressStr = String.format("S7.I%d", startAddress);
|
} else if (area == 0x82) { // 输出
|
addressStr = String.format("S7.Q%d", startAddress);
|
} else if (area == 0x83) { // 内存
|
addressStr = String.format("S7.M%d", startAddress);
|
} else {
|
response[offset] = (byte) 0x81; // 返回代码:参数错误
|
response[offset + 1] = 0x00;
|
response[offset + 2] = 0x00;
|
offset += 3;
|
continue;
|
}
|
|
// 根据数据类型处理读取
|
if (wordLen == 0x04) { // 字节
|
response[offset] = 0x00; // 返回代码:成功
|
response[offset + 1] = 0x01; // 变量规范长度
|
response[offset + 2] = 0x04; // 数据类型:字节
|
response[offset + 3] = (byte) count; // 数据长度
|
|
for (int j = 0; j < count; j++) {
|
String byteAddress = String.format("%s.%d", addressStr, j);
|
int value = handler.handleReadRegister(byteAddress) & 0xFF;
|
response[offset + 4 + j] = (byte) value;
|
}
|
|
offset += 4 + count;
|
} else if (wordLen == 0x06) { // 字
|
response[offset] = 0x00; // 返回代码:成功
|
response[offset + 1] = 0x01; // 变量规范长度
|
response[offset + 2] = 0x06; // 数据类型:字
|
response[offset + 3] = (byte) (count * 2); // 数据长度
|
|
for (int j = 0; j < count; j++) {
|
String wordAddress = String.format("%s.%d", addressStr, j);
|
int value = handler.handleReadRegister(wordAddress);
|
response[offset + 4 + j * 2] = (byte) (value >> 8);
|
response[offset + 5 + j * 2] = (byte) value;
|
}
|
|
offset += 4 + count * 2;
|
} else if (wordLen == 0x08) { // 双字
|
response[offset] = 0x00; // 返回代码:成功
|
response[offset + 1] = 0x01; // 变量规范长度
|
response[offset + 2] = 0x08; // 数据类型:双字
|
response[offset + 3] = (byte) (count * 4); // 数据长度
|
|
for (int j = 0; j < count; j++) {
|
String dwordAddress = String.format("%s.%d", addressStr, j);
|
int value = handler.handleReadRegister(dwordAddress);
|
response[offset + 4 + j * 4] = (byte) (value >> 24);
|
response[offset + 5 + j * 4] = (byte) (value >> 16);
|
response[offset + 6 + j * 4] = (byte) (value >> 8);
|
response[offset + 7 + j * 4] = (byte) value;
|
}
|
|
offset += 4 + count * 4;
|
} else {
|
response[offset] = (byte) 0x81; // 返回代码:参数错误
|
response[offset + 1] = 0x00;
|
response[offset + 2] = 0x00;
|
offset += 3;
|
}
|
}
|
|
return response;
|
}
|
|
private byte[] handleS7WriteRequest(byte[] request, int length) {
|
// 解析请求参数
|
int itemCount = request[22] & 0xFF;
|
|
// 构建响应
|
byte[] response = new byte[24 + itemCount * 3];
|
|
// 复制通用头部
|
System.arraycopy(request, 0, response, 0, 12);
|
|
// 设置响应特定字段
|
response[1] = 0x03; // ROSCTR = Ack_Data
|
response[10] = 0x00; // 参数长度
|
response[11] = 0x00;
|
response[12] = 0x05; // 功能码:写
|
|
// 设置数据部分
|
response[13] = 0x00; // 数据长度高字节
|
response[14] = (byte) (itemCount * 3); // 数据长度低字节
|
|
// 处理每个写入项
|
int offset = 16;
|
for (int i = 0; i < itemCount; i++) {
|
int area = request[offset + 2] & 0xFF;
|
int dbNumber = ((request[offset + 3] & 0xFF) << 8) | (request[offset + 4] & 0xFF);
|
int wordLen = request[offset + 5] & 0xFF;
|
int startAddress = ((request[offset + 6] & 0xFF) << 16) |
|
((request[offset + 7] & 0xFF) << 8) |
|
(request[offset + 8] & 0xFF);
|
int count = ((request[offset + 9] & 0xFF) << 8) | (request[offset + 10] & 0xFF);
|
int dataLength = request[offset + 11] & 0xFF;
|
|
// 构建地址字符串
|
String addressStr;
|
if (area == 0x84) { // DB
|
addressStr = String.format("S7.DB%d.DBW%d", dbNumber, startAddress);
|
} else if (area == 0x81) { // 输入
|
addressStr = String.format("S7.I%d", startAddress);
|
} else if (area == 0x82) { // 输出
|
addressStr = String.format("S7.Q%d", startAddress);
|
} else if (area == 0x83) { // 内存
|
addressStr = String.format("S7.M%d", startAddress);
|
} else {
|
response[offset] = (byte) 0x81; // 返回代码:参数错误
|
offset += 3;
|
continue;
|
}
|
|
// 根据数据类型处理写入
|
if (wordLen == 0x04) { // 字节
|
int[] values = new int[count];
|
for (int j = 0; j < count; j++) {
|
values[j] = request[offset + 12 + j] & 0xFF;
|
String byteAddress = String.format("%s.%d", addressStr, j);
|
handler.handleWriteRegister(byteAddress, values[j]);
|
}
|
|
response[offset] = 0x00; // 返回代码:成功
|
offset += 3;
|
} else if (wordLen == 0x06) { // 字
|
int[] values = new int[count];
|
for (int j = 0; j < count; j++) {
|
values[j] = ((request[offset + 12 + j * 2] & 0xFF) << 8) |
|
(request[offset + 13 + j * 2] & 0xFF);
|
String wordAddress = String.format("%s.%d", addressStr, j);
|
handler.handleWriteRegister(wordAddress, values[j]);
|
}
|
|
response[offset] = 0x00; // 返回代码:成功
|
offset += 3;
|
} else if (wordLen == 0x08) { // 双字
|
for (int j = 0; j < count; j++) {
|
int value = ((request[offset + 12 + j * 4] & 0xFF) << 24) |
|
((request[offset + 13 + j * 4] & 0xFF) << 16) |
|
((request[offset + 14 + j * 4] & 0xFF) << 8) |
|
(request[offset + 15 + j * 4] & 0xFF);
|
String dwordAddress = String.format("%s.%d", addressStr, j);
|
handler.handleWriteRegister(dwordAddress, value);
|
}
|
|
response[offset] = 0x00; // 返回代码:成功
|
offset += 3;
|
} else {
|
response[offset] = (byte) 0x81; // 返回代码:参数错误
|
offset += 3;
|
}
|
}
|
|
return response;
|
}
|
|
private byte[] buildS7ErrorResponse(byte[] request, int errorCode) {
|
byte[] response = new byte[24];
|
|
// 复制通用头部
|
System.arraycopy(request, 0, response, 0, 12);
|
|
// 设置响应特定字段
|
response[1] = 0x03; // ROSCTR = Ack_Data
|
response[10] = 0x00; // 参数长度
|
response[11] = 0x00;
|
response[12] = request[12]; // 功能码
|
|
// 设置数据部分
|
response[13] = 0x00; // 数据长度高字节
|
response[14] = 0x03; // 数据长度低字节
|
response[15] = (byte) errorCode; // 返回代码
|
|
return response;
|
}
|
}
|