package com.mes.task.service;
|
|
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
|
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
|
import com.fasterxml.jackson.core.JsonProcessingException;
|
import com.fasterxml.jackson.core.type.TypeReference;
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
import com.mes.device.entity.DeviceConfig;
|
import com.mes.device.entity.DeviceGroupConfig;
|
import com.mes.device.service.DeviceCoordinationService;
|
import com.mes.device.service.DeviceInteractionService;
|
import com.mes.device.service.GlassInfoService;
|
import com.mes.device.vo.DevicePlcVO;
|
import com.mes.interaction.DeviceInteraction;
|
import com.mes.interaction.DeviceInteractionRegistry;
|
import com.mes.interaction.DeviceLogicHandler;
|
import com.mes.interaction.DeviceLogicHandlerFactory;
|
import com.mes.interaction.base.InteractionContext;
|
import com.mes.interaction.base.InteractionResult;
|
import com.mes.task.dto.TaskParameters;
|
import com.mes.task.entity.MultiDeviceTask;
|
import com.mes.task.entity.TaskStepDetail;
|
import com.mes.task.mapper.MultiDeviceTaskMapper;
|
import com.mes.task.mapper.TaskStepDetailMapper;
|
import com.mes.task.model.RetryPolicy;
|
import com.mes.task.model.TaskExecutionContext;
|
import com.mes.task.model.TaskExecutionResult;
|
import lombok.RequiredArgsConstructor;
|
import lombok.extern.slf4j.Slf4j;
|
import org.springframework.beans.factory.annotation.Qualifier;
|
import org.springframework.stereotype.Component;
|
import org.springframework.util.CollectionUtils;
|
import org.springframework.util.StringUtils;
|
|
import java.util.*;
|
import java.util.concurrent.*;
|
import java.util.concurrent.atomic.AtomicInteger;
|
|
/**
|
* 多设备任务执行引擎
|
* 支持串行和并行两种执行模式
|
*/
|
@Slf4j
|
@Component
|
@RequiredArgsConstructor
|
public class TaskExecutionEngine {
|
|
private static final Map<String, String> DEFAULT_OPERATIONS = new HashMap<>();
|
private static final TypeReference<Map<String, Object>> MAP_TYPE = new TypeReference<Map<String, Object>>() {};
|
private static final int SCANNER_LOOKBACK_MINUTES = 2;
|
private static final int SCANNER_LOOKBACK_LIMIT = 20;
|
|
// 执行模式常量
|
private static final String EXECUTION_MODE_SERIAL = "SERIAL";
|
private static final String EXECUTION_MODE_PARALLEL = "PARALLEL";
|
|
static {
|
DEFAULT_OPERATIONS.put(DeviceConfig.DeviceType.LOAD_VEHICLE, "feedGlass");
|
DEFAULT_OPERATIONS.put(DeviceConfig.DeviceType.LARGE_GLASS, "processGlass");
|
DEFAULT_OPERATIONS.put(DeviceConfig.DeviceType.WORKSTATION_SCANNER, "scanOnce");
|
DEFAULT_OPERATIONS.put(DeviceConfig.DeviceType.WORKSTATION_TRANSFER, "checkAndProcess");
|
}
|
|
private final TaskStepDetailMapper taskStepDetailMapper;
|
private final MultiDeviceTaskMapper multiDeviceTaskMapper;
|
private final DeviceInteractionService deviceInteractionService;
|
private final DeviceInteractionRegistry interactionRegistry;
|
private final DeviceLogicHandlerFactory handlerFactory;
|
private final DeviceCoordinationService deviceCoordinationService;
|
private final TaskStatusNotificationService notificationService;
|
private final ObjectMapper objectMapper;
|
@Qualifier("deviceGlassInfoService")
|
private final GlassInfoService glassInfoService;
|
|
// 线程池用于并行执行
|
private final ExecutorService executorService = Executors.newCachedThreadPool(r -> {
|
Thread t = new Thread(r, "TaskExecutionEngine-Parallel");
|
t.setDaemon(true);
|
return t;
|
});
|
|
// 定时器线程池:用于设备定时扫描
|
private final ScheduledExecutorService scheduledExecutor = Executors.newScheduledThreadPool(10, r -> {
|
Thread t = new Thread(r, "TaskExecutionEngine-Scheduled");
|
t.setDaemon(true);
|
return t;
|
});
|
|
// 存储每个任务的定时器任务:taskId -> List<ScheduledFuture>
|
private final Map<String, List<ScheduledFuture<?>>> taskScheduledTasks = new ConcurrentHashMap<>();
|
// 记录正在运行任务的上下文,便于取消任务时访问
|
private final Map<String, TaskExecutionContext> runningTaskContexts = new ConcurrentHashMap<>();
|
|
public TaskExecutionResult execute(MultiDeviceTask task,
|
DeviceGroupConfig groupConfig,
|
List<DeviceConfig> devices,
|
TaskParameters parameters) {
|
|
if (CollectionUtils.isEmpty(devices)) {
|
return TaskExecutionResult.failure("设备组未配置设备,无法执行任务", Collections.emptyMap());
|
}
|
|
TaskExecutionContext context = new TaskExecutionContext(parameters);
|
runningTaskContexts.put(task.getTaskId(), context);
|
// 将本次任务涉及的设备列表存入上下文,便于取消任务时做设备级收尾(如停止大车内部监控定时器)
|
context.getSharedData().put("devices", devices);
|
|
task.setTotalSteps(devices.size());
|
task.setStatus(MultiDeviceTask.Status.RUNNING.name());
|
multiDeviceTaskMapper.updateById(task);
|
|
// 通知任务开始执行
|
notificationService.notifyTaskStatus(task);
|
|
// 确定执行模式
|
String executionMode = determineExecutionMode(groupConfig);
|
Integer maxConcurrent = getMaxConcurrentDevices(groupConfig);
|
|
log.info("任务执行模式: {}, 最大并发数: {}, 设备数: {}", executionMode, maxConcurrent, devices.size());
|
|
List<Map<String, Object>> stepSummaries;
|
boolean success;
|
String failureMessage;
|
|
if (EXECUTION_MODE_PARALLEL.equals(executionMode)) {
|
// 并行执行模式
|
stepSummaries = new ArrayList<>(Collections.nCopies(devices.size(), null));
|
Pair<Boolean, String> result = executeParallel(task, devices, context, stepSummaries, maxConcurrent);
|
success = result.getFirst();
|
failureMessage = result.getSecond();
|
} else {
|
// 串行执行模式(默认)
|
stepSummaries = new ArrayList<>();
|
success = true;
|
failureMessage = null;
|
|
TaskParameters params = context.getParameters();
|
boolean hasGlassIds = !CollectionUtils.isEmpty(params.getGlassIds());
|
boolean triggerFirst = Boolean.TRUE.equals(params.getTriggerRequestFirst());
|
|
int currentOrder = 1;
|
// 统计大车设备数量,用于区分进片大车和出片大车
|
int loadVehicleCount = 0;
|
for (DeviceConfig device : devices) {
|
if (DeviceConfig.DeviceType.LOAD_VEHICLE.equals(device.getDeviceType())) {
|
loadVehicleCount++;
|
}
|
}
|
int currentLoadVehicleIndex = 0;
|
|
for (DeviceConfig device : devices) {
|
String deviceType = device.getDeviceType();
|
log.debug("处理设备: deviceId={}, deviceType={}, deviceName={}, WORKSTATION_SCANNER常量={}, equals={}",
|
device.getId(), deviceType, device.getDeviceName(),
|
DeviceConfig.DeviceType.WORKSTATION_SCANNER,
|
DeviceConfig.DeviceType.WORKSTATION_SCANNER.equals(deviceType));
|
boolean isLoadVehicle = DeviceConfig.DeviceType.LOAD_VEHICLE.equals(deviceType);
|
boolean isScanner = DeviceConfig.DeviceType.WORKSTATION_SCANNER.equals(deviceType)
|
|| (deviceType != null && (deviceType.contains("扫码") || deviceType.contains("SCANNER")));
|
boolean isLargeGlass = DeviceConfig.DeviceType.LARGE_GLASS.equals(deviceType);
|
boolean isTransfer = DeviceConfig.DeviceType.WORKSTATION_TRANSFER.equals(deviceType);
|
log.debug("设备类型判断: deviceId={}, isLoadVehicle={}, isScanner={}, isLargeGlass={}, isTransfer={}",
|
device.getId(), isLoadVehicle, isScanner, isLargeGlass, isTransfer);
|
|
// 1. 卧转立扫码设备:启动定时器扫描(每10秒处理一个玻璃ID)
|
if (isScanner) {
|
log.debug("检测到扫码设备,准备启动定时器: deviceId={}, deviceType={}, deviceName={}",
|
device.getId(), device.getDeviceType(), device.getDeviceName());
|
TaskStepDetail step = createStepRecord(task, device, currentOrder);
|
|
ScheduledFuture<?> scannerTask = startScannerTimer(task, step, device, context);
|
if (scannerTask != null) {
|
registerScheduledTask(task.getTaskId(), scannerTask);
|
stepSummaries.add(createStepSummary(device.getDeviceName(), true, "定时器已启动,每10秒扫描一次"));
|
log.debug("扫码设备定时器启动成功: deviceId={}, taskId={}", device.getId(), task.getTaskId());
|
} else {
|
log.warn("扫码设备定时器启动失败,glassIds可能为空: deviceId={}, taskId={}, contextParams={}",
|
device.getId(), task.getTaskId(), context.getParameters());
|
stepSummaries.add(createStepSummary(device.getDeviceName(), false, "启动定时器失败"));
|
success = false;
|
failureMessage = "卧转立扫码设备启动定时器失败";
|
break;
|
}
|
currentOrder++;
|
continue;
|
}
|
|
// 2. 卧转立设备:启动定时器定期检查并处理(中转设备)
|
if (isTransfer) {
|
log.debug("检测到卧转立设备,准备启动定时器: deviceId={}, deviceType={}, deviceName={}",
|
device.getId(), device.getDeviceType(), device.getDeviceName());
|
TaskStepDetail step = createStepRecord(task, device, currentOrder);
|
|
ScheduledFuture<?> transferTask = startTransferTimer(task, step, device, context);
|
if (transferTask != null) {
|
registerScheduledTask(task.getTaskId(), transferTask);
|
stepSummaries.add(createStepSummary(device.getDeviceName(), true, "定时器已启动,定期检查并处理玻璃批次"));
|
log.debug("卧转立设备定时器启动成功: deviceId={}, taskId={}", device.getId(), task.getTaskId());
|
} else {
|
log.warn("卧转立设备定时器启动失败: deviceId={}, taskId={}", device.getId(), task.getTaskId());
|
stepSummaries.add(createStepSummary(device.getDeviceName(), false, "启动定时器失败"));
|
success = false;
|
failureMessage = "卧转立设备启动定时器失败";
|
break;
|
}
|
currentOrder++;
|
continue;
|
}
|
|
// 3. 进片大车设备:启动定时器持续监控容量(第一个大车设备)
|
if (isLoadVehicle) {
|
currentLoadVehicleIndex++;
|
boolean isInboundVehicle = currentLoadVehicleIndex == 1; // 第一个大车是进片大车
|
|
TaskStepDetail step = createStepRecord(task, device, currentOrder);
|
|
ScheduledFuture<?> vehicleTask;
|
if (isInboundVehicle) {
|
// 进片大车:监控容量,动态判断
|
vehicleTask = startInboundVehicleTimer(task, step, device, context);
|
if (vehicleTask != null) {
|
registerScheduledTask(task.getTaskId(), vehicleTask);
|
stepSummaries.add(createStepSummary(device.getDeviceName(), true, "进片大车定时器已启动,持续监控容量"));
|
} else {
|
stepSummaries.add(createStepSummary(device.getDeviceName(), false, "启动定时器失败"));
|
success = false;
|
failureMessage = "进片大车设备启动定时器失败";
|
break;
|
}
|
} else {
|
// 出片大车:启动定时器监控出片任务
|
vehicleTask = startOutboundVehicleTimer(task, step, device, context);
|
if (vehicleTask != null) {
|
registerScheduledTask(task.getTaskId(), vehicleTask);
|
stepSummaries.add(createStepSummary(device.getDeviceName(), true, "出片大车定时器已启动,持续监控出片任务"));
|
} else {
|
stepSummaries.add(createStepSummary(device.getDeviceName(), false, "启动定时器失败"));
|
success = false;
|
failureMessage = "出片大车设备启动定时器失败";
|
break;
|
}
|
}
|
currentOrder++;
|
continue;
|
}
|
|
// 4. 大理片笼设备:启动定时器逻辑处理(不涉及PLC交互,只负责逻辑处理)
|
if (isLargeGlass) {
|
TaskStepDetail step = createStepRecord(task, device, currentOrder);
|
|
ScheduledFuture<?> largeGlassTask = startLargeGlassTimer(task, step, device, context);
|
if (largeGlassTask != null) {
|
registerScheduledTask(task.getTaskId(), largeGlassTask);
|
stepSummaries.add(createStepSummary(device.getDeviceName(), true, "大理片笼定时器已启动,逻辑处理中"));
|
} else {
|
stepSummaries.add(createStepSummary(device.getDeviceName(), false, "启动定时器失败"));
|
success = false;
|
failureMessage = "大理片笼设备启动定时器失败";
|
break;
|
}
|
currentOrder++;
|
continue;
|
}
|
|
// 其他设备:正常执行
|
TaskStepDetail step = createStepRecord(task, device, currentOrder);
|
StepResult stepResult = executeStep(task, step, device, context);
|
stepSummaries.add(stepResult.toSummary());
|
if (!stepResult.isSuccess()) {
|
success = false;
|
failureMessage = stepResult.getMessage();
|
break;
|
}
|
currentOrder++;
|
}
|
|
// 如果所有设备都是定时器模式,任务保持运行状态,不等待完成
|
// 定时器会在后台持续运行,直到手动停止或超时
|
boolean hasScheduledTasks = !CollectionUtils.isEmpty(taskScheduledTasks.get(task.getTaskId()));
|
if (hasScheduledTasks) {
|
log.debug("任务已启动所有定时器,保持运行状态: taskId={}, scheduledTasksCount={}",
|
task.getTaskId(), taskScheduledTasks.get(task.getTaskId()).size());
|
// 任务保持 RUNNING 状态,定时器在后台运行
|
// 不更新任务状态为 COMPLETED,让任务持续运行
|
Map<String, Object> payload = new HashMap<>();
|
payload.put("steps", stepSummaries);
|
payload.put("groupId", groupConfig.getId());
|
payload.put("deviceCount", devices.size());
|
payload.put("executionMode", executionMode);
|
payload.put("message", "任务已启动,定时器在后台运行中");
|
|
// 通知任务状态(保持 RUNNING)
|
notificationService.notifyTaskStatus(task);
|
|
if (success) {
|
return TaskExecutionResult.success(payload);
|
}
|
return TaskExecutionResult.failure(failureMessage != null ? failureMessage : "任务执行失败", payload);
|
}
|
|
// 如果没有定时器任务,等待所有步骤完成
|
// 这种情况通常不会发生,因为所有设备都是定时器模式
|
}
|
|
Map<String, Object> payload = new HashMap<>();
|
payload.put("steps", stepSummaries);
|
payload.put("groupId", groupConfig.getId());
|
payload.put("deviceCount", devices.size());
|
payload.put("executionMode", executionMode);
|
|
// 停止所有定时器任务
|
stopScheduledTasks(task.getTaskId());
|
|
boolean cancelled = isTaskCancelled(context);
|
// 更新任务最终状态
|
if (cancelled) {
|
task.setStatus(MultiDeviceTask.Status.CANCELLED.name());
|
task.setErrorMessage("任务已取消");
|
} else if (success) {
|
task.setStatus(MultiDeviceTask.Status.COMPLETED.name());
|
} else {
|
task.setStatus(MultiDeviceTask.Status.FAILED.name());
|
task.setErrorMessage(failureMessage);
|
}
|
task.setEndTime(new Date());
|
multiDeviceTaskMapper.updateById(task);
|
|
// 通知任务完成
|
notificationService.notifyTaskStatus(task);
|
|
if (success) {
|
return TaskExecutionResult.success(payload);
|
}
|
return TaskExecutionResult.failure(failureMessage != null ? failureMessage : "任务执行失败", payload);
|
}
|
|
/**
|
* 请求取消任务:停止所有定时器并标记上下文
|
*/
|
public void requestTaskCancellation(String taskId) {
|
TaskExecutionContext context = runningTaskContexts.get(taskId);
|
if (context != null) {
|
context.getSharedData().put("taskCancelled", true);
|
log.warn("已标记任务取消: taskId={}", taskId);
|
|
// 同时通知相关设备逻辑处理器执行取消收尾逻辑(例如停止大车内部的监控定时器)
|
try {
|
Map<String, Object> cancelParams = new HashMap<>();
|
cancelParams.put("_taskContext", context);
|
Object devicesObj = context.getSharedData().get("devices");
|
if (devicesObj instanceof List) {
|
@SuppressWarnings("unchecked")
|
List<DeviceConfig> devices = (List<DeviceConfig>) devicesObj;
|
for (DeviceConfig device : devices) {
|
if (device == null) {
|
continue;
|
}
|
try {
|
DeviceLogicHandler handler = handlerFactory.getHandler(device.getDeviceType());
|
if (handler != null) {
|
// 目前大车逻辑处理器会在reset/clear等操作中停止内部监控定时器
|
// 这里统一调用一次“reset”作为任务取消时的收尾动作
|
handler.execute(device, "reset", cancelParams);
|
}
|
} catch (Exception e) {
|
log.warn("任务取消时执行设备收尾(reset)失败: taskId={}, deviceId={}, error={}",
|
taskId, device.getId(), e.getMessage());
|
}
|
}
|
}
|
} catch (Exception e) {
|
log.warn("任务取消时执行设备收尾逻辑异常: taskId={}, error={}", taskId, e.getMessage());
|
}
|
} else {
|
log.warn("请求取消任务但未找到上下文: taskId={}", taskId);
|
}
|
stopScheduledTasks(taskId);
|
}
|
|
/**
|
* 启动卧转立扫码设备定时器:每10秒处理一个玻璃ID
|
*/
|
private ScheduledFuture<?> startScannerTimer(MultiDeviceTask task,
|
TaskStepDetail step,
|
DeviceConfig device,
|
TaskExecutionContext context) {
|
try {
|
TaskParameters params = context.getParameters();
|
List<String> glassIds = params.getGlassIds();
|
log.debug("卧转立扫码定时器初始化: taskId={}, deviceId={}, glassIds={}, glassIdsSize={}, isEmpty={}",
|
task.getTaskId(), device.getId(), glassIds,
|
glassIds != null ? glassIds.size() : 0,
|
CollectionUtils.isEmpty(glassIds));
|
if (CollectionUtils.isEmpty(glassIds)) {
|
log.warn("卧转立扫码设备没有玻璃ID,定时器不启动: deviceId={}", device.getId());
|
return null;
|
}
|
|
// 创建待处理玻璃ID队列
|
Queue<String> glassIdQueue = new ConcurrentLinkedQueue<>(glassIds);
|
AtomicInteger processedCount = new AtomicInteger(0);
|
AtomicInteger successCount = new AtomicInteger(0);
|
AtomicInteger failCount = new AtomicInteger(0);
|
|
// 从设备配置中获取扫码间隔,默认10秒
|
Map<String, Object> logicParams = parseLogicParams(device);
|
Integer scanIntervalMs = getLogicParam(logicParams, "scanIntervalMs", 10_000);
|
|
log.debug("启动卧转立扫码定时器: taskId={}, deviceId={}, glassCount={}, interval={}ms, glassIds={}",
|
task.getTaskId(), device.getId(), glassIds.size(), scanIntervalMs, glassIds);
|
|
// 启动定时任务
|
ScheduledFuture<?> future = scheduledExecutor.scheduleWithFixedDelay(() -> {
|
try {
|
if (isTaskCancelled(context)) {
|
log.debug("任务已取消,停止卧转立扫码定时器: taskId={}, deviceId={}",
|
task.getTaskId(), device.getId());
|
return;
|
}
|
// 定时器第一次执行时,将设备状态从 WAITING 设置为 RUNNING
|
deviceCoordinationService.syncDeviceStatus(device,
|
DeviceCoordinationService.DeviceStatus.RUNNING, context);
|
ensureStepRunning(step, task.getTaskId());
|
// 检查是否需要暂停
|
if (shouldPauseScanner(context)) {
|
log.debug("卧转立扫码定时器暂停: taskId={}, deviceId={}", task.getTaskId(), device.getId());
|
return;
|
}
|
|
// 检查是否还有待处理的玻璃ID
|
String glassId = glassIdQueue.poll();
|
if (glassId == null) {
|
log.debug("卧转立扫码定时器完成: taskId={}, deviceId={}, processed={}/{}, success={}, fail={}",
|
task.getTaskId(), device.getId(), processedCount.get(), glassIds.size(),
|
successCount.get(), failCount.get());
|
|
// 清空plcRequest和plcGlassId(确保PLC状态清理)
|
try {
|
DeviceLogicHandler handler = handlerFactory.getHandler(device.getDeviceType());
|
if (handler != null) {
|
Map<String, Object> clearParams = new HashMap<>();
|
clearParams.put("_taskContext", context);
|
handler.execute(device, "clearPlc", clearParams);
|
log.debug("卧转立扫码定时器完成,已清空PLC请求字段: taskId={}, deviceId={}",
|
task.getTaskId(), device.getId());
|
}
|
} catch (Exception e) {
|
log.warn("卧转立扫码定时器完成时清空PLC失败: taskId={}, deviceId={}, error={}",
|
task.getTaskId(), device.getId(), e.getMessage());
|
}
|
|
// 若之前未出现失败,再将状态置为完成
|
boolean alreadyFailed = TaskStepDetail.Status.FAILED.name().equals(step.getStatus());
|
if (!alreadyFailed) {
|
step.setStatus(TaskStepDetail.Status.COMPLETED.name());
|
step.setSuccessMessage(String.format("已完成扫描: 成功=%d, 失败=%d", successCount.get(), failCount.get()));
|
if (step.getEndTime() == null) {
|
step.setEndTime(new Date());
|
}
|
taskStepDetailMapper.updateById(step);
|
notificationService.notifyStepUpdate(task.getTaskId(), step);
|
// 扫码设备完成后尝试自动收尾整个任务
|
checkAndCompleteTaskIfDone(step.getTaskId());
|
}
|
deviceCoordinationService.syncDeviceStatus(device,
|
DeviceCoordinationService.DeviceStatus.COMPLETED, context);
|
return;
|
}
|
|
int currentIndex = processedCount.incrementAndGet();
|
log.debug("卧转立扫码定时器处理第{}/{}个玻璃: taskId={}, deviceId={}, glassId={}",
|
currentIndex, glassIds.size(), task.getTaskId(), device.getId(), glassId);
|
|
// 执行单次扫描
|
Map<String, Object> scanParams = new HashMap<>();
|
scanParams.put("glassId", glassId);
|
scanParams.put("_taskContext", context);
|
log.debug("卧转立扫码定时器准备执行: taskId={}, deviceId={}, glassId={}, scanParams={}",
|
task.getTaskId(), device.getId(), glassId, scanParams);
|
|
DeviceLogicHandler handler = handlerFactory.getHandler(device.getDeviceType());
|
if (handler != null) {
|
// 将logicParams合并到scanParams中(使用已定义的logicParams变量)
|
if (logicParams != null && !logicParams.isEmpty()) {
|
scanParams.put("_logicParams", logicParams);
|
}
|
log.debug("卧转立扫码定时器调用handler.execute: taskId={}, deviceId={}, glassId={}, operation=scanOnce, scanParamsKeys={}, scanParams={}",
|
task.getTaskId(), device.getId(), glassId, scanParams.keySet(), scanParams);
|
DevicePlcVO.OperationResult result = handler.execute(device, "scanOnce", scanParams);
|
log.debug("卧转立扫码定时器handler.execute返回: taskId={}, deviceId={}, glassId={}, success={}",
|
task.getTaskId(), device.getId(), glassId, result.getSuccess());
|
|
if (Boolean.TRUE.equals(result.getSuccess())) {
|
successCount.incrementAndGet();
|
log.debug("卧转立扫码定时器处理成功: taskId={}, deviceId={}, glassId={}",
|
task.getTaskId(), device.getId(), glassId);
|
} else {
|
failCount.incrementAndGet();
|
log.warn("卧转立扫码定时器处理失败: taskId={}, deviceId={}, glassId={}, error={}",
|
task.getTaskId(), device.getId(), glassId, result.getMessage());
|
}
|
|
// 更新步骤状态(显示进度,保持RUNNING状态直到所有玻璃处理完成)
|
updateStepStatusForScanner(step, result, currentIndex, glassIds.size(),
|
successCount.get(), failCount.get());
|
// 通知步骤更新(让前端实时看到步骤状态和进度)
|
notificationService.notifyStepUpdate(task.getTaskId(), step);
|
boolean opSuccess = Boolean.TRUE.equals(result.getSuccess());
|
updateTaskProgress(task, step.getStepOrder(), opSuccess);
|
if (!opSuccess) {
|
deviceCoordinationService.syncDeviceStatus(device,
|
DeviceCoordinationService.DeviceStatus.FAILED, context);
|
}
|
}
|
} catch (Exception e) {
|
log.error("卧转立扫码定时器执行异常: taskId={}, deviceId={}", task.getTaskId(), device.getId(), e);
|
failCount.incrementAndGet();
|
}
|
}, 0, scanIntervalMs, TimeUnit.MILLISECONDS);
|
|
// 在串行执行模式下,扫码设备是第一个,应该立即设置为 RUNNING
|
// 其他设备保持 WAITING,直到它们真正开始工作
|
deviceCoordinationService.syncDeviceStatus(device,
|
DeviceCoordinationService.DeviceStatus.RUNNING, context);
|
return future;
|
} catch (Exception e) {
|
log.error("启动卧转立扫码定时器失败: taskId={}, deviceId={}", task.getTaskId(), device.getId(), e);
|
return null;
|
}
|
}
|
|
/**
|
* 启动卧转立设备定时器:定期检查并处理玻璃批次
|
*/
|
private ScheduledFuture<?> startTransferTimer(MultiDeviceTask task,
|
TaskStepDetail step,
|
DeviceConfig device,
|
TaskExecutionContext context) {
|
try {
|
// 从设备配置中获取监控间隔,默认5秒
|
Map<String, Object> logicParams = parseLogicParams(device);
|
Integer monitorIntervalMs = getLogicParam(logicParams, "monitorIntervalMs", 5_000);
|
|
log.debug("启动卧转立设备定时器: taskId={}, deviceId={}, interval={}ms",
|
task.getTaskId(), device.getId(), monitorIntervalMs);
|
|
// 启动定时任务
|
// 使用AtomicBoolean标记是否第一次执行
|
final java.util.concurrent.atomic.AtomicBoolean firstExecution = new java.util.concurrent.atomic.AtomicBoolean(true);
|
|
ScheduledFuture<?> future = scheduledExecutor.scheduleWithFixedDelay(() -> {
|
try {
|
if (isTaskCancelled(context)) {
|
log.debug("任务已取消,停止卧转立设备定时器: taskId={}, deviceId={}",
|
task.getTaskId(), device.getId());
|
return;
|
}
|
|
// 如果步骤已经完成,不再执行后续逻辑(避免状态被重置)
|
if (TaskStepDetail.Status.COMPLETED.name().equals(step.getStatus())) {
|
log.debug("卧转立设备步骤已完成,停止定时器执行: taskId={}, deviceId={}",
|
task.getTaskId(), device.getId());
|
return;
|
}
|
|
// 构建参数
|
Map<String, Object> params = new HashMap<>();
|
params.put("_taskContext", context);
|
if (logicParams != null && !logicParams.isEmpty()) {
|
params.put("_logicParams", logicParams);
|
}
|
|
// 调用handler执行checkAndProcess
|
DeviceLogicHandler handler = handlerFactory.getHandler(device.getDeviceType());
|
if (handler != null) {
|
DevicePlcVO.OperationResult result = handler.execute(device, "checkAndProcess", params);
|
|
// 检查是否有数据:如果有数据或正在处理,设置为RUNNING;如果缓冲队列为空且无待处理玻璃,保持PENDING
|
String message = result.getMessage();
|
boolean hasData = result.getSuccess() != null && result.getSuccess()
|
&& message != null && !message.contains("缓冲队列为空,无待处理玻璃");
|
|
// 如果当前是PENDING状态,且检测到有数据,则设置为RUNNING(设备开始工作)
|
boolean isPending = TaskStepDetail.Status.PENDING.name().equals(step.getStatus());
|
|
if (hasData && isPending) {
|
// 检测到数据且当前是等待状态,设备开始工作,设置为RUNNING
|
deviceCoordinationService.syncDeviceStatus(device,
|
DeviceCoordinationService.DeviceStatus.RUNNING, context);
|
step.setStatus(TaskStepDetail.Status.RUNNING.name());
|
if (step.getStartTime() == null) {
|
step.setStartTime(new Date());
|
}
|
taskStepDetailMapper.updateById(step);
|
notificationService.notifyStepUpdate(task.getTaskId(), step);
|
log.debug("卧转立设备定时器检测到数据,从PENDING转为RUNNING: taskId={}, deviceId={}, message={}",
|
task.getTaskId(), device.getId(), message);
|
} else if (!hasData) {
|
// 没有数据,保持PENDING状态,等待扫码设备输出
|
if (firstExecution.compareAndSet(true, false)) {
|
// 第一次执行,确保状态是PENDING
|
if (!TaskStepDetail.Status.PENDING.name().equals(step.getStatus())) {
|
step.setStatus(TaskStepDetail.Status.PENDING.name());
|
step.setSuccessMessage("等待扫码设备输出数据");
|
taskStepDetailMapper.updateById(step);
|
notificationService.notifyStepUpdate(task.getTaskId(), step);
|
}
|
log.debug("卧转立设备定时器第一次执行,无数据,保持PENDING: taskId={}, deviceId={}, message={}",
|
task.getTaskId(), device.getId(), message);
|
}
|
return; // 不执行后续逻辑,等待下一次定时器触发
|
}
|
|
// 更新步骤状态(区分等待中和真正完成)
|
updateStepStatusForTransfer(step, result, device, context);
|
// 通知步骤更新(让前端实时看到步骤状态)
|
notificationService.notifyStepUpdate(task.getTaskId(), step);
|
boolean opSuccess = Boolean.TRUE.equals(result.getSuccess());
|
updateTaskProgress(task, step.getStepOrder(), opSuccess);
|
|
// 根据执行结果更新设备状态
|
// 注意:设备已经开始工作(定时器已执行),所以应该保持RUNNING状态
|
// 只有在真正完成时才设置为COMPLETED
|
if (opSuccess) {
|
// 设备正在工作(等待缓冲、处理中等),保持RUNNING状态
|
deviceCoordinationService.syncDeviceStatus(device,
|
DeviceCoordinationService.DeviceStatus.RUNNING, context);
|
if (message != null && message.contains("批次已写入PLC")) {
|
log.debug("卧转立设备定时器执行成功(已写入PLC): taskId={}, deviceId={}, message={}",
|
task.getTaskId(), device.getId(), message);
|
} else {
|
log.debug("卧转立设备定时器工作中(等待缓冲): taskId={}, deviceId={}, message={}",
|
task.getTaskId(), device.getId(), message);
|
}
|
} else {
|
log.warn("卧转立设备定时器执行失败: taskId={}, deviceId={}, message={}",
|
task.getTaskId(), device.getId(), result.getMessage());
|
deviceCoordinationService.syncDeviceStatus(device,
|
DeviceCoordinationService.DeviceStatus.FAILED, context);
|
}
|
}
|
} catch (Exception e) {
|
log.error("卧转立设备定时器执行异常: taskId={}, deviceId={}", task.getTaskId(), device.getId(), e);
|
}
|
}, 0, monitorIntervalMs, TimeUnit.MILLISECONDS);
|
|
// 在串行执行模式下,设备启动定时器时先设置为 WAITING,定时器第一次执行时再设置为 RUNNING
|
deviceCoordinationService.syncDeviceStatus(device,
|
DeviceCoordinationService.DeviceStatus.WAITING, context);
|
return future;
|
} catch (Exception e) {
|
log.error("启动卧转立设备定时器失败: taskId={}, deviceId={}", task.getTaskId(), device.getId(), e);
|
return null;
|
}
|
}
|
|
/**
|
* 启动进片大车设备定时器:持续监控容量,动态判断
|
*/
|
private ScheduledFuture<?> startInboundVehicleTimer(MultiDeviceTask task,
|
TaskStepDetail step,
|
DeviceConfig device,
|
TaskExecutionContext context) {
|
try {
|
final long MONITOR_INTERVAL_MS = 2_000; // 2秒监控一次
|
final AtomicInteger lastProcessedCount = new AtomicInteger(0);
|
|
log.debug("启动进片大车设备定时器: taskId={}, deviceId={}, interval={}s",
|
task.getTaskId(), device.getId(), MONITOR_INTERVAL_MS / 1000);
|
|
// 启动定时任务
|
ScheduledFuture<?> future = scheduledExecutor.scheduleWithFixedDelay(() -> {
|
try {
|
if (isTaskCancelled(context)) {
|
log.debug("任务已取消,停止进片大车定时器: taskId={}, deviceId={}",
|
task.getTaskId(), device.getId());
|
return;
|
}
|
// 进片大车设备:只有在真正开始处理时才设置为RUNNING
|
// 检查是否有卧转立主体已输出、准备上大车的玻璃信息
|
List<String> readyGlassIds = getTransferReadyGlassIds(context);
|
|
// 如果当前没有新的玻璃,无论步骤是否已进入RUNNING,都应该轮询MES任务/确认状态
|
if (CollectionUtils.isEmpty(readyGlassIds)) {
|
// 轮询MES任务/确认,避免错过MES侧后写入的任务
|
pollMesForVehicle(task, step, device, context);
|
// 如果仍然没有卧转立输出的玻璃,保持/更新为PENDING提示
|
if (!TaskStepDetail.Status.RUNNING.name().equals(step.getStatus())
|
&& !TaskStepDetail.Status.PENDING.name().equals(step.getStatus())) {
|
step.setStatus(TaskStepDetail.Status.PENDING.name());
|
step.setSuccessMessage("等待卧转立输出玻璃");
|
taskStepDetailMapper.updateById(step);
|
notificationService.notifyStepUpdate(task.getTaskId(), step);
|
}
|
return;
|
}
|
|
// 如果玻璃ID数量没有变化,说明没有新的玻璃
|
int currentCount = readyGlassIds.size();
|
if (currentCount == lastProcessedCount.get()) {
|
// 玻璃数量没有变化:没有新玻璃,但仍需轮询MES任务/确认,避免错过MES侧的变化
|
pollMesForVehicle(task, step, device, context);
|
log.debug("大车设备定时器:玻璃ID数量未变化,继续等待: taskId={}, deviceId={}, count={}",
|
task.getTaskId(), device.getId(), currentCount);
|
return;
|
}
|
|
log.debug("进片大车设备定时器检测到卧转立输出的玻璃信息: taskId={}, deviceId={}, glassCount={}",
|
task.getTaskId(), device.getId(), currentCount);
|
|
// 检查容量
|
Map<String, Object> checkParams = new HashMap<>();
|
checkParams.put("glassIds", new ArrayList<>(readyGlassIds));
|
checkParams.put("_taskContext", context);
|
|
DeviceLogicHandler handler = handlerFactory.getHandler(device.getDeviceType());
|
if (handler != null) {
|
// 将logicParams合并到checkParams中
|
Map<String, Object> logicParams = parseLogicParams(device);
|
if (logicParams != null && !logicParams.isEmpty()) {
|
checkParams.put("_logicParams", logicParams);
|
}
|
// 第一步:写入大车上料请求
|
DevicePlcVO.OperationResult feedResult = handler.execute(device, "feedGlass", checkParams);
|
|
if (Boolean.TRUE.equals(feedResult.getSuccess())) {
|
// 真正开始处理,设置为RUNNING
|
deviceCoordinationService.syncDeviceStatus(device,
|
DeviceCoordinationService.DeviceStatus.RUNNING, context);
|
// 步骤状态也设置为RUNNING
|
if (!TaskStepDetail.Status.RUNNING.name().equals(step.getStatus())) {
|
step.setStatus(TaskStepDetail.Status.RUNNING.name());
|
if (step.getStartTime() == null) {
|
step.setStartTime(new Date());
|
}
|
taskStepDetailMapper.updateById(step);
|
notificationService.notifyStepUpdate(task.getTaskId(), step);
|
}
|
log.debug("进片大车设备定时器执行成功: taskId={}, deviceId={}, glassCount={}",
|
task.getTaskId(), device.getId(), readyGlassIds.size());
|
// 将已装载的玻璃ID保存到共享数据中(供大理片笼使用)
|
setLoadedGlassIds(context, new ArrayList<>(readyGlassIds));
|
// 清空卧转立输出的玻璃ID列表(已处理)
|
clearTransferReadyGlassIds(context);
|
lastProcessedCount.set(0);
|
// 确保卧转立扫码继续运行
|
setScannerPause(context, false);
|
|
// feedGlass成功后,先检查MES任务(checkMesTask)来开始执行任务
|
DevicePlcVO.OperationResult mesTaskResult = null;
|
try {
|
mesTaskResult = handler.execute(device, "checkMesTask", Collections.emptyMap());
|
if (mesTaskResult != null && Boolean.TRUE.equals(mesTaskResult.getSuccess())) {
|
log.info("进片大车设备已检查MES任务并开始执行: taskId={}, deviceId={}, message={}",
|
task.getTaskId(), device.getId(), mesTaskResult.getMessage());
|
}
|
} catch (Exception e) {
|
log.warn("进片大车设备检查MES任务异常: taskId={}, deviceId={}, error={}",
|
task.getTaskId(), device.getId(), e.getMessage());
|
}
|
} else {
|
// 装不下,保持WAITING状态和PENDING步骤状态
|
deviceCoordinationService.syncDeviceStatus(device,
|
DeviceCoordinationService.DeviceStatus.WAITING, context);
|
if (!TaskStepDetail.Status.PENDING.name().equals(step.getStatus())) {
|
step.setStatus(TaskStepDetail.Status.PENDING.name());
|
step.setSuccessMessage("容量不足,等待中");
|
taskStepDetailMapper.updateById(step);
|
notificationService.notifyStepUpdate(task.getTaskId(), step);
|
}
|
// 装不下,记录容量不足(是否需要影响扫码由工艺再决定)
|
log.warn("进片大车设备定时器容量不足: taskId={}, deviceId={}, message={}",
|
task.getTaskId(), device.getId(), feedResult.getMessage());
|
lastProcessedCount.set(currentCount); // 记录当前数量,避免重复检查
|
}
|
|
// 第二步:检查MES确认状态(如果大车处理器支持的话)
|
// 只有在任务已开始执行(有任务记录)时才检查MES确认
|
DevicePlcVO.OperationResult mesResult = null;
|
try {
|
mesResult = handler.execute(device, "checkMesConfirm", Collections.emptyMap());
|
} catch (Exception e) {
|
log.warn("进片大车设备检查MES确认状态异常: taskId={}, deviceId={}, error={}",
|
task.getTaskId(), device.getId(), e.getMessage());
|
}
|
|
// 更新步骤状态(大车设备保持RUNNING,直到MES确认完成或任务取消)
|
if (mesResult != null) {
|
updateStepStatusForVehicle(step, mesResult);
|
boolean opSuccess = Boolean.TRUE.equals(mesResult.getSuccess());
|
updateTaskProgress(task, step.getStepOrder(), opSuccess);
|
if (!opSuccess) {
|
deviceCoordinationService.syncDeviceStatus(device,
|
DeviceCoordinationService.DeviceStatus.FAILED, context);
|
}
|
} else {
|
updateStepStatusForVehicle(step, feedResult);
|
boolean opSuccess = Boolean.TRUE.equals(feedResult.getSuccess());
|
updateTaskProgress(task, step.getStepOrder(), opSuccess);
|
if (!opSuccess) {
|
deviceCoordinationService.syncDeviceStatus(device,
|
DeviceCoordinationService.DeviceStatus.FAILED, context);
|
}
|
}
|
}
|
} catch (Exception e) {
|
log.error("进片大车设备定时器执行异常: taskId={}, deviceId={}", task.getTaskId(), device.getId(), e);
|
}
|
}, 0, MONITOR_INTERVAL_MS, TimeUnit.MILLISECONDS);
|
|
// 在串行执行模式下,设备启动定时器时先设置为 WAITING,定时器第一次执行时再设置为 RUNNING
|
deviceCoordinationService.syncDeviceStatus(device,
|
DeviceCoordinationService.DeviceStatus.WAITING, context);
|
return future;
|
} catch (Exception e) {
|
log.error("启动进片大车设备定时器失败: taskId={}, deviceId={}", task.getTaskId(), device.getId(), e);
|
return null;
|
}
|
}
|
|
/**
|
* 启动出片大车设备定时器:持续监控出片任务
|
*/
|
private ScheduledFuture<?> startOutboundVehicleTimer(MultiDeviceTask task,
|
TaskStepDetail step,
|
DeviceConfig device,
|
TaskExecutionContext context) {
|
try {
|
final long MONITOR_INTERVAL_MS = 2_000; // 2秒监控一次
|
|
log.debug("启动出片大车设备定时器: taskId={}, deviceId={}, interval={}s",
|
task.getTaskId(), device.getId(), MONITOR_INTERVAL_MS / 1000);
|
|
// 启动定时任务
|
ScheduledFuture<?> future = scheduledExecutor.scheduleWithFixedDelay(() -> {
|
try {
|
if (isTaskCancelled(context)) {
|
log.debug("任务已取消,停止出片大车定时器: taskId={}, deviceId={}",
|
task.getTaskId(), device.getId());
|
return;
|
}
|
// 出片大车设备:只有在真正开始处理时才设置为RUNNING
|
// 检查是否有已处理的玻璃信息(从大理片笼来的)
|
List<String> processedGlassIds = getProcessedGlassIds(context);
|
boolean isRunning = TaskStepDetail.Status.RUNNING.name().equals(step.getStatus());
|
|
// 如果设备已经在运行中,即使没有新玻璃,也要继续监控MES任务和确认状态
|
if (CollectionUtils.isEmpty(processedGlassIds)) {
|
if (isRunning) {
|
// 设备正在运行中,先检查MES任务,然后监控MES确认状态
|
DeviceLogicHandler handler = handlerFactory.getHandler(device.getDeviceType());
|
if (handler != null) {
|
Map<String, Object> logicParams = parseLogicParams(device);
|
|
// 先检查MES任务(如果mesSend=1,会创建任务并开始执行)
|
DevicePlcVO.OperationResult mesTaskResult = null;
|
try {
|
mesTaskResult = handler.execute(device, "checkMesTask", Collections.emptyMap());
|
if (mesTaskResult != null && Boolean.TRUE.equals(mesTaskResult.getSuccess())) {
|
log.info("出片大车设备已检查MES任务并开始执行: taskId={}, deviceId={}, message={}",
|
task.getTaskId(), device.getId(), mesTaskResult.getMessage());
|
}
|
} catch (Exception e) {
|
log.warn("出片大车设备检查MES任务异常: taskId={}, deviceId={}, error={}",
|
task.getTaskId(), device.getId(), e.getMessage());
|
}
|
|
// 然后检查MES确认状态(只有在任务已开始执行时才检查)
|
DevicePlcVO.OperationResult mesResult = null;
|
try {
|
mesResult = handler.execute(device, "checkMesConfirm", Collections.emptyMap());
|
} catch (Exception e) {
|
log.warn("出片大车设备检查MES确认状态异常: taskId={}, deviceId={}, error={}",
|
task.getTaskId(), device.getId(), e.getMessage());
|
}
|
|
// 更新步骤状态(大车设备保持RUNNING,直到MES确认完成或任务取消)
|
if (mesResult != null) {
|
updateStepStatusForVehicle(step, mesResult);
|
boolean opSuccess = Boolean.TRUE.equals(mesResult.getSuccess());
|
updateTaskProgress(task, step.getStepOrder(), opSuccess);
|
if (!opSuccess) {
|
deviceCoordinationService.syncDeviceStatus(device,
|
DeviceCoordinationService.DeviceStatus.FAILED, context);
|
}
|
}
|
}
|
return;
|
} else {
|
// 没有数据,且设备未运行,保持PENDING状态
|
if (!TaskStepDetail.Status.PENDING.name().equals(step.getStatus())) {
|
step.setStatus(TaskStepDetail.Status.PENDING.name());
|
step.setSuccessMessage("等待大理片笼处理完成");
|
taskStepDetailMapper.updateById(step);
|
notificationService.notifyStepUpdate(task.getTaskId(), step);
|
}
|
log.debug("出片大车设备定时器:暂无已处理的玻璃信息: taskId={}, deviceId={}",
|
task.getTaskId(), device.getId());
|
return;
|
}
|
}
|
|
log.debug("出片大车设备定时器检测到已处理的玻璃信息: taskId={}, deviceId={}, glassCount={}",
|
task.getTaskId(), device.getId(), processedGlassIds.size());
|
|
// 执行出片操作
|
Map<String, Object> checkParams = new HashMap<>();
|
checkParams.put("glassIds", new ArrayList<>(processedGlassIds));
|
checkParams.put("_taskContext", context);
|
|
DeviceLogicHandler handler = handlerFactory.getHandler(device.getDeviceType());
|
if (handler != null) {
|
// 将logicParams合并到checkParams中
|
Map<String, Object> logicParams = parseLogicParams(device);
|
if (logicParams != null && !logicParams.isEmpty()) {
|
checkParams.put("_logicParams", logicParams);
|
}
|
// 第一步:写入大车出片请求
|
DevicePlcVO.OperationResult feedResult = handler.execute(device, "feedGlass", checkParams);
|
|
if (Boolean.TRUE.equals(feedResult.getSuccess())) {
|
// 真正开始处理,设置为RUNNING
|
deviceCoordinationService.syncDeviceStatus(device,
|
DeviceCoordinationService.DeviceStatus.RUNNING, context);
|
// 步骤状态也设置为RUNNING
|
if (!TaskStepDetail.Status.RUNNING.name().equals(step.getStatus())) {
|
step.setStatus(TaskStepDetail.Status.RUNNING.name());
|
if (step.getStartTime() == null) {
|
step.setStartTime(new Date());
|
}
|
taskStepDetailMapper.updateById(step);
|
notificationService.notifyStepUpdate(task.getTaskId(), step);
|
}
|
log.debug("出片大车设备定时器执行成功: taskId={}, deviceId={}, glassCount={}",
|
task.getTaskId(), device.getId(), processedGlassIds.size());
|
// 清空已处理的玻璃ID列表(已处理)
|
clearProcessedGlassIds(context);
|
|
// feedGlass成功后,先检查MES任务(checkMesTask)来开始执行任务
|
DevicePlcVO.OperationResult mesTaskResult = null;
|
try {
|
mesTaskResult = handler.execute(device, "checkMesTask", Collections.emptyMap());
|
if (mesTaskResult != null && Boolean.TRUE.equals(mesTaskResult.getSuccess())) {
|
log.info("出片大车设备已检查MES任务并开始执行: taskId={}, deviceId={}, message={}",
|
task.getTaskId(), device.getId(), mesTaskResult.getMessage());
|
}
|
} catch (Exception e) {
|
log.warn("出片大车设备检查MES任务异常: taskId={}, deviceId={}, error={}",
|
task.getTaskId(), device.getId(), e.getMessage());
|
}
|
} else {
|
// 没有数据,保持WAITING状态和PENDING步骤状态
|
deviceCoordinationService.syncDeviceStatus(device,
|
DeviceCoordinationService.DeviceStatus.WAITING, context);
|
if (!TaskStepDetail.Status.PENDING.name().equals(step.getStatus())) {
|
step.setStatus(TaskStepDetail.Status.PENDING.name());
|
step.setSuccessMessage("等待中");
|
taskStepDetailMapper.updateById(step);
|
notificationService.notifyStepUpdate(task.getTaskId(), step);
|
}
|
log.debug("出片大车设备定时器执行失败: taskId={}, deviceId={}, message={}",
|
task.getTaskId(), device.getId(), feedResult.getMessage());
|
}
|
|
// 第二步:检查MES确认状态(如果大车处理器支持的话)
|
// 只有在任务已开始执行(有任务记录)时才检查MES确认
|
DevicePlcVO.OperationResult mesResult = null;
|
try {
|
mesResult = handler.execute(device, "checkMesConfirm", Collections.emptyMap());
|
} catch (Exception e) {
|
log.warn("出片大车设备检查MES确认状态异常: taskId={}, deviceId={}, error={}",
|
task.getTaskId(), device.getId(), e.getMessage());
|
}
|
|
// 更新步骤状态(大车设备保持RUNNING,直到MES确认完成或任务取消)
|
if (mesResult != null) {
|
updateStepStatusForVehicle(step, mesResult);
|
boolean opSuccess = Boolean.TRUE.equals(mesResult.getSuccess());
|
updateTaskProgress(task, step.getStepOrder(), opSuccess);
|
if (!opSuccess) {
|
deviceCoordinationService.syncDeviceStatus(device,
|
DeviceCoordinationService.DeviceStatus.FAILED, context);
|
}
|
} else {
|
updateStepStatusForVehicle(step, feedResult);
|
boolean opSuccess = Boolean.TRUE.equals(feedResult.getSuccess());
|
updateTaskProgress(task, step.getStepOrder(), opSuccess);
|
if (!opSuccess) {
|
deviceCoordinationService.syncDeviceStatus(device,
|
DeviceCoordinationService.DeviceStatus.FAILED, context);
|
}
|
}
|
}
|
} catch (Exception e) {
|
log.error("出片大车设备定时器执行异常: taskId={}, deviceId={}", task.getTaskId(), device.getId(), e);
|
}
|
}, 0, MONITOR_INTERVAL_MS, TimeUnit.MILLISECONDS);
|
|
// 在串行执行模式下,设备启动定时器时先设置为 WAITING,定时器第一次执行时再设置为 RUNNING
|
deviceCoordinationService.syncDeviceStatus(device,
|
DeviceCoordinationService.DeviceStatus.WAITING, context);
|
return future;
|
} catch (Exception e) {
|
log.error("启动出片大车设备定时器失败: taskId={}, deviceId={}", task.getTaskId(), device.getId(), e);
|
return null;
|
}
|
}
|
|
/**
|
* 启动大理片笼设备定时器:逻辑处理(不涉及PLC交互,只负责逻辑处理,比如多久给任务汇报)
|
*/
|
private ScheduledFuture<?> startLargeGlassTimer(MultiDeviceTask task,
|
TaskStepDetail step,
|
DeviceConfig device,
|
TaskExecutionContext context) {
|
try {
|
// 从设备配置中获取处理时间(默认30秒)
|
Map<String, Object> logicParams = parseLogicParams(device);
|
Integer processTimeSeconds = getLogicParam(logicParams, "processTimeSeconds", 30);
|
final long PROCESS_TIME_MS = processTimeSeconds * 1000;
|
|
log.debug("启动大理片笼设备定时器: taskId={}, deviceId={}, processTime={}s",
|
task.getTaskId(), device.getId(), processTimeSeconds);
|
|
// 启动定时任务
|
ScheduledFuture<?> future = scheduledExecutor.scheduleWithFixedDelay(() -> {
|
try {
|
if (isTaskCancelled(context)) {
|
log.debug("任务已取消,停止大理片笼定时器: taskId={}, deviceId={}",
|
task.getTaskId(), device.getId());
|
return;
|
}
|
// 大理片笼设备:只有在真正开始处理时才设置为RUNNING
|
// 检查是否有已装载的玻璃信息(从进片大车来的)
|
List<String> loadedGlassIds = getLoadedGlassIds(context);
|
if (CollectionUtils.isEmpty(loadedGlassIds)) {
|
// 没有数据,保持WAITING状态和PENDING步骤状态
|
deviceCoordinationService.syncDeviceStatus(device,
|
DeviceCoordinationService.DeviceStatus.WAITING, context);
|
if (!TaskStepDetail.Status.PENDING.name().equals(step.getStatus())) {
|
step.setStatus(TaskStepDetail.Status.PENDING.name());
|
step.setSuccessMessage("等待进片大车装载玻璃");
|
taskStepDetailMapper.updateById(step);
|
notificationService.notifyStepUpdate(task.getTaskId(), step);
|
}
|
log.debug("大理片笼设备定时器:暂无已装载的玻璃信息: taskId={}, deviceId={}",
|
task.getTaskId(), device.getId());
|
return;
|
}
|
|
// 有数据,设置为RUNNING
|
deviceCoordinationService.syncDeviceStatus(device,
|
DeviceCoordinationService.DeviceStatus.RUNNING, context);
|
|
// 检查玻璃是否已经处理完成(通过处理时间判断)
|
Long processStartTime = getProcessStartTime(context);
|
if (processStartTime == null) {
|
// 第一次检测到玻璃,记录开始处理时间
|
setProcessStartTime(context, System.currentTimeMillis());
|
log.debug("大理片笼设备开始处理: taskId={}, deviceId={}, glassCount={}, processTime={}s",
|
task.getTaskId(), device.getId(), loadedGlassIds.size(), processTimeSeconds);
|
return;
|
}
|
|
long elapsed = System.currentTimeMillis() - processStartTime;
|
if (elapsed < PROCESS_TIME_MS) {
|
// 处理时间未到,继续等待
|
log.debug("大理片笼设备处理中: taskId={}, deviceId={}, elapsed={}s, remaining={}s",
|
task.getTaskId(), device.getId(), elapsed / 1000, (PROCESS_TIME_MS - elapsed) / 1000);
|
return;
|
}
|
|
// 处理时间已到,完成任务汇报
|
log.debug("大理片笼设备处理完成: taskId={}, deviceId={}, glassCount={}, processTime={}s",
|
task.getTaskId(), device.getId(), loadedGlassIds.size(), processTimeSeconds);
|
|
// 将已处理的玻璃ID转移到已处理列表(供出片大车使用)
|
setProcessedGlassIds(context, new ArrayList<>(loadedGlassIds));
|
clearLoadedGlassIds(context);
|
clearProcessStartTime(context);
|
|
// 更新步骤状态
|
step.setStatus(TaskStepDetail.Status.COMPLETED.name());
|
step.setErrorMessage(null);
|
step.setOutputData(toJson(Collections.singletonMap("glassIds", loadedGlassIds)));
|
taskStepDetailMapper.updateById(step);
|
// 大理片笼完成后尝试自动收尾整个任务
|
checkAndCompleteTaskIfDone(step.getTaskId());
|
|
} catch (Exception e) {
|
log.error("大理片笼设备定时器执行异常: taskId={}, deviceId={}", task.getTaskId(), device.getId(), e);
|
}
|
}, 0, 1_000, TimeUnit.MILLISECONDS); // 每秒检查一次
|
|
// 在串行执行模式下,设备启动定时器时先设置为 WAITING,定时器第一次执行时再设置为 RUNNING
|
deviceCoordinationService.syncDeviceStatus(device,
|
DeviceCoordinationService.DeviceStatus.WAITING, context);
|
return future;
|
} catch (Exception e) {
|
log.error("启动大理片笼设备定时器失败: taskId={}, deviceId={}", task.getTaskId(), device.getId(), e);
|
return null;
|
}
|
}
|
|
/**
|
* 获取逻辑参数
|
*/
|
@SuppressWarnings("unchecked")
|
private <T> T getLogicParam(Map<String, Object> logicParams, String key, T defaultValue) {
|
if (logicParams == null) {
|
return defaultValue;
|
}
|
Object value = logicParams.get(key);
|
if (value == null) {
|
return defaultValue;
|
}
|
try {
|
return (T) value;
|
} catch (ClassCastException e) {
|
return defaultValue;
|
}
|
}
|
|
/**
|
* 获取已装载的玻璃ID列表
|
*/
|
@SuppressWarnings("unchecked")
|
private List<String> getLoadedGlassIds(TaskExecutionContext context) {
|
if (context == null) {
|
return Collections.emptyList();
|
}
|
Object glassIds = context.getSharedData().get("loadedGlassIds");
|
if (glassIds instanceof List) {
|
return new ArrayList<>((List<String>) glassIds);
|
}
|
return Collections.emptyList();
|
}
|
|
/**
|
* 设置已装载的玻璃ID列表
|
*/
|
private void setLoadedGlassIds(TaskExecutionContext context, List<String> glassIds) {
|
if (context != null) {
|
context.getSharedData().put("loadedGlassIds", new ArrayList<>(glassIds));
|
}
|
}
|
|
/**
|
* 清空已装载的玻璃ID列表
|
*/
|
private void clearLoadedGlassIds(TaskExecutionContext context) {
|
if (context != null) {
|
context.getSharedData().put("loadedGlassIds", new ArrayList<>());
|
}
|
}
|
|
/**
|
* 获取已处理的玻璃ID列表
|
*/
|
@SuppressWarnings("unchecked")
|
private List<String> getProcessedGlassIds(TaskExecutionContext context) {
|
if (context == null) {
|
return Collections.emptyList();
|
}
|
Object glassIds = context.getSharedData().get("processedGlassIds");
|
if (glassIds instanceof List) {
|
return new ArrayList<>((List<String>) glassIds);
|
}
|
return Collections.emptyList();
|
}
|
|
/**
|
* 设置已处理的玻璃ID列表
|
*/
|
private void setProcessedGlassIds(TaskExecutionContext context, List<String> glassIds) {
|
if (context != null) {
|
context.getSharedData().put("processedGlassIds", new ArrayList<>(glassIds));
|
}
|
}
|
|
/**
|
* 清空已处理的玻璃ID列表
|
*/
|
private void clearProcessedGlassIds(TaskExecutionContext context) {
|
if (context != null) {
|
context.getSharedData().put("processedGlassIds", new ArrayList<>());
|
}
|
}
|
|
/**
|
* 获取处理开始时间
|
*/
|
private Long getProcessStartTime(TaskExecutionContext context) {
|
if (context == null) {
|
return null;
|
}
|
Object time = context.getSharedData().get("processStartTime");
|
if (time instanceof Number) {
|
return ((Number) time).longValue();
|
}
|
return null;
|
}
|
|
/**
|
* 设置处理开始时间
|
*/
|
private void setProcessStartTime(TaskExecutionContext context, long time) {
|
if (context != null) {
|
context.getSharedData().put("processStartTime", time);
|
}
|
}
|
|
/**
|
* 清空处理开始时间
|
*/
|
private void clearProcessStartTime(TaskExecutionContext context) {
|
if (context != null) {
|
context.getSharedData().remove("processStartTime");
|
}
|
}
|
|
/**
|
* 设置卧转立扫码暂停标志
|
*/
|
private void setScannerPause(TaskExecutionContext context, boolean pause) {
|
if (context != null) {
|
context.getSharedData().put("scannerPause", pause);
|
}
|
}
|
|
private boolean isTaskCancelled(TaskExecutionContext context) {
|
if (context == null) {
|
return false;
|
}
|
Object cancelled = context.getSharedData().get("taskCancelled");
|
return cancelled instanceof Boolean && (Boolean) cancelled;
|
}
|
|
/**
|
* 检查是否需要暂停卧转立扫码
|
*/
|
private boolean shouldPauseScanner(TaskExecutionContext context) {
|
if (context == null) {
|
return false;
|
}
|
Object pauseFlag = context.getSharedData().get("scannerPause");
|
return pauseFlag instanceof Boolean && (Boolean) pauseFlag;
|
}
|
|
/**
|
* 获取已扫描的玻璃ID列表
|
*/
|
@SuppressWarnings("unchecked")
|
private List<String> getScannedGlassIds(TaskExecutionContext context) {
|
if (context == null) {
|
return Collections.emptyList();
|
}
|
Object glassIds = context.getSharedData().get("scannedGlassIds");
|
if (glassIds instanceof List) {
|
return new ArrayList<>((List<String>) glassIds);
|
}
|
return Collections.emptyList();
|
}
|
|
/**
|
* 清空已扫描的玻璃ID列表
|
*/
|
private void clearScannedGlassIds(TaskExecutionContext context) {
|
if (context != null) {
|
context.getSharedData().put("scannedGlassIds", new ArrayList<>());
|
}
|
}
|
|
/**
|
* 获取卧转立主体已输出、准备上大车的玻璃ID列表
|
*/
|
@SuppressWarnings("unchecked")
|
private List<String> getTransferReadyGlassIds(TaskExecutionContext context) {
|
if (context == null) {
|
return Collections.emptyList();
|
}
|
Object glassIds = context.getSharedData().get("transferReadyGlassIds");
|
if (glassIds instanceof List) {
|
return new ArrayList<>((List<String>) glassIds);
|
}
|
return Collections.emptyList();
|
}
|
|
/**
|
* 清空卧转立主体已输出的玻璃ID列表
|
*/
|
private void clearTransferReadyGlassIds(TaskExecutionContext context) {
|
if (context != null) {
|
context.getSharedData().put("transferReadyGlassIds", new ArrayList<>());
|
}
|
}
|
|
/**
|
* 注册定时器任务
|
*/
|
private void registerScheduledTask(String taskId, ScheduledFuture<?> future) {
|
taskScheduledTasks.computeIfAbsent(taskId, k -> new ArrayList<>()).add(future);
|
}
|
|
/**
|
* 停止所有定时器任务
|
*/
|
private void stopScheduledTasks(String taskId) {
|
List<ScheduledFuture<?>> futures = taskScheduledTasks.remove(taskId);
|
if (futures != null) {
|
for (ScheduledFuture<?> future : futures) {
|
if (future != null && !future.isCancelled()) {
|
future.cancel(false);
|
}
|
}
|
log.debug("已停止任务的所有定时器: taskId={}, count={}", taskId, futures.size());
|
}
|
runningTaskContexts.remove(taskId);
|
}
|
|
/**
|
* 等待定时器任务完成(带超时)
|
*/
|
private void waitForScheduledTasks(String taskId, TaskExecutionContext context) {
|
// 获取任务超时时间(默认30分钟)
|
TaskParameters params = context.getParameters();
|
long timeoutMinutes = params != null && params.getTimeoutMinutes() != null
|
? params.getTimeoutMinutes() : 30;
|
long timeoutMs = timeoutMinutes * 60 * 1000;
|
long deadline = System.currentTimeMillis() + timeoutMs;
|
|
log.debug("等待定时器任务完成: taskId={}, timeout={}分钟", taskId, timeoutMinutes);
|
|
while (System.currentTimeMillis() < deadline) {
|
List<ScheduledFuture<?>> futures = taskScheduledTasks.get(taskId);
|
if (futures == null || futures.isEmpty()) {
|
break;
|
}
|
|
// 检查是否所有任务都已完成
|
boolean allDone = true;
|
for (ScheduledFuture<?> future : futures) {
|
if (future != null && !future.isDone()) {
|
allDone = false;
|
break;
|
}
|
}
|
|
if (allDone) {
|
break;
|
}
|
|
try {
|
Thread.sleep(1000);
|
} catch (InterruptedException e) {
|
Thread.currentThread().interrupt();
|
break;
|
}
|
}
|
|
log.debug("定时器任务等待完成: taskId={}", taskId);
|
}
|
|
/**
|
* 当某个步骤可能完成时,检查任务是否所有步骤都已完成,如果是则自动将任务标记为已完成
|
*/
|
private void checkAndCompleteTaskIfDone(String taskId) {
|
if (taskId == null) {
|
return;
|
}
|
try {
|
MultiDeviceTask task = multiDeviceTaskMapper.selectOne(
|
Wrappers.<MultiDeviceTask>lambdaQuery()
|
.eq(MultiDeviceTask::getTaskId, taskId)
|
);
|
if (task == null) {
|
return;
|
}
|
// 仅在任务仍为RUNNING时才尝试自动收尾
|
if (!MultiDeviceTask.Status.RUNNING.name().equals(task.getStatus())) {
|
return;
|
}
|
|
int totalSteps = task.getTotalSteps() != null ? task.getTotalSteps() : 0;
|
if (totalSteps <= 0) {
|
return;
|
}
|
|
int completedSteps = countCompletedSteps(taskId);
|
if (completedSteps < totalSteps) {
|
return;
|
}
|
|
// 所有步骤都已完成,收尾任务
|
task.setStatus(MultiDeviceTask.Status.COMPLETED.name());
|
task.setEndTime(new Date());
|
multiDeviceTaskMapper.updateById(task);
|
|
// 停止所有定时器
|
stopScheduledTasks(taskId);
|
|
// 通知任务完成
|
notificationService.notifyTaskStatus(task);
|
|
log.info("所有步骤已完成,自动将任务标记为已完成: taskId={}, totalSteps={}", taskId, totalSteps);
|
} catch (Exception e) {
|
log.warn("检查并自动完成任务失败: taskId={}", taskId, e);
|
}
|
}
|
|
/**
|
* 更新步骤状态
|
*/
|
private void updateStepStatus(TaskStepDetail step, DevicePlcVO.OperationResult result) {
|
if (step == null || result == null) {
|
return;
|
}
|
boolean success = Boolean.TRUE.equals(result.getSuccess());
|
step.setStatus(success
|
? TaskStepDetail.Status.COMPLETED.name()
|
: TaskStepDetail.Status.FAILED.name());
|
// 设置消息:成功时如果有消息也保存,失败时保存错误消息
|
String message = result.getMessage();
|
if (success) {
|
// 成功时,如果有消息则保存(用于提示信息),否则清空
|
step.setSuccessMessage(StringUtils.hasText(message) ? message : null);
|
// 如果状态变为完成,设置结束时间
|
if (TaskStepDetail.Status.COMPLETED.name().equals(step.getStatus()) && step.getEndTime() == null) {
|
step.setEndTime(new Date());
|
}
|
} else {
|
// 失败时保存错误消息
|
step.setErrorMessage(message);
|
// 如果状态变为失败,设置结束时间
|
if (TaskStepDetail.Status.FAILED.name().equals(step.getStatus()) && step.getEndTime() == null) {
|
step.setEndTime(new Date());
|
}
|
}
|
step.setOutputData(toJson(result));
|
taskStepDetailMapper.updateById(step);
|
}
|
|
/**
|
* 确保步骤进入RUNNING状态(仅在第一次真正执行前调用)
|
*/
|
private void ensureStepRunning(TaskStepDetail step, String taskId) {
|
if (step == null) {
|
return;
|
}
|
if (!TaskStepDetail.Status.RUNNING.name().equals(step.getStatus())) {
|
step.setStatus(TaskStepDetail.Status.RUNNING.name());
|
if (step.getStartTime() == null) {
|
step.setStartTime(new Date());
|
}
|
taskStepDetailMapper.updateById(step);
|
notificationService.notifyStepUpdate(taskId, step);
|
}
|
}
|
|
/**
|
* 更新扫码设备步骤状态(显示进度,保持RUNNING状态直到所有玻璃处理完成)
|
*/
|
private void updateStepStatusForScanner(TaskStepDetail step, DevicePlcVO.OperationResult result,
|
int currentIndex, int totalCount,
|
int successCount, int failCount) {
|
if (step == null || result == null) {
|
return;
|
}
|
|
boolean success = Boolean.TRUE.equals(result.getSuccess());
|
|
// 保持RUNNING状态,直到所有玻璃处理完成(在定时器完成时再设置为COMPLETED)
|
step.setStatus(TaskStepDetail.Status.RUNNING.name());
|
|
// 更新时间和耗时,前端可以实时看到执行耗时
|
Date now = new Date();
|
if (step.getStartTime() == null) {
|
step.setStartTime(now);
|
}
|
if (step.getStartTime() != null) {
|
step.setDurationMs(now.getTime() - step.getStartTime().getTime());
|
}
|
|
// 更新进度信息
|
String progressMessage = String.format("正在处理 %d/%d (成功:%d, 失败:%d)",
|
currentIndex, totalCount, successCount, failCount);
|
|
if (success) {
|
// 成功时显示进度和成功消息
|
String resultMessage = result.getMessage();
|
if (StringUtils.hasText(resultMessage)) {
|
step.setSuccessMessage(progressMessage + " - " + resultMessage);
|
} else {
|
step.setSuccessMessage(progressMessage);
|
}
|
step.setErrorMessage(null);
|
} else {
|
// 失败时显示进度和错误消息
|
String errorMessage = result.getMessage();
|
step.setErrorMessage(progressMessage + " - " + (StringUtils.hasText(errorMessage) ? errorMessage : "处理失败"));
|
step.setSuccessMessage(null);
|
}
|
|
step.setOutputData(toJson(result));
|
taskStepDetailMapper.updateById(step);
|
}
|
|
/**
|
* 更新大车设备步骤状态(保持RUNNING,直到手动停止或任务取消;失败时标记为FAILED)
|
*/
|
private void updateStepStatusForVehicle(TaskStepDetail step, DevicePlcVO.OperationResult result) {
|
if (step == null || result == null) {
|
return;
|
}
|
boolean success = Boolean.TRUE.equals(result.getSuccess());
|
boolean completed = false;
|
if (result.getData() != null && result.getData().get("completed") != null) {
|
Object flag = result.getData().get("completed");
|
if (flag instanceof Boolean) {
|
completed = (Boolean) flag;
|
} else {
|
completed = "true".equalsIgnoreCase(String.valueOf(flag));
|
}
|
}
|
Date now = new Date();
|
|
// 初始化开始时间
|
if (step.getStartTime() == null) {
|
step.setStartTime(now);
|
}
|
|
boolean waiting = false;
|
String waitingReason = null;
|
if (result.getData() != null) {
|
Object waitingFlag = result.getData().get("waiting");
|
if (waitingFlag instanceof Boolean) {
|
waiting = (Boolean) waitingFlag;
|
} else if (waitingFlag != null) {
|
waiting = "true".equalsIgnoreCase(String.valueOf(waitingFlag));
|
}
|
Object reason = result.getData().get("waitingReason");
|
if (reason != null) {
|
waitingReason = String.valueOf(reason);
|
}
|
}
|
|
if (success && !completed) {
|
// 成功但未完成:根据waiting状态决定显示为等待还是执行中
|
if (waiting) {
|
step.setStatus(TaskStepDetail.Status.PENDING.name());
|
} else {
|
step.setStatus(TaskStepDetail.Status.RUNNING.name());
|
}
|
String message = result.getMessage();
|
if (!StringUtils.hasText(message) && waiting) {
|
message = "大车设备等待中" + (StringUtils.hasText(waitingReason) ? "(" + waitingReason + ")" : "");
|
}
|
step.setSuccessMessage(StringUtils.hasText(message) ? message : (waiting ? "大车设备等待中" : "大车设备运行中"));
|
step.setErrorMessage(null);
|
if (step.getStartTime() != null) {
|
step.setDurationMs(now.getTime() - step.getStartTime().getTime());
|
}
|
} else if (success && completed) {
|
// 成功且MES已确认完成:标记为COMPLETED并记录结束时间
|
step.setStatus(TaskStepDetail.Status.COMPLETED.name());
|
String message = result.getMessage();
|
step.setSuccessMessage(StringUtils.hasText(message) ? message : "大车设备任务已完成");
|
step.setErrorMessage(null);
|
if (step.getEndTime() == null) {
|
step.setEndTime(now);
|
}
|
if (step.getStartTime() != null && step.getEndTime() != null) {
|
step.setDurationMs(step.getEndTime().getTime() - step.getStartTime().getTime());
|
}
|
// 尝试自动收尾整个任务
|
checkAndCompleteTaskIfDone(step.getTaskId());
|
} else {
|
// 失败:标记为FAILED并记录结束时间
|
step.setStatus(TaskStepDetail.Status.FAILED.name());
|
String message = result.getMessage();
|
step.setErrorMessage(message);
|
if (step.getEndTime() == null) {
|
step.setEndTime(now);
|
}
|
if (step.getStartTime() != null && step.getEndTime() != null) {
|
step.setDurationMs(step.getEndTime().getTime() - step.getStartTime().getTime());
|
}
|
}
|
|
step.setOutputData(toJson(result));
|
taskStepDetailMapper.updateById(step);
|
}
|
|
/**
|
* 轮询大车设备的MES任务和确认状态
|
* 无论步骤当前是否为RUNNING/PENDING,都可以调用,用于避免错过MES端后写入的任务
|
*/
|
private void pollMesForVehicle(MultiDeviceTask task,
|
TaskStepDetail step,
|
DeviceConfig device,
|
TaskExecutionContext context) {
|
try {
|
DeviceLogicHandler handler = handlerFactory.getHandler(device.getDeviceType());
|
if (handler == null) {
|
return;
|
}
|
|
Map<String, Object> logicParams = parseLogicParams(device);
|
|
// 先检查MES任务(如果mesSend=1,会创建任务并开始执行)
|
DevicePlcVO.OperationResult mesTaskResult = null;
|
try {
|
mesTaskResult = handler.execute(device, "checkMesTask", Collections.emptyMap());
|
if (mesTaskResult != null && Boolean.TRUE.equals(mesTaskResult.getSuccess())) {
|
log.info("大车设备已检查MES任务并开始执行: taskId={}, deviceId={}, message={}",
|
task.getTaskId(), device.getId(), mesTaskResult.getMessage());
|
}
|
} catch (Exception e) {
|
log.warn("大车设备检查MES任务异常: taskId={}, deviceId={}, error={}",
|
task.getTaskId(), device.getId(), e.getMessage());
|
}
|
|
// 然后检查MES确认状态
|
DevicePlcVO.OperationResult mesResult = null;
|
try {
|
mesResult = handler.execute(device, "checkMesConfirm", Collections.emptyMap());
|
} catch (Exception e) {
|
log.warn("大车设备检查MES确认状态异常: taskId={}, deviceId={}, error={}",
|
task.getTaskId(), device.getId(), e.getMessage());
|
}
|
|
// 更新步骤状态
|
if (mesResult != null) {
|
updateStepStatusForVehicle(step, mesResult);
|
boolean opSuccess = Boolean.TRUE.equals(mesResult.getSuccess());
|
updateTaskProgress(task, step.getStepOrder(), opSuccess);
|
if (!opSuccess) {
|
deviceCoordinationService.syncDeviceStatus(device,
|
DeviceCoordinationService.DeviceStatus.FAILED, context);
|
}
|
}
|
} catch (Exception e) {
|
log.warn("轮询大车设备MES任务/确认状态异常: taskId={}, deviceId={}, error={}",
|
task.getTaskId(), device.getId(), e.getMessage());
|
}
|
}
|
|
/**
|
* 更新卧转立设备步骤状态(区分等待中和真正完成)
|
*/
|
private void updateStepStatusForTransfer(TaskStepDetail step, DevicePlcVO.OperationResult result,
|
DeviceConfig device, TaskExecutionContext context) {
|
if (step == null || result == null) {
|
return;
|
}
|
boolean success = Boolean.TRUE.equals(result.getSuccess());
|
String message = result.getMessage();
|
|
// 判断是否真正完成:
|
// 1. 写入PLC成功且缓冲已清空(表示所有玻璃已处理完,无新玻璃)
|
// 注意:缓冲队列为空且无待处理玻璃,在任务刚开始时也可能出现,不应该立即标记为完成
|
// 只有当任务已经运行一段时间,且确实没有玻璃需要处理时,才标记为完成
|
boolean isRealCompleted = success && message != null && (
|
(message.contains("批次已写入PLC") && message.contains("缓冲已清空,任务完成"))
|
);
|
|
if (isRealCompleted) {
|
// 真正完成:设置为完成状态,并设置结束时间
|
// 注意:一旦标记为完成,状态不应该再被改变
|
if (!TaskStepDetail.Status.COMPLETED.name().equals(step.getStatus())) {
|
step.setStatus(TaskStepDetail.Status.COMPLETED.name());
|
step.setSuccessMessage(message);
|
if (step.getEndTime() == null) {
|
step.setEndTime(new Date());
|
}
|
// 计算耗时
|
if (step.getStartTime() != null && step.getEndTime() != null) {
|
step.setDurationMs(step.getEndTime().getTime() - step.getStartTime().getTime());
|
}
|
log.info("卧转立设备步骤已完成: stepId={}, durationMs={}, message={}",
|
step.getId(), step.getDurationMs(), message);
|
// 卧转立主体完成后尝试自动收尾整个任务
|
checkAndCompleteTaskIfDone(step.getTaskId());
|
// 更新设备状态为已完成
|
if (device != null) {
|
deviceCoordinationService.syncDeviceStatus(device,
|
DeviceCoordinationService.DeviceStatus.COMPLETED, context);
|
}
|
}
|
} else if (success && message != null && message.contains("批次已写入PLC")) {
|
// 写入PLC成功但缓冲还有玻璃(车满情况),继续运行
|
if (!TaskStepDetail.Status.RUNNING.name().equals(step.getStatus())) {
|
step.setStatus(TaskStepDetail.Status.RUNNING.name());
|
}
|
step.setSuccessMessage(message);
|
// 确保开始时间已设置
|
if (step.getStartTime() == null) {
|
step.setStartTime(new Date());
|
}
|
} else if (success) {
|
// 设备正在工作(等待缓冲、等待更多玻璃等),保持RUNNING状态
|
// 因为定时器已经执行,说明设备已经开始工作了
|
if (!TaskStepDetail.Status.RUNNING.name().equals(step.getStatus())) {
|
step.setStatus(TaskStepDetail.Status.RUNNING.name());
|
}
|
step.setSuccessMessage(message);
|
// 确保开始时间已设置(设备已经开始工作)
|
if (step.getStartTime() == null) {
|
step.setStartTime(new Date());
|
}
|
} else {
|
// 失败:设置为失败状态,并设置结束时间
|
step.setStatus(TaskStepDetail.Status.FAILED.name());
|
step.setErrorMessage(message);
|
if (step.getEndTime() == null) {
|
step.setEndTime(new Date());
|
}
|
// 计算耗时
|
if (step.getStartTime() != null && step.getEndTime() != null) {
|
step.setDurationMs(step.getEndTime().getTime() - step.getStartTime().getTime());
|
}
|
}
|
|
step.setOutputData(toJson(result));
|
taskStepDetailMapper.updateById(step);
|
}
|
|
/**
|
* 创建步骤摘要
|
*/
|
private Map<String, Object> createStepSummary(String deviceName, boolean success, String message) {
|
Map<String, Object> summary = new HashMap<>();
|
summary.put("deviceName", deviceName);
|
summary.put("success", success);
|
summary.put("message", message);
|
return summary;
|
}
|
|
/**
|
* 解析设备逻辑参数
|
*/
|
@SuppressWarnings("unchecked")
|
private Map<String, Object> parseLogicParams(DeviceConfig device) {
|
String extraParams = device.getExtraParams();
|
if (!StringUtils.hasText(extraParams)) {
|
return Collections.emptyMap();
|
}
|
try {
|
Map<String, Object> extraParamsMap = objectMapper.readValue(extraParams, MAP_TYPE);
|
Object deviceLogic = extraParamsMap.get("deviceLogic");
|
if (deviceLogic instanceof Map) {
|
return (Map<String, Object>) deviceLogic;
|
}
|
return Collections.emptyMap();
|
} catch (Exception e) {
|
log.warn("解析设备逻辑参数失败: deviceId={}", device.getId(), e);
|
return Collections.emptyMap();
|
}
|
}
|
|
/**
|
* 并行执行多个设备操作
|
*/
|
private Pair<Boolean, String> executeParallel(MultiDeviceTask task,
|
List<DeviceConfig> devices,
|
TaskExecutionContext context,
|
List<Map<String, Object>> stepSummaries,
|
Integer maxConcurrent) {
|
int concurrency = maxConcurrent != null && maxConcurrent > 0
|
? Math.min(maxConcurrent, devices.size())
|
: devices.size();
|
|
// 创建所有步骤记录
|
List<TaskStepDetail> steps = new ArrayList<>();
|
for (int i = 0; i < devices.size(); i++) {
|
DeviceConfig device = devices.get(i);
|
int order = i + 1;
|
TaskStepDetail step = createStepRecord(task, device, order);
|
steps.add(step);
|
}
|
|
// 使用信号量控制并发数
|
Semaphore semaphore = new Semaphore(concurrency);
|
List<CompletableFuture<StepResult>> futures = new ArrayList<>();
|
|
for (int i = 0; i < devices.size(); i++) {
|
final int index = i;
|
final DeviceConfig device = devices.get(index);
|
final TaskStepDetail step = steps.get(index);
|
|
CompletableFuture<StepResult> future = CompletableFuture.supplyAsync(() -> {
|
try {
|
semaphore.acquire();
|
try {
|
return executeStep(task, step, device, context);
|
} finally {
|
semaphore.release();
|
}
|
} catch (InterruptedException e) {
|
Thread.currentThread().interrupt();
|
log.error("并行执行被中断, deviceId={}", device.getId(), e);
|
return StepResult.failure(device.getDeviceName(), "执行被中断");
|
} catch (Exception e) {
|
log.error("并行执行异常, deviceId={}", device.getId(), e);
|
return StepResult.failure(device.getDeviceName(), e.getMessage());
|
}
|
}, executorService);
|
|
final int finalIndex = index;
|
future.whenComplete((result, throwable) -> {
|
if (throwable != null) {
|
log.error("并行执行完成时异常, deviceId={}", device.getId(), throwable);
|
stepSummaries.set(finalIndex, StepResult.failure(device.getDeviceName(),
|
throwable.getMessage()).toSummary());
|
} else if (result != null) {
|
stepSummaries.set(finalIndex, result.toSummary());
|
}
|
});
|
|
futures.add(future);
|
}
|
|
// 等待所有任务完成
|
CompletableFuture<Void> allFutures = CompletableFuture.allOf(
|
futures.toArray(new CompletableFuture[0])
|
);
|
|
try {
|
allFutures.get(30, TimeUnit.MINUTES); // 最多等待30分钟
|
} catch (TimeoutException e) {
|
log.error("并行执行超时, taskId={}", task.getTaskId(), e);
|
return Pair.of(false, "任务执行超时");
|
} catch (Exception e) {
|
log.error("等待并行执行完成时异常, taskId={}", task.getTaskId(), e);
|
return Pair.of(false, "等待执行完成时发生异常: " + e.getMessage());
|
}
|
|
// 检查所有步骤的执行结果
|
boolean allSuccess = true;
|
String firstFailureMessage = null;
|
for (int i = 0; i < futures.size(); i++) {
|
try {
|
StepResult result = futures.get(i).get();
|
if (result != null && !result.isSuccess()) {
|
allSuccess = false;
|
if (firstFailureMessage == null) {
|
firstFailureMessage = result.getMessage();
|
}
|
}
|
} catch (Exception e) {
|
log.error("获取步骤执行结果异常, stepIndex={}", i, e);
|
allSuccess = false;
|
if (firstFailureMessage == null) {
|
firstFailureMessage = "获取执行结果异常: " + e.getMessage();
|
}
|
}
|
}
|
|
return Pair.of(allSuccess, firstFailureMessage);
|
}
|
|
/**
|
* 确定执行模式
|
*/
|
private String determineExecutionMode(DeviceGroupConfig groupConfig) {
|
if (groupConfig == null) {
|
return EXECUTION_MODE_SERIAL; // 默认串行
|
}
|
|
// 从extraConfig中读取executionMode
|
String extraConfig = groupConfig.getExtraConfig();
|
if (StringUtils.hasText(extraConfig)) {
|
try {
|
Map<String, Object> config = objectMapper.readValue(extraConfig, MAP_TYPE);
|
Object mode = config.get("executionMode");
|
if (mode != null) {
|
String modeStr = String.valueOf(mode).toUpperCase();
|
if (EXECUTION_MODE_PARALLEL.equals(modeStr) || EXECUTION_MODE_SERIAL.equals(modeStr)) {
|
return modeStr;
|
}
|
}
|
} catch (Exception e) {
|
log.warn("解析设备组执行模式失败, groupId={}", groupConfig.getId(), e);
|
}
|
}
|
|
// 如果有maxConcurrentDevices且大于1,默认使用并行模式
|
if (groupConfig.getMaxConcurrentDevices() != null && groupConfig.getMaxConcurrentDevices() > 1) {
|
return EXECUTION_MODE_PARALLEL;
|
}
|
|
return EXECUTION_MODE_SERIAL; // 默认串行
|
}
|
|
/**
|
* 获取最大并发设备数
|
*/
|
private Integer getMaxConcurrentDevices(DeviceGroupConfig groupConfig) {
|
if (groupConfig == null) {
|
return 1;
|
}
|
|
// 从extraConfig中读取maxConcurrent
|
String extraConfig = groupConfig.getExtraConfig();
|
if (StringUtils.hasText(extraConfig)) {
|
try {
|
Map<String, Object> config = objectMapper.readValue(extraConfig, MAP_TYPE);
|
Object maxConcurrent = config.get("maxConcurrent");
|
if (maxConcurrent instanceof Number) {
|
return ((Number) maxConcurrent).intValue();
|
}
|
} catch (Exception e) {
|
log.warn("解析设备组最大并发数失败, groupId={}", groupConfig.getId(), e);
|
}
|
}
|
|
// 使用实体字段
|
return groupConfig.getMaxConcurrentDevices() != null && groupConfig.getMaxConcurrentDevices() > 0
|
? groupConfig.getMaxConcurrentDevices()
|
: 1;
|
}
|
|
/**
|
* 简单的Pair类用于返回两个值
|
*/
|
private static class Pair<T, U> {
|
private final T first;
|
private final U second;
|
|
private Pair(T first, U second) {
|
this.first = first;
|
this.second = second;
|
}
|
|
public static <T, U> Pair<T, U> of(T first, U second) {
|
return new Pair<>(first, second);
|
}
|
|
public T getFirst() {
|
return first;
|
}
|
|
public U getSecond() {
|
return second;
|
}
|
}
|
|
/**
|
* 分批执行大车设备玻璃上料(当玻璃ID数量超过6个时)
|
*/
|
private StepResult executeLoadVehicleWithBatches(MultiDeviceTask task,
|
DeviceConfig device,
|
int order,
|
TaskExecutionContext context,
|
List<Map<String, Object>> stepSummaries) {
|
List<String> allGlassIds = context.getParameters().getGlassIds();
|
int batchSize = 6; // 每批最多6个玻璃ID
|
|
// 分批处理
|
int totalBatches = (allGlassIds.size() + batchSize - 1) / batchSize;
|
log.debug("大车设备分批上料: deviceId={}, totalGlassIds={}, batchSize={}, totalBatches={}",
|
device.getId(), allGlassIds.size(), batchSize, totalBatches);
|
|
for (int batchIndex = 0; batchIndex < totalBatches; batchIndex++) {
|
int startIndex = batchIndex * batchSize;
|
int endIndex = Math.min(startIndex + batchSize, allGlassIds.size());
|
List<String> batchGlassIds = allGlassIds.subList(startIndex, endIndex);
|
|
// 创建临时参数,只包含当前批次的玻璃ID
|
TaskParameters batchParams = new TaskParameters();
|
batchParams.setGlassIds(new ArrayList<>(batchGlassIds));
|
batchParams.setPositionCode(context.getParameters().getPositionCode());
|
batchParams.setPositionValue(context.getParameters().getPositionValue());
|
|
// 创建临时上下文
|
TaskExecutionContext batchContext = new TaskExecutionContext(batchParams);
|
|
// 创建步骤记录
|
TaskStepDetail step = createStepRecord(task, device, order);
|
step.setStepName(step.getStepName() + String.format(" (批次 %d/%d)", batchIndex + 1, totalBatches));
|
|
// 执行当前批次
|
StepResult stepResult = executeStep(task, step, device, batchContext);
|
stepSummaries.add(stepResult.toSummary());
|
|
if (!stepResult.isSuccess()) {
|
log.error("大车设备分批上料失败: deviceId={}, batchIndex={}/{}, error={}",
|
device.getId(), batchIndex + 1, totalBatches, stepResult.getMessage());
|
return stepResult;
|
}
|
|
log.debug("大车设备分批上料成功: deviceId={}, batchIndex={}/{}, glassIds={}",
|
device.getId(), batchIndex + 1, totalBatches, batchGlassIds);
|
}
|
|
// 更新上下文中的已加载玻璃ID
|
context.setLoadedGlassIds(new ArrayList<>(allGlassIds));
|
|
return StepResult.success(device.getDeviceName(), "分批上料完成,共" + totalBatches + "批");
|
}
|
|
private TaskStepDetail createStepRecord(MultiDeviceTask task, DeviceConfig device, int order) {
|
TaskStepDetail step = new TaskStepDetail();
|
step.setTaskId(task.getTaskId());
|
step.setStepOrder(order);
|
step.setDeviceId(String.valueOf(device.getId()));
|
step.setStepName(device.getDeviceName());
|
step.setStatus(TaskStepDetail.Status.PENDING.name());
|
step.setRetryCount(0);
|
taskStepDetailMapper.insert(step);
|
return step;
|
}
|
|
private StepResult executeStep(MultiDeviceTask task,
|
TaskStepDetail step,
|
DeviceConfig device,
|
TaskExecutionContext context) {
|
DeviceCoordinationService.DependencyCheckResult dependencyResult =
|
deviceCoordinationService.checkDependencies(device, context);
|
if (!dependencyResult.isSatisfied()) {
|
log.warn("设备依赖未满足: deviceId={}, message={}", device.getId(), dependencyResult.getMessage());
|
step.setStatus(TaskStepDetail.Status.FAILED.name());
|
step.setErrorMessage(dependencyResult.getMessage());
|
step.setStartTime(new Date());
|
step.setEndTime(new Date());
|
taskStepDetailMapper.updateById(step);
|
updateTaskProgress(task, step.getStepOrder(), false);
|
return StepResult.failure(device.getDeviceName(), dependencyResult.getMessage());
|
}
|
return executeStepWithRetry(task, step, device, context, getRetryPolicy(device));
|
}
|
|
/**
|
* 带重试的步骤执行
|
*/
|
private StepResult executeStepWithRetry(MultiDeviceTask task,
|
TaskStepDetail step,
|
DeviceConfig device,
|
TaskExecutionContext context,
|
RetryPolicy retryPolicy) {
|
Date startTime = new Date();
|
step.setStartTime(startTime);
|
step.setStatus(TaskStepDetail.Status.RUNNING.name());
|
step.setRetryCount(0);
|
|
DeviceInteraction deviceInteraction = interactionRegistry.getInteraction(device.getDeviceType());
|
if (deviceInteraction != null) {
|
return executeInteractionStepWithRetry(task, step, device, context, deviceInteraction, retryPolicy);
|
}
|
|
Map<String, Object> params = buildOperationParams(device, context);
|
// 将context引用放入params,供设备处理器使用(用于设备协调)
|
params.put("_taskContext", context);
|
log.debug("executeStepWithRetry构建参数: deviceId={}, deviceType={}, operation={}, paramsKeys={}, params={}",
|
device.getId(), device.getDeviceType(), determineOperation(device, params), params.keySet(), params);
|
step.setInputData(toJson(params));
|
taskStepDetailMapper.updateById(step);
|
|
String operation = determineOperation(device, params);
|
DeviceLogicHandler handler = handlerFactory.getHandler(device.getDeviceType());
|
|
int retryAttempt = 0;
|
Exception lastException = null;
|
|
while (retryAttempt <= retryPolicy.getMaxRetryCount()) {
|
try {
|
if (retryAttempt > 0) {
|
// 重试前等待
|
long waitTime = retryPolicy.calculateRetryInterval(retryAttempt);
|
log.debug("步骤执行重试: deviceId={}, operation={}, retryAttempt={}/{}, waitTime={}ms",
|
device.getId(), operation, retryAttempt, retryPolicy.getMaxRetryCount(), waitTime);
|
Thread.sleep(waitTime);
|
|
// 更新步骤状态
|
step.setRetryCount(retryAttempt);
|
step.setStatus(TaskStepDetail.Status.RUNNING.name());
|
step.setStartTime(new Date());
|
taskStepDetailMapper.updateById(step);
|
}
|
|
DevicePlcVO.OperationResult result;
|
if (handler == null) {
|
result = deviceInteractionService.executeOperation(device.getId(), operation, params);
|
} else {
|
result = handler.execute(device, operation, params);
|
}
|
|
boolean opSuccess = Boolean.TRUE.equals(result.getSuccess());
|
updateStepAfterOperation(step, result, opSuccess);
|
updateTaskProgress(task, step.getStepOrder(), opSuccess);
|
|
// 通知步骤更新
|
notificationService.notifyStepUpdate(task.getTaskId(), step);
|
|
if (opSuccess) {
|
updateContextAfterSuccess(device, context, params, result);
|
|
// 同步设备状态
|
deviceCoordinationService.syncDeviceStatus(device,
|
DeviceCoordinationService.DeviceStatus.COMPLETED, context);
|
|
return StepResult.success(device.getDeviceName(), result.getMessage());
|
} else {
|
// 业务失败,判断是否可重试
|
if (retryAttempt < retryPolicy.getMaxRetryCount() && isRetryableFailure(result)) {
|
retryAttempt++;
|
lastException = new RuntimeException(result.getMessage());
|
log.warn("步骤执行失败,准备重试: deviceId={}, operation={}, retryAttempt={}, message={}",
|
device.getId(), operation, retryAttempt, result.getMessage());
|
continue;
|
}
|
|
// 同步失败状态
|
deviceCoordinationService.syncDeviceStatus(device,
|
DeviceCoordinationService.DeviceStatus.FAILED, context);
|
|
return StepResult.failure(device.getDeviceName(), result.getMessage());
|
}
|
} catch (InterruptedException e) {
|
Thread.currentThread().interrupt();
|
log.error("步骤执行被中断, deviceId={}, operation={}", device.getId(), operation, e);
|
step.setStatus(TaskStepDetail.Status.FAILED.name());
|
step.setErrorMessage("执行被中断: " + e.getMessage());
|
step.setEndTime(new Date());
|
step.setDurationMs(step.getEndTime().getTime() - step.getStartTime().getTime());
|
step.setRetryCount(retryAttempt);
|
taskStepDetailMapper.updateById(step);
|
updateTaskProgress(task, step.getStepOrder(), false);
|
return StepResult.failure(device.getDeviceName(), "执行被中断");
|
} catch (Exception e) {
|
lastException = e;
|
log.error("设备操作异常, deviceId={}, operation={}, retryAttempt={}",
|
device.getId(), operation, retryAttempt, e);
|
|
// 判断是否可重试
|
if (retryAttempt < retryPolicy.getMaxRetryCount() && retryPolicy.isRetryable(e)) {
|
retryAttempt++;
|
log.warn("步骤执行异常,准备重试: deviceId={}, operation={}, retryAttempt={}, exception={}",
|
device.getId(), operation, retryAttempt, e.getClass().getSimpleName());
|
continue;
|
}
|
|
// 不可重试或达到最大重试次数
|
step.setStatus(TaskStepDetail.Status.FAILED.name());
|
step.setErrorMessage(e.getMessage());
|
step.setEndTime(new Date());
|
step.setDurationMs(step.getEndTime().getTime() - step.getStartTime().getTime());
|
step.setRetryCount(retryAttempt);
|
taskStepDetailMapper.updateById(step);
|
updateTaskProgress(task, step.getStepOrder(), false);
|
|
// 通知步骤更新
|
notificationService.notifyStepUpdate(task.getTaskId(), step);
|
|
// 同步失败状态
|
deviceCoordinationService.syncDeviceStatus(device,
|
DeviceCoordinationService.DeviceStatus.FAILED, context);
|
|
String errorMsg = retryAttempt > 0
|
? String.format("执行失败(已重试%d次): %s", retryAttempt, e.getMessage())
|
: e.getMessage();
|
return StepResult.failure(device.getDeviceName(), errorMsg);
|
}
|
}
|
|
// 达到最大重试次数
|
step.setStatus(TaskStepDetail.Status.FAILED.name());
|
step.setErrorMessage(lastException != null ? lastException.getMessage() : "执行失败");
|
step.setEndTime(new Date());
|
step.setDurationMs(step.getEndTime().getTime() - step.getStartTime().getTime());
|
step.setRetryCount(retryAttempt);
|
taskStepDetailMapper.updateById(step);
|
updateTaskProgress(task, step.getStepOrder(), false);
|
|
// 通知步骤更新
|
notificationService.notifyStepUpdate(task.getTaskId(), step);
|
|
deviceCoordinationService.syncDeviceStatus(device,
|
DeviceCoordinationService.DeviceStatus.FAILED, context);
|
|
return StepResult.failure(device.getDeviceName(),
|
String.format("执行失败(已重试%d次)", retryAttempt));
|
}
|
|
/**
|
* 执行一次简单的设备操作步骤(不走交互引擎),用于触发请求等场景
|
*/
|
private StepResult executeDirectOperationStep(MultiDeviceTask task,
|
TaskStepDetail step,
|
DeviceConfig device,
|
TaskExecutionContext context,
|
String operation,
|
Map<String, Object> params) {
|
Date startTime = new Date();
|
step.setStartTime(startTime);
|
step.setStatus(TaskStepDetail.Status.RUNNING.name());
|
step.setRetryCount(0);
|
step.setInputData(toJson(params));
|
taskStepDetailMapper.updateById(step);
|
|
try {
|
DeviceCoordinationService.DependencyCheckResult dependencyResult =
|
deviceCoordinationService.checkDependencies(device, context);
|
if (!dependencyResult.isSatisfied()) {
|
log.warn("直接操作依赖未满足: deviceId={}, message={}", device.getId(), dependencyResult.getMessage());
|
step.setStatus(TaskStepDetail.Status.FAILED.name());
|
step.setErrorMessage(dependencyResult.getMessage());
|
step.setEndTime(new Date());
|
step.setDurationMs(step.getEndTime().getTime() - step.getStartTime().getTime());
|
taskStepDetailMapper.updateById(step);
|
updateTaskProgress(task, step.getStepOrder(), false);
|
return StepResult.failure(device.getDeviceName(), dependencyResult.getMessage());
|
}
|
|
DevicePlcVO.OperationResult result = deviceInteractionService.executeOperation(
|
device.getId(), operation, params);
|
|
boolean opSuccess = Boolean.TRUE.equals(result.getSuccess());
|
updateStepAfterOperation(step, result, opSuccess);
|
updateTaskProgress(task, step.getStepOrder(), opSuccess);
|
|
if (opSuccess) {
|
updateContextAfterSuccess(device, context, params, result);
|
// 简单同步设备状态为已完成
|
deviceCoordinationService.syncDeviceStatus(device,
|
DeviceCoordinationService.DeviceStatus.COMPLETED, context);
|
return StepResult.success(device.getDeviceName(), result.getMessage());
|
} else {
|
deviceCoordinationService.syncDeviceStatus(device,
|
DeviceCoordinationService.DeviceStatus.FAILED, context);
|
return StepResult.failure(device.getDeviceName(), result.getMessage());
|
}
|
} catch (Exception e) {
|
log.error("直接设备操作异常, deviceId={}, operation={}", device.getId(), operation, e);
|
step.setStatus(TaskStepDetail.Status.FAILED.name());
|
step.setErrorMessage(e.getMessage());
|
step.setEndTime(new Date());
|
step.setDurationMs(step.getEndTime().getTime() - step.getStartTime().getTime());
|
taskStepDetailMapper.updateById(step);
|
updateTaskProgress(task, step.getStepOrder(), false);
|
|
deviceCoordinationService.syncDeviceStatus(device,
|
DeviceCoordinationService.DeviceStatus.FAILED, context);
|
return StepResult.failure(device.getDeviceName(), e.getMessage());
|
}
|
}
|
|
/**
|
* 带重试的交互步骤执行
|
*/
|
private StepResult executeInteractionStepWithRetry(MultiDeviceTask task,
|
TaskStepDetail step,
|
DeviceConfig device,
|
TaskExecutionContext context,
|
DeviceInteraction deviceInteraction,
|
RetryPolicy retryPolicy) {
|
int retryAttempt = 0;
|
Exception lastException = null;
|
|
while (retryAttempt <= retryPolicy.getMaxRetryCount()) {
|
try {
|
if (retryAttempt > 0) {
|
long waitTime = retryPolicy.calculateRetryInterval(retryAttempt);
|
log.debug("交互步骤执行重试: deviceId={}, retryAttempt={}/{}, waitTime={}ms",
|
device.getId(), retryAttempt, retryPolicy.getMaxRetryCount(), waitTime);
|
Thread.sleep(waitTime);
|
|
step.setRetryCount(retryAttempt);
|
step.setStatus(TaskStepDetail.Status.RUNNING.name());
|
step.setStartTime(new Date());
|
taskStepDetailMapper.updateById(step);
|
}
|
|
InteractionContext interactionContext = new InteractionContext(device, context);
|
step.setInputData(toJson(context.getParameters()));
|
InteractionResult interactionResult = deviceInteraction.execute(interactionContext);
|
boolean success = interactionResult != null && interactionResult.isSuccess();
|
updateStepAfterInteraction(step, interactionResult);
|
updateTaskProgress(task, step.getStepOrder(), success);
|
|
if (success) {
|
deviceCoordinationService.syncDeviceStatus(device,
|
DeviceCoordinationService.DeviceStatus.COMPLETED, context);
|
return StepResult.success(device.getDeviceName(), interactionResult.getMessage());
|
} else {
|
if (retryAttempt < retryPolicy.getMaxRetryCount()) {
|
retryAttempt++;
|
continue;
|
}
|
deviceCoordinationService.syncDeviceStatus(device,
|
DeviceCoordinationService.DeviceStatus.FAILED, context);
|
String message = interactionResult != null ? interactionResult.getMessage() : "交互执行失败";
|
return StepResult.failure(device.getDeviceName(), message);
|
}
|
} catch (InterruptedException e) {
|
Thread.currentThread().interrupt();
|
log.error("交互步骤执行被中断, deviceId={}", device.getId(), e);
|
step.setStatus(TaskStepDetail.Status.FAILED.name());
|
step.setErrorMessage("执行被中断: " + e.getMessage());
|
step.setEndTime(new Date());
|
step.setDurationMs(step.getEndTime().getTime() - step.getStartTime().getTime());
|
step.setRetryCount(retryAttempt);
|
taskStepDetailMapper.updateById(step);
|
updateTaskProgress(task, step.getStepOrder(), false);
|
return StepResult.failure(device.getDeviceName(), "执行被中断");
|
} catch (Exception e) {
|
lastException = e;
|
log.error("交互执行异常, deviceId={}, retryAttempt={}", device.getId(), retryAttempt, e);
|
|
if (retryAttempt < retryPolicy.getMaxRetryCount() && retryPolicy.isRetryable(e)) {
|
retryAttempt++;
|
continue;
|
}
|
|
step.setStatus(TaskStepDetail.Status.FAILED.name());
|
step.setErrorMessage(e.getMessage());
|
step.setEndTime(new Date());
|
step.setDurationMs(step.getEndTime().getTime() - step.getStartTime().getTime());
|
step.setRetryCount(retryAttempt);
|
taskStepDetailMapper.updateById(step);
|
updateTaskProgress(task, step.getStepOrder(), false);
|
|
// 通知步骤更新
|
notificationService.notifyStepUpdate(task.getTaskId(), step);
|
|
deviceCoordinationService.syncDeviceStatus(device,
|
DeviceCoordinationService.DeviceStatus.FAILED, context);
|
|
String errorMsg = retryAttempt > 0
|
? String.format("执行失败(已重试%d次): %s", retryAttempt, e.getMessage())
|
: e.getMessage();
|
return StepResult.failure(device.getDeviceName(), errorMsg);
|
}
|
}
|
|
step.setStatus(TaskStepDetail.Status.FAILED.name());
|
step.setErrorMessage(lastException != null ? lastException.getMessage() : "交互执行失败");
|
step.setEndTime(new Date());
|
step.setDurationMs(step.getEndTime().getTime() - step.getStartTime().getTime());
|
step.setRetryCount(retryAttempt);
|
taskStepDetailMapper.updateById(step);
|
updateTaskProgress(task, step.getStepOrder(), false);
|
|
// 通知步骤更新
|
notificationService.notifyStepUpdate(task.getTaskId(), step);
|
|
deviceCoordinationService.syncDeviceStatus(device,
|
DeviceCoordinationService.DeviceStatus.FAILED, context);
|
|
return StepResult.failure(device.getDeviceName(),
|
String.format("执行失败(已重试%d次)", retryAttempt));
|
}
|
|
/**
|
* 获取重试策略
|
*/
|
private RetryPolicy getRetryPolicy(DeviceConfig device) {
|
// 可以从设备配置中读取重试策略
|
// 暂时使用默认策略
|
return RetryPolicy.defaultPolicy();
|
}
|
|
/**
|
* 判断业务失败是否可重试
|
*/
|
private boolean isRetryableFailure(DevicePlcVO.OperationResult result) {
|
if (result == null || result.getMessage() == null) {
|
return false;
|
}
|
String message = result.getMessage().toLowerCase();
|
// 网络错误、超时错误可重试
|
return message.contains("timeout") ||
|
message.contains("connection") ||
|
message.contains("网络") ||
|
message.contains("超时");
|
}
|
|
|
private void updateStepAfterOperation(TaskStepDetail step,
|
DevicePlcVO.OperationResult result,
|
boolean success) {
|
step.setEndTime(new Date());
|
if (step.getStartTime() != null) {
|
step.setDurationMs(step.getEndTime().getTime() - step.getStartTime().getTime());
|
}
|
step.setStatus(success ? TaskStepDetail.Status.COMPLETED.name() : TaskStepDetail.Status.FAILED.name());
|
// 设置消息:成功时如果有消息也保存,失败时保存错误消息
|
String message = result != null ? result.getMessage() : null;
|
if (success) {
|
// 成功时,如果有消息则保存(用于提示信息),否则清空
|
step.setErrorMessage(StringUtils.hasText(message) ? message : null);
|
} else {
|
// 失败时保存错误消息
|
step.setErrorMessage(message);
|
}
|
step.setOutputData(toJson(result));
|
taskStepDetailMapper.updateById(step);
|
}
|
|
private void updateStepAfterInteraction(TaskStepDetail step,
|
InteractionResult result) {
|
step.setEndTime(new Date());
|
if (step.getStartTime() != null) {
|
step.setDurationMs(step.getEndTime().getTime() - step.getStartTime().getTime());
|
}
|
boolean success = result != null && result.isSuccess();
|
step.setStatus(success ? TaskStepDetail.Status.COMPLETED.name() : TaskStepDetail.Status.FAILED.name());
|
step.setErrorMessage(success ? null : (result != null ? result.getMessage() : "交互执行失败"));
|
step.setOutputData(result != null ? toJson(result.getData()) : "{}");
|
taskStepDetailMapper.updateById(step);
|
}
|
|
private void updateTaskProgress(MultiDeviceTask task, int currentStep, boolean success) {
|
if (!success) {
|
task.setStatus(MultiDeviceTask.Status.FAILED.name());
|
}
|
|
// 计算已完成的步骤数(用于进度显示)
|
int completedSteps = countCompletedSteps(task.getTaskId());
|
int progressStep = success
|
? completedSteps
|
: Math.max(completedSteps, currentStep); // 失败时至少显示当前步骤
|
|
LambdaUpdateWrapper<MultiDeviceTask> update = Wrappers.<MultiDeviceTask>lambdaUpdate()
|
.eq(MultiDeviceTask::getId, task.getId())
|
.set(MultiDeviceTask::getCurrentStep, progressStep);
|
if (!success) {
|
update.set(MultiDeviceTask::getStatus, MultiDeviceTask.Status.FAILED.name());
|
}
|
multiDeviceTaskMapper.update(null, update);
|
|
// 更新任务对象的进度,用于通知
|
task.setCurrentStep(progressStep);
|
|
// 通知任务状态更新(包含进度信息)
|
notificationService.notifyTaskStatus(task);
|
}
|
|
/**
|
* 统计已完成的步骤数
|
*/
|
private int countCompletedSteps(String taskId) {
|
if (taskId == null) {
|
return 0;
|
}
|
try {
|
return taskStepDetailMapper.selectCount(
|
Wrappers.<TaskStepDetail>lambdaQuery()
|
.eq(TaskStepDetail::getTaskId, taskId)
|
.eq(TaskStepDetail::getStatus, TaskStepDetail.Status.COMPLETED.name())
|
).intValue();
|
} catch (Exception e) {
|
log.warn("统计已完成步骤数失败: taskId={}", taskId, e);
|
return 0;
|
}
|
}
|
|
private String determineOperation(DeviceConfig device, Map<String, Object> params) {
|
if (params != null && params.containsKey("operation")) {
|
Object op = params.get("operation");
|
if (op != null) {
|
return String.valueOf(op);
|
}
|
}
|
return DEFAULT_OPERATIONS.getOrDefault(device.getDeviceType(), "feedGlass");
|
}
|
|
private Map<String, Object> buildOperationParams(DeviceConfig device, TaskExecutionContext context) {
|
Map<String, Object> params = new HashMap<>();
|
TaskParameters taskParams = context.getParameters();
|
|
switch (device.getDeviceType()) {
|
case DeviceConfig.DeviceType.LOAD_VEHICLE:
|
params.put("glassIds", new ArrayList<>(taskParams.getGlassIds()));
|
if (StringUtils.hasText(taskParams.getPositionCode())) {
|
params.put("positionCode", taskParams.getPositionCode());
|
}
|
if (taskParams.getPositionValue() != null) {
|
params.put("positionValue", taskParams.getPositionValue());
|
}
|
params.put("triggerRequest", true);
|
break;
|
case DeviceConfig.DeviceType.LARGE_GLASS:
|
List<String> source = context.getSafeLoadedGlassIds();
|
if (CollectionUtils.isEmpty(source)) {
|
source = taskParams.getGlassIds();
|
}
|
if (!CollectionUtils.isEmpty(source)) {
|
params.put("glassId", source.get(0));
|
params.put("glassIds", new ArrayList<>(source));
|
}
|
params.put("processType", taskParams.getProcessType() != null ? taskParams.getProcessType() : 1);
|
params.put("triggerRequest", true);
|
break;
|
case DeviceConfig.DeviceType.WORKSTATION_SCANNER:
|
// 卧转立扫码设备:从任务参数中获取玻璃ID列表,取第一个作为当前要测试的玻璃ID
|
// 注意:扫码设备通常通过定时器执行,但如果通过executeStep执行,也需要传递glassId
|
log.debug("buildOperationParams处理扫码设备: deviceId={}, taskParams.glassIds={}, isEmpty={}",
|
device.getId(), taskParams.getGlassIds(),
|
CollectionUtils.isEmpty(taskParams.getGlassIds()));
|
if (!CollectionUtils.isEmpty(taskParams.getGlassIds())) {
|
params.put("glassId", taskParams.getGlassIds().get(0));
|
params.put("glassIds", new ArrayList<>(taskParams.getGlassIds()));
|
log.debug("buildOperationParams为扫码设备添加glassId: deviceId={}, glassId={}, glassIdsSize={}",
|
device.getId(), taskParams.getGlassIds().get(0), taskParams.getGlassIds().size());
|
} else {
|
log.warn("buildOperationParams扫码设备glassIds为空: deviceId={}, taskParams.glassIds={}, taskParams={}",
|
device.getId(), taskParams.getGlassIds(), taskParams);
|
}
|
break;
|
default:
|
if (!CollectionUtils.isEmpty(taskParams.getExtra())) {
|
params.putAll(taskParams.getExtra());
|
}
|
}
|
|
mergeOverrides(device, taskParams, params);
|
return params;
|
}
|
|
private void mergeOverrides(DeviceConfig device, TaskParameters taskParameters, Map<String, Object> params) {
|
if (CollectionUtils.isEmpty(taskParameters.getDeviceOverrides())) {
|
return;
|
}
|
Map<String, Object> override = taskParameters.getDeviceOverrides().get(device.getDeviceType());
|
if (override == null && StringUtils.hasText(device.getDeviceCode())) {
|
override = taskParameters.getDeviceOverrides().get(device.getDeviceCode());
|
}
|
if (override != null) {
|
params.putAll(override);
|
}
|
}
|
|
private void updateContextAfterSuccess(DeviceConfig device,
|
TaskExecutionContext context,
|
Map<String, Object> params,
|
DevicePlcVO.OperationResult result) {
|
List<String> glassIds = extractGlassIds(params);
|
|
switch (device.getDeviceType()) {
|
case DeviceConfig.DeviceType.WORKSTATION_SCANNER:
|
handleScannerSuccess(context, result);
|
break;
|
case DeviceConfig.DeviceType.LOAD_VEHICLE:
|
context.setLoadedGlassIds(glassIds);
|
// 数据传递:大车设备 -> 下一个设备
|
if (!CollectionUtils.isEmpty(glassIds)) {
|
Map<String, Object> transferData = new HashMap<>();
|
transferData.put("glassIds", glassIds);
|
transferData.put("sourceDevice", device.getDeviceCode());
|
// 这里简化处理,实际应该找到下一个设备
|
// 在串行模式下,下一个设备会在循环中自动获取
|
}
|
break;
|
case DeviceConfig.DeviceType.LARGE_GLASS:
|
context.setProcessedGlassIds(glassIds);
|
// 数据传递:大理片 -> 下一个设备
|
if (!CollectionUtils.isEmpty(glassIds)) {
|
Map<String, Object> transferData = new HashMap<>();
|
transferData.put("glassIds", glassIds);
|
transferData.put("sourceDevice", device.getDeviceCode());
|
}
|
break;
|
default:
|
break;
|
}
|
}
|
|
private void handleScannerSuccess(TaskExecutionContext context,
|
DevicePlcVO.OperationResult result) {
|
List<String> scannerGlassIds = extractGlassIdsFromResult(result);
|
if (CollectionUtils.isEmpty(scannerGlassIds)) {
|
String workLine = resolveWorkLineFromResult(result, context.getParameters());
|
scannerGlassIds = glassInfoService.getRecentScannedGlassIds(
|
SCANNER_LOOKBACK_MINUTES, SCANNER_LOOKBACK_LIMIT, workLine);
|
}
|
if (!CollectionUtils.isEmpty(scannerGlassIds)) {
|
context.getParameters().setGlassIds(new ArrayList<>(scannerGlassIds));
|
context.setLoadedGlassIds(new ArrayList<>(scannerGlassIds));
|
log.debug("卧转立扫码获取到玻璃ID: {}", scannerGlassIds);
|
} else {
|
log.warn("卧转立扫码未获取到玻璃ID,后续设备可能无法执行");
|
}
|
}
|
|
private List<String> extractGlassIds(Map<String, Object> params) {
|
if (params == null) {
|
return Collections.emptyList();
|
}
|
Object glassIds = params.get("glassIds");
|
if (glassIds instanceof List) {
|
@SuppressWarnings("unchecked")
|
List<String> cast = (List<String>) glassIds;
|
return new ArrayList<>(cast);
|
}
|
Object glassId = params.get("glassId");
|
if (glassId != null) {
|
return Collections.singletonList(String.valueOf(glassId));
|
}
|
return Collections.emptyList();
|
}
|
|
@SuppressWarnings("unchecked")
|
private List<String> extractGlassIdsFromResult(DevicePlcVO.OperationResult result) {
|
if (result == null || result.getData() == null) {
|
return Collections.emptyList();
|
}
|
Object data = result.getData().get("glassIds");
|
if (data instanceof List) {
|
List<Object> raw = (List<Object>) data;
|
List<String> converted = new ArrayList<>();
|
for (Object item : raw) {
|
if (item != null) {
|
converted.add(String.valueOf(item));
|
}
|
}
|
return converted;
|
}
|
if (data instanceof String && StringUtils.hasText((String) data)) {
|
return Collections.singletonList((String) data);
|
}
|
return Collections.emptyList();
|
}
|
|
private String resolveWorkLineFromResult(DevicePlcVO.OperationResult result,
|
TaskParameters parameters) {
|
if (result != null && result.getData() != null) {
|
Object workLine = result.getData().get("workLine");
|
if (workLine != null && StringUtils.hasText(String.valueOf(workLine))) {
|
return String.valueOf(workLine);
|
}
|
}
|
if (parameters != null && !CollectionUtils.isEmpty(parameters.getExtra())) {
|
Object extraWorkLine = parameters.getExtra().get("workLine");
|
if (extraWorkLine != null) {
|
return String.valueOf(extraWorkLine);
|
}
|
}
|
return null;
|
}
|
|
private String toJson(Object value) {
|
try {
|
return objectMapper.writeValueAsString(value);
|
} catch (JsonProcessingException e) {
|
return "{}";
|
}
|
}
|
|
private static class StepResult {
|
private final boolean success;
|
private final String message;
|
private final String deviceName;
|
|
private StepResult(boolean success, String message, String deviceName) {
|
this.success = success;
|
this.message = message;
|
this.deviceName = deviceName;
|
}
|
|
public static StepResult success(String deviceName, String message) {
|
return new StepResult(true, message, deviceName);
|
}
|
|
public static StepResult failure(String deviceName, String message) {
|
return new StepResult(false, message, deviceName);
|
}
|
|
public boolean isSuccess() {
|
return success;
|
}
|
|
public String getMessage() {
|
return message;
|
}
|
|
public Map<String, Object> toSummary() {
|
Map<String, Object> summary = new HashMap<>();
|
summary.put("deviceName", deviceName);
|
summary.put("success", success);
|
summary.put("message", message);
|
return summary;
|
}
|
}
|
}
|