package com.mes.task.service;
|
|
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
|
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
|
import com.fasterxml.jackson.core.JsonProcessingException;
|
import com.fasterxml.jackson.core.type.TypeReference;
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
import com.mes.device.entity.DeviceConfig;
|
import com.mes.device.entity.DeviceGroupConfig;
|
import com.mes.interaction.DeviceInteraction;
|
import com.mes.interaction.DeviceInteractionRegistry;
|
import com.mes.interaction.DeviceLogicHandler;
|
import com.mes.interaction.DeviceLogicHandlerFactory;
|
import com.mes.interaction.base.InteractionContext;
|
import com.mes.interaction.base.InteractionResult;
|
import com.mes.device.service.DeviceCoordinationService;
|
import com.mes.device.service.DeviceInteractionService;
|
import com.mes.task.dto.TaskParameters;
|
import com.mes.task.entity.MultiDeviceTask;
|
import com.mes.task.entity.TaskStepDetail;
|
import com.mes.task.mapper.MultiDeviceTaskMapper;
|
import com.mes.task.mapper.TaskStepDetailMapper;
|
import com.mes.task.model.RetryPolicy;
|
import com.mes.task.model.TaskExecutionContext;
|
import com.mes.task.model.TaskExecutionResult;
|
import com.mes.task.service.TaskStatusNotificationService;
|
import com.mes.device.vo.DevicePlcVO;
|
import lombok.RequiredArgsConstructor;
|
import lombok.extern.slf4j.Slf4j;
|
import org.springframework.stereotype.Component;
|
import org.springframework.util.CollectionUtils;
|
import org.springframework.util.StringUtils;
|
|
import java.util.*;
|
import java.util.concurrent.*;
|
|
/**
|
* 多设备任务执行引擎
|
* 支持串行和并行两种执行模式
|
*/
|
@Slf4j
|
@Component
|
@RequiredArgsConstructor
|
public class TaskExecutionEngine {
|
|
/**
 * Default operation name per device type, used when the step parameters do not name an
 * operation explicitly. Published as an unmodifiable map so the shared table cannot be
 * changed after class initialization.
 */
private static final Map<String, String> DEFAULT_OPERATIONS;

/** Jackson type token used to read a JSON object into a {@code Map<String, Object>}. */
private static final TypeReference<Map<String, Object>> MAP_TYPE = new TypeReference<Map<String, Object>>() {};

// Execution mode constants
private static final String EXECUTION_MODE_SERIAL = "SERIAL";
private static final String EXECUTION_MODE_PARALLEL = "PARALLEL";

static {
    Map<String, String> operations = new HashMap<>();
    operations.put(DeviceConfig.DeviceType.LOAD_VEHICLE, "feedGlass");
    operations.put(DeviceConfig.DeviceType.LARGE_GLASS, "processGlass");
    operations.put(DeviceConfig.DeviceType.GLASS_STORAGE, "storeGlass");
    DEFAULT_OPERATIONS = Collections.unmodifiableMap(operations);
}
|
|
private final TaskStepDetailMapper taskStepDetailMapper;
|
private final MultiDeviceTaskMapper multiDeviceTaskMapper;
|
private final DeviceInteractionService deviceInteractionService;
|
private final DeviceInteractionRegistry interactionRegistry;
|
private final DeviceLogicHandlerFactory handlerFactory;
|
private final DeviceCoordinationService deviceCoordinationService;
|
private final TaskStatusNotificationService notificationService;
|
private final ObjectMapper objectMapper;
|
|
// 线程池用于并行执行
|
private final ExecutorService executorService = Executors.newCachedThreadPool(r -> {
|
Thread t = new Thread(r, "TaskExecutionEngine-Parallel");
|
t.setDaemon(true);
|
return t;
|
});
|
|
public TaskExecutionResult execute(MultiDeviceTask task,
|
DeviceGroupConfig groupConfig,
|
List<DeviceConfig> devices,
|
TaskParameters parameters) {
|
|
if (CollectionUtils.isEmpty(devices)) {
|
return TaskExecutionResult.failure("设备组未配置设备,无法执行任务", Collections.emptyMap());
|
}
|
|
TaskExecutionContext context = new TaskExecutionContext(parameters);
|
|
// 设备协调:检查依赖关系和执行条件
|
DeviceCoordinationService.CoordinationResult coordinationResult =
|
deviceCoordinationService.coordinateExecution(groupConfig, devices, context);
|
if (!coordinationResult.canExecute()) {
|
log.warn("设备协调失败: {}", coordinationResult.getMessage());
|
return TaskExecutionResult.failure(coordinationResult.getMessage(), Collections.emptyMap());
|
}
|
log.info("设备协调成功: {}", coordinationResult.getMessage());
|
|
task.setTotalSteps(devices.size());
|
task.setStatus(MultiDeviceTask.Status.RUNNING.name());
|
multiDeviceTaskMapper.updateById(task);
|
|
// 通知任务开始执行
|
notificationService.notifyTaskStatus(task);
|
|
// 确定执行模式
|
String executionMode = determineExecutionMode(groupConfig);
|
Integer maxConcurrent = getMaxConcurrentDevices(groupConfig);
|
|
log.info("任务执行模式: {}, 最大并发数: {}, 设备数: {}", executionMode, maxConcurrent, devices.size());
|
|
List<Map<String, Object>> stepSummaries;
|
boolean success;
|
String failureMessage;
|
|
if (EXECUTION_MODE_PARALLEL.equals(executionMode)) {
|
// 并行执行模式
|
stepSummaries = new ArrayList<>(Collections.nCopies(devices.size(), null));
|
Pair<Boolean, String> result = executeParallel(task, devices, context, stepSummaries, maxConcurrent);
|
success = result.getFirst();
|
failureMessage = result.getSecond();
|
} else {
|
// 串行执行模式(默认)
|
stepSummaries = new ArrayList<>();
|
success = true;
|
failureMessage = null;
|
for (int i = 0; i < devices.size(); i++) {
|
DeviceConfig device = devices.get(i);
|
int order = i + 1;
|
TaskStepDetail step = createStepRecord(task, device, order);
|
StepResult stepResult = executeStep(task, step, device, context);
|
stepSummaries.add(stepResult.toSummary());
|
if (!stepResult.isSuccess()) {
|
success = false;
|
failureMessage = stepResult.getMessage();
|
break;
|
}
|
}
|
}
|
|
Map<String, Object> payload = new HashMap<>();
|
payload.put("steps", stepSummaries);
|
payload.put("groupId", groupConfig.getId());
|
payload.put("deviceCount", devices.size());
|
payload.put("executionMode", executionMode);
|
|
// 更新任务最终状态
|
if (success) {
|
task.setStatus(MultiDeviceTask.Status.COMPLETED.name());
|
} else {
|
task.setStatus(MultiDeviceTask.Status.FAILED.name());
|
task.setErrorMessage(failureMessage);
|
}
|
task.setEndTime(new Date());
|
multiDeviceTaskMapper.updateById(task);
|
|
// 通知任务完成
|
notificationService.notifyTaskStatus(task);
|
|
if (success) {
|
return TaskExecutionResult.success(payload);
|
}
|
return TaskExecutionResult.failure(failureMessage != null ? failureMessage : "任务执行失败", payload);
|
}
|
|
/**
|
* 并行执行多个设备操作
|
*/
|
private Pair<Boolean, String> executeParallel(MultiDeviceTask task,
|
List<DeviceConfig> devices,
|
TaskExecutionContext context,
|
List<Map<String, Object>> stepSummaries,
|
Integer maxConcurrent) {
|
int concurrency = maxConcurrent != null && maxConcurrent > 0
|
? Math.min(maxConcurrent, devices.size())
|
: devices.size();
|
|
// 创建所有步骤记录
|
List<TaskStepDetail> steps = new ArrayList<>();
|
for (int i = 0; i < devices.size(); i++) {
|
DeviceConfig device = devices.get(i);
|
int order = i + 1;
|
TaskStepDetail step = createStepRecord(task, device, order);
|
steps.add(step);
|
}
|
|
// 使用信号量控制并发数
|
Semaphore semaphore = new Semaphore(concurrency);
|
List<CompletableFuture<StepResult>> futures = new ArrayList<>();
|
|
for (int i = 0; i < devices.size(); i++) {
|
final int index = i;
|
final DeviceConfig device = devices.get(index);
|
final TaskStepDetail step = steps.get(index);
|
|
CompletableFuture<StepResult> future = CompletableFuture.supplyAsync(() -> {
|
try {
|
semaphore.acquire();
|
try {
|
return executeStep(task, step, device, context);
|
} finally {
|
semaphore.release();
|
}
|
} catch (InterruptedException e) {
|
Thread.currentThread().interrupt();
|
log.error("并行执行被中断, deviceId={}", device.getId(), e);
|
return StepResult.failure(device.getDeviceName(), "执行被中断");
|
} catch (Exception e) {
|
log.error("并行执行异常, deviceId={}", device.getId(), e);
|
return StepResult.failure(device.getDeviceName(), e.getMessage());
|
}
|
}, executorService);
|
|
final int finalIndex = index;
|
future.whenComplete((result, throwable) -> {
|
if (throwable != null) {
|
log.error("并行执行完成时异常, deviceId={}", device.getId(), throwable);
|
stepSummaries.set(finalIndex, StepResult.failure(device.getDeviceName(),
|
throwable.getMessage()).toSummary());
|
} else if (result != null) {
|
stepSummaries.set(finalIndex, result.toSummary());
|
}
|
});
|
|
futures.add(future);
|
}
|
|
// 等待所有任务完成
|
CompletableFuture<Void> allFutures = CompletableFuture.allOf(
|
futures.toArray(new CompletableFuture[0])
|
);
|
|
try {
|
allFutures.get(30, TimeUnit.MINUTES); // 最多等待30分钟
|
} catch (TimeoutException e) {
|
log.error("并行执行超时, taskId={}", task.getTaskId(), e);
|
return Pair.of(false, "任务执行超时");
|
} catch (Exception e) {
|
log.error("等待并行执行完成时异常, taskId={}", task.getTaskId(), e);
|
return Pair.of(false, "等待执行完成时发生异常: " + e.getMessage());
|
}
|
|
// 检查所有步骤的执行结果
|
boolean allSuccess = true;
|
String firstFailureMessage = null;
|
for (int i = 0; i < futures.size(); i++) {
|
try {
|
StepResult result = futures.get(i).get();
|
if (result != null && !result.isSuccess()) {
|
allSuccess = false;
|
if (firstFailureMessage == null) {
|
firstFailureMessage = result.getMessage();
|
}
|
}
|
} catch (Exception e) {
|
log.error("获取步骤执行结果异常, stepIndex={}", i, e);
|
allSuccess = false;
|
if (firstFailureMessage == null) {
|
firstFailureMessage = "获取执行结果异常: " + e.getMessage();
|
}
|
}
|
}
|
|
return Pair.of(allSuccess, firstFailureMessage);
|
}
|
|
/**
|
* 确定执行模式
|
*/
|
private String determineExecutionMode(DeviceGroupConfig groupConfig) {
|
if (groupConfig == null) {
|
return EXECUTION_MODE_SERIAL; // 默认串行
|
}
|
|
// 从extraConfig中读取executionMode
|
String extraConfig = groupConfig.getExtraConfig();
|
if (StringUtils.hasText(extraConfig)) {
|
try {
|
Map<String, Object> config = objectMapper.readValue(extraConfig, MAP_TYPE);
|
Object mode = config.get("executionMode");
|
if (mode != null) {
|
String modeStr = String.valueOf(mode).toUpperCase();
|
if (EXECUTION_MODE_PARALLEL.equals(modeStr) || EXECUTION_MODE_SERIAL.equals(modeStr)) {
|
return modeStr;
|
}
|
}
|
} catch (Exception e) {
|
log.warn("解析设备组执行模式失败, groupId={}", groupConfig.getId(), e);
|
}
|
}
|
|
// 如果有maxConcurrentDevices且大于1,默认使用并行模式
|
if (groupConfig.getMaxConcurrentDevices() != null && groupConfig.getMaxConcurrentDevices() > 1) {
|
return EXECUTION_MODE_PARALLEL;
|
}
|
|
return EXECUTION_MODE_SERIAL; // 默认串行
|
}
|
|
/**
|
* 获取最大并发设备数
|
*/
|
private Integer getMaxConcurrentDevices(DeviceGroupConfig groupConfig) {
|
if (groupConfig == null) {
|
return 1;
|
}
|
|
// 从extraConfig中读取maxConcurrent
|
String extraConfig = groupConfig.getExtraConfig();
|
if (StringUtils.hasText(extraConfig)) {
|
try {
|
Map<String, Object> config = objectMapper.readValue(extraConfig, MAP_TYPE);
|
Object maxConcurrent = config.get("maxConcurrent");
|
if (maxConcurrent instanceof Number) {
|
return ((Number) maxConcurrent).intValue();
|
}
|
} catch (Exception e) {
|
log.warn("解析设备组最大并发数失败, groupId={}", groupConfig.getId(), e);
|
}
|
}
|
|
// 使用实体字段
|
return groupConfig.getMaxConcurrentDevices() != null && groupConfig.getMaxConcurrentDevices() > 0
|
? groupConfig.getMaxConcurrentDevices()
|
: 1;
|
}
|
|
/**
|
* 简单的Pair类用于返回两个值
|
*/
|
private static class Pair<T, U> {
|
private final T first;
|
private final U second;
|
|
private Pair(T first, U second) {
|
this.first = first;
|
this.second = second;
|
}
|
|
public static <T, U> Pair<T, U> of(T first, U second) {
|
return new Pair<>(first, second);
|
}
|
|
public T getFirst() {
|
return first;
|
}
|
|
public U getSecond() {
|
return second;
|
}
|
}
|
|
private TaskStepDetail createStepRecord(MultiDeviceTask task, DeviceConfig device, int order) {
|
TaskStepDetail step = new TaskStepDetail();
|
step.setTaskId(task.getTaskId());
|
step.setStepOrder(order);
|
step.setDeviceId(String.valueOf(device.getId()));
|
step.setStepName(device.getDeviceName());
|
step.setStatus(TaskStepDetail.Status.PENDING.name());
|
step.setRetryCount(0);
|
taskStepDetailMapper.insert(step);
|
return step;
|
}
|
|
private StepResult executeStep(MultiDeviceTask task,
|
TaskStepDetail step,
|
DeviceConfig device,
|
TaskExecutionContext context) {
|
return executeStepWithRetry(task, step, device, context, getRetryPolicy(device));
|
}
|
|
/**
|
* 带重试的步骤执行
|
*/
|
private StepResult executeStepWithRetry(MultiDeviceTask task,
|
TaskStepDetail step,
|
DeviceConfig device,
|
TaskExecutionContext context,
|
RetryPolicy retryPolicy) {
|
Date startTime = new Date();
|
step.setStartTime(startTime);
|
step.setStatus(TaskStepDetail.Status.RUNNING.name());
|
step.setRetryCount(0);
|
|
DeviceInteraction deviceInteraction = interactionRegistry.getInteraction(device.getDeviceType());
|
if (deviceInteraction != null) {
|
return executeInteractionStepWithRetry(task, step, device, context, deviceInteraction, retryPolicy);
|
}
|
|
Map<String, Object> params = buildOperationParams(device, context);
|
step.setInputData(toJson(params));
|
taskStepDetailMapper.updateById(step);
|
|
String operation = determineOperation(device, params);
|
DeviceLogicHandler handler = handlerFactory.getHandler(device.getDeviceType());
|
|
int retryAttempt = 0;
|
Exception lastException = null;
|
|
while (retryAttempt <= retryPolicy.getMaxRetryCount()) {
|
try {
|
if (retryAttempt > 0) {
|
// 重试前等待
|
long waitTime = retryPolicy.calculateRetryInterval(retryAttempt);
|
log.info("步骤执行重试: deviceId={}, operation={}, retryAttempt={}/{}, waitTime={}ms",
|
device.getId(), operation, retryAttempt, retryPolicy.getMaxRetryCount(), waitTime);
|
Thread.sleep(waitTime);
|
|
// 更新步骤状态
|
step.setRetryCount(retryAttempt);
|
step.setStatus(TaskStepDetail.Status.RUNNING.name());
|
step.setStartTime(new Date());
|
taskStepDetailMapper.updateById(step);
|
}
|
|
DevicePlcVO.OperationResult result;
|
if (handler == null) {
|
result = deviceInteractionService.executeOperation(device.getId(), operation, params);
|
} else {
|
result = handler.execute(device, operation, params);
|
}
|
|
boolean opSuccess = Boolean.TRUE.equals(result.getSuccess());
|
updateStepAfterOperation(step, result, opSuccess);
|
updateTaskProgress(task, step.getStepOrder(), opSuccess);
|
|
// 通知步骤更新
|
notificationService.notifyStepUpdate(task.getTaskId(), step);
|
|
if (opSuccess) {
|
updateContextAfterSuccess(device, context, params);
|
|
// 同步设备状态
|
deviceCoordinationService.syncDeviceStatus(device,
|
DeviceCoordinationService.DeviceStatus.COMPLETED, context);
|
|
return StepResult.success(device.getDeviceName(), result.getMessage());
|
} else {
|
// 业务失败,判断是否可重试
|
if (retryAttempt < retryPolicy.getMaxRetryCount() && isRetryableFailure(result)) {
|
retryAttempt++;
|
lastException = new RuntimeException(result.getMessage());
|
log.warn("步骤执行失败,准备重试: deviceId={}, operation={}, retryAttempt={}, message={}",
|
device.getId(), operation, retryAttempt, result.getMessage());
|
continue;
|
}
|
|
// 同步失败状态
|
deviceCoordinationService.syncDeviceStatus(device,
|
DeviceCoordinationService.DeviceStatus.FAILED, context);
|
|
return StepResult.failure(device.getDeviceName(), result.getMessage());
|
}
|
} catch (InterruptedException e) {
|
Thread.currentThread().interrupt();
|
log.error("步骤执行被中断, deviceId={}, operation={}", device.getId(), operation, e);
|
step.setStatus(TaskStepDetail.Status.FAILED.name());
|
step.setErrorMessage("执行被中断: " + e.getMessage());
|
step.setEndTime(new Date());
|
step.setDurationMs(step.getEndTime().getTime() - step.getStartTime().getTime());
|
step.setRetryCount(retryAttempt);
|
taskStepDetailMapper.updateById(step);
|
updateTaskProgress(task, step.getStepOrder(), false);
|
return StepResult.failure(device.getDeviceName(), "执行被中断");
|
} catch (Exception e) {
|
lastException = e;
|
log.error("设备操作异常, deviceId={}, operation={}, retryAttempt={}",
|
device.getId(), operation, retryAttempt, e);
|
|
// 判断是否可重试
|
if (retryAttempt < retryPolicy.getMaxRetryCount() && retryPolicy.isRetryable(e)) {
|
retryAttempt++;
|
log.warn("步骤执行异常,准备重试: deviceId={}, operation={}, retryAttempt={}, exception={}",
|
device.getId(), operation, retryAttempt, e.getClass().getSimpleName());
|
continue;
|
}
|
|
// 不可重试或达到最大重试次数
|
step.setStatus(TaskStepDetail.Status.FAILED.name());
|
step.setErrorMessage(e.getMessage());
|
step.setEndTime(new Date());
|
step.setDurationMs(step.getEndTime().getTime() - step.getStartTime().getTime());
|
step.setRetryCount(retryAttempt);
|
taskStepDetailMapper.updateById(step);
|
updateTaskProgress(task, step.getStepOrder(), false);
|
|
// 通知步骤更新
|
notificationService.notifyStepUpdate(task.getTaskId(), step);
|
|
// 同步失败状态
|
deviceCoordinationService.syncDeviceStatus(device,
|
DeviceCoordinationService.DeviceStatus.FAILED, context);
|
|
String errorMsg = retryAttempt > 0
|
? String.format("执行失败(已重试%d次): %s", retryAttempt, e.getMessage())
|
: e.getMessage();
|
return StepResult.failure(device.getDeviceName(), errorMsg);
|
}
|
}
|
|
// 达到最大重试次数
|
step.setStatus(TaskStepDetail.Status.FAILED.name());
|
step.setErrorMessage(lastException != null ? lastException.getMessage() : "执行失败");
|
step.setEndTime(new Date());
|
step.setDurationMs(step.getEndTime().getTime() - step.getStartTime().getTime());
|
step.setRetryCount(retryAttempt);
|
taskStepDetailMapper.updateById(step);
|
updateTaskProgress(task, step.getStepOrder(), false);
|
|
// 通知步骤更新
|
notificationService.notifyStepUpdate(task.getTaskId(), step);
|
|
deviceCoordinationService.syncDeviceStatus(device,
|
DeviceCoordinationService.DeviceStatus.FAILED, context);
|
|
return StepResult.failure(device.getDeviceName(),
|
String.format("执行失败(已重试%d次)", retryAttempt));
|
}
|
|
/**
|
* 带重试的交互步骤执行
|
*/
|
private StepResult executeInteractionStepWithRetry(MultiDeviceTask task,
|
TaskStepDetail step,
|
DeviceConfig device,
|
TaskExecutionContext context,
|
DeviceInteraction deviceInteraction,
|
RetryPolicy retryPolicy) {
|
int retryAttempt = 0;
|
Exception lastException = null;
|
|
while (retryAttempt <= retryPolicy.getMaxRetryCount()) {
|
try {
|
if (retryAttempt > 0) {
|
long waitTime = retryPolicy.calculateRetryInterval(retryAttempt);
|
log.info("交互步骤执行重试: deviceId={}, retryAttempt={}/{}, waitTime={}ms",
|
device.getId(), retryAttempt, retryPolicy.getMaxRetryCount(), waitTime);
|
Thread.sleep(waitTime);
|
|
step.setRetryCount(retryAttempt);
|
step.setStatus(TaskStepDetail.Status.RUNNING.name());
|
step.setStartTime(new Date());
|
taskStepDetailMapper.updateById(step);
|
}
|
|
InteractionContext interactionContext = new InteractionContext(device, context);
|
step.setInputData(toJson(context.getParameters()));
|
InteractionResult interactionResult = deviceInteraction.execute(interactionContext);
|
boolean success = interactionResult != null && interactionResult.isSuccess();
|
updateStepAfterInteraction(step, interactionResult);
|
updateTaskProgress(task, step.getStepOrder(), success);
|
|
if (success) {
|
deviceCoordinationService.syncDeviceStatus(device,
|
DeviceCoordinationService.DeviceStatus.COMPLETED, context);
|
return StepResult.success(device.getDeviceName(), interactionResult.getMessage());
|
} else {
|
if (retryAttempt < retryPolicy.getMaxRetryCount()) {
|
retryAttempt++;
|
continue;
|
}
|
deviceCoordinationService.syncDeviceStatus(device,
|
DeviceCoordinationService.DeviceStatus.FAILED, context);
|
String message = interactionResult != null ? interactionResult.getMessage() : "交互执行失败";
|
return StepResult.failure(device.getDeviceName(), message);
|
}
|
} catch (InterruptedException e) {
|
Thread.currentThread().interrupt();
|
log.error("交互步骤执行被中断, deviceId={}", device.getId(), e);
|
step.setStatus(TaskStepDetail.Status.FAILED.name());
|
step.setErrorMessage("执行被中断: " + e.getMessage());
|
step.setEndTime(new Date());
|
step.setDurationMs(step.getEndTime().getTime() - step.getStartTime().getTime());
|
step.setRetryCount(retryAttempt);
|
taskStepDetailMapper.updateById(step);
|
updateTaskProgress(task, step.getStepOrder(), false);
|
return StepResult.failure(device.getDeviceName(), "执行被中断");
|
} catch (Exception e) {
|
lastException = e;
|
log.error("交互执行异常, deviceId={}, retryAttempt={}", device.getId(), retryAttempt, e);
|
|
if (retryAttempt < retryPolicy.getMaxRetryCount() && retryPolicy.isRetryable(e)) {
|
retryAttempt++;
|
continue;
|
}
|
|
step.setStatus(TaskStepDetail.Status.FAILED.name());
|
step.setErrorMessage(e.getMessage());
|
step.setEndTime(new Date());
|
step.setDurationMs(step.getEndTime().getTime() - step.getStartTime().getTime());
|
step.setRetryCount(retryAttempt);
|
taskStepDetailMapper.updateById(step);
|
updateTaskProgress(task, step.getStepOrder(), false);
|
|
// 通知步骤更新
|
notificationService.notifyStepUpdate(task.getTaskId(), step);
|
|
deviceCoordinationService.syncDeviceStatus(device,
|
DeviceCoordinationService.DeviceStatus.FAILED, context);
|
|
String errorMsg = retryAttempt > 0
|
? String.format("执行失败(已重试%d次): %s", retryAttempt, e.getMessage())
|
: e.getMessage();
|
return StepResult.failure(device.getDeviceName(), errorMsg);
|
}
|
}
|
|
step.setStatus(TaskStepDetail.Status.FAILED.name());
|
step.setErrorMessage(lastException != null ? lastException.getMessage() : "交互执行失败");
|
step.setEndTime(new Date());
|
step.setDurationMs(step.getEndTime().getTime() - step.getStartTime().getTime());
|
step.setRetryCount(retryAttempt);
|
taskStepDetailMapper.updateById(step);
|
updateTaskProgress(task, step.getStepOrder(), false);
|
|
// 通知步骤更新
|
notificationService.notifyStepUpdate(task.getTaskId(), step);
|
|
deviceCoordinationService.syncDeviceStatus(device,
|
DeviceCoordinationService.DeviceStatus.FAILED, context);
|
|
return StepResult.failure(device.getDeviceName(),
|
String.format("执行失败(已重试%d次)", retryAttempt));
|
}
|
|
/**
|
* 获取重试策略
|
*/
|
private RetryPolicy getRetryPolicy(DeviceConfig device) {
|
// 可以从设备配置中读取重试策略
|
// 暂时使用默认策略
|
return RetryPolicy.defaultPolicy();
|
}
|
|
/**
|
* 判断业务失败是否可重试
|
*/
|
private boolean isRetryableFailure(DevicePlcVO.OperationResult result) {
|
if (result == null || result.getMessage() == null) {
|
return false;
|
}
|
String message = result.getMessage().toLowerCase();
|
// 网络错误、超时错误可重试
|
return message.contains("timeout") ||
|
message.contains("connection") ||
|
message.contains("网络") ||
|
message.contains("超时");
|
}
|
|
|
private void updateStepAfterOperation(TaskStepDetail step,
|
DevicePlcVO.OperationResult result,
|
boolean success) {
|
step.setEndTime(new Date());
|
if (step.getStartTime() != null) {
|
step.setDurationMs(step.getEndTime().getTime() - step.getStartTime().getTime());
|
}
|
step.setStatus(success ? TaskStepDetail.Status.COMPLETED.name() : TaskStepDetail.Status.FAILED.name());
|
step.setErrorMessage(success ? null : result.getMessage());
|
step.setOutputData(toJson(result));
|
taskStepDetailMapper.updateById(step);
|
}
|
|
private void updateStepAfterInteraction(TaskStepDetail step,
|
InteractionResult result) {
|
step.setEndTime(new Date());
|
if (step.getStartTime() != null) {
|
step.setDurationMs(step.getEndTime().getTime() - step.getStartTime().getTime());
|
}
|
boolean success = result != null && result.isSuccess();
|
step.setStatus(success ? TaskStepDetail.Status.COMPLETED.name() : TaskStepDetail.Status.FAILED.name());
|
step.setErrorMessage(success ? null : (result != null ? result.getMessage() : "交互执行失败"));
|
step.setOutputData(result != null ? toJson(result.getData()) : "{}");
|
taskStepDetailMapper.updateById(step);
|
}
|
|
private void updateTaskProgress(MultiDeviceTask task, int currentStep, boolean success) {
|
task.setCurrentStep(currentStep);
|
if (!success) {
|
task.setStatus(MultiDeviceTask.Status.FAILED.name());
|
}
|
LambdaUpdateWrapper<MultiDeviceTask> update = Wrappers.<MultiDeviceTask>lambdaUpdate()
|
.eq(MultiDeviceTask::getId, task.getId())
|
.set(MultiDeviceTask::getCurrentStep, currentStep);
|
if (!success) {
|
update.set(MultiDeviceTask::getStatus, MultiDeviceTask.Status.FAILED.name());
|
}
|
multiDeviceTaskMapper.update(null, update);
|
|
// 通知任务状态更新
|
notificationService.notifyTaskStatus(task);
|
}
|
|
private String determineOperation(DeviceConfig device, Map<String, Object> params) {
|
if (params != null && params.containsKey("operation")) {
|
Object op = params.get("operation");
|
if (op != null) {
|
return String.valueOf(op);
|
}
|
}
|
return DEFAULT_OPERATIONS.getOrDefault(device.getDeviceType(), "feedGlass");
|
}
|
|
private Map<String, Object> buildOperationParams(DeviceConfig device, TaskExecutionContext context) {
|
Map<String, Object> params = new HashMap<>();
|
TaskParameters taskParams = context.getParameters();
|
|
switch (device.getDeviceType()) {
|
case DeviceConfig.DeviceType.LOAD_VEHICLE:
|
params.put("glassIds", new ArrayList<>(taskParams.getGlassIds()));
|
if (StringUtils.hasText(taskParams.getPositionCode())) {
|
params.put("positionCode", taskParams.getPositionCode());
|
}
|
if (taskParams.getPositionValue() != null) {
|
params.put("positionValue", taskParams.getPositionValue());
|
}
|
params.put("triggerRequest", true);
|
break;
|
case DeviceConfig.DeviceType.LARGE_GLASS:
|
List<String> source = context.getSafeLoadedGlassIds();
|
if (CollectionUtils.isEmpty(source)) {
|
source = taskParams.getGlassIds();
|
}
|
if (!CollectionUtils.isEmpty(source)) {
|
params.put("glassId", source.get(0));
|
params.put("glassIds", new ArrayList<>(source));
|
}
|
params.put("processType", taskParams.getProcessType() != null ? taskParams.getProcessType() : 1);
|
params.put("triggerRequest", true);
|
break;
|
case DeviceConfig.DeviceType.GLASS_STORAGE:
|
List<String> processed = context.getSafeProcessedGlassIds();
|
if (CollectionUtils.isEmpty(processed)) {
|
processed = context.getSafeLoadedGlassIds();
|
}
|
if (!CollectionUtils.isEmpty(processed)) {
|
params.put("glassId", processed.get(0));
|
params.put("glassIds", new ArrayList<>(processed));
|
}
|
if (taskParams.getStoragePosition() != null) {
|
params.put("storagePosition", taskParams.getStoragePosition());
|
}
|
params.put("triggerRequest", true);
|
break;
|
default:
|
if (!CollectionUtils.isEmpty(taskParams.getExtra())) {
|
params.putAll(taskParams.getExtra());
|
}
|
}
|
|
mergeOverrides(device, taskParams, params);
|
return params;
|
}
|
|
private void mergeOverrides(DeviceConfig device, TaskParameters taskParameters, Map<String, Object> params) {
|
if (CollectionUtils.isEmpty(taskParameters.getDeviceOverrides())) {
|
return;
|
}
|
Map<String, Object> override = taskParameters.getDeviceOverrides().get(device.getDeviceType());
|
if (override == null && StringUtils.hasText(device.getDeviceCode())) {
|
override = taskParameters.getDeviceOverrides().get(device.getDeviceCode());
|
}
|
if (override != null) {
|
params.putAll(override);
|
}
|
}
|
|
private void updateContextAfterSuccess(DeviceConfig device,
|
TaskExecutionContext context,
|
Map<String, Object> params) {
|
List<String> glassIds = extractGlassIds(params);
|
|
switch (device.getDeviceType()) {
|
case DeviceConfig.DeviceType.LOAD_VEHICLE:
|
context.setLoadedGlassIds(glassIds);
|
// 数据传递:上大车 -> 下一个设备
|
if (!CollectionUtils.isEmpty(glassIds)) {
|
Map<String, Object> transferData = new HashMap<>();
|
transferData.put("glassIds", glassIds);
|
transferData.put("sourceDevice", device.getDeviceCode());
|
// 这里简化处理,实际应该找到下一个设备
|
// 在串行模式下,下一个设备会在循环中自动获取
|
}
|
break;
|
case DeviceConfig.DeviceType.LARGE_GLASS:
|
context.setProcessedGlassIds(glassIds);
|
// 数据传递:大理片 -> 下一个设备
|
if (!CollectionUtils.isEmpty(glassIds)) {
|
Map<String, Object> transferData = new HashMap<>();
|
transferData.put("glassIds", glassIds);
|
transferData.put("sourceDevice", device.getDeviceCode());
|
}
|
break;
|
default:
|
break;
|
}
|
}
|
|
private List<String> extractGlassIds(Map<String, Object> params) {
|
if (params == null) {
|
return Collections.emptyList();
|
}
|
Object glassIds = params.get("glassIds");
|
if (glassIds instanceof List) {
|
@SuppressWarnings("unchecked")
|
List<String> cast = (List<String>) glassIds;
|
return new ArrayList<>(cast);
|
}
|
Object glassId = params.get("glassId");
|
if (glassId != null) {
|
return Collections.singletonList(String.valueOf(glassId));
|
}
|
return Collections.emptyList();
|
}
|
|
private String toJson(Object value) {
|
try {
|
return objectMapper.writeValueAsString(value);
|
} catch (JsonProcessingException e) {
|
return "{}";
|
}
|
}
|
|
private static class StepResult {
|
private final boolean success;
|
private final String message;
|
private final String deviceName;
|
|
private StepResult(boolean success, String message, String deviceName) {
|
this.success = success;
|
this.message = message;
|
this.deviceName = deviceName;
|
}
|
|
public static StepResult success(String deviceName, String message) {
|
return new StepResult(true, message, deviceName);
|
}
|
|
public static StepResult failure(String deviceName, String message) {
|
return new StepResult(false, message, deviceName);
|
}
|
|
public boolean isSuccess() {
|
return success;
|
}
|
|
public String getMessage() {
|
return message;
|
}
|
|
public Map<String, Object> toSummary() {
|
Map<String, Object> summary = new HashMap<>();
|
summary.put("deviceName", deviceName);
|
summary.put("success", success);
|
summary.put("message", message);
|
return summary;
|
}
|
}
|
}
|