package com.mes.interaction.workstation.transfer.handler;

import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.mes.device.entity.DeviceConfig;
import com.mes.device.entity.GlassInfo;
import com.mes.device.mapper.DeviceGlassInfoMapper;
import com.mes.device.service.DevicePlcOperationService;
import com.mes.device.service.GlassInfoService;
import com.mes.device.vo.DevicePlcVO;
import com.mes.interaction.workstation.base.WorkstationBaseHandler;
import com.mes.interaction.workstation.config.WorkstationLogicConfig;
import com.mes.s7.enhanced.EnhancedS7Serializer;
import com.mes.s7.provider.S7SerializerProvider;
import com.mes.service.PlcDynamicDataService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;

import javax.annotation.PreDestroy;
import java.time.LocalDateTime;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;

/**
 * Logic handler for the horizontal-to-vertical transfer workstation.
 *
 * <p>Buffers recently scanned glass per device, decides when a batch is due
 * (capacity reached or no new glass for the configured delay), assembles the
 * batch and writes it to the PLC. It can also run that check periodically on
 * a scheduled monitor task.
 *
 * <p>NOTE(review): the generic type arguments in this file were restored from
 * usage; the {@code Map<String, Object>} parameter types are assumed to match
 * {@code WorkstationBaseHandler#doExecute} - confirm against the base class.
 */
@Slf4j
@Component
public class HorizontalTransferLogicHandler extends WorkstationBaseHandler {

    /** Maximum number of glass IDs written to the PLC in one batch (plcGlassId1..6). */
    private static final int MAX_GLASSES_PER_BATCH = 6;

    /** Look-back window of the "recently scanned" query, in milliseconds. */
    private static final long RECENT_SCAN_WINDOW_MS = 120000L;

    /** Row limit appended to the "recently scanned" query. */
    private static final String RECENT_SCAN_LIMIT_SQL = "LIMIT 20";

    private final PlcDynamicDataService plcDynamicDataService;
    private final GlassInfoService glassInfoService;
    private final S7SerializerProvider s7SerializerProvider;

    @Autowired(required = false)
    private DeviceGlassInfoMapper glassInfoMapper;

    /** Glass buffer: deviceId -> buffered glass items. */
    private final Map<String, List<GlassBufferItem>> glassBuffer = new ConcurrentHashMap<>();

    /** deviceId -> time (epoch millis) at which a NEW glass was last added to the buffer. */
    private final Map<String, AtomicLong> lastScanTime = new ConcurrentHashMap<>();

    /**
     * deviceId -> (glassId -> time the glass was written to the PLC, epoch millis).
     * Used to keep already-processed glass from re-entering the buffer while its
     * row is still inside the query window.
     */
    private final Map<String, Map<String, Long>> processedGlassIds = new ConcurrentHashMap<>();

    /** Monitor tasks: deviceId -> scheduled task. */
    private final Map<String, ScheduledFuture<?>> monitorTasks = new ConcurrentHashMap<>();

    /** Scheduler running the periodic monitor tasks on daemon threads. */
    private final ScheduledExecutorService monitorExecutor = Executors.newScheduledThreadPool(5, r -> {
        Thread t = new Thread(r, "HorizontalTransferMonitor");
        t.setDaemon(true);
        return t;
    });

    @Autowired
    public HorizontalTransferLogicHandler(DevicePlcOperationService devicePlcOperationService,
                                          PlcDynamicDataService plcDynamicDataService,
                                          @Qualifier("deviceGlassInfoService") GlassInfoService glassInfoService,
                                          S7SerializerProvider s7SerializerProvider) {
        super(devicePlcOperationService);
        this.plcDynamicDataService = plcDynamicDataService;
        this.glassInfoService = glassInfoService;
        this.s7SerializerProvider = s7SerializerProvider;
    }

    @Override
    public String getDeviceType() {
        return DeviceConfig.DeviceType.WORKSTATION_TRANSFER;
    }

    /**
     * Dispatches an operation to the matching handler method.
     *
     * <p>Supported operations: {@code checkAndProcess}/{@code process},
     * {@code startMonitor}, {@code stopMonitor}, {@code clearBuffer}. Any other
     * value yields a failed result. Exceptions thrown by a handler are logged and
     * converted into a failed result instead of propagating.
     *
     * @param deviceConfig device the operation targets
     * @param operation    operation name
     * @param params       not used by this handler
     * @param logicParams  raw logic parameters; also parsed into a {@link WorkstationLogicConfig}
     * @return the operation result
     */
    @Override
    protected DevicePlcVO.OperationResult doExecute(DeviceConfig deviceConfig,
                                                    String operation,
                                                    Map<String, Object> params,
                                                    Map<String, Object> logicParams) {
        WorkstationLogicConfig config = parseWorkstationConfig(logicParams);

        try {
            switch (operation) {
                case "checkAndProcess":
                case "process":
                    return handleCheckAndProcess(deviceConfig, config, logicParams);
                case "startMonitor":
                    return handleStartMonitor(deviceConfig, config, logicParams);
                case "stopMonitor":
                    return handleStopMonitor(deviceConfig);
                case "clearBuffer":
                    return handleClearBuffer(deviceConfig);
                default:
                    return buildResult(deviceConfig, operation, false, "不支持的操作: " + operation);
            }
        } catch (Exception e) {
            log.error("卧转立主体处理异常: deviceId={}, operation={}", deviceConfig.getId(), operation, e);
            return buildResult(deviceConfig, operation, false, "处理异常: " + e.getMessage());
        }
    }

    /**
     * Checks the glass buffer and, when a batch is due, writes it to the PLC.
     *
     * <p>Steps: query recently scanned glass, merge it into the device buffer,
     * decide whether a batch is due, assemble the batch within the vehicle
     * capacity, write it to the PLC and drop the written glass from the buffer.
     *
     * <p>The "last scan" timestamp is refreshed only when a glass that was not yet
     * buffered arrives. The buffer is evaluated even when the query returns nothing,
     * so a partially filled buffer is still flushed once the idle delay elapses.
     */
    private DevicePlcVO.OperationResult handleCheckAndProcess(DeviceConfig deviceConfig,
                                                              WorkstationLogicConfig config,
                                                              Map<String, Object> logicParams) {
        String deviceId = deviceConfig.getDeviceId();
        EnhancedS7Serializer serializer = s7SerializerProvider.getSerializer(deviceConfig);
        if (serializer == null) {
            return buildResult(deviceConfig, "checkAndProcess", false, "获取PLC序列化器失败");
        }

        try {
            // 1. Query glass scanned within the recent window.
            List<GlassInfo> recentGlasses = queryRecentScannedGlasses(deviceConfig, logicParams);

            // 2. Merge into the buffer; only genuinely new glass counts as a "scan".
            if (!recentGlasses.isEmpty()) {
                log.info("查询到最近扫码的玻璃: deviceId={}, count={}", deviceId, recentGlasses.size());
                int added = updateBuffer(deviceId, recentGlasses);
                if (added > 0) {
                    lastScanTime.put(deviceId, new AtomicLong(System.currentTimeMillis()));
                }
            }

            // 3. Nothing buffered -> nothing to do.
            List<GlassBufferItem> buffer = glassBuffer.get(deviceId);
            if (buffer == null || buffer.isEmpty()) {
                String idleMessage = recentGlasses.isEmpty() ? "暂无待处理的玻璃信息" : "缓冲队列为空";
                return buildResult(deviceConfig, "checkAndProcess", true, idleMessage);
            }

            // 4. Is a batch due (capacity reached or idle delay elapsed)?
            if (!shouldProcessBatch(deviceId, buffer, config)) {
                return buildResult(deviceConfig, "checkAndProcess", true, "等待更多玻璃或30s超时");
            }

            // 5. Assemble the batch within the vehicle capacity.
            List<GlassInfo> batch = assembleBatch(buffer, config.getVehicleCapacity());
            if (batch.isEmpty()) {
                return buildResult(deviceConfig, "checkAndProcess", false, "无法组装有效批次(容量不足)");
            }

            // 6. Write to the PLC; keep the glass buffered if the write failed.
            DevicePlcVO.OperationResult writeResult = writeBatchToPlc(deviceConfig, batch, serializer, logicParams);
            if (!Boolean.TRUE.equals(writeResult.getSuccess())) {
                return writeResult;
            }

            // 7. Drop the written glass from the buffer.
            removeProcessedGlasses(deviceId, batch);

            String msg = String.format("批次已写入PLC: glassCount=%d, glassIds=%s",
                    batch.size(),
                    batch.stream().map(GlassInfo::getGlassId).collect(Collectors.joining(",")));
            return buildResult(deviceConfig, "checkAndProcess", true, msg);
        } catch (Exception e) {
            log.error("检查并处理玻璃批次异常: deviceId={}", deviceId, e);
            return buildResult(deviceConfig, "checkAndProcess", false, "处理异常: " + e.getMessage());
        }
    }

    /**
     * Queries active glass records created within the last two minutes, newest
     * first, limited to 20 rows. When the {@code workLine} logic parameter is set,
     * only rows whose description contains {@code workLine=<value>} are returned.
     *
     * @return the matching records; an empty list when the mapper is not injected
     *         or the query fails
     */
    private List<GlassInfo> queryRecentScannedGlasses(DeviceConfig deviceConfig,
                                                      Map<String, Object> logicParams) {
        if (glassInfoMapper == null) {
            log.warn("GlassInfoMapper未注入,无法查询最近扫码的玻璃");
            return Collections.emptyList();
        }

        try {
            String workLine = getLogicParam(logicParams, "workLine", null);
            Date windowStart = new Date(System.currentTimeMillis() - RECENT_SCAN_WINDOW_MS);

            LambdaQueryWrapper<GlassInfo> wrapper = new LambdaQueryWrapper<>();
            wrapper.eq(GlassInfo::getStatus, GlassInfo.Status.ACTIVE)
                    .ge(GlassInfo::getCreatedTime, windowStart);
            if (workLine != null && !workLine.isEmpty()) {
                wrapper.like(GlassInfo::getDescription, "workLine=" + workLine);
            }
            wrapper.orderByDesc(GlassInfo::getCreatedTime)
                    .last(RECENT_SCAN_LIMIT_SQL);

            List<GlassInfo> recentGlasses = glassInfoMapper.selectList(wrapper);
            log.debug("查询到最近扫码的玻璃: deviceId={}, workLine={}, count={}",
                    deviceConfig.getId(), workLine, recentGlasses.size());
            return recentGlasses;
        } catch (Exception e) {
            log.error("查询最近扫码的玻璃信息异常: deviceId={}", deviceConfig.getId(), e);
            return Collections.emptyList();
        }
    }

    /**
     * Adds glass that is neither buffered nor recently processed to the device buffer.
     *
     * @return the number of glass items actually added
     */
    private int updateBuffer(String deviceId, List<GlassInfo> newGlasses) {
        List<GlassBufferItem> buffer = glassBuffer.computeIfAbsent(deviceId, k -> new CopyOnWriteArrayList<>());

        Set<String> knownIds = buffer.stream()
                .map(item -> item.glassInfo.getGlassId())
                .collect(Collectors.toSet());

        // Forget processed IDs whose rows can no longer be returned by the query.
        Map<String, Long> processed = processedGlassIds.get(deviceId);
        if (processed != null) {
            long cutoff = System.currentTimeMillis() - RECENT_SCAN_WINDOW_MS;
            processed.values().removeIf(processedAt -> processedAt < cutoff);
        }

        int added = 0;
        for (GlassInfo glass : newGlasses) {
            String glassId = glass.getGlassId();
            if (glassId != null && processed != null && processed.containsKey(glassId)) {
                // Already written to the PLC; the row is merely still inside the query window.
                continue;
            }
            // Set.add returns false for IDs already buffered or repeated within this result.
            if (knownIds.add(glassId)) {
                buffer.add(new GlassBufferItem(glass, System.currentTimeMillis()));
                added++;
                log.debug("添加玻璃到缓冲队列: deviceId={}, glassId={}", deviceId, glassId);
            }
        }
        return added;
    }

    /**
     * Decides whether the buffered glass should be sent now.
     *
     * @return {@code true} when the summed glass length reaches the vehicle
     *         capacity, or when the time since the last new glass is at least the
     *         configured transfer delay
     */
    private boolean shouldProcessBatch(String deviceId,
                                       List<GlassBufferItem> buffer,
                                       WorkstationLogicConfig config) {
        // Condition 1: buffered length reaches the capacity (missing lengths count as 0).
        int totalLength = buffer.stream()
                .mapToInt(item -> item.glassInfo.getGlassLength() != null ? item.glassInfo.getGlassLength() : 0)
                .sum();
        if (totalLength >= config.getVehicleCapacity()) {
            log.info("缓冲队列容量已满,触发批次处理: deviceId={}, totalLength={}, capacity={}",
                    deviceId, totalLength, config.getVehicleCapacity());
            return true;
        }

        // Condition 2: no new glass for at least the configured delay.
        AtomicLong lastTime = lastScanTime.get(deviceId);
        if (lastTime != null) {
            long elapsed = System.currentTimeMillis() - lastTime.get();
            if (elapsed >= config.getTransferDelayMs()) {
                log.info("30s内无新玻璃扫码,触发批次处理: deviceId={}, elapsed={}ms", deviceId, elapsed);
                return true;
            }
        }
        return false;
    }

    /**
     * Takes glass from the front of the buffer until the next item would exceed
     * the vehicle capacity or the batch already holds the maximum item count.
     *
     * <p>NOTE(review): if the first buffered glass alone exceeds the capacity the
     * batch is empty and that glass stays at the head of the buffer - confirm
     * whether such glass should be skipped or reported.
     */
    private List<GlassInfo> assembleBatch(List<GlassBufferItem> buffer, int vehicleCapacity) {
        List<GlassInfo> batch = new ArrayList<>();
        int usedLength = 0;

        for (GlassBufferItem item : buffer) {
            GlassInfo glass = item.glassInfo;
            int glassLength = glass.getGlassLength() != null ? glass.getGlassLength() : 0;
            if (usedLength + glassLength > vehicleCapacity || batch.size() >= MAX_GLASSES_PER_BATCH) {
                break;
            }
            batch.add(glass);
            usedLength += glassLength;
        }
        return batch;
    }

    /**
     * Writes the batch to the PLC: up to six glass IDs ({@code plcGlassId1..6}),
     * the glass count, the optional {@code inPosition} logic parameter and the
     * request word {@code plcRequest=1}.
     *
     * @return a successful result, or a failed one carrying the exception message
     */
    private DevicePlcVO.OperationResult writeBatchToPlc(DeviceConfig deviceConfig,
                                                        List<GlassInfo> batch,
                                                        EnhancedS7Serializer serializer,
                                                        Map<String, Object> logicParams) {
        Map<String, Object> payload = new HashMap<>();

        int count = Math.min(batch.size(), MAX_GLASSES_PER_BATCH);
        for (int i = 0; i < count; i++) {
            payload.put("plcGlassId" + (i + 1), batch.get(i).getGlassId());
        }
        payload.put("plcGlassCount", count);

        Integer inPosition = getLogicParam(logicParams, "inPosition", null);
        if (inPosition != null) {
            payload.put("inPosition", inPosition);
        }

        // Request word.
        payload.put("plcRequest", 1);

        try {
            plcDynamicDataService.writePlcData(deviceConfig, payload, serializer);
            log.info("批次已写入PLC: deviceId={}, glassCount={}", deviceConfig.getId(), count);
            return buildResult(deviceConfig, "writeBatchToPlc", true, "批次写入成功");
        } catch (Exception e) {
            log.error("写入批次到PLC失败: deviceId={}", deviceConfig.getId(), e);
            return buildResult(deviceConfig, "writeBatchToPlc", false, "写入失败: " + e.getMessage());
        }
    }

    /**
     * Removes the given glass from the device buffer and remembers its IDs so the
     * next query does not put it back while its row is still inside the query window.
     */
    private void removeProcessedGlasses(String deviceId, List<GlassInfo> processed) {
        Set<String> processedIds = processed.stream()
                .map(GlassInfo::getGlassId)
                .collect(Collectors.toSet());

        long now = System.currentTimeMillis();
        Map<String, Long> remembered = processedGlassIds.computeIfAbsent(deviceId, k -> new ConcurrentHashMap<>());
        for (String glassId : processedIds) {
            if (glassId != null) { // ConcurrentHashMap rejects null keys
                remembered.put(glassId, now);
            }
        }

        List<GlassBufferItem> buffer = glassBuffer.get(deviceId);
        if (buffer != null) {
            buffer.removeIf(item -> processedIds.contains(item.glassInfo.getGlassId()));
        }
    }

    /**
     * Starts (or restarts) the periodic check for a device.
     *
     * <p>The interval comes from the {@code monitorIntervalMs} logic parameter and
     * falls back to the configured scan interval. An invalid interval is rejected
     * before the currently running monitor, if any, is touched.
     */
    private DevicePlcVO.OperationResult handleStartMonitor(DeviceConfig deviceConfig,
                                                           WorkstationLogicConfig config,
                                                           Map<String, Object> logicParams) {
        String deviceId = deviceConfig.getDeviceId();

        Integer monitorIntervalMs = getLogicParam(logicParams, "monitorIntervalMs", config.getScanIntervalMs());
        if (monitorIntervalMs == null || monitorIntervalMs <= 0) {
            // scheduleWithFixedDelay rejects non-positive delays; fail with a clear message instead.
            return buildResult(deviceConfig, "startMonitor", false, "监控间隔无效: " + monitorIntervalMs);
        }
        long intervalMs = monitorIntervalMs;

        // Stop the previous monitor task, if any.
        handleStopMonitor(deviceConfig);

        ScheduledFuture<?> future = monitorExecutor.scheduleWithFixedDelay(() -> {
            try {
                handleCheckAndProcess(deviceConfig, config, logicParams);
            } catch (Exception e) {
                // An escaping exception would cancel all further runs of this task.
                log.error("监控任务执行异常: deviceId={}", deviceId, e);
            }
        }, intervalMs, intervalMs, TimeUnit.MILLISECONDS);

        monitorTasks.put(deviceId, future);
        log.info("已启动卧转立监控任务: deviceId={}, interval={}ms", deviceId, monitorIntervalMs);
        return buildResult(deviceConfig, "startMonitor", true, "监控任务已启动");
    }

    /**
     * Cancels the device's monitor task without interrupting a run in progress.
     * Succeeds even when no task is registered.
     */
    private DevicePlcVO.OperationResult handleStopMonitor(DeviceConfig deviceConfig) {
        String deviceId = deviceConfig.getDeviceId();
        ScheduledFuture<?> future = monitorTasks.remove(deviceId);
        if (future != null && !future.isCancelled()) {
            future.cancel(false);
            log.info("已停止卧转立监控任务: deviceId={}", deviceId);
        }
        return buildResult(deviceConfig, "stopMonitor", true, "监控任务已停止");
    }

    /**
     * Discards the device's buffered glass and its last-scan timestamp.
     * The record of already-processed glass IDs is kept; it expires on its own.
     */
    private DevicePlcVO.OperationResult handleClearBuffer(DeviceConfig deviceConfig) {
        String deviceId = deviceConfig.getDeviceId();
        glassBuffer.remove(deviceId);
        lastScanTime.remove(deviceId);
        log.info("已清空缓冲队列: deviceId={}", deviceId);
        return buildResult(deviceConfig, "clearBuffer", true, "缓冲队列已清空");
    }

    /**
     * Builds an operation result stamped with the device's identity and the current time.
     */
    private DevicePlcVO.OperationResult buildResult(DeviceConfig deviceConfig,
                                                    String operation,
                                                    boolean success,
                                                    String message) {
        String projectId = deviceConfig.getProjectId() != null
                ? String.valueOf(deviceConfig.getProjectId())
                : null;
        return DevicePlcVO.OperationResult.builder()
                .deviceId(deviceConfig.getId())
                .deviceName(deviceConfig.getDeviceName())
                .deviceCode(deviceConfig.getDeviceCode())
                .projectId(projectId)
                .operation(operation)
                .success(success)
                .message(message)
                .timestamp(LocalDateTime.now())
                .build();
    }

    /**
     * Cancels all monitor tasks and shuts the scheduler down when the bean is destroyed.
     * Waits up to five seconds for running tasks, then forces shutdown and waits
     * up to five more.
     */
    @PreDestroy
    public void destroy() {
        log.info("正在关闭卧转立监控线程池...");

        // Cancel every registered monitor task.
        for (String deviceId : new ArrayList<>(monitorTasks.keySet())) {
            ScheduledFuture<?> future = monitorTasks.remove(deviceId);
            if (future != null && !future.isCancelled()) {
                future.cancel(false);
            }
        }

        monitorExecutor.shutdown();
        try {
            if (!monitorExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
                monitorExecutor.shutdownNow();
                if (!monitorExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
                    log.warn("卧转立监控线程池未能正常关闭");
                }
            }
        } catch (InterruptedException e) {
            monitorExecutor.shutdownNow();
            // Preserve the interrupt for the caller.
            Thread.currentThread().interrupt();
        }
        log.info("卧转立监控线程池已关闭");
    }

    /**
     * A buffered glass together with the time (epoch millis) it entered the buffer.
     */
    private static class GlassBufferItem {
        final GlassInfo glassInfo;
        final long timestamp;

        GlassBufferItem(GlassInfo glassInfo, long timestamp) {
            this.glassInfo = glassInfo;
            this.timestamp = timestamp;
        }
    }
}