package com.mes.task.service; import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper; import com.baomidou.mybatisplus.core.toolkit.Wrappers; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import com.mes.device.entity.DeviceConfig; import com.mes.device.entity.DeviceGroupConfig; import com.mes.interaction.DeviceInteraction; import com.mes.interaction.DeviceInteractionRegistry; import com.mes.interaction.DeviceLogicHandler; import com.mes.interaction.DeviceLogicHandlerFactory; import com.mes.interaction.base.InteractionContext; import com.mes.interaction.base.InteractionResult; import com.mes.device.service.DeviceCoordinationService; import com.mes.device.service.DeviceInteractionService; import com.mes.task.dto.TaskParameters; import com.mes.task.entity.MultiDeviceTask; import com.mes.task.entity.TaskStepDetail; import com.mes.task.mapper.MultiDeviceTaskMapper; import com.mes.task.mapper.TaskStepDetailMapper; import com.mes.task.model.RetryPolicy; import com.mes.task.model.TaskExecutionContext; import com.mes.task.model.TaskExecutionResult; import com.mes.task.service.TaskStatusNotificationService; import com.mes.device.vo.DevicePlcVO; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; import org.springframework.util.CollectionUtils; import org.springframework.util.StringUtils; import java.util.*; import java.util.concurrent.*; /** * 多设备任务执行引擎 * 支持串行和并行两种执行模式 */ @Slf4j @Component @RequiredArgsConstructor public class TaskExecutionEngine { private static final Map DEFAULT_OPERATIONS = new HashMap<>(); private static final TypeReference> MAP_TYPE = new TypeReference>() {}; // 执行模式常量 private static final String EXECUTION_MODE_SERIAL = "SERIAL"; private static final String EXECUTION_MODE_PARALLEL = "PARALLEL"; static { DEFAULT_OPERATIONS.put(DeviceConfig.DeviceType.LOAD_VEHICLE, "feedGlass"); DEFAULT_OPERATIONS.put(DeviceConfig.DeviceType.LARGE_GLASS, "processGlass"); DEFAULT_OPERATIONS.put(DeviceConfig.DeviceType.GLASS_STORAGE, "storeGlass"); } private final TaskStepDetailMapper taskStepDetailMapper; private final MultiDeviceTaskMapper multiDeviceTaskMapper; private final DeviceInteractionService deviceInteractionService; private final DeviceInteractionRegistry interactionRegistry; private final DeviceLogicHandlerFactory handlerFactory; private final DeviceCoordinationService deviceCoordinationService; private final TaskStatusNotificationService notificationService; private final ObjectMapper objectMapper; // 线程池用于并行执行 private final ExecutorService executorService = Executors.newCachedThreadPool(r -> { Thread t = new Thread(r, "TaskExecutionEngine-Parallel"); t.setDaemon(true); return t; }); public TaskExecutionResult execute(MultiDeviceTask task, DeviceGroupConfig groupConfig, List devices, TaskParameters parameters) { if (CollectionUtils.isEmpty(devices)) { return TaskExecutionResult.failure("设备组未配置设备,无法执行任务", Collections.emptyMap()); } TaskExecutionContext context = new TaskExecutionContext(parameters); // 设备协调:检查依赖关系和执行条件 DeviceCoordinationService.CoordinationResult coordinationResult = deviceCoordinationService.coordinateExecution(groupConfig, devices, context); if (!coordinationResult.canExecute()) { log.warn("设备协调失败: {}", coordinationResult.getMessage()); return TaskExecutionResult.failure(coordinationResult.getMessage(), Collections.emptyMap()); } log.info("设备协调成功: {}", coordinationResult.getMessage()); task.setTotalSteps(devices.size()); task.setStatus(MultiDeviceTask.Status.RUNNING.name()); multiDeviceTaskMapper.updateById(task); // 通知任务开始执行 notificationService.notifyTaskStatus(task); // 确定执行模式 String executionMode = determineExecutionMode(groupConfig); Integer maxConcurrent = getMaxConcurrentDevices(groupConfig); log.info("任务执行模式: {}, 最大并发数: {}, 设备数: {}", executionMode, maxConcurrent, devices.size()); List> stepSummaries; boolean success; String failureMessage; if (EXECUTION_MODE_PARALLEL.equals(executionMode)) { // 并行执行模式 stepSummaries = new ArrayList<>(Collections.nCopies(devices.size(), null)); Pair result = executeParallel(task, devices, context, stepSummaries, maxConcurrent); success = result.getFirst(); failureMessage = result.getSecond(); } else { // 串行执行模式(默认) stepSummaries = new ArrayList<>(); success = true; failureMessage = null; for (int i = 0; i < devices.size(); i++) { DeviceConfig device = devices.get(i); int order = i + 1; TaskStepDetail step = createStepRecord(task, device, order); StepResult stepResult = executeStep(task, step, device, context); stepSummaries.add(stepResult.toSummary()); if (!stepResult.isSuccess()) { success = false; failureMessage = stepResult.getMessage(); break; } } } Map payload = new HashMap<>(); payload.put("steps", stepSummaries); payload.put("groupId", groupConfig.getId()); payload.put("deviceCount", devices.size()); payload.put("executionMode", executionMode); // 更新任务最终状态 if (success) { task.setStatus(MultiDeviceTask.Status.COMPLETED.name()); } else { task.setStatus(MultiDeviceTask.Status.FAILED.name()); task.setErrorMessage(failureMessage); } task.setEndTime(new Date()); multiDeviceTaskMapper.updateById(task); // 通知任务完成 notificationService.notifyTaskStatus(task); if (success) { return TaskExecutionResult.success(payload); } return TaskExecutionResult.failure(failureMessage != null ? failureMessage : "任务执行失败", payload); } /** * 并行执行多个设备操作 */ private Pair executeParallel(MultiDeviceTask task, List devices, TaskExecutionContext context, List> stepSummaries, Integer maxConcurrent) { int concurrency = maxConcurrent != null && maxConcurrent > 0 ? Math.min(maxConcurrent, devices.size()) : devices.size(); // 创建所有步骤记录 List steps = new ArrayList<>(); for (int i = 0; i < devices.size(); i++) { DeviceConfig device = devices.get(i); int order = i + 1; TaskStepDetail step = createStepRecord(task, device, order); steps.add(step); } // 使用信号量控制并发数 Semaphore semaphore = new Semaphore(concurrency); List> futures = new ArrayList<>(); for (int i = 0; i < devices.size(); i++) { final int index = i; final DeviceConfig device = devices.get(index); final TaskStepDetail step = steps.get(index); CompletableFuture future = CompletableFuture.supplyAsync(() -> { try { semaphore.acquire(); try { return executeStep(task, step, device, context); } finally { semaphore.release(); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); log.error("并行执行被中断, deviceId={}", device.getId(), e); return StepResult.failure(device.getDeviceName(), "执行被中断"); } catch (Exception e) { log.error("并行执行异常, deviceId={}", device.getId(), e); return StepResult.failure(device.getDeviceName(), e.getMessage()); } }, executorService); final int finalIndex = index; future.whenComplete((result, throwable) -> { if (throwable != null) { log.error("并行执行完成时异常, deviceId={}", device.getId(), throwable); stepSummaries.set(finalIndex, StepResult.failure(device.getDeviceName(), throwable.getMessage()).toSummary()); } else if (result != null) { stepSummaries.set(finalIndex, result.toSummary()); } }); futures.add(future); } // 等待所有任务完成 CompletableFuture allFutures = CompletableFuture.allOf( futures.toArray(new CompletableFuture[0]) ); try { allFutures.get(30, TimeUnit.MINUTES); // 最多等待30分钟 } catch (TimeoutException e) { log.error("并行执行超时, taskId={}", task.getTaskId(), e); return Pair.of(false, "任务执行超时"); } catch (Exception e) { log.error("等待并行执行完成时异常, taskId={}", task.getTaskId(), e); return Pair.of(false, "等待执行完成时发生异常: " + e.getMessage()); } // 检查所有步骤的执行结果 boolean allSuccess = true; String firstFailureMessage = null; for (int i = 0; i < futures.size(); i++) { try { StepResult result = futures.get(i).get(); if (result != null && !result.isSuccess()) { allSuccess = false; if (firstFailureMessage == null) { firstFailureMessage = result.getMessage(); } } } catch (Exception e) { log.error("获取步骤执行结果异常, stepIndex={}", i, e); allSuccess = false; if (firstFailureMessage == null) { firstFailureMessage = "获取执行结果异常: " + e.getMessage(); } } } return Pair.of(allSuccess, firstFailureMessage); } /** * 确定执行模式 */ private String determineExecutionMode(DeviceGroupConfig groupConfig) { if (groupConfig == null) { return EXECUTION_MODE_SERIAL; // 默认串行 } // 从extraConfig中读取executionMode String extraConfig = groupConfig.getExtraConfig(); if (StringUtils.hasText(extraConfig)) { try { Map config = objectMapper.readValue(extraConfig, MAP_TYPE); Object mode = config.get("executionMode"); if (mode != null) { String modeStr = String.valueOf(mode).toUpperCase(); if (EXECUTION_MODE_PARALLEL.equals(modeStr) || EXECUTION_MODE_SERIAL.equals(modeStr)) { return modeStr; } } } catch (Exception e) { log.warn("解析设备组执行模式失败, groupId={}", groupConfig.getId(), e); } } // 如果有maxConcurrentDevices且大于1,默认使用并行模式 if (groupConfig.getMaxConcurrentDevices() != null && groupConfig.getMaxConcurrentDevices() > 1) { return EXECUTION_MODE_PARALLEL; } return EXECUTION_MODE_SERIAL; // 默认串行 } /** * 获取最大并发设备数 */ private Integer getMaxConcurrentDevices(DeviceGroupConfig groupConfig) { if (groupConfig == null) { return 1; } // 从extraConfig中读取maxConcurrent String extraConfig = groupConfig.getExtraConfig(); if (StringUtils.hasText(extraConfig)) { try { Map config = objectMapper.readValue(extraConfig, MAP_TYPE); Object maxConcurrent = config.get("maxConcurrent"); if (maxConcurrent instanceof Number) { return ((Number) maxConcurrent).intValue(); } } catch (Exception e) { log.warn("解析设备组最大并发数失败, groupId={}", groupConfig.getId(), e); } } // 使用实体字段 return groupConfig.getMaxConcurrentDevices() != null && groupConfig.getMaxConcurrentDevices() > 0 ? groupConfig.getMaxConcurrentDevices() : 1; } /** * 简单的Pair类用于返回两个值 */ private static class Pair { private final T first; private final U second; private Pair(T first, U second) { this.first = first; this.second = second; } public static Pair of(T first, U second) { return new Pair<>(first, second); } public T getFirst() { return first; } public U getSecond() { return second; } } private TaskStepDetail createStepRecord(MultiDeviceTask task, DeviceConfig device, int order) { TaskStepDetail step = new TaskStepDetail(); step.setTaskId(task.getTaskId()); step.setStepOrder(order); step.setDeviceId(String.valueOf(device.getId())); step.setStepName(device.getDeviceName()); step.setStatus(TaskStepDetail.Status.PENDING.name()); step.setRetryCount(0); taskStepDetailMapper.insert(step); return step; } private StepResult executeStep(MultiDeviceTask task, TaskStepDetail step, DeviceConfig device, TaskExecutionContext context) { return executeStepWithRetry(task, step, device, context, getRetryPolicy(device)); } /** * 带重试的步骤执行 */ private StepResult executeStepWithRetry(MultiDeviceTask task, TaskStepDetail step, DeviceConfig device, TaskExecutionContext context, RetryPolicy retryPolicy) { Date startTime = new Date(); step.setStartTime(startTime); step.setStatus(TaskStepDetail.Status.RUNNING.name()); step.setRetryCount(0); DeviceInteraction deviceInteraction = interactionRegistry.getInteraction(device.getDeviceType()); if (deviceInteraction != null) { return executeInteractionStepWithRetry(task, step, device, context, deviceInteraction, retryPolicy); } Map params = buildOperationParams(device, context); step.setInputData(toJson(params)); taskStepDetailMapper.updateById(step); String operation = determineOperation(device, params); DeviceLogicHandler handler = handlerFactory.getHandler(device.getDeviceType()); int retryAttempt = 0; Exception lastException = null; while (retryAttempt <= retryPolicy.getMaxRetryCount()) { try { if (retryAttempt > 0) { // 重试前等待 long waitTime = retryPolicy.calculateRetryInterval(retryAttempt); log.info("步骤执行重试: deviceId={}, operation={}, retryAttempt={}/{}, waitTime={}ms", device.getId(), operation, retryAttempt, retryPolicy.getMaxRetryCount(), waitTime); Thread.sleep(waitTime); // 更新步骤状态 step.setRetryCount(retryAttempt); step.setStatus(TaskStepDetail.Status.RUNNING.name()); step.setStartTime(new Date()); taskStepDetailMapper.updateById(step); } DevicePlcVO.OperationResult result; if (handler == null) { result = deviceInteractionService.executeOperation(device.getId(), operation, params); } else { result = handler.execute(device, operation, params); } boolean opSuccess = Boolean.TRUE.equals(result.getSuccess()); updateStepAfterOperation(step, result, opSuccess); updateTaskProgress(task, step.getStepOrder(), opSuccess); // 通知步骤更新 notificationService.notifyStepUpdate(task.getTaskId(), step); if (opSuccess) { updateContextAfterSuccess(device, context, params); // 同步设备状态 deviceCoordinationService.syncDeviceStatus(device, DeviceCoordinationService.DeviceStatus.COMPLETED, context); return StepResult.success(device.getDeviceName(), result.getMessage()); } else { // 业务失败,判断是否可重试 if (retryAttempt < retryPolicy.getMaxRetryCount() && isRetryableFailure(result)) { retryAttempt++; lastException = new RuntimeException(result.getMessage()); log.warn("步骤执行失败,准备重试: deviceId={}, operation={}, retryAttempt={}, message={}", device.getId(), operation, retryAttempt, result.getMessage()); continue; } // 同步失败状态 deviceCoordinationService.syncDeviceStatus(device, DeviceCoordinationService.DeviceStatus.FAILED, context); return StepResult.failure(device.getDeviceName(), result.getMessage()); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); log.error("步骤执行被中断, deviceId={}, operation={}", device.getId(), operation, e); step.setStatus(TaskStepDetail.Status.FAILED.name()); step.setErrorMessage("执行被中断: " + e.getMessage()); step.setEndTime(new Date()); step.setDurationMs(step.getEndTime().getTime() - step.getStartTime().getTime()); step.setRetryCount(retryAttempt); taskStepDetailMapper.updateById(step); updateTaskProgress(task, step.getStepOrder(), false); return StepResult.failure(device.getDeviceName(), "执行被中断"); } catch (Exception e) { lastException = e; log.error("设备操作异常, deviceId={}, operation={}, retryAttempt={}", device.getId(), operation, retryAttempt, e); // 判断是否可重试 if (retryAttempt < retryPolicy.getMaxRetryCount() && retryPolicy.isRetryable(e)) { retryAttempt++; log.warn("步骤执行异常,准备重试: deviceId={}, operation={}, retryAttempt={}, exception={}", device.getId(), operation, retryAttempt, e.getClass().getSimpleName()); continue; } // 不可重试或达到最大重试次数 step.setStatus(TaskStepDetail.Status.FAILED.name()); step.setErrorMessage(e.getMessage()); step.setEndTime(new Date()); step.setDurationMs(step.getEndTime().getTime() - step.getStartTime().getTime()); step.setRetryCount(retryAttempt); taskStepDetailMapper.updateById(step); updateTaskProgress(task, step.getStepOrder(), false); // 通知步骤更新 notificationService.notifyStepUpdate(task.getTaskId(), step); // 同步失败状态 deviceCoordinationService.syncDeviceStatus(device, DeviceCoordinationService.DeviceStatus.FAILED, context); String errorMsg = retryAttempt > 0 ? String.format("执行失败(已重试%d次): %s", retryAttempt, e.getMessage()) : e.getMessage(); return StepResult.failure(device.getDeviceName(), errorMsg); } } // 达到最大重试次数 step.setStatus(TaskStepDetail.Status.FAILED.name()); step.setErrorMessage(lastException != null ? lastException.getMessage() : "执行失败"); step.setEndTime(new Date()); step.setDurationMs(step.getEndTime().getTime() - step.getStartTime().getTime()); step.setRetryCount(retryAttempt); taskStepDetailMapper.updateById(step); updateTaskProgress(task, step.getStepOrder(), false); // 通知步骤更新 notificationService.notifyStepUpdate(task.getTaskId(), step); deviceCoordinationService.syncDeviceStatus(device, DeviceCoordinationService.DeviceStatus.FAILED, context); return StepResult.failure(device.getDeviceName(), String.format("执行失败(已重试%d次)", retryAttempt)); } /** * 带重试的交互步骤执行 */ private StepResult executeInteractionStepWithRetry(MultiDeviceTask task, TaskStepDetail step, DeviceConfig device, TaskExecutionContext context, DeviceInteraction deviceInteraction, RetryPolicy retryPolicy) { int retryAttempt = 0; Exception lastException = null; while (retryAttempt <= retryPolicy.getMaxRetryCount()) { try { if (retryAttempt > 0) { long waitTime = retryPolicy.calculateRetryInterval(retryAttempt); log.info("交互步骤执行重试: deviceId={}, retryAttempt={}/{}, waitTime={}ms", device.getId(), retryAttempt, retryPolicy.getMaxRetryCount(), waitTime); Thread.sleep(waitTime); step.setRetryCount(retryAttempt); step.setStatus(TaskStepDetail.Status.RUNNING.name()); step.setStartTime(new Date()); taskStepDetailMapper.updateById(step); } InteractionContext interactionContext = new InteractionContext(device, context); step.setInputData(toJson(context.getParameters())); InteractionResult interactionResult = deviceInteraction.execute(interactionContext); boolean success = interactionResult != null && interactionResult.isSuccess(); updateStepAfterInteraction(step, interactionResult); updateTaskProgress(task, step.getStepOrder(), success); if (success) { deviceCoordinationService.syncDeviceStatus(device, DeviceCoordinationService.DeviceStatus.COMPLETED, context); return StepResult.success(device.getDeviceName(), interactionResult.getMessage()); } else { if (retryAttempt < retryPolicy.getMaxRetryCount()) { retryAttempt++; continue; } deviceCoordinationService.syncDeviceStatus(device, DeviceCoordinationService.DeviceStatus.FAILED, context); String message = interactionResult != null ? interactionResult.getMessage() : "交互执行失败"; return StepResult.failure(device.getDeviceName(), message); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); log.error("交互步骤执行被中断, deviceId={}", device.getId(), e); step.setStatus(TaskStepDetail.Status.FAILED.name()); step.setErrorMessage("执行被中断: " + e.getMessage()); step.setEndTime(new Date()); step.setDurationMs(step.getEndTime().getTime() - step.getStartTime().getTime()); step.setRetryCount(retryAttempt); taskStepDetailMapper.updateById(step); updateTaskProgress(task, step.getStepOrder(), false); return StepResult.failure(device.getDeviceName(), "执行被中断"); } catch (Exception e) { lastException = e; log.error("交互执行异常, deviceId={}, retryAttempt={}", device.getId(), retryAttempt, e); if (retryAttempt < retryPolicy.getMaxRetryCount() && retryPolicy.isRetryable(e)) { retryAttempt++; continue; } step.setStatus(TaskStepDetail.Status.FAILED.name()); step.setErrorMessage(e.getMessage()); step.setEndTime(new Date()); step.setDurationMs(step.getEndTime().getTime() - step.getStartTime().getTime()); step.setRetryCount(retryAttempt); taskStepDetailMapper.updateById(step); updateTaskProgress(task, step.getStepOrder(), false); // 通知步骤更新 notificationService.notifyStepUpdate(task.getTaskId(), step); deviceCoordinationService.syncDeviceStatus(device, DeviceCoordinationService.DeviceStatus.FAILED, context); String errorMsg = retryAttempt > 0 ? String.format("执行失败(已重试%d次): %s", retryAttempt, e.getMessage()) : e.getMessage(); return StepResult.failure(device.getDeviceName(), errorMsg); } } step.setStatus(TaskStepDetail.Status.FAILED.name()); step.setErrorMessage(lastException != null ? lastException.getMessage() : "交互执行失败"); step.setEndTime(new Date()); step.setDurationMs(step.getEndTime().getTime() - step.getStartTime().getTime()); step.setRetryCount(retryAttempt); taskStepDetailMapper.updateById(step); updateTaskProgress(task, step.getStepOrder(), false); // 通知步骤更新 notificationService.notifyStepUpdate(task.getTaskId(), step); deviceCoordinationService.syncDeviceStatus(device, DeviceCoordinationService.DeviceStatus.FAILED, context); return StepResult.failure(device.getDeviceName(), String.format("执行失败(已重试%d次)", retryAttempt)); } /** * 获取重试策略 */ private RetryPolicy getRetryPolicy(DeviceConfig device) { // 可以从设备配置中读取重试策略 // 暂时使用默认策略 return RetryPolicy.defaultPolicy(); } /** * 判断业务失败是否可重试 */ private boolean isRetryableFailure(DevicePlcVO.OperationResult result) { if (result == null || result.getMessage() == null) { return false; } String message = result.getMessage().toLowerCase(); // 网络错误、超时错误可重试 return message.contains("timeout") || message.contains("connection") || message.contains("网络") || message.contains("超时"); } private void updateStepAfterOperation(TaskStepDetail step, DevicePlcVO.OperationResult result, boolean success) { step.setEndTime(new Date()); if (step.getStartTime() != null) { step.setDurationMs(step.getEndTime().getTime() - step.getStartTime().getTime()); } step.setStatus(success ? TaskStepDetail.Status.COMPLETED.name() : TaskStepDetail.Status.FAILED.name()); step.setErrorMessage(success ? null : result.getMessage()); step.setOutputData(toJson(result)); taskStepDetailMapper.updateById(step); } private void updateStepAfterInteraction(TaskStepDetail step, InteractionResult result) { step.setEndTime(new Date()); if (step.getStartTime() != null) { step.setDurationMs(step.getEndTime().getTime() - step.getStartTime().getTime()); } boolean success = result != null && result.isSuccess(); step.setStatus(success ? TaskStepDetail.Status.COMPLETED.name() : TaskStepDetail.Status.FAILED.name()); step.setErrorMessage(success ? null : (result != null ? result.getMessage() : "交互执行失败")); step.setOutputData(result != null ? toJson(result.getData()) : "{}"); taskStepDetailMapper.updateById(step); } private void updateTaskProgress(MultiDeviceTask task, int currentStep, boolean success) { task.setCurrentStep(currentStep); if (!success) { task.setStatus(MultiDeviceTask.Status.FAILED.name()); } LambdaUpdateWrapper update = Wrappers.lambdaUpdate() .eq(MultiDeviceTask::getId, task.getId()) .set(MultiDeviceTask::getCurrentStep, currentStep); if (!success) { update.set(MultiDeviceTask::getStatus, MultiDeviceTask.Status.FAILED.name()); } multiDeviceTaskMapper.update(null, update); // 通知任务状态更新 notificationService.notifyTaskStatus(task); } private String determineOperation(DeviceConfig device, Map params) { if (params != null && params.containsKey("operation")) { Object op = params.get("operation"); if (op != null) { return String.valueOf(op); } } return DEFAULT_OPERATIONS.getOrDefault(device.getDeviceType(), "feedGlass"); } private Map buildOperationParams(DeviceConfig device, TaskExecutionContext context) { Map params = new HashMap<>(); TaskParameters taskParams = context.getParameters(); switch (device.getDeviceType()) { case DeviceConfig.DeviceType.LOAD_VEHICLE: params.put("glassIds", new ArrayList<>(taskParams.getGlassIds())); if (StringUtils.hasText(taskParams.getPositionCode())) { params.put("positionCode", taskParams.getPositionCode()); } if (taskParams.getPositionValue() != null) { params.put("positionValue", taskParams.getPositionValue()); } params.put("triggerRequest", true); break; case DeviceConfig.DeviceType.LARGE_GLASS: List source = context.getSafeLoadedGlassIds(); if (CollectionUtils.isEmpty(source)) { source = taskParams.getGlassIds(); } if (!CollectionUtils.isEmpty(source)) { params.put("glassId", source.get(0)); params.put("glassIds", new ArrayList<>(source)); } params.put("processType", taskParams.getProcessType() != null ? taskParams.getProcessType() : 1); params.put("triggerRequest", true); break; case DeviceConfig.DeviceType.GLASS_STORAGE: List processed = context.getSafeProcessedGlassIds(); if (CollectionUtils.isEmpty(processed)) { processed = context.getSafeLoadedGlassIds(); } if (!CollectionUtils.isEmpty(processed)) { params.put("glassId", processed.get(0)); params.put("glassIds", new ArrayList<>(processed)); } if (taskParams.getStoragePosition() != null) { params.put("storagePosition", taskParams.getStoragePosition()); } params.put("triggerRequest", true); break; default: if (!CollectionUtils.isEmpty(taskParams.getExtra())) { params.putAll(taskParams.getExtra()); } } mergeOverrides(device, taskParams, params); return params; } private void mergeOverrides(DeviceConfig device, TaskParameters taskParameters, Map params) { if (CollectionUtils.isEmpty(taskParameters.getDeviceOverrides())) { return; } Map override = taskParameters.getDeviceOverrides().get(device.getDeviceType()); if (override == null && StringUtils.hasText(device.getDeviceCode())) { override = taskParameters.getDeviceOverrides().get(device.getDeviceCode()); } if (override != null) { params.putAll(override); } } private void updateContextAfterSuccess(DeviceConfig device, TaskExecutionContext context, Map params) { List glassIds = extractGlassIds(params); switch (device.getDeviceType()) { case DeviceConfig.DeviceType.LOAD_VEHICLE: context.setLoadedGlassIds(glassIds); // 数据传递:大车设备 -> 下一个设备 if (!CollectionUtils.isEmpty(glassIds)) { Map transferData = new HashMap<>(); transferData.put("glassIds", glassIds); transferData.put("sourceDevice", device.getDeviceCode()); // 这里简化处理,实际应该找到下一个设备 // 在串行模式下,下一个设备会在循环中自动获取 } break; case DeviceConfig.DeviceType.LARGE_GLASS: context.setProcessedGlassIds(glassIds); // 数据传递:大理片 -> 下一个设备 if (!CollectionUtils.isEmpty(glassIds)) { Map transferData = new HashMap<>(); transferData.put("glassIds", glassIds); transferData.put("sourceDevice", device.getDeviceCode()); } break; default: break; } } private List extractGlassIds(Map params) { if (params == null) { return Collections.emptyList(); } Object glassIds = params.get("glassIds"); if (glassIds instanceof List) { @SuppressWarnings("unchecked") List cast = (List) glassIds; return new ArrayList<>(cast); } Object glassId = params.get("glassId"); if (glassId != null) { return Collections.singletonList(String.valueOf(glassId)); } return Collections.emptyList(); } private String toJson(Object value) { try { return objectMapper.writeValueAsString(value); } catch (JsonProcessingException e) { return "{}"; } } private static class StepResult { private final boolean success; private final String message; private final String deviceName; private StepResult(boolean success, String message, String deviceName) { this.success = success; this.message = message; this.deviceName = deviceName; } public static StepResult success(String deviceName, String message) { return new StepResult(true, message, deviceName); } public static StepResult failure(String deviceName, String message) { return new StepResult(false, message, deviceName); } public boolean isSuccess() { return success; } public String getMessage() { return message; } public Map toSummary() { Map summary = new HashMap<>(); summary.put("deviceName", deviceName); summary.put("success", success); summary.put("message", message); return summary; } } }