package com.mes.connect.Thread;
|
|
import com.alibaba.fastjson.JSONArray;
|
import com.alibaba.fastjson.JSONException;
|
import com.alibaba.fastjson.JSONObject;
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
import com.mes.common.JsonConversion;
|
import com.mes.common.ReadFile;
|
import com.mes.connect.IndustrialInterface.Api;
|
import com.mes.connect.IndustrialInterface.IndustrialClient;
|
import com.mes.connect.entity.*;
|
import com.mes.connect.modbus.ModbusIpClient;
|
import com.mes.connect.modbus.ModbusTcpClient;
|
import com.mes.connect.protocol.ProtocolType;
|
import com.mes.connect.s7.S7Client;
|
import com.mes.connect.s7.S7ClientOld;
|
import com.mes.model.entity.Machine;
|
import lombok.extern.slf4j.Slf4j;
|
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.http.HttpMethod;
|
import org.springframework.stereotype.Service;
|
|
import java.io.BufferedReader;
|
import java.io.IOException;
|
import java.io.InputStream;
|
import java.io.InputStreamReader;
|
import java.util.*;
|
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.stream.Collectors;
|
|
@Slf4j
|
public class MachineThread extends Thread {
|
//当前设备的参数
|
private Machine machine;
|
private ProtocolType protocolType;
|
private IndustrialClient client;
|
private PlcParameters plcParameters;
|
private LogicConfig logicConfig;
|
private Api api;
|
|
// 存储所有逻辑线程
|
private Map<String, Thread> logicThreads = new ConcurrentHashMap<>();
|
// 控制逻辑线程运行的标志
|
private Map<String, Boolean> logicRunningFlags = new ConcurrentHashMap<>();
|
// 主线程运行标志
|
private boolean running = true;
|
// 主线程执行间隔
|
private int mainThreadInterval = 1000;
|
|
public MachineThread(Machine machine, Api api) throws IOException {
|
this.machine = machine;
|
this.api = api;
|
this.logicConfig = JsonConversion.jsonToObjectByJackson(ReadFile.readJson(machine.getLogicFile()).toString(), LogicConfig.class);
|
this.plcParameters = JsonConversion.jsonToObjectByJackson(ReadFile.readJson(machine.getMachineFile()).toString(), PlcParameters.class);
|
this.logicConfig.getLogics();
|
this.plcParameters.Initialization();
|
for (LogicItem logicItem : logicConfig.getLogics()) {
|
if (logicItem.getLogicInterval()==0){
|
//默认1000毫秒
|
logicItem.setLogicInterval(1000);
|
}
|
for (Logic logic : logicItem.getLogic()) {
|
logic.setAddress(this.plcParameters.getMap().get(logic.getCodeId()).getAddress());
|
logic.setPlcDataType(this.plcParameters.getMap().get(logic.getCodeId()).getPlcDataType());
|
}
|
for (ReturnValue returnValue : logicItem.getReturnValue()) {
|
returnValue.setAddress(this.plcParameters.getMap().get(returnValue.getCodeId()).getAddress());
|
returnValue.setPlcDataType(this.plcParameters.getMap().get(returnValue.getCodeId()).getPlcDataType());
|
}
|
}
|
|
|
switch (machine.getProtocolType().getName()) {
|
case "ModbusTcp":
|
client = new ModbusTcpClient(machine.getIp(), machine.getPort(), 1);
|
break;
|
case "ModbusIp":
|
client = new ModbusIpClient(machine.getIp(), machine.getPort());
|
break;
|
case "S7":
|
client = new S7Client(machine.getIp(), machine.getPort(), 0, 1);
|
break;
|
case "S7Old":
|
client = new S7ClientOld(machine.getPlcType().getName(),machine.getIp(), machine.getPort(), 0, 1);;
|
break;
|
default:
|
log.error("无效的协议类型: {}", protocolType);
|
throw new IllegalArgumentException("无效的协议类型: " + protocolType);
|
}
|
}
|
|
@Override
|
public void run() {
|
// 主线程持续运行,定期读取PLC参数并监控连接状态
|
while (running) {
|
try {
|
// 检查PLC连接状态
|
if (!client.isConnected()) {
|
log.info("PLC尝试连接... 设备IP: {}", machine.getIp());
|
tryReconnect();
|
}
|
// 读取PLC参数,为逻辑处理提供最新数据
|
this.readPlcParameter();
|
plcParameters.Initialization();
|
// 检查逻辑线程状态,开启/重启 线程
|
checkLogicThreadsStatus();
|
// 等待下一个执行周期
|
Thread.sleep(mainThreadInterval);
|
} catch (InterruptedException e) {
|
log.info("主线程被中断,准备退出");
|
running = false;
|
Thread.currentThread().interrupt();
|
} catch (Exception e) {
|
log.error("执行过程中发生错误: {}", e.getMessage(), e);
|
// 尝试重新连接
|
tryReconnect();
|
// 等待一段时间再继续执行
|
try {
|
Thread.sleep(5000);
|
} catch (InterruptedException ie) {
|
log.info("主线程被中断,准备退出");
|
running = false;
|
Thread.currentThread().interrupt();
|
}
|
}
|
}
|
// 线程退出前关闭所有资源
|
shutdown();
|
log.info("MachineThread已退出,设备IP: {}", machine.getIp());
|
}
|
|
// 为指定逻辑项创建并启动子线程
|
private void startLogicThread(LogicItem logicItem) {
|
String logicId = this.machine.getId()+" - "+this.machine.getName()+" - "+ logicItem.getName(); // 假设每个LogicItem有唯一ID
|
if (logicThreads.containsKey(logicId) && logicThreads.get(logicId).isAlive()) {
|
return;
|
}
|
// 设置运行标志
|
logicRunningFlags.put(logicId, true);
|
// 创建并启动线程
|
Thread thread = new Thread(() -> {
|
log.info("逻辑项线程启动: {}", logicId);
|
// 逻辑项子线程持续运行的循环
|
while (logicRunningFlags.getOrDefault(logicId, false)) {
|
try {
|
// 执行实际业务逻辑
|
basicsLogic(logicItem);
|
// 根据逻辑项的执行频率设置等待时间,默认1000ms
|
Thread.sleep(logicItem.getLogicInterval());
|
} catch (InterruptedException e) {
|
log.info("逻辑项线程被中断,准备退出: {}", logicId);
|
logicRunningFlags.put(logicId, false);
|
Thread.currentThread().interrupt();
|
} catch (Exception e) {
|
log.error("执行逻辑项失败: {}, 错误: {}", logicId, e.getMessage(), e);
|
// 等待一段时间再继续执行
|
try {
|
Thread.sleep(5000);
|
} catch (InterruptedException ie) {
|
log.info("逻辑项线程被中断,准备退出: {}", logicId);
|
logicRunningFlags.put(logicId, false);
|
Thread.currentThread().interrupt();
|
}
|
}
|
}
|
});
|
// 设置线程名称
|
thread.setName(logicId);
|
// 存储线程引用
|
logicThreads.put(logicId, thread);
|
// 启动线程
|
thread.start();
|
//log.info("已启动逻辑项线程: {}", logicId);
|
}
|
|
// 检查逻辑线程状态,重启已终止的线程
|
private void checkLogicThreadsStatus() {
|
for (LogicItem logicItem : logicConfig.getLogics()) {
|
String logicId = this.machine.getId()+" - "+this.machine.getName()+" - "+ logicItem.getName();
|
Thread thread = logicThreads.get(logicId);
|
// 如果线程不存在或已终止且运行标志为true,则重启线程
|
if ((thread == null || !thread.isAlive()) &&!logicRunningFlags.getOrDefault(logicId, false)) {
|
startLogicThread(logicItem);
|
}
|
}
|
}
|
|
// 尝试重新连接PLC
|
private void tryReconnect() {
|
if (client != null) {
|
try {
|
client.disconnect();
|
Thread.sleep(2000);
|
client.connect();
|
boolean reconnected = client.isConnected();
|
if (reconnected) {
|
log.info("PLC重新连接成功: {}", machine.getIp());
|
} else {
|
log.error("PLC重新连接失败: {}", machine.getIp());
|
}
|
} catch (Exception e) {
|
log.error("重新连接PLC异常: {}", e.getMessage(), e);
|
}
|
}
|
}
|
|
// 关闭线程前的清理工作
|
public void shutdown() {
|
running = false;
|
// 停止所有逻辑线程
|
for (String logicId : logicRunningFlags.keySet()) {
|
logicRunningFlags.put(logicId, false);
|
Thread thread = logicThreads.get(logicId);
|
if (thread != null && thread.isAlive()) {
|
thread.interrupt();
|
}
|
}
|
// 等待所有逻辑线程结束
|
for (String logicId : logicThreads.keySet()) {
|
Thread thread = logicThreads.get(logicId);
|
if (thread != null && thread.isAlive()) {
|
try {
|
thread.join(1000); // 等待最多1秒
|
} catch (InterruptedException e) {
|
log.error("等待逻辑线程结束时被中断: {}", e.getMessage());
|
Thread.currentThread().interrupt();
|
}
|
}
|
}
|
|
// 清空线程映射
|
logicThreads.clear();
|
logicRunningFlags.clear();
|
|
// 关闭PLC连接
|
if (client != null) {
|
try {
|
client.disconnect();
|
log.info("PLC连接已关闭: {}", machine.getIp());
|
} catch (Exception e) {
|
log.error("关闭PLC连接异常: {}", e.getMessage(), e);
|
}
|
}
|
}
|
|
//示例 解读配置 根据接口返内容给PLC
|
public void basicsLogic(LogicItem logicItem) throws JSONException, IOException {
|
//1.读取PLC当前参数
|
try {
|
//遍历逻辑
|
boolean isEqual=true;
|
for (Logic logic:logicItem.getLogic()){
|
String plcValue=plcParameters.getMap().get(logic.getCodeId()).getReadValue().toString();
|
if (!logic.getValue().contains(plcValue)){
|
isEqual=false;
|
logic.setEquals(false);
|
logicItem.setEquals(false);
|
break;
|
}
|
}
|
if (isEqual){
|
log.info("满足条件:{}",logicItem.getName());
|
//3.查询此逻辑下需要返回给PLC的数据 result可接收 HTTP接口,视图,存储过程 等 如下
|
List<String> resultWrite=null;
|
if (logicItem.getApiConfigBefore()!=null){
|
ApiConfig apiConfig=logicItem.getApiConfigBefore();
|
resultWrite=api.callApi(apiConfig,plcParameters);
|
log.info("ApiConfigBefore 外置接口[{}:{}]返回的内容:{}",apiConfig.getType(),apiConfig.getAddress(),resultWrite);
|
}
|
//写入PLC返回值成功时调用接口
|
if (basicsResult(resultWrite,logicItem.getReturnValue())){
|
ApiConfig apiConfig=logicItem.getApiConfigAfter();
|
if (logicItem.getApiConfigAfter()!=null){
|
List<String> result=api.callApi(apiConfig,plcParameters);
|
log.info("ApiConfigAfter 外置接口[{}:{}]返回的内容:{}",apiConfig.getType(),apiConfig.getAddress(),result);
|
}
|
}
|
|
|
}
|
} catch (Exception e) {
|
log.error("执行basicsLogic失败: {}", e.getMessage(), e);
|
}
|
}
|
|
//传入需要发送的数据
|
public boolean basicsResult(List<String> sendData, List<ReturnValue> returnValue) throws JSONException, IOException {
|
for (ReturnValue itemReturnValue:returnValue){
|
//需要返回PLC的值
|
String values ="";
|
if (itemReturnValue.isFixed()){
|
//固定值
|
values= itemReturnValue.getValue();
|
}else{
|
//按传递的参数值
|
values= sendData.get(Integer.valueOf(itemReturnValue.getValue())-1);
|
}
|
if (values!=null && !values.equals("")){
|
//还需增添 不同类型调用不同方法
|
switch (itemReturnValue.getPlcDataType()) {
|
case "Word":
|
client.writeRegister(itemReturnValue.getAddress(),(int)Double.parseDouble(values));
|
break;
|
case "String":
|
client.writeString(itemReturnValue.getAddress(),values);
|
break;
|
case "Float":
|
client.writeFloat(itemReturnValue.getAddress(),Float.valueOf(values));
|
break;
|
case "bit":
|
client.writeBit(itemReturnValue.getAddress(),Boolean.valueOf(values));
|
break;
|
default:
|
log.error("不支持的数据类型: {}", itemReturnValue.getPlcDataType());
|
return false;
|
}
|
}
|
}
|
return true;
|
}
|
|
// 按照json文件读取plc内容
|
private PlcParameters readPlcParameter() throws IOException, JSONException {
|
List<Parameters> parametersList=plcParameters.getParameters();
|
for (int i=0;i<parametersList.size();i++){
|
//根据类型读取
|
switch (parametersList.get(i).getPlcDataType()) {
|
case "Word":
|
int value=client.readRegister(parametersList.get(i).getAddress());
|
parametersList.get(i).setReadValue(value);
|
break;
|
case "string":
|
//多寄存器 传递数量
|
String strValue = client.readString(parametersList.get(i).getAddress(), 2);
|
parametersList.get(i).setReadValue(strValue);
|
break;
|
case "bit":
|
//读线圈
|
boolean bitValue = client.readBit(parametersList.get(i).getAddress());
|
parametersList.get(i).setReadValue(bitValue);
|
break;
|
default:
|
log.error("不支持的数据类型: {}", parametersList.get(i).getPlcDataType());
|
return null;
|
}
|
}
|
plcParameters.setParameters(parametersList);
|
return plcParameters;
|
}
|
}
|