huang
2025-11-20 366ba040d2447bacd3455299425e3166f1f992bb
mes-processes/mes-plcSend/src/main/java/com/mes/task/service/TaskExecutionEngine.java
@@ -3,6 +3,7 @@
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.mes.device.entity.DeviceConfig;
import com.mes.device.entity.DeviceGroupConfig;
@@ -12,14 +13,17 @@
import com.mes.interaction.DeviceLogicHandlerFactory;
import com.mes.interaction.base.InteractionContext;
import com.mes.interaction.base.InteractionResult;
import com.mes.device.service.DeviceCoordinationService;
import com.mes.device.service.DeviceInteractionService;
import com.mes.task.dto.TaskParameters;
import com.mes.task.entity.MultiDeviceTask;
import com.mes.task.entity.TaskStepDetail;
import com.mes.task.mapper.MultiDeviceTaskMapper;
import com.mes.task.mapper.TaskStepDetailMapper;
import com.mes.task.model.RetryPolicy;
import com.mes.task.model.TaskExecutionContext;
import com.mes.task.model.TaskExecutionResult;
import com.mes.task.service.TaskStatusNotificationService;
import com.mes.device.vo.DevicePlcVO;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
@@ -28,9 +32,11 @@
import org.springframework.util.StringUtils;
import java.util.*;
import java.util.concurrent.*;
/**
 * 多设备任务执行引擎
 * 支持串行和并行两种执行模式
 */
@Slf4j
@Component
@@ -38,6 +44,11 @@
public class TaskExecutionEngine {
    private static final Map<String, String> DEFAULT_OPERATIONS = new HashMap<>();
    private static final TypeReference<Map<String, Object>> MAP_TYPE = new TypeReference<Map<String, Object>>() {};
    // 执行模式常量
    private static final String EXECUTION_MODE_SERIAL = "SERIAL";
    private static final String EXECUTION_MODE_PARALLEL = "PARALLEL";
    static {
        DEFAULT_OPERATIONS.put(DeviceConfig.DeviceType.LOAD_VEHICLE, "feedGlass");
@@ -50,7 +61,16 @@
    private final DeviceInteractionService deviceInteractionService;
    private final DeviceInteractionRegistry interactionRegistry;
    private final DeviceLogicHandlerFactory handlerFactory;
    private final DeviceCoordinationService deviceCoordinationService;
    private final TaskStatusNotificationService notificationService;
    private final ObjectMapper objectMapper;
    // 线程池用于并行执行
    private final ExecutorService executorService = Executors.newCachedThreadPool(r -> {
        Thread t = new Thread(r, "TaskExecutionEngine-Parallel");
        t.setDaemon(true);
        return t;
    });
    public TaskExecutionResult execute(MultiDeviceTask task,
                                       DeviceGroupConfig groupConfig,
@@ -62,24 +82,55 @@
        }
        TaskExecutionContext context = new TaskExecutionContext(parameters);
        // 设备协调:检查依赖关系和执行条件
        DeviceCoordinationService.CoordinationResult coordinationResult =
            deviceCoordinationService.coordinateExecution(groupConfig, devices, context);
        if (!coordinationResult.canExecute()) {
            log.warn("设备协调失败: {}", coordinationResult.getMessage());
            return TaskExecutionResult.failure(coordinationResult.getMessage(), Collections.emptyMap());
        }
        log.info("设备协调成功: {}", coordinationResult.getMessage());
        task.setTotalSteps(devices.size());
        task.setStatus(MultiDeviceTask.Status.RUNNING.name());
        multiDeviceTaskMapper.updateById(task);
        // 通知任务开始执行
        notificationService.notifyTaskStatus(task);
        // 确定执行模式
        String executionMode = determineExecutionMode(groupConfig);
        Integer maxConcurrent = getMaxConcurrentDevices(groupConfig);
        List<Map<String, Object>> stepSummaries = new ArrayList<>();
        boolean success = true;
        String failureMessage = null;
        log.info("任务执行模式: {}, 最大并发数: {}, 设备数: {}", executionMode, maxConcurrent, devices.size());
        for (int i = 0; i < devices.size(); i++) {
            DeviceConfig device = devices.get(i);
            int order = i + 1;
            TaskStepDetail step = createStepRecord(task, device, order);
            StepResult stepResult = executeStep(task, step, device, context);
            stepSummaries.add(stepResult.toSummary());
            if (!stepResult.isSuccess()) {
                success = false;
                failureMessage = stepResult.getMessage();
                break;
        List<Map<String, Object>> stepSummaries;
        boolean success;
        String failureMessage;
        if (EXECUTION_MODE_PARALLEL.equals(executionMode)) {
            // 并行执行模式
            stepSummaries = new ArrayList<>(Collections.nCopies(devices.size(), null));
            Pair<Boolean, String> result = executeParallel(task, devices, context, stepSummaries, maxConcurrent);
            success = result.getFirst();
            failureMessage = result.getSecond();
        } else {
            // 串行执行模式(默认)
            stepSummaries = new ArrayList<>();
            success = true;
            failureMessage = null;
            for (int i = 0; i < devices.size(); i++) {
                DeviceConfig device = devices.get(i);
                int order = i + 1;
                TaskStepDetail step = createStepRecord(task, device, order);
                StepResult stepResult = executeStep(task, step, device, context);
                stepSummaries.add(stepResult.toSummary());
                if (!stepResult.isSuccess()) {
                    success = false;
                    failureMessage = stepResult.getMessage();
                    break;
                }
            }
        }
@@ -87,11 +138,212 @@
        payload.put("steps", stepSummaries);
        payload.put("groupId", groupConfig.getId());
        payload.put("deviceCount", devices.size());
        payload.put("executionMode", executionMode);
        // 更新任务最终状态
        if (success) {
            task.setStatus(MultiDeviceTask.Status.COMPLETED.name());
        } else {
            task.setStatus(MultiDeviceTask.Status.FAILED.name());
            task.setErrorMessage(failureMessage);
        }
        task.setEndTime(new Date());
        multiDeviceTaskMapper.updateById(task);
        // 通知任务完成
        notificationService.notifyTaskStatus(task);
        if (success) {
            return TaskExecutionResult.success(payload);
        }
        return TaskExecutionResult.failure(failureMessage != null ? failureMessage : "任务执行失败", payload);
    }
    /**
     * 并行执行多个设备操作
     */
    private Pair<Boolean, String> executeParallel(MultiDeviceTask task,
                                                   List<DeviceConfig> devices,
                                                   TaskExecutionContext context,
                                                   List<Map<String, Object>> stepSummaries,
                                                   Integer maxConcurrent) {
        int concurrency = maxConcurrent != null && maxConcurrent > 0
            ? Math.min(maxConcurrent, devices.size())
            : devices.size();
        // 创建所有步骤记录
        List<TaskStepDetail> steps = new ArrayList<>();
        for (int i = 0; i < devices.size(); i++) {
            DeviceConfig device = devices.get(i);
            int order = i + 1;
            TaskStepDetail step = createStepRecord(task, device, order);
            steps.add(step);
        }
        // 使用信号量控制并发数
        Semaphore semaphore = new Semaphore(concurrency);
        List<CompletableFuture<StepResult>> futures = new ArrayList<>();
        for (int i = 0; i < devices.size(); i++) {
            final int index = i;
            final DeviceConfig device = devices.get(index);
            final TaskStepDetail step = steps.get(index);
            CompletableFuture<StepResult> future = CompletableFuture.supplyAsync(() -> {
                try {
                    semaphore.acquire();
                    try {
                        return executeStep(task, step, device, context);
                    } finally {
                        semaphore.release();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    log.error("并行执行被中断, deviceId={}", device.getId(), e);
                    return StepResult.failure(device.getDeviceName(), "执行被中断");
                } catch (Exception e) {
                    log.error("并行执行异常, deviceId={}", device.getId(), e);
                    return StepResult.failure(device.getDeviceName(), e.getMessage());
                }
            }, executorService);
            final int finalIndex = index;
            future.whenComplete((result, throwable) -> {
                if (throwable != null) {
                    log.error("并行执行完成时异常, deviceId={}", device.getId(), throwable);
                    stepSummaries.set(finalIndex, StepResult.failure(device.getDeviceName(),
                        throwable.getMessage()).toSummary());
                } else if (result != null) {
                    stepSummaries.set(finalIndex, result.toSummary());
                }
            });
            futures.add(future);
        }
        // 等待所有任务完成
        CompletableFuture<Void> allFutures = CompletableFuture.allOf(
            futures.toArray(new CompletableFuture[0])
        );
        try {
            allFutures.get(30, TimeUnit.MINUTES); // 最多等待30分钟
        } catch (TimeoutException e) {
            log.error("并行执行超时, taskId={}", task.getTaskId(), e);
            return Pair.of(false, "任务执行超时");
        } catch (Exception e) {
            log.error("等待并行执行完成时异常, taskId={}", task.getTaskId(), e);
            return Pair.of(false, "等待执行完成时发生异常: " + e.getMessage());
        }
        // 检查所有步骤的执行结果
        boolean allSuccess = true;
        String firstFailureMessage = null;
        for (int i = 0; i < futures.size(); i++) {
            try {
                StepResult result = futures.get(i).get();
                if (result != null && !result.isSuccess()) {
                    allSuccess = false;
                    if (firstFailureMessage == null) {
                        firstFailureMessage = result.getMessage();
                    }
                }
            } catch (Exception e) {
                log.error("获取步骤执行结果异常, stepIndex={}", i, e);
                allSuccess = false;
                if (firstFailureMessage == null) {
                    firstFailureMessage = "获取执行结果异常: " + e.getMessage();
                }
            }
        }
        return Pair.of(allSuccess, firstFailureMessage);
    }
    /**
     * 确定执行模式
     */
    private String determineExecutionMode(DeviceGroupConfig groupConfig) {
        if (groupConfig == null) {
            return EXECUTION_MODE_SERIAL; // 默认串行
        }
        // 从extraConfig中读取executionMode
        String extraConfig = groupConfig.getExtraConfig();
        if (StringUtils.hasText(extraConfig)) {
            try {
                Map<String, Object> config = objectMapper.readValue(extraConfig, MAP_TYPE);
                Object mode = config.get("executionMode");
                if (mode != null) {
                    String modeStr = String.valueOf(mode).toUpperCase();
                    if (EXECUTION_MODE_PARALLEL.equals(modeStr) || EXECUTION_MODE_SERIAL.equals(modeStr)) {
                        return modeStr;
                    }
                }
            } catch (Exception e) {
                log.warn("解析设备组执行模式失败, groupId={}", groupConfig.getId(), e);
            }
        }
        // 如果有maxConcurrentDevices且大于1,默认使用并行模式
        if (groupConfig.getMaxConcurrentDevices() != null && groupConfig.getMaxConcurrentDevices() > 1) {
            return EXECUTION_MODE_PARALLEL;
        }
        return EXECUTION_MODE_SERIAL; // 默认串行
    }
    /**
     * 获取最大并发设备数
     */
    private Integer getMaxConcurrentDevices(DeviceGroupConfig groupConfig) {
        if (groupConfig == null) {
            return 1;
        }
        // 从extraConfig中读取maxConcurrent
        String extraConfig = groupConfig.getExtraConfig();
        if (StringUtils.hasText(extraConfig)) {
            try {
                Map<String, Object> config = objectMapper.readValue(extraConfig, MAP_TYPE);
                Object maxConcurrent = config.get("maxConcurrent");
                if (maxConcurrent instanceof Number) {
                    return ((Number) maxConcurrent).intValue();
                }
            } catch (Exception e) {
                log.warn("解析设备组最大并发数失败, groupId={}", groupConfig.getId(), e);
            }
        }
        // 使用实体字段
        return groupConfig.getMaxConcurrentDevices() != null && groupConfig.getMaxConcurrentDevices() > 0
            ? groupConfig.getMaxConcurrentDevices()
            : 1;
    }
    /**
     * 简单的Pair类用于返回两个值
     */
    private static class Pair<T, U> {
        private final T first;
        private final U second;
        private Pair(T first, U second) {
            this.first = first;
            this.second = second;
        }
        public static <T, U> Pair<T, U> of(T first, U second) {
            return new Pair<>(first, second);
        }
        public T getFirst() {
            return first;
        }
        public U getSecond() {
            return second;
        }
    }
    private TaskStepDetail createStepRecord(MultiDeviceTask task, DeviceConfig device, int order) {
@@ -110,13 +362,25 @@
                                   TaskStepDetail step,
                                   DeviceConfig device,
                                   TaskExecutionContext context) {
        return executeStepWithRetry(task, step, device, context, getRetryPolicy(device));
    }
    /**
     * 带重试的步骤执行
     */
    private StepResult executeStepWithRetry(MultiDeviceTask task,
                                            TaskStepDetail step,
                                            DeviceConfig device,
                                            TaskExecutionContext context,
                                            RetryPolicy retryPolicy) {
        Date startTime = new Date();
        step.setStartTime(startTime);
        step.setStatus(TaskStepDetail.Status.RUNNING.name());
        step.setRetryCount(0);
        DeviceInteraction deviceInteraction = interactionRegistry.getInteraction(device.getDeviceType());
        if (deviceInteraction != null) {
            return executeInteractionStep(task, step, device, context, deviceInteraction);
            return executeInteractionStepWithRetry(task, step, device, context, deviceInteraction, retryPolicy);
        }
        Map<String, Object> params = buildOperationParams(device, context);
@@ -125,65 +389,260 @@
        String operation = determineOperation(device, params);
        DeviceLogicHandler handler = handlerFactory.getHandler(device.getDeviceType());
        DevicePlcVO.OperationResult result;
        int retryAttempt = 0;
        Exception lastException = null;
        while (retryAttempt <= retryPolicy.getMaxRetryCount()) {
            try {
                if (retryAttempt > 0) {
                    // 重试前等待
                    long waitTime = retryPolicy.calculateRetryInterval(retryAttempt);
                    log.info("步骤执行重试: deviceId={}, operation={}, retryAttempt={}/{}, waitTime={}ms",
                        device.getId(), operation, retryAttempt, retryPolicy.getMaxRetryCount(), waitTime);
                    Thread.sleep(waitTime);
                    // 更新步骤状态
                    step.setRetryCount(retryAttempt);
                    step.setStatus(TaskStepDetail.Status.RUNNING.name());
                    step.setStartTime(new Date());
                    taskStepDetailMapper.updateById(step);
                }
        try {
            if (handler == null) {
                result = deviceInteractionService.executeOperation(device.getId(), operation, params);
            } else {
                result = handler.execute(device, operation, params);
                DevicePlcVO.OperationResult result;
                if (handler == null) {
                    result = deviceInteractionService.executeOperation(device.getId(), operation, params);
                } else {
                    result = handler.execute(device, operation, params);
                }
                boolean opSuccess = Boolean.TRUE.equals(result.getSuccess());
                updateStepAfterOperation(step, result, opSuccess);
                updateTaskProgress(task, step.getStepOrder(), opSuccess);
                // 通知步骤更新
                notificationService.notifyStepUpdate(task.getTaskId(), step);
                if (opSuccess) {
                    updateContextAfterSuccess(device, context, params);
                    // 同步设备状态
                    deviceCoordinationService.syncDeviceStatus(device,
                        DeviceCoordinationService.DeviceStatus.COMPLETED, context);
                    return StepResult.success(device.getDeviceName(), result.getMessage());
                } else {
                    // 业务失败,判断是否可重试
                    if (retryAttempt < retryPolicy.getMaxRetryCount() && isRetryableFailure(result)) {
                        retryAttempt++;
                        lastException = new RuntimeException(result.getMessage());
                        log.warn("步骤执行失败,准备重试: deviceId={}, operation={}, retryAttempt={}, message={}",
                            device.getId(), operation, retryAttempt, result.getMessage());
                        continue;
                    }
                    // 同步失败状态
                    deviceCoordinationService.syncDeviceStatus(device,
                        DeviceCoordinationService.DeviceStatus.FAILED, context);
                    return StepResult.failure(device.getDeviceName(), result.getMessage());
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                log.error("步骤执行被中断, deviceId={}, operation={}", device.getId(), operation, e);
                step.setStatus(TaskStepDetail.Status.FAILED.name());
                step.setErrorMessage("执行被中断: " + e.getMessage());
                step.setEndTime(new Date());
                step.setDurationMs(step.getEndTime().getTime() - step.getStartTime().getTime());
                step.setRetryCount(retryAttempt);
                taskStepDetailMapper.updateById(step);
                updateTaskProgress(task, step.getStepOrder(), false);
                return StepResult.failure(device.getDeviceName(), "执行被中断");
            } catch (Exception e) {
                lastException = e;
                log.error("设备操作异常, deviceId={}, operation={}, retryAttempt={}",
                    device.getId(), operation, retryAttempt, e);
                // 判断是否可重试
                if (retryAttempt < retryPolicy.getMaxRetryCount() && retryPolicy.isRetryable(e)) {
                    retryAttempt++;
                    log.warn("步骤执行异常,准备重试: deviceId={}, operation={}, retryAttempt={}, exception={}",
                        device.getId(), operation, retryAttempt, e.getClass().getSimpleName());
                    continue;
                }
                // 不可重试或达到最大重试次数
                step.setStatus(TaskStepDetail.Status.FAILED.name());
                step.setErrorMessage(e.getMessage());
                step.setEndTime(new Date());
                step.setDurationMs(step.getEndTime().getTime() - step.getStartTime().getTime());
                step.setRetryCount(retryAttempt);
                taskStepDetailMapper.updateById(step);
                updateTaskProgress(task, step.getStepOrder(), false);
                // 通知步骤更新
                notificationService.notifyStepUpdate(task.getTaskId(), step);
                // 同步失败状态
                deviceCoordinationService.syncDeviceStatus(device,
                    DeviceCoordinationService.DeviceStatus.FAILED, context);
                String errorMsg = retryAttempt > 0
                    ? String.format("执行失败(已重试%d次): %s", retryAttempt, e.getMessage())
                    : e.getMessage();
                return StepResult.failure(device.getDeviceName(), errorMsg);
            }
            boolean opSuccess = Boolean.TRUE.equals(result.getSuccess());
            updateStepAfterOperation(step, result, opSuccess);
            updateTaskProgress(task, step.getStepOrder(), opSuccess);
            if (opSuccess) {
                updateContextAfterSuccess(device, context, params);
                return StepResult.success(device.getDeviceName(), result.getMessage());
            }
            return StepResult.failure(device.getDeviceName(), result.getMessage());
        } catch (Exception e) {
            log.error("设备操作异常, deviceId={}, operation={}", device.getId(), operation, e);
            step.setStatus(TaskStepDetail.Status.FAILED.name());
            step.setErrorMessage(e.getMessage());
            step.setEndTime(new Date());
            step.setDurationMs(step.getEndTime().getTime() - step.getStartTime().getTime());
            taskStepDetailMapper.updateById(step);
            updateTaskProgress(task, step.getStepOrder(), false);
            return StepResult.failure(device.getDeviceName(), e.getMessage());
        }
        // 达到最大重试次数
        step.setStatus(TaskStepDetail.Status.FAILED.name());
        step.setErrorMessage(lastException != null ? lastException.getMessage() : "执行失败");
        step.setEndTime(new Date());
        step.setDurationMs(step.getEndTime().getTime() - step.getStartTime().getTime());
        step.setRetryCount(retryAttempt);
        taskStepDetailMapper.updateById(step);
        updateTaskProgress(task, step.getStepOrder(), false);
        // 通知步骤更新
        notificationService.notifyStepUpdate(task.getTaskId(), step);
        deviceCoordinationService.syncDeviceStatus(device,
            DeviceCoordinationService.DeviceStatus.FAILED, context);
        return StepResult.failure(device.getDeviceName(),
            String.format("执行失败(已重试%d次)", retryAttempt));
    }
    private StepResult executeInteractionStep(MultiDeviceTask task,
                                              TaskStepDetail step,
                                              DeviceConfig device,
                                              TaskExecutionContext context,
                                              DeviceInteraction deviceInteraction) {
        try {
            InteractionContext interactionContext = new InteractionContext(device, context);
            step.setInputData(toJson(context.getParameters()));
            InteractionResult interactionResult = deviceInteraction.execute(interactionContext);
            boolean success = interactionResult != null && interactionResult.isSuccess();
            updateStepAfterInteraction(step, interactionResult);
            updateTaskProgress(task, step.getStepOrder(), success);
    /**
     * 带重试的交互步骤执行
     */
    private StepResult executeInteractionStepWithRetry(MultiDeviceTask task,
                                                      TaskStepDetail step,
                                                      DeviceConfig device,
                                                      TaskExecutionContext context,
                                                      DeviceInteraction deviceInteraction,
                                                      RetryPolicy retryPolicy) {
        int retryAttempt = 0;
        Exception lastException = null;
        while (retryAttempt <= retryPolicy.getMaxRetryCount()) {
            try {
                if (retryAttempt > 0) {
                    long waitTime = retryPolicy.calculateRetryInterval(retryAttempt);
                    log.info("交互步骤执行重试: deviceId={}, retryAttempt={}/{}, waitTime={}ms",
                        device.getId(), retryAttempt, retryPolicy.getMaxRetryCount(), waitTime);
                    Thread.sleep(waitTime);
                    step.setRetryCount(retryAttempt);
                    step.setStatus(TaskStepDetail.Status.RUNNING.name());
                    step.setStartTime(new Date());
                    taskStepDetailMapper.updateById(step);
                }
            if (success) {
                return StepResult.success(device.getDeviceName(), interactionResult.getMessage());
                InteractionContext interactionContext = new InteractionContext(device, context);
                step.setInputData(toJson(context.getParameters()));
                InteractionResult interactionResult = deviceInteraction.execute(interactionContext);
                boolean success = interactionResult != null && interactionResult.isSuccess();
                updateStepAfterInteraction(step, interactionResult);
                updateTaskProgress(task, step.getStepOrder(), success);
                if (success) {
                    deviceCoordinationService.syncDeviceStatus(device,
                        DeviceCoordinationService.DeviceStatus.COMPLETED, context);
                    return StepResult.success(device.getDeviceName(), interactionResult.getMessage());
                } else {
                    if (retryAttempt < retryPolicy.getMaxRetryCount()) {
                        retryAttempt++;
                        continue;
                    }
                    deviceCoordinationService.syncDeviceStatus(device,
                        DeviceCoordinationService.DeviceStatus.FAILED, context);
                    String message = interactionResult != null ? interactionResult.getMessage() : "交互执行失败";
                    return StepResult.failure(device.getDeviceName(), message);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                log.error("交互步骤执行被中断, deviceId={}", device.getId(), e);
                step.setStatus(TaskStepDetail.Status.FAILED.name());
                step.setErrorMessage("执行被中断: " + e.getMessage());
                step.setEndTime(new Date());
                step.setDurationMs(step.getEndTime().getTime() - step.getStartTime().getTime());
                step.setRetryCount(retryAttempt);
                taskStepDetailMapper.updateById(step);
                updateTaskProgress(task, step.getStepOrder(), false);
                return StepResult.failure(device.getDeviceName(), "执行被中断");
            } catch (Exception e) {
                lastException = e;
                log.error("交互执行异常, deviceId={}, retryAttempt={}", device.getId(), retryAttempt, e);
                if (retryAttempt < retryPolicy.getMaxRetryCount() && retryPolicy.isRetryable(e)) {
                    retryAttempt++;
                    continue;
                }
                step.setStatus(TaskStepDetail.Status.FAILED.name());
                step.setErrorMessage(e.getMessage());
                step.setEndTime(new Date());
                step.setDurationMs(step.getEndTime().getTime() - step.getStartTime().getTime());
                step.setRetryCount(retryAttempt);
                taskStepDetailMapper.updateById(step);
                updateTaskProgress(task, step.getStepOrder(), false);
                // 通知步骤更新
                notificationService.notifyStepUpdate(task.getTaskId(), step);
                deviceCoordinationService.syncDeviceStatus(device,
                    DeviceCoordinationService.DeviceStatus.FAILED, context);
                String errorMsg = retryAttempt > 0
                    ? String.format("执行失败(已重试%d次): %s", retryAttempt, e.getMessage())
                    : e.getMessage();
                return StepResult.failure(device.getDeviceName(), errorMsg);
            }
            String message = interactionResult != null ? interactionResult.getMessage() : "交互执行失败";
            return StepResult.failure(device.getDeviceName(), message);
        } catch (Exception e) {
            log.error("交互执行异常, deviceId={}", device.getId(), e);
            step.setStatus(TaskStepDetail.Status.FAILED.name());
            step.setErrorMessage(e.getMessage());
            step.setEndTime(new Date());
            step.setDurationMs(step.getEndTime().getTime() - step.getStartTime().getTime());
            taskStepDetailMapper.updateById(step);
            updateTaskProgress(task, step.getStepOrder(), false);
            return StepResult.failure(device.getDeviceName(), e.getMessage());
        }
        step.setStatus(TaskStepDetail.Status.FAILED.name());
        step.setErrorMessage(lastException != null ? lastException.getMessage() : "交互执行失败");
        step.setEndTime(new Date());
        step.setDurationMs(step.getEndTime().getTime() - step.getStartTime().getTime());
        step.setRetryCount(retryAttempt);
        taskStepDetailMapper.updateById(step);
        updateTaskProgress(task, step.getStepOrder(), false);
        // 通知步骤更新
        notificationService.notifyStepUpdate(task.getTaskId(), step);
        deviceCoordinationService.syncDeviceStatus(device,
            DeviceCoordinationService.DeviceStatus.FAILED, context);
        return StepResult.failure(device.getDeviceName(),
            String.format("执行失败(已重试%d次)", retryAttempt));
    }
    /**
     * 获取重试策略
     */
    private RetryPolicy getRetryPolicy(DeviceConfig device) {
        // 可以从设备配置中读取重试策略
        // 暂时使用默认策略
        return RetryPolicy.defaultPolicy();
    }
    /**
     * 判断业务失败是否可重试
     */
    private boolean isRetryableFailure(DevicePlcVO.OperationResult result) {
        if (result == null || result.getMessage() == null) {
            return false;
        }
        String message = result.getMessage().toLowerCase();
        // 网络错误、超时错误可重试
        return message.contains("timeout") ||
               message.contains("connection") ||
               message.contains("网络") ||
               message.contains("超时");
    }
    private void updateStepAfterOperation(TaskStepDetail step,
                                          DevicePlcVO.OperationResult result,
@@ -223,6 +682,9 @@
            update.set(MultiDeviceTask::getStatus, MultiDeviceTask.Status.FAILED.name());
        }
        multiDeviceTaskMapper.update(null, update);
        // 通知任务状态更新
        notificationService.notifyTaskStatus(task);
    }
    private String determineOperation(DeviceConfig device, Map<String, Object> params) {
@@ -302,12 +764,28 @@
    private void updateContextAfterSuccess(DeviceConfig device,
                                           TaskExecutionContext context,
                                           Map<String, Object> params) {
        List<String> glassIds = extractGlassIds(params);
        switch (device.getDeviceType()) {
            case DeviceConfig.DeviceType.LOAD_VEHICLE:
                context.setLoadedGlassIds(extractGlassIds(params));
                context.setLoadedGlassIds(glassIds);
                // 数据传递:上大车 -> 下一个设备
                if (!CollectionUtils.isEmpty(glassIds)) {
                    Map<String, Object> transferData = new HashMap<>();
                    transferData.put("glassIds", glassIds);
                    transferData.put("sourceDevice", device.getDeviceCode());
                    // 这里简化处理,实际应该找到下一个设备
                    // 在串行模式下,下一个设备会在循环中自动获取
                }
                break;
            case DeviceConfig.DeviceType.LARGE_GLASS:
                context.setProcessedGlassIds(extractGlassIds(params));
                context.setProcessedGlassIds(glassIds);
                // 数据传递:大理片 -> 下一个设备
                if (!CollectionUtils.isEmpty(glassIds)) {
                    Map<String, Object> transferData = new HashMap<>();
                    transferData.put("glassIds", glassIds);
                    transferData.put("sourceDevice", device.getDeviceCode());
                }
                break;
            default:
                break;