package com.mes.task.service; import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper; import com.baomidou.mybatisplus.core.toolkit.Wrappers; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import com.mes.device.entity.DeviceConfig; import com.mes.device.entity.DeviceGroupConfig; import com.mes.device.service.DeviceCoordinationService; import com.mes.device.service.DeviceInteractionService; import com.mes.device.service.GlassInfoService; import com.mes.device.vo.DevicePlcVO; import com.mes.interaction.DeviceInteraction; import com.mes.interaction.DeviceInteractionRegistry; import com.mes.interaction.DeviceLogicHandler; import com.mes.interaction.DeviceLogicHandlerFactory; import com.mes.interaction.base.InteractionContext; import com.mes.interaction.base.InteractionResult; import com.mes.task.dto.TaskParameters; import com.mes.task.entity.MultiDeviceTask; import com.mes.task.entity.TaskStepDetail; import com.mes.task.mapper.MultiDeviceTaskMapper; import com.mes.task.mapper.TaskStepDetailMapper; import com.mes.task.model.RetryPolicy; import com.mes.task.model.TaskExecutionContext; import com.mes.task.model.TaskExecutionResult; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.stereotype.Component; import org.springframework.util.CollectionUtils; import org.springframework.util.StringUtils; import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; /** * 多设备任务执行引擎 * 支持串行和并行两种执行模式 */ @Slf4j @Component @RequiredArgsConstructor public class TaskExecutionEngine { private static final Map DEFAULT_OPERATIONS = new HashMap<>(); private static final TypeReference> MAP_TYPE = new TypeReference>() {}; private static final int SCANNER_LOOKBACK_MINUTES = 2; private static final int SCANNER_LOOKBACK_LIMIT = 20; // 执行模式常量 private static final String EXECUTION_MODE_SERIAL = "SERIAL"; private static final String EXECUTION_MODE_PARALLEL = "PARALLEL"; static { DEFAULT_OPERATIONS.put(DeviceConfig.DeviceType.LOAD_VEHICLE, "feedGlass"); DEFAULT_OPERATIONS.put(DeviceConfig.DeviceType.LARGE_GLASS, "processGlass"); DEFAULT_OPERATIONS.put(DeviceConfig.DeviceType.WORKSTATION_SCANNER, "scanOnce"); DEFAULT_OPERATIONS.put(DeviceConfig.DeviceType.WORKSTATION_TRANSFER, "checkAndProcess"); } private final TaskStepDetailMapper taskStepDetailMapper; private final MultiDeviceTaskMapper multiDeviceTaskMapper; private final DeviceInteractionService deviceInteractionService; private final DeviceInteractionRegistry interactionRegistry; private final DeviceLogicHandlerFactory handlerFactory; private final DeviceCoordinationService deviceCoordinationService; private final TaskStatusNotificationService notificationService; private final ObjectMapper objectMapper; @Qualifier("deviceGlassInfoService") private final GlassInfoService glassInfoService; // 线程池用于并行执行 private final ExecutorService executorService = Executors.newCachedThreadPool(r -> { Thread t = new Thread(r, "TaskExecutionEngine-Parallel"); t.setDaemon(true); return t; }); // 定时器线程池:用于设备定时扫描 private final ScheduledExecutorService scheduledExecutor = Executors.newScheduledThreadPool(10, r -> { Thread t = new Thread(r, "TaskExecutionEngine-Scheduled"); t.setDaemon(true); return t; }); // 存储每个任务的定时器任务:taskId -> List private final Map>> taskScheduledTasks = new ConcurrentHashMap<>(); // 记录正在运行任务的上下文,便于取消任务时访问 private final Map runningTaskContexts = new ConcurrentHashMap<>(); public TaskExecutionResult execute(MultiDeviceTask task, DeviceGroupConfig groupConfig, List devices, TaskParameters parameters) { if (CollectionUtils.isEmpty(devices)) { return TaskExecutionResult.failure("设备组未配置设备,无法执行任务", Collections.emptyMap()); } TaskExecutionContext context = new TaskExecutionContext(parameters); runningTaskContexts.put(task.getTaskId(), context); // 将本次任务涉及的设备列表存入上下文,便于取消任务时做设备级收尾(如停止大车内部监控定时器) context.getSharedData().put("devices", devices); task.setTotalSteps(devices.size()); task.setStatus(MultiDeviceTask.Status.RUNNING.name()); multiDeviceTaskMapper.updateById(task); // 通知任务开始执行 notificationService.notifyTaskStatus(task); // 确定执行模式 String executionMode = determineExecutionMode(groupConfig); Integer maxConcurrent = getMaxConcurrentDevices(groupConfig); log.info("任务执行模式: {}, 最大并发数: {}, 设备数: {}", executionMode, maxConcurrent, devices.size()); List> stepSummaries; boolean success; String failureMessage; if (EXECUTION_MODE_PARALLEL.equals(executionMode)) { // 并行执行模式 stepSummaries = new ArrayList<>(Collections.nCopies(devices.size(), null)); Pair result = executeParallel(task, devices, context, stepSummaries, maxConcurrent); success = result.getFirst(); failureMessage = result.getSecond(); } else { // 串行执行模式(默认) stepSummaries = new ArrayList<>(); success = true; failureMessage = null; TaskParameters params = context.getParameters(); boolean hasGlassIds = !CollectionUtils.isEmpty(params.getGlassIds()); boolean triggerFirst = Boolean.TRUE.equals(params.getTriggerRequestFirst()); int currentOrder = 1; // 统计大车设备数量,用于区分进片大车和出片大车 int loadVehicleCount = 0; for (DeviceConfig device : devices) { if (DeviceConfig.DeviceType.LOAD_VEHICLE.equals(device.getDeviceType())) { loadVehicleCount++; } } int currentLoadVehicleIndex = 0; for (DeviceConfig device : devices) { String deviceType = device.getDeviceType(); log.debug("处理设备: deviceId={}, deviceType={}, deviceName={}, WORKSTATION_SCANNER常量={}, equals={}", device.getId(), deviceType, device.getDeviceName(), DeviceConfig.DeviceType.WORKSTATION_SCANNER, DeviceConfig.DeviceType.WORKSTATION_SCANNER.equals(deviceType)); boolean isLoadVehicle = DeviceConfig.DeviceType.LOAD_VEHICLE.equals(deviceType); boolean isScanner = DeviceConfig.DeviceType.WORKSTATION_SCANNER.equals(deviceType) || (deviceType != null && (deviceType.contains("扫码") || deviceType.contains("SCANNER"))); boolean isLargeGlass = DeviceConfig.DeviceType.LARGE_GLASS.equals(deviceType); boolean isTransfer = DeviceConfig.DeviceType.WORKSTATION_TRANSFER.equals(deviceType); log.debug("设备类型判断: deviceId={}, isLoadVehicle={}, isScanner={}, isLargeGlass={}, isTransfer={}", device.getId(), isLoadVehicle, isScanner, isLargeGlass, isTransfer); // 1. 卧转立扫码设备:启动定时器扫描(每10秒处理一个玻璃ID) if (isScanner) { log.debug("检测到扫码设备,准备启动定时器: deviceId={}, deviceType={}, deviceName={}", device.getId(), device.getDeviceType(), device.getDeviceName()); TaskStepDetail step = createStepRecord(task, device, currentOrder); ScheduledFuture scannerTask = startScannerTimer(task, step, device, context); if (scannerTask != null) { registerScheduledTask(task.getTaskId(), scannerTask); stepSummaries.add(createStepSummary(device.getDeviceName(), true, "定时器已启动,每10秒扫描一次")); log.debug("扫码设备定时器启动成功: deviceId={}, taskId={}", device.getId(), task.getTaskId()); } else { log.warn("扫码设备定时器启动失败,glassIds可能为空: deviceId={}, taskId={}, contextParams={}", device.getId(), task.getTaskId(), context.getParameters()); stepSummaries.add(createStepSummary(device.getDeviceName(), false, "启动定时器失败")); success = false; failureMessage = "卧转立扫码设备启动定时器失败"; break; } currentOrder++; continue; } // 2. 卧转立设备:启动定时器定期检查并处理(中转设备) if (isTransfer) { log.debug("检测到卧转立设备,准备启动定时器: deviceId={}, deviceType={}, deviceName={}", device.getId(), device.getDeviceType(), device.getDeviceName()); TaskStepDetail step = createStepRecord(task, device, currentOrder); ScheduledFuture transferTask = startTransferTimer(task, step, device, context); if (transferTask != null) { registerScheduledTask(task.getTaskId(), transferTask); stepSummaries.add(createStepSummary(device.getDeviceName(), true, "定时器已启动,定期检查并处理玻璃批次")); log.debug("卧转立设备定时器启动成功: deviceId={}, taskId={}", device.getId(), task.getTaskId()); } else { log.warn("卧转立设备定时器启动失败: deviceId={}, taskId={}", device.getId(), task.getTaskId()); stepSummaries.add(createStepSummary(device.getDeviceName(), false, "启动定时器失败")); success = false; failureMessage = "卧转立设备启动定时器失败"; break; } currentOrder++; continue; } // 3. 进片大车设备:启动定时器持续监控容量(第一个大车设备) if (isLoadVehicle) { currentLoadVehicleIndex++; boolean isInboundVehicle = currentLoadVehicleIndex == 1; // 第一个大车是进片大车 TaskStepDetail step = createStepRecord(task, device, currentOrder); ScheduledFuture vehicleTask; if (isInboundVehicle) { // 进片大车:监控容量,动态判断 vehicleTask = startInboundVehicleTimer(task, step, device, context); if (vehicleTask != null) { registerScheduledTask(task.getTaskId(), vehicleTask); stepSummaries.add(createStepSummary(device.getDeviceName(), true, "进片大车定时器已启动,持续监控容量")); } else { stepSummaries.add(createStepSummary(device.getDeviceName(), false, "启动定时器失败")); success = false; failureMessage = "进片大车设备启动定时器失败"; break; } } else { // 出片大车:启动定时器监控出片任务 vehicleTask = startOutboundVehicleTimer(task, step, device, context); if (vehicleTask != null) { registerScheduledTask(task.getTaskId(), vehicleTask); stepSummaries.add(createStepSummary(device.getDeviceName(), true, "出片大车定时器已启动,持续监控出片任务")); } else { stepSummaries.add(createStepSummary(device.getDeviceName(), false, "启动定时器失败")); success = false; failureMessage = "出片大车设备启动定时器失败"; break; } } currentOrder++; continue; } // 4. 大理片笼设备:启动定时器逻辑处理(不涉及PLC交互,只负责逻辑处理) if (isLargeGlass) { TaskStepDetail step = createStepRecord(task, device, currentOrder); ScheduledFuture largeGlassTask = startLargeGlassTimer(task, step, device, context); if (largeGlassTask != null) { registerScheduledTask(task.getTaskId(), largeGlassTask); stepSummaries.add(createStepSummary(device.getDeviceName(), true, "大理片笼定时器已启动,逻辑处理中")); } else { stepSummaries.add(createStepSummary(device.getDeviceName(), false, "启动定时器失败")); success = false; failureMessage = "大理片笼设备启动定时器失败"; break; } currentOrder++; continue; } // 其他设备:正常执行 TaskStepDetail step = createStepRecord(task, device, currentOrder); StepResult stepResult = executeStep(task, step, device, context); stepSummaries.add(stepResult.toSummary()); if (!stepResult.isSuccess()) { success = false; failureMessage = stepResult.getMessage(); break; } currentOrder++; } // 如果所有设备都是定时器模式,任务保持运行状态,不等待完成 // 定时器会在后台持续运行,直到手动停止或超时 boolean hasScheduledTasks = !CollectionUtils.isEmpty(taskScheduledTasks.get(task.getTaskId())); if (hasScheduledTasks) { log.debug("任务已启动所有定时器,保持运行状态: taskId={}, scheduledTasksCount={}", task.getTaskId(), taskScheduledTasks.get(task.getTaskId()).size()); // 任务保持 RUNNING 状态,定时器在后台运行 // 不更新任务状态为 COMPLETED,让任务持续运行 Map payload = new HashMap<>(); payload.put("steps", stepSummaries); payload.put("groupId", groupConfig.getId()); payload.put("deviceCount", devices.size()); payload.put("executionMode", executionMode); payload.put("message", "任务已启动,定时器在后台运行中"); // 通知任务状态(保持 RUNNING) notificationService.notifyTaskStatus(task); if (success) { return TaskExecutionResult.success(payload); } return TaskExecutionResult.failure(failureMessage != null ? failureMessage : "任务执行失败", payload); } // 如果没有定时器任务,等待所有步骤完成 // 这种情况通常不会发生,因为所有设备都是定时器模式 } Map payload = new HashMap<>(); payload.put("steps", stepSummaries); payload.put("groupId", groupConfig.getId()); payload.put("deviceCount", devices.size()); payload.put("executionMode", executionMode); // 停止所有定时器任务 stopScheduledTasks(task.getTaskId()); boolean cancelled = isTaskCancelled(context); // 更新任务最终状态 if (cancelled) { task.setStatus(MultiDeviceTask.Status.CANCELLED.name()); task.setErrorMessage("任务已取消"); } else if (success) { task.setStatus(MultiDeviceTask.Status.COMPLETED.name()); } else { task.setStatus(MultiDeviceTask.Status.FAILED.name()); task.setErrorMessage(failureMessage); } task.setEndTime(new Date()); multiDeviceTaskMapper.updateById(task); // 通知任务完成 notificationService.notifyTaskStatus(task); if (success) { return TaskExecutionResult.success(payload); } return TaskExecutionResult.failure(failureMessage != null ? failureMessage : "任务执行失败", payload); } /** * 请求取消任务:停止所有定时器并标记上下文 */ public void requestTaskCancellation(String taskId) { TaskExecutionContext context = runningTaskContexts.get(taskId); if (context != null) { context.getSharedData().put("taskCancelled", true); log.warn("已标记任务取消: taskId={}", taskId); // 同时通知相关设备逻辑处理器执行取消收尾逻辑(例如停止大车内部的监控定时器) try { Map cancelParams = new HashMap<>(); cancelParams.put("_taskContext", context); Object devicesObj = context.getSharedData().get("devices"); if (devicesObj instanceof List) { @SuppressWarnings("unchecked") List devices = (List) devicesObj; for (DeviceConfig device : devices) { if (device == null) { continue; } try { DeviceLogicHandler handler = handlerFactory.getHandler(device.getDeviceType()); if (handler != null) { // 目前大车逻辑处理器会在reset/clear等操作中停止内部监控定时器 // 这里统一调用一次“reset”作为任务取消时的收尾动作 handler.execute(device, "reset", cancelParams); } } catch (Exception e) { log.warn("任务取消时执行设备收尾(reset)失败: taskId={}, deviceId={}, error={}", taskId, device.getId(), e.getMessage()); } } } } catch (Exception e) { log.warn("任务取消时执行设备收尾逻辑异常: taskId={}, error={}", taskId, e.getMessage()); } } else { log.warn("请求取消任务但未找到上下文: taskId={}", taskId); } stopScheduledTasks(taskId); } /** * 启动卧转立扫码设备定时器:每10秒处理一个玻璃ID */ private ScheduledFuture startScannerTimer(MultiDeviceTask task, TaskStepDetail step, DeviceConfig device, TaskExecutionContext context) { try { TaskParameters params = context.getParameters(); List glassIds = params.getGlassIds(); log.debug("卧转立扫码定时器初始化: taskId={}, deviceId={}, glassIds={}, glassIdsSize={}, isEmpty={}", task.getTaskId(), device.getId(), glassIds, glassIds != null ? glassIds.size() : 0, CollectionUtils.isEmpty(glassIds)); if (CollectionUtils.isEmpty(glassIds)) { log.warn("卧转立扫码设备没有玻璃ID,定时器不启动: deviceId={}", device.getId()); return null; } // 创建待处理玻璃ID队列 Queue glassIdQueue = new ConcurrentLinkedQueue<>(glassIds); AtomicInteger processedCount = new AtomicInteger(0); AtomicInteger successCount = new AtomicInteger(0); AtomicInteger failCount = new AtomicInteger(0); // 从设备配置中获取扫码间隔,默认10秒 Map logicParams = parseLogicParams(device); Integer scanIntervalMs = getLogicParam(logicParams, "scanIntervalMs", 10_000); log.debug("启动卧转立扫码定时器: taskId={}, deviceId={}, glassCount={}, interval={}ms, glassIds={}", task.getTaskId(), device.getId(), glassIds.size(), scanIntervalMs, glassIds); // 启动定时任务 ScheduledFuture future = scheduledExecutor.scheduleWithFixedDelay(() -> { try { if (isTaskCancelled(context)) { log.debug("任务已取消,停止卧转立扫码定时器: taskId={}, deviceId={}", task.getTaskId(), device.getId()); return; } // 定时器第一次执行时,将设备状态从 WAITING 设置为 RUNNING deviceCoordinationService.syncDeviceStatus(device, DeviceCoordinationService.DeviceStatus.RUNNING, context); ensureStepRunning(step, task.getTaskId()); // 检查是否需要暂停 if (shouldPauseScanner(context)) { log.debug("卧转立扫码定时器暂停: taskId={}, deviceId={}", task.getTaskId(), device.getId()); return; } // 检查是否还有待处理的玻璃ID String glassId = glassIdQueue.poll(); if (glassId == null) { log.debug("卧转立扫码定时器完成: taskId={}, deviceId={}, processed={}/{}, success={}, fail={}", task.getTaskId(), device.getId(), processedCount.get(), glassIds.size(), successCount.get(), failCount.get()); // 清空plcRequest和plcGlassId(确保PLC状态清理) try { DeviceLogicHandler handler = handlerFactory.getHandler(device.getDeviceType()); if (handler != null) { Map clearParams = new HashMap<>(); clearParams.put("_taskContext", context); handler.execute(device, "clearPlc", clearParams); log.debug("卧转立扫码定时器完成,已清空PLC请求字段: taskId={}, deviceId={}", task.getTaskId(), device.getId()); } } catch (Exception e) { log.warn("卧转立扫码定时器完成时清空PLC失败: taskId={}, deviceId={}, error={}", task.getTaskId(), device.getId(), e.getMessage()); } // 若之前未出现失败,再将状态置为完成 boolean alreadyFailed = TaskStepDetail.Status.FAILED.name().equals(step.getStatus()); if (!alreadyFailed) { step.setStatus(TaskStepDetail.Status.COMPLETED.name()); step.setSuccessMessage(String.format("已完成扫描: 成功=%d, 失败=%d", successCount.get(), failCount.get())); if (step.getEndTime() == null) { step.setEndTime(new Date()); } taskStepDetailMapper.updateById(step); notificationService.notifyStepUpdate(task.getTaskId(), step); // 扫码设备完成后尝试自动收尾整个任务 checkAndCompleteTaskIfDone(step.getTaskId()); } deviceCoordinationService.syncDeviceStatus(device, DeviceCoordinationService.DeviceStatus.COMPLETED, context); return; } int currentIndex = processedCount.incrementAndGet(); log.debug("卧转立扫码定时器处理第{}/{}个玻璃: taskId={}, deviceId={}, glassId={}", currentIndex, glassIds.size(), task.getTaskId(), device.getId(), glassId); // 执行单次扫描 Map scanParams = new HashMap<>(); scanParams.put("glassId", glassId); scanParams.put("_taskContext", context); log.debug("卧转立扫码定时器准备执行: taskId={}, deviceId={}, glassId={}, scanParams={}", task.getTaskId(), device.getId(), glassId, scanParams); DeviceLogicHandler handler = handlerFactory.getHandler(device.getDeviceType()); if (handler != null) { // 将logicParams合并到scanParams中(使用已定义的logicParams变量) if (logicParams != null && !logicParams.isEmpty()) { scanParams.put("_logicParams", logicParams); } log.debug("卧转立扫码定时器调用handler.execute: taskId={}, deviceId={}, glassId={}, operation=scanOnce, scanParamsKeys={}, scanParams={}", task.getTaskId(), device.getId(), glassId, scanParams.keySet(), scanParams); DevicePlcVO.OperationResult result = handler.execute(device, "scanOnce", scanParams); log.debug("卧转立扫码定时器handler.execute返回: taskId={}, deviceId={}, glassId={}, success={}", task.getTaskId(), device.getId(), glassId, result.getSuccess()); if (Boolean.TRUE.equals(result.getSuccess())) { successCount.incrementAndGet(); log.debug("卧转立扫码定时器处理成功: taskId={}, deviceId={}, glassId={}", task.getTaskId(), device.getId(), glassId); } else { failCount.incrementAndGet(); log.warn("卧转立扫码定时器处理失败: taskId={}, deviceId={}, glassId={}, error={}", task.getTaskId(), device.getId(), glassId, result.getMessage()); } // 更新步骤状态(显示进度,保持RUNNING状态直到所有玻璃处理完成) updateStepStatusForScanner(step, result, currentIndex, glassIds.size(), successCount.get(), failCount.get()); // 通知步骤更新(让前端实时看到步骤状态和进度) notificationService.notifyStepUpdate(task.getTaskId(), step); boolean opSuccess = Boolean.TRUE.equals(result.getSuccess()); updateTaskProgress(task, step.getStepOrder(), opSuccess); if (!opSuccess) { deviceCoordinationService.syncDeviceStatus(device, DeviceCoordinationService.DeviceStatus.FAILED, context); } } } catch (Exception e) { log.error("卧转立扫码定时器执行异常: taskId={}, deviceId={}", task.getTaskId(), device.getId(), e); failCount.incrementAndGet(); } }, 0, scanIntervalMs, TimeUnit.MILLISECONDS); // 在串行执行模式下,扫码设备是第一个,应该立即设置为 RUNNING // 其他设备保持 WAITING,直到它们真正开始工作 deviceCoordinationService.syncDeviceStatus(device, DeviceCoordinationService.DeviceStatus.RUNNING, context); return future; } catch (Exception e) { log.error("启动卧转立扫码定时器失败: taskId={}, deviceId={}", task.getTaskId(), device.getId(), e); return null; } } /** * 启动卧转立设备定时器:定期检查并处理玻璃批次 */ private ScheduledFuture startTransferTimer(MultiDeviceTask task, TaskStepDetail step, DeviceConfig device, TaskExecutionContext context) { try { // 从设备配置中获取监控间隔,默认5秒 Map logicParams = parseLogicParams(device); Integer monitorIntervalMs = getLogicParam(logicParams, "monitorIntervalMs", 5_000); log.debug("启动卧转立设备定时器: taskId={}, deviceId={}, interval={}ms", task.getTaskId(), device.getId(), monitorIntervalMs); // 启动定时任务 // 使用AtomicBoolean标记是否第一次执行 final java.util.concurrent.atomic.AtomicBoolean firstExecution = new java.util.concurrent.atomic.AtomicBoolean(true); ScheduledFuture future = scheduledExecutor.scheduleWithFixedDelay(() -> { try { if (isTaskCancelled(context)) { log.debug("任务已取消,停止卧转立设备定时器: taskId={}, deviceId={}", task.getTaskId(), device.getId()); return; } // 如果步骤已经完成,不再执行后续逻辑(避免状态被重置) if (TaskStepDetail.Status.COMPLETED.name().equals(step.getStatus())) { log.debug("卧转立设备步骤已完成,停止定时器执行: taskId={}, deviceId={}", task.getTaskId(), device.getId()); return; } // 构建参数 Map params = new HashMap<>(); params.put("_taskContext", context); if (logicParams != null && !logicParams.isEmpty()) { params.put("_logicParams", logicParams); } // 调用handler执行checkAndProcess DeviceLogicHandler handler = handlerFactory.getHandler(device.getDeviceType()); if (handler != null) { DevicePlcVO.OperationResult result = handler.execute(device, "checkAndProcess", params); // 检查是否有数据:如果有数据或正在处理,设置为RUNNING;如果缓冲队列为空且无待处理玻璃,保持PENDING String message = result.getMessage(); boolean hasData = result.getSuccess() != null && result.getSuccess() && message != null && !message.contains("缓冲队列为空,无待处理玻璃"); // 如果当前是PENDING状态,且检测到有数据,则设置为RUNNING(设备开始工作) boolean isPending = TaskStepDetail.Status.PENDING.name().equals(step.getStatus()); if (hasData && isPending) { // 检测到数据且当前是等待状态,设备开始工作,设置为RUNNING deviceCoordinationService.syncDeviceStatus(device, DeviceCoordinationService.DeviceStatus.RUNNING, context); step.setStatus(TaskStepDetail.Status.RUNNING.name()); if (step.getStartTime() == null) { step.setStartTime(new Date()); } taskStepDetailMapper.updateById(step); notificationService.notifyStepUpdate(task.getTaskId(), step); log.debug("卧转立设备定时器检测到数据,从PENDING转为RUNNING: taskId={}, deviceId={}, message={}", task.getTaskId(), device.getId(), message); } else if (!hasData) { // 没有数据,保持PENDING状态,等待扫码设备输出 if (firstExecution.compareAndSet(true, false)) { // 第一次执行,确保状态是PENDING if (!TaskStepDetail.Status.PENDING.name().equals(step.getStatus())) { step.setStatus(TaskStepDetail.Status.PENDING.name()); step.setSuccessMessage("等待扫码设备输出数据"); taskStepDetailMapper.updateById(step); notificationService.notifyStepUpdate(task.getTaskId(), step); } log.debug("卧转立设备定时器第一次执行,无数据,保持PENDING: taskId={}, deviceId={}, message={}", task.getTaskId(), device.getId(), message); } return; // 不执行后续逻辑,等待下一次定时器触发 } // 更新步骤状态(区分等待中和真正完成) updateStepStatusForTransfer(step, result, device, context); // 通知步骤更新(让前端实时看到步骤状态) notificationService.notifyStepUpdate(task.getTaskId(), step); boolean opSuccess = Boolean.TRUE.equals(result.getSuccess()); updateTaskProgress(task, step.getStepOrder(), opSuccess); // 根据执行结果更新设备状态 // 注意:设备已经开始工作(定时器已执行),所以应该保持RUNNING状态 // 只有在真正完成时才设置为COMPLETED if (opSuccess) { // 设备正在工作(等待缓冲、处理中等),保持RUNNING状态 deviceCoordinationService.syncDeviceStatus(device, DeviceCoordinationService.DeviceStatus.RUNNING, context); if (message != null && message.contains("批次已写入PLC")) { log.debug("卧转立设备定时器执行成功(已写入PLC): taskId={}, deviceId={}, message={}", task.getTaskId(), device.getId(), message); } else { log.debug("卧转立设备定时器工作中(等待缓冲): taskId={}, deviceId={}, message={}", task.getTaskId(), device.getId(), message); } } else { log.warn("卧转立设备定时器执行失败: taskId={}, deviceId={}, message={}", task.getTaskId(), device.getId(), result.getMessage()); deviceCoordinationService.syncDeviceStatus(device, DeviceCoordinationService.DeviceStatus.FAILED, context); } } } catch (Exception e) { log.error("卧转立设备定时器执行异常: taskId={}, deviceId={}", task.getTaskId(), device.getId(), e); } }, 0, monitorIntervalMs, TimeUnit.MILLISECONDS); // 在串行执行模式下,设备启动定时器时先设置为 WAITING,定时器第一次执行时再设置为 RUNNING deviceCoordinationService.syncDeviceStatus(device, DeviceCoordinationService.DeviceStatus.WAITING, context); return future; } catch (Exception e) { log.error("启动卧转立设备定时器失败: taskId={}, deviceId={}", task.getTaskId(), device.getId(), e); return null; } } /** * 启动进片大车设备定时器:持续监控容量,动态判断 */ private ScheduledFuture startInboundVehicleTimer(MultiDeviceTask task, TaskStepDetail step, DeviceConfig device, TaskExecutionContext context) { try { final long MONITOR_INTERVAL_MS = 2_000; // 2秒监控一次 final AtomicInteger lastProcessedCount = new AtomicInteger(0); log.debug("启动进片大车设备定时器: taskId={}, deviceId={}, interval={}s", task.getTaskId(), device.getId(), MONITOR_INTERVAL_MS / 1000); // 启动定时任务 ScheduledFuture future = scheduledExecutor.scheduleWithFixedDelay(() -> { try { if (isTaskCancelled(context)) { log.debug("任务已取消,停止进片大车定时器: taskId={}, deviceId={}", task.getTaskId(), device.getId()); return; } // 进片大车设备:只有在真正开始处理时才设置为RUNNING // 检查是否有卧转立主体已输出、准备上大车的玻璃信息 List readyGlassIds = getTransferReadyGlassIds(context); // 如果当前没有新的玻璃,无论步骤是否已进入RUNNING,都应该轮询MES任务/确认状态 if (CollectionUtils.isEmpty(readyGlassIds)) { // 轮询MES任务/确认,避免错过MES侧后写入的任务 pollMesForVehicle(task, step, device, context); // 如果仍然没有卧转立输出的玻璃,保持/更新为PENDING提示 if (!TaskStepDetail.Status.RUNNING.name().equals(step.getStatus()) && !TaskStepDetail.Status.PENDING.name().equals(step.getStatus())) { step.setStatus(TaskStepDetail.Status.PENDING.name()); step.setSuccessMessage("等待卧转立输出玻璃"); taskStepDetailMapper.updateById(step); notificationService.notifyStepUpdate(task.getTaskId(), step); } return; } // 如果玻璃ID数量没有变化,说明没有新的玻璃 int currentCount = readyGlassIds.size(); if (currentCount == lastProcessedCount.get()) { // 玻璃数量没有变化:没有新玻璃,但仍需轮询MES任务/确认,避免错过MES侧的变化 pollMesForVehicle(task, step, device, context); log.debug("大车设备定时器:玻璃ID数量未变化,继续等待: taskId={}, deviceId={}, count={}", task.getTaskId(), device.getId(), currentCount); return; } log.debug("进片大车设备定时器检测到卧转立输出的玻璃信息: taskId={}, deviceId={}, glassCount={}", task.getTaskId(), device.getId(), currentCount); // 检查容量 Map checkParams = new HashMap<>(); checkParams.put("glassIds", new ArrayList<>(readyGlassIds)); checkParams.put("_taskContext", context); DeviceLogicHandler handler = handlerFactory.getHandler(device.getDeviceType()); if (handler != null) { // 将logicParams合并到checkParams中 Map logicParams = parseLogicParams(device); if (logicParams != null && !logicParams.isEmpty()) { checkParams.put("_logicParams", logicParams); } // 第一步:写入大车上料请求 DevicePlcVO.OperationResult feedResult = handler.execute(device, "feedGlass", checkParams); if (Boolean.TRUE.equals(feedResult.getSuccess())) { // 真正开始处理,设置为RUNNING deviceCoordinationService.syncDeviceStatus(device, DeviceCoordinationService.DeviceStatus.RUNNING, context); // 步骤状态也设置为RUNNING if (!TaskStepDetail.Status.RUNNING.name().equals(step.getStatus())) { step.setStatus(TaskStepDetail.Status.RUNNING.name()); if (step.getStartTime() == null) { step.setStartTime(new Date()); } taskStepDetailMapper.updateById(step); notificationService.notifyStepUpdate(task.getTaskId(), step); } log.debug("进片大车设备定时器执行成功: taskId={}, deviceId={}, glassCount={}", task.getTaskId(), device.getId(), readyGlassIds.size()); // 将已装载的玻璃ID保存到共享数据中(供大理片笼使用) setLoadedGlassIds(context, new ArrayList<>(readyGlassIds)); // 清空卧转立输出的玻璃ID列表(已处理) clearTransferReadyGlassIds(context); lastProcessedCount.set(0); // 确保卧转立扫码继续运行 setScannerPause(context, false); // feedGlass成功后,先检查MES任务(checkMesTask)来开始执行任务 DevicePlcVO.OperationResult mesTaskResult = null; try { mesTaskResult = handler.execute(device, "checkMesTask", Collections.emptyMap()); if (mesTaskResult != null && Boolean.TRUE.equals(mesTaskResult.getSuccess())) { log.info("进片大车设备已检查MES任务并开始执行: taskId={}, deviceId={}, message={}", task.getTaskId(), device.getId(), mesTaskResult.getMessage()); } } catch (Exception e) { log.warn("进片大车设备检查MES任务异常: taskId={}, deviceId={}, error={}", task.getTaskId(), device.getId(), e.getMessage()); } } else { // 装不下,保持WAITING状态和PENDING步骤状态 deviceCoordinationService.syncDeviceStatus(device, DeviceCoordinationService.DeviceStatus.WAITING, context); if (!TaskStepDetail.Status.PENDING.name().equals(step.getStatus())) { step.setStatus(TaskStepDetail.Status.PENDING.name()); step.setSuccessMessage("容量不足,等待中"); taskStepDetailMapper.updateById(step); notificationService.notifyStepUpdate(task.getTaskId(), step); } // 装不下,记录容量不足(是否需要影响扫码由工艺再决定) log.warn("进片大车设备定时器容量不足: taskId={}, deviceId={}, message={}", task.getTaskId(), device.getId(), feedResult.getMessage()); lastProcessedCount.set(currentCount); // 记录当前数量,避免重复检查 } // 第二步:检查MES确认状态(如果大车处理器支持的话) // 只有在任务已开始执行(有任务记录)时才检查MES确认 DevicePlcVO.OperationResult mesResult = null; try { mesResult = handler.execute(device, "checkMesConfirm", Collections.emptyMap()); } catch (Exception e) { log.warn("进片大车设备检查MES确认状态异常: taskId={}, deviceId={}, error={}", task.getTaskId(), device.getId(), e.getMessage()); } // 更新步骤状态(大车设备保持RUNNING,直到MES确认完成或任务取消) if (mesResult != null) { updateStepStatusForVehicle(step, mesResult); boolean opSuccess = Boolean.TRUE.equals(mesResult.getSuccess()); updateTaskProgress(task, step.getStepOrder(), opSuccess); if (!opSuccess) { deviceCoordinationService.syncDeviceStatus(device, DeviceCoordinationService.DeviceStatus.FAILED, context); } } else { updateStepStatusForVehicle(step, feedResult); boolean opSuccess = Boolean.TRUE.equals(feedResult.getSuccess()); updateTaskProgress(task, step.getStepOrder(), opSuccess); if (!opSuccess) { deviceCoordinationService.syncDeviceStatus(device, DeviceCoordinationService.DeviceStatus.FAILED, context); } } } } catch (Exception e) { log.error("进片大车设备定时器执行异常: taskId={}, deviceId={}", task.getTaskId(), device.getId(), e); } }, 0, MONITOR_INTERVAL_MS, TimeUnit.MILLISECONDS); // 在串行执行模式下,设备启动定时器时先设置为 WAITING,定时器第一次执行时再设置为 RUNNING deviceCoordinationService.syncDeviceStatus(device, DeviceCoordinationService.DeviceStatus.WAITING, context); return future; } catch (Exception e) { log.error("启动进片大车设备定时器失败: taskId={}, deviceId={}", task.getTaskId(), device.getId(), e); return null; } } /** * 启动出片大车设备定时器:持续监控出片任务 */ private ScheduledFuture startOutboundVehicleTimer(MultiDeviceTask task, TaskStepDetail step, DeviceConfig device, TaskExecutionContext context) { try { final long MONITOR_INTERVAL_MS = 2_000; // 2秒监控一次 log.debug("启动出片大车设备定时器: taskId={}, deviceId={}, interval={}s", task.getTaskId(), device.getId(), MONITOR_INTERVAL_MS / 1000); // 启动定时任务 ScheduledFuture future = scheduledExecutor.scheduleWithFixedDelay(() -> { try { if (isTaskCancelled(context)) { log.debug("任务已取消,停止出片大车定时器: taskId={}, deviceId={}", task.getTaskId(), device.getId()); return; } // 出片大车设备:只有在真正开始处理时才设置为RUNNING // 检查是否有已处理的玻璃信息(从大理片笼来的) List processedGlassIds = getProcessedGlassIds(context); boolean isRunning = TaskStepDetail.Status.RUNNING.name().equals(step.getStatus()); // 如果设备已经在运行中,即使没有新玻璃,也要继续监控MES任务和确认状态 if (CollectionUtils.isEmpty(processedGlassIds)) { if (isRunning) { // 设备正在运行中,先检查MES任务,然后监控MES确认状态 DeviceLogicHandler handler = handlerFactory.getHandler(device.getDeviceType()); if (handler != null) { Map logicParams = parseLogicParams(device); // 先检查MES任务(如果mesSend=1,会创建任务并开始执行) DevicePlcVO.OperationResult mesTaskResult = null; try { mesTaskResult = handler.execute(device, "checkMesTask", Collections.emptyMap()); if (mesTaskResult != null && Boolean.TRUE.equals(mesTaskResult.getSuccess())) { log.info("出片大车设备已检查MES任务并开始执行: taskId={}, deviceId={}, message={}", task.getTaskId(), device.getId(), mesTaskResult.getMessage()); } } catch (Exception e) { log.warn("出片大车设备检查MES任务异常: taskId={}, deviceId={}, error={}", task.getTaskId(), device.getId(), e.getMessage()); } // 然后检查MES确认状态(只有在任务已开始执行时才检查) DevicePlcVO.OperationResult mesResult = null; try { mesResult = handler.execute(device, "checkMesConfirm", Collections.emptyMap()); } catch (Exception e) { log.warn("出片大车设备检查MES确认状态异常: taskId={}, deviceId={}, error={}", task.getTaskId(), device.getId(), e.getMessage()); } // 更新步骤状态(大车设备保持RUNNING,直到MES确认完成或任务取消) if (mesResult != null) { updateStepStatusForVehicle(step, mesResult); boolean opSuccess = Boolean.TRUE.equals(mesResult.getSuccess()); updateTaskProgress(task, step.getStepOrder(), opSuccess); if (!opSuccess) { deviceCoordinationService.syncDeviceStatus(device, DeviceCoordinationService.DeviceStatus.FAILED, context); } } } return; } else { // 没有数据,且设备未运行,保持PENDING状态 if (!TaskStepDetail.Status.PENDING.name().equals(step.getStatus())) { step.setStatus(TaskStepDetail.Status.PENDING.name()); step.setSuccessMessage("等待大理片笼处理完成"); taskStepDetailMapper.updateById(step); notificationService.notifyStepUpdate(task.getTaskId(), step); } log.debug("出片大车设备定时器:暂无已处理的玻璃信息: taskId={}, deviceId={}", task.getTaskId(), device.getId()); return; } } log.debug("出片大车设备定时器检测到已处理的玻璃信息: taskId={}, deviceId={}, glassCount={}", task.getTaskId(), device.getId(), processedGlassIds.size()); // 执行出片操作 Map checkParams = new HashMap<>(); checkParams.put("glassIds", new ArrayList<>(processedGlassIds)); checkParams.put("_taskContext", context); DeviceLogicHandler handler = handlerFactory.getHandler(device.getDeviceType()); if (handler != null) { // 将logicParams合并到checkParams中 Map logicParams = parseLogicParams(device); if (logicParams != null && !logicParams.isEmpty()) { checkParams.put("_logicParams", logicParams); } // 第一步:写入大车出片请求 DevicePlcVO.OperationResult feedResult = handler.execute(device, "feedGlass", checkParams); if (Boolean.TRUE.equals(feedResult.getSuccess())) { // 真正开始处理,设置为RUNNING deviceCoordinationService.syncDeviceStatus(device, DeviceCoordinationService.DeviceStatus.RUNNING, context); // 步骤状态也设置为RUNNING if (!TaskStepDetail.Status.RUNNING.name().equals(step.getStatus())) { step.setStatus(TaskStepDetail.Status.RUNNING.name()); if (step.getStartTime() == null) { step.setStartTime(new Date()); } taskStepDetailMapper.updateById(step); notificationService.notifyStepUpdate(task.getTaskId(), step); } log.debug("出片大车设备定时器执行成功: taskId={}, deviceId={}, glassCount={}", task.getTaskId(), device.getId(), processedGlassIds.size()); // 清空已处理的玻璃ID列表(已处理) clearProcessedGlassIds(context); // feedGlass成功后,先检查MES任务(checkMesTask)来开始执行任务 DevicePlcVO.OperationResult mesTaskResult = null; try { mesTaskResult = handler.execute(device, "checkMesTask", Collections.emptyMap()); if (mesTaskResult != null && Boolean.TRUE.equals(mesTaskResult.getSuccess())) { log.info("出片大车设备已检查MES任务并开始执行: taskId={}, deviceId={}, message={}", task.getTaskId(), device.getId(), mesTaskResult.getMessage()); } } catch (Exception e) { log.warn("出片大车设备检查MES任务异常: taskId={}, deviceId={}, error={}", task.getTaskId(), device.getId(), e.getMessage()); } } else { // 没有数据,保持WAITING状态和PENDING步骤状态 deviceCoordinationService.syncDeviceStatus(device, DeviceCoordinationService.DeviceStatus.WAITING, context); if (!TaskStepDetail.Status.PENDING.name().equals(step.getStatus())) { step.setStatus(TaskStepDetail.Status.PENDING.name()); step.setSuccessMessage("等待中"); taskStepDetailMapper.updateById(step); notificationService.notifyStepUpdate(task.getTaskId(), step); } log.debug("出片大车设备定时器执行失败: taskId={}, deviceId={}, message={}", task.getTaskId(), device.getId(), feedResult.getMessage()); } // 第二步:检查MES确认状态(如果大车处理器支持的话) // 只有在任务已开始执行(有任务记录)时才检查MES确认 DevicePlcVO.OperationResult mesResult = null; try { mesResult = handler.execute(device, "checkMesConfirm", Collections.emptyMap()); } catch (Exception e) { log.warn("出片大车设备检查MES确认状态异常: taskId={}, deviceId={}, error={}", task.getTaskId(), device.getId(), e.getMessage()); } // 更新步骤状态(大车设备保持RUNNING,直到MES确认完成或任务取消) if (mesResult != null) { updateStepStatusForVehicle(step, mesResult); boolean opSuccess = Boolean.TRUE.equals(mesResult.getSuccess()); updateTaskProgress(task, step.getStepOrder(), opSuccess); if (!opSuccess) { deviceCoordinationService.syncDeviceStatus(device, DeviceCoordinationService.DeviceStatus.FAILED, context); } } else { updateStepStatusForVehicle(step, feedResult); boolean opSuccess = Boolean.TRUE.equals(feedResult.getSuccess()); updateTaskProgress(task, step.getStepOrder(), opSuccess); if (!opSuccess) { deviceCoordinationService.syncDeviceStatus(device, DeviceCoordinationService.DeviceStatus.FAILED, context); } } } } catch (Exception e) { log.error("出片大车设备定时器执行异常: taskId={}, deviceId={}", task.getTaskId(), device.getId(), e); } }, 0, MONITOR_INTERVAL_MS, TimeUnit.MILLISECONDS); // 在串行执行模式下,设备启动定时器时先设置为 WAITING,定时器第一次执行时再设置为 RUNNING deviceCoordinationService.syncDeviceStatus(device, DeviceCoordinationService.DeviceStatus.WAITING, context); return future; } catch (Exception e) { log.error("启动出片大车设备定时器失败: taskId={}, deviceId={}", task.getTaskId(), device.getId(), e); return null; } } /** * 启动大理片笼设备定时器:逻辑处理(不涉及PLC交互,只负责逻辑处理,比如多久给任务汇报) */ private ScheduledFuture startLargeGlassTimer(MultiDeviceTask task, TaskStepDetail step, DeviceConfig device, TaskExecutionContext context) { try { // 从设备配置中获取处理时间(默认30秒) Map logicParams = parseLogicParams(device); Integer processTimeSeconds = getLogicParam(logicParams, "processTimeSeconds", 30); final long PROCESS_TIME_MS = processTimeSeconds * 1000; log.debug("启动大理片笼设备定时器: taskId={}, deviceId={}, processTime={}s", task.getTaskId(), device.getId(), processTimeSeconds); // 启动定时任务 ScheduledFuture future = scheduledExecutor.scheduleWithFixedDelay(() -> { try { if (isTaskCancelled(context)) { log.debug("任务已取消,停止大理片笼定时器: taskId={}, deviceId={}", task.getTaskId(), device.getId()); return; } // 大理片笼设备:只有在真正开始处理时才设置为RUNNING // 检查是否有已装载的玻璃信息(从进片大车来的) List loadedGlassIds = getLoadedGlassIds(context); if (CollectionUtils.isEmpty(loadedGlassIds)) { // 没有数据,保持WAITING状态和PENDING步骤状态 deviceCoordinationService.syncDeviceStatus(device, DeviceCoordinationService.DeviceStatus.WAITING, context); if (!TaskStepDetail.Status.PENDING.name().equals(step.getStatus())) { step.setStatus(TaskStepDetail.Status.PENDING.name()); step.setSuccessMessage("等待进片大车装载玻璃"); taskStepDetailMapper.updateById(step); notificationService.notifyStepUpdate(task.getTaskId(), step); } log.debug("大理片笼设备定时器:暂无已装载的玻璃信息: taskId={}, deviceId={}", task.getTaskId(), device.getId()); return; } // 有数据,设置为RUNNING deviceCoordinationService.syncDeviceStatus(device, DeviceCoordinationService.DeviceStatus.RUNNING, context); // 检查玻璃是否已经处理完成(通过处理时间判断) Long processStartTime = getProcessStartTime(context); if (processStartTime == null) { // 第一次检测到玻璃,记录开始处理时间 setProcessStartTime(context, System.currentTimeMillis()); log.debug("大理片笼设备开始处理: taskId={}, deviceId={}, glassCount={}, processTime={}s", task.getTaskId(), device.getId(), loadedGlassIds.size(), processTimeSeconds); return; } long elapsed = System.currentTimeMillis() - processStartTime; if (elapsed < PROCESS_TIME_MS) { // 处理时间未到,继续等待 log.debug("大理片笼设备处理中: taskId={}, deviceId={}, elapsed={}s, remaining={}s", task.getTaskId(), device.getId(), elapsed / 1000, (PROCESS_TIME_MS - elapsed) / 1000); return; } // 处理时间已到,完成任务汇报 log.debug("大理片笼设备处理完成: taskId={}, deviceId={}, glassCount={}, processTime={}s", task.getTaskId(), device.getId(), loadedGlassIds.size(), processTimeSeconds); // 将已处理的玻璃ID转移到已处理列表(供出片大车使用) setProcessedGlassIds(context, new ArrayList<>(loadedGlassIds)); clearLoadedGlassIds(context); clearProcessStartTime(context); // 更新步骤状态 step.setStatus(TaskStepDetail.Status.COMPLETED.name()); step.setErrorMessage(null); step.setOutputData(toJson(Collections.singletonMap("glassIds", loadedGlassIds))); taskStepDetailMapper.updateById(step); // 大理片笼完成后尝试自动收尾整个任务 checkAndCompleteTaskIfDone(step.getTaskId()); } catch (Exception e) { log.error("大理片笼设备定时器执行异常: taskId={}, deviceId={}", task.getTaskId(), device.getId(), e); } }, 0, 1_000, TimeUnit.MILLISECONDS); // 每秒检查一次 // 在串行执行模式下,设备启动定时器时先设置为 WAITING,定时器第一次执行时再设置为 RUNNING deviceCoordinationService.syncDeviceStatus(device, DeviceCoordinationService.DeviceStatus.WAITING, context); return future; } catch (Exception e) { log.error("启动大理片笼设备定时器失败: taskId={}, deviceId={}", task.getTaskId(), device.getId(), e); return null; } } /** * 获取逻辑参数 */ @SuppressWarnings("unchecked") private T getLogicParam(Map logicParams, String key, T defaultValue) { if (logicParams == null) { return defaultValue; } Object value = logicParams.get(key); if (value == null) { return defaultValue; } try { return (T) value; } catch (ClassCastException e) { return defaultValue; } } /** * 获取已装载的玻璃ID列表 */ @SuppressWarnings("unchecked") private List getLoadedGlassIds(TaskExecutionContext context) { if (context == null) { return Collections.emptyList(); } Object glassIds = context.getSharedData().get("loadedGlassIds"); if (glassIds instanceof List) { return new ArrayList<>((List) glassIds); } return Collections.emptyList(); } /** * 设置已装载的玻璃ID列表 */ private void setLoadedGlassIds(TaskExecutionContext context, List glassIds) { if (context != null) { context.getSharedData().put("loadedGlassIds", new ArrayList<>(glassIds)); } } /** * 清空已装载的玻璃ID列表 */ private void clearLoadedGlassIds(TaskExecutionContext context) { if (context != null) { context.getSharedData().put("loadedGlassIds", new ArrayList<>()); } } /** * 获取已处理的玻璃ID列表 */ @SuppressWarnings("unchecked") private List getProcessedGlassIds(TaskExecutionContext context) { if (context == null) { return Collections.emptyList(); } Object glassIds = context.getSharedData().get("processedGlassIds"); if (glassIds instanceof List) { return new ArrayList<>((List) glassIds); } return Collections.emptyList(); } /** * 设置已处理的玻璃ID列表 */ private void setProcessedGlassIds(TaskExecutionContext context, List glassIds) { if (context != null) { context.getSharedData().put("processedGlassIds", new ArrayList<>(glassIds)); } } /** * 清空已处理的玻璃ID列表 */ private void clearProcessedGlassIds(TaskExecutionContext context) { if (context != null) { context.getSharedData().put("processedGlassIds", new ArrayList<>()); } } /** * 获取处理开始时间 */ private Long getProcessStartTime(TaskExecutionContext context) { if (context == null) { return null; } Object time = context.getSharedData().get("processStartTime"); if (time instanceof Number) { return ((Number) time).longValue(); } return null; } /** * 设置处理开始时间 */ private void setProcessStartTime(TaskExecutionContext context, long time) { if (context != null) { context.getSharedData().put("processStartTime", time); } } /** * 清空处理开始时间 */ private void clearProcessStartTime(TaskExecutionContext context) { if (context != null) { context.getSharedData().remove("processStartTime"); } } /** * 设置卧转立扫码暂停标志 */ private void setScannerPause(TaskExecutionContext context, boolean pause) { if (context != null) { context.getSharedData().put("scannerPause", pause); } } private boolean isTaskCancelled(TaskExecutionContext context) { if (context == null) { return false; } Object cancelled = context.getSharedData().get("taskCancelled"); return cancelled instanceof Boolean && (Boolean) cancelled; } /** * 检查是否需要暂停卧转立扫码 */ private boolean shouldPauseScanner(TaskExecutionContext context) { if (context == null) { return false; } Object pauseFlag = context.getSharedData().get("scannerPause"); return pauseFlag instanceof Boolean && (Boolean) pauseFlag; } /** * 获取已扫描的玻璃ID列表 */ @SuppressWarnings("unchecked") private List getScannedGlassIds(TaskExecutionContext context) { if (context == null) { return Collections.emptyList(); } Object glassIds = context.getSharedData().get("scannedGlassIds"); if (glassIds instanceof List) { return new ArrayList<>((List) glassIds); } return Collections.emptyList(); } /** * 清空已扫描的玻璃ID列表 */ private void clearScannedGlassIds(TaskExecutionContext context) { if (context != null) { context.getSharedData().put("scannedGlassIds", new ArrayList<>()); } } /** * 获取卧转立主体已输出、准备上大车的玻璃ID列表 */ @SuppressWarnings("unchecked") private List getTransferReadyGlassIds(TaskExecutionContext context) { if (context == null) { return Collections.emptyList(); } Object glassIds = context.getSharedData().get("transferReadyGlassIds"); if (glassIds instanceof List) { return new ArrayList<>((List) glassIds); } return Collections.emptyList(); } /** * 清空卧转立主体已输出的玻璃ID列表 */ private void clearTransferReadyGlassIds(TaskExecutionContext context) { if (context != null) { context.getSharedData().put("transferReadyGlassIds", new ArrayList<>()); } } /** * 注册定时器任务 */ private void registerScheduledTask(String taskId, ScheduledFuture future) { taskScheduledTasks.computeIfAbsent(taskId, k -> new ArrayList<>()).add(future); } /** * 停止所有定时器任务 */ private void stopScheduledTasks(String taskId) { List> futures = taskScheduledTasks.remove(taskId); if (futures != null) { for (ScheduledFuture future : futures) { if (future != null && !future.isCancelled()) { future.cancel(false); } } log.debug("已停止任务的所有定时器: taskId={}, count={}", taskId, futures.size()); } runningTaskContexts.remove(taskId); } /** * 等待定时器任务完成(带超时) */ private void waitForScheduledTasks(String taskId, TaskExecutionContext context) { // 获取任务超时时间(默认30分钟) TaskParameters params = context.getParameters(); long timeoutMinutes = params != null && params.getTimeoutMinutes() != null ? params.getTimeoutMinutes() : 30; long timeoutMs = timeoutMinutes * 60 * 1000; long deadline = System.currentTimeMillis() + timeoutMs; log.debug("等待定时器任务完成: taskId={}, timeout={}分钟", taskId, timeoutMinutes); while (System.currentTimeMillis() < deadline) { List> futures = taskScheduledTasks.get(taskId); if (futures == null || futures.isEmpty()) { break; } // 检查是否所有任务都已完成 boolean allDone = true; for (ScheduledFuture future : futures) { if (future != null && !future.isDone()) { allDone = false; break; } } if (allDone) { break; } try { Thread.sleep(1000); } catch (InterruptedException e) { Thread.currentThread().interrupt(); break; } } log.debug("定时器任务等待完成: taskId={}", taskId); } /** * 当某个步骤可能完成时,检查任务是否所有步骤都已完成,如果是则自动将任务标记为已完成 */ private void checkAndCompleteTaskIfDone(String taskId) { if (taskId == null) { return; } try { MultiDeviceTask task = multiDeviceTaskMapper.selectOne( Wrappers.lambdaQuery() .eq(MultiDeviceTask::getTaskId, taskId) ); if (task == null) { return; } // 仅在任务仍为RUNNING时才尝试自动收尾 if (!MultiDeviceTask.Status.RUNNING.name().equals(task.getStatus())) { return; } int totalSteps = task.getTotalSteps() != null ? task.getTotalSteps() : 0; if (totalSteps <= 0) { return; } int completedSteps = countCompletedSteps(taskId); if (completedSteps < totalSteps) { return; } // 所有步骤都已完成,收尾任务 task.setStatus(MultiDeviceTask.Status.COMPLETED.name()); task.setEndTime(new Date()); multiDeviceTaskMapper.updateById(task); // 停止所有定时器 stopScheduledTasks(taskId); // 通知任务完成 notificationService.notifyTaskStatus(task); log.info("所有步骤已完成,自动将任务标记为已完成: taskId={}, totalSteps={}", taskId, totalSteps); } catch (Exception e) { log.warn("检查并自动完成任务失败: taskId={}", taskId, e); } } /** * 更新步骤状态 */ private void updateStepStatus(TaskStepDetail step, DevicePlcVO.OperationResult result) { if (step == null || result == null) { return; } boolean success = Boolean.TRUE.equals(result.getSuccess()); step.setStatus(success ? TaskStepDetail.Status.COMPLETED.name() : TaskStepDetail.Status.FAILED.name()); // 设置消息:成功时如果有消息也保存,失败时保存错误消息 String message = result.getMessage(); if (success) { // 成功时,如果有消息则保存(用于提示信息),否则清空 step.setSuccessMessage(StringUtils.hasText(message) ? message : null); // 如果状态变为完成,设置结束时间 if (TaskStepDetail.Status.COMPLETED.name().equals(step.getStatus()) && step.getEndTime() == null) { step.setEndTime(new Date()); } } else { // 失败时保存错误消息 step.setErrorMessage(message); // 如果状态变为失败,设置结束时间 if (TaskStepDetail.Status.FAILED.name().equals(step.getStatus()) && step.getEndTime() == null) { step.setEndTime(new Date()); } } step.setOutputData(toJson(result)); taskStepDetailMapper.updateById(step); } /** * 确保步骤进入RUNNING状态(仅在第一次真正执行前调用) */ private void ensureStepRunning(TaskStepDetail step, String taskId) { if (step == null) { return; } if (!TaskStepDetail.Status.RUNNING.name().equals(step.getStatus())) { step.setStatus(TaskStepDetail.Status.RUNNING.name()); if (step.getStartTime() == null) { step.setStartTime(new Date()); } taskStepDetailMapper.updateById(step); notificationService.notifyStepUpdate(taskId, step); } } /** * 更新扫码设备步骤状态(显示进度,保持RUNNING状态直到所有玻璃处理完成) */ private void updateStepStatusForScanner(TaskStepDetail step, DevicePlcVO.OperationResult result, int currentIndex, int totalCount, int successCount, int failCount) { if (step == null || result == null) { return; } boolean success = Boolean.TRUE.equals(result.getSuccess()); // 保持RUNNING状态,直到所有玻璃处理完成(在定时器完成时再设置为COMPLETED) step.setStatus(TaskStepDetail.Status.RUNNING.name()); // 更新时间和耗时,前端可以实时看到执行耗时 Date now = new Date(); if (step.getStartTime() == null) { step.setStartTime(now); } if (step.getStartTime() != null) { step.setDurationMs(now.getTime() - step.getStartTime().getTime()); } // 更新进度信息 String progressMessage = String.format("正在处理 %d/%d (成功:%d, 失败:%d)", currentIndex, totalCount, successCount, failCount); if (success) { // 成功时显示进度和成功消息 String resultMessage = result.getMessage(); if (StringUtils.hasText(resultMessage)) { step.setSuccessMessage(progressMessage + " - " + resultMessage); } else { step.setSuccessMessage(progressMessage); } step.setErrorMessage(null); } else { // 失败时显示进度和错误消息 String errorMessage = result.getMessage(); step.setErrorMessage(progressMessage + " - " + (StringUtils.hasText(errorMessage) ? errorMessage : "处理失败")); step.setSuccessMessage(null); } step.setOutputData(toJson(result)); taskStepDetailMapper.updateById(step); } /** * 更新大车设备步骤状态(保持RUNNING,直到手动停止或任务取消;失败时标记为FAILED) */ private void updateStepStatusForVehicle(TaskStepDetail step, DevicePlcVO.OperationResult result) { if (step == null || result == null) { return; } boolean success = Boolean.TRUE.equals(result.getSuccess()); boolean completed = false; if (result.getData() != null && result.getData().get("completed") != null) { Object flag = result.getData().get("completed"); if (flag instanceof Boolean) { completed = (Boolean) flag; } else { completed = "true".equalsIgnoreCase(String.valueOf(flag)); } } Date now = new Date(); // 初始化开始时间 if (step.getStartTime() == null) { step.setStartTime(now); } boolean waiting = false; String waitingReason = null; if (result.getData() != null) { Object waitingFlag = result.getData().get("waiting"); if (waitingFlag instanceof Boolean) { waiting = (Boolean) waitingFlag; } else if (waitingFlag != null) { waiting = "true".equalsIgnoreCase(String.valueOf(waitingFlag)); } Object reason = result.getData().get("waitingReason"); if (reason != null) { waitingReason = String.valueOf(reason); } } if (success && !completed) { // 成功但未完成:根据waiting状态决定显示为等待还是执行中 if (waiting) { step.setStatus(TaskStepDetail.Status.PENDING.name()); } else { step.setStatus(TaskStepDetail.Status.RUNNING.name()); } String message = result.getMessage(); if (!StringUtils.hasText(message) && waiting) { message = "大车设备等待中" + (StringUtils.hasText(waitingReason) ? "(" + waitingReason + ")" : ""); } step.setSuccessMessage(StringUtils.hasText(message) ? message : (waiting ? "大车设备等待中" : "大车设备运行中")); step.setErrorMessage(null); if (step.getStartTime() != null) { step.setDurationMs(now.getTime() - step.getStartTime().getTime()); } } else if (success && completed) { // 成功且MES已确认完成:标记为COMPLETED并记录结束时间 step.setStatus(TaskStepDetail.Status.COMPLETED.name()); String message = result.getMessage(); step.setSuccessMessage(StringUtils.hasText(message) ? message : "大车设备任务已完成"); step.setErrorMessage(null); if (step.getEndTime() == null) { step.setEndTime(now); } if (step.getStartTime() != null && step.getEndTime() != null) { step.setDurationMs(step.getEndTime().getTime() - step.getStartTime().getTime()); } // 尝试自动收尾整个任务 checkAndCompleteTaskIfDone(step.getTaskId()); } else { // 失败:标记为FAILED并记录结束时间 step.setStatus(TaskStepDetail.Status.FAILED.name()); String message = result.getMessage(); step.setErrorMessage(message); if (step.getEndTime() == null) { step.setEndTime(now); } if (step.getStartTime() != null && step.getEndTime() != null) { step.setDurationMs(step.getEndTime().getTime() - step.getStartTime().getTime()); } } step.setOutputData(toJson(result)); taskStepDetailMapper.updateById(step); } /** * 轮询大车设备的MES任务和确认状态 * 无论步骤当前是否为RUNNING/PENDING,都可以调用,用于避免错过MES端后写入的任务 */ private void pollMesForVehicle(MultiDeviceTask task, TaskStepDetail step, DeviceConfig device, TaskExecutionContext context) { try { DeviceLogicHandler handler = handlerFactory.getHandler(device.getDeviceType()); if (handler == null) { return; } Map logicParams = parseLogicParams(device); // 先检查MES任务(如果mesSend=1,会创建任务并开始执行) DevicePlcVO.OperationResult mesTaskResult = null; try { mesTaskResult = handler.execute(device, "checkMesTask", Collections.emptyMap()); if (mesTaskResult != null && Boolean.TRUE.equals(mesTaskResult.getSuccess())) { log.info("大车设备已检查MES任务并开始执行: taskId={}, deviceId={}, message={}", task.getTaskId(), device.getId(), mesTaskResult.getMessage()); } } catch (Exception e) { log.warn("大车设备检查MES任务异常: taskId={}, deviceId={}, error={}", task.getTaskId(), device.getId(), e.getMessage()); } // 然后检查MES确认状态 DevicePlcVO.OperationResult mesResult = null; try { mesResult = handler.execute(device, "checkMesConfirm", Collections.emptyMap()); } catch (Exception e) { log.warn("大车设备检查MES确认状态异常: taskId={}, deviceId={}, error={}", task.getTaskId(), device.getId(), e.getMessage()); } // 更新步骤状态 if (mesResult != null) { updateStepStatusForVehicle(step, mesResult); boolean opSuccess = Boolean.TRUE.equals(mesResult.getSuccess()); updateTaskProgress(task, step.getStepOrder(), opSuccess); if (!opSuccess) { deviceCoordinationService.syncDeviceStatus(device, DeviceCoordinationService.DeviceStatus.FAILED, context); } } } catch (Exception e) { log.warn("轮询大车设备MES任务/确认状态异常: taskId={}, deviceId={}, error={}", task.getTaskId(), device.getId(), e.getMessage()); } } /** * 更新卧转立设备步骤状态(区分等待中和真正完成) */ private void updateStepStatusForTransfer(TaskStepDetail step, DevicePlcVO.OperationResult result, DeviceConfig device, TaskExecutionContext context) { if (step == null || result == null) { return; } boolean success = Boolean.TRUE.equals(result.getSuccess()); String message = result.getMessage(); // 判断是否真正完成: // 1. 写入PLC成功且缓冲已清空(表示所有玻璃已处理完,无新玻璃) // 注意:缓冲队列为空且无待处理玻璃,在任务刚开始时也可能出现,不应该立即标记为完成 // 只有当任务已经运行一段时间,且确实没有玻璃需要处理时,才标记为完成 boolean isRealCompleted = success && message != null && ( (message.contains("批次已写入PLC") && message.contains("缓冲已清空,任务完成")) ); if (isRealCompleted) { // 真正完成:设置为完成状态,并设置结束时间 // 注意:一旦标记为完成,状态不应该再被改变 if (!TaskStepDetail.Status.COMPLETED.name().equals(step.getStatus())) { step.setStatus(TaskStepDetail.Status.COMPLETED.name()); step.setSuccessMessage(message); if (step.getEndTime() == null) { step.setEndTime(new Date()); } // 计算耗时 if (step.getStartTime() != null && step.getEndTime() != null) { step.setDurationMs(step.getEndTime().getTime() - step.getStartTime().getTime()); } log.info("卧转立设备步骤已完成: stepId={}, durationMs={}, message={}", step.getId(), step.getDurationMs(), message); // 卧转立主体完成后尝试自动收尾整个任务 checkAndCompleteTaskIfDone(step.getTaskId()); // 更新设备状态为已完成 if (device != null) { deviceCoordinationService.syncDeviceStatus(device, DeviceCoordinationService.DeviceStatus.COMPLETED, context); } } } else if (success && message != null && message.contains("批次已写入PLC")) { // 写入PLC成功但缓冲还有玻璃(车满情况),继续运行 if (!TaskStepDetail.Status.RUNNING.name().equals(step.getStatus())) { step.setStatus(TaskStepDetail.Status.RUNNING.name()); } step.setSuccessMessage(message); // 确保开始时间已设置 if (step.getStartTime() == null) { step.setStartTime(new Date()); } } else if (success) { // 设备正在工作(等待缓冲、等待更多玻璃等),保持RUNNING状态 // 因为定时器已经执行,说明设备已经开始工作了 if (!TaskStepDetail.Status.RUNNING.name().equals(step.getStatus())) { step.setStatus(TaskStepDetail.Status.RUNNING.name()); } step.setSuccessMessage(message); // 确保开始时间已设置(设备已经开始工作) if (step.getStartTime() == null) { step.setStartTime(new Date()); } } else { // 失败:设置为失败状态,并设置结束时间 step.setStatus(TaskStepDetail.Status.FAILED.name()); step.setErrorMessage(message); if (step.getEndTime() == null) { step.setEndTime(new Date()); } // 计算耗时 if (step.getStartTime() != null && step.getEndTime() != null) { step.setDurationMs(step.getEndTime().getTime() - step.getStartTime().getTime()); } } step.setOutputData(toJson(result)); taskStepDetailMapper.updateById(step); } /** * 创建步骤摘要 */ private Map createStepSummary(String deviceName, boolean success, String message) { Map summary = new HashMap<>(); summary.put("deviceName", deviceName); summary.put("success", success); summary.put("message", message); return summary; } /** * 解析设备逻辑参数 */ @SuppressWarnings("unchecked") private Map parseLogicParams(DeviceConfig device) { String extraParams = device.getExtraParams(); if (!StringUtils.hasText(extraParams)) { return Collections.emptyMap(); } try { Map extraParamsMap = objectMapper.readValue(extraParams, MAP_TYPE); Object deviceLogic = extraParamsMap.get("deviceLogic"); if (deviceLogic instanceof Map) { return (Map) deviceLogic; } return Collections.emptyMap(); } catch (Exception e) { log.warn("解析设备逻辑参数失败: deviceId={}", device.getId(), e); return Collections.emptyMap(); } } /** * 并行执行多个设备操作 */ private Pair executeParallel(MultiDeviceTask task, List devices, TaskExecutionContext context, List> stepSummaries, Integer maxConcurrent) { int concurrency = maxConcurrent != null && maxConcurrent > 0 ? Math.min(maxConcurrent, devices.size()) : devices.size(); // 创建所有步骤记录 List steps = new ArrayList<>(); for (int i = 0; i < devices.size(); i++) { DeviceConfig device = devices.get(i); int order = i + 1; TaskStepDetail step = createStepRecord(task, device, order); steps.add(step); } // 使用信号量控制并发数 Semaphore semaphore = new Semaphore(concurrency); List> futures = new ArrayList<>(); for (int i = 0; i < devices.size(); i++) { final int index = i; final DeviceConfig device = devices.get(index); final TaskStepDetail step = steps.get(index); CompletableFuture future = CompletableFuture.supplyAsync(() -> { try { semaphore.acquire(); try { return executeStep(task, step, device, context); } finally { semaphore.release(); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); log.error("并行执行被中断, deviceId={}", device.getId(), e); return StepResult.failure(device.getDeviceName(), "执行被中断"); } catch (Exception e) { log.error("并行执行异常, deviceId={}", device.getId(), e); return StepResult.failure(device.getDeviceName(), e.getMessage()); } }, executorService); final int finalIndex = index; future.whenComplete((result, throwable) -> { if (throwable != null) { log.error("并行执行完成时异常, deviceId={}", device.getId(), throwable); stepSummaries.set(finalIndex, StepResult.failure(device.getDeviceName(), throwable.getMessage()).toSummary()); } else if (result != null) { stepSummaries.set(finalIndex, result.toSummary()); } }); futures.add(future); } // 等待所有任务完成 CompletableFuture allFutures = CompletableFuture.allOf( futures.toArray(new CompletableFuture[0]) ); try { allFutures.get(30, TimeUnit.MINUTES); // 最多等待30分钟 } catch (TimeoutException e) { log.error("并行执行超时, taskId={}", task.getTaskId(), e); return Pair.of(false, "任务执行超时"); } catch (Exception e) { log.error("等待并行执行完成时异常, taskId={}", task.getTaskId(), e); return Pair.of(false, "等待执行完成时发生异常: " + e.getMessage()); } // 检查所有步骤的执行结果 boolean allSuccess = true; String firstFailureMessage = null; for (int i = 0; i < futures.size(); i++) { try { StepResult result = futures.get(i).get(); if (result != null && !result.isSuccess()) { allSuccess = false; if (firstFailureMessage == null) { firstFailureMessage = result.getMessage(); } } } catch (Exception e) { log.error("获取步骤执行结果异常, stepIndex={}", i, e); allSuccess = false; if (firstFailureMessage == null) { firstFailureMessage = "获取执行结果异常: " + e.getMessage(); } } } return Pair.of(allSuccess, firstFailureMessage); } /** * 确定执行模式 */ private String determineExecutionMode(DeviceGroupConfig groupConfig) { if (groupConfig == null) { return EXECUTION_MODE_SERIAL; // 默认串行 } // 从extraConfig中读取executionMode String extraConfig = groupConfig.getExtraConfig(); if (StringUtils.hasText(extraConfig)) { try { Map config = objectMapper.readValue(extraConfig, MAP_TYPE); Object mode = config.get("executionMode"); if (mode != null) { String modeStr = String.valueOf(mode).toUpperCase(); if (EXECUTION_MODE_PARALLEL.equals(modeStr) || EXECUTION_MODE_SERIAL.equals(modeStr)) { return modeStr; } } } catch (Exception e) { log.warn("解析设备组执行模式失败, groupId={}", groupConfig.getId(), e); } } // 如果有maxConcurrentDevices且大于1,默认使用并行模式 if (groupConfig.getMaxConcurrentDevices() != null && groupConfig.getMaxConcurrentDevices() > 1) { return EXECUTION_MODE_PARALLEL; } return EXECUTION_MODE_SERIAL; // 默认串行 } /** * 获取最大并发设备数 */ private Integer getMaxConcurrentDevices(DeviceGroupConfig groupConfig) { if (groupConfig == null) { return 1; } // 从extraConfig中读取maxConcurrent String extraConfig = groupConfig.getExtraConfig(); if (StringUtils.hasText(extraConfig)) { try { Map config = objectMapper.readValue(extraConfig, MAP_TYPE); Object maxConcurrent = config.get("maxConcurrent"); if (maxConcurrent instanceof Number) { return ((Number) maxConcurrent).intValue(); } } catch (Exception e) { log.warn("解析设备组最大并发数失败, groupId={}", groupConfig.getId(), e); } } // 使用实体字段 return groupConfig.getMaxConcurrentDevices() != null && groupConfig.getMaxConcurrentDevices() > 0 ? groupConfig.getMaxConcurrentDevices() : 1; } /** * 简单的Pair类用于返回两个值 */ private static class Pair { private final T first; private final U second; private Pair(T first, U second) { this.first = first; this.second = second; } public static Pair of(T first, U second) { return new Pair<>(first, second); } public T getFirst() { return first; } public U getSecond() { return second; } } /** * 分批执行大车设备玻璃上料(当玻璃ID数量超过6个时) */ private StepResult executeLoadVehicleWithBatches(MultiDeviceTask task, DeviceConfig device, int order, TaskExecutionContext context, List> stepSummaries) { List allGlassIds = context.getParameters().getGlassIds(); int batchSize = 6; // 每批最多6个玻璃ID // 分批处理 int totalBatches = (allGlassIds.size() + batchSize - 1) / batchSize; log.debug("大车设备分批上料: deviceId={}, totalGlassIds={}, batchSize={}, totalBatches={}", device.getId(), allGlassIds.size(), batchSize, totalBatches); for (int batchIndex = 0; batchIndex < totalBatches; batchIndex++) { int startIndex = batchIndex * batchSize; int endIndex = Math.min(startIndex + batchSize, allGlassIds.size()); List batchGlassIds = allGlassIds.subList(startIndex, endIndex); // 创建临时参数,只包含当前批次的玻璃ID TaskParameters batchParams = new TaskParameters(); batchParams.setGlassIds(new ArrayList<>(batchGlassIds)); batchParams.setPositionCode(context.getParameters().getPositionCode()); batchParams.setPositionValue(context.getParameters().getPositionValue()); // 创建临时上下文 TaskExecutionContext batchContext = new TaskExecutionContext(batchParams); // 创建步骤记录 TaskStepDetail step = createStepRecord(task, device, order); step.setStepName(step.getStepName() + String.format(" (批次 %d/%d)", batchIndex + 1, totalBatches)); // 执行当前批次 StepResult stepResult = executeStep(task, step, device, batchContext); stepSummaries.add(stepResult.toSummary()); if (!stepResult.isSuccess()) { log.error("大车设备分批上料失败: deviceId={}, batchIndex={}/{}, error={}", device.getId(), batchIndex + 1, totalBatches, stepResult.getMessage()); return stepResult; } log.debug("大车设备分批上料成功: deviceId={}, batchIndex={}/{}, glassIds={}", device.getId(), batchIndex + 1, totalBatches, batchGlassIds); } // 更新上下文中的已加载玻璃ID context.setLoadedGlassIds(new ArrayList<>(allGlassIds)); return StepResult.success(device.getDeviceName(), "分批上料完成,共" + totalBatches + "批"); } private TaskStepDetail createStepRecord(MultiDeviceTask task, DeviceConfig device, int order) { TaskStepDetail step = new TaskStepDetail(); step.setTaskId(task.getTaskId()); step.setStepOrder(order); step.setDeviceId(String.valueOf(device.getId())); step.setStepName(device.getDeviceName()); step.setStatus(TaskStepDetail.Status.PENDING.name()); step.setRetryCount(0); taskStepDetailMapper.insert(step); return step; } private StepResult executeStep(MultiDeviceTask task, TaskStepDetail step, DeviceConfig device, TaskExecutionContext context) { DeviceCoordinationService.DependencyCheckResult dependencyResult = deviceCoordinationService.checkDependencies(device, context); if (!dependencyResult.isSatisfied()) { log.warn("设备依赖未满足: deviceId={}, message={}", device.getId(), dependencyResult.getMessage()); step.setStatus(TaskStepDetail.Status.FAILED.name()); step.setErrorMessage(dependencyResult.getMessage()); step.setStartTime(new Date()); step.setEndTime(new Date()); taskStepDetailMapper.updateById(step); updateTaskProgress(task, step.getStepOrder(), false); return StepResult.failure(device.getDeviceName(), dependencyResult.getMessage()); } return executeStepWithRetry(task, step, device, context, getRetryPolicy(device)); } /** * 带重试的步骤执行 */ private StepResult executeStepWithRetry(MultiDeviceTask task, TaskStepDetail step, DeviceConfig device, TaskExecutionContext context, RetryPolicy retryPolicy) { Date startTime = new Date(); step.setStartTime(startTime); step.setStatus(TaskStepDetail.Status.RUNNING.name()); step.setRetryCount(0); DeviceInteraction deviceInteraction = interactionRegistry.getInteraction(device.getDeviceType()); if (deviceInteraction != null) { return executeInteractionStepWithRetry(task, step, device, context, deviceInteraction, retryPolicy); } Map params = buildOperationParams(device, context); // 将context引用放入params,供设备处理器使用(用于设备协调) params.put("_taskContext", context); log.debug("executeStepWithRetry构建参数: deviceId={}, deviceType={}, operation={}, paramsKeys={}, params={}", device.getId(), device.getDeviceType(), determineOperation(device, params), params.keySet(), params); step.setInputData(toJson(params)); taskStepDetailMapper.updateById(step); String operation = determineOperation(device, params); DeviceLogicHandler handler = handlerFactory.getHandler(device.getDeviceType()); int retryAttempt = 0; Exception lastException = null; while (retryAttempt <= retryPolicy.getMaxRetryCount()) { try { if (retryAttempt > 0) { // 重试前等待 long waitTime = retryPolicy.calculateRetryInterval(retryAttempt); log.debug("步骤执行重试: deviceId={}, operation={}, retryAttempt={}/{}, waitTime={}ms", device.getId(), operation, retryAttempt, retryPolicy.getMaxRetryCount(), waitTime); Thread.sleep(waitTime); // 更新步骤状态 step.setRetryCount(retryAttempt); step.setStatus(TaskStepDetail.Status.RUNNING.name()); step.setStartTime(new Date()); taskStepDetailMapper.updateById(step); } DevicePlcVO.OperationResult result; if (handler == null) { result = deviceInteractionService.executeOperation(device.getId(), operation, params); } else { result = handler.execute(device, operation, params); } boolean opSuccess = Boolean.TRUE.equals(result.getSuccess()); updateStepAfterOperation(step, result, opSuccess); updateTaskProgress(task, step.getStepOrder(), opSuccess); // 通知步骤更新 notificationService.notifyStepUpdate(task.getTaskId(), step); if (opSuccess) { updateContextAfterSuccess(device, context, params, result); // 同步设备状态 deviceCoordinationService.syncDeviceStatus(device, DeviceCoordinationService.DeviceStatus.COMPLETED, context); return StepResult.success(device.getDeviceName(), result.getMessage()); } else { // 业务失败,判断是否可重试 if (retryAttempt < retryPolicy.getMaxRetryCount() && isRetryableFailure(result)) { retryAttempt++; lastException = new RuntimeException(result.getMessage()); log.warn("步骤执行失败,准备重试: deviceId={}, operation={}, retryAttempt={}, message={}", device.getId(), operation, retryAttempt, result.getMessage()); continue; } // 同步失败状态 deviceCoordinationService.syncDeviceStatus(device, DeviceCoordinationService.DeviceStatus.FAILED, context); return StepResult.failure(device.getDeviceName(), result.getMessage()); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); log.error("步骤执行被中断, deviceId={}, operation={}", device.getId(), operation, e); step.setStatus(TaskStepDetail.Status.FAILED.name()); step.setErrorMessage("执行被中断: " + e.getMessage()); step.setEndTime(new Date()); step.setDurationMs(step.getEndTime().getTime() - step.getStartTime().getTime()); step.setRetryCount(retryAttempt); taskStepDetailMapper.updateById(step); updateTaskProgress(task, step.getStepOrder(), false); return StepResult.failure(device.getDeviceName(), "执行被中断"); } catch (Exception e) { lastException = e; log.error("设备操作异常, deviceId={}, operation={}, retryAttempt={}", device.getId(), operation, retryAttempt, e); // 判断是否可重试 if (retryAttempt < retryPolicy.getMaxRetryCount() && retryPolicy.isRetryable(e)) { retryAttempt++; log.warn("步骤执行异常,准备重试: deviceId={}, operation={}, retryAttempt={}, exception={}", device.getId(), operation, retryAttempt, e.getClass().getSimpleName()); continue; } // 不可重试或达到最大重试次数 step.setStatus(TaskStepDetail.Status.FAILED.name()); step.setErrorMessage(e.getMessage()); step.setEndTime(new Date()); step.setDurationMs(step.getEndTime().getTime() - step.getStartTime().getTime()); step.setRetryCount(retryAttempt); taskStepDetailMapper.updateById(step); updateTaskProgress(task, step.getStepOrder(), false); // 通知步骤更新 notificationService.notifyStepUpdate(task.getTaskId(), step); // 同步失败状态 deviceCoordinationService.syncDeviceStatus(device, DeviceCoordinationService.DeviceStatus.FAILED, context); String errorMsg = retryAttempt > 0 ? String.format("执行失败(已重试%d次): %s", retryAttempt, e.getMessage()) : e.getMessage(); return StepResult.failure(device.getDeviceName(), errorMsg); } } // 达到最大重试次数 step.setStatus(TaskStepDetail.Status.FAILED.name()); step.setErrorMessage(lastException != null ? lastException.getMessage() : "执行失败"); step.setEndTime(new Date()); step.setDurationMs(step.getEndTime().getTime() - step.getStartTime().getTime()); step.setRetryCount(retryAttempt); taskStepDetailMapper.updateById(step); updateTaskProgress(task, step.getStepOrder(), false); // 通知步骤更新 notificationService.notifyStepUpdate(task.getTaskId(), step); deviceCoordinationService.syncDeviceStatus(device, DeviceCoordinationService.DeviceStatus.FAILED, context); return StepResult.failure(device.getDeviceName(), String.format("执行失败(已重试%d次)", retryAttempt)); } /** * 执行一次简单的设备操作步骤(不走交互引擎),用于触发请求等场景 */ private StepResult executeDirectOperationStep(MultiDeviceTask task, TaskStepDetail step, DeviceConfig device, TaskExecutionContext context, String operation, Map params) { Date startTime = new Date(); step.setStartTime(startTime); step.setStatus(TaskStepDetail.Status.RUNNING.name()); step.setRetryCount(0); step.setInputData(toJson(params)); taskStepDetailMapper.updateById(step); try { DeviceCoordinationService.DependencyCheckResult dependencyResult = deviceCoordinationService.checkDependencies(device, context); if (!dependencyResult.isSatisfied()) { log.warn("直接操作依赖未满足: deviceId={}, message={}", device.getId(), dependencyResult.getMessage()); step.setStatus(TaskStepDetail.Status.FAILED.name()); step.setErrorMessage(dependencyResult.getMessage()); step.setEndTime(new Date()); step.setDurationMs(step.getEndTime().getTime() - step.getStartTime().getTime()); taskStepDetailMapper.updateById(step); updateTaskProgress(task, step.getStepOrder(), false); return StepResult.failure(device.getDeviceName(), dependencyResult.getMessage()); } DevicePlcVO.OperationResult result = deviceInteractionService.executeOperation( device.getId(), operation, params); boolean opSuccess = Boolean.TRUE.equals(result.getSuccess()); updateStepAfterOperation(step, result, opSuccess); updateTaskProgress(task, step.getStepOrder(), opSuccess); if (opSuccess) { updateContextAfterSuccess(device, context, params, result); // 简单同步设备状态为已完成 deviceCoordinationService.syncDeviceStatus(device, DeviceCoordinationService.DeviceStatus.COMPLETED, context); return StepResult.success(device.getDeviceName(), result.getMessage()); } else { deviceCoordinationService.syncDeviceStatus(device, DeviceCoordinationService.DeviceStatus.FAILED, context); return StepResult.failure(device.getDeviceName(), result.getMessage()); } } catch (Exception e) { log.error("直接设备操作异常, deviceId={}, operation={}", device.getId(), operation, e); step.setStatus(TaskStepDetail.Status.FAILED.name()); step.setErrorMessage(e.getMessage()); step.setEndTime(new Date()); step.setDurationMs(step.getEndTime().getTime() - step.getStartTime().getTime()); taskStepDetailMapper.updateById(step); updateTaskProgress(task, step.getStepOrder(), false); deviceCoordinationService.syncDeviceStatus(device, DeviceCoordinationService.DeviceStatus.FAILED, context); return StepResult.failure(device.getDeviceName(), e.getMessage()); } } /** * 带重试的交互步骤执行 */ private StepResult executeInteractionStepWithRetry(MultiDeviceTask task, TaskStepDetail step, DeviceConfig device, TaskExecutionContext context, DeviceInteraction deviceInteraction, RetryPolicy retryPolicy) { int retryAttempt = 0; Exception lastException = null; while (retryAttempt <= retryPolicy.getMaxRetryCount()) { try { if (retryAttempt > 0) { long waitTime = retryPolicy.calculateRetryInterval(retryAttempt); log.debug("交互步骤执行重试: deviceId={}, retryAttempt={}/{}, waitTime={}ms", device.getId(), retryAttempt, retryPolicy.getMaxRetryCount(), waitTime); Thread.sleep(waitTime); step.setRetryCount(retryAttempt); step.setStatus(TaskStepDetail.Status.RUNNING.name()); step.setStartTime(new Date()); taskStepDetailMapper.updateById(step); } InteractionContext interactionContext = new InteractionContext(device, context); step.setInputData(toJson(context.getParameters())); InteractionResult interactionResult = deviceInteraction.execute(interactionContext); boolean success = interactionResult != null && interactionResult.isSuccess(); updateStepAfterInteraction(step, interactionResult); updateTaskProgress(task, step.getStepOrder(), success); if (success) { deviceCoordinationService.syncDeviceStatus(device, DeviceCoordinationService.DeviceStatus.COMPLETED, context); return StepResult.success(device.getDeviceName(), interactionResult.getMessage()); } else { if (retryAttempt < retryPolicy.getMaxRetryCount()) { retryAttempt++; continue; } deviceCoordinationService.syncDeviceStatus(device, DeviceCoordinationService.DeviceStatus.FAILED, context); String message = interactionResult != null ? interactionResult.getMessage() : "交互执行失败"; return StepResult.failure(device.getDeviceName(), message); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); log.error("交互步骤执行被中断, deviceId={}", device.getId(), e); step.setStatus(TaskStepDetail.Status.FAILED.name()); step.setErrorMessage("执行被中断: " + e.getMessage()); step.setEndTime(new Date()); step.setDurationMs(step.getEndTime().getTime() - step.getStartTime().getTime()); step.setRetryCount(retryAttempt); taskStepDetailMapper.updateById(step); updateTaskProgress(task, step.getStepOrder(), false); return StepResult.failure(device.getDeviceName(), "执行被中断"); } catch (Exception e) { lastException = e; log.error("交互执行异常, deviceId={}, retryAttempt={}", device.getId(), retryAttempt, e); if (retryAttempt < retryPolicy.getMaxRetryCount() && retryPolicy.isRetryable(e)) { retryAttempt++; continue; } step.setStatus(TaskStepDetail.Status.FAILED.name()); step.setErrorMessage(e.getMessage()); step.setEndTime(new Date()); step.setDurationMs(step.getEndTime().getTime() - step.getStartTime().getTime()); step.setRetryCount(retryAttempt); taskStepDetailMapper.updateById(step); updateTaskProgress(task, step.getStepOrder(), false); // 通知步骤更新 notificationService.notifyStepUpdate(task.getTaskId(), step); deviceCoordinationService.syncDeviceStatus(device, DeviceCoordinationService.DeviceStatus.FAILED, context); String errorMsg = retryAttempt > 0 ? String.format("执行失败(已重试%d次): %s", retryAttempt, e.getMessage()) : e.getMessage(); return StepResult.failure(device.getDeviceName(), errorMsg); } } step.setStatus(TaskStepDetail.Status.FAILED.name()); step.setErrorMessage(lastException != null ? lastException.getMessage() : "交互执行失败"); step.setEndTime(new Date()); step.setDurationMs(step.getEndTime().getTime() - step.getStartTime().getTime()); step.setRetryCount(retryAttempt); taskStepDetailMapper.updateById(step); updateTaskProgress(task, step.getStepOrder(), false); // 通知步骤更新 notificationService.notifyStepUpdate(task.getTaskId(), step); deviceCoordinationService.syncDeviceStatus(device, DeviceCoordinationService.DeviceStatus.FAILED, context); return StepResult.failure(device.getDeviceName(), String.format("执行失败(已重试%d次)", retryAttempt)); } /** * 获取重试策略 */ private RetryPolicy getRetryPolicy(DeviceConfig device) { // 可以从设备配置中读取重试策略 // 暂时使用默认策略 return RetryPolicy.defaultPolicy(); } /** * 判断业务失败是否可重试 */ private boolean isRetryableFailure(DevicePlcVO.OperationResult result) { if (result == null || result.getMessage() == null) { return false; } String message = result.getMessage().toLowerCase(); // 网络错误、超时错误可重试 return message.contains("timeout") || message.contains("connection") || message.contains("网络") || message.contains("超时"); } private void updateStepAfterOperation(TaskStepDetail step, DevicePlcVO.OperationResult result, boolean success) { step.setEndTime(new Date()); if (step.getStartTime() != null) { step.setDurationMs(step.getEndTime().getTime() - step.getStartTime().getTime()); } step.setStatus(success ? TaskStepDetail.Status.COMPLETED.name() : TaskStepDetail.Status.FAILED.name()); // 设置消息:成功时如果有消息也保存,失败时保存错误消息 String message = result != null ? result.getMessage() : null; if (success) { // 成功时,如果有消息则保存(用于提示信息),否则清空 step.setErrorMessage(StringUtils.hasText(message) ? message : null); } else { // 失败时保存错误消息 step.setErrorMessage(message); } step.setOutputData(toJson(result)); taskStepDetailMapper.updateById(step); } private void updateStepAfterInteraction(TaskStepDetail step, InteractionResult result) { step.setEndTime(new Date()); if (step.getStartTime() != null) { step.setDurationMs(step.getEndTime().getTime() - step.getStartTime().getTime()); } boolean success = result != null && result.isSuccess(); step.setStatus(success ? TaskStepDetail.Status.COMPLETED.name() : TaskStepDetail.Status.FAILED.name()); step.setErrorMessage(success ? null : (result != null ? result.getMessage() : "交互执行失败")); step.setOutputData(result != null ? toJson(result.getData()) : "{}"); taskStepDetailMapper.updateById(step); } private void updateTaskProgress(MultiDeviceTask task, int currentStep, boolean success) { if (!success) { task.setStatus(MultiDeviceTask.Status.FAILED.name()); } // 计算已完成的步骤数(用于进度显示) int completedSteps = countCompletedSteps(task.getTaskId()); int progressStep = success ? completedSteps : Math.max(completedSteps, currentStep); // 失败时至少显示当前步骤 LambdaUpdateWrapper update = Wrappers.lambdaUpdate() .eq(MultiDeviceTask::getId, task.getId()) .set(MultiDeviceTask::getCurrentStep, progressStep); if (!success) { update.set(MultiDeviceTask::getStatus, MultiDeviceTask.Status.FAILED.name()); } multiDeviceTaskMapper.update(null, update); // 更新任务对象的进度,用于通知 task.setCurrentStep(progressStep); // 通知任务状态更新(包含进度信息) notificationService.notifyTaskStatus(task); } /** * 统计已完成的步骤数 */ private int countCompletedSteps(String taskId) { if (taskId == null) { return 0; } try { return taskStepDetailMapper.selectCount( Wrappers.lambdaQuery() .eq(TaskStepDetail::getTaskId, taskId) .eq(TaskStepDetail::getStatus, TaskStepDetail.Status.COMPLETED.name()) ).intValue(); } catch (Exception e) { log.warn("统计已完成步骤数失败: taskId={}", taskId, e); return 0; } } private String determineOperation(DeviceConfig device, Map params) { if (params != null && params.containsKey("operation")) { Object op = params.get("operation"); if (op != null) { return String.valueOf(op); } } return DEFAULT_OPERATIONS.getOrDefault(device.getDeviceType(), "feedGlass"); } private Map buildOperationParams(DeviceConfig device, TaskExecutionContext context) { Map params = new HashMap<>(); TaskParameters taskParams = context.getParameters(); switch (device.getDeviceType()) { case DeviceConfig.DeviceType.LOAD_VEHICLE: params.put("glassIds", new ArrayList<>(taskParams.getGlassIds())); if (StringUtils.hasText(taskParams.getPositionCode())) { params.put("positionCode", taskParams.getPositionCode()); } if (taskParams.getPositionValue() != null) { params.put("positionValue", taskParams.getPositionValue()); } params.put("triggerRequest", true); break; case DeviceConfig.DeviceType.LARGE_GLASS: List source = context.getSafeLoadedGlassIds(); if (CollectionUtils.isEmpty(source)) { source = taskParams.getGlassIds(); } if (!CollectionUtils.isEmpty(source)) { params.put("glassId", source.get(0)); params.put("glassIds", new ArrayList<>(source)); } params.put("processType", taskParams.getProcessType() != null ? taskParams.getProcessType() : 1); params.put("triggerRequest", true); break; case DeviceConfig.DeviceType.WORKSTATION_SCANNER: // 卧转立扫码设备:从任务参数中获取玻璃ID列表,取第一个作为当前要测试的玻璃ID // 注意:扫码设备通常通过定时器执行,但如果通过executeStep执行,也需要传递glassId log.debug("buildOperationParams处理扫码设备: deviceId={}, taskParams.glassIds={}, isEmpty={}", device.getId(), taskParams.getGlassIds(), CollectionUtils.isEmpty(taskParams.getGlassIds())); if (!CollectionUtils.isEmpty(taskParams.getGlassIds())) { params.put("glassId", taskParams.getGlassIds().get(0)); params.put("glassIds", new ArrayList<>(taskParams.getGlassIds())); log.debug("buildOperationParams为扫码设备添加glassId: deviceId={}, glassId={}, glassIdsSize={}", device.getId(), taskParams.getGlassIds().get(0), taskParams.getGlassIds().size()); } else { log.warn("buildOperationParams扫码设备glassIds为空: deviceId={}, taskParams.glassIds={}, taskParams={}", device.getId(), taskParams.getGlassIds(), taskParams); } break; default: if (!CollectionUtils.isEmpty(taskParams.getExtra())) { params.putAll(taskParams.getExtra()); } } mergeOverrides(device, taskParams, params); return params; } private void mergeOverrides(DeviceConfig device, TaskParameters taskParameters, Map params) { if (CollectionUtils.isEmpty(taskParameters.getDeviceOverrides())) { return; } Map override = taskParameters.getDeviceOverrides().get(device.getDeviceType()); if (override == null && StringUtils.hasText(device.getDeviceCode())) { override = taskParameters.getDeviceOverrides().get(device.getDeviceCode()); } if (override != null) { params.putAll(override); } } private void updateContextAfterSuccess(DeviceConfig device, TaskExecutionContext context, Map params, DevicePlcVO.OperationResult result) { List glassIds = extractGlassIds(params); switch (device.getDeviceType()) { case DeviceConfig.DeviceType.WORKSTATION_SCANNER: handleScannerSuccess(context, result); break; case DeviceConfig.DeviceType.LOAD_VEHICLE: context.setLoadedGlassIds(glassIds); // 数据传递:大车设备 -> 下一个设备 if (!CollectionUtils.isEmpty(glassIds)) { Map transferData = new HashMap<>(); transferData.put("glassIds", glassIds); transferData.put("sourceDevice", device.getDeviceCode()); // 这里简化处理,实际应该找到下一个设备 // 在串行模式下,下一个设备会在循环中自动获取 } break; case DeviceConfig.DeviceType.LARGE_GLASS: context.setProcessedGlassIds(glassIds); // 数据传递:大理片 -> 下一个设备 if (!CollectionUtils.isEmpty(glassIds)) { Map transferData = new HashMap<>(); transferData.put("glassIds", glassIds); transferData.put("sourceDevice", device.getDeviceCode()); } break; default: break; } } private void handleScannerSuccess(TaskExecutionContext context, DevicePlcVO.OperationResult result) { List scannerGlassIds = extractGlassIdsFromResult(result); if (CollectionUtils.isEmpty(scannerGlassIds)) { String workLine = resolveWorkLineFromResult(result, context.getParameters()); scannerGlassIds = glassInfoService.getRecentScannedGlassIds( SCANNER_LOOKBACK_MINUTES, SCANNER_LOOKBACK_LIMIT, workLine); } if (!CollectionUtils.isEmpty(scannerGlassIds)) { context.getParameters().setGlassIds(new ArrayList<>(scannerGlassIds)); context.setLoadedGlassIds(new ArrayList<>(scannerGlassIds)); log.debug("卧转立扫码获取到玻璃ID: {}", scannerGlassIds); } else { log.warn("卧转立扫码未获取到玻璃ID,后续设备可能无法执行"); } } private List extractGlassIds(Map params) { if (params == null) { return Collections.emptyList(); } Object glassIds = params.get("glassIds"); if (glassIds instanceof List) { @SuppressWarnings("unchecked") List cast = (List) glassIds; return new ArrayList<>(cast); } Object glassId = params.get("glassId"); if (glassId != null) { return Collections.singletonList(String.valueOf(glassId)); } return Collections.emptyList(); } @SuppressWarnings("unchecked") private List extractGlassIdsFromResult(DevicePlcVO.OperationResult result) { if (result == null || result.getData() == null) { return Collections.emptyList(); } Object data = result.getData().get("glassIds"); if (data instanceof List) { List raw = (List) data; List converted = new ArrayList<>(); for (Object item : raw) { if (item != null) { converted.add(String.valueOf(item)); } } return converted; } if (data instanceof String && StringUtils.hasText((String) data)) { return Collections.singletonList((String) data); } return Collections.emptyList(); } private String resolveWorkLineFromResult(DevicePlcVO.OperationResult result, TaskParameters parameters) { if (result != null && result.getData() != null) { Object workLine = result.getData().get("workLine"); if (workLine != null && StringUtils.hasText(String.valueOf(workLine))) { return String.valueOf(workLine); } } if (parameters != null && !CollectionUtils.isEmpty(parameters.getExtra())) { Object extraWorkLine = parameters.getExtra().get("workLine"); if (extraWorkLine != null) { return String.valueOf(extraWorkLine); } } return null; } private String toJson(Object value) { try { return objectMapper.writeValueAsString(value); } catch (JsonProcessingException e) { return "{}"; } } private static class StepResult { private final boolean success; private final String message; private final String deviceName; private StepResult(boolean success, String message, String deviceName) { this.success = success; this.message = message; this.deviceName = deviceName; } public static StepResult success(String deviceName, String message) { return new StepResult(true, message, deviceName); } public static StepResult failure(String deviceName, String message) { return new StepResult(false, message, deviceName); } public boolean isSuccess() { return success; } public String getMessage() { return message; } public Map toSummary() { Map summary = new HashMap<>(); summary.put("deviceName", deviceName); summary.put("success", success); summary.put("message", message); return summary; } } }