package com.mes.connect.Thread; import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONException; import com.alibaba.fastjson.JSONObject; import com.fasterxml.jackson.databind.ObjectMapper; import com.mes.common.JsonConversion; import com.mes.common.ReadFile; import com.mes.connect.IndustrialInterface.Api; import com.mes.connect.IndustrialInterface.IndustrialClient; import com.mes.connect.entity.*; import com.mes.connect.modbus.ModbusIpClient; import com.mes.connect.modbus.ModbusTcpClient; import com.mes.connect.protocol.ProtocolType; import com.mes.connect.s7.S7Client; import com.mes.connect.s7.S7ClientOld; import com.mes.model.entity.Machine; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.http.HttpMethod; import org.springframework.stereotype.Service; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; @Slf4j public class MachineThread extends Thread { //当前设备的参数 private Machine machine; private ProtocolType protocolType; private IndustrialClient client; private PlcParameters plcParameters; private LogicConfig logicConfig; private Api api; // 存储所有逻辑线程 private Map logicThreads = new ConcurrentHashMap<>(); // 控制逻辑线程运行的标志 private Map logicRunningFlags = new ConcurrentHashMap<>(); // 主线程运行标志 private boolean running = true; // 主线程执行间隔 private int mainThreadInterval = 1000; public MachineThread(Machine machine, Api api) throws IOException { this.machine = machine; this.api = api; this.logicConfig = JsonConversion.jsonToObjectByJackson(ReadFile.readJson(machine.getLogicFile()).toString(), LogicConfig.class); this.plcParameters = JsonConversion.jsonToObjectByJackson(ReadFile.readJson(machine.getMachineFile()).toString(), PlcParameters.class); switch (machine.getProtocolType().getName()) { case "ModbusTcp": client = new ModbusTcpClient(machine.getIp(), machine.getPort(), 1); break; case "ModbusIp": client = new ModbusIpClient(machine.getIp(), machine.getPort()); break; case "S7": client = new S7Client(machine.getIp(), machine.getPort(), 0, 1); break; case "S7Old": client = new S7ClientOld(machine.getPlcType().getName(),machine.getIp(), machine.getPort(), 0, 1);; break; default: log.error("无效的协议类型: {}", protocolType); throw new IllegalArgumentException("无效的协议类型: " + protocolType); } if (client != null) { client.connect(); boolean connected = client.isConnected(); if (!connected) { log.error("连接PLC失败: {}", machine.getIp()); } } } @Override public void run() { log.info("MachineThread启动,设备IP: {}", machine.getIp()); if (client == null || !client.isConnected()) { log.error("PLC客户端未连接,线程退出"); return; } plcParameters.Initialization(); try { // 初始化读取PLC参数 readPlcParameter(); } catch (Exception e) { log.error("初始化读取PLC参数失败: {}", e.getMessage(), e); return; } // 为每个逻辑项创建并启动子线程 //startLogicThread(logicConfig.getLogics().get(0)); for (LogicItem logicItem : logicConfig.getLogics()) { startLogicThread(logicItem); //startLogicThread(logicConfig.getLogics().get(0)); } // 主线程持续运行,定期读取PLC参数并监控连接状态 while (running) { try { // 读取PLC参数,为逻辑处理提供最新数据 readPlcParameter(); // 检查PLC连接状态 if (!client.isConnected()) { log.warn("PLC连接断开,尝试重新连接"); tryReconnect(); } // 检查逻辑线程状态,重启已终止的线程 checkLogicThreadsStatus(); // 等待下一个执行周期 Thread.sleep(mainThreadInterval); } catch (InterruptedException e) { log.info("主线程被中断,准备退出"); running = false; Thread.currentThread().interrupt(); } catch (Exception e) { log.error("执行过程中发生错误: {}", e.getMessage(), e); // 尝试重新连接 tryReconnect(); // 等待一段时间再继续执行 try { Thread.sleep(5000); } catch (InterruptedException ie) { log.info("主线程被中断,准备退出"); running = false; Thread.currentThread().interrupt(); } } } // 线程退出前关闭所有资源 shutdown(); log.info("MachineThread已退出,设备IP: {}", machine.getIp()); } // 为指定逻辑项创建并启动子线程 private void startLogicThread(LogicItem logicItem) { String logicId = logicItem.getName(); // 假设每个LogicItem有唯一ID if (logicThreads.containsKey(logicId) && logicThreads.get(logicId).isAlive()) { log.warn("逻辑项线程已在运行: {}", logicId); return; } // 设置运行标志 logicRunningFlags.put(logicId, true); // 创建并启动线程 Thread thread = new Thread(() -> { log.info("逻辑项线程启动: {}", logicId); // 逻辑项子线程持续运行的循环 while (logicRunningFlags.getOrDefault(logicId, false)) { try { // 执行实际业务逻辑 basicsLogic(logicItem); // 根据逻辑项的执行频率设置等待时间,默认1000ms int logicInterval = 1000; Thread.sleep(logicInterval); } catch (InterruptedException e) { log.info("逻辑项线程被中断,准备退出: {}", logicId); logicRunningFlags.put(logicId, false); Thread.currentThread().interrupt(); } catch (Exception e) { log.error("执行逻辑项失败: {}, 错误: {}", logicId, e.getMessage(), e); // 等待一段时间再继续执行 try { Thread.sleep(5000); } catch (InterruptedException ie) { log.info("逻辑项线程被中断,准备退出: {}", logicId); logicRunningFlags.put(logicId, false); Thread.currentThread().interrupt(); } } } log.info("逻辑项线程已退出: {}", logicId); }); // 设置线程名称 thread.setName("Logic-" + logicId); // 存储线程引用 logicThreads.put(logicId, thread); // 启动线程 thread.start(); log.info("已启动逻辑项线程: {}", logicId); } // 检查逻辑线程状态,重启已终止的线程 private void checkLogicThreadsStatus() { for (LogicItem logicItem : logicConfig.getLogics()) { String logicId = logicItem.getName(); Thread thread = logicThreads.get(logicId); // 如果线程不存在或已终止且运行标志为true,则重启线程 if ((thread == null || !thread.isAlive()) && logicRunningFlags.getOrDefault(logicId, false)) { log.warn("逻辑项线程已终止,准备重启: {}", logicId); startLogicThread(logicItem); } } } // 尝试重新连接PLC private void tryReconnect() { if (client != null) { try { log.info("尝试重新连接PLC: {}", machine.getIp()); client.disconnect(); Thread.sleep(2000); boolean reconnected = client.isConnected(); if (reconnected) { log.info("PLC重新连接成功: {}", machine.getIp()); } else { log.error("PLC重新连接失败: {}", machine.getIp()); } } catch (Exception e) { log.error("重新连接PLC异常: {}", e.getMessage(), e); } } } // 关闭线程前的清理工作 public void shutdown() { running = false; // 停止所有逻辑线程 for (String logicId : logicRunningFlags.keySet()) { logicRunningFlags.put(logicId, false); Thread thread = logicThreads.get(logicId); if (thread != null && thread.isAlive()) { thread.interrupt(); } } // 等待所有逻辑线程结束 for (String logicId : logicThreads.keySet()) { Thread thread = logicThreads.get(logicId); if (thread != null && thread.isAlive()) { try { thread.join(1000); // 等待最多1秒 } catch (InterruptedException e) { log.error("等待逻辑线程结束时被中断: {}", e.getMessage()); Thread.currentThread().interrupt(); } } } // 清空线程映射 logicThreads.clear(); logicRunningFlags.clear(); // 关闭PLC连接 if (client != null) { try { client.disconnect(); log.info("PLC连接已关闭: {}", machine.getIp()); } catch (Exception e) { log.error("关闭PLC连接异常: {}", e.getMessage(), e); } } } //示例 解读配置 根据接口返内容给PLC public void basicsLogic(LogicItem logicItem) throws JSONException, IOException { //1.读取PLC当前参数 try { //遍历逻辑 boolean isEqual=true; for (Logic logic:logicItem.getLogic()){ String plcValue=plcParameters.getMap().get(logic.getCodeId()).getReadValue().toString(); if (!logic.getValue().contains(plcValue)){ isEqual=false; logic.setEquals(false); logicItem.setEquals(false); break; } } if (isEqual){ List result=new ArrayList<>(); //3.查询此逻辑下需要返回给PLC的数据 result可接收 HTTP接口,视图,存储过程 等 如下 try{ String connectType=logicItem.getConnectType(); String connectAddress=logicItem.getConnectAddress(); Map map=new HashMap(); switch (connectType) { case "Http": map.put("method","POST"); map.put("plcParameter",plcParameters); result= api.httpApi(connectAddress,map); log.info("接口返回内容:{}",result); break; case "View": // 视图/表 result= api.viewApi(connectAddress,map); break; case "Procedure": // 存储过程 //result= api.procedureAPI(connectAddress,plcParameter); break; default: log.warn("不支持的连接类型: {}", connectType); return; // 不支持的方式 } }catch (Exception e){ log.error("调用接口失败: {}", e.getMessage(), e); } //4.返回PLC内容 if (result != null&&!result.isEmpty()) { basicsResult(result,logicItem.getReturnValue()); } } } catch (Exception e) { log.error("执行basicsLogic失败: {}", e.getMessage(), e); } } //传入需要发送的数据 public boolean basicsResult(List sendData, List returnValue) throws JSONException, IOException { for (ReturnValue itemReturnValue:returnValue){ //需要返回PLC的值 String values =""; if (itemReturnValue.isFixed()){ //固定值 values= itemReturnValue.getValue(); }else{ //按传递的参数值 values= sendData.get(Integer.valueOf(itemReturnValue.getValue())-1); } if (values!=null && !values.equals("")){ //还需增添 不同类型调用不同方法 switch (itemReturnValue.getPlcDataType()) { case "int": client.writeRegister(itemReturnValue.getAddress(),(int)Double.parseDouble(values)); break; case "string": client.writeString(itemReturnValue.getAddress(),values); break; case "float": client.writeFloat(itemReturnValue.getAddress(),Float.valueOf(values)); break; default: log.error("不支持的数据类型: {}", itemReturnValue.getPlcDataType()); return false; } } } return true; } // 按照json文件读取plc内容 private PlcParameters readPlcParameter() throws IOException, JSONException { List parametersList=plcParameters.getParameters(); for (int i=0;i