| | |
| | | import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper; |
| | | import com.baomidou.mybatisplus.core.toolkit.Wrappers; |
| | | import com.fasterxml.jackson.core.JsonProcessingException; |
| | | import com.fasterxml.jackson.core.type.TypeReference; |
| | | import com.fasterxml.jackson.databind.ObjectMapper; |
| | | import com.mes.device.entity.DeviceConfig; |
| | | import com.mes.device.entity.DeviceGroupConfig; |
| | |
| | | import com.mes.interaction.DeviceLogicHandlerFactory; |
| | | import com.mes.interaction.base.InteractionContext; |
| | | import com.mes.interaction.base.InteractionResult; |
| | | import com.mes.device.service.DeviceCoordinationService; |
| | | import com.mes.device.service.DeviceInteractionService; |
| | | import com.mes.task.dto.TaskParameters; |
| | | import com.mes.task.entity.MultiDeviceTask; |
| | | import com.mes.task.entity.TaskStepDetail; |
| | | import com.mes.task.mapper.MultiDeviceTaskMapper; |
| | | import com.mes.task.mapper.TaskStepDetailMapper; |
| | | import com.mes.task.model.RetryPolicy; |
| | | import com.mes.task.model.TaskExecutionContext; |
| | | import com.mes.task.model.TaskExecutionResult; |
| | | import com.mes.task.service.TaskStatusNotificationService; |
| | | import com.mes.device.vo.DevicePlcVO; |
| | | import lombok.RequiredArgsConstructor; |
| | | import lombok.extern.slf4j.Slf4j; |
| | |
| | | import org.springframework.util.StringUtils; |
| | | |
| | | import java.util.*; |
| | | import java.util.concurrent.*; |
| | | |
| | | /** |
| | | * 多设备任务执行引擎 |
| | | * 支持串行和并行两种执行模式 |
| | | */ |
| | | @Slf4j |
| | | @Component |
| | |
| | | public class TaskExecutionEngine { |
| | | |
| | | private static final Map<String, String> DEFAULT_OPERATIONS = new HashMap<>(); |
| | | private static final TypeReference<Map<String, Object>> MAP_TYPE = new TypeReference<Map<String, Object>>() {}; |
| | | |
| | | // 执行模式常量 |
| | | private static final String EXECUTION_MODE_SERIAL = "SERIAL"; |
| | | private static final String EXECUTION_MODE_PARALLEL = "PARALLEL"; |
| | | |
| | | static { |
| | | DEFAULT_OPERATIONS.put(DeviceConfig.DeviceType.LOAD_VEHICLE, "feedGlass"); |
| | |
| | | private final DeviceInteractionService deviceInteractionService; |
| | | private final DeviceInteractionRegistry interactionRegistry; |
| | | private final DeviceLogicHandlerFactory handlerFactory; |
| | | private final DeviceCoordinationService deviceCoordinationService; |
| | | private final TaskStatusNotificationService notificationService; |
| | | private final ObjectMapper objectMapper; |
| | | |
| | | // 线程池用于并行执行 |
| | | private final ExecutorService executorService = Executors.newCachedThreadPool(r -> { |
| | | Thread t = new Thread(r, "TaskExecutionEngine-Parallel"); |
| | | t.setDaemon(true); |
| | | return t; |
| | | }); |
| | | |
| | | public TaskExecutionResult execute(MultiDeviceTask task, |
| | | DeviceGroupConfig groupConfig, |
| | |
| | | } |
| | | |
| | | TaskExecutionContext context = new TaskExecutionContext(parameters); |
| | | |
| | | // 设备协调:检查依赖关系和执行条件 |
| | | DeviceCoordinationService.CoordinationResult coordinationResult = |
| | | deviceCoordinationService.coordinateExecution(groupConfig, devices, context); |
| | | if (!coordinationResult.canExecute()) { |
| | | log.warn("设备协调失败: {}", coordinationResult.getMessage()); |
| | | return TaskExecutionResult.failure(coordinationResult.getMessage(), Collections.emptyMap()); |
| | | } |
| | | log.info("设备协调成功: {}", coordinationResult.getMessage()); |
| | | |
| | | task.setTotalSteps(devices.size()); |
| | | task.setStatus(MultiDeviceTask.Status.RUNNING.name()); |
| | | multiDeviceTaskMapper.updateById(task); |
| | | |
| | | // 通知任务开始执行 |
| | | notificationService.notifyTaskStatus(task); |
| | | |
| | | // 确定执行模式 |
| | | String executionMode = determineExecutionMode(groupConfig); |
| | | Integer maxConcurrent = getMaxConcurrentDevices(groupConfig); |
| | | |
| | | List<Map<String, Object>> stepSummaries = new ArrayList<>(); |
| | | boolean success = true; |
| | | String failureMessage = null; |
| | | log.info("任务执行模式: {}, 最大并发数: {}, 设备数: {}", executionMode, maxConcurrent, devices.size()); |
| | | |
| | | for (int i = 0; i < devices.size(); i++) { |
| | | DeviceConfig device = devices.get(i); |
| | | int order = i + 1; |
| | | TaskStepDetail step = createStepRecord(task, device, order); |
| | | StepResult stepResult = executeStep(task, step, device, context); |
| | | stepSummaries.add(stepResult.toSummary()); |
| | | if (!stepResult.isSuccess()) { |
| | | success = false; |
| | | failureMessage = stepResult.getMessage(); |
| | | break; |
| | | List<Map<String, Object>> stepSummaries; |
| | | boolean success; |
| | | String failureMessage; |
| | | |
| | | if (EXECUTION_MODE_PARALLEL.equals(executionMode)) { |
| | | // 并行执行模式 |
| | | stepSummaries = new ArrayList<>(Collections.nCopies(devices.size(), null)); |
| | | Pair<Boolean, String> result = executeParallel(task, devices, context, stepSummaries, maxConcurrent); |
| | | success = result.getFirst(); |
| | | failureMessage = result.getSecond(); |
| | | } else { |
| | | // 串行执行模式(默认) |
| | | stepSummaries = new ArrayList<>(); |
| | | success = true; |
| | | failureMessage = null; |
| | | for (int i = 0; i < devices.size(); i++) { |
| | | DeviceConfig device = devices.get(i); |
| | | int order = i + 1; |
| | | TaskStepDetail step = createStepRecord(task, device, order); |
| | | StepResult stepResult = executeStep(task, step, device, context); |
| | | stepSummaries.add(stepResult.toSummary()); |
| | | if (!stepResult.isSuccess()) { |
| | | success = false; |
| | | failureMessage = stepResult.getMessage(); |
| | | break; |
| | | } |
| | | } |
| | | } |
| | | |
| | |
| | | payload.put("steps", stepSummaries); |
| | | payload.put("groupId", groupConfig.getId()); |
| | | payload.put("deviceCount", devices.size()); |
| | | payload.put("executionMode", executionMode); |
| | | |
| | | // 更新任务最终状态 |
| | | if (success) { |
| | | task.setStatus(MultiDeviceTask.Status.COMPLETED.name()); |
| | | } else { |
| | | task.setStatus(MultiDeviceTask.Status.FAILED.name()); |
| | | task.setErrorMessage(failureMessage); |
| | | } |
| | | task.setEndTime(new Date()); |
| | | multiDeviceTaskMapper.updateById(task); |
| | | |
| | | // 通知任务完成 |
| | | notificationService.notifyTaskStatus(task); |
| | | |
| | | if (success) { |
| | | return TaskExecutionResult.success(payload); |
| | | } |
| | | return TaskExecutionResult.failure(failureMessage != null ? failureMessage : "任务执行失败", payload); |
| | | } |
| | | |
| | | /** |
| | | * 并行执行多个设备操作 |
| | | */ |
| | | private Pair<Boolean, String> executeParallel(MultiDeviceTask task, |
| | | List<DeviceConfig> devices, |
| | | TaskExecutionContext context, |
| | | List<Map<String, Object>> stepSummaries, |
| | | Integer maxConcurrent) { |
| | | int concurrency = maxConcurrent != null && maxConcurrent > 0 |
| | | ? Math.min(maxConcurrent, devices.size()) |
| | | : devices.size(); |
| | | |
| | | // 创建所有步骤记录 |
| | | List<TaskStepDetail> steps = new ArrayList<>(); |
| | | for (int i = 0; i < devices.size(); i++) { |
| | | DeviceConfig device = devices.get(i); |
| | | int order = i + 1; |
| | | TaskStepDetail step = createStepRecord(task, device, order); |
| | | steps.add(step); |
| | | } |
| | | |
| | | // 使用信号量控制并发数 |
| | | Semaphore semaphore = new Semaphore(concurrency); |
| | | List<CompletableFuture<StepResult>> futures = new ArrayList<>(); |
| | | |
| | | for (int i = 0; i < devices.size(); i++) { |
| | | final int index = i; |
| | | final DeviceConfig device = devices.get(index); |
| | | final TaskStepDetail step = steps.get(index); |
| | | |
| | | CompletableFuture<StepResult> future = CompletableFuture.supplyAsync(() -> { |
| | | try { |
| | | semaphore.acquire(); |
| | | try { |
| | | return executeStep(task, step, device, context); |
| | | } finally { |
| | | semaphore.release(); |
| | | } |
| | | } catch (InterruptedException e) { |
| | | Thread.currentThread().interrupt(); |
| | | log.error("并行执行被中断, deviceId={}", device.getId(), e); |
| | | return StepResult.failure(device.getDeviceName(), "执行被中断"); |
| | | } catch (Exception e) { |
| | | log.error("并行执行异常, deviceId={}", device.getId(), e); |
| | | return StepResult.failure(device.getDeviceName(), e.getMessage()); |
| | | } |
| | | }, executorService); |
| | | |
| | | final int finalIndex = index; |
| | | future.whenComplete((result, throwable) -> { |
| | | if (throwable != null) { |
| | | log.error("并行执行完成时异常, deviceId={}", device.getId(), throwable); |
| | | stepSummaries.set(finalIndex, StepResult.failure(device.getDeviceName(), |
| | | throwable.getMessage()).toSummary()); |
| | | } else if (result != null) { |
| | | stepSummaries.set(finalIndex, result.toSummary()); |
| | | } |
| | | }); |
| | | |
| | | futures.add(future); |
| | | } |
| | | |
| | | // 等待所有任务完成 |
| | | CompletableFuture<Void> allFutures = CompletableFuture.allOf( |
| | | futures.toArray(new CompletableFuture[0]) |
| | | ); |
| | | |
| | | try { |
| | | allFutures.get(30, TimeUnit.MINUTES); // 最多等待30分钟 |
| | | } catch (TimeoutException e) { |
| | | log.error("并行执行超时, taskId={}", task.getTaskId(), e); |
| | | return Pair.of(false, "任务执行超时"); |
| | | } catch (Exception e) { |
| | | log.error("等待并行执行完成时异常, taskId={}", task.getTaskId(), e); |
| | | return Pair.of(false, "等待执行完成时发生异常: " + e.getMessage()); |
| | | } |
| | | |
| | | // 检查所有步骤的执行结果 |
| | | boolean allSuccess = true; |
| | | String firstFailureMessage = null; |
| | | for (int i = 0; i < futures.size(); i++) { |
| | | try { |
| | | StepResult result = futures.get(i).get(); |
| | | if (result != null && !result.isSuccess()) { |
| | | allSuccess = false; |
| | | if (firstFailureMessage == null) { |
| | | firstFailureMessage = result.getMessage(); |
| | | } |
| | | } |
| | | } catch (Exception e) { |
| | | log.error("获取步骤执行结果异常, stepIndex={}", i, e); |
| | | allSuccess = false; |
| | | if (firstFailureMessage == null) { |
| | | firstFailureMessage = "获取执行结果异常: " + e.getMessage(); |
| | | } |
| | | } |
| | | } |
| | | |
| | | return Pair.of(allSuccess, firstFailureMessage); |
| | | } |
| | | |
| | | /** |
| | | * 确定执行模式 |
| | | */ |
| | | private String determineExecutionMode(DeviceGroupConfig groupConfig) { |
| | | if (groupConfig == null) { |
| | | return EXECUTION_MODE_SERIAL; // 默认串行 |
| | | } |
| | | |
| | | // 从extraConfig中读取executionMode |
| | | String extraConfig = groupConfig.getExtraConfig(); |
| | | if (StringUtils.hasText(extraConfig)) { |
| | | try { |
| | | Map<String, Object> config = objectMapper.readValue(extraConfig, MAP_TYPE); |
| | | Object mode = config.get("executionMode"); |
| | | if (mode != null) { |
| | | String modeStr = String.valueOf(mode).toUpperCase(); |
| | | if (EXECUTION_MODE_PARALLEL.equals(modeStr) || EXECUTION_MODE_SERIAL.equals(modeStr)) { |
| | | return modeStr; |
| | | } |
| | | } |
| | | } catch (Exception e) { |
| | | log.warn("解析设备组执行模式失败, groupId={}", groupConfig.getId(), e); |
| | | } |
| | | } |
| | | |
| | | // 如果有maxConcurrentDevices且大于1,默认使用并行模式 |
| | | if (groupConfig.getMaxConcurrentDevices() != null && groupConfig.getMaxConcurrentDevices() > 1) { |
| | | return EXECUTION_MODE_PARALLEL; |
| | | } |
| | | |
| | | return EXECUTION_MODE_SERIAL; // 默认串行 |
| | | } |
| | | |
| | | /** |
| | | * 获取最大并发设备数 |
| | | */ |
| | | private Integer getMaxConcurrentDevices(DeviceGroupConfig groupConfig) { |
| | | if (groupConfig == null) { |
| | | return 1; |
| | | } |
| | | |
| | | // 从extraConfig中读取maxConcurrent |
| | | String extraConfig = groupConfig.getExtraConfig(); |
| | | if (StringUtils.hasText(extraConfig)) { |
| | | try { |
| | | Map<String, Object> config = objectMapper.readValue(extraConfig, MAP_TYPE); |
| | | Object maxConcurrent = config.get("maxConcurrent"); |
| | | if (maxConcurrent instanceof Number) { |
| | | return ((Number) maxConcurrent).intValue(); |
| | | } |
| | | } catch (Exception e) { |
| | | log.warn("解析设备组最大并发数失败, groupId={}", groupConfig.getId(), e); |
| | | } |
| | | } |
| | | |
| | | // 使用实体字段 |
| | | return groupConfig.getMaxConcurrentDevices() != null && groupConfig.getMaxConcurrentDevices() > 0 |
| | | ? groupConfig.getMaxConcurrentDevices() |
| | | : 1; |
| | | } |
| | | |
| | | /** |
| | | * 简单的Pair类用于返回两个值 |
| | | */ |
| | | private static class Pair<T, U> { |
| | | private final T first; |
| | | private final U second; |
| | | |
| | | private Pair(T first, U second) { |
| | | this.first = first; |
| | | this.second = second; |
| | | } |
| | | |
| | | public static <T, U> Pair<T, U> of(T first, U second) { |
| | | return new Pair<>(first, second); |
| | | } |
| | | |
| | | public T getFirst() { |
| | | return first; |
| | | } |
| | | |
| | | public U getSecond() { |
| | | return second; |
| | | } |
| | | } |
| | | |
| | | private TaskStepDetail createStepRecord(MultiDeviceTask task, DeviceConfig device, int order) { |
| | |
| | | TaskStepDetail step, |
| | | DeviceConfig device, |
| | | TaskExecutionContext context) { |
| | | return executeStepWithRetry(task, step, device, context, getRetryPolicy(device)); |
| | | } |
| | | |
| | | /** |
| | | * 带重试的步骤执行 |
| | | */ |
| | | private StepResult executeStepWithRetry(MultiDeviceTask task, |
| | | TaskStepDetail step, |
| | | DeviceConfig device, |
| | | TaskExecutionContext context, |
| | | RetryPolicy retryPolicy) { |
| | | Date startTime = new Date(); |
| | | step.setStartTime(startTime); |
| | | step.setStatus(TaskStepDetail.Status.RUNNING.name()); |
| | | step.setRetryCount(0); |
| | | |
| | | DeviceInteraction deviceInteraction = interactionRegistry.getInteraction(device.getDeviceType()); |
| | | if (deviceInteraction != null) { |
| | | return executeInteractionStep(task, step, device, context, deviceInteraction); |
| | | return executeInteractionStepWithRetry(task, step, device, context, deviceInteraction, retryPolicy); |
| | | } |
| | | |
| | | Map<String, Object> params = buildOperationParams(device, context); |
| | |
| | | |
| | | String operation = determineOperation(device, params); |
| | | DeviceLogicHandler handler = handlerFactory.getHandler(device.getDeviceType()); |
| | | DevicePlcVO.OperationResult result; |
| | | |
| | | int retryAttempt = 0; |
| | | Exception lastException = null; |
| | | |
| | | while (retryAttempt <= retryPolicy.getMaxRetryCount()) { |
| | | try { |
| | | if (retryAttempt > 0) { |
| | | // 重试前等待 |
| | | long waitTime = retryPolicy.calculateRetryInterval(retryAttempt); |
| | | log.info("步骤执行重试: deviceId={}, operation={}, retryAttempt={}/{}, waitTime={}ms", |
| | | device.getId(), operation, retryAttempt, retryPolicy.getMaxRetryCount(), waitTime); |
| | | Thread.sleep(waitTime); |
| | | |
| | | // 更新步骤状态 |
| | | step.setRetryCount(retryAttempt); |
| | | step.setStatus(TaskStepDetail.Status.RUNNING.name()); |
| | | step.setStartTime(new Date()); |
| | | taskStepDetailMapper.updateById(step); |
| | | } |
| | | |
| | | try { |
| | | if (handler == null) { |
| | | result = deviceInteractionService.executeOperation(device.getId(), operation, params); |
| | | } else { |
| | | result = handler.execute(device, operation, params); |
| | | DevicePlcVO.OperationResult result; |
| | | if (handler == null) { |
| | | result = deviceInteractionService.executeOperation(device.getId(), operation, params); |
| | | } else { |
| | | result = handler.execute(device, operation, params); |
| | | } |
| | | |
| | | boolean opSuccess = Boolean.TRUE.equals(result.getSuccess()); |
| | | updateStepAfterOperation(step, result, opSuccess); |
| | | updateTaskProgress(task, step.getStepOrder(), opSuccess); |
| | | |
| | | // 通知步骤更新 |
| | | notificationService.notifyStepUpdate(task.getTaskId(), step); |
| | | |
| | | if (opSuccess) { |
| | | updateContextAfterSuccess(device, context, params); |
| | | |
| | | // 同步设备状态 |
| | | deviceCoordinationService.syncDeviceStatus(device, |
| | | DeviceCoordinationService.DeviceStatus.COMPLETED, context); |
| | | |
| | | return StepResult.success(device.getDeviceName(), result.getMessage()); |
| | | } else { |
| | | // 业务失败,判断是否可重试 |
| | | if (retryAttempt < retryPolicy.getMaxRetryCount() && isRetryableFailure(result)) { |
| | | retryAttempt++; |
| | | lastException = new RuntimeException(result.getMessage()); |
| | | log.warn("步骤执行失败,准备重试: deviceId={}, operation={}, retryAttempt={}, message={}", |
| | | device.getId(), operation, retryAttempt, result.getMessage()); |
| | | continue; |
| | | } |
| | | |
| | | // 同步失败状态 |
| | | deviceCoordinationService.syncDeviceStatus(device, |
| | | DeviceCoordinationService.DeviceStatus.FAILED, context); |
| | | |
| | | return StepResult.failure(device.getDeviceName(), result.getMessage()); |
| | | } |
| | | } catch (InterruptedException e) { |
| | | Thread.currentThread().interrupt(); |
| | | log.error("步骤执行被中断, deviceId={}, operation={}", device.getId(), operation, e); |
| | | step.setStatus(TaskStepDetail.Status.FAILED.name()); |
| | | step.setErrorMessage("执行被中断: " + e.getMessage()); |
| | | step.setEndTime(new Date()); |
| | | step.setDurationMs(step.getEndTime().getTime() - step.getStartTime().getTime()); |
| | | step.setRetryCount(retryAttempt); |
| | | taskStepDetailMapper.updateById(step); |
| | | updateTaskProgress(task, step.getStepOrder(), false); |
| | | return StepResult.failure(device.getDeviceName(), "执行被中断"); |
| | | } catch (Exception e) { |
| | | lastException = e; |
| | | log.error("设备操作异常, deviceId={}, operation={}, retryAttempt={}", |
| | | device.getId(), operation, retryAttempt, e); |
| | | |
| | | // 判断是否可重试 |
| | | if (retryAttempt < retryPolicy.getMaxRetryCount() && retryPolicy.isRetryable(e)) { |
| | | retryAttempt++; |
| | | log.warn("步骤执行异常,准备重试: deviceId={}, operation={}, retryAttempt={}, exception={}", |
| | | device.getId(), operation, retryAttempt, e.getClass().getSimpleName()); |
| | | continue; |
| | | } |
| | | |
| | | // 不可重试或达到最大重试次数 |
| | | step.setStatus(TaskStepDetail.Status.FAILED.name()); |
| | | step.setErrorMessage(e.getMessage()); |
| | | step.setEndTime(new Date()); |
| | | step.setDurationMs(step.getEndTime().getTime() - step.getStartTime().getTime()); |
| | | step.setRetryCount(retryAttempt); |
| | | taskStepDetailMapper.updateById(step); |
| | | updateTaskProgress(task, step.getStepOrder(), false); |
| | | |
| | | // 通知步骤更新 |
| | | notificationService.notifyStepUpdate(task.getTaskId(), step); |
| | | |
| | | // 同步失败状态 |
| | | deviceCoordinationService.syncDeviceStatus(device, |
| | | DeviceCoordinationService.DeviceStatus.FAILED, context); |
| | | |
| | | String errorMsg = retryAttempt > 0 |
| | | ? String.format("执行失败(已重试%d次): %s", retryAttempt, e.getMessage()) |
| | | : e.getMessage(); |
| | | return StepResult.failure(device.getDeviceName(), errorMsg); |
| | | } |
| | | |
| | | boolean opSuccess = Boolean.TRUE.equals(result.getSuccess()); |
| | | updateStepAfterOperation(step, result, opSuccess); |
| | | updateTaskProgress(task, step.getStepOrder(), opSuccess); |
| | | |
| | | if (opSuccess) { |
| | | updateContextAfterSuccess(device, context, params); |
| | | return StepResult.success(device.getDeviceName(), result.getMessage()); |
| | | } |
| | | return StepResult.failure(device.getDeviceName(), result.getMessage()); |
| | | } catch (Exception e) { |
| | | log.error("设备操作异常, deviceId={}, operation={}", device.getId(), operation, e); |
| | | step.setStatus(TaskStepDetail.Status.FAILED.name()); |
| | | step.setErrorMessage(e.getMessage()); |
| | | step.setEndTime(new Date()); |
| | | step.setDurationMs(step.getEndTime().getTime() - step.getStartTime().getTime()); |
| | | taskStepDetailMapper.updateById(step); |
| | | updateTaskProgress(task, step.getStepOrder(), false); |
| | | return StepResult.failure(device.getDeviceName(), e.getMessage()); |
| | | } |
| | | |
| | | // 达到最大重试次数 |
| | | step.setStatus(TaskStepDetail.Status.FAILED.name()); |
| | | step.setErrorMessage(lastException != null ? lastException.getMessage() : "执行失败"); |
| | | step.setEndTime(new Date()); |
| | | step.setDurationMs(step.getEndTime().getTime() - step.getStartTime().getTime()); |
| | | step.setRetryCount(retryAttempt); |
| | | taskStepDetailMapper.updateById(step); |
| | | updateTaskProgress(task, step.getStepOrder(), false); |
| | | |
| | | // 通知步骤更新 |
| | | notificationService.notifyStepUpdate(task.getTaskId(), step); |
| | | |
| | | deviceCoordinationService.syncDeviceStatus(device, |
| | | DeviceCoordinationService.DeviceStatus.FAILED, context); |
| | | |
| | | return StepResult.failure(device.getDeviceName(), |
| | | String.format("执行失败(已重试%d次)", retryAttempt)); |
| | | } |
| | | |
| | | private StepResult executeInteractionStep(MultiDeviceTask task, |
| | | TaskStepDetail step, |
| | | DeviceConfig device, |
| | | TaskExecutionContext context, |
| | | DeviceInteraction deviceInteraction) { |
| | | try { |
| | | InteractionContext interactionContext = new InteractionContext(device, context); |
| | | step.setInputData(toJson(context.getParameters())); |
| | | InteractionResult interactionResult = deviceInteraction.execute(interactionContext); |
| | | boolean success = interactionResult != null && interactionResult.isSuccess(); |
| | | updateStepAfterInteraction(step, interactionResult); |
| | | updateTaskProgress(task, step.getStepOrder(), success); |
| | | /** |
| | | * 带重试的交互步骤执行 |
| | | */ |
| | | private StepResult executeInteractionStepWithRetry(MultiDeviceTask task, |
| | | TaskStepDetail step, |
| | | DeviceConfig device, |
| | | TaskExecutionContext context, |
| | | DeviceInteraction deviceInteraction, |
| | | RetryPolicy retryPolicy) { |
| | | int retryAttempt = 0; |
| | | Exception lastException = null; |
| | | |
| | | while (retryAttempt <= retryPolicy.getMaxRetryCount()) { |
| | | try { |
| | | if (retryAttempt > 0) { |
| | | long waitTime = retryPolicy.calculateRetryInterval(retryAttempt); |
| | | log.info("交互步骤执行重试: deviceId={}, retryAttempt={}/{}, waitTime={}ms", |
| | | device.getId(), retryAttempt, retryPolicy.getMaxRetryCount(), waitTime); |
| | | Thread.sleep(waitTime); |
| | | |
| | | step.setRetryCount(retryAttempt); |
| | | step.setStatus(TaskStepDetail.Status.RUNNING.name()); |
| | | step.setStartTime(new Date()); |
| | | taskStepDetailMapper.updateById(step); |
| | | } |
| | | |
| | | if (success) { |
| | | return StepResult.success(device.getDeviceName(), interactionResult.getMessage()); |
| | | InteractionContext interactionContext = new InteractionContext(device, context); |
| | | step.setInputData(toJson(context.getParameters())); |
| | | InteractionResult interactionResult = deviceInteraction.execute(interactionContext); |
| | | boolean success = interactionResult != null && interactionResult.isSuccess(); |
| | | updateStepAfterInteraction(step, interactionResult); |
| | | updateTaskProgress(task, step.getStepOrder(), success); |
| | | |
| | | if (success) { |
| | | deviceCoordinationService.syncDeviceStatus(device, |
| | | DeviceCoordinationService.DeviceStatus.COMPLETED, context); |
| | | return StepResult.success(device.getDeviceName(), interactionResult.getMessage()); |
| | | } else { |
| | | if (retryAttempt < retryPolicy.getMaxRetryCount()) { |
| | | retryAttempt++; |
| | | continue; |
| | | } |
| | | deviceCoordinationService.syncDeviceStatus(device, |
| | | DeviceCoordinationService.DeviceStatus.FAILED, context); |
| | | String message = interactionResult != null ? interactionResult.getMessage() : "交互执行失败"; |
| | | return StepResult.failure(device.getDeviceName(), message); |
| | | } |
| | | } catch (InterruptedException e) { |
| | | Thread.currentThread().interrupt(); |
| | | log.error("交互步骤执行被中断, deviceId={}", device.getId(), e); |
| | | step.setStatus(TaskStepDetail.Status.FAILED.name()); |
| | | step.setErrorMessage("执行被中断: " + e.getMessage()); |
| | | step.setEndTime(new Date()); |
| | | step.setDurationMs(step.getEndTime().getTime() - step.getStartTime().getTime()); |
| | | step.setRetryCount(retryAttempt); |
| | | taskStepDetailMapper.updateById(step); |
| | | updateTaskProgress(task, step.getStepOrder(), false); |
| | | return StepResult.failure(device.getDeviceName(), "执行被中断"); |
| | | } catch (Exception e) { |
| | | lastException = e; |
| | | log.error("交互执行异常, deviceId={}, retryAttempt={}", device.getId(), retryAttempt, e); |
| | | |
| | | if (retryAttempt < retryPolicy.getMaxRetryCount() && retryPolicy.isRetryable(e)) { |
| | | retryAttempt++; |
| | | continue; |
| | | } |
| | | |
| | | step.setStatus(TaskStepDetail.Status.FAILED.name()); |
| | | step.setErrorMessage(e.getMessage()); |
| | | step.setEndTime(new Date()); |
| | | step.setDurationMs(step.getEndTime().getTime() - step.getStartTime().getTime()); |
| | | step.setRetryCount(retryAttempt); |
| | | taskStepDetailMapper.updateById(step); |
| | | updateTaskProgress(task, step.getStepOrder(), false); |
| | | |
| | | // 通知步骤更新 |
| | | notificationService.notifyStepUpdate(task.getTaskId(), step); |
| | | |
| | | deviceCoordinationService.syncDeviceStatus(device, |
| | | DeviceCoordinationService.DeviceStatus.FAILED, context); |
| | | |
| | | String errorMsg = retryAttempt > 0 |
| | | ? String.format("执行失败(已重试%d次): %s", retryAttempt, e.getMessage()) |
| | | : e.getMessage(); |
| | | return StepResult.failure(device.getDeviceName(), errorMsg); |
| | | } |
| | | String message = interactionResult != null ? interactionResult.getMessage() : "交互执行失败"; |
| | | return StepResult.failure(device.getDeviceName(), message); |
| | | } catch (Exception e) { |
| | | log.error("交互执行异常, deviceId={}", device.getId(), e); |
| | | step.setStatus(TaskStepDetail.Status.FAILED.name()); |
| | | step.setErrorMessage(e.getMessage()); |
| | | step.setEndTime(new Date()); |
| | | step.setDurationMs(step.getEndTime().getTime() - step.getStartTime().getTime()); |
| | | taskStepDetailMapper.updateById(step); |
| | | updateTaskProgress(task, step.getStepOrder(), false); |
| | | return StepResult.failure(device.getDeviceName(), e.getMessage()); |
| | | } |
| | | |
| | | step.setStatus(TaskStepDetail.Status.FAILED.name()); |
| | | step.setErrorMessage(lastException != null ? lastException.getMessage() : "交互执行失败"); |
| | | step.setEndTime(new Date()); |
| | | step.setDurationMs(step.getEndTime().getTime() - step.getStartTime().getTime()); |
| | | step.setRetryCount(retryAttempt); |
| | | taskStepDetailMapper.updateById(step); |
| | | updateTaskProgress(task, step.getStepOrder(), false); |
| | | |
| | | // 通知步骤更新 |
| | | notificationService.notifyStepUpdate(task.getTaskId(), step); |
| | | |
| | | deviceCoordinationService.syncDeviceStatus(device, |
| | | DeviceCoordinationService.DeviceStatus.FAILED, context); |
| | | |
| | | return StepResult.failure(device.getDeviceName(), |
| | | String.format("执行失败(已重试%d次)", retryAttempt)); |
| | | } |
| | | |
| | | /** |
| | | * 获取重试策略 |
| | | */ |
| | | private RetryPolicy getRetryPolicy(DeviceConfig device) { |
| | | // 可以从设备配置中读取重试策略 |
| | | // 暂时使用默认策略 |
| | | return RetryPolicy.defaultPolicy(); |
| | | } |
| | | |
| | | /** |
| | | * 判断业务失败是否可重试 |
| | | */ |
| | | private boolean isRetryableFailure(DevicePlcVO.OperationResult result) { |
| | | if (result == null || result.getMessage() == null) { |
| | | return false; |
| | | } |
| | | String message = result.getMessage().toLowerCase(); |
| | | // 网络错误、超时错误可重试 |
| | | return message.contains("timeout") || |
| | | message.contains("connection") || |
| | | message.contains("网络") || |
| | | message.contains("超时"); |
| | | } |
| | | |
| | | |
| | | private void updateStepAfterOperation(TaskStepDetail step, |
| | | DevicePlcVO.OperationResult result, |
| | |
| | | update.set(MultiDeviceTask::getStatus, MultiDeviceTask.Status.FAILED.name()); |
| | | } |
| | | multiDeviceTaskMapper.update(null, update); |
| | | |
| | | // 通知任务状态更新 |
| | | notificationService.notifyTaskStatus(task); |
| | | } |
| | | |
| | | private String determineOperation(DeviceConfig device, Map<String, Object> params) { |
| | |
| | | private void updateContextAfterSuccess(DeviceConfig device, |
| | | TaskExecutionContext context, |
| | | Map<String, Object> params) { |
| | | List<String> glassIds = extractGlassIds(params); |
| | | |
| | | switch (device.getDeviceType()) { |
| | | case DeviceConfig.DeviceType.LOAD_VEHICLE: |
| | | context.setLoadedGlassIds(extractGlassIds(params)); |
| | | context.setLoadedGlassIds(glassIds); |
| | | // 数据传递:上大车 -> 下一个设备 |
| | | if (!CollectionUtils.isEmpty(glassIds)) { |
| | | Map<String, Object> transferData = new HashMap<>(); |
| | | transferData.put("glassIds", glassIds); |
| | | transferData.put("sourceDevice", device.getDeviceCode()); |
| | | // 这里简化处理,实际应该找到下一个设备 |
| | | // 在串行模式下,下一个设备会在循环中自动获取 |
| | | } |
| | | break; |
| | | case DeviceConfig.DeviceType.LARGE_GLASS: |
| | | context.setProcessedGlassIds(extractGlassIds(params)); |
| | | context.setProcessedGlassIds(glassIds); |
| | | // 数据传递:大理片 -> 下一个设备 |
| | | if (!CollectionUtils.isEmpty(glassIds)) { |
| | | Map<String, Object> transferData = new HashMap<>(); |
| | | transferData.put("glassIds", glassIds); |
| | | transferData.put("sourceDevice", device.getDeviceCode()); |
| | | } |
| | | break; |
| | | default: |
| | | break; |