package com.mes.connect.thread; import com.alibaba.fastjson.JSONException; import com.mes.common.JsonConversion; import com.mes.common.ReadFile; import com.mes.connect.industrialinterface.ApiImpl; import com.mes.connect.industrialinterface.IndustrialClient; import com.mes.connect.entity.*; import com.mes.connect.modbus.ModbusIpClient; import com.mes.connect.modbus.ModbusTcpClient; import com.mes.connect.protocol.ProtocolType; import com.mes.connect.s7.S7ClientOld; import com.mes.model.entity.Machine; import lombok.extern.slf4j.Slf4j; import org.apache.tomcat.util.threads.ThreadPoolExecutor; import javax.annotation.Resource; import java.io.IOException; import java.util.List; import java.util.Map; import java.util.concurrent.*; /** * 设备线程 * * @author yzx * @version 1.0 */ @Slf4j public class MachineThread extends Thread { /** * 当前设备的参数 */ private Machine machine; private ProtocolType protocolType; private IndustrialClient client; private PlcParameters plcParameters; private LogicConfig logicConfig; @Resource private ApiImpl api; /** * 存储所有逻辑线程 */ private Map logicThreads = new ConcurrentHashMap<>(); /** * 控制逻辑线程运行的标志 */ private Map logicRunningFlags = new ConcurrentHashMap<>(); /** * 主线程运行标志 */ private boolean running = true; /** * 主线程执行间隔 */ private int mainThreadInterval = 1000; /** * 线程池服务 */ private final ExecutorService threadPool; /** * 时间 */ private int timeOut = 10; /** * @param machine * @param api * @throws IOException */ public MachineThread(Machine machine, ApiImpl api) throws IOException { this.machine = machine; this.api = api; this.logicConfig = JsonConversion.jsonToObjectByJackson(ReadFile.readJson(machine.getLogicFile()).toString(), LogicConfig.class); this.plcParameters = JsonConversion.jsonToObjectByJackson(ReadFile.readJson(machine.getMachineFile()).toString(), PlcParameters.class); this.logicConfig.getLogics(); this.plcParameters.initialization(); // 手动配置线程池参数 核心线程数,最大线程数,线程空闲时间,,有界队列,防止OOM,拒绝策略:调用者执行 threadPool = new ThreadPoolExecutor( 10, 10, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(100), Executors.defaultThreadFactory(), new ThreadPoolExecutor.CallerRunsPolicy() ); for (LogicItem logicItem : logicConfig.getLogics()) { if (logicItem.getLogicInterval() == 0) { //默认1000毫秒 logicItem.setLogicInterval(1000); } for (Logic logic : logicItem.getLogic()) { logic.setAddress(this.plcParameters.getMap().get(logic.getCodeId()).getAddress()); logic.setPlcDataType(this.plcParameters.getMap().get(logic.getCodeId()).getPlcDataType()); } for (ReturnValue returnValue : logicItem.getReturnValue()) { returnValue.setAddress(this.plcParameters.getMap().get(returnValue.getCodeId()).getAddress()); returnValue.setPlcDataType(this.plcParameters.getMap().get(returnValue.getCodeId()).getPlcDataType()); } } switch (machine.getProtocolType().getName()) { case "ModbusTcp": client = new ModbusTcpClient(machine.getIp(), machine.getPort(), 1); break; case "ModbusIp": client = new ModbusIpClient(machine.getIp(), machine.getPort()); break; case "S7": break; case "S7Old": client = new S7ClientOld(machine.getPlcType().getName(), machine.getIp(), machine.getPort(), 0, 1); ; break; default: log.error("无效的协议类型: {}", protocolType); throw new IllegalArgumentException("无效的协议类型: " + protocolType); } } public void executeTask(Runnable task) { threadPool.execute(task); } /** * 应用关闭时调用 */ public void threadPoolShutdown() { threadPool.shutdown(); try { if (!threadPool.awaitTermination(timeOut, TimeUnit.SECONDS)) { threadPool.shutdownNow(); } } catch (InterruptedException e) { threadPool.shutdownNow(); } } /** * 运行入口 */ @Override public void run() { // 主线程持续运行,定期读取PLC参数并监控连接状态 while (running) { try { // 检查PLC连接状态 if (!client.isConnected()) { log.info("PLC尝试连接... 设备IP: {}", machine.getIp()); tryReconnect(); } // 读取PLC参数,为逻辑处理提供最新数据 this.readPlcParameter(); plcParameters.initialization(); // 检查逻辑线程状态,开启/重启 线程 checkLogicThreadsStatus(); // 等待下一个执行周期 Thread.sleep(mainThreadInterval); } catch (InterruptedException e) { log.info("主线程被中断,准备退出"); running = false; Thread.currentThread().interrupt(); } catch (Exception e) { log.error("执行过程中发生错误: {}", e.getMessage(), e); // 尝试重新连接 tryReconnect(); // 等待一段时间再继续执行 try { Thread.sleep(5000); } catch (InterruptedException ie) { log.info("主线程被中断,准备退出"); running = false; Thread.currentThread().interrupt(); } } } // 线程退出前关闭所有资源 shutdown(); log.info("MachineThread已退出,设备IP: {}", machine.getIp()); } /** * 为指定逻辑项创建并启动子线程 * * @param logicItem */ private void startLogicThread(LogicItem logicItem) { String logicId = this.machine.getId() + " - " + this.machine.getName() + " - " + logicItem.getName(); // 假设每个LogicItem有唯一ID if (logicThreads.containsKey(logicId) && logicThreads.get(logicId).isAlive()) { return; } // 设置运行标志 logicRunningFlags.put(logicId, true); // 创建并启动线程 executeTask(() -> { System.out.println("任务在线程池中执行: " + Thread.currentThread().getName()); log.info("逻辑项线程启动: {}", logicId); // 逻辑项子线程持续运行的循环 while (logicRunningFlags.getOrDefault(logicId, false)) { try { // 执行实际业务逻辑 basicsLogic(logicItem); // 根据逻辑项的执行频率设置等待时间,默认1000ms Thread.sleep(logicItem.getLogicInterval()); } catch (InterruptedException e) { log.info("逻辑项线程被中断,准备退出: {}", logicId); logicRunningFlags.put(logicId, false); Thread.currentThread().interrupt(); } catch (Exception e) { log.error("执行逻辑项失败: {}, 错误: {}", logicId, e.getMessage(), e); // 等待一段时间再继续执行 try { Thread.sleep(5000); } catch (InterruptedException ie) { log.info("逻辑项线程被中断,准备退出: {}", logicId); logicRunningFlags.put(logicId, false); Thread.currentThread().interrupt(); } } } }); // // 设置线程名称 // thread.setName(logicId); // // 存储线程引用 // logicThreads.put(logicId, thread); // // 启动线程 // thread.start(); //log.info("已启动逻辑项线程: {}", logicId); } /** * 检查逻辑线程状态,重启已终止的线程 */ private void checkLogicThreadsStatus() { for (LogicItem logicItem : logicConfig.getLogics()) { String logicId = this.machine.getId() + " - " + this.machine.getName() + " - " + logicItem.getName(); Thread thread = logicThreads.get(logicId); boolean isFlags = logicRunningFlags.getOrDefault(logicId, false); // 如果线程不存在或已终止且运行标志为true,则重启线程 boolean result = (thread == null || !thread.isAlive()) && !isFlags; if (result) { startLogicThread(logicItem); } } } /** * 尝试重新连接PLC */ private void tryReconnect() { if (client != null) { try { client.disconnect(); Thread.sleep(2000); client.connect(); boolean reconnected = client.isConnected(); if (reconnected) { log.info("PLC重新连接成功: {}", machine.getIp()); } else { log.error("PLC重新连接失败: {}", machine.getIp()); } } catch (Exception e) { log.error("重新连接PLC异常: {}", e.getMessage(), e); } } } /** * 关闭线程前的清理工作 */ public void shutdown() { running = false; // 停止所有逻辑线程 for (String logicId : logicRunningFlags.keySet()) { logicRunningFlags.put(logicId, false); Thread thread = logicThreads.get(logicId); if (thread != null && thread.isAlive()) { thread.interrupt(); } } // 等待所有逻辑线程结束 for (String logicId : logicThreads.keySet()) { Thread thread = logicThreads.get(logicId); if (thread != null && thread.isAlive()) { try { // 等待最多1秒 thread.join(1000); } catch (InterruptedException e) { log.error("等待逻辑线程结束时被中断: {}", e.getMessage()); Thread.currentThread().interrupt(); } } } // 清空线程映射 logicThreads.clear(); logicRunningFlags.clear(); // 关闭PLC连接 if (client != null) { try { client.disconnect(); log.info("PLC连接已关闭: {}", machine.getIp()); } catch (Exception e) { log.error("关闭PLC连接异常: {}", e.getMessage(), e); } } } /** * 示例 解读配置 根据接口返内容给PLC * * @param logicItem * @throws JSONException * @throws IOException */ public void basicsLogic(LogicItem logicItem) throws JSONException, IOException { //1.读取PLC当前参数 try { //遍历逻辑 boolean isEqual = true; for (Logic logic : logicItem.getLogic()) { String plcValue = plcParameters.getMap().get(logic.getCodeId()).getReadValue().toString(); if (!logic.getValue().contains(plcValue)) { isEqual = false; logic.setEquals(false); logicItem.setEquals(false); break; } } if (isEqual) { log.info("满足条件:{}", logicItem.getName()); //3.查询此逻辑下需要返回给PLC的数据 result可接收 HTTP接口,视图,存储过程 等 如下 List resultWrite = null; if (logicItem.getApiConfigBefore() != null) { ApiConfig apiConfig = logicItem.getApiConfigBefore(); resultWrite = api.callApi(apiConfig, plcParameters); log.info("ApiConfigBefore 外置接口[{}:{}]返回的内容:{}", apiConfig.getType(), apiConfig.getAddress(), resultWrite); } //写入PLC返回值成功时调用接口 if (basicsResult(resultWrite, logicItem.getReturnValue())) { ApiConfig apiConfig = logicItem.getApiConfigAfter(); if (logicItem.getApiConfigAfter() != null) { List result = api.callApi(apiConfig, plcParameters); log.info("ApiConfigAfter 外置接口[{}:{}]返回的内容:{}", apiConfig.getType(), apiConfig.getAddress(), result); } } } } catch (Exception e) { log.error("执行basicsLogic失败: {}", e.getMessage(), e); } } /** * 传入需要发送的数据 * * @param sendData * @param returnValue * @return * @throws JSONException * @throws IOException */ public boolean basicsResult(List sendData, List returnValue) throws JSONException, IOException { for (ReturnValue itemReturnValue : returnValue) { //需要返回PLC的值 String values = ""; if (itemReturnValue.isFixed()) { //固定值 values = itemReturnValue.getValue(); } else { //按传递的参数值 values = sendData.get(Integer.valueOf(itemReturnValue.getValue()) - 1); } if (values != null && !"".equals(values)) { //还需增添 不同类型调用不同方法 switch (itemReturnValue.getPlcDataType()) { case "Word": client.writeRegister(itemReturnValue.getAddress(), (int) Double.parseDouble(values)); break; case "String": client.writeString(itemReturnValue.getAddress(), values); break; case "Float": client.writeFloat(itemReturnValue.getAddress(), Float.valueOf(values)); break; case "bit": client.writeBit(itemReturnValue.getAddress(), Boolean.valueOf(values)); break; default: log.error("不支持的数据类型: {}", itemReturnValue.getPlcDataType()); return false; } } } return true; } /** * 按照json文件读取plc内容 * * @return * @throws IOException * @throws JSONException */ private PlcParameters readPlcParameter() throws IOException, JSONException { List parametersList = plcParameters.getParameters(); for (int i = 0; i < parametersList.size(); i++) { //根据类型读取 switch (parametersList.get(i).getPlcDataType()) { case "Word": int value = client.readRegister(parametersList.get(i).getAddress()); parametersList.get(i).setReadValue(value); break; case "string": //多寄存器 传递数量 String strValue = client.readString(parametersList.get(i).getAddress(), 2); parametersList.get(i).setReadValue(strValue); break; case "bit": //读线圈 boolean bitValue = client.readBit(parametersList.get(i).getAddress()); parametersList.get(i).setReadValue(bitValue); break; default: log.error("不支持的数据类型: {}", parametersList.get(i).getPlcDataType()); return null; } } plcParameters.setParameters(parametersList); return plcParameters; } }